Spark Streaming: using the Kafka low-level API and ZooKeeper to save offsets

http://www.klion26.com/spark-streaming-save-offset-to-zookeeper.html
http://www.klion26.com/spark-streaming-saving-offset-in-zookeeper-2.html
There are two ways for Spark Streaming to consume Kafka data: 1) the Receiver-based approach, using the createStream method, and 2) the Direct Approach (no receivers), using the createDirectStream method; see the Spark Streaming + Kafka Integration Guide for details. With the second approach, however, the Kafka offsets are only stored in the checkpoint, so if the program is restarted part of the data may be lost; see Spark & Kafka - Achieving zero data-loss.
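For orientation, here is a minimal sketch of the two creation calls against the 0.8 integration used throughout this post; the hosts, ports, topic and group names are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(new SparkConf().setAppName("demo").setMaster("local[2]"), Seconds(2))
    val kafkaParams = Map("metadata.broker.list" -> "broker_host:9092", "group.id" -> "demo-group")

    // 1) Receiver-based approach: the high-level consumer tracks offsets in ZooKeeper for us
    val receiverStream = KafkaUtils.createStream(ssc, "zk_host:2181", "demo-group", Map("topic_name" -> 1))

    // 2) Direct approach: no receiver; offsets live only in the checkpoint unless we save them ourselves
    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))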

This article discusses how, when using the second (direct) approach, to save the Kafka offsets into ZooKeeper, and how to read the existing offsets back from ZooKeeper.

The general idea is: when the Kafka stream is initialized, check whether ZooKeeper already holds an offset; if it does, start reading from that offset, otherwise start from the latest/earliest offset. While consuming the Kafka data, save each partition's offset to ZooKeeper as a backup. The reference code is as follows:

    val topic: String = "topic_name"                      // name of the topic to consume
    val topics: Set[String] = Set(topic)                  // topic set used when creating the stream
    val topicDirs = new ZKGroupTopicDirs("test_spark_streaming_group", topic)  // ZKGroupTopicDirs object describing where the offsets are saved
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"   // path in ZooKeeper, here /consumers/test_spark_streaming_group/offsets/topic_name
    val zkClient = new ZkClient("10.4.232.77:2181")       // ZooKeeper host and port; create a client
    val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")  // check whether this path has child nodes (if so, they were created when we saved the offsets of the different partitions)
    var kafkaStream: InputDStream[(String, String)] = null
    var fromOffsets: Map[TopicAndPartition, Long] = Map() // if ZooKeeper has saved offsets, use them as the starting positions of kafkaStream
    if (children > 0) {   // offsets were saved before; a better implementation would also compare them with the smallest offsets on Kafka, otherwise an OutOfRange error may occur
      for (i <- 0 until children) {
        val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
        val tp = TopicAndPartition(topic, i)
        fromOffsets += (tp -> partitionOffset.toLong)     // add the offset of each partition to fromOffsets
        logInfo("@@@@@@ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")
      }
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())  // transforms each Kafka message into a (topic_name, message) tuple
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParam, fromOffsets, messageHandler)
    }
    else {
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)  // no saved offsets: use the latest or earliest offset according to kafkaParam
    }

    var offsetRanges = Array[OffsetRange]()
    kafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  // get the Kafka offsets of the messages in this rdd
      rdd
    }.map(msg => msg._2).foreachRDD { rdd =>
      for (o <- offsetRanges) {
        val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
        ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)  // save this partition's offset to ZooKeeper
        logInfo(s"@@@@@@ topic  ${o.topic}  partition ${o.partition}  fromoffset ${o.fromOffset}  untiloffset ${o.untilOffset} #######")
      }
      rdd.foreachPartition(
        message => {
          while (message.hasNext) {
            logInfo(s"@^_^@   [" + message.next() + "] @^_^@")
          }
        }
      )
    }
With the above code, the Spark Streaming program can read data from Kafka without losing it after a restart.
The section above shows how to save the offsets in ZooKeeper and reuse them, but there is still a minor problem: what if the program has been stopped for a long time and, when it is started again, the offsets saved in ZooKeeper have already expired? The rest of this article solves that problem.
If the offsets on Kafka have expired, an OffsetOutOfRange exception is thrown, because the offsets previously saved in ZooKeeper can no longer be found in the topic. So we need an extra check when reading the offsets from ZooKeeper: if an offset saved in ZooKeeper is smaller than the smallest offset currently in the Kafka topic, use the smallest offset in the Kafka topic instead. Suppose the offset we last saved in ZooKeeper for some partition was 123, the program then stopped for a week, and the smallest offset of the Kafka topic has become 200; with the previous code an OffsetOutOfRange exception would be thrown, because the data at offset 123 no longer exists. The following code shows how to get the smallest offset of a <topic, partition> so that we can make the comparison.

    val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
    val tp = TopicAndPartition(topic, i)
    val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
    val consumerMin = new SimpleConsumer("broker_host", 9092, 10000, 10000, "getMinOffset")  // note broker_host here: it may cause the lookup to return nothing; the fix is explained below
    val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
    var nextOffset = partitionOffset.toLong
    if (curOffsets.length > 0 && nextOffset < curOffsets.head) {  // compare the smallest offset of this partition on Kafka with the offset saved in ZooKeeper, and pick the larger one
      nextOffset = curOffsets.head
    }
    fromOffsets += (tp -> nextOffset)
But the code above still has a problem: when we fetch offsets from Kafka we must find the leader of each partition and ask the leader for the offset, not an arbitrary broker, otherwise curOffsets may come back empty (i.e. the offset cannot be obtained). The following code gets the leader of each partition.
    val topic_name = "topic_name"     // topic_name is the name of the topic we want to query
    val topic2 = List(topic_name)
    val req = new TopicMetadataRequest(topic2, 0)
    val getLeaderConsumer = new SimpleConsumer("broker_host", 9092, 10000, 10000, "OffsetLookup")  // the first argument is the host of a Kafka broker, the second is the port
    val res = getLeaderConsumer.send(req)
    val topicMetaOption = res.topicsMetadata.headOption
    val partitions = topicMetaOption match {
      case Some(tm) =>
        tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]  // turn the result into a partition -> leader map
      case None =>
        Map[Int, String]()
    }



With the code above we can obtain the leader address of every partition; then, in the earlier snippet, replace broker_host with the corresponding leader address, as sketched below.
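A minimal sketch of the combined lookup, reusing topic, children, zkClient, topicDirs, the partitions map and fromOffsets from the snippets above, and assuming the leaders listen on port 9092 (the full program below integrates the same logic):

    for (i <- 0 until children) {
      val tp = TopicAndPartition(topic, i)
      val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
      // ask the partition's leader (looked up above) rather than an arbitrary broker
      val consumerMin = new SimpleConsumer(partitions(i), 9092, 10000, 10000, "getMinOffset")
      val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
      consumerMin.close()
      val saved = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}").toLong
      // if the saved offset has expired, fall back to the smallest offset on Kafka
      fromOffsets += (tp -> (if (curOffsets.nonEmpty && saved < curOffsets.head) curOffsets.head else saved))
    }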

In this way, the Kafka offsets used by Spark Streaming are saved to ZooKeeper and reused, which covers most situations.

The content above is reproduced from the posts linked at the top; what follows is my own integration of the pieces into a complete example.

It can be combined with Spark Streaming's checkpoint, but I did not enable the checkpoint here, because it also saves some of the running state of the Spark program; if the program is modified and then restarted from the checkpoint, it may fail.
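If you decide to skip checkpoint recovery entirely, a minimal sketch of an alternative driver (reusing the createContext method defined in the full example below; the directory argument is then effectively unused) could look like this:

    // Driver that always builds a fresh StreamingContext (no checkpoint recovery);
    // offsets are still restored from ZooKeeper inside createContext.
    def main(args: Array[String]): Unit = {
      val ssc = createContext("unused-checkpoint-dir")
      ssc.start()
      ssc.awaitTermination()
    }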



    package com.test.streaming

    import kafka.api.{TopicMetadataRequest, PartitionOffsetRequestInfo, OffsetRequest}
    import kafka.common.TopicAndPartition
    import kafka.consumer.SimpleConsumer
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import kafka.utils.{ZkUtils, ZKGroupTopicDirs}
    import org.I0Itec.zkclient.ZkClient
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{OffsetRange, HasOffsetRanges, KafkaUtils}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object KafkaTest {
      def createContext(checkpointDirectory: String) = {

        println("create spark")
        val topics = "test_tpoics"
        val group = "test-kafka"
        val zkQuorum = "10.16.10.191:2181"
        val brokerList = "10.10.10.196:8092,10.10.10.196:8092"
        //    val Array(topics, group, zkQuorum, brokerList) = args
        val sparkConf = new SparkConf().setAppName("Test-SparkDemo-kafka").setMaster("local[3]")
        sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        //    ssc.checkpoint(checkpointDirectory)
        val topicsSet = topics.split(",").toSet
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> brokerList,
          "group.id" -> group,
          "zookeeper.connect" -> zkQuorum,
          "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
        )
        val topicDirs = new ZKGroupTopicDirs("test_spark_streaming_group", topics)
        val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
        val hostAndPort = "10.16.10.191:2181"
        val zkClient = new ZkClient(hostAndPort)
        val children = zkClient.countChildren(zkTopicPath)
        var kafkaStream: InputDStream[(String, String)] = null
        var fromOffsets: Map[TopicAndPartition, Long] = Map()
        if (children > 0) {
          //---get partition leader begin----
          val topicList = List(topics)
          val req = new TopicMetadataRequest(topicList, 0)  // get metadata for this topic, e.g. broker and partition distribution
          val getLeaderConsumer = new SimpleConsumer("10.16.10.196", 8092, 10000, 10000, "OffsetLookup")  // low-level API interface
          val res = getLeaderConsumer.send(req)  // the response contains topic/broker/partition information
          val topicMetaOption = res.topicsMetadata.headOption
          val partitions = topicMetaOption match {
            case Some(tm) =>
              tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]
            case None =>
              Map[Int, String]()
          }
          //---get partition leader end----
          for (i <- 0 until children) {
            val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
            val tp = TopicAndPartition(topics, i)
            //---additional begin-----
            val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))  // EarliestTime = -2, ask for 1 offset
            val consumerMin = new SimpleConsumer(partitions(i), 8092, 10000, 10000, "getMinOffset")
            val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
            var nextOffset = partitionOffset.toLong
            if (curOffsets.length > 0 && nextOffset < curOffsets.head) {  // if the saved offset is smaller than the current smallest offset on Kafka, use the smallest one
              nextOffset = curOffsets.head
            }
            //---additional end-----
            fromOffsets += (tp -> nextOffset)
          }
          val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
          kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
        } else {
          println("create")
          kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
        }
        var offsetRanges = Array[OffsetRange]()
        kafkaStream.transform { rdd =>
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd
        }.map(msg => msg._2).foreachRDD { rdd =>
          for (offset <- offsetRanges) {
            val zkPath = s"${topicDirs.consumerOffsetDir}/${offset.partition}"
            ZkUtils.updatePersistentPath(zkClient, zkPath, offset.fromOffset.toString)
          }
          rdd.foreachPartition(
            message => {
              while (message.hasNext) {
                println(message.next())
              }
            })
        }
        ssc
      }

      def main(args: Array[String]) {

        val checkpointDirectory = "kafka-checkpoint2"
        System.setProperty("hadoop.home.dir", "D:\\Program Files\\hadoop-2.2.0")
        val ssc = StreamingContext.getOrCreate(checkpointDirectory,
          () => {
            createContext(checkpointDirectory)
          })
        ssc.start()
        ssc.awaitTermination()
      }
    }
