Customize the Kryo serialized input and output API in Spark

Spark has built-in support for two serialization formats: (1) Java serialization and (2) Kryo serialization. By default, Spark uses Java's ObjectOutputStream framework, which can serialize any class that implements java.io.Serializable. Java serialization is very flexible, but its performance is poor. Alternatively, we can use the Kryo library, which is noticeably faster and more compact than Java serialization (often around 10x faster), but it does not support every serializable type and requires the user to register the classes to be serialized in advance.
In Spark it is generally more sensible to use Kryo serialization than Java serialization, and when large amounts of data are shuffled or cached it becomes especially important.
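For reference, here is a minimal sketch of how Kryo is usually enabled and how classes are registered (Person stands for the example class used later in this article):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Person]))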
Although Kryo is supported for RDD caching and shuffling, Spark does not expose a built-in API for using Kryo to write data to, or read it from, disk: RDD's saveAsObjectFile and SparkContext's objectFile methods only support Java serialization. It would be great if we could use Kryo serialization here as well!
In this article, I will discuss how to implement custom Kryo serialization input and output APIs for reading and writing data on disk.

Write data

Normally we use the rdd.saveAsObjectFile API to write serialized objects to disk. The following code shows how to implement a custom saveAsObjectFile method that writes Kryo-serialized objects to disk:
def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String)
The rdd parameter of this function is the data we want to write, and path is the path where the data will be saved.
val kryoSerializer = new KryoSerializer(rdd.context.getConf)
KryoSerializer is a wrapper class that Spark provides around Kryo. In the code above we create a KryoSerializer object and let it pick up its configuration, such as the serialization buffer size, from rdd.context.getConf.
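If the batches serialized below are large, the Kryo buffer read from this configuration may need to be enlarged. The exact property names differ between Spark versions; in recent versions the tuning looks roughly like this, set on the SparkConf used to create the context:

conf.set("spark.kryoserializer.buffer", "64k")      // initial serialization buffer size
conf.set("spark.kryoserializer.buffer.max", "64m")  // maximum size the buffer may grow to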
rdd.mapPartitions(iter => iter.grouped(10)
      .map(_.toArray))
      .map(splitArray => {
The object files will be saved on HDFS as a sequence file. We traverse each partition of the RDD, group its records into batches of ten, and convert each batch to an array so that it can be serialized as a single unit (a small illustration of grouped follows below).
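To see what the batching step does, here is a quick illustration of grouped on a plain Scala iterator (the batch size of 10 in the code above is simply hard-coded):

// grouped(n) yields chunks of at most n elements; the last chunk may be smaller,
// so no records are dropped.
Iterator(1, 2, 3, 4, 5).grouped(2).toList.map(_.toList)
// List(List(1, 2), List(3, 4), List(5))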
val kryo = kryoSerializer.newKryo()
For each splitArray we first create a Kryo instance. Kryo is not thread-safe, so we create it separately inside each map operation. When we call kryoSerializer.newKryo() to create a new Kryo instance, it also invokes our custom KryoRegistrator, if one has been configured (a minimal sketch of such a registrator follows below).
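The sketch below shows what such a registrator could look like; the MyKryoRegistrator name is purely illustrative, and it is wired up through the spark.kryo.registrator property:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Illustrative registrator: register every class that will be serialized with Kryo.
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Person])
  }
}

// conf.set("spark.kryo.registrator", classOf[MyKryoRegistrator].getName)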
//create output stream and plug it to the kryo output
val bao = new ByteArrayOutputStream()
val output = kryoSerializer.newKryoOutput()
output.setOutputStream(bao)
kryo.writeClassAndObject(output, splitArray)
output.close()
Once we have the Kryo instance, we create a Kryo output backed by a ByteArrayOutputStream and call writeClassAndObject to write both the class information and the object itself to it; recording the class is what later lets readClassAndObject rebuild the objects without knowing their type up front.
val byteWritable = new BytesWritable(bao.toByteArray)
      (NullWritable.get(), byteWritable)
    }).saveAsSequenceFile(path)
We then wrap the resulting byte array in a BytesWritable and save it as a sequence file. With this code we can write Kryo-serialized objects to disk. The complete code is as follows:
/**
 * User: 过往记忆 (iteblog)
 * Date: 15-04-24
 * Time: 07:24 AM
 * 过往记忆 is a technical blog focused on Hadoop, Hive, Spark, Shark, and Flume,
 * full of practical material.
 * WeChat public account of the blog: iteblog_hadoop
 */
import java.io.ByteArrayOutputStream

import scala.reflect.ClassTag

import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
 
  def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
    val kryoSerializer = new KryoSerializer(rdd.context.getConf)
 
    rdd.mapPartitions(iter => iter.grouped(10)
      .map(_.toArray))
      .map(splitArray => {
      //initializes Kryo and calls your registrator class
      val kryo = kryoSerializer.newKryo()
 
      //convert data to bytes
      val bao = new ByteArrayOutputStream()
      val output = kryoSerializer.newKryoOutput()
      output.setOutputStream(bao)
      kryo.writeClassAndObject(output, splitArray)
      output.close()
 
      // We are ignoring key field of sequence file
      val byteWritable = new BytesWritable(bao.toByteArray)
      (NullWritable.get(), byteWritable)
    }).saveAsSequenceFile(path)
  }

Read data

Being able to write the data is only half of the job. Normally we would use the objectFile API on SparkContext to read data back from disk; here we implement a custom objectFile API that reads the Kryo-serialized object files instead.
import scala.reflect.ClassTag

import com.esotericsoftware.kryo.io.Input
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.SparkContext
import org.apache.spark.serializer.KryoSerializer

def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)
    (implicit ct: ClassTag[T]) = {
    val kryoSerializer = new KryoSerializer(sc.getConf)
    sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable],
       minPartitions)
       .flatMap(x => {
       // Kryo is not thread-safe, so create a fresh instance per record
       val kryo = kryoSerializer.newKryo()
       // wrap the raw bytes of the BytesWritable value in a Kryo input
       val input = new Input()
       input.setBuffer(x._2.getBytes)
       // read back the class information and the object, then flatten the batch
       val data = kryo.readClassAndObject(input)
       val dataObject = data.asInstanceOf[Array[T]]
       dataObject
    })
  }
The read path is very similar to the write path, except that we use a Kryo Input instead of an Output: we take the bytes out of each BytesWritable value and deserialize them with the readClassAndObject API.
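One caveat, an observation of mine rather than something the original code handles explicitly: BytesWritable.getBytes returns the backing byte array, which can be longer than the record's actual contents. Kryo stops reading once the object has been consumed, so the code above works, but the valid length can also be passed to the Input explicitly:

val input = new Input(x._2.getBytes, 0, x._2.getLength)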

How to use

The following example uses the two methods described above to serialize and deserialize the Person object:
/**
 * User: 过往记忆 (iteblog)
 * Date: 15-04-24
 * Time: 07:24 AM
 * 过往记忆 is a technical blog focused on Hadoop, Hive, Spark, Shark, and Flume,
 * full of practical material.
 * WeChat public account of the blog: iteblog_hadoop
 */
import org.apache.spark.{SparkConf, SparkContext}
 
// user-defined class that needs to be serialized
  class Person(val name: String)
 
 def main(args: Array[String]) {
 
    if (args.length < 1) {
      println("Please provide output path")
      return
    }
    val outputPath = args(0)
 
    val conf = new SparkConf().setMaster("local").setAppName("kryoexample")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
 
    //create some dummy data
    val personList = 1 to 10000 map (value => new Person(value + ""))
    val personRDD = sc.makeRDD(personList)
 
    saveAsObjectFile(personRDD, outputPath)
    val rdd = objectFile[Person](sc, outputPath)
    println(rdd.map(person => person.name).collect().toList)
  }
