Spark Kryo Serialization

Serialization plays an important role in distributed systems, and choosing the right serialization approach is one of the first steps in optimizing a Spark program. Spark provides two serialization methods:
Java serialization: the default method.
Kryo serialization: faster and more compact than Java serialization, but it does not support every serializable type and requires you to register the classes you use. Spark SQL uses Kryo serialization by default.
The following sections explain how to use Kryo and compare the space each approach occupies.

Configuration

You can set the serializer globally in spark-defaults.conf, or set conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") on the SparkConf in code. The setting applies both to data shuffled between machines and to RDDs serialized to disk or memory.
Spark does not make Kryo the default serializer because it requires class registration, but Kryo is strongly recommended for applications that transfer a lot of data over the network.
 
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1],classOf[MyClass2]))
val sc = new SparkContext(conf)
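The same setting can be made globally instead (a minimal sketch of the spark-defaults.conf entry; the file is under $SPARK_HOME/conf in a standard installation):

# $SPARK_HOME/conf/spark-defaults.conf
spark.serializer    org.apache.spark.serializer.KryoSerializer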
 
If the objects you serialize are large, you can increase the value of spark.kryoserializer.buffer.
If you do not register the classes being serialized, Kryo will still work, but it will store the full class name with every object, which often wastes more space than even the default Java serialization.
You can set spark.kryo.registrationRequired to true; Kryo will then throw an error whenever it encounters a class that has not been registered:
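For example (a sketch; the buffer sizes shown are illustrative, not tuned recommendations):

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// initial size of Kryo's per-task serialization buffer
conf.set("spark.kryoserializer.buffer", "64k")
// ceiling the buffer may grow to when serializing large objects
conf.set("spark.kryoserializer.buffer.max", "64m")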
[screenshot: error thrown for an unregistered class]
To fix the error shown above, register the offending classes:
sparkConf.registerKryoClasses(
    Array(classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
    classOf[MyClass]))
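Putting the pieces together, a minimal sketch of strict registration (MyClass stands in for your own type):

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// error out instead of silently storing full class names for unregistered classes
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(
  Array(classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
        classOf[MyClass]))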
The demo below compares the space occupied by an RDD cached under the different serialization approaches.

DEMO

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

case class Info(name: String, age: Int, gender: String, addr: String)

object KyroTest {
  def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[Info]))
    val sc = new SparkContext(conf)

    val arr = new ArrayBuffer[Info]()

    val nameArr = Array[String]("lsw", "yyy", "lss")
    val genderArr = Array[String]("male", "female")
    val addressArr = Array[String]("beijing", "shanghai", "shengzhen", "wenzhou", "hangzhou")

    // generate one million random Info records
    for (i <- 1 to 1000000) {
      val name = nameArr(Random.nextInt(3))
      val age = Random.nextInt(100)
      val gender = genderArr(Random.nextInt(2))
      val address = addressArr(Random.nextInt(5))
      arr += Info(name, age, gender, address)
    }

    val rdd = sc.parallelize(arr)

    // persist the RDD in memory in serialized form
    rdd.persist(StorageLevel.MEMORY_ONLY_SER)
    rdd.count()
  }
}
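Besides the web UI, the cached size can also be read from the driver (a sketch; getRDDStorageInfo is a developer API, so its shape may vary between Spark versions):

// after rdd.count() has materialized the cache
sc.getRDDStorageInfo.foreach { info =>
  println(s"RDD ${info.id}: ${info.memSize / 1024 / 1024} MB in memory")
}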

Result

The size of the cached RDD can be seen in the web UI:
Serializer | Classes registered | Space occupied
Kryo       | yes                | 21.1 MB
Kryo       | no                 | 38.3 MB
Java       | no                 | 25.1 MB

With registration, Kryo is the most compact; without registration it actually occupies more space than default Java serialization, because the full class name is stored with every object.
