Flume + Kafka + Spark Streaming

Kafka is a high-throughput distributed publish-subscribe messaging system that can handle all the activity-stream data of a consumer-scale website. Kafka is designed to serve as a unified data collection platform that can collect feedback in real time, support very large volumes of data, and provide good fault tolerance.
Apache Kafka is a kind of message middleware.
First, terminology
Broker
A Kafka cluster contains one or more servers, which are called brokers.
Topic
Every message published to the Kafka cluster has a category, called a Topic. (Messages of different Topics are stored separately. Logically, the messages of a Topic are stored on one or more brokers, but users only need to specify the Topic of a message to produce or consume data, without caring where the data is physically stored. Each Topic supports two modes: queue mode, where consumer groups with the same name divide up the processing, and publish-subscribe mode, where messages are broadcast to multiple consumer groups with different names.)
Partition
Partition is a physical concept; each Topic contains one or more Partitions.
Producer
Responsible for publishing messages to Kafka brokers; for example, a Flume collection machine is a Producer.
Consumer
Message consumers, i.e., clients that read messages from Kafka brokers. For example, a Hadoop machine is a Consumer.
Consumer Group
Each Consumer belongs to a specific Consumer Group (you can specify a group name for each Consumer; if no group name is specified, it belongs to the default group).
Second, use cases
1, Messaging
For conventional messaging use, Kafka is a good alternative: partitions, replication, and fault tolerance give it good scalability and a performance advantage. To be clear, though, Kafka does not provide JMS semantics; used as a "conventional" messaging system it does not yet fully guarantee that sending and receiving are absolutely reliable (for example, message acknowledgment, message retransmission, and so on).
2, Website activity tracking
Kafka can be used as an excellent tool for website activity tracking; you can send web/user actions and other information to Kafka for real-time monitoring or offline statistical analysis.
3, Log Aggregation
Kafka is very well suited to act as a "log collection center": applications can send operation logs to the Kafka cluster in batches, asynchronously, instead of saving them locally or in a DB. Kafka can batch and compress messages, so the producer side feels almost no performance cost. On the consumer side, Hadoop or another storage and analysis system can then consume the logs.
4, It applies to two broad classes of applications: building real-time streaming data pipelines that reliably move data between systems and applications, and building real-time streaming applications that transform or react to data streams.
Third, distributed
The partitions of the log are distributed across the servers in the cluster, and each server handles the partitions assigned to it. Depending on the configuration, each partition can also be replicated to other servers for fault tolerance. Each partition has one leader and zero or more followers. The leader handles all read and write requests for the partition, while the followers passively replicate its data. If the leader goes down, one of the followers is elected as the new leader. A server may be the leader for one partition and a follower for another; this balances the load and avoids having all requests handled by only one or a few servers.
Fourth, the message processing order
Kafka keeps messages in order within a partition. Kafka does this well, although it does not completely solve the general problem. Kafka uses a divide-and-conquer strategy: partitioning. Because the messages in one partition of a Topic can only be handled by a single consumer within a consumer group, messages in that partition are processed in order. However, this only guarantees ordered processing within one partition of a Topic; it cannot guarantee processing order across partitions. So if you want all messages to be processed strictly in order, the Topic can only have one partition.
Fifth, installation
Install and start Kafka; the concrete steps are shown in the small example below.
Sixth, Key and Value
Kafka is a distributed messaging system: the Producer produces messages and pushes them to the Broker, and the Consumer pulls messages from the Broker. A message produced by the Producer is represented by a Message; from the user's point of view, it is a key-value pair.
 Message => Crc MagicByte Attributes Key Value 
Kafka computes the partition of a message from the key that is passed in, but the key may be omitted (null or empty); in that case the producer sends the message to a random partition.
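To make the key-to-partition behaviour concrete, here is a minimal sketch using the old Kafka 0.8 Scala producer API (the same API family the Java producer at the end of this post uses); the broker address and topic name are placeholders:

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object KeyedProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092") // assumed broker address
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("key.serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))

    // Messages with the same key are hashed to the same partition,
    // so their relative order is preserved within that partition.
    producer.send(new KeyedMessage[String, String]("test", "user-1", "login"))
    producer.send(new KeyedMessage[String, String]("test", "user-1", "click"))

    // No key: the producer picks a partition for this message on its own.
    producer.send(new KeyedMessage[String, String]("test", "no key, partition chosen by the producer"))

    producer.close()
  }
}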
A MessageSet combines multiple Messages, adding an Offset and a MessageSize on top of each Message. Its structure is:
 MessageSet => [ Offset MessageSize Message ] 
In other words, a MessageSet is an array in which each element consists of three parts: Offset, MessageSize, and Message.
Seventh, a small example
1. Start ZooKeeper
Go into the Kafka directory. The -daemon flag starts the process in the background so that it does not occupy the current command-line window.
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
To shut it down, run:
bin/zookeeper-server-stop.sh
ZooKeeper's port is 2181; run jps and you will see a process named QuorumPeerMain.
2. start kafka
Add the following two lines to server.properties. The first makes sure a topic is really removed when you delete it; without the second you may get a "not found" error:
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
then:
bin/kafka-server-start.sh -daemon config/server.properties
To shut it down, run:
bin/kafka-server-stop.sh
Kafka's port is 9092; run jps and you will see a process named Kafka.
3. Create a topic
Create a Topic named "test" with only one partition and one replica:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Once you've created it, you can see which topics have been created by running the following command:
bin/kafka-topics.sh --list --zookeeper localhost:2181
View specific topic information:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
4. Send messages: start the Kafka producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
5. Receive messages: open a new command-line window and start the Kafka consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
6. Finally, type messages in the producer window; they appear in the consumer window.

Spark Streaming

Chinese Study Guide
Spark Streaming is a real-time computing framework built on Spark that extends Spark's ability to handle large-scale streaming data.
The advantages of Spark Streaming are:
It can run on 100+ nodes and achieve second-level latency.
It uses memory-based Spark as the execution engine, inheriting its efficiency and fault tolerance.
It can be combined with Spark's batch processing and interactive queries.
It provides a simple interface, similar to the batch interface, for implementing complex algorithms.
First, Spark Streaming divides the real-time input data stream into blocks by time slice Δt (for example, 1 second). Spark Streaming treats each block of data as an RDD and processes it with RDD operations. Each block generates a Spark job, and the final results are also returned block by block.
In Spark Streaming, the interfaces provided by DStreams (sequences of RDDs representing the data stream) are similar to those provided by RDDs.
As Spark Streaming's original goal states, it lets users combine streaming, batch, and interactive queries in one application through a rich API and a memory-based, high-speed computing engine. Spark Streaming is therefore well suited to applications that need to analyze historical data together with real-time data; of course, it is also perfectly adequate for applications whose real-time requirements are not especially strict. In addition, the RDD data-reuse mechanism allows for more efficient fault-tolerant processing.
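As a small illustration of the "each block is an RDD" point: foreachRDD hands you the RDD behind every batch, so ordinary RDD operations apply to it. A minimal sketch, with the host and port as placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BatchAsRddSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("BatchAsRddSketch")
    val ssc = new StreamingContext(conf, Seconds(1)) // time slice delta-t = 1 second

    val lines = ssc.socketTextStream("localhost", 9999) // placeholder source

    // Every batch interval Spark Streaming hands the new block of data to us as a plain RDD.
    lines.foreachRDD { (rdd, time) =>
      println(s"Batch at $time contains ${rdd.count()} lines")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}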
Once a StreamingContext has been defined, you must do the following:
Define the input sources;
Define the streaming computations (transformations and output operations);
Call streamingContext.start() to start receiving and processing data;
The processing continues until streamingContext.stop() is called.
You can use the existing SparkContext object to create a StreamingContext object:

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
 
Window functions
For the window functions in Spark Streaming, see: Window function explanation
Window-based reduce on DStreams that are not of (K, V) form:
1. reduceByWindow(reduceFunc, windowDuration, slideDuration)
2. reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
Window-based reduce by key on DStreams of (K, V) form:
1. reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
2. reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions, filterFunc)
In terms of efficiency, the method with invReduceFunc should be selected.
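A sketch of the difference: the plain form re-reduces the whole window on every slide, while the invReduceFunc form only adds the batches that entered the window and "subtracts" the ones that left, which is why it requires checkpointing. The socket source and checkpoint path below are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowReduceSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowReduceSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/checkpoint") // required by the invReduceFunc variant

    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Plain form: re-reduces the full 30-second window on every 10-second slide.
    val counts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

    // Incremental form: adds the batches entering the window and subtracts the
    // batches leaving it, reusing the previous window's result.
    val incrementalCounts = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b, // reduceFunc
      (a: Int, b: Int) => a - b, // invReduceFunc
      Seconds(30), Seconds(10))

    incrementalCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}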
You can optimize further by reusing connection objects across multiple RDDs or batches. The developer can keep a static pool of connection objects and reuse objects from the pool when pushing multiple batches of RDDs to the external system, saving even more overhead:
 


dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
          // ConnectionPool is a static, lazily initialized pool of connections
          val connection = ConnectionPool.getConnection()
          partitionOfRecords.foreach(record => connection.send(record))
          ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      })
  })
 
This keeps Spark's execution time low, but puts more pressure on the database, since the connections stay open and keep occupying its resources.
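ConnectionPool in the snippet above is not a Spark class; it stands for whatever pooling your external system's client provides. A minimal sketch of such a pool, with a placeholder Connection type standing in for a real database or messaging client:

import java.util.concurrent.ConcurrentLinkedQueue

// Placeholder for a real client connection (JDBC, HTTP, a Kafka producer, ...).
class Connection {
  def send(record: String): Unit = println(s"sending: $record")
  def close(): Unit = ()
}

// Static, lazily initialized pool living inside each executor JVM; connections
// are reused across partitions and batches instead of being opened per record.
object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection = {
    val conn = pool.poll()
    if (conn != null) conn else new Connection()
  }

  def returnConnection(conn: Connection): Unit = pool.offer(conn)
}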
Small example:
package SparkStreaming

import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._

object Spark_streaming_Test {
  def main(args: Array[String]): Unit = {
    //local[2] means create 2 worker threads locally
    //When running locally, if the master URL is set to "local" there is only one core to run tasks;
    //that is not enough, because the receiver's input DStream occupies that core and no core is left to process the data.
    //So at least 2 cores are needed, i.e. local[2].
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    //the batch interval is 1 second
    val ssc = new StreamingContext(conf, Seconds(1))
    //a checkpoint is required when a sliding window is used
    ssc.checkpoint("F:\\checkpoint")
    //DStream is a base class
    //ssc.socketTextStream() creates a SocketInputDStream; this InputDStream's SocketReceiver listens on port 9999 of the server
    //i.e. ssc.socketTextStream() news up an instance of the concrete DStream subclass SocketInputDStream
    val lines = ssc.socketTextStream("192.168.1.66", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    //    val lines = ssc.textFileStream("F:\\scv")
    val words = lines.flatMap(_.split(" ")) // DStream transformation
    val pairs = words.map(word => (word, 1)) // DStream transformation
    //    val wordCounts = pairs.reduceByKey(_ + _) // DStream transformation
    //Every 3 seconds, compute the word counts of the past 5 seconds; obviously each computation overlaps with the previous one. If you do not want the overlap, set the two durations to the same value.
    //    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(5), Seconds(3))
    val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(5), Seconds(3))
    windowedWordCounts.filter(x => x._2 != 0).print()
    //    wordCounts.print() // DStream output: print the word counts computed every second
    //Note that when the code above runs, Spark Streaming has only set up the computation it will perform; nothing has actually started yet. To really start the computation after these transformations are set up, call the following methods:
    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    //Calling stop() on the StreamingContext also shuts down the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() to false.
    //A SparkContext can be reused to create multiple StreamingContexts, provided the previous StreamingContext is stopped (without stopping the SparkContext) before the next one is created.
    ssc.stop()
  }
}
 
1. Start HDFS and YARN:
start-dfs.sh
start-yarn.sh
2. In a terminal, run:
nc -lk 9999
Then run the Spark program in IDEA. Since nothing has been written to port 9999 yet, only the batch timestamps are printed, nothing else. Now type the following in the terminal (you can also paste it from elsewhere):
Hello world
Hello hadoop
Hadoop love
Love cat
Cat love rabbit
At this point, IDEA's console prints the word counts.
3. Now run the version with the time window. Note that once you add a time window you must set a checkpoint directory.
Enter the following input a few lines at a time, rather than all at once.
Checkpoint
Hello world
Hello hadoop
Hadoop love
Love cat
Cat love rabbit
Ni hao a
Hello world
Hello hadoop
Hadoop love
Love cat
Cat love rabbit
Hello world
Hello hadoop
Hadoop love
Love cat
Cat love rabbit
First, the variant with the inverse function (reduceByKeyAndWindow(_ + _, _ - _, ...)):
Then the variant without the inverse function:
With the inverse-function variant, RDDs that have already left the window still take part in the computation, so counts of 0 can appear; to avoid this, filter out the zeros before printing. The variant without the inverse function does not have this problem, so no filtering is needed.
Checkpointing
Set up a directory in a fault-tolerant, reliable file system (HDFS, S3, etc.) to store the checkpoint information. You do this with the streamingContext.checkpoint(checkpointDirectory) method.
The default checkpoint interval is a multiple of the batch interval, and at least 10 seconds. It can be set with dstream.checkpoint(). Note that as the streaming application keeps running, the checkpoint data keeps accumulating, so the checkpoint interval must be chosen carefully: the smaller it is, the more checkpoints are written and the more space they take; the larger it is, the more data and progress can be lost on failure. A value of 5 to 10 times the batch duration is generally recommended.
 

package streaming

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by Administrator on 2017/3/12.
  */

object RecoverableNetworkWordCount {

  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String): StreamingContext = {
    println("Creating new context") //如果没有出现这句话,说明StreamingContext是从checkpoint里面加载的
    val outputFile = new File(outputPath) //输出文件的目录
    if (outputFile.exists()) outputFile.delete()
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1)) //时间间隔是1秒
    ssc.checkpoint(checkpointDirectory) //设置一个目录用于保存checkpoint信息

    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    val windowedWordCounts = wordCounts.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
    windowedWordCounts.checkpoint(Seconds(10))//generally recommended to be 5-10 times the batch duration, i.e. 5-10 times the second argument of the StreamingContext
    windowedWordCounts.print()
    Files.append(windowedWordCounts + "\n", outputFile, Charset.defaultCharset())
    ssc
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.exit(1)
    }
    val ip = args(0)
    val port = args(1).toInt
    val checkpointDirectory = args(2)
    val outputPath = args(3)
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext(ip, port, outputPath, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}
 
Optimization
 
1. Data-receiving parallelism: create multiple input DStreams and configure them to receive different partitions of the data stream from the source, so that several streams are received at once. This allows data to be received in parallel and improves overall throughput.
 
 val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
 
An alternative to multiple input streams/receivers is to explicitly repartition the incoming data stream (using inputStream.repartition()), distributing the received batches across the machines in the cluster before further processing.
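A sketch of that alternative; the partition count of 8 is just a placeholder you would tune to the number of cores in your cluster:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("RepartitionSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.socketTextStream("localhost", 9999) // one receiver, placeholder source

    // Redistribute each received batch across more partitions before the expensive work,
    // so processing is spread over the whole cluster rather than the receiver's node.
    val repartitioned = lines.repartition(8)
    repartitioned.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}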
 
2. Task serialization: using Kryo serialization reduces the size of tasks, and therefore the time needed to ship tasks to the workers.
 
    val conf = new SparkConf().setAppName("analyse_domain_day").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
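If you use Kryo, you can also register the classes that actually travel in tasks and records, so Kryo writes a compact id instead of the full class name. A sketch; DomainRecord is a placeholder class, not part of the original program:

import org.apache.spark.SparkConf

// Placeholder for a record type that really flows through your job.
case class DomainRecord(domain: String, day: String, hits: Long)

object KryoConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("analyse_domain_day")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // registerKryoClasses lets Kryo refer to these classes by a compact id.
      .registerKryoClasses(Array(classOf[DomainRecord]))

    println(conf.get("spark.serializer"))
  }
}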
3. Set an appropriate batch interval (i.e., batch capacity). The batch processing time must be less than the batch interval. If the interval is 1 second but processing takes 2 seconds, processing cannot keep up with receiving; the backlog of pending data grows and grows, and eventually the application falls over.
 
A good way to find the right batch capacity is to test your application with a conservative batch interval (5-10 seconds) and a low data rate. To verify that the system can keep up with the data rate, check the end-to-end delay (look for "Total delay" in the Spark driver's log4j logs, or use the StreamingListener interface). If the delay stays stable, the system is stable; if it keeps growing, the system cannot keep up with the data rate and is unstable. You can then try increasing the data processing rate or reducing the batch capacity and test again.
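A sketch of checking that delay in code through the StreamingListener interface mentioned above; attach it to your existing StreamingContext, and what you do with the numbers is up to you:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Prints the end-to-end ("total") delay of every completed batch; if this value
// keeps growing, processing is not keeping up with the batch interval.
class DelayLogger extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"Batch ${info.batchTime}: " +
      s"processing delay = ${info.processingDelay.getOrElse(-1L)} ms, " +
      s"total delay = ${info.totalDelay.getOrElse(-1L)} ms")
  }
}

// Usage, assuming ssc is your StreamingContext:
//   ssc.addStreamingListener(new DelayLogger)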

DEMO

Spark Streaming can read from Kafka in two ways:
One uses receivers and Kafka's high-level API.
The other uses no receiver and reads directly with Kafka's low-level API (introduced in Spark 1.3).
Compared with the receiver-based approach, the direct approach has several advantages:
1, There is no need to create multiple Kafka input streams and then union them. With DirectStream, Spark Streaming creates as many RDD partitions as there are Kafka partitions and reads from Kafka in parallel; Spark partitions and Kafka partitions correspond one to one.
2, In the receiver-based approach, zero data loss requires the data to be stored in a WAL first, which copies the data again, so the data ends up copied twice: once by Kafka and once when it is written to the WAL. The direct approach works directly on Kafka's underlying metadata, so if a computation fails the data can simply be re-read from Kafka and re-processed, which guarantees the data is processed; the RDD pulls the data directly at execution time.
3, The receiver-based approach reads Kafka with the high-level API, which writes the offsets to ZooKeeper. Although the data saved in the WAL guarantees no data loss, the offsets recorded by Spark Streaming and by ZooKeeper may become inconsistent, so the same data may be consumed several times.
The second (direct) approach does not store offsets in ZooKeeper, which removes this inconsistency and guarantees that each record is processed by Spark Streaming exactly once, even when failures occur. If you still want the offsets updated in ZooKeeper, you have to write that code yourself.
Since the direct approach operates on Kafka itself, Kafka effectively acts as your underlying file system; strict transactional consistency can then be guaranteed, i.e., data will be processed, and processed only once.
First download the jar packages from Maven's official repository:
spark-streaming_2.10-1.6.2.jar
spark-streaming-kafka_2.10-1.6.2.jar
My Scala is 2.10 and Spark is 1.6.0; make sure the spark-streaming and Kafka versions correspond. In spark-streaming_2.10-1.6.2.jar, 2.10 is the Scala version and 1.6.2 is the Spark version; downloading 1.6.1 also works.
You also need kafka-clients-0.8.2.1.jar and kafka_2.10-0.8.2.1.jar, where 2.10 is the Scala version and 0.8.2.1 is the Kafka version. Stick to these versions; other versions do not correspond and will cause errors.
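If the project is built with sbt rather than by copying jars around, the equivalent dependencies (a sketch matching the versions listed above) would look roughly like this:

// build.sbt (sketch)
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.2"
  // spark-streaming-kafka 1.6.2 pulls in kafka_2.10 0.8.2.1 and kafka-clients 0.8.2.1 transitively.
)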
In the Kafka configuration file (server.properties), set:
delete.topic.enable=true
host.name=192.168.1.66
zookeeper.connect=192.168.1.66:2181
I originally used the host name here and got all kinds of errors, so I simply use the IP address.
The steps to start ZooKeeper and Kafka are the same as steps 1-2 of the Kafka example above.
Go into /kafka_2.10-0.8.2.1 and create a new topic:
bin/kafka-topics.sh --create --zookeeper 192.168.1.66:2181 --replication-factor 1 --partitions 1 --topic test
Start a producer:
bin/kafka-console-producer.sh --broker-list 192.168.1.66:9092 --topic test
After running the Spark program on your own computer, type some text at the producer's command line; the IDEA console then shows the word counts.
  
 
 
 
 
 package SparkStreaming

//TopicAndPartition is a case class that wraps a topic name and a partition id
import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import kafka.serializer.StringDecoder

object SparkStreaming_Kafka_Test {

  val kafkaParams = Map(
    //the IP and port of the kafka broker; this is required
    "metadata.broker.list" -> "192.168.1.66:9092",
    // "group.id" -> "group1",
    /*This parameter decides where a consumer in this groupId starts consuming
     when there is no offset stored in ZK for it (for example a new groupId, or the ZK data was wiped).
     "largest" means start from the largest offset, i.e. the newest messages;
     "smallest" means start from the smallest offset, i.e. consume all messages from the beginning of the topic.*/
    "auto.offset.reset" -> "smallest"
  )

  val topicsSet = Set("test")

  //  val zkClient = new ZkClient("xxx:2181,xxx:2181,xxx:2181",Integer.MAX_VALUE,100000,ZKStringSerializer)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming_Kafka_Test")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint("F:\\checkpoint")
    /*
    KafkaUtils.createDirectStream[
       [key type], [value type], [key decoder class], [value decoder class] ](
       streamingContext, [map of Kafka configuration parameters], [set of topics])
       */
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2) //take the value
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
} 
 
 
The offsets are stored only in the StreamingContext's checkpoint, which eliminates any inconsistency between ZK and the streaming application, so the checkpoint alone is enough to guarantee fault tolerance.
 
If you do need to write the offsets to ZK, first create a new package in your project named org.apache.spark.streaming.kafka, and then create a KafkaCluster class in it:
 
package org.apache.spark.streaming.kafka

import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  type Err = ArrayBuffer[Throwable]

  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long], consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    val meta = offsets.map {
      kv => kv._1 -> OffsetAndMetadata(kv._2)
    }
    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
  }

  def setConsumerOffsetMetadata(groupId: String, metadata: Map[TopicAndPartition, OffsetAndMetadata], consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.commitStatus
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }

  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)(fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}
 
 
Then in the main function:
    messages.foreachRDD(rdd => {
      // process the messages first
      val lines = rdd.map(_._2) //take the value
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
      wordCounts.foreach(println)
      // then update the offsets
      //Spark keeps the Kafka offset information internally in the offsetRanges field of HasOffsetRanges
      //an OffsetRange contains: topic name, partition id, starting offset, ending offset
      val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //get the Kafka offsets of the messages in this rdd
      val kc = new KafkaCluster(kafkaParams)
      for (offsets <- offsetsList) {
        val topicAndPartition = TopicAndPartition("test", offsets.partition)
        val o = kc.setConsumerOffsets("group1", Map((topicAndPartition, offsets.untilOffset)),8)
        if (o.isLeft) {
          println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
        }
      }
    })
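Going the other direction (starting the direct stream from offsets you saved earlier) is also possible through the createDirectStream overload that takes explicit fromOffsets. A sketch, assuming you have already read the saved offsets back yourself; the trimmed KafkaCluster above only writes offsets, so the literal values here are placeholders:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamFromOffsets {
  // ssc and kafkaParams are the same objects built in SparkStreaming_Kafka_Test above.
  def streamFromSavedOffsets(ssc: StreamingContext,
                             kafkaParams: Map[String, String]): InputDStream[(String, String)] = {
    // Normally loaded from ZK (wherever setConsumerOffsets wrote them); placeholder values here.
    val fromOffsets = Map(TopicAndPartition("test", 0) -> 42L)

    // The messageHandler decides what each Kafka record becomes; here we keep (key, value).
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }
}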
 
 
 
 
The following uses the Kafka API to write a program that reads a local file and acts as the Kafka producer; you need to import all of the Scala and Kafka jars (everything under the lib folder). If you do not have two computers, you can open two development environments: IDEA as the consumer and Eclipse as the producer. The producer code is as follows:
 
 ===================
package spark_streaming_kafka_test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MakeRealtimeDate extends Thread {

    private Producer<Integer, String> producer;

    public MakeRealtimeDate() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", "192.168.1.66:2181");
        props.put("metadata.broker.list", "192.168.1.66:9092");
        ProducerConfig pc = new ProducerConfig(props);
        producer = new Producer<Integer, String>(pc);
    }

    public void run() {
        while (true) {
            File file = new File("C:\\Users\\Administrator\\Desktop\\wordcount.txt");
            BufferedReader reader = null;
            try {
                reader = new BufferedReader(new FileReader(file));
            } catch (FileNotFoundException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            String lineTxt = null;
            try {
                while ((lineTxt = reader.readLine()) != null) {
                    System.out.println(lineTxt);
                    producer.send(new KeyedMessage<Integer, String>("test", lineTxt));
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    }

    public static void main(String[] args) {
        new MakeRealtimeDate().start();
    }

} 
===================== 
 
First start the Spark Streaming consumer (the word-count program written earlier), then start the producer program we just wrote, and finally watch the real-time results in the IDEA console.

 
  
   
 
 
 
 
 
