Upgrading a Running Spark Streaming Application with Code Changes and Zero Data Loss
Spark Streaming is one of the most reliable (near) real-time processing solutions available in the streaming world today. It offers ease of development, continuously evolving features, and very good community support.
One of the most important concerns for any streaming solution is how it handles failure situations, such as the application going down due to an error. Because the data is real-time and keeps arriving even while the application is down, it is really important to process the backlog before the current data once the application comes back up, in order to avoid data loss. Together with Kafka, a distributed messaging system capable of replaying past data, Spark Streaming handles failure situations quite nicely.
Restarting the system is also normal outside of failures. A running streaming application needs to be upgraded from time to time with code changes, configuration changes, new features, and so on. This requires a graceful shutdown of the streaming application, deployment of the changes, and then a restart so the application continues from the same point where it left off.
Such upgrades involve quite a few challenges compared to failure events. In this post, I want to share my experience and learnings dealing with them.
Background: Fault Tolerance, Checkpointing, DirectKafka
It is always better to start with some background before jumping straight into the problem statement.
Fault tolerance means dealing with failure scenarios where the application goes down and has to be restarted to resume processing. Upgrading means an intentional restart of the system to deploy new changes. What the two situations have in common is that both need a system restart. The caveat is that they cannot be handled the same way.
For fault tolerance, Spark Streaming provides Checkpointing: https://spark.apache.org/docs/1.5.1/streaming-programming-guide.html#checkpointing
Checkpointing persists the necessary information about the streaming system, such as metadata and state data, to a reliable storage system (disk/HDFS/S3) at regular intervals. Using Kafka with the DirectKafka approach for ingestion makes the system robust, because it avoids storing the actual data and stores just the Kafka offsets, which can be used to replay the Kafka messages after a restart (unlike the inefficient receiver-based approach; more details at http://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html).
To enable Checkpointing and DirectKafka, you need to define a checkpoint directory in code, call the KafkaUtils.createDirectStream API to get Kafka messages, and then call the StreamingContext.getOrCreate API to either recover the context from the checkpoint directory or create a new one.
The code for Checkpointing + DirectKafka looks like this:
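The snippet below is a minimal sketch of the pattern; the topic name, broker list and checkpoint path are placeholders, and the complete version lives in the repo linked just after it.

```scala
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaCheckpointSketch {

  // Placeholder values -- replace with your own topic, brokers and checkpoint path
  val checkpointDir = "hdfs:///tmp/streaming-checkpoint"
  val topics        = Set("my-topic")
  val kafkaParams   = Map[String, String]("metadata.broker.list" -> "broker1:9092,broker2:9092")

  // Everything that defines the DStream graph must live inside this function,
  // because it is only executed when no checkpoint exists yet.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("DirectKafkaCheckpointSketch")
    val ssc  = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Idempotent processing of the messages goes here
    stream.map(_._2).foreachRDD(rdd => rdd.foreach(println))

    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover the context from the checkpoint if it exists, otherwise create a new one
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```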
The full code can be accessed at my GitHub repo: https://github.com/cpbaranwal/Spark-Streaming-DirectKafka-Examples/blob/master/src/main/scala/DirectKafkaDefaultExample.scala
With the above minimal code, Spark Streaming can give exactly-once processing, provided the processing itself is idempotent or wrapped in an atomic transaction. In my case, the processing was idempotent: the stream job did some computation and then inserted/updated rows in a database based on a primary key.
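To make this concrete, an idempotent sink of that kind could look roughly like the sketch below; the JDBC URL, the events table, the PostgreSQL upsert syntax and the "id,value" message format are illustrative assumptions, not the actual production code.

```scala
// Sketch of an idempotent sink: the row is keyed on a primary key, so replaying
// the same Kafka message after a restart rewrites the same row instead of
// creating a duplicate. Connection details and message format are assumptions.
stream.map(_._2).foreachRDD { rdd =>
  rdd.foreachPartition { messages =>
    val conn = java.sql.DriverManager.getConnection(
      "jdbc:postgresql://db-host/mydb", "user", "password")
    val stmt = conn.prepareStatement(
      "INSERT INTO events (id, value) VALUES (?, ?) " +
      "ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value") // PostgreSQL upsert
    messages.foreach { line =>
      val Array(id, value) = line.split(",", 2) // assumed "id,value" message format
      stmt.setString(1, id)
      stmt.setString(2, value)
      stmt.executeUpdate()
    }
    stmt.close()
    conn.close()
  }
}
```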
Challenge with Upgradation:
The reason I went into that much detail on fault tolerance and checkpointing is that many people attempting to upgrade a running Spark Streaming application for the first time can get it wrong. I made this mistake myself initially, assuming I could just stop the application gracefully, deploy the new jar with code/configuration changes, and let checkpointing take care of the rest. This is a wrong assumption. The code changes may or may not be reflected, or worse, an exception may be thrown. The configuration changes will not be picked up at all.
I have seen similar questions on StackOverflow where people were trying to use checkpointing for upgrades.
It is important to understand that the sole objective of checkpointing is fault tolerance. When we enable checkpointing, Spark Streaming saves the Kafka offsets as well as the entire metadata (such as the configuration) and stateful data (such as updateStateByKey state) to persistent storage. On restart, the Kafka partition offsets are fetched from storage and the executors ask the Kafka brokers to start replaying data from those offsets. But at the same time, the SparkConf object is rebuilt by deserialising the saved metadata from disk (check the StreamingContext.getOrCreate() implementation: https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala). The StreamingContext is created using that deserialised SparkConf object, and once the streaming context is up, no changes to the SparkConf can be reflected in it. So updated configuration simply cannot be picked up.
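As a small illustration of the pitfall (the checkpoint path and the changed Spark property are placeholders chosen for this sketch):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ConfigChangeIgnoredSketch {

  val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // placeholder path

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("UpgradedApp")
      // Suppose this value was changed in the newly deployed jar:
      .set("spark.streaming.kafka.maxRatePerPartition", "5000")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    ssc.socketTextStream("localhost", 9999).print() // placeholder DStream
    ssc
  }

  def main(args: Array[String]): Unit = {
    // If checkpointDir already contains a checkpoint, createContext() is never
    // called: the SparkConf is deserialised from the checkpoint, so the new
    // maxRatePerPartition value above is silently ignored.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```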
In short, we can rely on checkpointing for a failure restart, but we cannot rely on it for an upgrade restart.
Solution for Upgradation:
So how can we do an upgrade without checkpointing?
The easiest way is to delete the checkpoint directory itself before deployment so that a fresh streaming context is created. But this is not a good solution: the Kafka offsets are deleted as well, and data is then fetched from Kafka either from the latest offset or from the smallest offset in the broker, depending on the auto.offset.reset parameter. That means either a huge amount of re-computation or substantial data loss, which may not be acceptable.
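For reference, this is the consumer setting in question for the Kafka 0.8 direct API used here (the broker list is a placeholder):

```scala
// With no checkpoint (and no custom offsets), consumption starts according to
// auto.offset.reset:
//   "smallest" -> replay everything still retained in the broker (heavy re-computation)
//   "largest"  -> start from the latest offset (messages produced while the app
//                 was down are skipped, i.e. data loss)
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "auto.offset.reset"    -> "largest")
```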
So how can we do an upgrade efficiently, with zero-data-loss, exactly-once semantics?
The Spark documentation (https://spark.apache.org/docs/1.5.1/streaming-programming-guide.html#upgrading-application-code) suggests two approaches:
1. Start a new application in parallel with the old one and, after some time, bring down the old one.
2. Shut down the old application gracefully and start the new one from the point where it left off, with the help of Kafka's source-side buffering.
The 1st solution is not practically feasible in resource-limited environments. Also, a lot of data may end up being processed twice across the parallel systems in order to avoid data loss. In short, a bad solution.
The 2nd solution is the correct approach, but no concrete example is given in the documentation. It essentially means writing your own code for custom management of Kafka offsets: persisting them while processing and reading them back during a restart.
I am sharing here code for the 2nd solution, which uses the KafkaUtils.createDirectStream overload that accepts explicit starting offsets.
We also need to configure a lightweight ZooKeeper ensemble, which is used to persist and read the Kafka offsets reliably. In my case, writing the Kafka offsets of all the partitions to ZooKeeper took around 10-20 milliseconds, which was quite acceptable.
Configuring ZooKeeper is very easy: https://open.mesosphere.com/advanced-course/installing-zookeeper/
The full code can be accessed at my GitHub repo: https://github.com/cpbaranwal/Spark-Streaming-DirectKafka-Examples/blob/master/src/main/scala/CustomDirectKafkaExample.scala
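A condensed sketch of the approach follows (not a copy of the repo code): the topic, broker list, ZooKeeper hosts and the offset path are placeholder assumptions, and it relies on the Kafka 0.8-era ZkClient/ZkUtils helpers.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object CustomDirectKafkaSketch {

  // Placeholder values -- adapt to your environment
  val zkHosts      = "zk1:2181,zk2:2181,zk3:2181"
  val zkOffsetPath = "/spark-streaming/my-topic/offsets"
  val topic        = "my-topic"
  val kafkaParams  = Map[String, String]("metadata.broker.list" -> "broker1:9092")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CustomDirectKafkaSketch")
    val ssc  = new StreamingContext(conf, Seconds(10))
    val zkClient = new ZkClient(zkHosts, 30000, 30000, ZKStringSerializer)

    // 1. Try to read previously saved offsets ("partition:offset,partition:offset,...")
    val storedOffsets: Option[Map[TopicAndPartition, Long]] =
      ZkUtils.readDataMaybeNull(zkClient, zkOffsetPath)._1.map { data =>
        data.split(",").map { entry =>
          val Array(partition, offset) = entry.split(":")
          TopicAndPartition(topic, partition.toInt) -> offset.toLong
        }.toMap
      }

    // 2. First run: no offsets in ZooKeeper, so use the default createDirectStream.
    //    Later runs: resume exactly from the offsets stored in ZooKeeper.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    val stream = storedOffsets match {
      case None =>
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set(topic))
      case Some(fromOffsets) =>
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
          (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    }

    stream.foreachRDD { rdd =>
      // 3. Persist the starting offset of every partition before processing the batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val data = offsetRanges.map(o => s"${o.partition}:${o.fromOffset}").mkString(",")
      ZkUtils.updatePersistentPath(zkClient, zkOffsetPath, data)

      // 4. Idempotent processing of the batch goes here
      rdd.foreach { case (_, message) => println(message) }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```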
Code Explanation:
- Create a ZooKeeper client with the necessary connection details.
- Try reading the Kafka offsets from ZooKeeper. If this is the first run and there are no offsets in ZooKeeper, the default KafkaUtils.createDirectStream API is called, and data is fetched from the Kafka brokers starting at the latest offsets for the topic partitions.
- Save the starting offsets of each RDD partition in each batch of the stream to ZooKeeper right away, before processing the corresponding batch. It is important to know that a Kafka topic partition maps one-to-one to a Spark RDD partition.
- On subsequent application restarts, the starting offsets are retrieved from ZooKeeper, just as they used to be fetched from the checkpoint directory, but this time without any additional metadata. The StreamingContext is created with a fresh SparkConf object, and all code/configuration changes are taken into account.
However, this solution too has limitations which should be kept in mind:
- It cannot be used if state management, such as the updateStateByKey functionality, is required, because saving that additional state essentially means checkpointing. In my case, state management was not needed, so this solution fitted quite well.
- A workaround also seems possible by having a flag at startup which decides whether to use checkpointing or custom offset management (see the sketch after this list), but I have not tried it personally as it was not needed in my case.
- An additional persistent system like ZooKeeper comes into the picture and needs to be configured.
- It cannot be used as-is if the Kafka topics or the number of partitions change, because the offsets of each Spark partition are one-to-one mapped to Kafka partitions.
- It can only be used with an ingestion system like Kafka or Flume that provides source-side buffering.
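The flag idea from the second point could look roughly like this; it is untried, and createFreshContextFromZookeeperOffsets() is a hypothetical helper that builds the context the way shown in the sketch above:

```scala
// Untried idea, sketched only: a startup flag decides whether to recover from the
// checkpoint (failure restart) or to build a fresh context that resumes from the
// offsets stored in ZooKeeper (upgrade restart).
// createFreshContextFromZookeeperOffsets() is a hypothetical helper.
def startContext(isUpgrade: Boolean): StreamingContext =
  if (isUpgrade) createFreshContextFromZookeeperOffsets()
  else StreamingContext.getOrCreate(checkpointDir, createContext _)

val ssc = startContext(isUpgrade = sys.props.get("app.upgrade").contains("true"))
ssc.start()
ssc.awaitTermination()
```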
References:
http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
https://spark.apache.org/docs/1.5.1/streaming-programming-guide.html#upgrading-application-code
https://spark.apache.org/docs/1.5.1/streaming-programming-guide.html#checkpointing
http://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala