Spark + Kafka Direct approach: writing offsets to Zookeeper

Apache Spark 1.3.0 introduced the Direct API, which uses Kafka's low-level API to read data from the Kafka cluster and maintains the offset information inside the Spark Streaming system itself. This approach achieves zero data loss and is more efficient than the Receiver-based approach. However, because Spark Streaming maintains the Kafka read offsets itself and does not send them to Zookeeper, offset-based Kafka cluster monitoring software (for example Kafka Web Console and KafkaOffsetMonitor) stops working.

This article addresses that problem, so that the Spark Streaming program we write updates the Kafka offsets in Zookeeper automatically after each batch of data it receives. From Spark's official documentation we know that the Kafka offsets Spark maintains internally are exposed as offsetRanges through the HasOffsetRanges interface, which we can read in the Spark Streaming program:
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
This gives us the consumption information for every partition. We only need to iterate over offsetsList and send that information to Zookeeper to update the Kafka consumption offsets. The complete code snippet is as follows:
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
messages.foreachRDD(rdd => {
  // Offset ranges consumed by this batch, one entry per Kafka partition
  val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val kc = new KafkaCluster(kafkaParams)
  for (offsets <- offsetsList) {
    val topicAndPartition = TopicAndPartition("iteblog", offsets.partition)
    // args(0) is passed as the consumer group id; untilOffset is the end of the range just read
    val o = kc.setConsumerOffsets(args(0), Map((topicAndPartition, offsets.untilOffset)))
    if (o.isLeft) {
      println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
})
The KafkaCluster class is a utility class for interacting with the Kafka cluster. For each partition we build a Map((topicAndPartition, offsets.untilOffset)) that holds its offset, and then call the setConsumerOffsets method of KafkaCluster to update the information in Zookeeper. Once the Kafka offsets are updated this way, we can use KafkaOffsetMonitor to monitor the consumption of the corresponding topic, as in the following figure.

[Figure: KafkaOffsetMonitor monitoring view]
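The per-partition update described above can also be batched. The following is a minimal sketch rather than the author's code: it assumes local stand-in case classes for Spark's OffsetRange and Kafka's TopicAndPartition so that it runs without a cluster, and the names toCommitMap, commitBatch, and Commit are hypothetical. It takes the topic from each offset range instead of hard-coding it and builds a single map for the whole batch.

```scala
// Stand-ins for org.apache.spark.streaming.kafka.OffsetRange and
// kafka.common.TopicAndPartition, declared locally (assumption) so the
// sketch runs without Spark or Kafka on the classpath.
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
final case class TopicAndPartition(topic: String, partition: Int)

object OffsetBatching {
  /** Collapse one batch's offset ranges into a single map of offsets to commit. */
  def toCommitMap(ranges: Seq[OffsetRange]): Map[TopicAndPartition, Long] =
    ranges.map(r => TopicAndPartition(r.topic, r.partition) -> r.untilOffset).toMap

  /** Hypothetical commit function: Right on success, Left with an error message. */
  type Commit = Map[TopicAndPartition, Long] => Either[String, Unit]

  /** Commit the whole batch in one call; an empty batch commits nothing. */
  def commitBatch(ranges: Seq[OffsetRange], commit: Commit): Either[String, Unit] =
    if (ranges.isEmpty) Right(()) else commit(toCommitMap(ranges))
}
```

In the streaming job, a map built like this from the real offsetRanges could be passed to kc.setConsumerOffsets in a single call instead of one call per partition.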

The figure shows that KafkaOffsetMonitor can now track the consumption of the relevant Kafka partitions. This matters for monitoring the whole Spark Streaming program, because it lets us see at any time how fast Spark is reading. The complete code for the KafkaCluster utility class is as follows:
package org.apache.spark.streaming.kafka
import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal
/**
 * User: 过往记忆
 * Date: 2015-06-02
 * Time: 23:46
 * 过往记忆 blog, a technical blog focused on Hadoop, Hive, Spark, Shark, and Flume, with plenty of practical content
 * 过往记忆 blog WeChat public account: iteblog_hadoop
 */
// SimpleConsumerConfig is expected in the KafkaCluster companion object
// (see the import above); its definition is not part of this listing.
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  type Err = ArrayBuffer[Throwable]

  // Built lazily and marked transient so the class stays serializable
  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  def setConsumerOffsets(groupId: String,
                         offsets: Map[TopicAndPartition, Long]
                          ): Either[Err, Map[TopicAndPartition, Short]] = {
    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
      kv._1 -> OffsetMetadataAndError(kv._2)
    })
  }

  def setConsumerOffsetMetadata(groupId: String,
                                metadata: Map[TopicAndPartition, OffsetMetadataAndError]
                                 ): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    // Try the seed brokers in random order until every partition is committed
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.requestInfo
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }

  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
                         (fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        // NonFatal does not match the control-flow exception behind the early
        // `return` above, so a successful commit still exits the method
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}
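
The withBrokers helper in the listing above tries the seed brokers one after another and collects failures instead of aborting on the first one. The following is a minimal, self-contained sketch of that fallback pattern, not the class's actual code: firstSuccess is a hypothetical helper, plain (host, port) pairs stand in for real SimpleConsumer connections, and it stops at the first successful attempt, whereas the listing stops once every partition has been committed.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object BrokerFallback {
  type Err = ArrayBuffer[Throwable]

  /** Run `attempt` against each broker in order and stop at the first success.
    * Non-fatal failures are collected and returned if every broker fails. */
  def firstSuccess[A](brokers: Seq[(String, Int)])(attempt: ((String, Int)) => A): Either[Err, A] = {
    val errs = new Err
    val it = brokers.iterator
    while (it.hasNext) {
      val hp = it.next()
      try {
        return Right(attempt(hp))
      } catch {
        case NonFatal(e) => errs += e // remember the failure, move to the next broker
      }
    }
    Left(errs)
  }
}
```

Returning Either keeps the error list available to the caller, which is how the streaming job can print the failure when o.isLeft is true.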

