Spark Streaming + Kafka direct: saving offsets to Zookeeper and reusing them

Problem guide
1. Why do we need to save offsets to Zookeeper when using the Direct API?
2. How do we write offsets into Zookeeper?
3. How do we handle offsets saved in Zookeeper that have expired?


Saving the offset into Zookeeper

There are two ways for Spark Streaming to consume Kafka data: 1) the Receiver-based approach using the createStream method, and 2) the Direct Approach (No Receivers) using the createDirectStream method; see the Spark Streaming + Kafka Integration Guide for details. With the second approach, however, the Kafka offsets are stored only in the checkpoint, so if the program is restarted, part of the data can be lost; see Spark & Kafka - Achieving zero data-loss.
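For orientation, here is a minimal sketch of the two creation calls (this is not the article's code: ssc is an existing StreamingContext, and the Zookeeper/broker addresses, group id and topic name are placeholders):
[Scala]
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// 1) Receiver-based: offsets are tracked in Zookeeper by the high-level consumer
val receiverStream = KafkaUtils.createStream(
  ssc, "zk_host:2181", "test_spark_streaming_group", Map("topic_name" -> 1))

// 2) Direct (no receivers): offsets live only in the checkpoint unless we save them ourselves
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker_host:9092"), Set("topic_name"))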

This article discusses, for the second (direct) approach, how to save the Kafka offsets into Zookeeper and how to read existing offsets back from Zookeeper.

The general idea is: when the Kafka stream is initialized, check whether Zookeeper already holds saved offsets; if so, start reading from those offsets, otherwise start from the latest/oldest offset. While consuming the Kafka data, save each partition's offset to Zookeeper as a backup. The reference implementation is shown below.
[Scala]
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

val topicDirs = new ZKGroupTopicDirs("test_spark_streaming_group", topic)  // ZKGroupTopicDirs for the consumer group and the topic used when creating the stream
val zkTopicPath = s"${topicDirs.consumerOffsetDir}"  // path in zookeeper where the offsets are saved, i.e. /consumers/test_spark_streaming_group/offsets/topic_name

val zkClient = new ZkClient("10.4.232.77:2181")  // host and port of zookeeper; create a client
val children = zkClient.countChildren(zkTopicPath)  // check whether the path has children (children exist if we saved per-partition offsets before)

var kafkaStream: InputDStream[(String, String)] = null
var fromOffsets: Map[TopicAndPartition, Long] = Map()  // if zookeeper holds saved offsets, use them as the starting positions of kafkaStream

if (children > 0) {  // offsets were saved before; ideally they should also be compared with the smallest offsets on kafka, otherwise an OutOfRange error is thrown
  for (i <- 0 until children) {
    val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
    val tp = TopicAndPartition(topic, i)
    fromOffsets += (tp -> partitionOffset.toLong)  // add the offset of each partition to fromOffsets
    logInfo("@@@@@@ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")
  }

  val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())  // transform each kafka message into a (topic_name, message) tuple
  kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParam, fromOffsets, messageHandler)
}
else {
  kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)  // nothing saved: use the latest or oldest offset according to the kafkaParam configuration
}

var offsetRanges = Array[OffsetRange]()
kafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  // get the kafka offsets of this rdd
  rdd
}.map(msg => Utils.msgDecode(msg)).foreachRDD { rdd =>
  for (o <- offsetRanges) {
    val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
    ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)  // save the partition's offset to zookeeper
    logInfo(s"@@@@@@ topic ${o.topic} partition ${o.partition} fromoffset ${o.fromOffset} untiloffset ${o.untilOffset} #######")
  }

  rdd.foreachPartition(
    message => {
      while (message.hasNext) {
        logInfo("@^_^@ [" + message.next() + "] @^_^@")
      }
    }
  )
}
With the above code, the Spark Streaming program can read data from Kafka without losing it after a restart.
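The snippet above references ssc, kafkaParam, topic, topics, logInfo and Utils.msgDecode without defining them; logInfo comes from Spark's Logging trait and Utils.msgDecode is the author's own decoding helper. A minimal sketch of the assumed setup follows (application name, batch interval, broker address, group id and topic name are illustrative, not from the article):
[Scala]
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// context and parameters the snippet above assumes
val sparkConf = new SparkConf().setAppName("DirectKafkaOffsetToZk")
val ssc = new StreamingContext(sparkConf, Seconds(5))

val topic = "topic_name"
val topics = Set(topic)
val kafkaParam = Map[String, String](
  "metadata.broker.list" -> "broker_host:9092",
  // only used when no offset has been saved in zookeeper yet
  "auto.offset.reset" -> "smallest"
)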


Resolving the problem of expired offsets saved in Zookeeper

In the previous section we saw how to save offsets in zk and reuse them, but the code has a small problem: "if the program is restarted only after a very long time, and the offsets saved in zk have expired, what happens?" This section solves that problem.

If the offset has expired on Kafka, an OffsetOutOfRange exception is thrown, because the offset previously saved in zk can no longer be found in the topic. So we need to add a check when reading the offset from zk: if the offset saved in zk is smaller than the smallest offset currently in the Kafka topic, use the smallest offset in the Kafka topic instead. Suppose the offset we last saved in zk for a partition was 123, and the program then stayed stopped for a week; the smallest offset of the Kafka topic has meanwhile become 200. With the previous code an OffsetOutOfRange exception is thrown, because the data at offset 123 has already been deleted. Below we show how to get the smallest offset of a <topic, partition>, so that we can make this comparison.
[Scala]
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.consumer.SimpleConsumer

val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
val tp = TopicAndPartition(topic, i)

val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
val consumerMin = new SimpleConsumer("broker_host", 9092, 10000, 10000, "getMinOffset")  // querying a broker that is not the partition's leader may return nothing; the fix is below
val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
var nextOffset = partitionOffset.toLong
if (curOffsets.length > 0 && nextOffset < curOffsets.head) {  // compare the partition's smallest offset on kafka with the offset saved in zk, and pick the larger one
  nextOffset = curOffsets.head
}
fromOffsets += (tp -> nextOffset)  // set the correct offset; setting nextOffset to 0 (just a special value) lets you observe the expired-offset behaviour

The code above still has a problem: when fetching the offset from Kafka we have to find the leader of the corresponding partition and ask that leader for the offset, rather than an arbitrary broker, otherwise curOffsets may come back empty (nothing can be fetched). The following code obtains the leader of each partition.
[Scala]
import kafka.api.TopicMetadataRequest

val topic_name = "topic_name"  // the topic whose partition leaders we want to look up
val topic2 = List(topic_name)
val req = new TopicMetadataRequest(topic2, 0)
val getLeaderConsumer = new SimpleConsumer("broker_host", 9092, 10000, 10000, "leaderLookup")  // first argument is the host of a kafka broker, second is its port
val res = getLeaderConsumer.send(req)
val topicMetaOption = res.topicsMetadata.headOption
val partitions = topicMetaOption match {
  case Some(tm) =>
    tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]  // turn the result into a partition -> leader mapping
  case None =>
    Map[Int, String]()
}

The above code gives us the leader address of every partition; it only remains to replace the broker address used in the earlier minimum-offset snippet with the corresponding leader address, as sketched below.
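To wire the two snippets together, a hedged sketch (the helper name, port and client id are assumptions for illustration, not the article's code):
[Scala]
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// hypothetical helper: ask a partition's leader for its smallest available offset
def earliestOffset(leaderHost: String, topic: String, partition: Int): Option[Long] = {
  val consumer = new SimpleConsumer(leaderHost, 9092, 10000, 10000, "getMinOffset")
  try {
    val tp = TopicAndPartition(topic, partition)
    val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
    consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets.headOption
  } finally {
    consumer.close()
  }
}

// partitions is the partition -> leader map built above; fromOffsets holds the offsets read from zookeeper
val safeFromOffsets = fromOffsets.map { case (tp, savedOffset) =>
  val minOffset = partitions.get(tp.partition).flatMap(host => earliestOffset(host, tp.topic, tp.partition))
  tp -> minOffset.map(m => math.max(savedOffset, m)).getOrElse(savedOffset)
}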
With this, the Spark Streaming program saves the Kafka offsets to zk and reuses them, and most situations are covered.

Source: Push Cool Author: klion26
