Customize the Kryo serialized input and output API in Spark
Spark has built-in support for two serialization formats: (1) Java serialization and (2) Kryo serialization. By default, Spark uses Java's ObjectOutputStream framework, which can serialize any class that implements java.io.Serializable. Java serialization is very flexible, but its performance is poor.
However, we can use the Kryo library instead, which is much faster and more compact than Java serialization (usually around 10x faster), but it does not support every serializable type out of the box and it requires the user to register the classes that will be serialized.
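As a quick reminder, enabling Kryo and registering classes in a Spark application typically looks something like the following minimal sketch (MyKryoRegistrator and this Person class are placeholder names used only for illustration):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// placeholder domain class, only for illustration
class Person(val name: String)

// a custom registrator that tells Kryo about our classes
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Person])
  }
}

// enable Kryo and point Spark at the registrator
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyKryoRegistrator")

Unregistered classes can still be serialized with Spark's default settings, but registering them keeps the serialized output smaller because full class names do not have to be written out.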
In Spark, it usually makes more sense to use Kryo serialization than Java serialization, and it becomes especially important when shuffling and caching large amounts of data.
Although Kryo is supported for RDD caching and shuffling, Spark does not provide built-in input and output APIs for writing Kryo-serialized data to disk: RDD.saveAsObjectFile and SparkContext.objectFile only support Java serialization. It would be great if we could use Kryo there as well.
In this article, I will discuss how to build custom Kryo-based input and output APIs to read and write data on disk.
Write data
Normally, we use the rdd.saveAsObjectFile API to write serialized objects to disk. The following code shows how to write our own saveAsObjectFile method that writes Kryo-serialized objects to disk instead:

def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String)

The rdd parameter is the data we want to write; path is the path where the data will be saved.

val kryoSerializer = new KryoSerializer(rdd.context.getConf)

KryoSerializer is a wrapper class that Spark provides around Kryo. Here we create a KryoSerializer object, which picks up the configured buffer size from rdd.context.getConf.

rdd.mapPartitions(iter => iter.grouped(10).map(_.toArray))
   .map(splitArray => {

The object file will be saved on HDFS. We traverse each partition of the RDD and convert every group of ten records into a byte array.

val kryo = kryoSerializer.newKryo()

For each splitArray we first create a Kryo instance. Kryo is not thread safe, so we create one separately inside each map operation. Calling kryoSerializer.newKryo() creates a new Kryo instance and also invokes our custom KryoRegistrator, if one is configured.

//create output stream and plug it to the kryo output
val bao = new ByteArrayOutputStream()
val output = kryoSerializer.newKryoOutput()
output.setOutputStream(bao)
kryo.writeClassAndObject(output, splitArray)
output.close()

Once we have the Kryo instance, we create a Kryo output object and then write the class information and the object itself to that output.

val byteWritable = new BytesWritable(bao.toByteArray)
(NullWritable.get(), byteWritable)
}).saveAsSequenceFile(path)

Finally we wrap the byte array in a BytesWritable and save the result as a sequence file (the key field is ignored). With this code we can write Kryo objects to disk. The complete code, together with the imports it needs, is as follows:

import java.io.ByteArrayOutputStream
import scala.reflect.ClassTag
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer

/**
 * User: 过往记忆
 * Date: 15-04-24
 * Time: 上午07:24
 * blog: https://www.iteblog.com
 */
def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
  val kryoSerializer = new KryoSerializer(rdd.context.getConf)

  rdd.mapPartitions(iter => iter.grouped(10).map(_.toArray))
     .map(splitArray => {
       //initializes kryo and calls your registrator class
       val kryo = kryoSerializer.newKryo()

       //convert data to bytes
       val bao = new ByteArrayOutputStream()
       val output = kryoSerializer.newKryoOutput()
       output.setOutputStream(bao)
       kryo.writeClassAndObject(output, splitArray)
       output.close()

       //we are ignoring the key field of the sequence file
       val byteWritable = new BytesWritable(bao.toByteArray)
       (NullWritable.get(), byteWritable)
     }).saveAsSequenceFile(path)
}
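If you would rather call this the way you call the built-in API, one possible refinement is to expose the method through an implicit class, sketched below (KryoFileSyntax and saveAsKryoObjectFile are illustrative names; it assumes the saveAsObjectFile method above is in scope):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object KryoFileSyntax {
  // convenience wrapper so call sites read like the built-in API
  implicit class KryoRDDOps[T: ClassTag](val rdd: RDD[T]) {
    def saveAsKryoObjectFile(path: String): Unit = saveAsObjectFile(rdd, path)
  }
}

After importing KryoFileSyntax._, saving an RDD becomes personRDD.saveAsKryoObjectFile(outputPath).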
Read data
Being able to write the data is only half of the job. Normally we use the objectFile API on SparkContext to read data back from disk; here we write a custom objectFile method that reads the Kryo-serialized object files instead:

import scala.reflect.ClassTag
import com.esotericsoftware.kryo.io.Input
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.SparkContext
import org.apache.spark.serializer.KryoSerializer

def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)(implicit ct: ClassTag[T]) = {
  val kryoSerializer = new KryoSerializer(sc.getConf)

  sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
    .flatMap(x => {
      val kryo = kryoSerializer.newKryo()
      val input = new Input()
      input.setBuffer(x._2.getBytes)
      val data = kryo.readClassAndObject(input)
      val dataObject = data.asInstanceOf[Array[T]]
      dataObject
    })
}

The steps mirror the write path, except that here we use a Kryo Input instead of an Output: we read the bytes out of each BytesWritable and deserialize them with the readClassAndObject API.
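To make the symmetry between writeClassAndObject and readClassAndObject clearer, here is a small standalone round trip using Kryo directly, outside Spark, as a sketch (in the methods above, kryoSerializer.newKryo() produces a preconfigured instance; here we configure a bare one by hand):

import java.io.ByteArrayOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

val kryo = new Kryo()
// defaults differ between Kryo versions, so state this explicitly
kryo.setRegistrationRequired(false)

// serialize: write the class information and the object into a byte array
val bao = new ByteArrayOutputStream()
val output = new Output(bao)
kryo.writeClassAndObject(output, Array("a", "b", "c"))
output.close()

// deserialize: read the class information and the object back from the bytes
val input = new Input()
input.setBuffer(bao.toByteArray)
val restored = kryo.readClassAndObject(input).asInstanceOf[Array[String]]
println(restored.toList)   // List(a, b, c)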
How to use
The following example uses the two methods described above to serialize and deserialize Person objects:

import org.apache.spark.{SparkConf, SparkContext}

/**
 * User: 过往记忆
 * Date: 15-04-24
 * Time: 上午07:24
 * blog: https://www.iteblog.com
 */
// user-defined class that needs to be serialized
class Person(val name: String)

def main(args: Array[String]) {
  if (args.length < 1) {
    println("Please provide output path")
    return
  }
  val outputPath = args(0)

  val conf = new SparkConf().setMaster("local").setAppName("kryoexample")
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val sc = new SparkContext(conf)

  //create some dummy data
  val personList = 1 to 10000 map (value => new Person(value + ""))
  val personRDD = sc.makeRDD(personList)

  saveAsObjectFile(personRDD, outputPath)
  val rdd = objectFile[Person](sc, outputPath)
  println(rdd.map(person => person.name).collect().toList)
}
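One detail worth pointing out: Person does not implement java.io.Serializable, so the built-in Java-serialization API would fail on this RDD at runtime, while the Kryo-based helper has no such restriction:

// this would fail with java.io.NotSerializableException when the job runs,
// because Person does not implement java.io.Serializable:
// personRDD.saveAsObjectFile(outputPath)

// the Kryo-based helper defined earlier handles the same data without problems:
saveAsObjectFile(personRDD, outputPath)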