Sunday, August 2, 2020

Flink - Stream Processing in Real Time

A decade ago most of the data processing and analysis within software industry was carried on by batch systems with some lag time. Now as new technologies and platforms evolve, many organizations are gradually shifting towards a stream-based approach to process their data on the fly, since most of the data is being streamed. The concept isn't new, and was explored within Dataflow programming in the 1980s. Today majority of the large-scale data processing systems handle the data, which is produced continuously over time. These continuous streams of data come from variety of sources, for example web logs, application logs, sensors, or as changes to application state in databases (transaction log records) etc. Stream processing continuously incorporates new data to compute a result, with the input data being unbounded. It can work with a lot less hardware than batch processing as data is processed as it comes, spreading the processing over time. 

Apache Storm was the first widely used framework for stream processing were processing programs ran continuously on data and produce outcomes in real-time, while the data is generated. Apache Spark became the most popular, matured and widely adopted streaming platform with many features such as structured streaming, custom memory management, watermarks, event time processing support etc. Spark is essentially a batch with Spark streaming as micro-batching and special case of Spark Batch. It is not truly real time processing and has some latency. It is also stateless by design and tuning Spark becomes challenging with many parameters. 

Apache Flink is an open source platform for distributed stream and batch data processing. It is essentially a true streaming engine with a special batch processing mode of streaming with bounded data. The core of Apache Flink is a streaming dataflow engine, which supports communication, distribution and fault tolerance for distributed stream data processing. Apache Flink is a hybrid platform for supporting both batch and stream processing. It supports different use cases based on real-time processing, machine learning projects, batch processing, graph analysis and others. Flink does not implement the allocation and management of compute resources in a cluster, process coordination, highly available durable data storage, and failure recovery. Instead, it focuses on its core function, distributed data stream processing and leverages existing cluster infrastructure and services. Flink is well integrated with cluster resource managers, such as Apache Mesos, YARN, and Kubernetes, but can also be configured to run as a stand-alone cluster. Flink does not provide durable, distributed storage. Instead, it takes advantage of distributed filesystems like HDFS or object stores such as S3. Flink depends on Apache ZooKeeper for leader election in highly available setups.

Streaming Basics

Dataflow program describes how data flows between operations. Dataflow programs are commonly represented as directed graphs, where nodes are called operators and represent computations and edges represent data dependencies. Operators are basic functional units in the data flow which consume the data from inputs, perform a computation on them, and produce data to outputs for further processing. A dataflow graph is a directed acyclic graph (DAG) that consists of stateful operators and data streams which represent the data produced by an operator and available for consumption by operators. A data stream is a potentially unbounded sequence of events. Operators without input ports are called data sources and operators without output ports are called data sinks. A dataflow graph must have at least one data source and one data sink. The operators are parallelized into one or more parallel instances called subtasks and streams are split into one or more stream partitions, with one partition per subtask. Each operator might have several parallel tasks running on different physical machines. Ideally any streaming application should have low latency with few milliseconds for processing an event and high throughput i.e. rate of processing events which is measured in events or operations per time unit. The peak throughput is the performance limit when the system is at its maximum load. Streaming operations can be stateless or stateful were they maintain information about the events they have received before. Stateful stream processing applications are more challenging to parallelize and operate in a fault-tolerant manner because state needs to be efficiently partitioned and reliably recovered in the case of failures. Data ingestion is the operation of fetching raw data from external sources and converting it into a format suitable for processing. Data egress on the other hand is the operation of producing output in a form suitable for consumption by external systems. Transformation operations are single-pass operations that process each event independently. These operations consume one event after the other and apply some transformation to the event data, producing a new output stream.

Lambda architecture is a pattern which combine batch and stream processing systems to implement multiple paths of computation. A streaming fast path for timely approximate results, and a batch offline path for late accurate results. These approaches suffer from high latency (imposed by batches),  high complexity, as well as arbitrary inaccuracy since the time is not explicitly managed by the application code.  Batch programs are special cases of streaming programs, where the stream is finite, and the order and time of records does not matter. Flink has a specialized API for processing static data sets and uses specialized data structures and algorithms for the batch versioned operators such as join or grouping, and uses dedicated scheduling strategies. The result is that Flink presents itself as a full-fledged and efficient batch processor on top of a streaming runtime, including libraries for graph analysis and machine learning.

Flink Components

