Spark Streaming + Kafka Integration (Kafkapers Version

This post is based on the official Spark Streaming + Kafka integration documentation.
It explains how Spark Streaming receives data from Kafka. There are two main approaches: the Receiver-based approach, built on Kafka's high-level consumer API, and the direct (no Receiver) approach introduced in Spark 1.3. The two approaches differ in both programming model and performance. The following sections look at each in turn.

First, Receiver-based mode

This mode uses a Receiver to receive data from Kafka. The Receiver is implemented with Kafka's high-level consumer API. The data it receives from Kafka is stored on the Spark executors, and the jobs launched by Spark Streaming then process that data.
However, under the default configuration this mode can lose data on failure. To achieve zero data loss, you must also enable Write Ahead Logs (WAL) in Spark Streaming. The WAL synchronously saves all data received from Kafka to a distributed file system such as HDFS, so that all of the data can be recovered after a failure.
For details on Write Ahead Logs, see the deployment section of the Spark Streaming programming guide.
The following explains how to use this mode in an application.

1, Dependency configuration

This mode depends on the following artifact:
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.0.1
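For an sbt build, the coordinates above could be declared roughly as follows. This is only a sketch: the Scala patch version is an assumption, and marking spark-core and spark-streaming as provided reflects the usual arrangement when the application is launched with spark-submit.

```scala
// build.sbt (sketch). Assumes a Scala 2.11.x build so that %% resolves to the _2.11 artifacts.
scalaVersion := "2.11.8" // assumed patch version; any 2.11.x release should resolve the same artifacts

libraryDependencies ++= Seq(
  // Already present in a Spark installation, so not bundled into the application JAR
  "org.apache.spark" %% "spark-core"                % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming"           % "2.0.1" % "provided",
  // The Kafka integration artifact listed above
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.1"
)
```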

2, Programming

Import the KafkaUtils class and create an input DStream:
import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,
    [ZK quorum],
    [consumer group id],
    [per-topic number of Kafka partitions to consume])
You can also specify the key and value classes of the input data and their decoder classes. See the KafkaUtils API documentation or the KafkaWordCount example in the Spark source.
Points to note:
  1. Topic partitions in Kafka do not correspond to the partitions of the RDDs generated in Spark Streaming. Increasing the per-topic partition count passed to KafkaUtils.createStream() only increases the number of threads with which a single Receiver consumes the topic. It does not increase Spark's parallelism when processing the data.
  2. Multiple Kafka input DStreams can be created with different groups and topics, so that data is received in parallel by multiple Receivers.
  3. If Write Ahead Logs are enabled, the received data is already replicated in the log before it is processed. The storage level of the input stream should therefore be StorageLevel.MEMORY_AND_DISK_SER, which you pass as an argument to KafkaUtils.createStream.
    With Receivers and Write Ahead Logs, the Spark Streaming application uses Kafka's high-level API to store the consumed offsets in Zookeeper. This combination avoids data loss, but it cannot guarantee that a record is processed only once: under some failures, records may be processed more than once (at-least-once semantics). The cause is that the data reliably received by Spark Streaming and the offsets tracked in Zookeeper can become inconsistent.
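The notes above can be put together in a minimal sketch: one Receiver per topic, the input streams merged with union, the Write Ahead Log enabled, and the storage level set to MEMORY_AND_DISK_SER. The object name, the ZooKeeper quorum, the group id, the topic names and the checkpoint path are all placeholders chosen for illustration, not values from the original post.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverKafkaSketch {

  // Creates one Receiver per topic and merges the resulting streams.
  // threadsPerTopic is the number of consumer threads inside each Receiver,
  // not the number of RDD partitions.
  def createUnifiedStream(
      ssc: StreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: Seq[String],
      threadsPerTopic: Int): DStream[(String, String)] = {
    val streams = topics.map { topic =>
      KafkaUtils.createStream(
        ssc, zkQuorum, groupId, Map(topic -> threadsPerTopic),
        StorageLevel.MEMORY_AND_DISK_SER) // the WAL already keeps a durable copy
    }
    ssc.union(streams)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ReceiverKafkaSketch")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Placeholder path; the write ahead log is stored under the checkpoint directory.
    ssc.checkpoint("hdfs:///tmp/receiver-sketch-checkpoint")

    val messages = createUnifiedStream(
      ssc, "zk1:2181,zk2:2181", "sketch-group", Seq("topicA", "topicB"), 2)
    messages.map(_._2).count().print() // prints the number of records in each batch

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Each Receiver occupies one core for the lifetime of the application, so the application needs more cores than Receivers or no capacity is left for processing.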

3, Deploying the application

Like any other Spark application, a Spark Streaming application is launched with spark-submit.
Package spark-streaming-kafka-0-8_2.11 and its dependencies into the application JAR, and make sure that spark-core_2.11 and spark-streaming_2.11 are marked as provided dependencies, since they are already present in a Spark installation.
Alternatively, you can pass the spark-streaming-kafka-0-8_2.11 JAR to spark-submit with --jars.

Second, direct (no Receiver) mode

This mode does not use Receivers to receive data from Kafka. Instead, the streaming application periodically queries Kafka for the latest offsets of each topic partition and uses them to define the offset range that each batch must process. When the jobs for a batch are launched, Kafka's simple consumer API is used to read those offset ranges from Kafka.
This mode has the following advantages over the Receiver-based mode:
  1. Simplified parallelism: you no longer need to create multiple Kafka input DStreams and union them. With the direct stream, Spark Streaming creates as many RDD partitions as there are Kafka partitions to consume, and all of them read from Kafka in parallel. Kafka partitions and RDD partitions therefore map one-to-one, which is easier to understand and to tune.
  2. Efficiency: since there is no Receiver, there is no need to enable Write Ahead Logs. After a failure, the data can be read from Kafka again, as long as Kafka still retains it.
  3. Exactly-once semantics: in this mode offsets are not tracked through Zookeeper. They are maintained by the Spark Streaming application itself and recorded in its checkpoints, so each record is received by Spark Streaming effectively exactly once, even after failures. To get exactly-once semantics for the output of your results as well, the output operation that saves data to an external system must be either idempotent or an atomic transaction that saves the results and the offsets together. See the section on the semantics of output operations in the Spark Streaming programming guide.
    One drawback of this mode is that it does not update the offsets in Zookeeper, so Zookeeper-based Kafka monitoring tools such as KafkaOffsetMonitor will not show any progress. However, you can obtain the offsets of each batch in the application and update Zookeeper yourself.
    The following explains how to use this mode in an application.

1, Dependency configuration

This mode depends on the following artifact:
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.0.1

2, Programming

Import the KafkaUtils class and create an input DStream:

import org.apache.spark.streaming.kafka._

val directKafkaStream = KafkaUtils.createDirectStream[
    [key class], [value class], [key decoder class], [value decoder class] ](
    streamingContext, [map of Kafka parameters], [set of topics to consume])
You can also pass a messageHandler to createDirectStream to access the MessageAndMetadata object, which contains the metadata of the current message. For usage, see the API documentation or the DirectKafkaWordCount example in the Spark source.
In the Kafka parameters, you must specify either metadata.broker.list or bootstrap.servers. By default, consumption starts from the latest offset of each Kafka partition. If you set auto.offset.reset to smallest in the Kafka parameters, Spark Streaming starts consuming from the smallest offset instead.
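As a concrete illustration of the parameters just described, here is a minimal sketch that creates a direct stream of String keys and values. The object name, broker addresses and topic name are placeholders for illustration. Note that with this API version the stream creation contacts the brokers to look up the starting offsets, so the brokers must be reachable when createStream is called.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaSketch {

  // Builds the Kafka parameter map; "smallest" starts from the earliest available offset.
  def kafkaParams(brokers: String): Map[String, String] = Map(
    "metadata.broker.list" -> brokers,
    "auto.offset.reset"    -> "smallest")

  // Creates a direct stream of (key, value) pairs decoded as Strings.
  def createStream(
      ssc: StreamingContext,
      brokers: String,
      topics: Set[String]): InputDStream[(String, String)] =
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams(brokers), topics)

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("DirectKafkaSketch"), Seconds(10))

    // Placeholder broker list and topic name
    val stream = createStream(ssc, "broker1:9092,broker2:9092", Set("events"))
    stream.map(_._2).print() // prints the first few values of each batch

    ssc.start()
    ssc.awaitTermination()
  }
}
```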
You can also pass starting offsets to KafkaUtils.createDirectStream to begin consuming from arbitrary offsets. The following code shows how to obtain the offsets consumed in each batch.

// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()

directKafkaStream.transform { rdd =>
  // The cast only works on the RDD produced directly by the direct stream
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd // transform must return an RDD
}.map { record =>
  record // placeholder for per-record processing
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
Once you have the offset information, you can write it to Zookeeper yourself if necessary.
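A minimal sketch of that idea: record the end offset of each batch in an external store and, on restart, pass the stored offsets back to createDirectStream. The store here is a hypothetical in-memory stand-in (loadOffsets and saveOffsets are names invented for this illustration); a real application would back them with Zookeeper, a database, or similar.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import scala.collection.concurrent.TrieMap

object OffsetTrackingSketch {

  // Hypothetical in-memory stand-in for an external offset store.
  private val store = TrieMap.empty[TopicAndPartition, Long]

  // Returns the next offset to read for every partition seen so far.
  def loadOffsets(): Map[TopicAndPartition, Long] = store.toMap

  // Records the exclusive end offset of each processed range.
  def saveOffsets(ranges: Array[OffsetRange]): Unit =
    ranges.foreach { r =>
      store.put(TopicAndPartition(r.topic, r.partition), r.untilOffset)
    }

  // Starts consuming at the given offsets; the messageHandler keeps key and value.
  def createResumingStream(
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long]): InputDStream[(String, String)] = {
    val handler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, handler)
  }

  // Processes each batch, then saves its offsets. The cast happens in the
  // first method called on the stream, as the direct API requires.
  def trackOffsets(stream: InputDStream[(String, String)]): Unit =
    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      println(s"records in batch: ${rdd.count()}") // placeholder for real processing
      saveOffsets(ranges)
    }
}
```

On a first run nothing has been stored yet, so an application would typically fall back to the topic-based createDirectStream overload. Saving offsets after processing gives at-least-once behaviour: if the application fails between the two steps, the batch is processed again on restart.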
Points to note:
  1. In the code above, the cast to HasOffsetRanges only succeeds if it is done in the first method called on directKafkaStream. So, to get the offsets, use transform() as the first method call on the input DStream, capture the offsets there, and then call further methods to process the DStream, as in the transform example above.
  2. The one-to-one mapping between Kafka partitions and RDD partitions described for this mode no longer holds after any method that shuffles or repartitions the data, such as reduceByKey() or window().
  3. Since this mode has no Receivers, the receiver-related Spark configuration parameters, such as spark.streaming.receiver.*, have no effect here. Use the spark.streaming.kafka.* parameters instead. An important one is spark.streaming.kafka.maxRatePerPartition, which sets the maximum number of messages per second that the direct API reads from each Kafka partition. This parameter matters most when the program is run for the first time: if it is not set and auto.offset.reset is set to smallest, the first batch reads all of the data in the topic, and subsequent batches are delayed for a long time.
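The rate limit from the last note is an ordinary Spark configuration entry, so it can be set on the SparkConf before the StreamingContext is created. A small sketch, in which the object name and the value 1000 are arbitrary illustrations:

```scala
import org.apache.spark.SparkConf

object RateLimitSketch {

  // Caps how many messages per second the direct API reads from each Kafka partition.
  def rateLimitedConf(maxPerPartitionPerSecond: Long): SparkConf =
    new SparkConf()
      .setAppName("RateLimitSketch")
      .set("spark.streaming.kafka.maxRatePerPartition", maxPerPartitionPerSecond.toString)

  def main(args: Array[String]): Unit = {
    val conf = rateLimitedConf(1000L) // illustrative value only
    println(conf.get("spark.streaming.kafka.maxRatePerPartition"))
  }
}
```

With a limit of 1000, a 10 second batch interval and a topic of 4 partitions, a single batch holds at most 1000 × 10 × 4 = 40000 records, which keeps the first batch bounded even when consumption starts from the smallest offset.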

3, Deploying the application

This is the same as for the first mode.

