Wednesday, January 6, 2016

Spark Streaming



Learning Real-time Processing with Spark Streaming
RDD is a distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.
Only one SparkContext can be active in a given JVM.

Hadoop MapReduce issues:
Excessive and intensive use of disks for all intermediate stages.
Provides only map and reduce operations; there are no built-in operations such as joining, flattening, or grouping of datasets.
Poor fit for iterative and interactive workloads, for example machine learning algorithms that reuse intermediate or working datasets across multiple parallel operations.
Poor fit for real-time data processing.

RDD is an immutable (read-only) collection of objects partitioned across a set of machines that can be rebuilt if a partition is lost.
Batches are nothing more than a series of RDDs. Spark Streaming provides another layer of abstraction over a series of RDDs known as DStreams (Discretized streams).

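    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
    //Set the logical and user defined Name of this Application
    conf.setAppName("My First Spark Streaming Application")
    //Batch interval of 2 seconds
    val streamCtx = new StreamingContext(conf, Seconds(2))
    val lines = streamCtx.socketTextStream("localhost", 9087, StorageLevel.MEMORY_AND_DISK_SER_2)
    val words = lines.flatMap(x => x.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    //myPrint is a custom helper defined elsewhere in the example (not shown here)
    myPrint(wordCounts, streamCtx)

    streamCtx.start()
    streamCtx.awaitTermination()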

Java:
    JavaStreamingContext streamCtx = new JavaStreamingContext(conf, Durations.seconds(2));
    JavaReceiverInputDStream<String> lines = streamCtx.socketTextStream("localhost", 9087,StorageLevel.MEMORY_AND_DISK_SER_2());
    JavaDStream<String> words = lines.flatMap( new FlatMapFunction<String, String>() {
        @Override public Iterable<String> call(String x) {
            return Arrays.asList(x.split(" "));
        }
    });
    JavaPairDStream<String, Integer> pairs = words.mapToPair(
        new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
    });
    JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
        @Override public Integer call(Integer i1, Integer i2) throws Exception {
            return i1 + i2;
        }
    });
   wordCounts.print(10);

$SPARK_HOME/bin/spark-submit --class chapter.two.ScalaFirstStreamingExample --master <SPARK-MASTER-URL> Spark-Examples.jar

Processing Distributed Log Files in Real Time
Spark also provides some useful helper classes for RDDs, available through implicit conversions: PairRDDFunctions for working with key/value pairs, SequenceFileRDDFunctions for working with Hadoop sequence files, and DoubleRDDFunctions for working with RDDs of doubles.

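val rdd = new JdbcRDD(
  ctx,
  () => { DriverManager.getConnection("jdbc:derby:temp/Jdbc-RDDExample") },
  //Both placeholders bound the same column: JdbcRDD splits the range 20..30 across 3 partitions
  "SELECT EMP_ID,Name FROM EMP WHERE Age >= ? AND Age <= ?", 20, 30, 3,
  //Map each row to an (EMP_ID, Name) tuple
  (r: ResultSet) => (r.getInt(1), r.getString(2))).cache()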
broadcast
HttpBroadcast uses an HTTP server as the broadcast mechanism. The broadcast data is fetched from the driver, through an HTTP server running on the driver itself, and is then stored in the executor's Block Manager for faster access. TorrentBroadcast, which is also the default broadcast implementation, maintains its own Block Manager. The first request to access the data goes to the local Block Manager and, if the data is not found there, it is fetched in chunks from another executor or from the driver. TorrentBroadcast works on the principle of BitTorrent and ensures that the driver is not the bottleneck when fetching shared variables and data.


saveAsObjectFile(path):

Operations that can cause a shuffle:
RDD.repartition(…): Repartitions the existing dataset across the cluster of nodes
RDD.coalesce(…): Repartitions the existing dataset into a smaller number of given partitions
All operations whose names end in ByKey (except count operations), such as PairRDDFunctions.reduceByKey() or groupByKey()
All join operations, such as PairRDDFunctions.join(…) or PairRDDFunctions.cogroup(…)

Discretized streams
Computations are structured as a series of stateless, deterministic batch computations at small time intervals.
DStreams build on the concept of resilient distributed datasets: a DStream is a series of RDDs (of the same type) treated as one single stream, which is processed and computed at a user-defined time interval.

Flume - source, channel, interceptor and sink
Every chunk of data read by the Flume source is known as an event, which is further delivered to a channel.
Channel: Channels are the staging area where events are stored, so that they can be consumed further by Flume sinks. It decouples the sources from the sink so that the sink is not dependent upon the source. There could be scenarios where the source could extract the events or data faster than the events and data consumption capacity of the sink. Channels help the source and the sink to work independently at their own capacity. Channels can be reliable or non-reliable. For example, Flume defines standard channels like memory, JDBC, file, and so on, where the memory channel is unreliable while the file and JDBC are reliable channels.

Interceptor: Interceptor is another component which modifies, enhances or transforms (Transform of ETL process) data before it is consumed by the sink. An interceptor can modify or even drop events based on any criteria chosen by the developer or the interceptor. Flume supports the chaining of interceptors. This is made possible by specifying the list of interceptor builder class names in the configuration.
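
The interceptor chaining described above can be sketched in plain Java. This is only a simplified model and not Flume's actual Interceptor API: events are plain strings, and the two interceptors (dropDebug, addHost) as well as the host value are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Simplified model of an interceptor chain (not the Flume API).
final class InterceptorChainSketch {

    // Passes the event through each interceptor in order.
    // An interceptor returns an empty Optional to drop the event.
    static Optional<String> intercept(String event,
                                      List<Function<String, Optional<String>>> chain) {
        Optional<String> current = Optional.of(event);
        for (Function<String, Optional<String>> interceptor : chain) {
            if (!current.isPresent()) {
                break; // event was dropped by an earlier interceptor
            }
            current = interceptor.apply(current.get());
        }
        return current;
    }

    // Hypothetical chain: drop DEBUG lines, then prefix a host name.
    static List<Function<String, Optional<String>>> exampleChain() {
        Function<String, Optional<String>> dropDebug =
            e -> e.contains("DEBUG") ? Optional.<String>empty() : Optional.of(e);
        Function<String, Optional<String>> addHost =
            e -> Optional.of("host=node-1 " + e);
        return Arrays.asList(dropDebug, addHost);
    }
}
```

In this sketch an event containing DEBUG never reaches the second interceptor, which mirrors how an interceptor can drop events before they reach the sink.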

a1.sources = src-1
a1.channels = c1
a1.sinks = spark

a1.sources.src-1.type = exec
#Name of the Log File with the full path
a1.sources.src-1.command = tail -f /home/servers/node-1/appserver-1/logs/debug.log
#Define the Channel which will be used by Source to deliver the messages.
a1.sources.src-1.channels = c1

#Defining and providing Configuration of Channel for Agent-1
#Memory channel is not a reliable channel.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 100

#Configuring Sink for Agent-1
#This is the Custom Sink which will be used to integrate with our Spark Application
a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
#Name of the host where this Sink is running
a1.sinks.spark.hostname = localhost
#Custom port where our Spark-Application will connect and consume the event
a1.sinks.spark.port = 4949
#Define the Channel which will be used by Sink to receive the messages.
a1.sinks.spark.channel = c1

Exec is not a reliable source and is not recommended when we need strong event delivery semantics or guaranteed delivery. It is better to develop a custom source using Flume SDK. It is recommended to have a cleanup script with an exec command which runs at regular intervals to check process tables for the tail -f command whose parent PID is 1 and kill them manually as they are dead processes.

./bin/flume-ng agent --conf conf --conf-file conf/spark-flume.conf --name a1 &

$SPARK_HOME/conf/spark-defaults.conf
spark.driver.extraClassPath=$SPARK_HOME/lib/spark-streaming-flume_2.10-1.3.0.jar:$FLUME_HOME/lib/flume-ng-sdk-1.5.2.jar:$SPARK_HOME/lib/spark-streaming-flume-sink_2.10-1.3.0.jar
spark.executor.extraClassPath=$SPARK_HOME/lib/spark-streaming-flume_2.10-1.3.0.jar:$FLUME_HOME/lib/flume-ng-sdk-1.5.2.jar:$SPARK_HOME/lib/spark-streaming-flume-sink_2.10-1.3.0.jar

val addresses = new Array[InetSocketAddress](2)
addresses(0) = new InetSocketAddress("localhost",4949)
addresses(1) = new InetSocketAddress("localhost",4950)

//Create a Flume Polling Stream which will poll the sinks
//to get the events every 2 seconds.
//The last 2 parameters of this method are important:
//1. maxBatchSize - the maximum number of events to be pulled
//   from the Spark sink in a single RPC call.
//2. parallelism - the number of concurrent requests this stream
//   should send to the sink.
//For more information refer to
//https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/streaming/flume/FlumeUtils.html
val flumeStream = FlumeUtils.createPollingStream(streamCtx,addresses,StorageLevel.MEMORY_AND_DISK_SER_2,1000,1)

//Define Output Stream connected to Console for printing the results
val outputStream = new ObjectOutputStream(Console.out)
//Invoking custom Print Method for writing Events to Console
printValues(flumeStream,streamCtx, outputStream)

//Most important statement which will initiate the Streaming Context
streamCtx.start()

DStream
PairDStreamFunctions
Higher-order functions are functions that either accept a function as input or return a function as output.

mapPartitions
foreachRDD
Applies the given function to all RDDs in a given stream. Note that the given function itself is executed on the driver node, but it usually contains RDD actions, and those actions are performed over the cluster.
  //Members of the ScalaLogAnalyzer class.
  //Requires: import java.util.regex.{Matcher, Pattern}
  def transformLogData(logLine: String): Map[String, String] = {
    //Pattern which will extract the relevant data from Apache Access Log Files
    val LOG_ENTRY_PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+)"""
    val PATTERN = Pattern.compile(LOG_ENTRY_PATTERN)
    val matcher = PATTERN.matcher(logLine)

    //Matching the pattern for each line of the Apache access Log file
    if (!matcher.find()) {
      System.out.println("Cannot parse logline: " + logLine)
      //Return an empty Map: reading groups from a failed match would throw an IllegalStateException
      Map.empty[String, String]
    } else {
      //Finally create a Key/Value pair of extracted data and return to calling program
      createDataMap(matcher)
    }
  }

  /**
   * Create a Map of the data which is extracted by applying Regular expression.
   */
  def createDataMap(m: Matcher): Map[String, String] = {
    Map[String, String](
      ("IP" -> m.group(1)),
      ("client" -> m.group(2)),
      ("user" -> m.group(3)),
      ("date" -> m.group(4)),
      ("method" -> m.group(5)),
      ("request" -> m.group(6)),
      ("protocol" -> m.group(7)),
      ("respCode" -> m.group(8)),
      ("size" -> m.group(9))
    )
  }

val transformLog = new ScalaLogAnalyzer()
//Invoking flatMap operation to flatten the results and convert them into Key/Value pairs
val newDstream = flumeStream.flatMap { x => transformLog.transformLogData(new String(x.event.getBody().array())) }

/** End Common piece of code for all kinds of Transform operations*/

/**Start - Transformation Functions */
executeTransformations(newDstream,streamCtx)

val functionCountRequestType = (rdd: RDD[(String, String)]) => {
  rdd.filter(f => f._1.contains("method"))
    .map(x => (x._2, 1))
    .reduceByKey(_ + _)
}

val transformedRDD = dStream.transform(functionCountRequestType)
// Checkpointing must be enabled to use the updateStateByKey function.
streamCtx.checkpoint("checkpointDir")
// Add the counts of the current batch to the running total (0 if the key has no state yet).
val functionTotalCount = (values: Seq[Int], state: Option[Int]) => {
  Option(values.sum + state.getOrElse(0))
}
transformedRDD.updateStateByKey(functionTotalCount).print(100)

updateStateByKey:
http://spark.apache.org/docs/latest/streaming-programming-guide.html
Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use this, you will have to do two steps.

Define the state - The state can be an arbitrary data type.
Define the state update function - Specify with a function how to update the state using the previous state and the new values from an input stream.
In every batch, Spark will apply the state update function for all existing keys, regardless of whether they have new data in a batch or not. If the update function returns None then the key-value pair will be eliminated.
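
The semantics described above can be modelled without Spark. The following is a minimal plain-Java sketch, not Spark's implementation: the class and method names (StateByKeySketch, updateState, runningTotal, demo) are hypothetical, and java.util.Optional stands in for Scala's Option. It applies the update function to every key that has either previous state or new values, and drops a key when the function returns an empty Optional.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;

// Plain-Java model of the updateStateByKey semantics (no Spark dependency).
final class StateByKeySketch {

    // Applies updateFunc to every key with previous state or new values.
    // A key is removed from the state when updateFunc returns empty.
    static Map<String, Integer> updateState(
            Map<String, Integer> previousState,
            Map<String, List<Integer>> batchValues,
            BiFunction<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunc) {
        Set<String> keys = new HashSet<>(previousState.keySet());
        keys.addAll(batchValues.keySet());
        Map<String, Integer> newState = new HashMap<>();
        for (String key : keys) {
            List<Integer> values =
                batchValues.getOrDefault(key, Collections.<Integer>emptyList());
            Optional<Integer> old = Optional.ofNullable(previousState.get(key));
            updateFunc.apply(values, old).ifPresent(v -> newState.put(key, v));
        }
        return newState;
    }

    // Running total: new values plus the previous state (0 if there is none).
    static Optional<Integer> runningTotal(List<Integer> values, Optional<Integer> state) {
        int sum = state.orElse(0);
        for (int v : values) {
            sum += v;
        }
        return Optional.of(sum);
    }

    // Two batches of (method, 1) counts; POST has no new data in the second batch.
    static Map<String, Integer> demo() {
        Map<String, List<Integer>> batch1 = new HashMap<>();
        batch1.put("GET", Arrays.asList(1, 1, 1));
        batch1.put("POST", Arrays.asList(1));
        Map<String, List<Integer>> batch2 = new HashMap<>();
        batch2.put("GET", Arrays.asList(1, 1));

        Map<String, Integer> state =
            updateState(new HashMap<>(), batch1, StateByKeySketch::runningTotal);
        return updateState(state, batch2, StateByKeySketch::runningTotal);
    }
}
```

In demo(), the POST key keeps its state through the second batch even though it received no new values, because the update function is still called for it with an empty list.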

In micro-batching, all batches of data are independent and contain only the new events received by the stream. New events are not added to older events unless the batch size is changed.

Windowing functions provide exactly this functionality: they define the scope of the data to be analyzed (for example, the last 10 or 20 minutes) and slide this window forward by a configured interval, such as one or two minutes.

For example, if the window interval is two seconds and the sliding interval is one second, then at the end of every one-second interval our DStream will contain the data for the last two seconds.
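
To make the window arithmetic concrete, here is a small plain-Java sketch (hypothetical names, no Spark dependency). It assumes the window length and the slide interval are both expressed as a number of batches, and lists which batch indices each window would cover.

```java
import java.util.ArrayList;
import java.util.List;

// Model of which batches fall into each sliding window.
final class WindowSketch {

    // Batch indices covered by the window that ends at batch `endBatch` (inclusive).
    static List<Integer> batchesInWindow(int endBatch, int windowBatches) {
        List<Integer> covered = new ArrayList<>();
        int first = Math.max(0, endBatch - windowBatches + 1);
        for (int i = first; i <= endBatch; i++) {
            covered.add(i);
        }
        return covered;
    }

    // One window per slide, over `totalBatches` batches numbered from 0.
    static List<List<Integer>> windows(int totalBatches, int windowBatches, int slideBatches) {
        List<List<Integer>> result = new ArrayList<>();
        for (int end = slideBatches - 1; end < totalBatches; end += slideBatches) {
            result.add(batchesInWindow(end, windowBatches));
        }
        return result;
    }
}
```

With one-second batches, a two-second window sliding every second corresponds to windows(n, 2, 1): apart from the very first window, each one covers the current batch and the previous one, so consecutive windows overlap by one batch.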

val wStream = dStream.window(Seconds(40),Seconds(20))
val respCodeStream = wStream.filter(x=>x._1.contains("respCode")).map(x=>(x._2,1))
respCodeStream.reduceByKey(_+_).print(100)
val respCodeStream_1 = dStream.filter(x=>x._1.contains("respCode")).map(x=>(x._2,1))
respCodeStream_1.reduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(40),Seconds(20)).print(100)
val respCodeStream_2 = dStream.filter(x=>x._1.contains("respCode")).map(x=>(x._2,1))
respCodeStream_2.groupByKeyAndWindow(Seconds(40),Seconds(20)).print(100)

The general rule is to set the parallelism to at least twice the total number of cores in the cluster.

By default, Spark uses the Java serialization mechanism, which works with any class that implements java.io.Serializable but is slow.

We can switch to Kryo serialization (https://github.com/EsotericSoftware/kryo), which is more compact and faster than Java serialization. It does not support all serializable types, but it is still much faster than the Java serialization mechanism for the types it does support.

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

By default, KryoSerializer stores the full class names with their associated objects in the Spark executors' memory, which wastes memory. To optimize, it is advisable to register all required classes in advance with KryoSerializer so that objects are mapped to class IDs rather than full class names. This can be done by explicitly registering all required classes using SparkConf.registerKryoClasses(…).


spark.executor.extraJavaOptions = -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -Xloggc:$JAVA_HOME/jvm.log -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy

Use arrays of objects or primitive types in your data structures. You can also use the fastutil (http://fastutil.di.unimi.it/) library which provides faster and optimized collection classes.
Avoid strings or custom objects; instead use numeric IDs for the objects.
val conf = new SparkConf().set("spark.executor.memory", "1g")
$SPARK_HOME/bin/spark-submit --executor-memory 1g …..

The Spark framework, by default, takes up 60 percent of the executor's configured memory for caching RDDs.
val conf = new SparkConf().set("spark.storage.memoryFraction","0.4")

We can also consider off-heap caching solutions such as Tachyon, which keep cached data outside the JVM heap.
dStream.foreachRDD { rdd =>              // Line-1
  //assuming that this method provides the connection to underlying MQ infrastructure
  val conn = getConnection(…)            // Line-2
  rdd.foreach { record =>                // Line-3
    //Function to create messages and post to Queues/Topics
    createTextMessages().post(conn)      // Line-4
  }
}

The getConnection() function in the preceding code is executed over the driver while the rest of the function will be executed over the worker nodes, which in turn means that the connection object needs to be serialized and will be sent to the worker nodes, which is not a good idea.

dStream.foreachRDD { rdd =>                                        // Line-1
  rdd.foreachPartition { partition =>                              // Line-2
    val conn = getConnection(…)                                    // Line-3
    partition.foreach(record => createTextMessages().post(conn))   // Line-4
  }
}
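
The reason for creating the connection inside foreachPartition can be illustrated with a small plain-Java count. This is only a sketch with hypothetical names: partitions are modelled as nested lists and no real connection is opened. It compares how many connections would be created if each record opened its own with the per-partition approach.

```java
import java.util.Arrays;
import java.util.List;

// Counts connection creations for two strategies (model only, no real I/O).
final class ConnectionScopeSketch {

    // One connection per record: the most expensive variant.
    static int openPerRecord(List<List<String>> partitions) {
        int opened = 0;
        for (List<String> partition : partitions) {
            for (int i = 0; i < partition.size(); i++) {
                opened++; // stands in for getConnection(...) before each post
            }
        }
        return opened;
    }

    // One connection per partition, created where the partition is processed
    // and shared by all of that partition's records.
    static int openPerPartition(List<List<String>> partitions) {
        int opened = 0;
        for (List<String> partition : partitions) {
            if (!partition.isEmpty()) {
                opened++; // stands in for a single getConnection(...) call
            }
        }
        return opened;
    }

    // Hypothetical data: two partitions holding five records in total.
    static List<List<String>> examplePartitions() {
        return Arrays.asList(
            Arrays.asList("a", "b", "c"),
            Arrays.asList("d", "e"));
    }
}
```

For the example data the per-record strategy opens five connections and the per-partition strategy opens two, and in neither case does a connection object have to be serialized and shipped from the driver.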
   val hConf = new JobConf(new org.apache.hadoop.conf.Configuration())

   val newTextOutputFormat = classOf[org.apache.hadoop.mapreduce.lib.output.TextOutputFormat[Text, Text]]
   dStream.saveAsNewAPIHadoopFiles("hdfs://localhost:9000/spark/streaming/newApi/data-", "", classOf[Text], classOf[Text], newTextOutputFormat ,hConf )

newDstream
    .saveToCassandra(keyspaceName, csTableName, SomeColumns("ip","client","user","date","method","request","protocol","respcode","size"))

val csRDD = streamCtx.cassandraTable("logdata", "apachelogdata").collect()
csRDD.foreach ( x => println("IP = " + x.getString("ip")))

val csTimeRDD = streamCtx.cassandraTable("logdata", "apachelogdata").
select("ip","method","date","method".writeTime.as("time")).where("method=?","GET")
csTimeRDD.collect().sortBy(x => calculateDate(x.getLong("time"))).
reverse.take(10).foreach(      
      x =>
println(x.getString("ip") + " - " + x.getString("date")+" - "+ x.getString("method")+" - "+calculateDate(x.getLong("time")) ))

DataFrames are distributed collections of objects organized into rows and named columns.
val sqlCtx = new SQLContext(sparkCtx)
CassandraSQLContext
val dataFrame = sqlCtx.jsonFile(path)

There are two ways to convert an existing RDD into a DataFrame.
The first method uses reflection to infer the schema of an RDD from the given data.
The second method is a programmatic interface that lets you construct a schema, apply it to an existing RDD, and finally generate a DataFrame.
We need to register the DataFrame as a temporary table within the SQL context so that we can execute SQL queries over the registered table.
//Temporary table is destroyed as soon as SQL Context is destroyed.
dataFrame.registerTempTable("company");
dataFrame.printSchema();

    sqlCtx.sql("Select * from company").collect().foreach(print)
    val allRec = sqlCtx.sql("Select * from company").agg(Map("No_Of_Emp"->"sum"))
    allRec.collect.foreach ( println )
sparkCtx.stop()


    //Defining Window Operation, so that we can execute SQL Query
    //on data received within a particular Window
    val wStream = newDstream.window(Seconds(40), Seconds(20))

    //Creating SQL DataFrame for each of the RDDs
    wStream.foreachRDD { rdd =>
      //Getting the SQL Context from Utility Method which
      //provides Singleton Instance of SQL Context
      val sqlCtx = getInstance(sparkCtx)
      //Converting JSON RDD into the SQL DataFrame by using the jsonRDD() function
      val df = sqlCtx.jsonRDD(rdd)
      //Registering the DataFrame as a Temporary table for further Querying
      df.registerTempTable("apacheLogData")
      val logDataFrame = sqlCtx.sql("select method, count(*) as total from apacheLogData group by method")
      logDataFrame.show()
    }

  //Utility which provides the Singleton Instance of SQL Context
  @transient private var instance: SQLContext = null

  //Lazy initialization of SQL Context
  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }

http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/
Spark and Storm at Yahoo!
Apache Storm and Spark Streaming Compared
Storm has higher industry adoption and better production stability compared to Spark Streaming. Spark on the other hand has a more expressive, higher level API than Storm, which is arguably more pleasant to use, at least if you write your Spark applications in Scala (I prefer the Spark API, too).

  • A Spark cluster contains 1+ worker nodes aka slave machines (simplified view; I exclude pieces like cluster managers here).
  • A worker node can run 1+ executors.
  • An executor is a process launched for an application on a worker node, which runs tasks and keeps data in memory or disk storage across them. Each application has its own executors. An executor has a certain amount of cores aka “slots” available to run tasks assigned to it.
  • A task is a unit of work that will be sent to one executor. That is, it runs (part of) the actual computation of your application. The SparkContext sends those tasks for the executors to run. Each task occupies one slot aka core in the parent executor.
  • A receiver is run within an executor as a long-running task. Each receiver is responsible for exactly one so-called input DStream (e.g. an input stream for reading from Kafka), and each receiver – and thus input DStream – occupies one core/slot.
  • An input DStream is a special DStream that connects Spark Streaming to external data sources for reading input data. For each external data source (e.g. Kafka) you need one such input DStream implementation. Once Spark Streaming is “connected” to an external data source via such input DStreams, any subsequent DStream transformations will create “normal” DStreams.
In Spark’s execution model, each application gets its own executors, which stay up for the duration of the whole application and run 1+ tasks in multiple threads. This isolation approach is similar to Storm’s model of execution. This architecture becomes more complicated once you introduce cluster managers like YARN or Mesos, which I do not cover here. See Cluster Overview in the Spark docs for further details.
Kafka
Kafka stores data in topics, with each topic consisting of a configurable number of partitions. The number of partitions of a topic is very important for performance considerations as this number is an upper bound on the consumer parallelism: if a topic has N partitions, then your application can only consume this topic with a maximum of N threads in parallel. (At least this is the case when you use Kafka’s built-in Scala/Java consumer API.)
When I say “application” I should rather say consumer group in Kafka’s terminology. A consumer group, identified by a string of your choosing, is the cluster-wide identifier for a logical consumer application. All consumers that are part of the same consumer group share the burden of reading from a given Kafka topic, and only a maximum of N (= number of partitions) threads across all the consumers in the same group will be able to read from the topic. Any excess threads will sit idle.
Multiple Kafka consumer groups can be run in parallel: Of course you can run multiple, independent logical consumer applications against the same Kafka topic. Here, each logical application will run its consumer threads under a unique consumer group id. Each application can then also use different read parallelisms.

Rebalancing is a lifecycle event in Kafka that occurs when consumers join or leave a consumer group.
  • Your application uses the consumer group id “terran” and starts consuming with 1 thread. This thread will read from all 10 partitions. During runtime, you’ll increase the number of threads from 1 to 14. That is, there is suddenly a change of parallelism for the same consumer group. This triggers rebalancing in Kafka. Once rebalancing completes, you will have 10 of 14 threads consuming from a single partition each, and the 4 remaining threads will be idle. And as you might have guessed, the initial thread will now read from only one partition and will no longer see data from the other nine.
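
The outcome of the rebalancing example above can be reproduced with a small plain-Java sketch. Note the assumptions: the names are hypothetical, and partitions are dealt out round-robin, which is a simplification and not Kafka's actual assignment algorithm. What it does preserve is the rule that each partition is owned by exactly one thread of the consumer group.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of partition ownership within one consumer group.
final class RebalanceSketch {

    // Returns, for each thread, the list of partitions it owns.
    // Round-robin is an illustrative simplification.
    static List<List<Integer>> assign(int partitions, int threads) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            assignment.add(new ArrayList<Integer>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % threads).add(p);
        }
        return assignment;
    }

    // Number of threads that own no partition and therefore sit idle.
    static int idleThreads(List<List<Integer>> assignment) {
        int idle = 0;
        for (List<Integer> owned : assignment) {
            if (owned.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }
}
```

With 10 partitions, assign(10, 1) gives the single thread all 10 partitions, while assign(10, 14) leaves 10 threads with one partition each and 4 threads idle.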
  1. Read parallelism: You typically want to read from all N partitions of a Kafka topic in parallel by consuming with N threads. And depending on the data volume you want to spread those threads across different NICs, which typically means across different machines. In Storm, this is achieved by setting the parallelism of the Kafka spout to N via TopologyBuilder#setSpout().
  2. Downstream processing parallelism: Once retrieved from Kafka you want to process the data in parallel. Depending on your use case this level of parallelism must be different from the read parallelism. If your use case is CPU-bound, for instance, you want to have many more processing threads than read threads; this is achieved by shuffling or “fanning out” the data via the network from the few read threads to the many processing threads. Hence you pay for the access to more cores with increased network communication, serialization overhead, etc. In Storm, you perform such a shuffling via a shuffle grouping from the Kafka spout to the next downstream bolt. The Spark equivalent is the repartition transformation on DStreams.
It is possible – and often desired – to decouple the level of parallelism for reading from Kafka from the level of parallelism for processing the data once read.

Read parallelism in Spark Streaming

Like Kafka, Spark Streaming has the concept of partitions. It is important to understand that Kafka’s per-topic partitions are not correlated to the partitions of RDDs in Spark.
The KafkaInputDStream of Spark Streaming – aka its Kafka “connector” – uses Kafka’s high-level consumer API, which means you have two control knobs in Spark that determine read parallelism for Kafka:
  1. The number of input DStreams. Because Spark will run one receiver (= task) per input DStream, this means using multiple input DStreams will parallelize the read operations across multiple cores and thus, hopefully, across multiple machines and thereby NICs.
  2. The number of consumer threads per input DStream. Here, the same receiver (= task) will run multiple threads. That is, read operations will happen in parallel but on the same core/machine/NIC.
For practical purposes option 1 is preferred.
Why is that? First and foremost because reading from Kafka is normally network/NIC limited, i.e. you typically do not increase read-throughput by running more threads on the same machine. In other words, it is rare though possible that reading from Kafka runs into CPU bottlenecks. Second, if you go with option 2 then multiple threads will be competing for the lock to push data into so-called blocks (the += method of BlockGenerator that is used behind the scenes is synchronized on the block generator instance).
Number of partitions of the RDDs created by the input DStreams: The KafkaInputDStream will store individual messages received from Kafka into so-called blocks. From what I understand, a new block is generated every spark.streaming.blockInterval milliseconds, and each block is turned into a partition of the RDD that will eventually be created by the DStream. If this assumption of mine is true, then the number of partitions in the RDDs created by KafkaInputDStream is determined by batchInterval / spark.streaming.blockInterval, where batchInterval is the time interval at which streaming data will be divided into batches (set via a constructor parameter of StreamingContext; it has no default). For example, if the batch interval is 2 seconds and the block interval is 200ms (default), your RDD will contain 10 partitions.
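
The arithmetic above can be written down as a tiny plain-Java helper. It is only a sketch of the stated assumption (one block per block interval, one RDD partition per block); the class and method names are hypothetical.

```java
// Partition count per batch under the assumption stated in the text.
final class BlockIntervalSketch {

    // batchIntervalMs / blockIntervalMs, for a single receiver.
    static long partitionsPerBatch(long batchIntervalMs, long blockIntervalMs) {
        if (batchIntervalMs <= 0 || blockIntervalMs <= 0) {
            throw new IllegalArgumentException("intervals must be positive");
        }
        return batchIntervalMs / blockIntervalMs;
    }

    // Total partitions after a union of several identically configured receivers.
    static long partitionsAfterUnion(int receivers, long batchIntervalMs, long blockIntervalMs) {
        return receivers * partitionsPerBatch(batchIntervalMs, blockIntervalMs);
    }
}
```

For a 2-second batch interval and a 200 ms block interval, partitionsPerBatch(2000, 200) gives 10, and a union of three such streams gives 30 partitions.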

Option 1: Controlling the number of input DStreams

val kafkaParams: Map[String, String] = Map("group.id" -> "terran", /* ignore rest */)
val numInputDStreams = 5
val kafkaDStreams = (1 to numInputDStreams).map { _ => KafkaUtils.createStream(...) }
In this example we create five input DStreams, thus spreading the burden of reading from Kafka across five cores and, hopefully, five machines/NICs. (I say “hopefully” because I am not certain whether Spark Streaming task placement policy will try to place receivers on different machines.) All input DStreams are part of the “terran” consumer group, and the Kafka API will ensure that these five input DStreams a) will see all available data for the topic because it assigns each partition of the topic to an input DStream and b) will not see overlapping data because each partition is assigned to only one input DStream at a time. In other words, this setup of “collaborating” input DStreams works because of the consumer group behavior provided by the Kafka API, which is used behind the scenes by KafkaInputDStream.
[When you use the multi-input-stream approach I described above, then] those consumers operate in one [Kafka] consumer group, and they try to decide which consumer consumes which partitions. And it may just fail to do syncpartitionrebalance, and then you have only a few consumers really consuming. To mitigate this problem, you can set rebalance retries very high, and pray it helps.
Then arises yet another “feature” — if your receiver dies (OOM, hardware failure), you just stop receiving from Kafka!
The “stop receiving from Kafka” issue requires some explanation. Currently, when you start your streaming application via ssc.start() the processing starts and continues indefinitely – even if the input data source (e.g. Kafka) becomes unavailable. That is, streams are not able to detect if they have lost connection to the upstream data source and thus cannot react to this event, e.g. by reconnecting or by stopping the execution. Similarly, if you lose a receiver that reads from the data source, then your streaming application will generate empty RDDs.
This is a pretty unfortunate situation. One crude workaround is to restart your streaming application whenever it runs into an upstream data source failure or a receiver failure. This workaround may not help you though if your use case requires you to set the Kafka configuration option auto.offset.reset to “smallest” – because of a known bug in Spark Streaming the resulting behavior of your streaming application may not be what you want. 
Option 2: Controlling the number of consumer threads per input DStream
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
val consumerThreadsPerInputDstream = 3
val topics = Map("zerg.hydra" -> consumerThreadsPerInputDstream)
val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
Spark ties the parallelism to the number of (RDD) partitions by running one task per RDD partition
A related DStream transformation is union. (This method also exists for StreamingContext, where it returns the unified DStream from multiple DStreams of the same type and same slide duration. Most likely you would use the StreamingContext variant.) A union will return a UnionDStream backed by a UnionRDD. A UnionRDD is comprised of all the partitions of the RDDs being unified, i.e. if you unite 3 RDDs with 10 partitions each, then your union RDD instance will contain 30 partitions. In other words, union will squash multiple DStreams into a single DStream/RDD, but it will not change the level of parallelism. Whether you need to use union or not depends on whether your use case requires information from all Kafka partitions “in one place”, so it’s primarily because of semantic requirements. One such example is when you need to perform a (global) count of distinct elements.
val readParallelism = 5
val kafkaDStreams = (1 to readParallelism).map { _ => KafkaUtils.createStream(ssc, kafkaParams, topics, ...) }
//> collection of five *input* DStreams = handled by five receivers/tasks

val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
//> single DStream

val processingParallelism = 20
val processingDStream = unionDStream.repartition(processingParallelism)
Writing to Kafka should be done from the foreachRDD output operation:
The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, like saving the RDD to files, or writing it over the network to a database. Note that the function func is executed at the driver, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
val producerPool = {
  // See the full code on GitHub for details on how the pool is created
  val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
  ssc.sparkContext.broadcast(pool)
}

stream.map { ... }.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // Get a producer from the shared pool
    val p = producerPool.value.borrowObject()
    partitionOfRecords.foreach { case tweet: Tweet =>
      // Convert pojo back into Avro binary format
      val bytes = converter.value.apply(tweet)
      // Send the bytes to Kafka
      p.send(bytes)
    }
    // Returning the producer to the pool also shuts it down
    producerPool.value.returnObject(p)
  })
})
Keep in mind that Spark Streaming creates many RDDs per minute, each of which contains multiple partitions, so preferably you shouldn’t create new Kafka producers for each partition, let alone for each Kafka message. The setup above minimizes the creation of Kafka producer instances, and also minimizes the number of TCP connections that are being established with the Kafka cluster. You can use this pool setup to precisely control the number of Kafka producer instances that are being made available to your streaming application (if in doubt, use fewer).
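
The borrow/return pattern used above can be sketched with a fixed-size pool in plain Java. This is a minimal stand-in under stated assumptions, not the pool from the original code: the class name ProducerPoolSketch is hypothetical, and the pooled type is generic so that no Kafka dependency is needed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Fixed-size object pool: all instances are created once, up front.
final class ProducerPoolSketch<P> {

    private final BlockingQueue<P> idle;

    ProducerPoolSketch(int size, Supplier<P> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get()); // the only place instances are created
        }
    }

    // Blocks until an instance is available.
    P borrowObject() {
        try {
            return idle.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a pooled object", e);
        }
    }

    // Puts a borrowed instance back so other callers can reuse it.
    void returnObject(P instance) {
        idle.offer(instance);
    }

    int idleCount() {
        return idle.size();
    }
}
```

Because the pool size is fixed at construction time, the number of pooled instances (and, in the Kafka case, of TCP connections) has a hard upper bound regardless of how many partitions are processed.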