The core of Flink is the distributed dataflow engine, which executes dataflow programs. A Flink runtime program is a DAG of stateful operators connected with data streams. Flink provides two main core APIs: the DataStream API for both bounded and unbounded streams, and the DataSet API for the bounded data sets. A DataSet is treated internally as a stream of data. They provide common building blocks for data processing, such as various forms of transformations, joins, aggregations, windows, state, etc. Both the DataSet API and DataStream APIs create runtime programs executable by the dataflow engine. Flink bundles domain-specific libraries and APIs that generate DataSet and DataStream API programs, namely, FlinkML for machine learning, Gelly for graph processing and Table API for SQL-like operations. Gelly is a Graph API for Flink which simplifies graph analysis within Fink applications. Gelly allows to transform and modify the graphs using high-level functions, by providing methods to create, transform and modify graphs. FlinkML was the legacy Machine Learning (ML) library for Flink. It is deprecated/removed in Flink 1.9, with the new Flink-ML interface being developed the umbrella of FLIP-39, and is being actively extended under FLINK-12470. The Table API is an extension of relational model were Tables have a schema attached with the API offering comparable operations, such as select, project, join, group-by, aggregate, etc. The Table API programs goes through an optimizer that applies optimization rules before execution. Flink also offers support for SQL query expressions, which is similar to the Table API both in semantics and expressiveness. The SQL abstraction closely interacts with the Table API, and SQL queries can be executed over tables defined in the Table API. FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink which allows to detect event patterns in an endless stream of events.



Flink Architecture

The Flink runtime consists of two types of processes: a JobManager and one or more TaskManagers high are responsible for executing the applications and failure recovery. The JobManagers and TaskManagers can be started either directly on the machines as a standalone cluster, or in containers, or managed by resource frameworks like YARN or Mesos. Once started the TaskManagers connect to the JobManager making themselves available to be assigned work.



JobManager

The JobManager is the master process that controls the execution of a single application. Each application is controlled by its own separate JobManager. The JobManager receives an application consisting of JobGraph (logical dataflow graph) and JAR file containing required classes and libraries for execution. The JobManager converts the JobGraph into a physical dataflow graph called the ExecutionGraph, which consists of tasks that can be executed in parallel. The JobManager requests the necessary resources (TaskManager slots) to execute the tasks from the ResourceManager. Once it receives enough TaskManager slots, it distributes the tasks of the ExecutionGraph to the TaskManagers that execute them. The JobManager coordinates the distributed execution of the dataflow. It tracks the state and progress of each task (operator), schedules new tasks, and coordinates checkpoints, savepoints and recovery.

TaskManager

A TaskManager is a JVM (worker) process, and executes one or more subtasks in separate threads. It executes the tasks of a dataflow, and buffer and exchange the data streams. Each TaskManager provides a certain number of task slots (unit of resource scheduling) which limit the number of tasks the TaskManager can execute. In other words, it represents the number of concurrent processing tasks executed by TaskManager. The task slot ensures that subtask will not compete with other subtasks from different jobs for managed memory, but instead has an amount of reserved managed memory. By default, Flink allows subtasks to share task slots even if they are subtasks of different tasks, so long as they are from the same job. Hence one task slot may be able to hold an entire job pipeline thus providing better resource utilization. The TaskManager once started registers its slots to the ResourceManager. When instructed by the ResourceManager, the TaskManager offers one or more of its slots to a JobManager. The JobManager can then assign tasks to the slots to execute them. The TaskManager reports the status of the tasks to the JobManager. During execution, a TaskManager exchanges data with other TaskManagers running tasks of the same application. There must always be at least one TaskManager.



ResourceManager

The ResourceManager is responsible for allocation/deallocation of resources and provisioning them in a Flink cluster. It manages TaskManager slots, which is the unit of resource scheduling in a Flink cluster. When the ResourceManager receives TaskManager slot request from JobManager, it instructs a TaskManager with idle slots to offer them to the JobManager. If the ResourceManager does not have enough slots to fulfill the JobManager’s request, the ResourceManager can talk to a resource provider to provision containers in which TaskManager processes are started. The ResourceManager also takes care of terminating idle TaskManagers to free compute resources. Flink implements multiple ResourceManagers for different environments and resource providers such as YARN, Mesos, Kubernetes and standalone deployments. In a standalone setup, the ResourceManager can only distribute the slots of available TaskManagers and cannot start new TaskManagers on its own.

Dispatcher

The Dispatcher provides a REST interface to submit Flink applications for execution and starts a new JobMaster for each submitted job. It also runs the Flink WebUI to provide information about job executions.The REST interface enables the dispatcher to serve as an HTTP entry point to clusters that are behind a firewall. The dispatcher also runs a web dashboard to provide information about job executions.

JobMaster

JobMaster is one of the components running in the JobManager. It is responsible for supervising the execution of the Tasks of a single job and thus manages the execution of a single JobGraph. JobMaster is responsible for execution of multiple jobs in parallel within a Flink cluster, each having its own JobMaster. There is always at least one JobManager. A high-availability setup might have multiple JobManagers, one of which is always the leader, and the others are standby.

