Spark Streaming +Kafka partition

Spark Streaming + Kafka uses the underlying API to read Kafka's Partition data directly, manually updating Offset to Zookeeper cluster

Posted at 2016/11/30 20:22:01 1121 people read
Category: Spark Kafka
  Spark Streaming + Kafka uses the underlying API to read Kafka's Partition data directly, and the normal Offset is stored in CheckPoint.  But this can not be achieved Kafka monitoring tool for Kafka monitoring, so manually update Offset to Zookeeper cluster 

  Related source brief introduction: 
1: TopicAndPartition is an example class that encapsulates the id of topic and partition
  Case class TopicAndPartition (topic: String, partitionId: Int) 
  2: OffsetRange is the encapsulation of topic name, partition id, fromOffset (current consumption start offset), untilOffset (current consumption end offset).  So OffsetRange contains information such as: topic name, partition Id, start offset, end offset. 
  / **
    * @param topic Kafka topic name
    * @param partition Kafka partition id
    * @param fromOffset inclusive derived offset
    * @param untilOffset exclusive ending offset
    * /
 Final class OffsetRange private (val topic: String, val partition: Int, val fromOffset: Long, val untilOffset: Long) extends Serializable 
  3: Code implementation: 
  Object Iteblog {

   Val brokerAddress = ""

   Val groupID = "testGroup"

   Val kafkaParams = Map [String, String] (
     "" -> brokerAddress,
     "" -> "iteblog")

   Def main (args: Array [String]) {

     Val sparkConf = new SparkConf (). SetAppName ("Test")
     SparkConf.set ("spark.kryo.registrator", "utils.CpcKryoSerializer")
     Val sc = new SparkContext (sparkConf)

     Val ssc = new StreamingContext (sc, Seconds (2))
     Val topicsSet = Set ("iteblog")

     Val messages = KafkaUtils.createDirectStream [String, String, StringDecoder, StringDecoder] (ssc, kafkaParams, topicsSet)
     Messages.foreachRDD (rdd => {
       // convert RDD to HasOffsetRanges type (KafkaRDD extends HasOffsetRanges)
       // OffsetRange Description: Represents a range of offsets from a single Kafka TopicAndPartition.
       // OffsetRange Description: Instances of this class can be created with `OffsetRange.create ()`.
       Val offsetsList: Array [OffsetRange] = rdd.asInstanceOf [HasOffsetRanges] .offsetRanges
 // implementation code for offsetRanges (in KafkaRDD): tp: TopicAndPartition, fo: fromOffset
 // val offsetRanges = {case (tp, fo) =>
 // val uo = untilOffsets (tp)
 // offsetRange (tp.topic, tp.partition, fo, uo.offset)
 //} .toArray

       Val kc = new KafkaCluster (kafkaParams)
       For (offsets <- offsetsList) {
        // TopicAndPartition The main construction parameter is the first topic, the second is the partition id
         Val topicAndPartition = TopicAndPartition ("iteblog", offsets.partition) //offsets.partition represents Kafka partition id
         Val o = kc.setConsumerOffsets (groupID, Map ((topicAndPartition, offsets.untilOffset))) // offsets.untilOffset: yes if (o.isLeft) {
           Println (s "Error updating the offset to Kafka cluster: $ {o.left.get}")

     Ssc.start ()
     Ssc.awaitTermination ()
     Ssc.stop ()
Core code to explain:
  Using KafkaUtils' createDirectStream method, call the underlying API to consume Kafka Partition data (Kafka Partition and RDD Partition one-to-one correspondence).  The return value of createDirectStream is DStream and the bottom is RDD. 
  Val messages = KafkaUtils.createDirectStream [String, String, StringDecoder, StringDecoder] (ssc, kafkaParams, topicsSet) 
  Messages.foreachRDD is the basis for calculating the offset range for messages underlying RDD. 
  KafkaRDD and HasOffsetRanges relationship (construction parameters and generic omitted, see the source code): 
  KafkaRDD extends RDD [R] (sc, Nil) with Logging with HasOffsetRanges 
  // rdd is a variable in messages.foreachRDD, rdd whose type is KafkaRDD, but because of the polymorphic reason rdd is not actually a KafkaRDD type, but an RDD type, it needs to transition to HasOffsetRanges down, calling the offsetRanges method.  (Memories OffsetRange is the package for what? Answer: topic name, partition Id, start offset, end offset. ) 
  Val offsetsList: Array [OffsetRange] = rdd.asInstanceOf [HasOffsetRanges] .offsetRanges
OffsetRanges implementation code (KafkaRDD): tp: TopicAndPartition, fo: fromOffset
  Val offsetRanges = {case (tp, fo) =>
    Val uo = untilOffsets (tp)
    OffsetRange (tp.topic, tp.partition, fo, uo.offset)
 } .toArray 
  KafkaCluster completes the wrapper class that updates the offset to the zookeeper cluster. 
  Val kc = new KafkaCluster (kafkaParams) 
  The offsetRanges array is traversed : setConsumerOffsets is KafkaCluster 
  Emphasize: What is OffsetRange's encapsulation?  Answer: topic name, partition Id, start offset, end offset 
  For (offsets <- offsetsList) { 
  //offsets.untilOffset is the end offset 
Val o = kc.setConsumerOffsets (groupID, Map ((topicAndPartition, offsets.untilOffset))) if (o.isLeft) {println (s "Error updating the offset to Kafka cluster: $ {o.left.get}")}} }
The above is done on the createDirectStream created in the DStream in an underlying RDD to complete the offset update to the zookeeper cluster, through the foreachRDD to complete the update for all RDD! The The At this point has been completed RDD offset calculation and update, but the specific update method? In KafkaCluster. Next look at the KafkaCluster code.
  Direct on code: 

  Package org.apache.spark.streaming.kafka

 // use org.apache.spark.streaming.kafka reasons: private [spark] object SimpleConsumerConfig limit only in the spark package!

 Import kafka.api.OffsetCommitRequest
 Import kafka.common. {ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
 Import kafka.consumer.SimpleConsumer
 Import org.apache.spark.SparkException
 Import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig
 Import scala.collection.mutable.ArrayBuffer
 Import scala.util.Random
 Import scala.util.control.NonFatal

 Class KafkaCluster (val kafkaParams: Map [String, String]) extends Serializable {// Err is the type ArrayBuffer [Throwable] alias 
  Type Err = ArrayBuffer [Throwable]

    @transient private var _config: SimpleConsumerConfig = null

    Def config: SimpleConsumerConfig = this.synchronized {
      If (_config == null) {
        // SimpleConsumerConfig apply method part of the code:
        // val brokers = kafkaParams.get (""). OrElse (kafkaParams.get ("bootstrap.servers"))
        // so kafkaParams must contain the key = or bootstrap.servers corresponding to the Value
        _config = SimpleConsumerConfig (kafkaParams)

   / **
     * @param groupId: String
     * @param offsets: Map [TopicAndPartition, Long]
     * @return
     * /
    Def setConsumerOffsets (groupId: String,
                           Offsets: Map [TopicAndPartition, Long]
                            ): Either [Err, Map [TopicAndPartition, Short]] = {
      SetConsumerOffsetMetadata (groupId, {kv =>
        Kv._1 -> OffsetMetadataAndError (kv._2)

    Def setConsumerOffsetMetadata (groupId: String,
                                  Metadata: map [TopicAndPartition, OffsetMetadataAndError]
                                   ): Either [Err, Map [TopicAndPartition, Short]] = {
      Var result = Map [TopicAndPartition, Short] ()
      Val req = OffsetCommitRequest (groupId, metadata)
      Val errs = new Err
      Val topicAndPartitions = metadata.keySet
      WithBrokers (Random.shuffle (config.seedBrokers), errs) {consumer =>
        Val resp = consumer.commitOffsets (req)
        Val respMap = resp.requestInfo
        Val needed = topicAndPartitions.diff (result.keySet)
        Need.foreach {tp: TopicAndPartition =>
          RespMap.get (tp) .foreach {err: Short =>
            If (err == ErrorMapping.NoError) {
              Result + = tp -> err
            } Else {
              Errs.append (ErrorMapping.exceptionFor (err))
        If (result.keys.size == topicAndPartitions.size) {
          Return right (result)
      Val missing = topicAndPartitions.diff (result.keySet)
      Errs.append (new SparkException (s "Could not set offsets for $ {missing}"))
      Left (errs)

    Private def withBrokers (brokers: Iterable [(String, Int)], errs: Err)
                           (Fn: SimpleConsumer => Any): Unit = {
      Brokers.foreach {hp =>
        Var consumer: SimpleConsumer = null
        Try {
          Consumer = connect (hp._1, hp._2)
          Fn (consumer)
        } Catch {
          Case NonFatal (e) =>
            Errs.append (e)
        } Finally {
          If (consumer! = Null) {
            Consumer.close ()

    Def connect (host: String, port: Int): SimpleConsumer =
      New SimpleConsumer (host, port, config.socketTimeoutMs,
        Config.socketReceiveBufferBytes, config.clientId)
  Small expansion: 
  Map of the continuous acquisition of value usage: 
  Object MapDemo extends app {
   Val map = Map ("1" -> "11", "2" -> "22", "3" -> "33") {
     Case (a, b) =>
       Println (a + "" + b)

 / / Syntax: first get the value of key = 4, if there is a return to get key = 5 value, if there is a return, there is no direct exception.
   Val result = map.get ("4"). OrElse (map.get ("5")). GetOrElse (throw new Exception ("exception"))
   Println (result)

  The core code to explain the SimpleConsumerConfig apply method: 
  / **
      * Make a consumer config without optional or zookeeper.connect,
      * Communicating with brokers also needs common settings such as timeout
      * /
     Def apply (kafkaParams: Map [String, String]): SimpleConsumerConfig = {
 // these keys are from other pre-existing kafka configs for specifying brokers, accept either // map Get the value usage See above example val brokers = kafkaParams.get ("")
         .orElse (kafkaParams.get ("bootstrap.servers"))
         .getOrElse (throw new SparkException (
           "Must specify or bootstrap.servers"))

       Val props = new Properties ()
       KafkaParams.foreach {case (key, value) =>
         // prevent warnings on parameters ConsumerConfig does not know about
         If (key! = "" && key! = "Bootstrap.servers") {
           Props.put (key, value)
      // If there is no zookeeper.connect and, set its value to an empty string. 
  Seq ("zookeeper.connect", ""). Foreach {s =>
         If (! Props.contains (s)) {
           Props.setProperty (s, "")

       New SimpleConsumerConfig (brokers, props)
  OffsetMetadataAndError sample class: 
  Case class OffsetMetadataAndError (offsetMetadata: OffsetMetadata, error: Short = Errors.NONE.code) {
   Def offset = offsetMetadata.offset

   Def metadata = offsetMetadata.metadata

   Override def toString = "[% s, ErrorCode% d]". Format (offsetMetadata, error)
TopicAndPartition is an example class that encapsulates the id of topic and partition
  / **
     * @param groupId: String
     * @param offsets: Map [TopicAndPartition, Long] 
  * Map in the key and value meaning TopicAndPartition is topic and partition id encapsulation long is the end of consumption offset 
* @retter * / def setConsumerOffsets (groupId: String, offsets: Map [TopicAndPartition, Long]): Either [Err, Map [TopicAndPartition, Short]] = {setConsumerOffsetMetadata (groupId, {kv => kv._1 - > OffsetMetadataAndError (kv._2)})}


  1. It is nice blog Thank you provide important information and I am searching for the same information to save my time Big Data Hadoop Online Course Bangalore


Enregistrer un commentaire

Posts les plus consultés de ce blog

Spark performance optimization: shuffle tuning

Spark optimization

Use Apache Spark to write data to ElasticSearch