Spark Streaming
I. Overview
Spark Streaming is a scalable, high-throughput, fault-tolerant component built on the core Spark API for processing real-time data streams. It can receive data from sources such as Kafka, Flume, Kinesis, or TCP sockets, and the received data can be processed with high-level functions such as map, reduce, join, and window. The processed results can be written to file systems or databases, or pushed directly to live dashboards; machine learning and graph-processing algorithms can also be applied to the stream. Internally, Spark Streaming receives the live input data, splits it into batches, and hands each batch to the Spark engine, which produces the results in batches as well.
One of the most important concepts in Spark Streaming is the DStream (discretized stream), which represents a continuous series of data. A DStream can be created either from an input data source or by transforming another DStream; internally, a DStream is a sequence of RDDs.
This article describes how to write Spark Streaming programs based on DStreams. Spark Streaming provides Scala, Java, and Python APIs; the official documentation contains sample programs in all three languages, but only the Scala version is analyzed here.
II. Sample Program
Before delving into the features and internals of Spark Streaming, it helps to write and run a simple Spark Streaming program to get familiar with the basics. This sample program receives data from a TCP socket and performs a word count.
1. Writing the Streaming program
First import the Spark Streaming classes; StreamingContext is the main entry point for all streaming programs. The following code creates a local StreamingContext with a batch interval of 1 second and 2 execution threads.
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with a batch interval of 1 second and 2 execution threads
// The master needs at least 2 threads; this is explained in detail later
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
Using this ssc object, you can create a lines DStream that represents the text data received over TCP from the host localhost on port 9999:
val lines = ssc.socketTextStream("localhost", 9999)
Each record in lines is one line of text received over TCP. Next, split each line into words on spaces.
val words = lines.flatMap(_.split(" "))
flatMap is a one-to-many DStream operation: it maps each input record to multiple output records, here by splitting each line on spaces. The resulting words form a new DStream called words.
Then count the number of words.
import org.apache.spark.streaming.StreamingContext._
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
In the code above, the map method maps each word to the form (word, 1), producing the pairs variable. The reduceByKey method then adds up the counts of identical words, and print outputs the results.
At this point the Spark Streaming program has only been defined but not started. The following two lines are needed to actually start the computation.
ssc.start()
ssc.awaitTermination()
2. Sending data over TCP and running the Spark Streaming program
(1) Run Netcat. Use the following command to start a Netcat server:
nc -lk 9999
(2) Run the Spark Streaming program:
./bin/run-example streaming.NetworkWordCount localhost 9999
III. Basic Concepts
This section details the basic concepts in Spark Streaming.
1. Dependencies
The Spark Streaming JARs can be managed with Maven. To write a Spark Streaming program, add the following dependency to the Maven project:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.0</version>
</dependency>
For data sources such as Kafka, Flume, and Kinesis, the classes and interfaces are not part of the Spark Streaming core API, so the corresponding dependency must be added:
| Source | Artifact |
|---|---|
| Kafka | spark-streaming-kafka-0-8_2.11 |
| Flume | spark-streaming-flume_2.11 |
| Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
2. Initializing the StreamingContext
The main entry point of a Spark Streaming program is a StreamingContext object, which must be created at the beginning of the program. The code is as follows:
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
Here appName is the name of the application and is what appears on the cluster UI. master is a Spark, Mesos, or YARN cluster URL, or the special string local[*] to run in local mode. When running a streaming application in production, the master is normally not hard-coded in the program; instead it is passed dynamically with the --master option when launching the application with spark-submit. As for the batch interval, it should be chosen based on the latency requirements of the application and the resources the cluster can provide.
A StreamingContext object can also be created from an existing SparkContext object:
import org.apache.spark.streaming._
val sc = ...
val ssc = new StreamingContext(sc, Seconds(1))
After the context has been created, the remaining work is:
- Create input DStreams according to the data source type.
- Define the computation by applying transformations and output operations to the DStreams.
- Start receiving and processing data with streamingContext.start().
- Wait for the processing to finish with streamingContext.awaitTermination(), either because it was stopped manually or because an error occurred.
- The processing can be stopped manually with streamingContext.stop().
A minimal skeleton that combines these steps is sketched below.
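The following is a minimal sketch of that lifecycle, reusing the socket word count from section II (host and port are the same illustrative values as before):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))     // 1. create the context
val lines = ssc.socketTextStream("localhost", 9999)  // 2. create an input DStream
val counts = lines.flatMap(_.split(" "))             // 3. transformations ...
  .map((_, 1))
  .reduceByKey(_ + _)
counts.print()                                       //    ... and an output operation
ssc.start()                                          // 4. start receiving and processing
ssc.awaitTermination()                               // 5. wait for termination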
Points to remember:
- Once a context has been started, no new computation logic can be added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at the same time.
- stop() on a StreamingContext also stops the underlying SparkContext. To stop only the StreamingContext, set the optional parameter stopSparkContext of the stop() method to false.
- A SparkContext can be reused to create multiple StreamingContext objects, as long as the previous StreamingContext is stopped before the next one is created (see the sketch after this list).
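A minimal sketch of the last two points, assuming the ssc and sc values from the earlier examples:
// Stop only the StreamingContext; keep the underlying SparkContext alive
ssc.stop(stopSparkContext = false)

// The same SparkContext can then back a new StreamingContext
val ssc2 = new StreamingContext(sc, Seconds(5))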
3. Discretized Streams (DStreams)
A DStream is the most basic and most important abstraction in Spark Streaming. A DStream represents a continuous stream of data, either the input data received from a source or the data produced by transforming another DStream. Internally, a DStream is a sequence of RDDs, and each RDD in the DStream contains the data of one batch. Any operation applied on a DStream translates into operations on its underlying RDDs. For example, in the earlier word count program, the flatMap that converts lines into words is applied to each RDD of the lines DStream, and the resulting RDDs make up the words DStream. These RDD transformations are computed by the Spark engine. The DStream operations hide most of these details and provide a convenient high-level API, so the developer does not need to deal with every RDD explicitly. DStream operations are analyzed further in a follow-up article.
4. Input DStreams and Receivers
(1) Basic data sources. The earlier word count example already used ssc.socketTextStream(...), which creates a DStream from text data received over a TCP socket. Besides sockets, the StreamingContext API also supports using files as a data source.
- File data source: to read data from a file system compatible with the HDFS API (HDFS, S3, NFS, etc.), use
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming monitors the dataDirectory path and processes any files created under it, but nested directories are not supported. Note that:
A. all files must have the same data format;
B. files should be moved or renamed into the directory atomically;
C. once in the directory, a file must not be modified, so this source does not support appending data to existing files.
For plain text files there is a simpler StreamingContext.textFileStream(dataDirectory) method. File streams do not require running a receiver, so no receiver cores need to be allocated for them.
- Data sources based on a custom Receiver: DStreams can also be created from data read by user-defined receivers.
- Queue of RDDs as a stream: use streamingContext.queueStream(queueOfRDDs) to turn a sequence of RDDs into a DStream. Each RDD pushed into the queue is treated as one batch of the DStream and processed like a stream, as sketched below.
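A minimal sketch of the RDD-queue source, which is mainly useful for testing (the queue contents below are made up for illustration):
import scala.collection.mutable
import org.apache.spark.rdd.RDD

val rddQueue = new mutable.Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)
queueStream.map(_ * 2).print()

ssc.start()
// Each RDD pushed into the queue after start() is treated as one batch
rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)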
(3) Custom data sources. In addition to the sources above, custom data sources are also supported. To define one, implement a user-defined receiver that can receive data from the custom source and push it into Spark. For details, refer to the Custom Receiver Guide.
(4) reliability of data reception
5. Transformations on DStreams
Similar to RDDs, transformations allow the data in an input DStream to be modified according to the desired computation. DStreams support many of the transformations that are available on ordinary RDDs. For the meaning and usage of each method, refer to another blog post: Operation Function Analysis in Spark Streaming.
Among these transformations, a few deserve further analysis.
(1) The updateStateByKey operation
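updateStateByKey lets the application maintain arbitrary per-key state that is updated with every new batch. A minimal running word count sketch, assuming the pairs DStream from the earlier example and that a checkpoint directory has already been set (updateStateByKey requires checkpointing):
// For each key, combine the counts of the new batch with the previous running total
val runningCounts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
}
runningCounts.print()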
(2) The transform operation. The transform operation (and variants such as transformWith) allows arbitrary RDD-to-RDD functions to be applied to a DStream. It can be used to apply RDD operations that the DStream API does not expose directly. For example, the DStream API has no operation for joining every batch of a data stream with another dataset, but this can be done with transform, which considerably enriches what a DStream can do. One usage scenario: join each batch of the real-time stream with precomputed spam information and filter on the result, as a form of real-time data cleaning.
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // an RDD containing spam information
val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...)
  ...
})
Note that the function passed to transform is invoked for every batch. This allows time-varying operations on the RDDs: the RDD being joined, the number of partitions, and broadcast variables can all change between batches.
(3) Window operations. Spark Streaming provides window-based operations that apply transformations over a sliding window of data. As the window slides over a source DStream, the RDDs that fall within the window are combined and operated on to produce the RDDs of the windowed DStream. In the example (illustrated by a figure in the original post), the window covers three RDDs at any one time and slides forward by two batch intervals. Every window operation needs two parameters:
- Window length: the duration of the window, e.g. 3 batch intervals in the example.
- Sliding interval: how far the window moves forward each time, e.g. 2 batch intervals in the example.
Both parameters must be integer multiples of the batch interval, which is 1 second in the example.
Here is a simple example of a window operation. Building on the earlier word count program, suppose we want to count, every 10 seconds, the occurrences of each word over the last 30 seconds. Put briefly, this means calling reduceByKey on the last 30 seconds of the pairs DStream. This is done with the reduceByKeyAndWindow operation.
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
6. Output operations on DStreams
Output operations on a DStream allow the data in the DStream to be pushed to external systems such as databases or file systems. Only through output operations can the transformed data be consumed by external systems, and output operations are also what actually trigger the execution of the transformations defined on the DStream, similar to actions on RDDs. For the usage and behavior of each output operation, refer to: Operation Function Analysis in Spark Streaming.
dstream.foreachRDD is the most generic and most commonly used output operation, so it is analyzed further here, in particular for writing data to an external database. The following usage patterns and pitfalls should help readers avoid common mistakes. Usually, writing data to an external system requires a connection object (for example a TCP connection to a remote server) that is used to send data to the remote system. A developer may be tempted to create this object on the driver and then use it on the workers to process records, as in the following (incorrect) example:
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed on the driver
rdd.foreach { record =>
connection.send(record) // executed on the worker
}}
This is incorrect: it requires the connection object to be serialized on the driver and shipped to the workers. Connection objects are usually not serializable and cannot be transferred, so the workers cannot obtain them, let alone send data through them. In this situation the application typically fails with a serialization error (the connection object cannot be serialized) or an initialization error (the connection object needs to be initialized on the worker), and so on. The correct approach is to create the connection object on the worker side.
However, even when the connection is created on the worker, the following mistake is easy to make.
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}}
The code above creates a connection object for every record, resulting in far too many connections. Creating a connection object takes time and system resources, so creating one per record puts unnecessary load on the system and reduces throughput. A better approach is rdd.foreachPartition: create one connection object per RDD partition and use that single connection for all records in the partition, as shown in the following code.
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}}
Finally, the code above can be optimized further with a connection pool, so that connection objects can be reused across RDDs and batches.
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return the connection to the pool so it can be reused
}}
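ConnectionPool and createNewConnection above are not part of Spark; they stand for whatever client library the application uses. A minimal sketch of such a pool helper, with an illustrative Connection class (the host, port, and send method are assumptions made for the example):
// Illustrative stand-in for a real client connection; not a Spark API
class Connection(host: String, port: Int) {
  private lazy val socket = new java.net.Socket(host, port)
  private lazy val out = new java.io.PrintWriter(socket.getOutputStream, true)
  def send(record: Any): Unit = out.println(record.toString)
  def close(): Unit = socket.close()
}

// Minimal pool: connections are created lazily, only when none is available for reuse
object ConnectionPool {
  private val pool = new java.util.concurrent.ConcurrentLinkedQueue[Connection]()
  def getConnection(): Connection = {
    val c = pool.poll()
    if (c != null) c else new Connection("localhost", 9999)
  }
  def returnConnection(c: Connection): Unit = pool.offer(c)
}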
Note that the connections in the pool should be created lazily, that is, only when they are actually needed, and should have an idle timeout after which they are closed. Summary of the key points:
- DStreams are executed lazily by their output operations, just as RDDs are executed lazily by RDD actions. An RDD action inside a DStream output operation is what forces the processing of the received data. Therefore, if an application has no output operation, or has an output operation such as dstream.foreachRDD that does not call any RDD action inside it, no processing happens at all: the system simply receives the data and discards it.
- By default, output operations are executed one at a time, in the order in which they are defined in the application.
7. Accumulators and broadcast variables
Accumulators and broadcast variables cannot be recovered from a checkpoint in Spark Streaming. If the application uses checkpointing together with accumulators or broadcast variables, it is best to create lazily instantiated singleton instances of them, so that they can be re-instantiated after the driver restarts from a failure. See the following code.
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}}
object DroppedWordsCounter {
@volatile private var instance: Accumulator[Long] = null
def getInstance(sc: SparkContext): Accumulator[Long] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.accumulator(0L, "WordsInBlacklistCounter")
}
}
}
instance
}}
wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter += count
false
} else {
true
}
}.collect()
val output = "Counts at time " + time + " " + counts})
See the source code for the complete example.
8. DataFrame and SQL operations
DataFrames and SQL can also be used on streaming data. To do so, create a SQLContext from the same SparkContext that the StreamingContext uses, and create it as a lazily instantiated singleton so that it can be recreated after a driver failure. The following code modifies the earlier word count example to implement word counting with DataFrames and SQL: each RDD is converted to a DataFrame, registered as a temporary table, and queried with SQL.
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// Get the singleton SQLContext object
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext.implicits._
// Convert the RDD[String] to a DataFrame
val wordsDataFrame = rdd.toDF("word")
// Register the DataFrame as a temporary table
wordsDataFrame.registerTempTable("words")
// Run a SQL query on the temporary table
val wordCountsDataFrame =
sqlContext.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
See the source code for the complete example.
SQL queries can also be run on the streaming data from a different thread, that is, asynchronously with respect to the running StreamingContext. The StreamingContext is not aware of such asynchronous SQL queries, so it may delete old streaming data before the query finishes. To make sure the data that the query needs is retained, tell the StreamingContext to remember a sufficient amount of streaming data. For example, to run a SQL query on one batch of data when the query itself may take 5 minutes, call streamingContext.remember(Minutes(5)) so that the StreamingContext keeps the data for 5 minutes. For more information on DataFrames, refer to another blog post: Spark-SQL DataFrame Operation Daquan.
9. MLlib operations
10. Caching and persistence
Similar to RDDs, DStreams allow developers to persist the data of the stream in memory. Calling persist() on a DStream persists every RDD of that DStream in memory, which is especially useful when the data of a DStream is computed on multiple times. For window-based operations such as reduceByWindow and state-based operations such as updateStateByKey, the generated DStreams are persisted in memory by default, so the developer does not have to call persist() explicitly for them. For input streams received over the network (from Kafka, Flume, sockets, etc.), the default persistence level replicates the data to two nodes to ensure fault tolerance. Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is analyzed further in the performance tuning section. For an introduction to persistence levels, see rdd-persistence.
11. Checkpointing
A streaming application typically needs to run 24/7, so it must be resilient to failures unrelated to the application logic. Checkpointing saves enough information to a fault-tolerant storage system so that the application can recover quickly from failures. Two kinds of data are checkpointed:
1. Metadata checkpointing: saves the information that defines the streaming computation; it is mainly used to recover the driver node on which the application runs. The metadata includes:
A. configuration: the configuration used to create the streaming application;
B. DStream operations: the series of DStream operations that define the application;
C. incomplete batches: batches whose jobs are queued but have not yet completed.
2. Data checkpointing: saves the generated RDDs. For stateful transformations that combine data across batches, checkpointing is mandatory, because the RDDs they generate depend on the RDDs of previous batches; the dependency chain would otherwise keep growing over time, and the cost of recomputation after a failure would become prohibitively high.
Metadata checkpoints are mainly used to recover from driver failures, while data checkpoints are mainly used to recover stateful computations.
(1) When checkpointing is needed
Checkpointing must be configured when the application:
- uses stateful operators such as updateStateByKey or reduceByKeyAndWindow (with an inverse function); in that case a checkpoint directory must be set so that the RDDs carrying the state can be checkpointed periodically;
- needs to recover the driver from failures, in which case the metadata is read back from the checkpoint on restart.
(2) Configuring checkpointing
Checkpointing is enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS or S3) where the checkpoint data is saved, using streamingContext.checkpoint(checkpointDirectory) in the application. If, in addition, the application should recover from driver failures using the checkpointed metadata, it must be written with StreamingContext.getOrCreate, which provides the following two behaviors:
object on failure: - When the
StreamingContext
is running for the first time, create a newStreamingContext
object, and then start the program to process the DStream. - When the
StreamingContext
fails to restart, you can get the metadataStreamingContext
from the setStreamingContext
path, create aStreamingContext
object, and return to the failed state.
The following Scala code implements this behavior.
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // create a new StreamingContext object
val lines = ssc.socketTextStream(...) // create the DStreams
...
ssc.checkpoint(checkpointDirectory) // set the checkpoint directory
ssc
}
// Create a StreamingContext from checkpoint metadata, or build a new one with the function above
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Set other parameters on the context
context. ...
// Start the context
context.start()
context.awaitTermination()
If the checkpointDirectory path exists, the StreamingContext is rebuilt from the checkpoint metadata. If the path does not exist, or if the program runs for the first time, functionToCreateContext is called to create a new StreamingContext object. The RecoverableNetworkWordCount example demonstrates recovering an application from a checkpoint.
Note that using getOrCreate as described above also requires the deployment infrastructure to restart the driver automatically after a failure; this is covered in the next section. In addition, writing checkpoint data adds load to the system, so the checkpoint interval must be chosen with care. For small batch intervals (e.g. 1 second), checkpointing every batch would significantly reduce throughput; conversely, checkpointing too rarely makes the lineage grow too long. For stateful transformations that require RDD checkpointing, the checkpoint interval should be an integer multiple of the batch interval, for example 10 seconds for a 1-second batch interval. The interval can be set with dstream.checkpoint(checkpointInterval); in general, a checkpoint interval of 5 to 10 sliding intervals is reasonable.
12. Deploying applications
This section focuses on how to deploy a Spark Streaming application.
(1) Requirements. Running a Spark Streaming application requires:
- A cluster with a cluster manager; see the Spark application deployment documentation.
- Packaging the application as a JAR: the application must be packaged as a JAR and submitted with the spark-submit command. The Spark and Spark Streaming JARs themselves do not need to be included in the application JAR.
However, if the application uses advanced data sources such as Kafka or Flume, the corresponding integration artifact and its transitive dependencies must be packaged into the application JAR; for example, an application that uses KafkaUtils must bundle spark-streaming-kafka-0-8_2.11 and its dependencies.
- Sufficient memory for the executors: since the received data must be stored in memory, the executors must be configured with enough memory to hold it. Note that with a 10-minute window operation, at least 10 minutes of data is kept in memory, so the required memory depends not only on the amount of data received but also on the kinds of operations the application uses.
- Configuring checkpointing: if the application uses checkpointing, a checkpoint directory must be set on a fault-tolerant file storage system.
- Configuring automatic driver restart: to recover automatically from driver failures, the deployment infrastructure must monitor the driver process and relaunch it if it fails. Different cluster managers provide this in different ways:
- Spark Standalone: in this mode the driver runs on one of the worker nodes, and the Standalone cluster manager can supervise it; if the driver exits with a non-zero exit code, or the node running it fails, the application is restarted automatically. See the Spark Standalone guide for details on supervision and failure recovery.
- YARN: YARN supports a similar mechanism for automatically restarting applications; see the YARN documentation for details.
- Mesos - Mesos uses Marathon for automatic restart
- Setting write-ahead logs: since Spark 1.2, a write-ahead log (WAL) mechanism is available for fault tolerance. When enabled, all received data is also written to a write-ahead log, which prevents data loss when the driver restarts and thus provides zero-data-loss guarantees, as introduced earlier. Enable it with spark.streaming.receiver.writeAheadLog.enable=true. Enabling the WAL reduces the data-receiving throughput; this can be compensated by running several receivers in parallel (described later in the performance tuning section). In addition, if the storage level of the input stream has already been set to StorageLevel.MEMORY_AND_DISK_SER_2, the received data is already saved on the file system, so the WAL can be left disabled. When writing the write-ahead log to S3 or any other file system that does not support flushing, remember to set both spark.streaming.driver.writeAheadLog.closeFileAfterWrite and spark.streaming.receiver.writeAheadLog.closeFileAfterWrite.
. -
Setting Spark Streaming Maximum Data Reception - If you are running a
Streaming application that does not have a lot of resources, the data
processing capacity can not keep up with the rate at which data is
received, and you can set a maximum number of received records per
second for the application. For the
spark.streaming.receiver.maxRate
Receiver mode, setspark.streaming.receiver.maxRate
, for Direct Kafka mode, setspark.streaming.kafka.maxRatePerPartition
limit the rate of data read from each Kafkaspark.streaming.kafka.maxRatePerPartition
. If a Topic has 8spark.streaming.kafka.maxRatePerpartition=100
,spark.streaming.kafka.maxRatePerpartition=100
, then the maximum received record for each batch is800
. Starting with the Spark-1.5 release, abackpressure
to avoid setting thisbackpressure
. Spark Streaming automatically calculates the current rate limit and dynamically adjusts the threshold. Turn on thebackpressure
function by settingspark.streaming.backpressure.enabled
totrue
.
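A minimal sketch of setting these rate-related properties on the SparkConf (the numeric values are illustrative, not recommendations):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("NetworkWordCount")
  // adaptive rate control (Spark 1.5+); makes the static limits below optional
  .set("spark.streaming.backpressure.enabled", "true")
  // upper bound in records/second for each receiver (receiver-based sources)
  .set("spark.streaming.receiver.maxRate", "10000")
  // upper bound in records/second read from each Kafka partition (Direct mode)
  .set("spark.streaming.kafka.maxRatePerPartition", "100")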
(2) Upgrading the application code. There are two ways to upgrade a running Spark Streaming application with new code:
- Start the upgraded application directly and run it in parallel with the existing one. While the old and new applications run in parallel, they will receive and process some of the same data.
- Gracefully stop the running application and then start the upgraded application, which continues processing from the point where the old application stopped. Note that this requires the data source to be able to buffer data; otherwise, data arriving in the gap between stopping the old application and starting the new one cannot be processed. Sources such as Kafka and Flume can buffer data. Also, in this case the new application cannot recover from the old application's checkpoint, because the checkpoint does not contain the updated code and logic, which would cause errors; either use a new checkpoint directory or delete the old checkpoint.
13. Monitoring applications
When a Spark Streaming application is running, the Spark Web UI shows a Streaming tab with streaming-related statistics, such as whether the receivers are running and how many records have been received and processed, as well as batch-related information: batch processing times, queueing delays, the number of completed batches, the number of batches currently running, and so on. Two time metrics deserve particular attention:
- Processing Time: the time taken to process the data of each batch.
- Scheduling Delay: the time the current batch waits in the queue before its processing starts.
If the processing time is consistently longer than the batch interval, or the scheduling delay keeps increasing, the system can no longer keep up with the incoming data, and the per-batch processing time needs to be reduced; see section IV for how to do that.
Besides the monitoring page, Spark also provides the StreamingListener interface, through which receiver status, batch processing times, and other information can be obtained programmatically.
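A minimal sketch of hooking into that interface (the log format is illustrative; the listener and the BatchInfo fields are part of the public API):
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // processingDelay and schedulingDelay are in milliseconds and may be undefined for a batch
    println(s"Batch ${info.batchTime}: ${info.numRecords} records, " +
      s"processing ${info.processingDelay.getOrElse(-1L)} ms, " +
      s"scheduling delay ${info.schedulingDelay.getOrElse(-1L)} ms")
  }
})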
IV. Performance Tuning
To make a Spark Streaming application run well, some tuning is required. This section analyzes the relevant parameters and rules of thumb. Performance tuning mainly addresses two questions:
- How to make full use of cluster resources to reduce the processing time of each batch.
- How to choose a batch interval so that the application can process the received data in time.
1. Reducing the processing time of each batch
Some of the following has already been covered in the Spark tuning guide; here the points that matter for streaming are emphasized again.
(1) Level of parallelism in receiving data. Data received over the network (from Kafka, Flume, sockets, etc.) must be deserialized and stored in Spark. When receiving data becomes the bottleneck of the system, consider increasing the receiving parallelism. Each input DStream runs one receiver, on one worker node, that receives a single stream of data. To receive multiple data streams in parallel, create multiple input DStreams. For example, a single input DStream that receives two Kafka topics can be split into two input DStreams, each receiving one topic with its own receiver; the two DStreams can then be unioned in the application, as in the following code.
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
Another parameter to pay attention to is spark.streaming.blockInterval. For receivers, the received data is coalesced into blocks before being stored in Spark's memory. The number of blocks in each batch determines the number of tasks used to process that data; the number of tasks per receiver per batch is approximately (batch interval / block interval). For example, with a 2-second batch interval and a 200 ms block interval there are 10 tasks. If the number of tasks is too small, the data is not processed efficiently. To increase the number of tasks for a fixed batch interval, reduce the blockInterval parameter; its default value is 200ms, and the officially recommended lower bound is 50ms, below which other problems may appear. An alternative way to increase the processing parallelism is to explicitly repartition the received data with
inputStream.repartition(<number of partitions>)
(2) Level of parallelism in data processing. For operations such as reduceByKey and reduceByKeyAndWindow, the number of parallel tasks is controlled by the spark.default.parallelism parameter. To increase the processing parallelism, either pass the number of partitions explicitly when calling these methods or adjust spark.default.parallelism according to the cluster.
(3) Data serialization. The overhead of data serialization can be reduced by tuning the serialization settings. In a streaming application there are two kinds of data that get serialized:
- Input data: by default, data received by receivers is stored in executor memory with StorageLevel.MEMORY_AND_DISK_SER_2, that is, serialized into bytes to reduce GC overhead and replicated to tolerate executor failures. The data is kept in memory first and spilled to disk when memory is insufficient. An obvious cost of this approach is that Spark must first deserialize the received data and then serialize it again in Spark's own format.
- Persisted RDDs generated by streaming operations: RDDs produced by streaming computations may also be persisted in memory; for example, window operations cache their data because it is reused multiple times. However, unlike Spark Core's default of StorageLevel.MEMORY_ONLY, streaming persists these RDDs with StorageLevel.MEMORY_ONLY_SER (serialized), again mainly to reduce GC overhead.
In both cases, Kryo serialization can be used to reduce both memory footprint and CPU overhead; see the Spark tuning guide for serialization in general and the Kryo parameter settings in particular (a sketch follows below). When the amount of data to cache is small, it can be stored unserialized without causing significant GC overhead; for example, with a batch interval of only a few seconds and no window operations, the storage level can be set explicitly when persisting so that the data is not serialized.
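A minimal sketch of enabling Kryo in a streaming application (the Record class is made up purely for illustration):
import org.apache.spark.SparkConf

// Hypothetical record type, registered so Kryo does not have to store full class names
case class Record(word: String, count: Int)

val conf = new SparkConf()
  .setAppName("NetworkWordCount")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Record]))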
(4) Task launching overhead. If the number of tasks launched per second is too high (say, more than 50 per second), the overhead of launching tasks frequently becomes non-negligible. In that case, consider the execution mode: running Spark in Standalone mode or coarse-grained Mesos mode gives lower task launch latency than fine-grained Mesos mode.
2. Setting the right batch interval
For a Spark Streaming application to run stably on a cluster, it must process the data as fast as it arrives. If the processing rate cannot keep up, the backlog of data waiting to be processed keeps growing and the application eventually cannot function properly. Ideally, the batch processing time is less than the batch interval, so choosing a reasonable batch interval is very important.
3. Memory tuning
Memory usage and GC tuning for Spark applications in general are described in detail in the Spark tuning guide; here only the parameters that matter for Spark Streaming applications are discussed briefly. How much cluster memory a Spark Streaming application needs depends largely on the types of transformations it uses. For example, a 10-minute window operation requires enough memory to hold 10 minutes of data; updateStateByKey over a very large number of distinct keys also uses a lot of memory. Conversely, a simple receive-filter-store pipeline needs much less memory. By default, receivers store the received data with the StorageLevel.MEMORY_AND_DISK_SER_2 level and spill to disk when memory is insufficient; spilling reduces processing performance, so for efficient operation it is better to give the application sufficient memory. A common practice is to measure memory usage on a small amount of data and then extrapolate to the memory that the production deployment needs. The other aspect of memory tuning is garbage collection: streaming applications are usually latency-sensitive, so long JVM garbage-collection pauses are highly undesirable.
The following points can be used to tune memory usage and GC behavior:
- Persistence level of DStreams: as mentioned earlier, input data is persisted as serialized bytes by default. Compared with unserialized storage, this reduces both memory usage and GC pressure. Using Kryo further reduces the serialized size and memory usage, and memory can be reduced even more with compression, controlled by the spark.rdd.compress parameter.
- Clearing old data: by default, all input data and the RDDs persisted by DStream transformations are cleaned up automatically; Spark Streaming decides when to clear them based on the transformations used. For example, with a 10-minute window, Spark Streaming keeps at least the last 10 minutes of data. To keep data for longer, use the streamingContext.remember parameter.
- CMS garbage collector: using the concurrent mark-and-sweep (CMS) collector is strongly recommended to keep GC pauses short. It is set for the driver with the --driver-java-options option of the spark-submit command and for the executors with the spark.executor.extraJavaOptions parameter (see the sketch after this list).
- Other tips: to further reduce the GC burden, persist RDDs with the OFF_HEAP storage level provided by Tachyon (see RDD Persistence), and use more executors with smaller heaps to reduce the GC pressure within each JVM heap.
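A minimal sketch of passing the CMS option to the executors programmatically (the same flag would be given to the driver via spark-submit --driver-java-options; the option string is the standard JVM flag):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("NetworkWordCount")
  // ask the executor JVMs to use the concurrent mark-and-sweep collector
  .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")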