Operator Chaining

Flink chains operator (e.g. two subsequent map transformations) subtasks together into tasks by default for distributed execution. Each task is executed by one thread. Chaining two subsequent transformations means co-locating them within the same thread for better performance. Chaining operators together into tasks reduces the overhead of thread-to-thread handover and buffering, and increases overall throughput while decreasing latency. The StreamExecutionEnvironment.disableOperatorChaining() method is used to disable chaining in the whole job.



Flink Application Execution
Flink Application is a user program which spawns one or multiple Flink jobs, either on local JVM or remote clusters, from its main() method. The ExecutionEnvironment which can be either LocalEnvironment or RemoteEnvironment, provides methods to control the job execution (by setting parallelism) and to interact with the outside resources. The jobs of a Flink Application can either be submitted to a long-running Flink Session Cluster, a dedicated Flink Job Cluster, or a Flink Application Cluster.

Flink Session Cluster is long running pre-existing cluster which can accept multiple job submissions. The cluster continues to run even after all the jobs are finished, and it life is not bounded by any Flink job. All the jobs share same cluster and compete with each other for cluster resources (like network bandwidth). Such pre-existing cluster saves time for acquiring the resources and starting the TaskManagers, but the shared setup means a crash of one TaskManager will fail all the jobs running over it.

Flink Job Cluster the cluster manager such as YARN or Kubernetes is used to spin up a cluster for each submitted job and is only available for that job. When the client requests resources from the cluster manager and submits the job to the Dispatcher, TaskManagers are allocated lazily based on job's resource requirements. Once the job is finished, the Flink Job Cluster is torn down. Since the cluster is restricted to a single job, failure does not affect's other jobs. Since allocation of resources takes time, Flink Job Clusters are generally used for long running and time sensitive requirements.

Flink Application Cluster is a dedicated Flink cluster that only executes jobs from one Flink Application and where the main() method runs on the cluster rather than the client. The job submission is done by packaging the application classes & libraries into a JAR and the cluster entrypoint (ApplicationClusterEntryPoint) is responsible for calling the main() method to extract the JobGraph. Flink Application Cluster continues to run until the lifetime of the Flink Application.

Stateful Stream Processing

The stateful operations are the one were they remember information across multiple events, by storing data across the processing of individual elements/events. Every function and operator in Flink can be stateful. State enables Flink to be fault tolerant using checkpoints and savepoints. Flink redistributes the state across multiple parallel instances, thus rescaling its applications. Keyed state is a type of state maintained in an embedded key/value store. Key state is partitioned and distributed strictly together with the streams that are read by the stateful operators. Hence, access to the key/value state is only possible on keyed streams and is restricted to the values associated with the current event’s key. Aligning the keys of streams and state makes ensures that all state updates are local operations, thus guaranteeing consistency without transaction overhead. This also allows Flink to redistribute the state and adjust the stream partitioning transparently. Keyed State is organized into Key Groups which are an atomic unit by which Flink can redistribute Keyed State.

Checkpoints

Flink implements fault tolerance using a combination of stream replay and checkpointing. A checkpoint marks a specific point in each of the input streams along with the corresponding state, for each of the operators. Flink's fault tolerance mechanism continuously draws consistent snapshots of the distributed data stream and operator state asynchronously. These snapshots act as consistent checkpoints to which the system can fall back. The stream flow is resumed from the checkpoint while maintaining consistency by restoring the state of the operators and replaying the records from the point of the checkpoint. These snapshots are very light-weight and can be drawn frequently without much impact on performance. The state of the streaming applications is usually stored at a configurable distributed file system. In case of failure, Flink stops the distributed streaming dataflow. The system then restarts the operators and resets them to the latest successful checkpoint. The input streams are reset to the point of the state snapshot. Any records that are processed as part of the restarted parallel data-flow are guaranteed to not have affected the previously checkpointed state. Batch programs in Flink does not use any checkpointing and recovers by fully replaying the streams since its inputs are bounded.

By default checkpoints are disabled in Flink and can be enabled by calling enableCheckpointing(interval) on the StreamExecutionEnvironment, where the checkpoint interval is passed in milliseconds. Some of the other checkpoint parameters include guarantee level (exactly-once or at-least-once), checkpoint timeout, minimum time between checkpoints (ensuring some progress between checkpoints), number of concurrent checkpoints (by default is 1), externalized checkpoints (which write their meta-data to a persistent storage to resume on failure), fail/continue task on checkpoint errors, prefer checkpoint for recovery and unaligned checkpoints (reduces checkpointing times under backpressure). The checkpoints are stored in memory, file system or database based on the State backend configuration. By default, state is kept in memory in the TaskManagers and checkpoints are stored in memory in the JobManager. The state backend can be configured via StreamExecutionEnvironment.setStateBackend(..). Flink currently only provides processing guarantees for jobs without iterations. Hence enabling checkpoint on an iterative job causes an exception, which still can be forced though by using env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true) flag.

