Spark Streaming : Performance Tuning With Kafka and Mesos
After spending some time with Spark Streaming, I feel that the setup and coding in Spark is the easiest part of development. The challenging but interesting part lies in tuning and stabilising the application, which also takes most of the time. It is never just one or two fancy parameters that make the application performant; rather, it is the overall approach that helps achieve it. In this post, based on my experience with Spark 1.5.1, I will discuss how to tune the performance of Spark Streaming on a Mesos cluster with Kafka for data ingestion.
How to start with Tuning:
The best place to start with tuning is the Spark official documentation itself. The docs are very thorough, covering a lot of cases and tuning parameters, and everyone should read them twice before starting. However, when you actually start working, you will be looking to tune only the parameters relevant to your use case. The sheer number of parameters might confuse you, and not all of them will be relevant to your setup. I will discuss them in detail, explaining wherever possible which one to use when, and when not to.
Doing the Tuning :
It is almost impossible to get the expected throughput with all parameters correctly tuned on the first attempt. It takes some patience and perseverance to arrive at the correct configuration. It is better to tune incrementally: write your code and run it the first time with all default configurations, but keep the parameters suggested in the Spark docs in mind. Keep observing the Spark UI. Things will inevitably go wrong in the initial attempts: memory issues, higher processing time, scheduling delay, and so on.
At those times, proper file logging will come in handy for debugging the root cause, so it is essential to have logging in place beforehand (for proper logging, read my last post). Start tuning parameters one by one and keep observing; you will soon learn which parameters are important and which are not.
I will explain them one by one, as per my experience.
Kafka Ingestion Approach :
There are two approaches to ingest data from Kafka: 1. Receiver-based 2. Direct Kafka
The Direct Kafka approach is newer, and we should prefer it over the Receiver-based approach for better efficiency, parallelism and exactly-once semantics. I chose Direct Kafka with default checkpointing (of offsets in a specified directory, for recovery from failures) as my processing is idempotent: repeated updates for the same set of records in the database, for a few offsets after an application restart, have no side effects (for details read: http://spark.apache.org/docs/latest/streaming-kafka-integration.html).
The Direct Kafka approach keeps a 1-1 mapping of each Kafka partition to an RDD partition in the streaming job. So it is better to have fewer total cores than total partitions (noOfTopics*partitionsPerTopic > totalCoresInCluster) so that all CPU cores are fully exploited. Ideally, the Spark doc suggests the total number of partitions should be around 2-3 times the number of cores, depending on the processing. Also, to reduce network calls between the Kafka broker and the consumer, we can pack many log lines together into one Kafka message. For example, each Kafka message can contain 100 log lines, which we split once received inside Spark, before doing the actual processing.
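For reference, here is a minimal sketch (Scala, against the Spark 1.5.x spark-streaming-kafka API) of creating a direct stream with checkpointing and splitting the batched log lines. The broker list, topic names, checkpoint directory and the newline-separated 100-lines-per-message format are assumptions for illustration, not my exact code.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("LogProcessor")
val ssc = new StreamingContext(sparkConf, Seconds(15))
// Checkpoint directory for offset recovery after a restart (path is illustrative)
ssc.checkpoint("hdfs:///checkpoints/log-processor")

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("topicA", "topicB")

// Direct stream: one RDD partition per Kafka partition
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// Each Kafka message packs many log lines (newline-separated in this sketch);
// split them back into individual lines before the actual processing
val logLines = stream.flatMap { case (_, message) => message.split("\n") }

logLines.foreachRDD { rdd =>
  // actual per-batch processing goes here
  println(s"lines in batch: ${rdd.count()}")
}

ssc.start()
ssc.awaitTermination()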
Batch Interval Parameter :
Start with some intuitive batch interval, say 5 or 10 seconds. Try different values and observe the Spark UI; you will get an idea of which batch interval gives faster processing times. For example, in my case 15 seconds suited my processing.
val ssc = new StreamingContext(sc, Seconds(15))
There is another parameter called blockInterval (default 200ms) which I found confusing initially. This parameter is relevant for the receiver-based Kafka approach and not for the direct approach: it determines the parallelism in the receiver-based approach, so it is better to avoid it when using Direct Kafka.
sparkConf.set("spark.streaming.blockInterval", "200ms")
ConcurrentJobs Parameter :
Next I checked whether all cores were being properly exploited (using the top command and the Mesos slave UI) and found they were under-utilised. I found a very useful parameter which significantly sped up the processing.
sparkConf.set("spark.streaming.concurrentJobs","4")
By default the number of concurrent jobs is 1, which means only one job is active at a time; until it finishes, other jobs are queued up even if resources are available and idle. However, this parameter is intentionally not documented in the Spark docs, as it can sometimes cause weird behaviour, as Spark Streaming creator Tathagata Das discussed in this useful thread: http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming . Tune it accordingly, keeping the side effects in mind. Running concurrent jobs brings down the overall processing time and scheduling delay even if a batch takes slightly more than the batch interval to process.
maxRatePerPartition Parameter :
Still, for some batches you might observe very long processing times. This can happen because a batch cannot be processed faster than its slowest partition. A particular partition can contain significantly more data than the other partitions in its batch, so it takes longer to process than its peers, which alone increases the batch processing time even after the other partitions are done. In the worst case, all the concurrent jobs may be held up by a few skewed partitions while new jobs queue up and CPU cores sit idle. To avoid this, always use this parameter to limit the maximum rate of messages per second in a partition.
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", “25”)
So with a batch interval of 10 seconds, the above parameter with value 25 will allow a partition to have at most 25*10=250 messages per batch. Tune it to the maximum rate at which your application can process without inducing delay.
Uniform Data Distribution:
One important thing is that we should know our Kafka topics and partitions very well. Sometimes some Kafka topics/partitions send data at a very high rate while others send at a very low one. Such scenarios should be avoided if possible, since skewed data distribution in any distributed environment leads to weird behaviour. It is ideal to have a uniform data rate across all partitions. For example, in my case I was consuming data from many Kafka topics, and nearly half of them were sending data at nearly 10 times the rate of the slower half. I had no control over the Kafka ecosystem, as it was used by many other applications, so instead I tried running two separate Spark Streaming jobs on the same cluster, one for the slower topics and one for the faster topics, and assigned a different maxRatePerPartition to each. It clearly showed a performance improvement, but it also means maintaining two similar code bases.
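As a rough illustration (the topic names and rate values are assumptions, not my actual configuration), the two jobs can share the same code base and differ only in the topic set and per-partition cap they are launched with:

import org.apache.spark.SparkConf

// Shared helper: the two jobs differ only in app name, topics and rate cap
def buildConf(appName: String, maxRatePerPartition: Int): SparkConf =
  new SparkConf()
    .setAppName(appName)
    .set("spark.streaming.kafka.maxRatePerPartition", maxRatePerPartition.toString)

// Job 1: the high-rate topics, capped to what the processing can sustain
val fastConf = buildConf("stream-fast-topics", 25)
val fastTopics = Set("clickstream", "access-logs")

// Job 2: the low-rate topics, with a cap of their own
val slowConf = buildConf("stream-slow-topics", 5)
val slowTopics = Set("audit-events", "billing-events")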
Mesos Parameters :
As I started running two streaming apps on the same Mesos cluster, I needed to set a maximum number of cores for each application; otherwise Mesos by default assigns all the cores to the first streaming app submitted, and will not let the second app run, saying there are not enough resources to offer. Assign max cores to each application after tuning.
sparkConf.set("spark.cores.max", “40")
Also, for a streaming application, it is advisable to run Mesos in coarse-grained mode instead of fine-grained mode.
sparkConf.set("spark.mesos.coarse", “true")
It is important to know that Mesos in coarse-grained mode runs only one executor on each node (unlike YARN), so the executor-instances and executor-cores parameters below are irrelevant on Mesos.
//sparkConf.set("spark.executor.instances", "4")
//sparkConf.set("spark.executor.cores", “10")
Read more about Spark on Mesos : http://spark.apache.org/docs/latest/running-on-mesos.html
Memory Parameters :
We should tune driver and executor memory keeping machine configuration and processing needs in mind.
sparkConf.set("spark.driver.memory", "8g")
sparkConf.set("spark.executor.memory", “15g")
The executor memory determines the memory assigned to each executor, and since Mesos runs only one executor per machine, we can keep it relatively high as per the machine's RAM (unlike YARN, where multiple executors run on the same machine and each executor has its own separate memory).
It is good to unpersist the DStream as soon as processing ends for the associated batch, if you don't need to keep the data in memory after processing.
sparkConf.set("spark.streaming.unpersist", "true")
Also, don't call persist() on a DStream if the same RDD is not going to be used multiple times in the processing; otherwise it will significantly and unnecessarily increase memory consumption.
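As a small sketch (the record type and parsing logic are hypothetical), persist() only pays off when the same DStream feeds more than one output; here logLines is the DStream from the earlier Kafka sketch:

// Hypothetical record type and parser, just for illustration
case class LogRecord(level: String, responseTimeMs: Long)
def parseLogLine(line: String): LogRecord = {
  val parts = line.split("\t")
  LogRecord(parts(0), parts(1).toLong)
}

// 'parsed' is reused by two outputs below, so persist() avoids recomputing it
val parsed = logLines.map(parseLogLine).persist()

parsed.filter(_.level == "ERROR")
  .foreachRDD(rdd => println(s"errors in batch: ${rdd.count()}"))

parsed.map(_.responseTimeMs)
  .foreachRDD(rdd => if (!rdd.isEmpty()) println(s"avg latency: ${rdd.mean()}"))

// If 'parsed' were used only once, skip persist(); with
// spark.streaming.unpersist=true Spark cleans up batch data anyway.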
There are some other memory parameters like spark.memory.fraction and spark.memory.storageFraction which are recommended to be left at their default values. They should be changed only after reading: http://spark.apache.org/docs/latest/tuning.html#memory-management-overview
Backpressure Parameter :
Spark provides a very powerful feature called backpressure. With this property enabled, Spark Streaming tells Kafka to slow down the rate of incoming messages when the processing time exceeds the batch interval and the scheduling delay is increasing. It is helpful in cases like a sudden surge in data flow and is a must-have in production to avoid the application being overburdened. However, this property should be disabled during development and staging; otherwise we cannot test the maximum load our application can and should handle.
sparkConf.set("spark.streaming.backpressure.enabled","true")
Additional Parameters :
In addition to the above parameters, it is usually suggested to use the Kryo serializer and G1GC for garbage collection in the driver/executor. I have used both, although I did not notice much difference, possibly because my processing currently does not involve shuffling:
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
sparkConf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
Summary in bullet points :
- If using Kafka, choose the Direct Kafka approach.
- Maintain around 2*totalCoresInCluster < noOfTopics*partitionsPerTopic < 3*totalCoresInCluster.
- Set the batch interval by trial and error. Avoid blockInterval with the direct approach.
- Use spark.streaming.concurrentJobs to set the number of concurrent jobs to process.
- Use spark.streaming.kafka.maxRatePerPartition to limit the incoming data rate per second per partition.
- Assign driver and executor memory properly to avoid memory issues.
- Ensure a uniform data rate among Kafka partitions; otherwise try multiple streaming apps.
- Configure Mesos parameters properly.
- Set spark.streaming.unpersist to true.
- Set spark.streaming.backpressure.enabled to true in production.
- Use KryoSerializer and G1GC as the garbage collector.
The discussion above is based on my experience with the Direct Kafka approach on a Mesos cluster. Please feel free to comment if you come across something I missed; I will be glad to add it.
Happy Streaming....!!