Spark Streaming: saving Kafka offsets to ZooKeeper with the Kafka low-level API
http://www.klion26.com/spark-streaming-save-offset-to-zookeeper.html
http://www.klion26.com/spark-streaming-saving-offset-in-zookeeper-2.html
When consuming Kafka data in Spark Streaming there are two approaches: 1) the Receiver-based approach, using the createStream method, and 2) the Direct Approach (no receivers), using the createDirectStream method; see the Spark Streaming + Kafka Integration Guide for details. With the second approach, however, the Kafka offsets are stored only in the Spark checkpoint, so if the program is restarted and the checkpoint cannot be reused, part of the data may be lost; see Spark & Kafka - Achieving zero data-loss.
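For orientation, here is a minimal sketch of the two creation calls. This sketch is not from the original posts; it assumes the Spark 1.x / Kafka 0.8 spark-streaming-kafka API used throughout this article, and the topic name, group id and addresses are placeholders.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("kafka-demo"), Seconds(2))

// 1) Receiver-based approach: the high-level consumer tracks offsets in ZooKeeper itself
val receiverStream = KafkaUtils.createStream(ssc, "zk_host:2181", "demo_group", Map("topic_name" -> 1))

// 2) Direct approach (no receivers): offsets live only in the Spark checkpoint
//    unless we persist them ourselves, which is what the rest of this article does
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker_host:9092"), Set("topic_name"))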
This article discusses, for the second (direct) approach, how to save the Kafka offsets to ZooKeeper and how to read existing offsets back from ZooKeeper.
The general idea: when the Kafka stream is initialized, check whether ZooKeeper already holds saved offsets; if it does, start reading from those offsets, otherwise start from the latest or earliest offset according to the configuration. Then, while consuming the Kafka data, save each partition's offset to ZooKeeper as a backup. The reference implementation follows.
val topic : String = "topic_name"                                         // name of the topic to consume
val topics : Set[String] = Set(topic)                                     // set of topic names used when creating the stream
val topicDirs = new ZKGroupTopicDirs("test_spark_streaming_group", topic) // ZKGroupTopicDirs object describing where this group's offsets for the topic are kept
val zkTopicPath = s"${topicDirs.consumerOffsetDir}"                       // ZooKeeper path, here /consumers/test_spark_streaming_group/offsets/topic_name
val zkClient = new ZkClient("10.4.232.77:2181")                           // ZooKeeper host and port; create a client
val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")  // count the child nodes under that path (one per partition if we saved offsets before)
var kafkaStream : InputDStream[(String, String)] = null
var fromOffsets: Map[TopicAndPartition, Long] = Map()                     // if ZooKeeper holds saved offsets, use them as the starting positions of kafkaStream

if (children > 0) {   // offsets were saved before; ideally also compare them with Kafka's smallest offsets, otherwise an OffsetOutOfRange error may occur
  for (i <- 0 until children) {
    val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
    val tp = TopicAndPartition(topic, i)
    fromOffsets += (tp -> partitionOffset.toLong)                         // add each partition's saved offset to fromOffsets
    logInfo("@@@@@@ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")
  }
  val messageHandler = (mmd : MessageAndMetadata[String, String]) => (mmd.topic, mmd.message()) // transform each Kafka message into a (topic_name, message) tuple
  kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParam, fromOffsets, messageHandler)
}
else {
  kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics) // no saved offsets: start from the latest or earliest offset according to kafkaParam
}

var offsetRanges = Array[OffsetRange]()
kafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges           // the Kafka offset ranges covered by this RDD
  rdd
}.map(msg => msg._2).foreachRDD { rdd =>
  for (o <- offsetRanges) {
    val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
    ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString) // save this partition's offset to ZooKeeper
    logInfo(s"@@@@@@ topic ${o.topic} partition ${o.partition} fromoffset ${o.fromOffset} untiloffset ${o.untilOffset} #######")
  }
  rdd.foreachPartition(
    message => {
      while (message.hasNext) {
        logInfo(s"@^_^@ [" + message.next() + "] @^_^@")
      }
    }
  )
}
The section above shows how to save offsets to ZooKeeper and reuse them, but there is a small problem: what happens if the program has been stopped for a long time and the offsets saved in ZooKeeper have already expired on Kafka? The second post addresses this.
If the saved offset has expired on Kafka, an OffsetOutOfRange exception is thrown, because the data at the offset stored in ZooKeeper can no longer be found in the topic. So when reading offsets from ZooKeeper we need an extra check: if the offset saved in ZooKeeper is smaller than the smallest offset currently available for that partition in the Kafka topic, use Kafka's smallest offset instead. Suppose the offset last saved in ZooKeeper for some partition was 123 and the program then stopped for a week; by now the smallest offset of that partition has become 200. With the previous code an OffsetOutOfRange exception would be thrown, because the data corresponding to offset 123 has already been deleted. The code below shows how to obtain the smallest offset for each <topic, partition> so that this comparison can be made.
val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
val tp = TopicAndPartition(topic, i)

val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
val consumerMin = new SimpleConsumer("broker_host", 9092, 10000, 10000, "getMinOffset") // note broker_host: if it is not the leader of this partition the query returns nothing; the fix is shown below
val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
var nextOffset = partitionOffset.toLong
if (curOffsets.length > 0 && nextOffset < curOffsets.head) {              // compare the partition's smallest offset on Kafka with the offset saved in ZooKeeper and take the larger
  nextOffset = curOffsets.head
}
fromOffsets += (tp -> nextOffset)

val topic_name = "topic_name"                                             // the topic whose metadata we want
val topic2 = List(topic_name)
val req = new TopicMetadataRequest(topic2, 0)
val getLeaderConsumer = new SimpleConsumer("broker_host", 9092, 10000, 10000, "OffsetLookup") // first argument is a Kafka broker host, second is its port
val res = getLeaderConsumer.send(req)
val topicMetaOption = res.topicsMetadata.headOption
val partitions = topicMetaOption match {
  case Some(tm) =>
    tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String] // turn the result into a partitionId -> leader host map
  case None =>
    Map[Int, String]()
}
The code above retrieves the leader address of every partition; substituting the corresponding leader address for the hard-coded broker_host in the earlier min-offset lookup makes the query succeed. A minimal sketch of the combination follows.
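This sketch is mine, not from the original posts; it assumes the brokers listen on port 9092 and that partitions, children, zkClient, topicDirs, topic and fromOffsets are defined as in the snippets above.

// Sketch: query each partition's leader (from the partitions map) instead of a fixed broker_host
for (i <- 0 until children) {
  val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
  val tp = TopicAndPartition(topic, i)

  val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
  val consumerMin = new SimpleConsumer(partitions(i), 9092, 10000, 10000, "getMinOffset") // leader host of partition i
  val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
  consumerMin.close()

  val nextOffset =                                                        // take the saved offset, unless it is older than Kafka's smallest offset
    if (curOffsets.nonEmpty && partitionOffset.toLong < curOffsets.head) curOffsets.head
    else partitionOffset.toLong
  fromOffsets += (tp -> nextOffset)
}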
With this in place, the Kafka offsets used by Spark Streaming are saved to ZooKeeper and reused on restart, which covers most situations.
The material above is reproduced from the posts linked at the top; the code below is my own integration of the two parts into a single runnable example (my own reinvented wheel, so to speak).
It could be combined with the Spark Streaming checkpoint, but the checkpoint is not enabled here, because the checkpoint also saves some of the running Spark state, and if the program is modified, restarting from the checkpoint may fail.
package com.test.streaming

import kafka.api.{TopicMetadataRequest, PartitionOffsetRequestInfo, OffsetRequest}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZkUtils, ZKGroupTopicDirs}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{OffsetRange, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaTest {
  def createContext(checkpointDirectory: String) = {
    println("create spark")
    val topics = "test_tpoics"
    val group = "test-kafka"
    val zkQuorum = "10.16.10.191:2181"
    val brokerList = "10.10.10.196:8092,10.10.10.196:8092"
    // val Array(topics, group, zkQuorum, brokerList) = args
    val sparkConf = new SparkConf().setAppName("Test-SparkDemo-kafka").setMaster("local[3]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // ssc.checkpoint(checkpointDirectory)
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokerList,
      "group.id" -> group,
      "zookeeper.connect" -> zkQuorum,
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )
    val topicDirs = new ZKGroupTopicDirs("test_spark_streaming_group", topics)
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
    val hostAndPort = "10.16.10.191:2181"
    val zkClient = new ZkClient(hostAndPort)
    val children = zkClient.countChildren(zkTopicPath)
    var kafkaStream: InputDStream[(String, String)] = null
    var fromOffsets: Map[TopicAndPartition, Long] = Map()
    if (children > 0) {
      //---get partition leaders: begin---
      val topicList = List(topics)
      val req = new TopicMetadataRequest(topicList, 0)                     // fetch metadata for this topic: brokers, partition layout, etc.
      val getLeaderConsumer = new SimpleConsumer("10.16.10.196", 8092, 10000, 10000, "OffsetLookup") // low-level API consumer
      val res = getLeaderConsumer.send(req)                                // response contains topic/broker/partition metadata
      val topicMetaOption = res.topicsMetadata.headOption
      val partitions = topicMetaOption match {
        case Some(tm) =>
          tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String] // partitionId -> leader host
        case None =>
          Map[Int, String]()
      }
      //---get partition leaders: end---
      for (i <- 0 until children) {
        val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
        val tp = TopicAndPartition(topics, i)
        //---additional check: begin---
        val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))) // EarliestTime is -2; ask for at most 1 offset
        val consumerMin = new SimpleConsumer(partitions(i), 8092, 10000, 10000, "getMinOffset")
        val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
        var nextOffset = partitionOffset.toLong
        if (curOffsets.length > 0 && nextOffset < curOffsets.head) {       // if the saved offset is smaller than the partition's current smallest offset, use the smallest one
          nextOffset = curOffsets.head
        }
        //---additional check: end---
        fromOffsets += (tp -> nextOffset)
      }
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      println("create")
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    }
    var offsetRanges = Array[OffsetRange]()
    kafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map(msg => msg._2).foreachRDD { rdd =>
      for (offset <- offsetRanges) {
        val zkPath = s"${topicDirs.consumerOffsetDir}/${offset.partition}"
        ZkUtils.updatePersistentPath(zkClient, zkPath, offset.fromOffset.toString) // save each partition's offset back to ZooKeeper
      }
      rdd.foreachPartition(
        message => {
          while (message.hasNext) {
            println(message.next())
          }
        })
    }
    ssc
  }

  def main(args: Array[String]) {
    val checkpointDirectory = "kafka-checkpoint2"
    System.setProperty("hadoop.home.dir", "D:\\Program Files\\hadoop-2.2.0")
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(checkpointDirectory)
      })
    ssc.start()
    ssc.awaitTermination()
  }
}
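As a quick sanity check, the offsets the job wrote can be read back from the same ZooKeeper paths. The small sketch below is my addition, not part of the reproduced posts; it reuses the ZkClient and ZKGroupTopicDirs classes and the group, topic and ZooKeeper address from the example above.

import kafka.utils.ZKGroupTopicDirs
import org.I0Itec.zkclient.ZkClient

object OffsetCheck {
  def main(args: Array[String]): Unit = {
    val topicDirs = new ZKGroupTopicDirs("test_spark_streaming_group", "test_tpoics")
    val zkClient = new ZkClient("10.16.10.191:2181")
    val children = zkClient.countChildren(topicDirs.consumerOffsetDir)    // one child node per partition
    for (partition <- 0 until children) {
      val offset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${partition}")
      println(s"partition ${partition} -> saved offset ${offset}")        // the value written by the streaming job
    }
    zkClient.close()
  }
}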