Stream Barriers

Stream Barriers is a core element in Flink’s distributed snapshotting as they are injected into the data stream and flow with the records as part of the data stream. A barrier separates the records in the data stream into the set of records that goes into the current snapshot, and the records that go into the next snapshot. Every barrier carries the ID of the snapshot whose records it pushed in front of it. Barriers are very lightweight. Multiple barriers from different snapshots can be in the stream at the same time. Stream barriers are injected into the parallel data flow at the stream sources. Barriers for snapshot are injected in the source stream at the position up to which the snapshot covers the data and it flows downstream. When an intermediate operator receives a barrier for snapshot from all of its input streams, it emits a barrier for snapshot into all of its outgoing streams. The operator cannot process any further records from the incoming stream after receiving snapshot barrier, until it has receives the same barrier from the other inputs as well. When the sink operator receives the barrier from all of its input streams, it acknowledges that snapshot to the checkpoint coordinator. After all sinks have acknowledged a snapshot, it is considered completed.  When the last stream receives the barrier, the operator emits all pending outgoing records, and then emits snapshot's barriers itself. The operator snapshots the state, resumes processing records from all the input streams, and finally writes the state asynchronously to the state backend. Any state present within the operators becomes the part of the snapshots. Operators snapshot their state at the point in time when they have received all snapshot barriers from their input streams, and before emitting the barriers to their output streams. Only the updates to the state from the records before the barriers are applied and the state is stored in a preconfigured reliable distributed storage. In case of a failure Flink selects the latest completed checkpoint, with the system redeploying the entire distributed dataflow and gives each operator the state that was snapshotted as part of checkpoint. Such checkpointing is called aligned checkpointing, were the sources are set to start reading the stream from checkpoint's snapshot position.

Starting with Flink 1.11, the checkpoints can overtake all in-flight data as long as the in-flight data becomes part of the operator state, also known as unaligned checkpointing. In unaligned checkpointing, operators first recover the in-flight data before starting processing any data from upstream operators in unaligned checkpointing, performing same recovery steps as aligned checkpoints. It ensures that barriers are arriving at the sink as fast as possible. The alignment step could add latency to the streaming program which is usually a few milliseconds or in some cases noticeably higher. The alignments can be skipped when causing high latency, in which case the operator continues processing all inputs, even after some checkpoint barriers for checkpoint has arrived. Alignment happens only for operators with multiple predecessors (joins) as well as operators with multiple senders (after a stream repartitioning/shuffle). 

Savepoints

All programs that use checkpointing can resume execution from a savepoint. They allow updating both the programs and the Flink cluster without losing any state. Savepoints are manually triggered checkpoints, which take a snapshot of the program and write it out to a state backend, using the checkpointing mechanism. Savepoints are similar to checkpoints except that they are triggered by the user and don’t automatically expire when newer checkpoints are completed.


Timely Stream Processing

Timely stream processing is an extension of stateful stream processing in which time plays crucial role in the computation, for example time series analysis such as aggregation by time periods. There are different notions of time in streaming, namely, Event Time and Processing Time. 

Processing Time

Processing time refers to using the system time (clock) of the machine that is executing the respective operation. Processing time requires no coordination between streams and machines. It provides the best performance and the lowest latency. Processing time cannot be determined always in a distributed and asynchronous environments, as its susceptible to the speed at which records arrive and flow between operators within the system, as well as other outages. 

Event Time

Event time is the time that each individual event occurred on its producing device and is embedded within the records before they enter Flink. In event time, the progress of time depends on the data, not on any wall clocks. Event time programs specifies how to generate Event Time Watermarks, which indicates the progress in event time. Ideally event time yields consistent and deterministic results, regardless of the arrival order by timestamp of the events. But the latency incurred by waiting for out-of-order events which is a finite time period limits its deterministic nature. When all the data arrives, the event time produces consistent and correct results after working through the out-of-order late events.

Watermarks

Flink stream processor uses the watermarks mechanism to measure the progress in event time and for example, to get notified when to close the window in progress after the event time passes beyond an hour for an hourly window operator. Watermarks are part of the data stream and carry a timestamp to determine that the event time has reached a specified time within that stream. It indicates that there should be no more elements in the stream with a timestamp lower than the watermark timestamp, after the specified watermark point. In other words watermark is a declaration that by specified point in the stream, all events up to a certain timestamp should have arrived. This makes watermarks crucial for processing out-of-order streams were events are not ordered by their timestamps. The operator also advances its internal event time clock to the value of the watermark after reaching the watermark. Watermarks are generated at, or directly after source functions, defining the event time at a particular parallel source. Each parallel subtask of a source function usually generates its watermarks independently.As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an operator advances its event time, it generates a new watermark downstream for its successor operators. It is possible that certain elements will violate the watermark condition, by arriving with timestamp even after the Watermark for corresponding timestamp has already occurred. In real world it becomes difficult to specify the time by which all the elements of a certain event timestamp will have occurred. Delaying watermarks causes delays in evaluation of event time windows.

DataStream API

DataStream API provides APIs to implement transformation on data streams, e.g. filtering, updating state, defining windows, aggregating.

The first step of the Flink program is to create a context for execution using StreamExecutionEnvironment, which can be LocalStreamEnvironment or RemoteStreamEnvironment. It is an entry class that can be used to set parameters, create data sources, and submit tasks. The instance of StreamExecutionEnvironment is obtained by using its static methods, getExecutionEnvironment(), createLocalEnvironment() and createRemoteEnvironment(String host, int port, String... jarFiles). Ideally the getExecutionEnvironment() method is used to provide the corresponding environment depending on the context. When executing the program within an IDE or as a regular Java program, it creates a local environment to execute the program in local machine. On other hand when the program is bundled into a JAR file and invoked using command line, the Flink cluster manager executes the main method with the getExecutionEnvironment() returning an execution environment of a cluster. The StreamExecutionEnvironment contains the ExecutionConfig which allows to set job specific configuration values for the runtime. The throughput and latency can be configured in Flink using environment.setBufferTimeout(timeoutMillis) on the execution environment which sets a maximum wait time for the data element buffers (which avoid network traffic by one-by-one element transfer) to fill up. Setting buffer timeout to -1 removes the timeout, flushing the buffer only after its full.

Next, either the initial data is generated, or data is read from external data source using a socket connection. This creates a DataStream of specific type. DataStream is the core API for stream processing in Flink. The DataStream class is used to represent an immutable collection of data, which can be either finite or unbounded. Since the data is immutable, no new elements can be added and removed from the data collection. DataStream defines many common operations such as filtering, conversion, aggregation, window, and association. Once the output DataStream containing final results is achieved, the it is written to an outside system by creating a sink. The methods such as writeAsText(String path) and print() are used to create a sink. Finally, the execute() method on the StreamExecutionEnvironment is called to trigger the Flink program execution. All Flink programs are executed lazily. The data loading and transformation operator operations such as source creation, aggregation, and printing etc, only build the graphs of internal operator operations, by adding to a dataflow graph. Only after calling the execute() method, the dataflow graphs are submitted to the cluster or the local computer for execution. Depending on the type of the ExecutionEnvironment the execution will be triggered on the local machine or submit the program for execution on a cluster. The execute() method will wait for the job to finish and then return a JobExecutionResult, this contains execution times and accumulator results. Flink also allows to trigger asynchronous job execution by calling executeAysnc() on the StreamExecutionEnvironment, which returns a JobClient to communicate with the submitted job.

The below program counts the words coming from a web socket in 5 second windows. The program gets its input stream from netcat running on the terminal using the command "nc -lk 9999".
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print()
    env.execute("Window Stream WordCount")
  }
}

The Flink program reads data by attaching a source using StreamExecutionEnvironment.addSource(sourceFunction). Flink provides many pre-implemented source functions and allows to write custom sources by implementing the SourceFunction for non-parallel sources, or implementing the ParallelSourceFunction interface or extending the RichParallelSourceFunction for parallel sources. The StreamExecutionEnvironment has several predefined stream sources such as File based sources (readTextFile, readFile), Socket based source (socketTextStream), Collection-based sources (fromCollection, fromElements, fromParallelCollection, generateSequence) and Custom source (addSource). 

Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams, for example writeAsText(), writeAsCsv(), writeUsingOutputFormat() for custom file formats, writeToSocket() and addSink() which invokes a custom sink function. It is recommended to use the flink-connector-filesystem for reliable exactly-once delivery of a stream into a file system.

An iterative operation is carried out by embedding the iteration in a single operator or by using a sequence of operators, one for each iteration. Flink supports iterative streaming by implementing a step function and embed it into an IterativeStream. Since there is no maximum number of iterations, the stream should eventually be forwarded to a downstream using either a split transformation or a filter. The below program continuously subtracts 1 from a series of integers until they reach zero.
val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)

