object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable)

When Spark reads from HBase, the data comes back as an RDD[(ImmutableBytesWritable, Result)]. We may then want to run further operations on that result, such as a join or a take. However, neither org.apache.hadoop.hbase.io.ImmutableBytesWritable nor org.apache.hadoop.hbase.client.Result implements the java.io.Serializable interface, so as soon as Spark has to serialize these objects (for example, to send task results back to the driver) the job may fail with the following exception:

Serialization stack:
  - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 30 30 30 30 30 30 32 34 32 30 32 37 37 32 31)
  - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
  - object (class scala.Tuple2, (30 30 30 30 30 30 32 34 32 30 32 37 37 32 31,keyvalues={00000011020Winz59XojM111/f:iteblog/1470844800000/Put/vlen=2/mvcc=0}))
  - element of array (index: 0)
  - array (class [Lscala.Tuple2;, size 10); not retrying
17/03/16 16:07:48 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
  - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 30 30 30 30 30 30 32 34 32 30 32 37 37 32 31)
  - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
  - object (class scala.Tuple2, (30 30 30 30 30 30 32 34 32 30 32 37 37 32 31,keyvalues={00000011020Winz59XojM111/f:iteblog/1470844800000/Put/vlen=2/mvcc=0}))
  - element of array (index: 0)
  - array (class [Lscala.Tuple2;, size 10)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
  - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 30 30 30 30 30 30 32 34 32 30 32 37 37 32 31)
  - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
  - object (class scala.Tuple2, (30 30 30 30 30 30 32 34 32 30 32 37 37 32 31,keyvalues={00000011020Winz59XojM111/f:iteblog/1470844800000/Put/vlen=2/mvcc=0}))
  - element of array (index: 0)
  - array (class [Lscala.Tuple2;, size 10)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1328)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
  at com.iteblog.HBase2Hive$.main(HBase2Hive.scala:41)
  at com.iteblog.HBase2Hive.main(HBase2Hive.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
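
To see the mechanism in isolation, here is a minimal sketch that needs neither Spark nor HBase. RowKeyHolder is a hypothetical stand-in for ImmutableBytesWritable (it is not an HBase class), and javaSerializable is a helper made up for this illustration. It suggests why the trace above blames the `_1` field of a scala.Tuple2: the tuple itself is serializable, but Java serialization rejects it as soon as one of its fields is not.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for ImmutableBytesWritable: it wraps a byte array
// and, like the HBase class, does not implement java.io.Serializable.
final class RowKeyHolder(val bytes: Array[Byte])

object SerializationCheck {
  // Returns true if Java serialization accepts the value, false if it rejects it.
  def javaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Same row key bytes as in the trace (30 30 30 ... is "000000242027721" in hex).
    val holder = new RowKeyHolder("000000242027721".getBytes("UTF-8"))

    // The tuple is rejected because its first field is not Serializable.
    println(javaSerializable((holder, "value")))

    // Once the bytes are decoded into a String, the same tuple is accepted.
    println(javaSerializable((new String(holder.bytes, "UTF-8"), "value")))
  }
}
```

Both fixes described below follow from this: either switch Spark to a serializer that does not require java.io.Serializable, or replace the offending field with a serializable value.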




Specify how to serialize the ImmutableBytesWritable class

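We can tell Spark explicitly how to serialize the ImmutableBytesWritable class by switching the serializer to Kryo, which does not require java.io.Serializable, and registering the class with it: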


sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable]))
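
For context, here is a sketch of how those two settings might fit into a complete driver setup. The object name KryoSetup and the method buildConf are hypothetical, and the application name is only borrowed from the trace above. Registering Result as well is our assumption, made because the RDD values are of that type; the original settings register only ImmutableBytesWritable. The serializer has to be configured before the SparkContext is created, because the context reads the configuration at construction time.

```scala
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.{SparkConf, SparkContext}

object KryoSetup {
  // Builds a configuration that serializes data with Kryo instead of Java serialization.
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("HBase2Hive") // name borrowed from the trace, purely illustrative
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Result is registered too as an assumption; the original registers only the key class.
      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))

  def main(args: Array[String]): Unit = {
    // The configuration must be complete before the context is constructed.
    val sc = new SparkContext(buildConf())
    try {
      // ... build the HBase RDD and run the job here ...
    } finally {
      sc.stop()
    }
  }
}
```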
 
Convert ImmutableBytesWritable into another serializable object

This approach extracts the data we need from the ImmutableBytesWritable object and stores it in another object that is serializable, as follows:
 
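import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
// Keep only the row key, decoded into a String, which is serializable
val rowKeys = hBaseRDD.map { item =>
  val immutableBytesWritable = item._1
  Bytes.toString(immutableBytesWritable.get())
}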
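
The snippet above keeps only the row key. If a column value is needed as well, the same idea extends to the Result. The following is a sketch under assumptions: the object HBaseRows and its methods toRow and toRows are hypothetical names, and the column f:iteblog is taken from the trace earlier in this post, so adjust it to your own table. Every field of the output is a String or an Option, both of which are serializable.

```scala
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

object HBaseRows {
  // Column family and qualifier as they appear in the trace (f:iteblog); an assumption.
  private val Family = Bytes.toBytes("f")
  private val Qualifier = Bytes.toBytes("iteblog")

  // Copies the row key and one column value out of the HBase objects.
  // The value is None when the row has no cell for the column.
  def toRow(key: ImmutableBytesWritable, result: Result): (String, Option[String]) = {
    val rowKey = Bytes.toString(key.copyBytes())
    val value = Option(result.getValue(Family, Qualifier)).map(b => Bytes.toString(b))
    (rowKey, value)
  }

  // Applies the conversion to every record, so later stages never see HBase types.
  def toRows(hBaseRDD: RDD[(ImmutableBytesWritable, Result)]): RDD[(String, Option[String])] =
    hBaseRDD.map { case (key, result) => toRow(key, result) }
}
```

Calling HBaseRows.toRows(hBaseRDD) right after newAPIHadoopRDD means that actions such as take or join only ever handle plain Scala values.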
 