With the growing need to process huge volumes of data, it is evident that computing at this scale with a real-time component is no small feat using a simple client-server architecture. The need was for an architecture that could handle massive quantities of data by taking advantage of both batch and stream processing across clusters, with fault tolerance and delegation in mind. It also required selecting the right set of tools to handle such massive data processing. Though several tools fit into the Lambda architecture, the most popular combination today is Kafka for messaging and queuing, Spark with Spark Streaming for processing the data, and Cassandra (a NoSQL database) as the persistence layer.

In this article we will explore a simple program that uses all of these technologies and provides a good foundation of end-to-end integration to get you started with the Lambda Architecture. I will keep it as simple as possible, so that you do not get overwhelmed trying to understand all the tools at once, while still using the latest concepts and code to make the session worthwhile. Later, I will provide some links and references for further study.

Ok ... Enough of talking ... So let’s get started!!

The Lambda Architecture

The Lambda architecture provides a model for processing large quantities of distributed data reliably by taking advantage of both batch and stream processing. Spark does both using its built-in libraries for streaming and map-reduce. The following diagram shows a high-level view of the Lambda architecture.

The speed layer consists of a processing component that aggregates, reduces and transforms a micro-batch of data in the stream. The results can be posted back to Kafka and pushed to a UI over a simple websocket, providing immediate data; think of a running total, a count or a current location displayed from hot data. The batch layer is more about persistence, machine learning and visualizing analytical trends. At the same time, Spark writes this data to storage (Cassandra in this case) using a sink. The versatility of Apache Spark's API for both batch/ETL and streaming workloads brings the promise of the Lambda architecture to the real world. Writing a consumer to fetch data from Kafka is already explained in my previous article, so I will skip building the UI here and move on to the next step. We will see shortly how to write to Cassandra.

Spark has numerous libraries to perform real-time and batch transformations on the data. It has recently gained a lot of popularity due to its extensive support for graph processing, machine learning and SQL, backed by very active contributors, forums and documentation. The Spark ecosystem includes Kafka, Spark, Spark Streaming and a wide number of drivers for real-time data processing and sinking to external storage such as Cassandra or HDFS (Hadoop Distributed File System).

I will also skip discussing the benefits of using Kafka or Cassandra in the Spark ecosystem for now; some links for further reading are provided later in this article. Our goal is to see the big picture of the ecosystem and how it works.

Now let us see how Spark integrates with these underlying technologies, making it an excellent candidate for the Lambda architecture.

As we can see in the diagram above, Spark connects efficiently with Kafka and Cassandra to stream data in both directions, while also catering to apps that can easily connect to Spark and request information in real time. This sounds cool, doesn't it? Data ingestion, processing, persistence and visualization, all happening in real time and in micro-batches.

As a developer you should be able to make sense of this quickly, since it is something we can relate to, much like a client-server architecture. Check out my previous article on Kafka, where I explain how Kafka works as a pub-sub messaging framework. The source of data depends on one's implementation of the producer that sends data to Kafka. Spark, as a consumer, collects the data by subscribing to the Kafka topic using Spark streaming. The data is received incrementally in batches called micro-batches. Spark uses its core engine components, which comprise libraries such as map-reduce, GraphX, MLlib and SQL, to perform the transformations and aggregations incrementally on these micro-batches of data. Later it persists the data to external storage such as HDFS or Cassandra. This does not mean that Spark cannot be a producer; however, in this example it is a consumer.

Spark Ecosystem

I have already demonstrated how to set up Kafka in my previous article, and that should be enough for this demo. I will also skip the steps involved in installing Spark and Scala for the sake of this article; refer to some of the links provided later to get your environment up and running on Windows. I am using Spark 2.1.0, Scala 2.11.8, JDK 1.8, Cassandra 3.10 and Kafka 0.10.

To make it more like a real-world implementation, with different teams and skill sets, I have implemented the Kafka producer in .Net. This mimics a distributed ecosystem where data ingestion can happen through multiple sources and mechanisms. Next, I have the Spark core and consumer written in Scala as a Maven project using IntelliJ.

Pre-Requisites

I am assuming that anyone reading this article knows the basics of the Scala, Java and .Net programming languages. However, you can still do this example purely in Scala, Java or .Net with the respective frameworks available. Scala is native to Spark, while Java and .Net use adapters; Microsoft has a project called Mobius that is in its early stages but very promising. Apart from the programming languages, I would also encourage you to do some reading on the Lambda Architecture, Spark basics and Cassandra (NoSQL database) to get familiar with the concepts and practices.

I will be using a Maven-style project structure and IntelliJ as the IDE, but you are free to use your own choice of structure and IDE. I am also assuming that Zookeeper, Kafka, Spark, Scala and Cassandra are already installed.

Creating Project

To begin, create a new Maven project in IntelliJ with JDK 1.8.x and add a new module to it. Select the newly created module and add Scala framework support by right-clicking it and selecting "Add framework support" from the context menu. If you do not see Scala in the list, you are most likely missing the Scala and sbt installation. Also remove the java folder from under the source and instead add a scala folder to your source root. Make sure the "scala" folder is marked as a source root by again right-clicking it and using the "Mark directory as" context menu.


Project Dependencies

Following are the dependencies declared in the POM file. These include the core libraries you will need for all the bells and whistles. The file is available in the GitHub project link.
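
As a rough guide, here is a sketch of the kind of dependency coordinates involved for Spark 2.1.0 on Scala 2.11. Treat the artifacts and versions below as an approximation; the authoritative list is the POM in the GitHub repository.

<dependencies>
    <!-- Spark core and Spark SQL, Scala 2.11 builds matching Spark 2.1.0 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- Kafka source for Structured Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- DataStax Spark Cassandra connector (provides CassandraConnector and the DataFrame source) -->
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.0.1</version>
    </dependency>
    <!-- Typesafe Config, used by the Settings object -->
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.3.1</version>
    </dependency>
</dependencies>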


Kafka Producer (.Net)

I have created some dummy data, available under the resource directory, to work with. You can use online tools to create similar data or collect data from a real API. The next step is to push the data to Kafka under a topic. The producer is a simple .Net console program (included in the GitHub project) that reads the CSV file and pushes the data to Kafka in batches, to mimic a real-time incremental data capture scenario. You can look at my Kafka article to understand how to create a Kafka producer using .Net and the Confluent library.
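
The .Net source itself is best taken from the GitHub project and my earlier Kafka article, but to show the shape of what the producer does, here is a rough JVM-side equivalent sketched in Scala using the standard kafka-clients producer API. The file name, batch size and delay are illustrative assumptions; only the "sales" topic and the localhost:9092 broker match the rest of this article.

package producer

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.io.Source

object SalesProducer {
    def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)

        // read the dummy CSV (illustrative path) and push it in small batches
        // with a pause in between, to mimic incremental data capture
        val lines = Source.fromFile("src/main/resources/sales.csv").getLines().toList
        lines.grouped(50).foreach { batch =>
            batch.foreach(line => producer.send(new ProducerRecord[String, String]("sales", line)))
            producer.flush()
            Thread.sleep(1000)
        }

        producer.close()
    }
}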


IntelliJ Project Structure (Scala)

The IntelliJ project includes the following implementations, which act as the consumer, speed layer, batch processing and serving layer. Let us walk through each object, and I will explain as we move forward.

resources: This folder contains application.conf for some static application configuration, log4j.properties and the dummy data file.

settings {
  spark_wd = "/Apache/Logs/spark/wd"
  hadoop = "hdfs://localhost:11025/user/manish"
  checkPoint = "D:/Apache/Logs/spark/cp"
}

  • spark_wd: Points to a local directory used as Spark's working directory.
  • hadoop: References the Hadoop HDFS location, if you want to read/write anything from there.
  • checkPoint: Used by Spark to manage its checkpoints when recovering from a failure.

config: This package contains the Settings object, which exposes the application configuration to the rest of the application.

package config

import com.typesafe.config.ConfigFactory

object Settings {
    private val config = ConfigFactory.load()

    object WebLogGen {
        private val weblogGen = config.getConfig("settings")
        lazy val sparkWd: String = weblogGen.getString("spark_wd")
        lazy val HadoopHome: String = weblogGen.getString("hadoop")
        lazy val checkPointDir: String = weblogGen.getString("checkPoint")
    }
}

model: This package declares the models that I will use to create objects from the data received through Kafka streaming, and during transformations.

package object model {

    case class ProductSale(Id: Int, firstName: String, lastName: String, house: Int, street: String, city: String, state: String, zip: String, prod: String, tag: String) extends Serializable

    case class ProvinceSale(prod: String, state: String, total: Long) extends Serializable

}

utils: This package contains common code for initializing the Spark session. The session is created by getSparkSession, which sets up the application name, working directory, checkpoint directory and Cassandra connector; note the call to the getOrCreate function. A streaming application must operate 24/7 and hence must be resilient to failures, so Spark maintains checkpoints. These enable Spark to recreate its DAG (Directed Acyclic Graph) after a failure and resume from where it failed, instead of starting afresh. However, checkpointed data is only usable as long as the underlying code has not been modified.

The Spark session is an abstraction introduced in version 2.0 that unifies the Spark context and the Spark SQL context, which previously caused a lot of confusion.

package utils

import java.lang.management.ManagementFactory

import config.Settings.WebLogGen
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkUtils {

    val isIDE: Boolean = {
        //If executing inside IDE
        ManagementFactory.getRuntimeMXBean.getInputArguments.toString.contains("IntelliJ IDEA")
    }

    def getSparkSession: SparkSession = {
        //get spark configuration
        val conf = new SparkConf()
            .setAppName("Simple-Lambda")
            .set("spark.cassandra.connection.host", "localhost")
            .set("spark.local.dir", WebLogGen.sparkWd)

        val checkpointDirectory = WebLogGen.checkPointDir

        //If executing inside IDE
        if (isIDE) {
            System.setProperty("hadoop.home.dir", WebLogGen.HadoopHome)
            conf.setMaster("local[*]")
        }

        //init spark session
        val spark = SparkSession.builder()
            .config(conf)
            .getOrCreate()

        spark.sparkContext.setLogLevel("WARN")
        spark.sparkContext.setCheckpointDir(checkpointDirectory)

        spark
    }

}

cassandra: This package contains the CQL statements that are issued to Cassandra. I will be using the Spark Structured Streaming API, which is the latest and greatest enhancement in Spark. However, it comes with a few challenges, as support from external technologies is not fully developed and/or still in alpha/beta stage. That said, this does not stop us from using it and being more productive by writing a little extra code.

Currently, the DataStax Spark Cassandra Connector does not support sinking data through the Spark Structured Streaming API; that support is still at an early stage. However, with some knowledge of CQL and Cassandra, it is pretty easy to develop your own custom query helper functions, as shown below.

package cassandra

import com.datastax.driver.core.{ResultSet, Session}
import com.datastax.spark.connector.cql.CassandraConnector
import model.ProvinceSale
import org.apache.spark.sql.SparkSession

object Satement {
    def getConnector(spark: SparkSession): CassandraConnector ={
        val connector = CassandraConnector.apply(spark.sparkContext.getConf)
        connector
    }

    private def cql_update(prod: String, state: String, total: Long): String =
        s"""insert into demo1.sales (prod,state,total) values ('$prod', '$state', $total)""".stripMargin

    def updateProvinceSale(connector: CassandraConnector, value: ProvinceSale): ResultSet = {
        connector.withSessionDo { session =>
            session.execute(cql_update(value.prod, value.state, value.total))
        }
    }

    def createKeySpaceAndTable(session: Session, dropTable: Boolean = false): ResultSet = {
        session.execute(
            """CREATE KEYSPACE  if not exists  demo1 WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 };""")
        if (dropTable)
            session.execute("""drop table if exists my_keyspace.test_table""")

        session.execute(
            """create table if not exists demo1.sales (prod text, state text, total int, PRIMARY KEY (prod, state));""")
    }
}

streaming: This is the heart of the program, where all the magic happens. It contains the code that connects to Kafka, retrieves batches of records, performs transformations and aggregations, and finally saves the results to Cassandra.

The Structured Streaming API is new to Spark, and it takes a lot of the loading, transformation and aggregation complexity away from the developer. It is a processing engine built on the Spark SQL engine, which takes care of running the query incrementally and continuously updating the final result for each micro-batch. It is an abstraction based on repeated queries (RQ), where the same query is repeated for each batch of data. The API simplifies the process by abstracting the incremental execution, so the processing logic remains the same whether the data is processed as a whole or in batches; the entire data set is seen as a collection of micro-batches. Developers are left with a simplified outlook, where they mainly have to be concerned with the source, the incremental execution logic and the sink.


IMPORTS


If you have done Java or .Net, it should be clear that these references are required when an object from the corresponding package is used.

package streaming

import cassandra._
import model.{ProductSale, ProvinceSale}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}
import utils.SparkUtils.getSparkSession

Function "Main()"


The entire code is under the main function, whose signature is similar to the main function of Java or .Net console programs. In the following code, the first line starts a new Spark session by calling the getSparkSession function defined in the SparkUtils object inside the utils package. It then imports spark.implicits._, where the underscore is the Scala equivalent of "*" in Java or .Net imports.

object LambdaJob {
    def main(args: Array[String]): Unit = {

        // setup spark context
        val spark: SparkSession = getSparkSession
        import spark.implicits._

        //.........
    }
}

Kafka Stream


The following lines of code do a number of things using method chaining. They load the Kafka connection driver, define the connection options (host and port) for Kafka, subscribe to the topic as a consumer and start reading the stream. The next line extracts the "value" column that contains the actual data; I have ignored the other metadata for now.

// load stream from kafka
val input: DataFrame = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "sales")
    .load()

// select the values
val df = input.selectExpr("CAST(value AS STRING)").as[String]

Create LIST<MODEL>


The next job is to deserialize the raw string into a workable list of objects. For this, Spark exposes an awesome Structured Streaming API that is a lot easier than the previous versions, which required learning RDDs (Resilient Distributed Datasets), DStreams and DataFrames. The Dataset is one of the abstractions introduced to represent a set of data in the same way whether it is a batch or a stream. The new API hides away the need to understand the difference between batch and stream: the API we will use works on both, while the underlying core of Spark does all the hard work of joining and combining the stateful operations.

// transform the raw input to a Dataset of model.ProductSale
val ds = df.map(r => r.split(",")).map(c => ProductSale(c(0).toInt, c(1), c(2), c(3).toInt, c(4), c(5), c(6),c(7), c(8), c(9)))

The code simply splits the string by comma and deserializes it to the ProductSale model.

Aggregation and Transformation


This is the incremental execution, with continuous transformations and aggregations happening on each batch of the stream. The code is a simple aggregation for the sake of this article, but this is where you would implement the entire gamut of complex map-reduce logic.

// do continuous aggregation
val aggDF = ds.groupBy("prod", "state").count()

// transform aggregated dataset to a Dataset of model.ProvinceSale
val provincialSale = aggDF.map(r => ProvinceSale(r.getString(0), r.getString(1), r.getLong(2)))

Cassandra CQL


The following code obtains a Cassandra connector and defines a ForeachWriter that sinks the aggregated data to Cassandra.

///// CASSANDRA ////
val connector = Satement.getConnector(spark)

// This Foreach sink writer writes the output to cassandra.
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[ProvinceSale] {
    override def open(partitionId: Long, version: Long) = true
    override def process(value: ProvinceSale): Unit = {
        Satement.updateProvinceSale(connector, value)
    }
    override def close(errorOrNull: Throwable): Unit = {}
}
///// CASSANDRA END ////

Having defined the incremental execution, it is time to sink the data to external storage. The sink supports three output modes:

  • Complete: The entire result is written to external storage.
  • Append: Only new rows since the last trigger are written to external storage.
  • Update: Only rows updated since the last trigger are written to external storage.

The call query.awaitTermination() tells Spark to wait until the stream terminates. It accepts a parameter indicating how long to wait; if no value is provided, it waits indefinitely (a bounded-wait variant is sketched after the code below).

val query: StreamingQuery = provincialSale.writeStream
    .queryName("ProvincialSales")
    .outputMode("complete")
    .foreach(writer)
    .start()

query.awaitTermination()
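
If you only want to block for a bounded period, for example while experimenting, awaitTermination also accepts a timeout in milliseconds. A minimal sketch (the 10-minute value is arbitrary):

// wait at most 10 minutes; returns true if the query terminated in that time
val terminated: Boolean = query.awaitTermination(10 * 60 * 1000)
if (!terminated) query.stop()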

Run the Program

To execute the program you need to have Kafka and Cassandra running in the background. Once you have that, start the LambdaJob consumer, followed by the .Net producer. This will insert the expected data into the Cassandra demo1.sales table. The code for this demo is available for download from my GitHub account.
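
To double-check the results from the batch/serving side, one option is a quick batch read of the same table through the Cassandra connector. This is a minimal sketch that reuses the SparkSession from SparkUtils; querying demo1.sales from cqlsh works just as well.

// read demo1.sales back as a DataFrame via the DataStax connector and print it
val sales = spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map("keyspace" -> "demo1", "table" -> "sales"))
    .load()

sales.show()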


Further Reading

Enjoy!!