val iteratedStream = someIntegers.iterate(
  iteration => {
    val minusOne = iteration.map( v => v - 1)
    val stillGreaterThanZero = minusOne.filter (_ > 0)
    val lessThanZero = minusOne.filter(_ <= 0)
    (stillGreaterThanZero, lessThanZero)
  }
)
Operators in Flink transform one or more DataStreams into a new DataStream. Flink Programs can combine multiple transformations using such operators to create a sophisticated dataflow topologies. Examples of transfromation operators are: map(), flatMap(), filter(), reduce(), split(), select(), iterate(), keyBy() which partitions a stream into disjoint partitions by keys, fold() which combines current value with last folded value, variosu aggregations, window() and windowAll() which group stream events according to some characteristic.

Flink provides special data sources e.g. env.fromElements(), env.fromCollection(collection) which are backed by Java collections to ease testing. Similarly Flink provides a sink to collect DataStream results for testing, for example, "DataStreamUtils.collect(myDataStream.javaStream).asScala".

Windows
Windows split the stream into buckets of finite size, over which computations can be applied. A window is created when the first element belonging to window arrives and removed the time passes its end timestamp. Flink guarantees removal only for time-based windows. The keyBy(...) splits the infinite stream into logical keyed streams were any attribute of the event can be used as a key. Keyed streams allow windowed computation to be performed in parallel by multiple tasks processing independently. All elements referring to the same key will be sent to the same parallel task. In non-keyed streams the original stream is never split into multiple logical streams and all the windowing logic is performed by a single task. The window(...) or windowAll(...) is called for non-keyed streams in Non-Keyed Windows. 

Each window has a Trigger and a function to be applied to window data, attached to it. A trigger can also decide to purge a window’s contents any time between its creation and removal. The WindowAssigner defines how the elements are assigned to the windows and is passed to passed to window(...) (for keyed streams) or the windowAll() (for non-keyed streams) call. Flink comes with pre-defined window assigners for the most common use cases, namely tumbling windows, sliding windows, session windows and global windows.  The window function defines the computation to be performed on the windows. The window function can be one of ReduceFunction, AggregateFunction, FoldFunction or ProcessWindowFunction. A Trigger determines when a window is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. A custom trigger can be specified using trigger(...). The trigger interface has five methods namely, onElement(), onEventTime(), onProcessingTime(), onMerge() and clear(), which allows the Trigger to react to different events. The onElement(), onEventTime() and onProcessingTime() methods decide how to act on their invocation event by returning a TriggerResult. TriggerResult enum has values, CONTINUE which does nothing, FIRE which triggers computation, PURGE which clears elements in window, and FIRE_AND_PURGE which both triggers computation and clearing window afterwards. Flink’s windowing model allows specifying an optional Evictor in addition to the WindowAssigner and the Trigger, using the evictor(...) method. The evictor has the ability to remove elements from a window after the trigger fires and before and/or after the window function is applied. The Evictor interface has two methods evictBefore() and evictAfter(), which contains the eviction logic to be applied before or after the window function. The result of a windowed operation is a DataStream and no information about the windowed operations is retained in the result elements. The only relevant information that is set on the result elements is the element timestamp.

DataSet API

Data set is a collection of finite bounded data. Dataset API perform batch operations on the data over a period. It provides different kinds of transformations on the datasets like filtering, mapping, aggregating, joining and grouping. Datasets are created from sources like local files or by reading a file from a particular source and the result data can be written on different sinks like distributed files or command line terminal. DataSet programs are similar to DataStream programs with the difference that the transformations are on data sets. Below is the WordCount example on DataSet.

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}
Data transformations transform one or more DataSets into a new DataSet. Some transformations (join, coGroup, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate) allow data being grouped on a key before they are applied. The parallelism of a transformation can be defined by setParallelism(int). DataSets are created using the abstractions behind InputFormat, either from files or from Java collections. Flink comes with several built-in formats to create data sets from common file formats using ExecutionEnvironment methods. For example, the methods readTextFile(path), readCsvFile(path), readSequenceFile(key, value, path) read data from specified files, while methods fromCollection(Iterable), fromElements(elements: _*), fromParallelCollection(SplittableIterator) create data set from specified iterator/elements, with generateSequence(from, to) generating sequence from the specified interval. Data sinks consume the DataSets and store or return them to external destination. Data sink operations are described using an OutputFormat. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataSet, for example writeAsText(), writeAsCsv(...), print(), write() and output(). Iteration operators in Flink encapsulate a part of the program and execute it repeatedly, feeding back the result of one iteration (the partial solution) into the next iteration. There are two types of iterations in Flink, BulkIteration and DeltaIteration.

Parameters can be passed to flink functions using either the constructor or the withParameters(Configuration) method. The parameters are serialized as part of the function object and shipped to all parallel task instances. Flink also allows to pass custom configuration values to the ExecutionConfig interface of the environment. Since the execution config is accessible in all (rich) user functions, the custom configuration will be available globally in all functions. Objects in the global job parameters are accessible in many places in the system. All user functions implementing a RichFunction interface have access through the runtime context.

