Articles

Upgrading a Running Spark Streaming Application with Code Changes with Zero-Data-Loss

Image
Upgrading a Running Spark Streaming Application with Code Changes with Zero-Data-Loss Spark Streaming is one of the most reliable (near) real time processing solutions available in the streaming world these days. It comes with ease of development , continuous evolving features with very good community support.  One of the most important concern area of any streaming solution is how it tackles failure situations like application going down due to some issue/error. Because the data is realtime and keeps on coming even while application is down, its really important to handle backlog data before processing current time data in order to avoid data loss once application is back to life. Together with Kafka , which is distributed messaging system capable of replaying past data, Spark Streaming handles failure situations quite nicely . Restart of the system is normal and not just the case of failures. Upgrading a running streaming application with code changes,c

AVRO Data Processing in Spark Streaming + Kafka

Image
For a high load RealTime Processing requirement , Kafka-Spark Streaming combination is an obvious choice today as ingestion-processing pipeline.  Adding an efficient data serialisation like AVRO to this combination can significantly reduce network traffic from Kafka broker to Spark worker to improve bandwidth usage and can ensure faster processing in Spark Executor to improve CPU resources utilisation. In current project, I recently switched from Json format input data to Avro data for the above stated reasons and can observe much better throughput. In this post, will explain how to ingest and process binary Avro data in a SparkStreaming-Kafka system along with code sample. Short Intro : Apache Avro is a data serialisation framework developed for big data distributed systems. Avro defines a Schema in advance using Json and subsequently encodes data records in compact binary format keeping the known schema in mind. Its similar to framework like Thr

Apache Spark Streaming : Logging Driver Executor logs the right way

Image
From my experience, i feel logging properly is one of the most important thing to do first when starting Spark Streaming development especially when you are running on cluster with multiple worker machines. Reason is simple : Streaming is a continuous running process and the exception/error may arrive after many hours/days and it can be because of driver or can be because of executor. It will be hard to debug the root cause as driver logs are coming in console cannot be seen after application shuts down while executor logs come in std out/err files ( i am using Mesos as cluster manager) which is tedious to download and see. So when some issue comes, like in my case an out-of-memory issue came after 2 days of running and application went down. I had to be sure whether driver or executor was the actual culprit where issue came first. So i first did this logging configuration properly before debugging the issue. Also with logging, we can control how much retention/d

Spark Streaming : Performance Tuning With Kafka and Mesos

Image
Spark Streaming : Performance Tuning With Kafka and Mesos After spending some time in Spark Streaming , i feel doing setup and coding in Spark is the easiest part of development . The challenging  but interesting part lies in tuning and stabilising the application which also takes most of the time. Its never just 1-2 fancy parameters which can make the application performant rather its the approach which helps to achieve that. In this post,based on my experience with spark 1.5.1,  will discuss how to tune performance of spark streaming on Mesos cluster with kafka for data ingestion. How to start with Tuning:  The best place to start with tuning is Spark official docs itself : http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning The doc is very well documented covering lot of cases and tuning parameters. Everyone should read the doc twice before starting. However, when you  actually start working you will be looking to t

Apache Spark : RDD vs DataFrame vs Dataset

Image
With Spark2.0 release, there are 3 types of data abstractions which Spark officially provides now to use : RDD,DataFrame and DataSet . For a new user, it might be confusing to understand relevance of each one and decide which one to use and which one not to. In this post, will discuss each one of them in detail with their differences and pros-cons. Short Combined Intro : Before i discuss each one in detail separately, want to start with a short combined intro. Evolution of these abstractions happened in this way : RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6) RDD being the oldest available from 1.0 version to Dataset being the newest available from 1.6 version. Given same data, each of the 3 abstraction will compute and give same results to user. But they differ in performance and the ways they compute. RDD lets us decide HOW we want to do which limits the optimisation Spark can do on processing underneath where as dataframe/dataset le

spark hbasefilter hbase

package  cn.deppon.Tool      import  java.util      import  scala.collection.JavaConverters._   import  org.apache.hadoop.hbase.HBaseConfiguration   import  org.apache.hadoop.hbase.client.{HTable, ResultScanner, Scan}   import  org.apache.hadoop.hbase.filter._   import  org.apache.hadoop.hbase.mapreduce.TableInputFormat   import  org.apache.hadoop.hbase.protobuf.ProtobufUtil   import  org.apache.hadoop.hbase.util.{Base64, Bytes}   import  org.apache.log4j.{Level, Logger}   import  org.apache.spark.sql.{Row, SQLContext}   import  org.apache.spark.sql.types._   import  org.apache.spark.{SparkConf, SparkContext}      /**     * Created by DreamBoy on 2017/5/12.     */    object SparkHbaseTool {         /**       * 利用主构造器构造需要的环境的基本条件       */      Logger.getLogger( "org.apache.spark" ).setLevel(Level.ERROR)      //设置spark参数      val conf =  new  SparkConf().setMaster( "local[2]" )       .setAppName( "HbaseTest" )     conf.set( "