Spark, Part 2: Building Real-Time Data Processing Systems with Kafka and Spark Streaming

Introduction
In many domains, such as stock market analysis, meteorological monitoring, and website user behavior analysis, data is generated quickly, in large volumes, and with strong real-time requirements. It is often impractical to collect and store the data first and process it afterwards, which means the traditional batch-oriented data processing architecture cannot meet the need. Stream computing emerged to better handle this kind of data. Unlike the traditional architecture, a stream computing model captures and processes data in real time as it flows through the system, analyzes it according to business requirements, and finally saves or distributes the results to the components that need them.

Starting from how real-time data is generated and flows, this article walks through a practical case to show the reader how to use Apache Kafka and the Spark Streaming module to build a real-time data processing system. Of course, building a good and robust real-time data processing system is not something a single article can cover completely. Before reading this article, you are assumed to have a basic understanding of the Apache Kafka distributed messaging system and to be able to do simple programming with the Spark Streaming API. Next, let us look at how to build a simple real-time data processing system.

About Kafka
Kafka is a distributed, high-throughput, easily scalable topic-based messaging system that was first developed by LinkedIn and contributed to the Apache Software Foundation in 2011. In general, Kafka has the following typical application scenarios:

  • As a message queue. Kafka's high throughput and built-in topic partitioning, replication, and fault tolerance make it well suited to large-scale, high-volume message processing systems.
  • As the data source of a stream computing system. The system that generates the stream data acts as a Kafka producer and publishes the data stream to a Kafka topic, while the stream computing system (Storm, Spark Streaming, and so on) consumes and processes the data in real time. This is also the application scenario covered in this article.
  • As a source of user behavior data. In this scenario, the system publishes user behavior data, such as pages visited, dwell time, search logs, and topics of interest, to Kafka topics in real time or periodically, where it serves as the data source for downstream systems.
  • Log aggregation. Kafka can serve as an alternative to a log collection system; system log data can be aggregated into different Kafka topics by category.
  • Event sourcing. In an event-driven system, events can be designed in a suitable format and stored as Kafka messages so that the corresponding system modules can process them in real time or periodically. Because Kafka can store large amounts of data and has replication and fault-tolerance mechanisms, it makes event-driven systems more robust and efficient.

Of course, Kafka supports other applications as well, which we will not list one by one. For more details about Kafka, please refer to Kafka's official website. Note that the Kafka version used in this article is 0.8.2.1, built for Scala 2.10.

About Spark Streaming
The Spark Streaming module is an extension of Spark Core designed to process continuous data streams in a high-throughput, fault-tolerant manner. Spark Streaming currently supports external data sources such as Flume, Kafka, Twitter, ZeroMQ, TCP sockets, and so on.

A Discretized Stream (also called a DStream) is Spark Streaming's basic abstraction for a continuous data stream. Internally, a DStream is represented as a sequence of consecutive RDDs (Resilient Distributed Datasets), each covering the data that arrives within a certain time interval. Operations on a DStream are therefore translated by the Spark Streaming engine into operations on the underlying RDDs. The operations on a DStream fall into the following categories:

  • Transformations: similar to RDD transformations, Spark Streaming provides a series of operations that transform one DStream into another, such as map, union, filter, transform, and so on.
  • Window Operations: window operations process the data within a window defined by a window length and a sliding interval. Common operations are reduceByWindow, reduceByKeyAndWindow, window, and so on.
  • Output Operations: output operations push DStream data to external systems or storage platforms, such as HDFS or a database. Similar to RDD actions, output operations actually trigger the execution of the DStream transformations. Common operations are print, saveAsTextFiles, saveAsHadoopFiles, foreachRDD, and so on.

For more information on DStream operations, please refer to the Spark Streaming Programming Guide on Spark's official website. The short sketch below illustrates the three types of operations.
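As a minimal sketch of these three operation types (the socket source, host, and port are illustrative assumptions, not part of this article's case), a word-count style program might look like this:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamOperationsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DStream Operations Sketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // source DStream: one line of text per event (hypothetical host and port)
    val lines = ssc.socketTextStream("localhost", 9999)

    // Transformations: split each line into (word, 1) pairs, dropping empty words
    val wordCounts = lines.flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map(word => (word, 1))

    // Window operation: count each word over the last 30 seconds, sliding every 10 seconds
    val windowedCounts = wordCounts.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

    // Output operation: printing the first results of each batch triggers the actual execution
    windowedCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}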

Kafka cluster build steps
1. Machine preparation

In this article, we prepare three machines to build the Kafka cluster, with IP addresses 192.168.1.1, 192.168.1.2, and 192.168.1.3, and we make sure the three machines can reach one another over the network.

2. Download and install kafka_2.10-0.8.2.1


After the download is complete, upload the package to the target machine, such as 192.168.1.1, and extract it with the following command:

Listing 1. Kafka installation package decompression command
tar -xvf kafka_2.10-0.8.2.1.tgz


The installation is complete.

3. Create the zookeeper data directory and set the server number

Perform the following on all three servers.

Switch to the current user's working directory, such as /home/fams, create the directory in which zookeeper will save its data, and then create the server number file in that directory.

Listing 2. Commands to create the data directory and the server number file
mkdir zk_data
cd zk_data
echo N > myid


Note that N must take a different value on each of the three servers, for example 1, 2, and 3 respectively.

4. Edit the zookeeper configuration file

The Kafka installation package ships with a built-in zookeeper service. Enter the Kafka installation directory, such as /home/fams/kafka_2.10-0.8.2.1, edit the config/zookeeper.properties file, and add the following configuration:

Listing 3. zookeeper configuration item
tickTime=2000
dataDir=/home/fams/zk_data/
clientPort=2181
initLimit=5
syncLimit=2
server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.3:2888:3888


These configuration items are explained as follows:

  • tickTime: the heartbeat interval between zookeeper servers, in milliseconds.
  • dataDir: the directory where zookeeper saves its data; the server number file (myid) created above also lives in this directory.
  • clientPort: the port on which the zookeeper server listens for client connections.
  • initLimit: the maximum number of heartbeat intervals tolerated while a follower server establishes its initial connection to the leader server in the zookeeper cluster.
  • syncLimit: the maximum number of heartbeat intervals tolerated between a request and its response when a follower server synchronizes with the leader server.
  • server.N: N is the server number in the zookeeper cluster. Taking the value 192.168.1.1:2888:3888 as an example, 192.168.1.1 is the server's IP address, 2888 is the port used for data exchange with the leader server, and 3888 is the port used for communication when a new leader is elected.

5. Edit the Kafka configuration file

A. Edit the config/server.properties file

Add or modify the following configuration.

Listing 4. Kafka Broker configuration item
broker.id=0
port=9092
host.name=192.168.1.1
zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
log.dirs=/home/fams/kafka-logs


These configuration items are explained as follows:

  • broker.id: the unique identifier of the Kafka broker; it must not be duplicated within the cluster.
  • port: the port the broker listens on, used to accept connections from producers and consumers.
  • host.name: the IP address or host name of the current broker server.
  • zookeeper.connect: the zookeeper address list that the broker, acting as a zookeeper client, can connect to.
  • log.dirs: the directory where the message logs are saved.

B. Edit the config/producer.properties file

Add or modify the following configuration:

Listing 5. Kafka Producer configuration item
metadata.broker.list=192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
producer.type=async


These configuration items are interpreted as follows:

  • metadata.broker.list: the list of broker addresses in the cluster.
  • producer.type: the producer type; async for an asynchronous producer, sync for a synchronous producer.

C. Edit the config/consumer.properties file

Listing 6. Kafka Consumer configuration item
zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181


The configuration items are explained as follows:

  • zookeeper.connect: the list of zookeeper server addresses that the consumer can connect to.

6. Upload the modified installation package to the other machines

At this point, we have modified all the required configuration files on the 192.168.1.1 machine. Next, use the following commands to pack up the Kafka installation directory and upload it to the 192.168.1.2 and 192.168.1.3 machines.

Listing 7. Command to package and upload Kafka installation package
tar -cvf kafka_2.10-0.8.2.1.tar ./kafka_2.10-0.8.2.1
scp ./kafka_2.10-0.8.2.1.tar fams@192.168.1.2:/home/fams
scp ./kafka_2.10-0.8.2.1.tar fams@192.168.1.3:/home/fams


After the upload is complete, extract the uploaded tar package on the 192.168.1.2 and 192.168.1.3 machines, as in Listing 1. Then, on both machines, modify broker.id and host.name in the config/server.properties file: broker.id can be set to 1 and 2 respectively, and host.name must be changed to the IP address of the current machine.

7. Start the zookeeper and Kafka services

Run the following commands on all three machines to start the zookeeper and Kafka services.

Listing 8. Starting the zookeeper service
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &


Listing 9. Starting the kafka service
nohup bin/kafka-server-start.sh config/server.properties &


8. Verify the installation

There are two steps in our verification.

First, run the following command on each of the three machines to check whether the Kafka and zookeeper service processes are present.

Listing 10. View the Kafka and zookeeper service processes
ps -ef | grep kafka


Second, create a message topic and verify, through the console producer and console consumer, that messages can be produced and consumed normally.

Listing 11. Creating a message topic
bin/kafka-topics.sh --create \
--replication-factor 3 \
--partitions 3 \
--topic user-behavior-topic \
--zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181


Run the following command to open the console producer.

Listing 12. Start Console Producer
bin/kafka-console-producer.sh --broker-list 192.168.1.1:9092 --topic user-behavior-topic

Open the console consumer on another machine.

Listing 13. Start the Console Consumer
bin/kafka-console-consumer.sh --zookeeper 192.168.1.2:2181 --topic user-behavior-topic --from-beginning


If a message entered in the producer console shows up in the consumer console, the installation is successful.

Case introduction and programming
1. Case introduction

In this case, we assume that a web forum needs to update the site's hot topic module in real time, based on users' click counts, dwell times, and whether they like each page, and to display links to the hottest topics.

2. Case study

For a user who visits the forum, we need to abstract his behavior data in order to explain how the heat of a page topic is calculated.

First, we use a vector to describe a user's behavior on a page, namely clicking the page, staying on it, and liking (or disliking) it. It can be expressed as follows:

(page001.html, 1, 0.5, 1)

The first term of the vector is the ID of the page, the second is the number of clicks on the page since the user entered the site, the third is the dwell time in minutes, and the fourth indicates whether the user liked the page: 1 means like, -1 means dislike, and 0 means neutral.

In this article, we give the click count a weight of 0.8, because a user may reopen the same page simply because no better topic is available, so repeated clicks should not count at full value toward the page's heat. The dwell time is also weighted 0.8, because a user may keep several tab pages open at the same time while only one of them really holds his attention. The like field is weighted 1, because a like (or dislike) generally means the user is genuinely interested in the topic of the page.

Finally, we define the following formula to calculate how much a single behavior record contributes to a page's heat.

f(x, y, z) = 0.8x + 0.8y + z

For the behavior data above, (page001.html, 1, 0.5, 1), applying the formula gives:

H(page001) = f(1, 0.5, 1) = 0.8 * 1 + 0.8 * 0.5 + 1 * 1 = 2.2

Note that in this process we ignore the user's identity: we do not care who the user is, only how much his behavior contributes to the page's heat. A small sketch of this calculation follows.
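As a minimal sketch of the heat formula (the object and method names and the sample values are illustrative, not part of the case code), the calculation can be written like this:

object PopularityFormulaSketch {
  // f(x, y, z) = 0.8x + 0.8y + z
  // x: click count, y: dwell time in minutes, z: like flag (1, 0, or -1)
  def heatContribution(clicks: Int, stayMinutes: Double, likeOrNot: Int): Double =
    0.8 * clicks + 0.8 * stayMinutes + likeOrNot

  def main(args: Array[String]): Unit = {
    // the behavior record (page001.html, 1, 0.5, 1) from the text:
    // 0.8 * 1 + 0.8 * 0.5 + 1 = 2.2 (up to floating-point rounding)
    println(heatContribution(1, 0.5, 1))
  }
}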

3. Producing behavior data messages

In this case we use a program to simulate user behavior; it pushes a random batch of behavior data messages to the user-behavior-topic topic every 5 seconds. Obviously, this program plays the role of the message producer; in practice, this function is generally provided by a real system. To simplify message processing, we define the message format as follows:

Page ID|Click count|Dwell time (minutes)|Like flag (1, 0, or -1)

We also assume that the site has only 100 pages. The following is the Scala source code of this class.

Listing 14. UserBehaviorMsgProducer class source
import scala.util.Random
import java.util.Properties
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import kafka.producer.Producer

class UserBehaviorMsgProducer(brokers: String, topic: String) extends Runnable {
  private val brokerList = brokers
  private val targetTopic = topic
  private val props = new Properties()
  props.put("metadata.broker.list", this.brokerList)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("producer.type", "async")
  private val config = new ProducerConfig(this.props)
  private val producer = new Producer[String, String](this.config)

  private val PAGE_NUM = 100
  private val MAX_MSG_NUM = 3
  private val MAX_CLICK_TIME = 5
  private val MAX_STAY_TIME = 10
  // Like: 1; Dislike: -1; No feeling: 0
  private val LIKE_OR_NOT = Array[Int](1, 0, -1)

  def run(): Unit = {
    val rand = new Random()
    while (true) {
      // how many user behavior messages will be produced in this batch
      val msgNum = rand.nextInt(MAX_MSG_NUM) + 1
      try {
        // generate messages with a format like page1|2|7.123|1
        for (i <- 0 to msgNum) {
          var msg = new StringBuilder()
          msg.append("page" + (rand.nextInt(PAGE_NUM) + 1))
          msg.append("|")
          msg.append(rand.nextInt(MAX_CLICK_TIME) + 1)
          msg.append("|")
          msg.append(rand.nextInt(MAX_STAY_TIME) + rand.nextFloat())
          msg.append("|")
          msg.append(LIKE_OR_NOT(rand.nextInt(3)))
          println(msg.toString())
          // send the generated message to the broker
          sendMessage(msg.toString())
        }
        println("%d user behavior messages produced.".format(msgNum + 1))
      } catch {
        case e: Exception => println(e)
      }
      try {
        // sleep for 5 seconds after sending a micro batch of messages
        Thread.sleep(5000)
      } catch {
        case e: Exception => println(e)
      }
    }
  }

  def sendMessage(message: String) = {
    try {
      val data = new KeyedMessage[String, String](this.targetTopic, message)
      producer.send(data)
    } catch {
      case e: Exception => println(e)
    }
  }
}

object UserBehaviorMsgProducerClient {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage: UserBehaviorMsgProducerClient 192.168.1.1:9092 user-behavior-topic")
      System.exit(1)
    }
    // start the message producer thread
    new Thread(new UserBehaviorMsgProducer(args(0), args(1))).start()
  }
}


4. Writing the Spark Streaming program to consume the messages

Once the problem to be solved is clear, we can start coding. For this case, the basic implementation steps are as follows:

  • Build a Spark StreamingContext instance and enable checkpointing, because we need the updateStateByKey primitive to accumulate the updated heat of each page topic.
  • Use the KafkaUtils.createStream method provided by Spark to consume the message topic; this method returns a ReceiverInputDStream object instance.
  • For each message, use the formula above to calculate the heat value of the page topic.
  • Define an anonymous function that adds the previously accumulated value to the newly calculated value, giving the latest heat value.
  • Call the updateStateByKey primitive, passing in the anonymous function defined above, to update the heat value of each page.
  • Finally, after obtaining the latest result, sort the results and print the 10 pages with the highest heat values.

The source code is as follows.

Listing 15. WebPagePopularityValueCalculator class source
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.Duration

object WebPagePopularityValueCalculator {
  private val checkpointDir = "popularity-data-checkpoint"
  private val msgConsumerGroup = "user-behavior-topic-message-consumer-group"

  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage: WebPagePopularityValueCalculator zkserver1:2181,zkserver2:2181,zkserver3:2181 consumeMsgDataTimeInterval(secs)")
      System.exit(1)
    }
    val Array(zkServers, processingInterval) = args
    val conf = new SparkConf().setAppName("Web Page Popularity Value Calculator")
    val ssc = new StreamingContext(conf, Seconds(processingInterval.toInt))
    // using updateStateByKey requires enabling checkpointing
    ssc.checkpoint(checkpointDir)
    val kafkaStream = KafkaUtils.createStream(
      // Spark streaming context
      ssc,
      // zookeeper quorum, e.g. zkserver1:2181,zkserver2:2181,...
      zkServers,
      // kafka message consumer group ID
      msgConsumerGroup,
      // Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
      Map("user-behavior-topic" -> 3))
    val msgDataRDD = kafkaStream.map(_._2)
    // for debug use only
    // println("Coming data in this interval...")
    // msgDataRDD.print()
    // e.g. page37|5|1.5119122|-1
    val popularityData = msgDataRDD.map { msgLine =>
      {
        val dataArr: Array[String] = msgLine.split("\\|")
        val pageID = dataArr(0)
        // calculate the popularity value
        val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
        (pageID, popValue)
      }
    }
    // sum the previous popularity value and the current value
    val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
      iterator.flatMap(t => {
        val newValue: Double = t._2.sum
        val stateValue: Double = t._3.getOrElse(0.0)
        Some(newValue + stateValue)
      }.map(sumedValue => (t._1, sumedValue)))
    }
    val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))
    val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
    // set the checkpoint interval to avoid checkpointing too often, which may
    // significantly reduce operation throughput
    stateDstream.checkpoint(Duration(8 * processingInterval.toInt * 1000))
    // after calculation, we need to sort the result and only show the top 10 hot pages
    stateDstream.foreachRDD { rdd =>
      {
        val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false)
        val topKData = sortedData.take(10).map { case (v, k) => (k, v) }
        topKData.foreach(x => {
          println(x)
        })
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}


Deployment and testing
Readers can refer to the following steps to deploy and test the sample program provided in this case.

The first step is to start the behavior message producer program, which can be launched directly in the Scala IDE. Two startup parameters need to be added: the first is the Kafka broker address, and the second is the name of the target message topic.

Figure 1. UserBehaviorMsgProducer class startup parameter




After it starts, you can see behavior message data being generated in the console.

Figure 2. Generated behavior message data preview



The second step is to start the Spark Streaming program that acts as the behavior message consumer. It needs to be started in the Spark cluster environment; the command is as follows:

Listing 16. WebPagePopularityValueCalculator class startup command
bin/spark-submit \
--jars $SPARK_HOME/lib/spark-streaming-kafka_2.10-1.3.1.jar,\
$SPARK_HOME/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar,\
$SPARK_HOME/lib/kafka_2.10-0.8.2.1.jar,\
$SPARK_HOME/lib/kafka-clients-0.8.2.1.jar \
--class com.ibm.spark.exercise.streaming.WebPagePopularityValueCalculator \
--master spark://<spark_master_ip>:7077 \
--num-executors 4 \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 2


Since the program calls Kafka's APIs and Spark Streaming's Kafka integration API (KafkaUtils.createStream), we need to upload the jar packages listed in the startup command to every machine in the Spark cluster in advance (in this example, we upload them to the lib directory of the Spark installation, $SPARK_HOME/lib) and reference them in the startup command.
After it starts, we can see messages like the following printed in the command line console, namely the 10 pages with the highest calculated heat values.

Figure 3. Preview of the current ranking of web page topic heat




We can also go to the Spark web console to see the current running status of the Spark program; the default address is http://<spark_master_ip>:8080.

Figure 4. View the running status of the Spark Streaming program




Precautions
When using Spark Streaming to build an efficient and robust stream computing system, we also need to pay attention to the following aspects.

  • The data processing interval needs to be chosen so that each batch of data can be processed in less time than the interval itself; only then is the previous batch finished before the next batch needs to be processed. Obviously, this is determined by the computing power of your Spark cluster and the volume of input data.
  • Improve the capacity to read input data as much as possible. When integrating Spark Streaming with external systems such as Kafka and Flume, we can start multiple ReceiverInputDStream object instances to prevent the data-receiving step from becoming the bottleneck of the system (see the sketch after this list).
  • Although in this case we only print out the (near) real-time results, in many situations these results would be saved to a database or HDFS, or sent back to Kafka, so that other systems can use the data for further business processing.
  • Because stream computing has strict real-time requirements, any pause of the system caused by a JVM full GC is unacceptable. Besides using memory sensibly in the program and regularly cleaning up unneeded cached data, the CMS (Concurrent Mark and Sweep) collector is the GC method officially recommended by Spark; it effectively keeps GC-induced pauses at a very low level. The CMS GC parameters can be added through the --driver-java-options option of the spark-submit command.
  • The official Spark guidance on integrating Kafka with Spark Streaming describes two approaches. The first is the Receiver-based Approach, which implements the Kafka consumer inside a Receiver to receive the message data; the second is the Direct Approach, which does not use a Receiver but periodically queries Kafka for the latest offset of each message partition and then defines the offset range to be processed in each batch. This article uses the first approach, because the second was still in the experimental stage at the time of writing.
  • If you use the Receiver-based Approach to integrate Kafka and Spark Streaming, you need to take into account data loss caused by Driver or Worker node failures. With the default configuration, data may be lost unless the Write Ahead Log (WAL) feature is enabled. With WAL enabled, message data received from Kafka is synchronously written to the WAL and saved in a reliable distributed file system such as HDFS. You can turn this feature on by setting the spark.streaming.receiver.writeAheadLog.enable configuration item to true in the Spark configuration file (conf/spark-defaults.conf). Of course, enabling the WAL reduces the throughput of a single Receiver; in that case, we may need to run multiple Receivers in parallel to compensate.
  • The updateStateByKey operation requires the checkpoint feature to be enabled, but frequent checkpointing increases processing time and lowers throughput. By default, the checkpoint interval is the streaming program's batch interval or 10 seconds, whichever is larger. The officially recommended interval is 5 to 10 times the batch interval. It can be set with dstream.checkpoint(checkpointInterval); the parameter must be wrapped in the Duration class, and the unit is milliseconds.
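As a hedged sketch of two of the points above, running multiple Receivers and enabling the write-ahead log (the receiver count, checkpoint directory, and the choice of setting the WAL flag in code rather than in conf/spark-defaults.conf are illustrative assumptions, not part of the case program), several Kafka input streams can be created and unioned before further processing:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object MultiReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Multi Receiver Sketch")
      // assumption: enable the WAL here instead of conf/spark-defaults.conf
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(2))
    // the WAL requires checkpointing to be enabled
    ssc.checkpoint("multi-receiver-checkpoint")

    val zkQuorum = "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"
    val group = "user-behavior-topic-message-consumer-group"
    val numReceivers = 3 // illustrative: one receiver per topic partition

    // start several receivers and union their streams so that receiving
    // data does not become the bottleneck of the job
    val kafkaStreams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, group, Map("user-behavior-topic" -> 1))
    }
    val unifiedStream = ssc.union(kafkaStreams)

    // downstream processing works on the unified stream as usual
    unifiedStream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Note that each Receiver occupies one executor core for its long-running receiving task, so the number of Receivers should be balanced against the cores left available for actual processing.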

Concluding remarks
This article has covered the basics of integrating Spark Streaming with the Kafka distributed messaging system, but note that in practice we may face more problems, such as performance tuning and running out of memory, as well as other issues not encountered here. The hope is that, after reading this article, the reader has a basic understanding of how to use Spark Streaming and Kafka to build a real-time data processing system, and a starting point for further study. If you have any questions or suggestions while reading, please leave a comment and I will reply promptly. I hope we can discuss and improve together.
