Spark Streaming

I. Overview

Spark Streaming is a scalable, high-throughput, fault-tolerant extension of the core Spark API for processing real-time data streams. It can receive data from many sources, such as Kafka, Flume, Kinesis, or TCP sockets, and the received data can be processed with high-level functions such as map, reduce, join, and window, or with more complex algorithms. Finally, the processed data can be written to file systems or databases, or pushed to live dashboards. You can also apply machine learning or graph processing algorithms to the data streams.
[Figure: Spark Streaming overall data flow]
The figure above shows the overall data flow of Spark Streaming. Internally, as the next diagram shows, Spark Streaming receives live input data and divides it into batches, which are then processed by the Spark engine to produce the results, also in batches.
[Figure: input data stream split into batches that are processed by the Spark engine]
One of the most important concepts in Spark Streaming is the DStream (discretized stream), which represents a continuous stream of data as a series of successive data sets. A DStream can be created in two ways: from an input data source, or by applying transformations to an existing DStream. Internally, a DStream is a sequence of RDDs.
This article describes how to write Spark Streaming programs based on DStreams. Spark Streaming provides Scala, Java, and Python APIs, and the official documentation contains sample programs in all three languages; only the Scala versions are analyzed here.

II. A Sample Program

Before delving into the features and internals of Spark Streaming, let us first write and run a simple Spark Streaming program to get familiar with the basics. This sample program receives data from a TCP socket and performs a word count on it.

1. Writing the Streaming program

First import the Spark Streaming classes; StreamingContext is the main entry point of every Streaming program. The following code creates a local StreamingContext with a batch interval of 1 second, using 2 execution threads.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with a batch interval of 1 second and 2 execution threads
// The master needs at least 2 threads; this is explained in detail later

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
 
 
Using the ssc object above, create a lines DStream that represents the data received from a TCP source,
specifying the host name localhost and the port number 9999:
val lines = ssc.socketTextStream("localhost", 9999)
Each record in lines is one line of text received over TCP. Next, split each line into words on spaces:
val words = lines.flatMap(_.split(" "))


 
The flatMap used above is a one-to-many DStream operation: each line in lines is split on spaces
into multiple words, and these words form a new DStream called words.
Next, count the number of occurrences of each word.
  
import org.apache.spark.streaming.StreamingContext._
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
 
In the code above, the map method maps each word to the form (word, 1), producing the pairs variable.
  Then reduceByKey is called to sum the counts of identical words, and finally the results are printed.
At this point the Spark Streaming program has not actually started; the following two lines are needed for it to begin executing.
  
ssc.start()
ssc.awaitTermination()
 

2. Sending data over TCP and running the Spark Streaming program

(1) Run Netcat
Start Netcat with the following command:
nc -lk 9999
Next, you can type any text on the command line.
(2) Run the Spark Streaming program
./bin/run-example streaming.NetworkWordCount localhost 9999
After the program is running, any text typed into Netcat is counted, and the number of occurrences of each word is printed, as shown in the example output. [Figure: Word Count example output]

III. Basic Concepts

This section details the basic concepts in Spark Streaming.

1. Dependency configuration

The Spark Streaming jar packages can be managed with Maven. To write a Spark Streaming program, add the following dependency to the Maven project:
 
 <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
 
For data sources such as Kafka, Flume, and Kinesis, the classes and interfaces are not part of the
Spark Streaming core API, so the corresponding dependencies need to be added:
 
Source      Artifact
Kafka       spark-streaming-kafka-0-8_2.11
Flume       spark-streaming-flume_2.11
Kinesis     spark-streaming-kinesis-asl_2.11 [Amazon Software License]
 

2. Initializing the StreamingContext

The main entry point of a Spark Streaming program is a StreamingContext object, which must be created at the beginning of the program. The code is as follows:
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))  
 
Here appName is the name of the application and is displayed on the cluster UI.
master is a Spark, Mesos, or YARN cluster URL, or the special string local[*] to run in local mode.
When running a Streaming application on a production cluster, the master parameter is normally not
hard-coded in the program; instead it is passed dynamically with the --master option of the
spark-submit command used to launch the application.
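For example, a launch command might look like the following; the class name and JAR file are placeholders for your own application:

./bin/spark-submit --master yarn --deploy-mode cluster \
  --class com.example.NetworkWordCount \
  my-streaming-app.jar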
 
The batch interval should be set according to the latency requirements of the application and the
resources the cluster can provide.
A StreamingContext object can also be created from an existing SparkContext object, using the following code:

import org.apache.spark.streaming._
val sc = ...                
val ssc = new StreamingContext(sc, Seconds(1))
 
 
After the context has been initialized, the remaining work is:
  1. Define the input DStreams according to the data source.
  2. Apply transformations and output operations to these DStreams to define the computation.
  3. Call streamingContext.start() to start receiving and processing data.
  4. Call streamingContext.awaitTermination() to wait for the processing to stop, either manually or because of an error.
  5. Call streamingContext.stop() to stop the processing manually.
Points to note:
  • Once a context has been started, no new streaming computations can be added to it.
  • Once a context has been stopped, it cannot be restarted.
  • Only one StreamingContext can be active in a JVM at the same time.
  • stop() on a StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional stopSparkContext parameter of stop() to false; see the example below.
  • A SparkContext can be reused to create multiple StreamingContext objects, as long as the previous StreamingContext is stopped before the next one is created.
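As a minimal sketch of the fourth point, the streaming computation can be stopped while keeping the SparkContext alive for reuse:

// stop the StreamingContext but keep the underlying SparkContext running
ssc.stop(stopSparkContext = false)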

3. Discretized Streams (DStreams)

DStream is the most basic and most important abstraction in Spark Streaming. A DStream represents a continuous stream of data: either the input data received from a source, or the data produced by transforming another DStream. Internally, a DStream is a sequence of RDDs, and each RDD in a DStream contains the data of one batch.
[Figure: a DStream as a sequence of RDDs, one per batch interval]
Any operation applied to a DStream translates into operations on the underlying RDDs. For example, in the earlier Word Count code, the flatMap logic that turns lines into words is applied to every RDD of the lines DStream, as shown in the figure below.
[Figure: flatMap applied to each RDD of the lines DStream to produce the words DStream]
These underlying RDD transformations are computed by the Spark engine. The DStream operations hide most of these details and expose a higher-level API, so developers do not need to work with each RDD directly.
DStream operations are analyzed further in a follow-up article.

4. Input DStreams and Receivers


(1) Basic data sources. The earlier Word Count example already used ssc.socketTextStream(...), which creates a DStream from text data received over a TCP socket. Besides sockets, the StreamingContext API supports creating DStreams from other data sources:
  • File data source: to receive data from an HDFS-compatible file system such as HDFS, S3, or NFS, use the following code:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming monitors the dataDirectory path and processes any files created there; nested directories are not supported.
Points to note:
a. All files must have the same data format.
b. Files should be moved atomically into the directory, or renamed into it.
c. Files must not be changed once they are in the directory, so sources that keep appending to a file are not supported. For plain text files, there is a simpler StreamingContext.textFileStream(dataDirectory) method (see the sketch after this list). A file data source does not run a receiver process, so no extra executor cores need to be allocated for it.
  • Data sources based on custom receivers: DStreams can also be created from data received through user-defined receivers.
  • Queue of RDDs as a data source: use streamingContext.queueStream(queueOfRDDs) to turn a sequence of RDDs into a DStream. Each RDD pushed into the queue is treated as one batch of the DStream and processed like a stream.
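As a minimal sketch of the simple text file source mentioned above (the monitored directory is a placeholder path):

// each new file moved atomically into this directory is read as part of one batch
val fileLines = ssc.textFileStream("hdfs://namenode:8020/user/stream/in") // placeholder path
val fileWords = fileLines.flatMap(_.split(" "))
fileWords.print()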
(2) Advanced data sources
(3) Custom data sources. In addition to the two kinds of sources above, custom data sources are also supported. To use one, implement a user-defined receiver that can receive data from the custom source and push it into Spark. For details, refer to the Custom Receiver Guide.
(4) Reliability of data reception

5. Transformations on DStreams

Similar to RDDs, transformations modify the data of the input DStream according to the given logic. DStreams support many of the transformations available on ordinary RDDs.
The specific meaning and usage of these functions can be found in another blog post: Operation functions in Spark Streaming.
Among these transformations, a few deserve further analysis.
(1) UpdateStateByKey operation
(2) Transform operation. The transform operation (and variants such as transformWith) lets you apply an arbitrary RDD-to-RDD function to a DStream. It can be used to apply RDD operations that are not exposed by the DStream API. For example, the DStream API does not directly support joining each batch of a data stream with another data set, but this can be done with transform. This greatly enriches what DStreams can do.
One usage scenario: join a pre-computed RDD of spam information with the records in the real-time stream and then filter on the result, as a form of data cleaning.
 val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // an RDD containing spam information

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...)
  ...})
 
 
Note that the function passed to transform is invoked once per batch. This allows time-varying
operations on the RDDs: the RDD operations, the number of partitions, and the broadcast variables
used can change between batches.
(3) Window operations. Spark Streaming provides window-based operations that apply a transformation over a sliding window of data. The following figure shows an example of a window operation. [Figure: windowed operation over a source DStream]
In the figure above, the window slides over the source DStream, and the source RDDs that fall within the window are combined to produce the RDDs of the windowed DStream. As the figure shows, the operation is applied over the last 3 time units of data and slides forward by 2 time units. Any window operation must specify two parameters:
  • Window length: the duration of the window, 3 in the figure above.
  • Sliding interval: how far the window slides forward each time, 2 in the figure above.
    Note that both parameters must be integer multiples of the batch interval, which is 1 in the figure above.
    Next, a simple window operation example. Building on the earlier word count program, suppose we want to count the occurrences of each word over the last 30 seconds of data, every 10 seconds. Briefly, this means calling reduceByKey on the last 30 seconds of the pairs DStream's data, which is done with the reduceByKeyAndWindow operation.
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
More window operations can refer to: Operation functions in Spark Streaming

6. Output operations on DStreams

Output operations allow the data in a DStream to be pushed to external systems such as databases or file systems. Only through output operations can the processed data be consumed by external systems. Output operations are also what actually trigger the execution of the transformations defined on a DStream, similar to actions on RDDs.
For the use and function of the output operation, refer to: Operation Function Analysis in Spark Streaming
Below, the foreachRDD operation is analyzed further, in particular how to use it to write data to an external database.

dstream.foreachRDD is the most common and most powerful of the DStream output operations. To use it correctly and efficiently, some usage patterns and pitfalls are listed below to help readers avoid common mistakes.
Normally, writing data to an external system requires creating a connection object (for example, a TCP connection to a remote server) and using it to send the data to the remote system. With Spark Streaming, a developer may be tempted to create the connection object on the driver and then use it on the workers to process records, as in the following example.
dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed on the driver
  rdd.foreach { record =>
    connection.send(record) // executed on the worker
  }}
This usage is wrong: it requires the connection object created on the driver to be serialized and sent to the workers. Connection objects can rarely be transferred between machines, so the workers cannot obtain the connection object, let alone send data through it. The resulting errors may show up as serialization errors (the connection object cannot be serialized), initialization errors (the connection object must be initialized on the worker), and so on.
The correct approach is to create the connection object on the worker side.
However, even when the connection is created on the worker, the following mistake is easy to make.
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }}
The code above creates a connection object for every record, which results in far too many connections. Creating a connection object costs time and system resources, so creating one per record puts unnecessary load on the system and reduces its overall throughput.
A better approach is to use rdd.foreachPartition: create one connection object per RDD partition and use it for all the records in that partition, as shown in the following code.
 
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }}

Finally, the code above can be optimized further with a connection pool. With a pool, connection objects can be reused across multiple RDDs/batches, which reduces the overhead even more.
 
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>

    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return the connection to the pool so it can be reused
  }}
Note that the connections in the pool should be created lazily, that is, only when they are actually needed, and they should be given a timeout so that connections that have been idle for a while are released.
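A minimal sketch of what such a lazily initialized pool might look like, assuming a plain TCP socket as the connection type; this ConnectionPool is a hypothetical helper object, not part of the Spark API:

import java.net.Socket
import scala.collection.mutable

// hypothetical, simplified connection pool; a production pool would also add
// timeouts, health checks and eviction of idle connections
object ConnectionPool {
  private val pool = mutable.Queue[Socket]()

  // connections are created lazily, only when the pool has none to hand out
  def getConnection(): Socket = synchronized {
    if (pool.nonEmpty) pool.dequeue() else new Socket("localhost", 9999) // placeholder host/port
  }

  def returnConnection(conn: Socket): Unit = synchronized {
    pool.enqueue(conn)
  }
}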
Key points to remember:
  • DStreams are executed lazily by their output operations, just as RDDs are executed lazily by RDD actions. Specifically, it is the RDD actions inside a DStream output operation that force the processing of the received data. Therefore, if the application has no output operation, or uses an output operation such as dstream.foreachRDD without calling any RDD action inside it, nothing is actually executed: the system simply receives the data and discards it.
  • By default, output operations are executed one at a time, in the order they are defined in the application.

7. Accumulators and broadcast variables

Accumulators and broadcast variables in Spark Streaming cannot be recovered from a checkpoint. If checkpointing is used together with accumulators or broadcast variables, it is best to create lazily instantiated singleton instances of them, so that they can be re-instantiated after the driver restarts from a failure. Refer to the following code.
object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }}

object DroppedWordsCounter {

  @volatile private var instance: Accumulator[Long] = null

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
        }
      }
    }
    instance
  }}

wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter += count
      false
    } else {
      true
    }
  }.collect()
  val output = "Counts at time " + time + " " + counts})
See the source code for the complete example.

8. DataFrame and SQL operations

DataFrames and SQL can also be used conveniently on streaming data. To do so, create a SQLContext from the SparkContext used by the StreamingContext, and hold it as a lazily instantiated singleton. The following code modifies the earlier Word Count example to implement word counting with DataFrames and SQL: each RDD is converted into a DataFrame, registered as a temporary table, and then queried with SQL.

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // get the singleton SQLContext object
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // convert RDD[String] into a DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // register a temporary table
  wordsDataFrame.registerTempTable("words")

  // run a SQL query on the temporary table
  val wordCountsDataFrame =
    sqlContext.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()}

See the source code for the complete example.
You can also run SQL queries on the streaming data from a different thread, that is, asynchronously to the running StreamingContext. The StreamingContext is not aware of such asynchronous SQL queries, so it may delete old streaming data before the query has finished. To ensure that the data still needed is retained, tell the StreamingContext to keep a sufficient amount of streaming data. For example, if a SQL query on one batch may take 5 minutes to run, call streamingContext.remember(Minutes(5)) so that the StreamingContext retains at least 5 minutes of data.
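A minimal sketch of retaining enough generated data for such asynchronous queries; the 5 minutes below is just an illustrative value:

import org.apache.spark.streaming.Minutes

// keep at least the last 5 minutes of generated RDDs so that
// asynchronous SQL queries on older batches can still find their data
ssc.remember(Minutes(5))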
For more information on DataFrames, refer to another blog: Spark-SQL DataFrame Operation Daquan

9. MLlib operations

10. Caching and persistence

Similar to RDDs, DStreams allow developers to persist the stream's data in memory. Calling persist() on a DStream persists every RDD of that DStream in memory. This is particularly useful when the data of a DStream is used multiple times. For window operations such as reduceByWindow, and state operations such as updateStateByKey, the resulting DStreams are persisted automatically, so developers do not need to call persist() explicitly for them.
For streams received over the network (from Kafka, Flume, sockets, etc.), the default persistence level replicates the data to two nodes to ensure fault tolerance.
Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is discussed further in the performance tuning section. For an introduction to the persistence levels, see rdd-persistence.
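A minimal sketch of persisting a DStream that is reused several times; the storage level shown is just one reasonable choice:

import org.apache.spark.storage.StorageLevel

// persist every RDD of this DStream as serialized bytes in memory,
// so repeated operations on the same batch do not recompute it
pairs.persist(StorageLevel.MEMORY_ONLY_SER)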

11. Checkpointing

A Streaming application typically needs to run 24/7, so it must be resilient to failures unrelated to the application logic. For this, Spark Streaming can checkpoint enough information to recover quickly from a failure. Two kinds of data are checkpointed:
1. Metadata checkpointing: saves the information that defines the streaming computation, mainly used to recover from a failure of the node running the application's driver. The metadata includes:
a. Configuration: the configuration used to create the streaming application; b. DStream operations: the set of DStream operations that define the application; c. Incomplete batches: batches that have been queued but not yet completed.
2. Data checkpointing: saves the generated RDDs. This is necessary for stateful, cross-batch operations, because their RDDs depend on RDDs of previous batches, so the dependency chain keeps growing over time and the cost of recomputation after a failure would become prohibitively high.
Metadata checkpointing is mainly used to recover from driver failures, while data checkpointing is mainly used to recover stateful computations.
(1) When checkpointing is needed
Checkpointing must be configured when the application has either of the following requirements.

- Stateful operators are used, such as updateStateByKey or reduceByKeyAndWindow. In this case, a checkpoint directory must be set so that the RDDs can be checkpointed periodically.
- The driver-side application must be recoverable - when the application recovers from a failure, it needs to read the relevant metadata from the checkpoint.
(2) How to configure checkpointing
Checkpointing is usually configured by setting a path on a fault-tolerant, highly reliable file system (such as HDFS or S3) where the checkpoint data is saved. Use streamingContext.checkpoint(checkpointDirectory) in the application to specify this path.
If the application should use the checkpointed metadata to recover when the driver restarts after a failure, it needs to be written with StreamingContext.getOrCreate so that the StreamingContext is re-created on failure, with the following two behaviors:
  • When the program runs for the first time, create a new StreamingContext object, set up the DStreams, and start processing.
  • When the program restarts after a failure, re-create the StreamingContext object from the metadata in the configured checkpoint path and resume from the failed state.
    The following Scala code implements these requirements.
    The following Scala code to achieve the above requirements.
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // create a new StreamingContext object
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set the checkpoint directory
    ssc
}

// create a StreamingContext from checkpoint metadata, or build a new one with the function above
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// set other parameters on the context
context. ...

// start the context
context.start()
context.awaitTermination()
 
If the checkpointDirectory path exists, a StreamingContext is reconstructed from the checkpoint metadata. If the path does not exist, or if the program is running for the first time, functionToCreateContext is called to create a new StreamingContext object.
The RecoverableNetworkWordCount example demonstrates how to recover an application from a checkpoint.

Note that using getOrCreate in this way also requires the deployment infrastructure to restart the driver process automatically on failure. This is discussed in the next section.
In addition, writing checkpoint data adds load to the system, so the checkpoint interval must be chosen carefully. For small batch intervals (for example, 1 second), checkpointing every batch significantly reduces throughput. Conversely, checkpointing too infrequently causes the lineage to grow too long. For stateful DStreams that require RDD checkpointing, the checkpoint interval should be an integer multiple of the batch interval, for example 10 seconds for a 1-second batch interval. The interval can be set with dstream.checkpoint(checkpointInterval). In general, a checkpoint interval of 5 to 10 times the sliding interval is a reasonable choice.
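A minimal sketch of setting an explicit checkpoint interval on a windowed DStream; the 10 seconds below assumes a 1-second batch interval, as discussed above:

// checkpoint the RDDs of this DStream every 10 seconds instead of every batch,
// to limit the overhead of writing checkpoint data
windowedWordCounts.checkpoint(Seconds(10))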

12. Deploying applications

This section focuses on how to deploy a Spark Streaming application.

(1) Requirements. Running a Spark Streaming application requires the following:
  • A cluster with a cluster manager - see the Spark application deployment documentation.
  • The application packaged as a JAR - the application must be compiled into a JAR package and launched with the spark-submit command. The JAR does not need to include the Spark and Spark Streaming JARs. However, if the application uses an advanced data source such as Kafka or Flume, the corresponding dependency JAR and its transitive dependencies must be packaged into the application JAR. For example, an application that uses KafkaUtils must include spark-streaming-kafka-0-8_2.11 and its transitive dependencies in the application JAR.
  • Sufficient memory for the executors - since the received data must be kept in memory, the executors must be configured with enough memory to hold it. Note that an application performing a 10-minute window operation keeps at least 10 minutes of data in memory, so the memory needed depends not only on the volume of received data but also on the types of operations used.
  • Checkpointing configured - if the application requires checkpointing, a checkpoint path on a fault-tolerant file system must be configured.
  • Automatic restart of the driver - to recover automatically from driver failures, the system running the application must monitor the driver process and restart the application if it fails. Different cluster managers achieve this in different ways:
    • Spark Standalone - in this mode the driver runs on a worker node, and the Standalone cluster manager supervises it. If the driver exits with a non-zero status code, or the node running the driver fails, the application is restarted automatically. See the Spark Standalone guide for details on supervision and restart.
    • YARN - YARN provides a similar mechanism for restarting applications automatically. See the YARN documentation for more details.
    • Mesos - Marathon is used with Mesos to achieve automatic restarts.
  • Write-ahead logs - since Spark 1.2, a write-ahead log (WAL) mechanism is available for stronger fault tolerance. When enabled, all received data is written to a write-ahead log, which prevents data loss on driver restart and thus guarantees zero data loss, as discussed earlier. Enable it with spark.streaming.receiver.writeAheadLog.enable=true. Enabling the WAL reduces the data receiving throughput; this can be offset by running multiple receivers in parallel (described later in the performance tuning section). In addition, if the storage level of the input stream is already set to StorageLevel.MEMORY_AND_DISK_SER, the received data is already saved on the file system, so the write-ahead log can be disabled. When write-ahead logs are stored on S3, or any other file system that does not support flushing, remember to set both spark.streaming.driver.writeAheadLog.closeFileAfterWrite and spark.streaming.receiver.writeAheadLog.closeFileAfterWrite.
  • Limiting the maximum data receiving rate - if the cluster resources are limited and the application cannot process data as fast as it arrives, a maximum number of records received per second can be configured. For the Receiver-based mode, set spark.streaming.receiver.maxRate; for the Direct Kafka mode, set spark.streaming.kafka.maxRatePerPartition to limit the rate read from each Kafka partition. For example, if a topic has 8 partitions and spark.streaming.kafka.maxRatePerPartition=100, the maximum rate is 800 records per second. Starting with Spark 1.5, a backpressure mechanism removes the need to set this limit manually: Spark Streaming automatically estimates the current processing capacity and adjusts the receiving rate dynamically. Enable it by setting spark.streaming.backpressure.enabled to true; a configuration sketch follows this list.
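A minimal sketch of the configuration settings mentioned above, set on the SparkConf before the StreamingContext is created; all values are illustrative only:

val conf = new SparkConf()
  .setAppName("NetworkWordCount")
  // write received data to a write-ahead log for zero data loss
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  // cap each receiver at 1000 records per second (illustrative value)
  .set("spark.streaming.receiver.maxRate", "1000")
  // or, for the Direct Kafka approach, cap each partition (illustrative value)
  .set("spark.streaming.kafka.maxRatePerPartition", "100")
  // let Spark Streaming adapt the receiving rate automatically
  .set("spark.streaming.backpressure.enabled", "true")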
(2) Upgrading the application code. If a running application must be upgraded with new code, there are two mechanisms.
  • Start the upgraded application directly and run it in parallel with the existing one. While the old and new applications run in parallel, they will receive and process some of the same data.
  • Gracefully stop the running application and then start the upgraded application, which continues processing from where the old application stopped. Note that this requires a data source that can buffer data, such as Kafka or Flume; otherwise, the data arriving in the gap between the two applications cannot be processed. Also, the new application cannot be restarted from the old application's checkpoint, because the checkpoint may not contain the updated code and logic, which can cause errors. In this case, either use a new checkpoint directory or delete the previous checkpoint.

13. Monitoring applications

When a Spark Streaming application is running, the Spark web UI shows a Streaming tab with streaming-related information, such as whether the receivers are running, how many records have been received and how many have been processed, as well as batch-related information such as batch processing times, waiting times, the number of completed batches, and the number of queued batches. Two of the time metrics deserve particular attention:
  • Processing Time - the time taken to process each batch of data.
  • Scheduling Delay - the time a batch waits in the queue before its processing starts.
    If the processing time is consistently longer than the batch interval, or the scheduling delay keeps increasing, the system can no longer keep up with the incoming data, and the processing time of each batch needs to be reduced; see section IV for how to do so.
    In addition to the monitoring page, Spark also provides the StreamingListener interface, through which receiver status, batch processing times, and other information can be obtained.

IV. Performance Tuning

To make a Spark Streaming application run well, some tuning is required. This section discusses the relevant parameters and guidelines. There are two main questions to consider for performance tuning:
  • How to make full use of the cluster resources to reduce the processing time of each batch.
  • How to set a reasonable batch size so that the application can process the received data in time.

1. Reducing the processing time of each batch

Much of the following has already been covered in the Spark tuning guide; here the points that matter most for Streaming are emphasized again.

(1) Parallelism of data receiving. Data received over a network (from Kafka, Flume, sockets, etc.) must first be deserialized and stored in Spark. When receiving data becomes the bottleneck of the system, the data receiving capacity has to be increased. Each input DStream runs a single receiver, on one worker node, for its input stream. To receive multiple data streams in parallel, create multiple input DStreams. For example, a single Kafka input DStream receiving data from two topics can be split into two input DStreams, one per topic, so that the data is received by two receivers in parallel. The two DStreams can then be unioned in the application, as in the following code.
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
Another parameter to note is spark.streaming.blockInterval. For receivers, the received data is grouped into blocks before being stored in Spark's memory. The number of blocks in each batch determines the number of tasks used to process the received data; the number of tasks per receiver per batch is approximately (batch interval / block interval). For example, with a 2-second batch interval and a 200 ms block interval, there will be 10 tasks. If the number of tasks is too small, the data will not be processed efficiently. For a fixed batch interval, the number of tasks can be increased by reducing the blockInterval parameter. Its default value is 200 ms, and the recommended lower limit is 50 ms; going below 50 ms may cause other problems.
Another way to increase the parallelism of data processing is to explicitly repartition the received data with inputStream.repartition(<number of partitions>).

(2) Parallelism of data processing. For operations such as reduceByKey and reduceByKeyAndWindow, the number of parallel tasks is controlled by the spark.default.parallelism parameter. To increase the parallelism of data processing, either pass the desired number of partitions when calling these operations, or adjust spark.default.parallelism according to the cluster.
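A minimal sketch of raising the reduce-side parallelism by passing an explicit partition count; 8 is just an illustrative value:

// use 8 reduce tasks for this shuffle instead of spark.default.parallelism
val wordCountsParallel = pairs.reduceByKey(_ + _, 8)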
(3) Data serialization. Performance can be improved by tuning the serialization-related parameters. In a Streaming application, two kinds of data are serialized:
  • Input data: by default, the data received by receivers is stored in executor memory with the StorageLevel.MEMORY_AND_DISK_SER_2 level. That is, the data is serialized into bytes to reduce GC overhead and replicated to tolerate executor failures. The data is kept in memory first and spilled to disk when memory is insufficient. One obvious drawback of this approach is that Spark must first deserialize the received data and then serialize it again using Spark's own serialization.
  • Persisted RDDs produced by Streaming operations: RDDs generated by the streaming computation may also be persisted in memory; for example, window operations cache their data for repeated use. Unlike in Spark Core, where the default level is StorageLevel.MEMORY_ONLY, streaming data is persisted with StorageLevel.MEMORY_ONLY_SER by default, i.e., in serialized form, mainly to reduce GC overhead.
    In both cases, Kryo serialization can be used to reduce both the memory footprint and the CPU overhead. See the serialization section of the Spark tuning guide, and the Spark Kryo parameter settings, for how to configure it (a configuration sketch follows this list).
    If the amount of data to be cached is small, it can also be stored in deserialized form without causing significant GC overhead. For example, if the batch interval is only a few seconds and no window operations are used, the storage level can be set explicitly when persisting, so that the data is not serialized.
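A minimal sketch of enabling Kryo serialization on the SparkConf; MyRecord is a placeholder for an application class worth registering:

import org.apache.spark.SparkConf

case class MyRecord(word: String, count: Int) // placeholder record type

val conf = new SparkConf()
  .setAppName("NetworkWordCount")
  // use Kryo instead of the default Java serialization
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // registering classes avoids storing full class names with each serialized object
  .registerKryoClasses(Array(classOf[MyRecord]))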

    (4) Task launching overhead. If the number of tasks launched per second is too high (say, more than 50), the overhead of launching tasks frequently becomes non-negligible. In that case, consider the execution mode: in general, Spark's Standalone mode and coarse-grained Mesos mode have lower task launch latency than fine-grained Mesos mode.

2. Setting the right batch interval

For a Spark Streaming application to run stably on a cluster, it must process the data as fast as it is received. If the processing rate cannot keep up, the backlog of data waiting to be processed keeps growing, and eventually the application can no longer run properly. The goal is to keep the batch processing time below the batch interval, so setting a reasonable batch interval is very important.

3. Memory tuning

How Spark applications use memory and how to tune GC are described in detail in the Spark tuning guide. Here, only the parameters relevant to Spark Streaming applications are discussed briefly.

How much cluster memory a Spark Streaming application needs depends largely on its logic, that is, on the transformations it uses. For example, a 10-minute window operation requires enough memory to hold 10 minutes of data, and an updateStateByKey operation over a very large number of keys also uses a lot of memory. Conversely, an application that simply receives, filters, and stores data needs much less memory.

By default, receivers store the received data with the StorageLevel.MEMORY_AND_DISK_SER_2 level and spill it to disk when it does not fit in memory. Spilling reduces the application's processing performance, so it is usually better to allocate enough memory to the application. A common approach is to evaluate the memory usage on a small amount of data and extrapolate from it to estimate the total memory required for the production deployment.

The other aspect of memory tuning is garbage collection. For a low-latency application, long JVM garbage collection pauses are highly undesirable.
The following can be used to tune memory usage and GC behavior:
  • Persistence level of DStreams: as mentioned earlier, input data and persisted RDDs are serialized into bytes by default. Compared with deserialized storage, this reduces both memory usage and GC pressure. Using Kryo serialization further reduces the serialized size and memory usage. Memory usage can be reduced even more by enabling compression with the spark.rdd.compress parameter.
  • Clearing old data: by default, all input data and the RDDs persisted by DStream transformations are cleaned up automatically. Spark Streaming decides which data to clear based on the transformations used. For example, with a 10-minute window operation, Spark Streaming keeps at least the last 10 minutes of data. To keep data even longer, use the streamingContext.remember parameter.
  • CMS garbage collector: using the concurrent mark-and-sweep collector is strongly recommended to keep GC pauses short. Set it on the driver with the --driver-java-options option of spark-submit, and on the executors with spark.executor.extraJavaOptions (see the sketch after this list).
  • Other suggestions: to further reduce GC pressure,
    • persist RDDs with the OFF_HEAP storage level provided by Tachyon; see RDD Persistence.
    • use more executors with smaller heaps, which reduces the GC pressure within each JVM heap.
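A minimal sketch of enabling the CMS collector on the executors through the SparkConf; the driver-side flag has to be passed to spark-submit instead:

val conf = new SparkConf()
  .setAppName("NetworkWordCount")
  // use the concurrent mark-and-sweep collector on executors to keep GC pauses short
  .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")
// the driver-side equivalent is passed on the command line:
//   --driver-java-options "-XX:+UseConcMarkSweepGC"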

V. Fault Tolerance

This section focuses on how a Spark Streaming application recovers after a failure.