Semantic annotations are used to give hints to Flink the behavior of a function. They tell the system which fields of a function's input does the function reads and evaluates and which fields it unmodified forwards from its input to its output. Semantic annotations are a powerful means to speed up execution, because they allow the system to reason about reusing sort orders or partitions across multiple operations. Semantic annotations may eventually save the program from unnecessary data shuffling or unnecessary sorts and significantly improve the performance of a program. Incorrect semantic annotations would cause Flink to make incorrect assumptions about the program, leading to incorrect results. Hence if the behavior of an operator is not clearly predictable, it is better to not provide any annotation. @ForwardedFields (or @ForwardedFieldsFirst) annotation declares input fields which are forwarded unmodified by a function to the same position or to another position in the output. This information is used by the optimizer to infer whether a data property such as sorting or partitioning is preserved by a function. @NonForwardedFields (or @NonForwardedFieldsFirst) annotation declares all the fields which are not preserved on the same position in a function's output. The values of all other fields are considered to be preserved at the same position in the output. Hence, @NonForwardedFields annotation is opposite of @ForwardedFields annotation. @ReadFields (or @ReadFieldsFirst) annotation declares all fields that are accessed and evaluated by a function, i.e., all fields that are used by the function to compute its result. For example, fields which are evaluated in conditional statements or used for computations must be marked as read, while fields which are unmodified forwarded to output without evaluation are considered not read.

Broadcast variables allow to make a data set available to all parallel instances of an operation, in addition to the regular input of the operation. They enable data set to be accessible at the operator as a Collection. The broadcast sets are registered by name via withBroadcastSet(DataSet, String), and accessible via getRuntimeContext().getBroadcastVariable(String) at the target operator. Flink offers a distributed cache which enable to share locally accessible files to parallel instances of user functions. A program registers a file or directory of a local or remote filesystem such as HDFS or S3 under a specific name in its ExecutionEnvironment as a cached file. When the program is executed, Flink automatically copies the file or directory to the local filesystem of all workers. A user function can look up the file or directory under the specified name and access it from the worker’s local filesystem.

Table API and SQL

Flink provides two relational APIs, the Table API and SQL, for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way. Flink’s SQL support is based on Apache Calcite which implements the SQL standard. Queries specified in both interface have the same semantics and specify the same result regardless whether the input is a batch input (DataSet) or a stream input (DataStream). The Table API and the SQL interfaces (still actively being developed) are tightly integrated with each other as well as Flink’s DataStream and DataSet APIs. This enables to switch seamlessly between all APIs and libraries. The central concept of Table API and SQL is that the Table serves as input and output of queries.

Flink provides planners, namely Blink planner (Flink 1.9) and old planner, which are responsible for translating relational operators into an executable, optimized Flink job. Both of the planners come with different optimization rules and runtime classes. Blink planner treats batch jobs as a special case of streaming, were batch jobs are only translated into DataStream programs (not DataSet) similar to the streaming jobs. Blink planner also does not support BatchTableSource and uses bounded StreamTableSource.

The TableEnvironment is a central concept of the Table API and SQL integration. It is responsible for registering a Table in internal catalog and registering catalogs. It also enables to execute SQL queries and converts DataStream or DataSet into a Table. A TableEnvironment is created by calling the BatchTableEnvironment.create() or StreamTableEnvironment.create() with a StreamExecutionEnvironment or an ExecutionEnvironment and an optional TableConfig. The TableConfig can be used to configure the TableEnvironment or to customize the query optimization and translation process. A Table is always bounded to a specific TableEnvironment and its not possible to combine tables (using join or union) of different TableEnvironments in the same query. TableEnvironment maintains a map of catalogs of tables which are created with an identifier. Each identifier consists of catalog name, database name and object name. Tables can be either regular TABLES describing external data such as a file or database table, else virtual VIEWS created from an existing Table object. TableEnvironment allows to set the current catalog and current database, thus making them optional parameters while creating tables using createTemporaryTable (..) and views using createTemporaryView(..). 

Table can be temporary for a particular Flick session or permanent visible across multiple Flink sessions and clusters. A temporary table is stored in memory and can have same identifier as an existing permanent table, in which case it shadows the permanent table making it inaccessible. An object of Table API corresponds to a VIEW (virtual table) from relational database systems. It encapsulates a logical query plan. The query defining the Table is not optimized and the subsequent queries referencing the registered table are inlined. The Table API has integrated query API which can be applied on Table class to perform relational operations. These query API methods for example, filter(), groupBy(), select() etc return a new Table object representing the result of applying the relational operation on the input Table. Some relational operations are composed of multiple method calls such as table.groupBy(...).select(), where groupBy(...) specifies a grouping of table, and select(...) the projection on the grouping of table. 

