Customizing Kryo-serialized input and output APIs in Spark
Spark has built-in support for two serialization formats: (1) Java serialization and (2) Kryo serialization. By default, Spark uses Java's ObjectOutputStream framework, which works with any class that implements java.io.Serializable. Java serialization is very flexible, but its performance is poor. Alternatively, we can use the Kryo library, which is considerably faster than Java serialization (often by as much as 10x). However, Kryo does not support every serializable type, and it requires the user to register classes.
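As a point of reference for the default path, the following minimal sketch round-trips an object through Java serialization using only the standard library. The names Point, JavaSerializationSketch, serialize, and deserialize are illustrative assumptions and are not part of Spark.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical example class. Case classes are Serializable by default,
// which is what Java serialization requires.
case class Point(x: Int, y: Int)

object JavaSerializationSketch {
  // Serialize any Serializable object into a byte array.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // Read the object back; the caller states the expected type.
  def deserialize[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Passing an object whose class does not implement java.io.Serializable to serialize fails with a NotSerializableException, which is the restriction mentioned above.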
In Spark, it is usually more sensible to use Kryo serialization than Java serialization. This matters most when shuffling and caching large amounts of data.
Although Kryo is supported for RDD caching and shuffling, Spark has no built-in API that explicitly uses Kryo to read and write serialized data on disk. The saveAsObjectFile method on RDD and the objectFile method on SparkContext only support Java serialization. It would be great if we could use Kryo there as well.
In this article, I discuss how to write custom Kryo-based input and output APIs that read and write data on disk.
The custom write function takes two parameters: rdd is the data we need to write, and path is the location where the data is saved.

All object files are saved on HDFS. We traverse each partition of the RDD, split its elements into groups, and convert each group to a byte array. For each group (splitArray), we first create a Kryo instance. Kryo instances are not thread-safe, so we create one separately inside each map operation. Once we have the Kryo instance, we create a Kryo output object and write the class information and the object to it.

Finally, we wrap the byte array in a BytesWritable and save the result as a sequence file. With this code we can write Kryo-serialized objects to disk. The complete function is listed at the end of the Write data section.

Reading closely mirrors writing, except that we use a Kryo input rather than an output. We read the bytes from each BytesWritable and then deserialize them with the readClassAndObject API.
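The write and read steps described above can be tried without Spark. The following is a minimal sketch that assumes the Kryo library is on the classpath; KryoRoundTripSketch, toBytes, and fromBytes are illustrative names, and the Kryo instance is created directly rather than through Spark's KryoSerializer.

```scala
import java.io.ByteArrayOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

object KryoRoundTripSketch {
  // Write the class information and the object into an in-memory byte array.
  def toBytes(kryo: Kryo, value: AnyRef): Array[Byte] = {
    val bao = new ByteArrayOutputStream()
    val output = new Output(bao)
    kryo.writeClassAndObject(output, value)
    output.close() // flushes the buffered bytes into bao
    bao.toByteArray
  }

  // Read the class information back and rebuild the object.
  def fromBytes(kryo: Kryo, bytes: Array[Byte]): AnyRef = {
    val input = new Input(bytes)
    try kryo.readClassAndObject(input) finally input.close()
  }
}
```

A possible usage, with one caveat: newer Kryo versions require class registration by default, so this sketch turns that requirement off explicitly.

```scala
val kryo = new Kryo()
kryo.setRegistrationRequired(false) // assumption: we accept unregistered classes here
val bytes = KryoRoundTripSketch.toBytes(kryo, Array("a", "b"))
val back = KryoRoundTripSketch.fromBytes(kryo, bytes).asInstanceOf[Array[String]]
```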
Write data
Normally, we use the rdd.saveAsObjectFile API to write serialized objects to disk. The code below shows how our custom saveAsObjectFile method writes Kryo-serialized objects to disk instead. The method has this signature:

    def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String)

It begins by creating a serializer:

    val kryoSerializer = new KryoSerializer(rdd.context.getConf)

KryoSerializer is a class provided by Spark that wraps Kryo. In the code above, we create a KryoSerializer object, which reads the configured buffer size from rdd.context.getConf.

Next, each partition is split into groups of 10 elements, and each group is turned into an array:

    rdd.mapPartitions(iter => iter.grouped(10)
      .map(_.toArray))
      .map(splitArray => { /* per-group serialization, shown in the next steps */ })

Inside the map function, we create a Kryo instance:

    val kryo = kryoSerializer.newKryo()

Calling kryoSerializer.newKryo() creates a new Kryo instance and also invokes our custom KryoRegistrator, if one is configured.

    // create an output stream and plug it into the Kryo output
    val bao = new ByteArrayOutputStream()
    val output = kryoSerializer.newKryoOutput()
    output.setOutputStream(bao)
    kryo.writeClassAndObject(output, splitArray)
    output.close()

The resulting bytes are wrapped in a BytesWritable and saved as a sequence file:

    val byteWritable = new BytesWritable(bao.toByteArray)
    (NullWritable.get(), byteWritable)
    }).saveAsSequenceFile(path)

Putting the steps together:

    /**
     * User: 过往记忆
     * Date: 15-04-24
     * Time: 07:24 AM
     * blog: https://www.iteblog.com
     * 过往记忆 blog: a technical blog focused on Hadoop, Hive, Spark, Shark, and Flume, with plenty of practical content
     * 过往记忆 blog WeChat public account: iteblog_hadoop
     */
    import java.io.ByteArrayOutputStream
    import scala.reflect.ClassTag
    import com.esotericsoftware.kryo.io.Input
    import org.apache.hadoop.io.{BytesWritable, NullWritable}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.serializer.KryoSerializer

    def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String): Unit = {
      val kryoSerializer = new KryoSerializer(rdd.context.getConf)
      rdd.mapPartitions(iter => iter.grouped(10)
        .map(_.toArray))
        .map(splitArray => {
          // initializes Kryo and calls your registrator class
          val kryo = kryoSerializer.newKryo()
          // convert data to bytes
          val bao = new ByteArrayOutputStream()
          val output = kryoSerializer.newKryoOutput()
          output.setOutputStream(bao)
          kryo.writeClassAndObject(output, splitArray)
          output.close()
          // we ignore the key field of the sequence file
          val byteWritable = new BytesWritable(bao.toByteArray)
          (NullWritable.get(), byteWritable)
        }).saveAsSequenceFile(path)
    }
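To see what the grouping step in the write function produces, here is a minimal sketch using plain Scala collections, with no Spark required. GroupedBatchSketch, batches, and the size parameter are illustrative assumptions; the article's code fixes the group size at 10.

```scala
import scala.reflect.ClassTag

object GroupedBatchSketch {
  // Mirrors iter.grouped(10).map(_.toArray) from the write path,
  // but with a configurable group size. The ClassTag is needed by toArray.
  def batches[T: ClassTag](iter: Iterator[T], size: Int): Iterator[Array[T]] =
    iter.grouped(size).map(_.toArray)
}
```

For example, 25 elements with a group size of 10 yield three arrays, the last one holding the remaining 5 elements. Each array becomes one record in the sequence file, so the group size trades the number of records against the size of each record.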
Read data
Writing alone is not enough; we also need to read the data back. Usually we use the objectFile API on SparkContext to read data from disk. Here we use a custom objectFile API to read the Kryo object files:

    def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)
        (implicit ct: ClassTag[T]): RDD[T] = {
      val kryoSerializer = new KryoSerializer(sc.getConf)
      sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable],
        minPartitions)
        .flatMap(x => {
          val kryo = kryoSerializer.newKryo()
          val input = new Input()
          // getBytes can return a padded backing array, so pass the valid length explicitly
          input.setBuffer(x._2.getBytes, 0, x._2.getLength)
          val data = kryo.readClassAndObject(input)
          // each record holds one group of elements written by saveAsObjectFile
          val dataObject = data.asInstanceOf[Array[T]]
          dataObject
        })
    }
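One detail of reading from a BytesWritable is worth a short illustration: getBytes returns the backing array, which can be longer than the valid data, while getLength gives the number of valid bytes. The sketch below assumes hadoop-common is on the classpath; BytesWritableSketch and payload are illustrative names.

```scala
import org.apache.hadoop.io.BytesWritable

object BytesWritableSketch {
  // Copy out exactly the valid bytes, ignoring any padding in the backing array.
  def payload(w: BytesWritable): Array[Byte] =
    java.util.Arrays.copyOfRange(w.getBytes, 0, w.getLength)
}
```

Kryo stops reading once the object is complete, so trailing padding is usually harmless, but bounding the buffer by getLength keeps the reader from depending on that behavior.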
How to use
The following example uses the two methods described above to serialize and deserialize Person objects:

    // user-defined class that needs to be serialized
    class Person(val name: String)

    def main(args: Array[String]): Unit = {
      if (args.length < 1) {
        println("Please provide output path")
        return
      }
      val outputPath = args(0)
      val conf = new SparkConf().setMaster("local").setAppName("kryoexample")
      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      val sc = new SparkContext(conf)
      // create some dummy data
      val personList = 1 to 10000 map (value => new Person(value + ""))
      val personRDD = sc.makeRDD(personList)
      // write with Kryo, then read the same path back
      saveAsObjectFile(personRDD, outputPath)
      val rdd = objectFile[Person](sc, outputPath)
      println(rdd.map(person => person.name).collect().toList)
    }
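The example enables Kryo but does not register any classes. Since the article notes that Kryo expects class registration and that newKryo() calls a custom KryoRegistrator, here is a minimal sketch of one. PersonRegistrator and RegistratorSketch are hypothetical names, and the Person class is repeated only so that the block is self-contained.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}

// Stand-in for the article's Person class.
class Person(val name: String)

// Hypothetical registrator: registers Person and the Array[Person]
// groups that the write path serializes.
class PersonRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Person])
    kryo.register(classOf[Array[Person]])
  }
}

object RegistratorSketch {
  // Builds a configuration that enables Kryo and points Spark at the registrator.
  def conf(): SparkConf = new SparkConf()
    .set("spark.serializer", classOf[KryoSerializer].getName)
    .set("spark.kryo.registrator", classOf[PersonRegistrator].getName)
}
```

With registration in place, Kryo can write a small numeric ID instead of the full class name for each registered class, which tends to make the serialized output smaller.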