Spark + Kafka Direct approach: sending the consumption offsets to Zookeeper
Apache Spark 1.3.0 introduced the Direct API, which uses Kafka's low-level consumer API to read data from the Kafka cluster and maintains the offset-related information inside the Spark Streaming system itself. This is how it achieves zero data loss, and it is more efficient than the Receiver-based approach. However, because Spark Streaming maintains the Kafka read offsets on its own and never sends these consumption offsets to Zookeeper, offset-based Kafka cluster monitoring software (e.g. Apache Kafka Web Console, KafkaOffsetMonitor, etc.) cannot see the consumption progress.
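To make the contrast concrete, the minimal sketch below shows the two ways a stream can be created; the broker address, Zookeeper address, group id, topic name and batch interval are all placeholders. Only the Receiver-based high-level consumer commits its offsets to Zookeeper; the Direct stream does not.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("iteblog-demo"), Seconds(10))

// Receiver-based approach: the high-level consumer commits its offsets to Zookeeper,
// so offset-based monitoring tools can follow the consumption progress.
val receiverStream = KafkaUtils.createStream(
  ssc, "zk1:2181", "iteblog-group", Map("iteblog" -> 1))

// Direct approach: offsets are tracked inside Spark Streaming and nothing is written
// to Zookeeper, which is exactly what breaks offset-based monitoring.
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("iteblog"))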
This article sets out to solve exactly that problem, so that after every batch of data is received our Spark Streaming program automatically updates the Kafka offsets stored in Zookeeper. From Spark's official documentation we know that the per-partition consumption offsets maintained inside Spark are exposed through the HasOffsetRanges trait and its offsetRanges field, which we can obtain in the Spark Streaming program with:

val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

With the partition consumption information in hand, we only need to traverse offsetsList and send it to Zookeeper to update the Kafka consumption offsets. The complete code snippet is as follows:
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

messages.foreachRDD(rdd => {
  // The offset ranges this batch read from each Kafka partition
  val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val kc = new KafkaCluster(kafkaParams)
  for (offsets <- offsetsList) {
    val topicAndPartition = TopicAndPartition("iteblog", offsets.partition)
    // args(0) is the consumer group id; commit the until-offset of this batch
    val o = kc.setConsumerOffsets(args(0), Map((topicAndPartition, offsets.untilOffset)))
    if (o.isLeft) {
      println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
})
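For reference, each element of offsetsList is an OffsetRange describing what the current batch read from one partition; the snippet commits untilOffset, the position just after the last message of the batch. The values in the small illustration below are made up:

import org.apache.spark.streaming.kafka.OffsetRange

// An OffsetRange records, per partition, the interval of offsets a batch consumed.
val example = OffsetRange("iteblog", 0, 100L, 250L)  // topic, partition, fromOffset, untilOffset
println(s"topic=${example.topic} partition=${example.partition} " +
  s"from=${example.fromOffset} until=${example.untilOffset}")

When consuming more than one topic, offsets.topic could be used in the loop instead of the hard-coded "iteblog" name.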
The KafkaCluster class in the program above is used to talk to the Kafka cluster. For each partition we build a Map((topicAndPartition, offsets.untilOffset)) entry with its consumption offset and then call the KafkaCluster class's setConsumerOffsets method to update the information stored in Zookeeper. Once the Kafka offsets are updated this way, we can finally use the KafkaOffsetMonitor software to monitor the consumption of the corresponding topic in Kafka. The figure below shows the KafkaOffsetMonitor view.

[Figure: KafkaOffsetMonitor displaying the consumption progress of the monitored topic and partitions]

From the figure we can see that KafkaOffsetMonitor is now able to monitor the consumption of the relevant Kafka partitions, which is very important for monitoring our whole Spark Streaming program, because it lets us see Spark's read speed at any time. In addition, the complete code of the KafkaCluster utility class is as follows:
package org.apache.spark.streaming.kafka

import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

/**
 * User: 过往记忆
 * Date: 2015-06-02
 * Time: 23:46
 * Blog: https://www.iteblog.com
 * The 过往记忆 blog focuses on Hadoop, Hive, Spark, Shark and Flume.
 * WeChat public account: iteblog_hadoop
 */
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  type Err = ArrayBuffer[Throwable]

  @transient private var _config: SimpleConsumerConfig = null

  // Lazily build the consumer configuration from the supplied Kafka parameters
  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  // Commit plain offsets for a consumer group by wrapping them in OffsetMetadataAndError
  def setConsumerOffsets(groupId: String,
      offsets: Map[TopicAndPartition, Long]): Either[Err, Map[TopicAndPartition, Short]] = {
    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
      kv._1 -> OffsetMetadataAndError(kv._2)
    })
  }

  // Send an OffsetCommitRequest to the brokers and collect per-partition results
  def setConsumerOffsetMetadata(groupId: String,
      metadata: Map[TopicAndPartition, OffsetMetadataAndError]): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.requestInfo
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }

  // Try each broker in turn, collecting any connection or request failures
  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
      (fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}
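As a standalone illustration of how the class above can be driven outside of foreachRDD, a single commit looks roughly like this; the broker address, group id, topic, partition and offset value are all made-up placeholders.

import kafka.common.TopicAndPartition

val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker1:9092"))
val tp = TopicAndPartition("iteblog", 0)

// setConsumerOffsets returns an Either: Left carries the accumulated errors,
// Right carries the per-partition result codes reported by the broker.
kc.setConsumerOffsets("iteblog-group", Map(tp -> 12345L)) match {
  case Left(errs)   => errs.foreach(e => println(s"commit failed: ${e.getMessage}"))
  case Right(codes) => println(s"committed: $codes")
}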