Flink’s SQL queries as opposed to query API are specified as regular Strings. Flink query is internally represented as a logical query plan which is optimized and translated into a DataStream program. Table API and SQL queries can be easily mixed because both return Table objects. A Table is emitted by writing it to a TableSink. TableSink is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra), or messaging systems (e.g., Apache Kafka). A batch Table can only be written to a BatchTableSink, while a streaming Table requires either an AppendStreamTableSink, a RetractStreamTableSink, or an UpsertStreamTableSink. The Table.executeInsert(String tableName) method looks up the TableSink from the catalog by name, validates the table schema with TableSink schema and finally emits the Table to the specified TableSink.

Some of the Table API or SQL query used frequently are as below.
  • The TableEnvironment.executeSql() method is used for executing a given statement. 
  • The Table.executeInsert() is used for inserting the table content to the given sink path.
  • The Table.execute() is used for collecting the table content to local client.
  • A Table is buffered in StatementSet first, when its emitted to a sink through StatementSet.addInsert() or an INSERT statement using StatementSet.addInsertSql(). The StatementSet.execute() finally emits the buffered data to sink.

Below is the example common structure of Table API and SQL program.
// create a TableEnvironment for blink planner streaming
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

// create a Table
tableEnv.connect(...).createTemporaryTable("table1")
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable")

// create a Table from a Table API query
val tapiResult = tableEnv.from("table1").select(...)
// create a Table from a SQL query
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...")

// emit a Table API result Table to a TableSink, same for SQL result
val tableResult = tapiResult.executeInsert("outputTable")
tableResult...

Table API and SQL queries can be easily integrated with and embedded into DataStream and DataSet programs. Inversely, a Table API or SQL query can also be applied on the result of a DataStream or DataSet program. This can be achieved by converting a DataStream or DataSet into a Table and vice versa. The Scala Table API features implicit conversions for the DataSet, DataStream, and Table classes. The DataStream or DataSet can also be registered in a TableEnvironment as a View. The schema of the resulting temporary view depends on the data type of the registered DataStream or DataSet. DataStream or DataSet can also be converted directly into a Table, allowing the inverse as well were a Table can be converted into a DataStream or DataSet. While converting a Table into a DataStream or DataSet the data type for the resulting DataStream or DataSet (from table rows) needs to be specified. A Table which is the result of a streaming query is updated dynamically, such that as new records arrive on the query’s input streams, the table changes. To convert table to DataStream, such updates on the table by dynamic querying is encoded to be converted into DataStream. The two modes to convert a Table into a DataStream are Append Mode which is used only when dynamic Table is modified by INSERT changes and Retract Mode which is used always.

Apache Flink leverages Apache Calcite to perform sophisticated query optimization. The optimizer makes decisions based on query plan, data source statistics and fine-grain costs (memory, cpu etc) for each operator. The Table API provides a mechanism to explain the logical and optimized query plans to compute a Table using Table.explain() or StatementSet.explain() methods. Table.explain() returns the plan of a Table, while StatementSet.explain() returns the plan of multiple sinks.

Conclusion

Apache Flink is a true stream processing framework, built from the ground up to process streaming data. It processes events one at a time and treats batch processing as a special case. In contrast, Apache Spark treats a data stream as multiple tiny batches making streaming a special case of batch. Stream imperfections like out-of-order events can be easily handled using Flink's event time processing support. Flink provides additional operations that allows implementing cycles within the streaming application and perform several iterations on batch data. Flink provides a vast powerful set of operators to apply functions to a finite group of elements in a stream. Hence complex data semantics can be easily implemented using Flink’s rich programming model.

Flink implements lightweight distributed snapshots to provide low overhead and only-once processing guarantees in stream processing. Flink does not rely entirely on JVM garbage collector and implements a custom memory manager that stores data to process in byte arrays. This allows reducing the load on a garbage collector and increases performance. This enables Flink to process event streams at high throughputs with consistent low latencies. Spark Streaming is trying to catch up with Flink with Structured Streaming release and it seems to be tough fight ahead. There is multiple benchmarking comparing the both after newer releases, with Spark claiming to be faired well, which then is responded by Flink. Apache Flink is relatively new in streaming arena and has smaller community relative to Apache Spark. Also it is primarily used in streaming applications as there is no known adoption of Flink Batch in production. Flink does lack in robustness on node failures when compared to Spark. It also cannot handle skewed data very well (as Spark). With all that said, Apache Flink is the best framework out there for real-time processing and has a fast growing community by each day.