Saving the offset of Spark Streaming + Kafka direct to Zookeeper and reusing it
Problem guide
1. Why do I need to save the offset to Zookeeper myself when using the Direct API?
2. How do I save the offset into Zookeeper?
3. How do I handle an offset saved in Zookeeper that has expired?
Source: Push Cool Author: klion26
Saving the offset into Zookeeper
There are two ways for Spark Streaming to consume data from Kafka: 1) the Receiver-based approach using the createStream method, and 2) the Direct Approach (No Receivers) using the createDirectStream method; see the Spark Streaming + Kafka Integration Guide for details. With the second approach, however, the kafka offsets are stored in the checkpoint, so if the program is restarted, part of the data may be lost; see Spark & Kafka - Achieving zero data-loss.
This article mainly discusses how to save the kafka offsets into zookeeper, and how to read the existing offsets back from zookeeper, when the second (direct) approach is used.
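The code that follows references a StreamingContext (ssc), a Kafka parameter map (kafkaParam), a topic name (topic) and a topic set (topics) that the original post does not show. A minimal sketch of what that setup might look like is given here; the application name, batch interval, broker list and topic name are placeholder assumptions, not part of the original code.
[Scala]
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumed setup used by the snippets below (all values are placeholders).
val sparkConf = new SparkConf().setAppName("DirectKafkaToZookeeper")
val ssc = new StreamingContext(sparkConf, Seconds(5))      // 5-second batch interval, chosen arbitrarily

val topic = "topic_name"                                   // the Kafka topic to consume
val topics = Set(topic)                                    // createDirectStream takes a Set of topic names
val kafkaParam = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092",   // placeholder broker list
  "auto.offset.reset"    -> "smallest"                     // only used when no offset is found in Zookeeper
)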
The general idea is that when the kafka stream is initialized, we check whether zookeeper already holds an offset; if it does, we start reading from that offset, and if not, we start from the latest (or oldest) offset. While consuming the kafka data, we also save the offset of each partition to zookeeper as a backup. The specific implementation can follow the reference code below.
[Scala]
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

val topicDirs = new ZKGroupTopicDirs("test_spark_streaming_group", topic)   // create a ZKGroupTopicDirs object; the first argument is the consumer group used when creating the stream
val zkTopicPath = s"${topicDirs.consumerOffsetDir}"                         // the save path in zookeeper, which becomes /consumers/test_spark_streaming_group/offsets/topic_name

val zkClient = new ZkClient("10.4.232.77:2181")                             // zookeeper host and port, create a client
val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")    // check whether the path has child nodes (the child nodes are the ones we create ourselves when saving the different partitions)

var kafkaStream: InputDStream[(String, String)] = null
var fromOffsets: Map[TopicAndPartition, Long] = Map()                       // if an offset has been saved in zookeeper, we use it as the starting position of kafkaStream

if (children > 0) {   // an offset has been saved; a better practice would also compare it with the smallest offset on kafka, otherwise an OutOfRange error is reported
  for (i <- 0 until children) {
    val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
    val tp = TopicAndPartition(topic, i)
    fromOffsets += (tp -> partitionOffset.toLong)                           // add the offset of each partition to fromOffsets
    logInfo("@@@@@@ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")
  }

  val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())   // transform each kafka message into a (topic_name, message) tuple
  kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParam, fromOffsets, messageHandler)
} else {
  kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)    // nothing saved, so use the latest or oldest offset according to the kafkaParam configuration
}

var offsetRanges = Array[OffsetRange]()

kafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges             // get the kafka offsets of the messages in this rdd
  rdd
}.map(msg => Utils.msgDecode(msg)).foreachRDD { rdd =>                      // Utils.msgDecode is the author's own decoding helper
  for (o <- offsetRanges) {
    val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
    ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)   // save this partition's offset to zookeeper
    logInfo(s"@@@@@@ topic ${o.topic} partition ${o.partition} fromoffset ${o.fromOffset} untiloffset ${o.untilOffset} #######")
  }

  rdd.foreachPartition(
    message => {
      while (message.hasNext) {
        logInfo("@^_^@ [" + message.next() + "] @^_^@")
      }
    }
  )
}
With the above code, the Spark Streaming program can read data from Kafka without loss.
Resolving the problem of expired offsets saved in Zookeeper
In the previous part we discussed how to save the offset in zk and reuse it, but that program still has a small problem: what happens if the program is restarted only after a very long time and the offset saved in zk has already expired? This part solves that problem.
If the offset on kafka has expired, an OffsetOutOfRange exception will be reported, because the offset previously saved in zk can no longer be found in the topic.
So, when an offset is found in zk, we need to add a check: if the offset saved in zk is smaller than the smallest offset currently available in the kafka topic, we set it to the smallest offset in the kafka topic.
Suppose the offset last saved in zk for a partition is 123, the program is then stopped for a week, and in the meantime the smallest offset in the kafka topic has become 200. With the previous code, an OffsetOutOfRange exception will be thrown, because the data corresponding to offset 123 can no longer be found. Below is how to get the smallest offset of a <topic, partition> so that we can make this comparison:
[Scala]
// This fragment goes inside the for (i <- 0 until children) loop of the first snippet.
val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
val tp = TopicAndPartition(topic, i)

val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets   // get the smallest offset of this topic-partition (consumerMin is a kafka SimpleConsumer)
var nextOffset = partitionOffset.toLong
if (curOffsets.length > 0 && nextOffset < curOffsets.head) {   // compare the smallest offset on kafka with the offset saved in zk, and pick the larger
  nextOffset = curOffsets.head
}
fromOffsets += (tp -> nextOffset)   // set the correct offset (setting nextOffset to 0 here, a special value, lets you observe the expired-offset failure)
But the above code has a problem: when fetching the offset from kafka, we need to find the leader of each partition and fetch the offset from that leader, not from an arbitrary broker, otherwise curOffsets may come back empty (i.e. the offset cannot be obtained). The following code shows how to get the leader of each partition:
[Scala]
import kafka.api.TopicMetadataRequest
import kafka.consumer.SimpleConsumer

val topic_name = "topic_name"      // topic_name is the name of the topic we want to query
val topic2 = List(topic_name)
val req = new TopicMetadataRequest(topic2, 0)
val getLeaderConsumer = new SimpleConsumer("broker_host", 9092, 10000, 10000, "OffsetLookup")   // the first argument is the host of a kafka broker, the second is its port
val res = getLeaderConsumer.send(req)
val topicMetaOption = res.topicsMetadata.headOption
val partitions = topicMetaOption match {
  case Some(tm) =>
    tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]    // turn the result into a partition -> leader mapping
  case None =>
    Map[Int, String]()
}
The above code gets the leader address of every partition. Then, in the earlier code that fetches the minimum offset, connect to the corresponding partition leader instead of a fixed broker address; a sketch of how this can be wired together is shown below.
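Putting the two parts together, the following is only a sketch of how the leader map might be combined with the minimum-offset lookup, under the assumptions of the snippets above: partitions, children, fromOffsets, zkClient, topicDirs and topic_name are the variables defined earlier, and the port 9092 and the client id "getMinOffset" are placeholders, not part of the original code.
[Scala]
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// For every partition, ask its leader for the earliest available offset and
// fall back to it when the offset stored in zookeeper has already expired.
for (i <- 0 until children) {
  val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
  val tp = TopicAndPartition(topic_name, i)

  val consumerMin = new SimpleConsumer(partitions(i), 9092, 10000, 10000, "getMinOffset")  // connect to this partition's leader (port and client id are assumptions)
  val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
  val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
  consumerMin.close()

  var nextOffset = partitionOffset.toLong
  if (curOffsets.nonEmpty && nextOffset < curOffsets.head) {  // the offset in zookeeper has expired, so start from the smallest offset instead
    nextOffset = curOffsets.head
  }
  fromOffsets += (tp -> nextOffset)
}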
In this way, the kafka offsets used by spark streaming are saved to zk and reused, and most situations are covered.