Monday, June 29, 2020

Spark - An In-Memory Cluster Computing Framework

In the modern information age, where data is collected at a staggering scale from countless interconnected devices, the ability to stream, process and analyze this data, often in real time, has become crucial for many companies. In the Internet of Things (IoT), objects and devices are embedded with tiny sensors that communicate with each other and with the user, creating an interconnected system that generates massive amounts of data. Such large volumes of data need to be processed in real time to drive intelligent solutions and the revolutionary features of modern applications. Massively distributed, parallel processing of such vast amounts and varieties of data is hard to manage with conventional analytics capabilities in the cloud. Apache Spark, with its stack of components, makes it possible to process such decentralized data from fog computing solutions. Apache Spark is a powerful cluster computing engine that stands out for its ability to process large volumes of data significantly faster than MapReduce, because data is persisted in memory within Spark's own processing framework. Spark can be used for both batch processing and real-time processing. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance. Apache Spark also provides a suite of web user interfaces (UIs) which can be used to monitor the status and resource consumption of the Spark cluster. The Spark framework comprises Spark Core and four libraries, namely Spark SQL, Spark Streaming, MLlib and GraphX, which are optimized to address different use cases.



Hadoop and Spark
Hadoop was the only major player in Big Data processing until Apache Spark was released in 2014. With Spark providing convenient APIs and speeds up to 100 times faster than Hadoop MapReduce, it now dominates the Big Data landscape. Spark is not an entire replacement for Hadoop, but it provides a promising alternative to Hadoop MapReduce.

Hadoop MapReduce manages scheduling and task allocation within the cluster and handles workloads suited for batch processing. Multiple MapReduce jobs can be strung together to create a data pipeline. In between every stage of that pipeline, the MapReduce code reads data from disk and, when completed, writes the data back to disk. This process is inefficient because all the data has to be read from disk at the beginning of each stage. Hadoop MapReduce persists data back to disk after every map or reduce action. It also kills its processes as soon as a job is complete to reduce the memory footprint.

Spark, on the other hand, does not write intermediate data to disk; instead it performs all its activities (transformations) in memory (RAM), which increases processing performance. When data is too large to fit in memory, Spark can also use the disk to process data, at the cost of degraded performance. Even without using its in-memory cache, Spark still outperforms Hadoop MapReduce. For example, Spark set the record for the GraySort benchmark in 2014 by sorting 100 TB of data in 23 minutes, compared to the 72 minutes taken by Hadoop MapReduce on a cluster of 2100 nodes in 2013. Spark also overcomes disk I/O limitations by caching data in memory as much as possible using RDDs. On average, Spark outperforms Hadoop by up to 20 times on iterative algorithms (machine learning) because of its efficient reuse of intermediate results. Spark offers a faster approach to processing data without passing it through Hadoop's MapReduce stages. Spark is not designed to deal with the data management and cluster administration tasks associated with running data processing and analysis workloads at scale; it leverages Hadoop YARN or Apache Mesos, which provide distributed cluster management. Spark can run on top of Hadoop, benefiting from Hadoop's cluster manager (YARN) and underlying storage such as HDFS and HBase. Without Hadoop, Spark integrates with alternative cluster managers like Mesos and storage platforms like Cassandra and Amazon S3. Spark makes it easier to manage a wide range of data processing tasks, from batch processing to streaming data and graph analysis.

Spark Architecture
Apache Spark has a well-defined and layered architecture where all the Spark components and layers are loosely coupled and integrated with various extensions and libraries. The Apache Spark architecture is based on two main abstractions: the Resilient Distributed Dataset (RDD) and the Directed Acyclic Graph (DAG).

Resilient Distributed Dataset (RDD)
An RDD is an immutable, fundamental collection of data elements (a dataset) that is processed in parallel. An RDD is logically partitioned across many servers so that its partitions can be computed on different nodes of the cluster. The RDD hides the details of data partitioning and distribution, which allows Spark to expose a parallel computational framework through a higher-level API in Scala, Python and R. The dataset in an RDD (the records of data) can be loaded from any data source, e.g. text/JSON files, a database via JDBC, etc. Each dataset in an RDD is divided into logical partitions, which are then processed on different worker nodes of the cluster. Spark supports two types of RDDs: Hadoop Datasets, which are created from files stored on HDFS, and parallelized collections, which are based on existing Scala collections. An RDD is kept in memory for as much time as possible, although it can also be stored on disk if required. Since RDDs are immutable, no changes take place in them once they are created, which helps maintain consistency across the cluster. An RDD can define placement preferences for its records so that partitions are computed as close to the records as possible. An RDD can be cached either in memory (or on disk) using the persist method for faster access when the RDD is used several times during processing. RDDs can also be partitioned manually to correctly balance partitions and distribute them across the nodes of a cluster. Generally, smaller partitions allow RDD data to be distributed more equally among more executors, while fewer partitions are easier to work with. The number of partitions of an RDD can be controlled by using the repartition or coalesce transformations. RDDs live in a single SparkContext, which creates a logical boundary; an RDD is a named (by name) and uniquely identified (by id) entity within that SparkContext. RDDs are resilient, i.e. fault tolerant: they are able to recompute missing or damaged partitions caused by node failures with the help of the RDD lineage graph.

Spark RDDs support two different types of operations, Transformations and Actions. Transformations create a new dataset (a new RDD) from an existing one, while Actions return a value to the driver program after running a computation on the dataset. Some examples of transformations are map, filter, select and aggregate (groupBy), while examples of actions are count, show, reduce or writing data out to file systems. When a transformation is applied to an RDD, Spark records a DAG (Directed Acyclic Graph) node consisting of the applied operation, the source RDD and the function used for the transformation. Spark keeps building this DAG of references until an Action is applied on the last lined-up RDD, at which point the DAG is submitted to the DAG scheduler for execution. The result of an action, which is the actual dataset, is returned to the driver program or written to an external storage system. Hence Spark transformations are lazy, as the actual computation is only triggered when an action is invoked. At a high level, there are two kinds of transformations that can be applied to RDDs, namely narrow transformations and wide transformations.
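
As a small illustration of this lazy behavior (a sketch assuming an existing SparkContext named sc; the RDD names are made up), the transformations below only build up the lineage, and no job runs until the count action is called.
// Transformations only record the lineage; no job runs yet
val numbers = sc.parallelize(1 to 1000)        // source RDD
val evens   = numbers.filter(_ % 2 == 0)       // transformation: new RDD
val squares = evens.map(n => n * n)            // transformation: new RDD

// The action below triggers the DAG scheduler to actually run the computation
val total = squares.count()
println(s"Number of squared even values: $total")
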
Narrow Transformation

Narrow transformation — In a narrow transformation, all the elements that are required to compute the records in a single partition live in a single partition of the parent RDD. It does not require the data to be shuffled across partitions; the input and output data stay in the same partition, i.e. it is self-sufficient. Only a limited subset of partitions is used to calculate the result. Spark groups narrow transformations into a single stage, which is known as pipelining. Common narrow transformations are map, flatMap, mapPartitions, filter, sample and union.
 





Wide Transformation

Wide transformation — In a wide transformation, the elements that are required to compute the records in a single partition may live in many partitions of the parent RDD. Wide transformations are also known as shuffle transformations because a data shuffle is required to process the data.
Common wide transformations are groupByKey, reduceByKey, coalesce, repartition, join and intersection.
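
The snippet below contrasts the two kinds of transformations (a sketch assuming an existing SparkContext named sc): flatMap and map stay within their parent partitions, while reduceByKey shuffles data across partitions.
val lines = sc.parallelize(Seq("a quick brown fox", "a lazy dog"), 2)

// Narrow transformations: each output partition depends on a single parent partition
val words = lines.flatMap(_.split(" ")).map(word => (word, 1))

// Wide transformation: values for the same key may live in different partitions,
// so reduceByKey shuffles data across the cluster before aggregating
val counts = words.reduceByKey(_ + _)
counts.collect().foreach(println)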

RDDs can be created using following methods. 
  • Parallelized Collections: The simplest way to create an RDD is by taking an existing collection from the driver program and passing it to SparkContext's parallelize() method. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. The parallelize method also takes a number of partitions, represented by the slices parameter below, which cuts the dataset into the specified number of partitions. The entire dataset needs to fit on one machine (the driver) in order to use the parallelize method, so this approach is rarely used outside of testing and prototyping.

    Parallelized collections are created by calling the below methods of SparkContext.
    sparkContext.parallelize(coll, slices)
    sparkContext.makeRDD(coll, slices)
    sparkContext.range(start, end, step, slices)
    
  • External Datasets: Spark supports creating RDDs from any storage source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat. Text file RDDs can be created using SparkContext's textFile(name, partitions) method. This method takes a URI for the file (either a local path, or a URI like hdfs://, s3n://, etc) and reads it as a collection of lines. When using the local file system, the file must either be copied to all workers or shared via a network-mounted file system. An RDD of pairs of a file and its content can be created from a directory using SparkContext's wholeTextFiles(name, partitions) method. This method reads a directory containing multiple small text files and returns each of them as (filename, content) pairs. All of Spark's file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well, for example textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz"). The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file and does not allow fewer partitions than blocks. SparkContext's sequenceFile[K, V] method creates an RDD from SequenceFiles, where K and V are the key and value types in the file. SparkContext's hadoopRDD method is used for other Hadoop InputFormats and takes a JobConf, an input format class, a key class and a value class. RDD.saveAsObjectFile and SparkContext.objectFile support saving and loading an RDD in a simple format consisting of serialized Java objects.
  • Existing Spark RDD: A new RDD can also be created from an existing RDD by applying a transformation that produces a different dataset; this is done frequently since RDDs are immutable. The new RDD carries a pointer to the parent RDD in Spark, and all such dependencies between RDDs are logged in a lineage graph.
Below are examples of creating new RDDs using the above methods.
// creating RDD using parallelize collection
val rdd1 = spark.sparkContext.parallelize(Array("sun","mon","tue","wed","thu","fri"),4)
val result = rdd1.coalesce(3)
result.foreach(println)

// creating RDD using external storage
val dataRDD = spark.read.textFile("path/of/text/file").rdd

// creating RDD using existing RDDs i.e. transformations
val words = spark.sparkContext.parallelize(Seq("sun", "rises", "in", "the", "east", "and", "sets", "in", "the", "west"))
val wordPair = words.map(w => (w.charAt(0), w))
wordPair.foreach(println)

An RDD lineage (RDD operator graph or RDD dependency graph) is a graph of all the parent RDDs of an RDD. It is built as a result of applying transformations to the RDD and creates a logical execution plan. The lineage graph determines which transformations need to be executed after an action has been called. The logical execution plan starts with the earliest RDDs (those with no dependencies on other RDDs, or those that reference cached data) and ends with the RDD that produces the result of the action that has been called. The RDD's toDebugString method is used to get the details of the RDD lineage graph.
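
A quick sketch of inspecting a lineage graph (assuming an existing SparkSession named spark; the RDD is illustrative):
// Print the lineage (dependency graph) of an RDD built through several transformations
val pairs   = spark.sparkContext.parallelize(1 to 100, 4).map(n => (n % 10, n))
val reduced = pairs.reduceByKey(_ + _)
println(reduced.toDebugString)   // the indentation in the output marks the shuffle (stage) boundary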


Intrinsic properties of RDD Object:
  • A list of parent RDD’s that are the dependencies of the RDD.
  • A list of partitions that a dataset is divided into.
  • A compute function for computing each split on partitions.
  • An optional Partitioner for key-value RDDs that defines how keys are hashed and how the key-value pairs are partitioned.
  • An optional list of preferred locations to compute each split on, i.e. hosts for a partition where the records live or are the closest to read from.
The goal of an RDD is to reuse intermediate in-memory results across multiple data-intensive workloads without copying large amounts of data over the network. Some of the limitations of RDDs are that they are not optimized for working with structured data, they cannot infer the schema of the ingested data, and they degrade performance when there is not enough memory to store them.

Directed Acyclic Graph (DAG)
A Directed Acyclic Graph (DAG) is a graph that has no cycles and flows in one direction. DAGs are useful for representing many different types of flows, including data processing flows. The DAG in Spark represents the sequence of computations performed on the data (RDDs). The DAG consists of nodes representing RDD partitions, while the directed edges represent the transitions (transformations) from one data partition state to another. The DAG scheduler is the scheduling layer of the Apache Spark architecture that implements stage-oriented scheduling. Compared to MapReduce, which creates a graph in two stages (Map and Reduce), Spark creates DAGs that may contain multiple stages forming a tree-like structure. The DAG scheduler divides the operators into stages of tasks. A stage is comprised of tasks based on partitions of the input data (block size). The DAG scheduler pipelines operators together; for example, many map operators can be scheduled in a single stage. The final result of the DAG scheduler is a set of stages, which are passed on to the Task Scheduler. The task scheduler launches tasks via the cluster manager (Spark Standalone/YARN/Mesos) and is unaware of the dependencies between the stages of tasks. The tasks are then executed on the worker (slave) nodes.



The Apache Spark framework uses a master–slave architecture that consists of a driver, which runs as a master node, and many executors that run across the worker nodes in the cluster. Executors are agents that are responsible for executing tasks. The driver program calls the main program of an application and creates a SparkContext, which contains all the basic functionality. The Spark driver is a JVM process that coordinates the workers and the execution of tasks. The driver contains various other components such as the DAG Scheduler, Task Scheduler, Backend Scheduler, and Block Manager, which are responsible for translating user-written code into jobs that are actually executed on the cluster. The Spark driver and SparkContext collectively watch over the job execution within the cluster. The Spark driver works with the cluster manager (YARN, Mesos), which allocates resources to manage the various jobs. Each job is split into multiple smaller tasks which are further distributed to the worker nodes. An action is one of the ways of sending data from an executor back to the driver.




Broadcast Variables and Accumulators

It is important to note that Spark breaks the transformation and action computations into tasks to run on separate machines, and each machine runs both its part of the transformation (e.g. map) and a local reduction, returning only its answer to the driver program. Spark breaks up the processing of RDD operations into tasks which are individually executed by an executor. Prior to execution, Spark computes the closure of the task by identifying the variables and methods that must be visible for the executor to perform its computations on the RDD. The closure is then serialized and sent to each executor as a copy to be executed individually. Hence updates made to regular variables inside such transformation/action operations are not propagated back to the driver, since each executor works on its own copy.
Sometimes though, a variable needs to be shared across multiple tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only added to, such as counters and sums.

Broadcast Variables: A broadcast variable is a read-only variable cached on each machine, as opposed to a copy of the variable being sent with each task. Broadcast variables are used to give every node a copy of a large input dataset in an efficient manner. Spark actions are executed through a set of stages, separated by distributed "shuffle" operations, and Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcast this way is cached in serialized form and deserialized before running each task. Hence explicitly created broadcast variables are useful only when tasks across multiple stages need the same data. A broadcast variable is created from a variable v by calling SparkContext.broadcast(v), and its value can be accessed by calling the value method. The broadcast variable should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. Also, the object v should not be modified after it is broadcast, in order to ensure that all nodes see the same value of the broadcast variable. The unpersist() method is used to temporarily release, and the destroy() method to permanently release, the copies of the broadcast variable on the executors.
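
Below is a minimal sketch of using a broadcast variable (assuming an existing SparkSession named spark; the lookup map and names are illustrative):
// Broadcast a lookup table once to every executor instead of shipping it with each task
val countryCodes = Map("IN" -> "India", "US" -> "United States", "DE" -> "Germany")
val broadcastCodes = spark.sparkContext.broadcast(countryCodes)

val users = spark.sparkContext.parallelize(Seq(("alice", "US"), ("bob", "DE")))
val resolved = users.map { case (name, code) =>
  (name, broadcastCodes.value.getOrElse(code, "Unknown"))   // read-only access via value
}
resolved.collect().foreach(println)

broadcastCodes.unpersist()   // release executor copies; destroy() releases them permanently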

Accumulators: Accumulators are variables that are only added to through an associative operation. They are used for aggregating information across the executors and to implement counters or sums. Accumulators provide a mechanism for safely updating a variable when execution is split up across the worker nodes of a cluster. Spark natively supports accumulators of numeric types, and programmers can add support for new types. Accumulators can be unnamed or named; a named accumulator is displayed in Spark's web UI for the stage that modifies it, along with its value. A numeric accumulator can be created by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator() to accumulate values of type Long or Double, respectively. Tasks running on a cluster can add to the accumulator using the add method, but only the driver program can read the accumulator's value, using its value method. Spark guarantees that each task's update to an accumulator inside an action will be applied only once, regardless of any restarted tasks. For RDD transformations, each task's update may be applied more than once if tasks or job stages are re-executed. Accumulators do not change the lazy evaluation model of Spark: an accumulator value is only updated once the RDD is computed as part of an action. As a result, accumulator updates made inside functions like map() or filter() won't be executed unless some action happens on the RDD.
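
A small sketch of a named accumulator used to count malformed records (assuming an existing SparkSession named spark; the data and names are illustrative):
// A named accumulator shows up in the web UI for the stage that updates it
val badRecords = spark.sparkContext.longAccumulator("badRecords")

val records = spark.sparkContext.parallelize(Seq("10", "20", "oops", "40"))
val parsed = records.flatMap { value =>
  try Some(value.toInt)
  catch { case _: NumberFormatException => badRecords.add(1); None }   // update inside a transformation
}
parsed.count()                                       // the action triggers the accumulator updates
println(s"Bad records seen: ${badRecords.value}")    // only the driver reads the value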

Shuffle operations

Shuffle operations re-distribute the data across partitions such that it is grouped differently. A shuffle involves copying data across executors on different machines, making it a complex and costly operation (it involves disk and network I/O). Prime examples of shuffles are the reduceByKey and aggregateByKey operations. The result of executing a reduce function against all the values associated with a single key is a single value combining all values for that key. Since not all values for a single key necessarily reside on the same partition, or even the same machine, they must be co-located to compute the result. During computation, a single task operates on a single partition; to organize all the data needed for a single reduce task, Spark performs an all-to-all operation where it reads from all partitions to find all the values for all keys, and then brings together the values across partitions to compute the final result for each key. Internally, Spark generates sets of map tasks to organize the data, and a set of reduce tasks to aggregate them. The results of the map tasks are kept in memory until they no longer fit (and are written to disk when their size exceeds the available memory), and they are sorted by target partition and written into a single file. The reduce tasks read the relevant sorted blocks to aggregate the result. The newly shuffled data has no ordering of partitions or elements; the mapPartitions, repartitionAndSortWithinPartitions and sortBy methods are used to order the resulting shuffled data. Repartition operations like repartition and coalesce also cause shuffles.

There are two main shuffle implementations available in Spark, namely Sort Shuffle and Tungsten Sort. Before Spark 1.2.0, Hash Shuffle was the default; it created a separate file for each mapper task and each reducer, resulting in a large number of open files in the filesystem, which hurt performance for large operations. The Sort Shuffle implementation outputs a single file ordered and indexed by reducer id, which makes it easy to fetch the chunk of data related to a given reducer id by looking up the position of the related data block in the file and doing a single file seek before reading. It also falls back to having separate files for the reducers when the number of reducers is smaller than spark.shuffle.sort.bypassMergeThreshold. In Sort Shuffle, the sorted results of the map operation are not reused for the reduce operation; instead, the results of the reduce operation are sorted again using TimSort. The intermediate results are written to disk when not enough memory is available. The Sort Shuffle implementation creates far fewer files for the map operation, but sorting of results is slow compared to Hash Shuffle.

Project Tungsten was the largest change to Spark's execution engine since the project's inception. It focuses on substantially improving the efficiency of memory and CPU usage for Spark applications, to push performance closer to the limits of modern hardware. Cache-aware computation allows Spark applications to spend less CPU time waiting to fetch data from main memory and more time doing useful work. Tungsten also uses code generation to exploit modern compilers and CPUs. As Spark applications pushed the boundary of performance, the overhead of JVM objects and garbage collection had become non-negligible. Java objects have a large inherent memory overhead: a simple 4-character string can take over 48 bytes in total in the JVM object model, due to UTF-16 encoding, object headers and hash code data. Further, JVM garbage collection exploits the transient nature of young-generation objects, which have a high rate of allocation/deallocation, and this works well only when the GC can reliably estimate the life cycle of objects. Since Spark knows much more than the JVM garbage collector about the life cycle of its memory blocks, it can manage memory more efficiently than the JVM. Hence an explicit memory manager was introduced to convert most Spark operations to operate directly on binary data rather than Java objects. It builds on sun.misc.Unsafe, an advanced facility provided by the JVM that exposes C-style memory access, e.g. explicit allocation, deallocation and pointer arithmetic, where each method call is compiled by the JIT into a single machine instruction.
Tungsten Sort is a shuffle implementation, introduced as part of Project Tungsten, which operates directly on serialized binary data (without deserializing it) and uses the memory copy functions of sun.misc.Unsafe to directly copy serialized data. It also uses a special cache-efficient sorter, ShuffleExternalSorter, that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, it works more efficiently with the CPU cache. The serialized data spilled from memory is stored directly on disk. Extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams, which is enabled by the spark.shuffle.unsafe.fastMergeEnabled parameter. Tungsten Sort also uses an off-heap storage array (LongArray) to hold 64-bit pointers that are used to access all of its memory blocks (on heap or off heap). Tungsten Sort works only for non-aggregation operations, as aggregation requires deserialization to aggregate newly incoming values.

RDD Persistence and Caching

When an RDD is cached or persisted, each node stores the RDD partitions it computes in memory in order to reuse them in other actions on that dataset or on datasets derived from it. Thanks to the caching and reuse of RDDs, the performance of future operations on them often improves by more than 10 times. An RDD can be marked for persistence or caching by calling the persist() or cache() methods on it. Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. To remove an RDD manually from the cache instead of waiting for it to fall out, the RDD.unpersist() method is used. Spark's cache is fault-tolerant, as any lost RDD partition is automatically recomputed using the transformations that originally created it. Each persisted RDD can be stored using a different storage level based on storage location and format. These storage levels are set by passing a StorageLevel object to persist(). The cache() method is a shorthand for the default storage level, which is StorageLevel.MEMORY_ONLY. Below are the available storage levels, followed by a short usage sketch.

Storage Level and Description
MEMORY_ONLY: RDDs are stored as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK: RDDs are stored as deserialized Java objects in the JVM. If the RDD does not fit in memory, the partitions that don't fit are stored on disk and read from there when needed.
MEMORY_ONLY_SER: RDDs are stored as serialized Java objects. This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER: Similar to MEMORY_ONLY_SER, but partitions that don't fit in memory are spilled to disk instead of being recomputed on the fly each time they're needed.
DISK_ONLY: The RDD partitions are stored only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2: Same as the levels above, but each partition is replicated on two cluster nodes.
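
A short usage sketch of persistence with an explicit storage level (assuming an existing SparkSession named spark and an illustrative file path):
import org.apache.spark.storage.StorageLevel

val logs = spark.sparkContext.textFile("path/of/log/file")

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
logs.cache()

// persist() accepts an explicit storage level, e.g. spill to disk when memory is short
val errors = logs.filter(_.contains("ERROR")).persist(StorageLevel.MEMORY_AND_DISK_SER)
println(errors.count())     // the first action materializes and caches the RDD
println(errors.count())     // subsequent actions reuse the cached partitions

errors.unpersist()          // remove the RDD from the cache manually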


Spark Core
Spark Core contains the basic functionality of Spark, including components for task scheduling, memory management, fault recovery, and interacting with storage systems. Spark Core also hosts the API that defines RDDs (Resilient Distributed Datasets), which are Spark's main programming abstraction. RDDs represent a collection of items distributed across many compute nodes that can be manipulated in parallel, and Spark Core provides many APIs for building and manipulating these collections.

Spark SQL
Spark SQL is Spark’s package for processing structured and semi-structured data. It allows querying data via SQL as well as Hive Query Language (HQL) and supports many sources of data, including Hive tables, Parquet, and JSON. Spark SQL sits on top of Spark Core to introduce a new data abstraction called SchemaRDD, which provides support for structured and semi-structured data. Spark SQL provides a natural syntax for querying JSON data along with automatic inference of JSON schemas for both reading and writing data. It understands the nested fields in JSON data and allows users to directly access these fields without any explicit transformations. Spark SQL can also act as a distributed query engine using its JDBC/ODBC or command-line interface. In this mode, end-users or applications can interact with Spark SQL directly to run SQL queries, without the need to write any code.

DataFrames

A DataFrame is a distributed collection of data organized into rows and named columns. Each row consists of a set of columns, and each column has a name and an associated type. It is conceptually equivalent to a table in a relational database, but with richer optimizations under the hood. A DataFrame works only on structured and semi-structured data, organizing the data into named columns. Data is stored in a row-columnar format, where the row chunk size is set by spark.sql.inMemoryColumnarStorage.batchSize. Each column in each partition stores min-max values for partition pruning. This allows a better compression ratio than a standard RDD and delivers faster performance when only a small subset of columns is needed. The DataFrame serializes the data into off-heap storage (in memory) in a binary format and then performs many transformations directly on it. The Tungsten physical execution engine explicitly manages memory and dynamically generates bytecode for expression evaluation. DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API provides a higher-level abstraction which allows using a query-like language to manipulate data. It is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows, i.e. Dataset[Row] in Scala. DataFrame operations are referred to as untyped transformations, in contrast to the typed transformations of Datasets.
// create a basic SparkSession using SparkSession.builder()
val spark = SparkSession.builder()
                        .appName("Spark SQL basic example")
                        .config("spark.some.config.option", "some-value")
                        .getOrCreate()
                        
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

val rdd = spark.sparkContext.parallelize(1 to 10).map(x => (x, x * x))
val dataframe = spark.createDataFrame(rdd).toDF("key", "square")
dataframe.show()
// creates a DataFrame based on the content of a JSON file
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()

// Select only the "name" column
df.select("name").show()

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()

// Select people older than 21
df.filter($"age" > 21).show()

// Count people by age
df.groupBy("age").count().show()

Temporary views in Spark SQL are session-scoped and disappear once the session terminates. A global temporary view is used for sharing a view among all sessions and keeping it alive until the Spark application terminates. A global temporary view is tied to the system-preserved database global_temp, and a qualified name must be used to refer to it, e.g. SELECT * FROM global_temp.view1.
// Register the DataFrame as a SQL temporary view which is only available within the session
df.createOrReplaceTempView("people")
// SQL function on SparkSession enables to run SQL queries programmatically and return DataFrame as a result
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

// Register the DataFrame as a global temporary view which is shared among all sessions and is alive until Spark application terminates
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// Global temporary view is available across sessions
spark.newSession().sql("SELECT * FROM global_temp.people").show()

DataSets

A Dataset is a strongly-typed, immutable distributed collection of objects that are mapped to a relational schema. Datasets efficiently process structured as well as unstructured data. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.), similar to an RDD. The Spark Dataset API is an extension of the DataFrame API and provides both type safety and an object-oriented programming interface. It is available in Scala and Java. At the core of the Dataset API is a new concept called an encoder, which is responsible for converting between JVM objects and the tabular representation. The tabular representation is stored using Spark's internal Tungsten binary format, allowing operations on serialized data and improved memory utilization. Encoders serialize the objects into a binary format for efficient processing or for transmitting data over the network. Encoders are generated dynamically as code and use a format that allows Spark to perform many operations like filtering, sorting and hashing without deserializing the bytes back into an object. Spark supports automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java Beans. The highly optimized encoders use runtime code generation to build custom bytecode for serialization, speeding up the serialization process and significantly reducing the size of the encoded data. The encoders also serve as a powerful bridge between semi-structured formats (e.g. JSON) and type-safe languages like Java and Scala. Datasets also provide compile-time type safety, which enables errors to be caught before running production applications. Aggregate operations on Datasets run much faster than the corresponding naive RDD implementations. The Dataset API also reduces memory usage by creating a more optimal layout in memory when caching Datasets.

// Define spark context and create instance of SQLContext
val conf = new SparkConf().setAppName("SQL Application").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Required to ensure that the toDF() and toDS() methods work as expected
import sqlContext.implicits._

// Read JSON file to initialize as University DataSet
case class University(name: String, numStudents: Long, yearFounded: Long)
val schools = sqlContext.read.json("/schools.json").as[University]
schools.map(s => s"${s.name} is ${2015 - s.yearFounded} years old").show()

// create a Dataset using createDataset() or the toDS() method
case class Movie(title: String, review: String, year: Long)
val movies = Seq(Movie("Avengers", "Awesome", 2019L), Movie("Justice League", "Nice", 2018L))
val moviesDS = sqlContext.createDataset(movies)
moviesDS.show()
val moviesDS1 = movies.toDS()
moviesDS1.show()

// Encoders are created for case classes
case class Employee(name: String, age: Long)
val caseClassDS = Seq(Employee("Amy", 32)).toDS
caseClassDS.show()

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// convert DataFrame to strongly typed Dataset
case class Movie(actor_name:String, movie_title:String, produced_year:Long)
val movies = Seq(("Damon, Matt", "The Bourne Ultimatum", 2007L), ("Damon, Matt", "Good Will Hunting", 1997L))
val moviesDF = movies.toDF("actor_name", "movie_title", "produced_year").as[Movie]

Spark SQL can load data from a JSON data source and execute Spark SQL queries over it. The schema of the dataset is automatically inferred and made natively available without any user specification. This can be achieved using SQLContext's jsonFile and jsonRDD methods. We can create a SchemaRDD for a given JSON dataset and then register the SchemaRDD as a table, as shown in the example below. SchemaRDDs can be created from many types of data sources, such as Apache Hive tables, Parquet files, JDBC, Avro files, or as the result of queries on existing SchemaRDDs. Since JSON is semi-structured and different elements might have different schemas, Spark SQL resolves conflicts between the data types of a field. It is not required to know all the fields appearing in the JSON dataset; the specified schema can either cover a subset of the fields appearing in the dataset or include fields that do not exist. It is easy to write SQL queries against the JSON dataset, and the result of a query is represented by another SchemaRDD.
// Create a SQLContext using an existing SparkContext (sc)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create a SchemaRDD for the JSON dataset. 
val employee = sqlContext.jsonFile("/path/to/employee.json")
// Register the created SchemaRDD as a temporary table.
employee.registerTempTable("employee")
// Visualize the employee schema
employee.printSchema()

// Example content of the employee JSON file is {"name":"Jackson", "address":{"city":"Los Angeles","state":"California"}}
val nameAndAddress = sqlContext.sql("SELECT name, address.city, address.state FROM employee")
nameAndAddress.collect.foreach(println)
Spark SQL supports two different methods for converting existing RDDs into Datasets. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. It is more concise and works well when the schema is already known while writing the Spark application. The case class, which can contain complex types, defines the schema of the table: the names of the arguments to the case class are read using reflection and become the names of the columns. The second method for creating Datasets is a programmatic interface which allows you to construct a schema and then apply it to an existing RDD. This method is more verbose, but it allows constructing Datasets when the columns and their types are unknown until runtime. First an RDD of Rows is created from the original RDD, then a schema matching the structure of the Rows is created, and finally the schema is applied to the RDD of Rows via the createDataFrame method provided by SparkSession.
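
A brief sketch of both approaches (assuming an existing SparkSession named spark with spark.implicits._ imported; the Person schema is illustrative):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// 1. Reflection-based: the case class defines the schema
case class Person(name: String, age: Int)
val peopleDS = spark.sparkContext
  .parallelize(Seq(Person("Ann", 34), Person("Raj", 28)))
  .toDS()
peopleDS.printSchema()

// 2. Programmatic: build a schema and apply it to an RDD of Rows
val rowRDD = spark.sparkContext.parallelize(Seq(Row("Ann", 34), Row("Raj", 28)))
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)))
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.show()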

Spark SQL supports built-in and user-defined scalar functions, which return a single value per row. Similarly, Spark SQL also supports built-in and user-defined aggregate functions, such as count(), countDistinct(), avg(), max(), min(), etc., which return a single value for a group of rows.
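
As a small, illustrative sketch (assuming an existing SparkSession named spark with spark.implicits._ imported; the initials function and the people data are made up):
import org.apache.spark.sql.functions.{avg, countDistinct, udf}

// Register a scalar UDF usable from SQL, and an equivalent column function for the DataFrame API
spark.udf.register("initials", (name: String) => name.split(" ").map(_.head).mkString)
val initialsUdf = udf((name: String) => name.split(" ").map(_.head).mkString)

val people = Seq(("Ada Lovelace", 36), ("Alan Turing", 41)).toDF("name", "age")
people.createOrReplaceTempView("people")

spark.sql("SELECT initials(name) AS initials FROM people").show()   // scalar UDF applied per row
people.agg(avg("age"), countDistinct("name")).show()                // built-in aggregate functions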

Catalyst optimizer

The Catalyst optimizer is the core of Spark SQL and provides an extensible query optimizer. Catalyst contains a general library for representing trees and applying rules to manipulate them. On top of that, it has libraries specific to relational query processing (e.g., expressions, logical query plans) and sets of rules that handle the different phases of query execution: analysis, logical optimization, physical planning, and code generation to compile parts of queries to Java bytecode. Catalyst also offers several public extension points, including external data sources and user-defined types, and it supports both rule-based and cost-based optimization. The main data type in Catalyst is an immutable tree composed of node objects, where a node can be a Literal, an Attribute or an Operation. Trees are manipulated by rules, which apply a pattern-matching function recursively to all the nodes of the tree, transforming the nodes that match each pattern into a result. Rules are executed multiple times in order to fully transform a tree; Catalyst groups rules into batches and executes each batch until it reaches a fixed point where the tree stops changing after applying its rules. Catalyst's tree transformations are used in four phases: (1) analyzing a logical plan to resolve references, (2) logical plan optimization, (3) physical planning, and (4) code generation to compile parts of the query to Java bytecode. In the physical planning phase, Catalyst generates one or more physical plans and selects one using a cost model. All other phases are purely rule-based; optimizations such as pipelining projections or filters into one Spark map operation are performed in the physical planner. Each phase uses different types of tree nodes. Catalyst relies on a special feature of the Scala language, quasiquotes, to make code generation simpler.

Performance Tuning

Spark SQL can cache tables using an in-memory columnar format by calling spark.catalog.cacheTable("tableName") or dataFrame.cache(). Spark SQL will then scan only the required columns and will automatically tune compression to minimize memory usage and GC pressure. The spark.catalog.uncacheTable("tableName") method is called to remove the table from memory. Configuration of in-memory caching can be done using the setConf method on SparkSession or by running SET key=value commands in SQL. Spark SQL supports join strategy hints, namely BROADCAST, MERGE, SHUFFLE_HASH and SHUFFLE_REPLICATE_NL, which instruct Spark to use the hinted strategy on each specified relation when joining it with another relation. When both sides of a join have hints, Spark prioritizes the BROADCAST hint over the MERGE hint, over the SHUFFLE_HASH hint, over the SHUFFLE_REPLICATE_NL hint. Spark SQL also provides coalesce hints to control the number of output files, similar to coalesce, repartition and repartitionByRange in the Dataset API. Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that uses runtime statistics to choose the most efficient query execution plan. AQE is disabled by default and can be turned on using the umbrella configuration spark.sql.adaptive.enabled. As of Spark 3.0, AQE has three major features: coalescing post-shuffle partitions, converting sort-merge join to broadcast join, and skew join optimization.
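
A hedged sketch of these tuning knobs (assuming an existing SparkSession named spark, a registered people view, and two illustrative DataFrames largeDF and smallDF):
// Cache a table in the in-memory columnar format and later release it
spark.catalog.cacheTable("people")
spark.sql("SELECT age, count(*) FROM people GROUP BY age").show()
spark.catalog.uncacheTable("people")

// Join strategy hint: broadcast the smaller relation to avoid shuffling the large one
import org.apache.spark.sql.functions.broadcast
val joined = largeDF.join(broadcast(smallDF), Seq("id"))

// Enable Adaptive Query Execution (disabled by default as of Spark 3.0)
spark.conf.set("spark.sql.adaptive.enabled", "true")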

Structured Streaming

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. It takes care of running the streaming computation incrementally and continuously, updating the final result as streaming data continues to arrive. The Spark SQL engine also ensures fault tolerance through checkpointing and write-ahead logs. Spark SQL's Dataset/DataFrame API can be used for stream aggregations, event-time windows, stream-to-batch joins, etc. Structured Streaming queries are processed by default using a micro-batch processing engine, which processes data streams as a series of small batch jobs, thereby achieving end-to-end latencies as low as 100 ms and exactly-once fault-tolerance guarantees. A new low-latency processing mode called Continuous Processing was added in Spark 2.3, which achieves end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Structured Streaming treats a live data stream as a table that is being continuously appended to, and runs a standard batch-like query as an incremental query on that unbounded table. For every data item arriving on the stream, a new row is appended to the Input Table, which eventually updates the Result Table; the source data item is then discarded after updating the result. Whenever the Result Table gets updated, the changed result rows are written to an external sink. There are a few types of built-in output sinks, e.g. the File sink, Kafka sink, Foreach sink, and the Console and Memory sinks for debugging. Which result rows are written to external storage depends on the defined output mode: in Complete Mode the entire updated Result Table is written to the external storage, while in Append Mode and Update Mode only new rows are appended or only changed rows are written, respectively. The foreach and foreachBatch operations allow applying arbitrary operations and writing logic to the output of a streaming query. Dataset.writeStream also provides trigger settings for a streaming query, which define the timing of streaming data processing: whether the query is executed as a micro-batch query with a fixed batch interval or as a continuous processing query.
val spark = SparkSession.builder
                        .appName("NetworkWordCount")
                        .getOrCreate()
  
import spark.implicits._

// lines is the DataFrame representing an unbounded table containing the streaming text data
val lines = spark.readStream
                 .format("socket")
                 .option("host", "localhost")
                 .option("port", 9999)
                 .load()

// First the .as[String] converts DataFrame to a Dataset of String, then the lines are split into words using flatMap()
val words = lines.as[String].flatMap(_.split(" "))

// Dataset are grouped by unique values to generate running word count Dataframe
val wordCounts = words.groupBy("value").count()

// Start running the query in background with complete output mode, that prints running counts on console
val query = wordCounts.writeStream
                      .outputMode("complete")
                      .format("console")
                      .start()

// using the query object handle, wait for the termination of the query
query.awaitTermination()

Event-time is the time when the data was generated and is embedded in the data itself, which allows window-based aggregations over event time. Since Spark is updating the Result Table, it has full control over updating old aggregates when late data arrives (based on event time), as well as cleaning up old aggregates to limit the size of the intermediate state data. Every streaming source is assumed to have offsets (similar to Kafka offsets or Kinesis sequence numbers) to track the read position in the stream. The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger (interval). Structured Streaming ensures end-to-end exactly-once semantics under any failure by using replayable sources and idempotent sinks.
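
A minimal sketch of an event-time window with a watermark (assuming wordsWithTime is a streaming DataFrame with word and timestamp columns; the column names and durations are illustrative):
import org.apache.spark.sql.functions.{window, col}

// Count words over 10-minute windows sliding every 5 minutes, dropping state for data
// that arrives more than 10 minutes late according to its event-time column
val windowedCounts = wordsWithTime
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("word"))
  .count()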

Structured Streaming allows all kinds of operations to be applied on streaming DataFrames/Datasets, ranging from untyped, SQL-like operations (e.g. select, where, groupBy) to typed RDD-like operations (e.g. map, filter, flatMap). Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame as well as with another streaming Dataset/DataFrame. The result of a streaming join is generated incrementally and will be exactly the same as if the join had been performed with a static Dataset/DataFrame. Structured Streaming supports inner joins and some types of outer joins between a streaming and a static DataFrame/Dataset. Spark 2.3 added support for stream-stream joins, which makes it possible to join two streaming Datasets/DataFrames. Since any row received from one input stream can match with any future, yet-to-be-received row from the other input stream, the past input is buffered as streaming state in order to match every future input with past input and generate joined results accordingly. All late and out-of-order data in stream-stream joins is handled using watermarks. Structured Streaming provides a StreamingQuery object, created when a query is started, to monitor (progress and errors) and manage the query.

Continuous Processing

Continuous processing is a new, experimental streaming execution mode introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees, whereas the default micro-batch processing engine provides latencies of about 100 ms at best. The continuous processing engine launches multiple long-running tasks that continuously read data from sources, process it and continuously write to sinks. The number of tasks required by the query depends on how many partitions the query can read from the sources in parallel, which in turn depends on the number of cores in the cluster. Since this is an experimental mode, there is currently no automatic retry mechanism for failed tasks; the query has to be restarted manually from the checkpoint. Further, stopping a continuous processing stream may produce spurious task termination warnings, which can safely be ignored.
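
A small sketch of requesting the continuous mode via the trigger setting (assuming streamingDF is an existing streaming DataFrame; the console sink is used here only for debugging):
import org.apache.spark.sql.streaming.Trigger

// Ask for the experimental continuous engine with a 1-second checkpoint interval;
// only map-like operations and supported sources/sinks (e.g. Kafka) can run in this mode
val query = streamingDF.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()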

Spark Streaming
Spark Streaming is a Spark component that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. It leverages Spark Core's fast scheduling capability to perform streaming analytics, by ingesting data in mini-batches and performing RDD transformations on those mini-batches of data.

Traditional stream processing systems are designed with a continuous operator model to process the data. In such systems, there is a set of worker nodes, each running one or more continuous operators. Each continuous operator processes the streaming data one record at a time and forwards the records to other operators in the pipeline. There are "source" operators for receiving data from ingestion systems and "sink" operators that output data to downstream systems. When such a system is scaled to handle large volumes of real-time data, node failures and unpredictable traffic cause system failures and uneven resource allocation. In the case of a node failure, the system has to restart the failed continuous operator on another node and replay some part of the data stream to recompute the lost information, thus halting the pipeline. Spark Streaming solves these issues by using a new architecture called discretized streams, leveraging fault tolerance and other features of the Spark engine.



Spark Streaming discretizes the streaming data into tiny, sub-second micro-batches. Splitting the stream data into micro-batches allows fine-grained allocation of computations to resources, thus load-balancing tasks across the workers. Initially, Spark Streaming's receivers receive data in parallel from the source and separate it into blocks; the source is polled after every batch interval defined in the application to create a batch from the incoming records. The receiver then replicates the blocks of data among the executors (worker nodes), buffering them in their memory. A receiver is a long-running process in one of the executors, with a life span the same as the driver program's. The latency-optimized Spark engine then runs short tasks (tens of milliseconds) to process the batches and output the results to other systems. Internally, the driver launches tasks at every batch interval to process the blocks of data, and the subsequent results are written to the destination. Spark tasks are assigned dynamically to the workers based on the locality of the data and the available resources, which enables both better load balancing and faster fault recovery. Spark can handle node failures by relaunching the failed tasks in parallel on other nodes within the cluster, evenly distributing the recomputation across many nodes. Each batch of streaming data is represented by an RDD, the fault-tolerant distributed dataset in Spark, and a series of such RDDs is called a DStream. The common RDD representation allows batch and streaming workloads to interoperate seamlessly and makes it possible to introduce new operators dynamically for ad-hoc queries which combine streaming data with static datasets or support interactive queries. Spark's interoperability also extends to rich libraries like MLlib (machine learning), SQL, DataFrames, and GraphX, which allows Spark to support advanced analytics like machine learning and SQL queries where workloads are complex and require continuous updates to data models. Since a receiver is a long-running task, it occupies one of the cores allocated to the Spark Streaming application. Hence the Spark Streaming application needs to be allocated enough cores (or threads, if running locally) to process the received data as well as to run the receiver(s); in other words, the number of cores allocated to the Spark Streaming application must be greater than the number of receivers.



DStreams

A Discretized Stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, Spark's abstraction of an immutable, distributed dataset. Each RDD in a DStream contains data from a certain interval. There are two types of operations performed on DStreams, i.e. transformations and output operations. Every Spark Streaming application processes the DStream RDDs using Spark transformations, which create new RDDs. Any operation applied on a DStream translates to operations on the underlying RDDs, which in turn apply the transformation to the elements of the RDD. Output operations, like saving to HDFS or calling an external API, produce output in batches. Similar to RDDs, DStreams allow the stream's data to be persisted in memory using the persist() method.

Input DStreams

Input DStreams are DStreams representing the stream of input data received from streaming sources. Every input DStream (except file stream) is associated with a Receiver object which receives the data from a source and stores it in Spark’s memory for processing. Spark Streaming provides two categories of built-in streaming sources.
  • Basic sources: Sources directly available in the StreamingContext API. For example: file systems, socket connections, and Akka actors.
  • Advanced sources: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through extra utility classes which require adding extra dependencies.
Multiple streams of data can be received in parallel in a streaming application by creating multiple input DStreams. This will create multiple receivers which will simultaneously receive multiple data streams. 
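
For example (a sketch assuming an existing StreamingContext named ssc; hosts and ports are illustrative), two socket streams can be received in parallel and combined with union:
// Two receivers pulling from two sockets in parallel, combined into a single DStream
val stream1 = ssc.socketTextStream("localhost", 9998)
val stream2 = ssc.socketTextStream("localhost", 9999)
val merged  = stream1.union(stream2)
merged.count().print()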

Failure Recovery and Checkpoints

When there is a failure in an executor, the receiver and the stored in-memory blocks are lost; the driver then starts a new receiver, and the tasks are resumed using the replicated memory blocks. On the other hand, when the driver program itself fails, the corresponding executors, their computations, and all the stored memory blocks are lost. For this scenario Spark provides a feature called DStream checkpointing, which enables periodic storage of the DAG of DStreams to fault-tolerant storage, e.g. HDFS. When the driver, receiver and executors are restarted, the new active driver program can use this persisted checkpoint state to resume processing. Even when we are able to restore the checkpoint state and resume processing with a new active executor, driver program and receiver, we still need a mechanism to recover the memory blocks that existed at that point. To achieve this, Spark provides the Write-Ahead Log (WAL), which synchronously saves the received memory blocks to fault-tolerant storage.

Spark Streaming provides checkpointing of both metadata and the actual data. Metadata includes the configuration used to create the streaming application, the set of DStream operations that define the streaming application, and the jobs that are queued but have not completed yet. Metadata checkpoints are used to recover from failure of the node running the driver of the streaming application. After RDD transformations, the generated RDDs depend on the RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery time, the intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains. Checkpointing is generally enabled for stateful transformations, when either updateStateByKey or reduceByKeyAndWindow is used in the application, or for recovering from driver failures using metadata checkpoints. Checkpointing is enabled by setting a directory in a fault-tolerant, reliable file system (e.g. HDFS, S3, etc.) to store the checkpoint data, using streamingContext.checkpoint(checkpointDirectory). Further, to recover the application from driver failures, the application creates a new StreamingContext when started for the first time, but re-creates the StreamingContext from the checkpoint data when it is restarted after a failure. Since checkpointing RDDs to reliable storage increases processing time, the checkpoint interval for stateful transformations is typically set to 5 to 10 times the batch (sliding) interval, and by default to at least 10 seconds. Accumulators and broadcast variables cannot be recovered from a checkpoint in Spark Streaming.

Example of Spark Streaming

Below is an example of Spark Streaming using a netcat server as the source and computing a word count over the incoming data. The Spark configuration is defined and the master URL for the Spark cluster is set using args(0). The setMaster() method of SparkConf accepts a Spark, Mesos, Kubernetes or YARN cluster URL, or the special “local[*]” string to run in local mode, where * means as many worker threads as there are cores on the local machine. When using a receiver-based input DStream (e.g. sockets, Kafka, etc.), it is recommended to allocate one thread per receiver running in the system plus at least one additional thread for processing the received data. Spark internally creates a SparkContext, which can be accessed as ssc.sparkContext. A new StreamingContext is then initialized with the batch interval taken from args(3) (1 second in this example). The streaming context's ssc.socketTextStream(…) creates a DStream from text data received over a TCP socket connection. Besides sockets, the StreamingContext API provides methods for creating DStreams from files and Akka actors as input sources.

The streaming context connects with the receiver and defines micro batches using the time duration specified.
      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      def main(args: Array[String]) {
        if (args.length < 5) {
          System.err.println("Usage: NetworkWordCount <master> <hostname> <port> <duration> <checkpoint directory>")
          System.exit(1)
        }
    
        StreamingExamples.setStreamingLogLevels()
    
        val ssc = StreamingContext.getOrCreate(args(4), () => createContext(args))
    
        // Start receiving data and processing
        ssc.start()
        // Wait for the processing to be stopped (manually or due to any error)
        ssc.awaitTermination()
      }
    
      def createContext(args: Array[String]) = {
    
        // Create a StreamingContext with two working threads (when run locally) and a batch interval of args(3) seconds.
        // A receiver-based application needs at least 2 cores to prevent starvation, hence args(0) should be local[2] for local mode.
        val sparkConf = new SparkConf().setMaster(args(0)).setAppName("NetworkWordCount")
        sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    
        val ssc = new StreamingContext(sparkConf, Seconds(args(3).toInt))
    
        // Create a socket stream(ReceiverInputDStream) on target ip:port
        val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    
        // Split words by space to form DStream[String]
        val words = lines.flatMap(_.split(" "))
    
        // count the words to form DStream[(String, Int)]
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

        // print the word count in the batch    
        wordCounts.print()
    
        ssc.checkpoint(args(4))
    
        ssc
      }

File Streams
DStreams can be created for reading data from files on any file system compatible with the HDFS API (e.g. HDFS, S3, NFS), using streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory). Spark Streaming monitors the directory dataDirectory and processes any files created in it; files inside nested directories are not picked up and all files must be in the same format. Once a file has been moved into dataDirectory it must not be changed, because data appended to it will not be read. Simple text files can be read using streamingContext.textFileStream(dataDirectory). Reading file streams does not require running a receiver, hence no CPU cores need to be allocated for receiving.
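
As a minimal sketch (the directory path is illustrative, assuming ssc is an existing StreamingContext), counting words in newly created text files:
// Monitor a directory and word-count each text file created in it
val fileLines = ssc.textFileStream("hdfs:///data/incoming")
val fileWordCounts = fileLines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
fileWordCounts.print()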

It is important to note that once a streaming context has been started, no new streaming computations can be set up or added to it, and once a context has been stopped, it cannot be restarted. Furthermore, only a single StreamingContext can be active in a JVM at any given time. The stop() method on StreamingContext stops both the StreamingContext and the internal SparkContext; to stop only the StreamingContext, pass the optional stopSparkContext parameter of stop() as false. A SparkContext can then be reused to create multiple StreamingContexts, as long as each StreamingContext is stopped (without stopping the SparkContext) before the next one is created.
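
A minimal sketch of this lifecycle, assuming sc is an existing SparkContext:
// First streaming context over the existing SparkContext
val ssc1 = new StreamingContext(sc, Seconds(1))
// ... define DStreams, ssc1.start(), ssc1.awaitTerminationOrTimeout(timeoutMillis) ...
ssc1.stop(stopSparkContext = false)      // stop streaming only, keep the SparkContext alive

// A second streaming context can now reuse the same SparkContext
val ssc2 = new StreamingContext(sc, Seconds(5))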

Receiver Reliability
When Spark receives data from sources that support acknowledgement, e.g. Kafka or Flume, acknowledging the data only after it has been received and stored correctly ensures that there is no data loss in case of failure. To support this, Spark provides two kinds of receivers:

Reliable Receiver: Sends an acknowledgement to a reliable source once the data has been received and stored in Spark with replication.

Unreliable Receiver: Does not send any acknowledgement to the source upon receiving data. It is used for sources that do not support acknowledgement, or when the additional complexity of acknowledgement is not required.

Transformations on DStreams

Some of the commonly used transformations on DStreams are listed below; a short usage sketch follows the list.
  • map(func): Returns a new DStream by passing each element of the source DStream through the function func.
  • flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items.
  • filter(func): Returns a new DStream containing only the records of the source DStream on which the predicate func returns true.
  • repartition(numPartitions): Changes the level of parallelism in this DStream by creating more or fewer partitions.
  • count(): Returns a new DStream of single-element RDDs containing the number of elements in each RDD of the source DStream.
  • reduce(func): Returns a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using the function func.
  • reduceByKey(func, [numTasks]): When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function.
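
A brief sketch applying a few of these transformations, assuming lines is a DStream[String] such as the socket stream created earlier:
// Keep only non-empty lines, count records per batch, and count occurrences of each word
val nonEmptyLines = lines.filter(_.nonEmpty)
val recordsPerBatch = nonEmptyLines.count()
val wordCountsPerBatch = nonEmptyLines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
recordsPerBatch.print()
wordCountsPerBatch.print()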

Window Operations

Spark Streaming also provides windowed computations, which allow transformations to be applied over a sliding window of data. Every time the window slides over a source DStream, the source RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream. For example, a window operation may be applied over the last 3 time units of data, sliding by 2 time units. Every window operation takes two parameters: the window length, which is the duration of the window, and the sliding interval, which is the interval at which the window operation is performed. Both the window length and the sliding interval must be multiples of the batch interval of the source DStream.



The example below applies the reduceByKeyAndWindow operation to a DStream of (word, 1) pairs (pairs), producing word counts over the last 30 seconds of data, computed every 10 seconds.
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
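
A more efficient variant (a sketch; it requires checkpointing to be enabled) updates the window incrementally by also passing an inverse reduce function, so only the data entering and leaving the window is processed:
// Add counts entering the window and subtract counts leaving it
val incrementalWindowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(30), Seconds(10))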

updateStateByKey

The updateStateByKey operation allows arbitrary state to be maintained while continuously updating it with new information. This is achieved in two steps.

Define the state - The state can be an arbitrary data type.
Define the state update function - Specify with a function how to update the state using the previous state and the new values from an input stream.

In every batch, Spark applies the state update function for all existing keys, regardless of whether they have new data in the batch or not. If the update function returns None, the key-value pair is eliminated.
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    // add the new values to the previous running count to get the new count
    val newCount = newValues.sum + runningCount.getOrElse(0)
    Some(newCount)
}
// Requires checkpointing to be enabled beforehand via ssc.checkpoint(checkpointDirectory)
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

Data Serialization

Spark Streaming serializes the following two kinds of data, mostly in order to persist it.

Input data: By default, the input data received through Receivers is stored in the executors’ memory with StorageLevel.MEMORY_AND_DISK_SER_2. The data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures. Also, the data is kept first in memory, and spilled over to disk only if the memory is insufficient to hold all of the input data necessary for the streaming computation. Such serialization is an overhead as the receiver must deserialize the received input data and re-serialize it using Spark’s serialization format.

Persisted RDDs generated by Streaming Operations: RDDs generated by streaming computations may be persisted in memory. For example, window operations persist data in memory as they would be processed multiple times. However, unlike the Spark Core default of StorageLevel.MEMORY_ONLY, persisted RDDs generated by streaming computations are persisted with StorageLevel.MEMORY_ONLY_SER i.e. serialized by default to minimize GC overheads.
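
As a sketch of reducing this overhead, Kryo serialization can be enabled and application classes registered (the Event class below is a hypothetical example); a persisted streaming DStream can also be given an explicit storage level:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

// Hypothetical record type carried through the stream
case class Event(id: Long, payload: String)

val kryoConf = new SparkConf()
  .setAppName("StreamingWithKryo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Event]))

// Override the default serialized storage level of a windowed DStream if needed
// (assumes windowedWordCounts is the DStream from the window example above)
windowedWordCounts.persist(StorageLevel.MEMORY_AND_DISK_SER)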

Transform Operation

The transform operation, along with variations like transformWith, allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not directly exposed in the DStream API. For example, it enables real-time data cleaning by joining the input data stream with precomputed spam information and then filtering based on it.
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}
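
A more concrete sketch of the same idea, assuming lines is a DStream[String] whose records start with a host name; spamInfoRDD is replaced here by a hypothetical spamHosts RDD:
// Precomputed set of known spam hosts (illustrative)
val spamHosts = ssc.sparkContext.parallelize(Seq(("bad.example.com", true)))

// Key each record by its host field, then drop records whose host is a known spam host
val keyedByHost = lines.map(line => (line.split(" ", 2)(0), line))
val cleanedLines = keyedByHost.transform { rdd =>
  rdd.leftOuterJoin(spamHosts)
     .filter { case (_, (_, spamFlag)) => spamFlag.isEmpty }
     .map { case (host, (record, _)) => (host, record) }
}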

Join Operations

Streams can easily be joined with other streams using different kinds of joins in Spark Streaming. For each batch interval, the RDD generated by stream1 is joined with the RDD generated by stream2; leftOuterJoin, rightOuterJoin and fullOuterJoin are also supported. A stream can additionally be joined with a static dataset using the DStream.transform operation. The function provided to transform is evaluated for every batch interval and will therefore use the current dataset that the dataset reference points to.
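
A minimal sketch of both cases (hypothetical hosts and ports, assuming ssc is an existing StreamingContext):
// Two keyed streams joined per batch interval
val stream1 = ssc.socketTextStream("host1", 9999).map(line => (line, 1))
val stream2 = ssc.socketTextStream("host2", 9998).map(line => (line, 1))
val joinedStream = stream1.join(stream2)              // inner join on the key
val leftJoinedStream = stream1.leftOuterJoin(stream2)

// Joining a stream with a static dataset via transform
val dataset = ssc.sparkContext.parallelize(Seq(("spark", "framework")))
val joinedWithDataset = stream1.transform { rdd => rdd.join(dataset) }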


Spark ML
MLlib is a distributed machine learning framework on top of Spark that benefits from Spark's distributed in-memory architecture. Spark MLlib has been benchmarked at around nine times the speed of the disk-based, MapReduce version of Apache Mahout (before Mahout gained a Spark interface), which is slow precisely because it runs on MapReduce. MLlib is Apache Spark's scalable machine learning library and largely supersedes Mahout. It implements a set of commonly used machine learning and statistical algorithms, including correlations and hypothesis testing, classification and regression, clustering, and principal component analysis.
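
As a small illustration (a sketch, assuming sc is an existing SparkContext; the points and the number of clusters are made up), clustering with MLlib's KMeans:
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Toy two-dimensional points forming two obvious clusters
val points = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
  Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1)
))
val model = KMeans.train(points, 2, 20)   // 2 clusters, 20 iterations
model.clusterCenters.foreach(println)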

GraphX
GraphX is a distributed graph-processing framework on top of Spark. It provides an API for expressing graph computations that can model user-defined graphs using the Pregel abstraction. GraphX extends the Spark RDD with a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge. GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API to support graph computation.
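
A minimal sketch (assuming sc is an existing SparkContext; the vertices and edges are made up) that builds a property graph and uses aggregateMessages to compute in-degrees:
import org.apache.spark.graphx.{Edge, Graph}

// Toy social graph: vertex property is a user name, edge property is a relationship label
val users = sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
val follows = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows")))
val graph = Graph(users, follows)

// Send the value 1 along each edge to its destination vertex and sum per vertex to get the in-degree
val inDegrees = graph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
inDegrees.collect().foreach(println)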

Limitations of Spark
Spark was not designed as a multi-user environment. Spark users need to know whether the memory they have access to is sufficient for a dataset, and adding more users complicates this further, since users must coordinate memory usage to run projects concurrently. Because of this limited support for such concurrency, users may want to consider an alternative engine, such as Apache Hive, for large batch projects. Spark Streaming also works with the ingestion (arrival) timestamp of the data rather than the event time, so a record is placed in the batch in which it arrives even if the event was generated earlier and belongs to an earlier batch; this can produce less accurate results and is effectively comparable to data loss.