Spark Streaming + Kafka partition
Spark Streaming + Kafka: use the low-level API to read Kafka partition data directly and manually update the offsets to the Zookeeper cluster
Posted 2016/11/30 20:22:01 · 1121 views
Category: Spark Kafka
Spark Streaming + Kafka's direct approach uses the low-level API to read Kafka partition data directly, and by default the offsets are stored in the checkpoint. However, Kafka monitoring tools cannot see those offsets, so to keep monitoring working we manually update the offsets to the Zookeeper cluster.
A brief introduction to the relevant source code:
1: TopicAndPartition is a case class that encapsulates a topic name and a partition id:
case class TopicAndPartition(topic: String, partitionId: Int)
2: OffsetRange encapsulates the topic name, partition id, fromOffset (the starting offset of the current batch, inclusive) and untilOffset (the ending offset of the current batch, exclusive). So an OffsetRange contains: topic name, partition id, start offset, end offset.
/**
 * @param topic Kafka topic name
 * @param partition Kafka partition id
 * @param fromOffset inclusive starting offset
 * @param untilOffset exclusive ending offset
 */
final class OffsetRange private(
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long) extends Serializable
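For illustration, here is a tiny, hypothetical usage of OffsetRange (the values are made up); OffsetRange.create takes exactly the four pieces of information listed above:

val range = OffsetRange.create("iteblog", 0, 100L, 200L)
println(s"${range.topic} ${range.partition} ${range.fromOffset} ${range.untilOffset}")
// this range covers offsets 100 through 199, because untilOffset is exclusive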
3: Code implementation:
import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
// KafkaCluster is the custom class defined later in this post; it lives in the same package
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}

object Iteblog {

  val brokerAddress = "http://www.iteblog.com:9092"
  val groupID = "testGroup"

  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> brokerAddress,
    "group.id" -> "iteblog")

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Test")
    sparkConf.set("spark.kryo.registrator", "utils.CpcKryoSerializer")
    val sc = new SparkContext(sparkConf)

    val ssc = new StreamingContext(sc, Seconds(2))
    val topicsSet = Set("iteblog")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD(rdd => {
      // Cast the RDD to HasOffsetRanges (KafkaRDD extends HasOffsetRanges).
      // OffsetRange: represents a range of offsets from a single Kafka TopicAndPartition.
      // Instances of this class can be created with `OffsetRange.create()`.
      val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Implementation of offsetRanges (in KafkaRDD), tp: TopicAndPartition, fo: fromOffset:
      // val offsetRanges = fromOffsets.map { case (tp, fo) =>
      //   val uo = untilOffsets(tp)
      //   OffsetRange(tp.topic, tp.partition, fo, uo.offset)
      // }.toArray

      val kc = new KafkaCluster(kafkaParams)
      for (offsets <- offsetsList) {
        // TopicAndPartition's first constructor parameter is the topic, the second the partition id
        val topicAndPartition = TopicAndPartition("iteblog", offsets.partition) // offsets.partition is the Kafka partition id
        val o = kc.setConsumerOffsets(groupID, Map((topicAndPartition, offsets.untilOffset))) // offsets.untilOffset is the end offset
        if (o.isLeft) {
          println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

Explanation of the core code:
KafkaUtils.createDirectStream calls the low-level API to consume Kafka partition data (Kafka partitions and RDD partitions correspond one-to-one). The return value of createDirectStream is a DStream, which is backed by RDDs underneath.
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
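As a quick sanity check of that one-to-one correspondence, here is a small sketch (reusing the names from the program above) that compares the number of RDD partitions with the number of consumed Kafka partitions in each batch:

messages.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  println(s"RDD partitions = ${rdd.partitions.length}, Kafka partitions consumed = ${ranges.length}")
}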
messages.foreachRDD gives us the underlying RDDs of messages, and those RDDs are the basis for calculating the offset ranges.
The relationship between KafkaRDD and HasOffsetRanges (constructor parameters and generics omitted; see the source code):
KafkaRDD extends RDD[R](sc, Nil) with Logging with HasOffsetRanges
// rdd is the variable inside messages.foreachRDD. Its runtime type is KafkaRDD, but because of
// polymorphism its static type is just RDD, so it has to be downcast to HasOffsetRanges in order
// to call offsetRanges. (Recall: what does OffsetRange encapsulate? Answer: topic name,
// partition id, start offset, end offset.)
val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
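A small sketch of inspecting each OffsetRange per batch. Note that the cast only works on the RDD that comes straight out of createDirectStream (inside foreachRDD or transform), before any transformation such as map or repartition changes the RDD type:

messages.foreachRDD { rdd =>
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset} count=${o.untilOffset - o.fromOffset}")
  }
}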
The offsetRanges implementation (in KafkaRDD), where tp is a TopicAndPartition and fo is the fromOffset:

val offsetRanges = fromOffsets.map { case (tp, fo) =>
  val uo = untilOffsets(tp)
  OffsetRange(tp.topic, tp.partition, fo, uo.offset)
}.toArray

KafkaCluster is the wrapper class that updates the offsets to the Zookeeper cluster.
val kc = new KafkaCluster(kafkaParams)
The offsetsList array is then traversed; setConsumerOffsets is a method of KafkaCluster.
To emphasize once more: what does OffsetRange encapsulate? Answer: topic name, partition id, start offset, end offset.
for (offsets <- offsetsList) {
  // offsets.untilOffset is the end offset
  val o = kc.setConsumerOffsets(groupID, Map((topicAndPartition, offsets.untilOffset)))
  if (o.isLeft) {
    println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
  }
}

The loop above, run against one underlying RDD of the DStream created by createDirectStream, completes the offset update to the Zookeeper cluster; foreachRDD performs the update for every RDD. At this point the offset calculation and the update call are done, but how is the update actually carried out? That logic lives in KafkaCluster. Next, look at the KafkaCluster code.
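Before that, a side note: since the offsets now live in Zookeeper, a restarted job can read them back and resume from where it stopped. A minimal sketch of that read path, assuming a getConsumerOffsets method like the one in Spark's own KafkaCluster (it is not part of the trimmed class shown below), with hypothetical partitions 0 and 1:

import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.common.TopicAndPartition

val kc = new KafkaCluster(kafkaParams)
val topicAndPartitions = Set(TopicAndPartition("iteblog", 0), TopicAndPartition("iteblog", 1))
val fromOffsets: Map[TopicAndPartition, Long] =
  kc.getConsumerOffsets(groupID, topicAndPartitions)
    .right.getOrElse(topicAndPartitions.map(tp => tp -> 0L).toMap) // nothing stored yet: start at 0

val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
val restored = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
  (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)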
Straight to the code:

package org.apache.spark.streaming.kafka
// Why this package? Because `private[spark] object SimpleConsumerConfig` is only visible
// inside the spark package!

import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  // Err is an alias for the type ArrayBuffer[Throwable]
  type Err = ArrayBuffer[Throwable]

  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      // Part of the SimpleConsumerConfig apply method:
      //   val brokers = kafkaParams.get("metadata.broker.list").orElse(kafkaParams.get("bootstrap.servers"))
      // so kafkaParams must contain a value for the key metadata.broker.list or bootstrap.servers
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  /**
   * @param groupId: String
   * @param offsets: Map[TopicAndPartition, Long]
   * @return
   */
  def setConsumerOffsets(groupId: String,
      offsets: Map[TopicAndPartition, Long]): Either[Err, Map[TopicAndPartition, Short]] = {
    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
      kv._1 -> OffsetMetadataAndError(kv._2)
    })
  }

  def setConsumerOffsetMetadata(groupId: String,
      metadata: Map[TopicAndPartition, OffsetMetadataAndError]): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.requestInfo
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Could not set offsets for ${missing}"))
    Left(errs)
  }

  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
      (fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) => errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}
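A minimal standalone sketch (with a hypothetical offset value and the broker address from the example above) showing the class in isolation: committing a fixed offset for partition 0 of topic "iteblog" without running a streaming job.

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.KafkaCluster

object CommitOnce {
  def main(args: Array[String]): Unit = {
    val params = Map("metadata.broker.list" -> "www.iteblog.com:9092", "group.id" -> "iteblog")
    val kc = new KafkaCluster(params)
    // commit offset 12345 for partition 0 of topic "iteblog" under consumer group "testGroup"
    val result = kc.setConsumerOffsets("testGroup", Map(TopicAndPartition("iteblog", 0) -> 12345L))
    result.fold(errs => errs.foreach(println), ok => println(s"Committed: $ok"))
  }
}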
A small aside: chained retrieval of values from a Map with get / orElse / getOrElse:
object MapDemo extends App {
  val map = Map("1" -> "11", "2" -> "22", "3" -> "33")
  map.map {
    case (a, b) => println(a + " " + b)
  }
  // Semantics: first try to get the value for key "4"; if it is absent, try key "5";
  // if that is also absent, throw an exception.
  val result = map.get("4").orElse(map.get("5")).getOrElse(throw new Exception("exception"))
  println(result)
}

Now the core part: the SimpleConsumerConfig apply method.
/**
 * Make a consumer config without requiring group.id or zookeeper.connect,
 * since communicating with brokers also needs common settings such as timeout
 */
def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
  // these keys are from other pre-existing kafka configs for specifying brokers, accept either
  // (for the Map value-retrieval usage, see the example above)
  val brokers = kafkaParams.get("metadata.broker.list")
    .orElse(kafkaParams.get("bootstrap.servers"))
    .getOrElse(throw new SparkException(
      "Must specify metadata.broker.list or bootstrap.servers"))

  val props = new Properties()
  kafkaParams.foreach { case (key, value) =>
    // prevent warnings on parameters ConsumerConfig does not know about
    if (key != "metadata.broker.list" && key != "bootstrap.servers") {
      props.put(key, value)
    }
  }

  // If zookeeper.connect or group.id is missing, set its value to an empty string.
  Seq("zookeeper.connect", "group.id").foreach { s =>
    if (!props.containsKey(s)) {
      props.setProperty(s, "")
    }
  }

  new SimpleConsumerConfig(brokers, props)
}
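A small illustration (with hypothetical parameter maps) of the broker-key fallback that apply implements: either key is accepted, and omitting both hits the exception branch above.

val oldStyle = Map("metadata.broker.list" -> "www.iteblog.com:9092", "group.id" -> "iteblog")
val newStyle = Map("bootstrap.servers" -> "www.iteblog.com:9092", "group.id" -> "iteblog")

def pickBrokers(kafkaParams: Map[String, String]): String =
  kafkaParams.get("metadata.broker.list")
    .orElse(kafkaParams.get("bootstrap.servers"))
    .getOrElse(throw new IllegalArgumentException( // stands in for the SparkException in apply
      "Must specify metadata.broker.list or bootstrap.servers"))

println(pickBrokers(oldStyle)) // www.iteblog.com:9092
println(pickBrokers(newStyle)) // www.iteblog.com:9092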
The OffsetMetadataAndError case class:

case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = Errors.NONE.code) {
  def offset = offsetMetadata.offset
  def metadata = offsetMetadata.metadata
  override def toString = "[%s, ErrorCode %d]".format(offsetMetadata, error)
}

And once more the setConsumerOffsets method; in its offsets map the key (TopicAndPartition) encapsulates the topic and partition id, and the value (Long) is the ending offset of the current consumption:

/**
 * @param groupId: String
 * @param offsets: Map[TopicAndPartition, Long]
 * @return
 */
def setConsumerOffsets(groupId: String,
    offsets: Map[TopicAndPartition, Long]): Either[Err, Map[TopicAndPartition, Short]] = {
  setConsumerOffsetMetadata(groupId, offsets.map { kv =>
    kv._1 -> OffsetMetadataAndError(kv._2)
  })
}