Spark map-side-join optimization
Joining multiple datasets is a very common operation in data processing, but in a distributed computing system it tends to become expensive: the join operator provided by the framework generally sends all records with the same key to the same reduce partition, i.e. a shuffle. This shuffle incurs heavy network and disk IO and makes execution very slow. The process is commonly called a reduce-side join.
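For contrast, here is a minimal sketch of a plain reduce-side join with the RDD API (the people and students pair RDDs are made up for illustration); the join call below shuffles both datasets by key:

// both RDDs are keyed by ID card
val people = sc.parallelize(Array(("110", "lsw"), ("222", "yyy")))
val students = sc.parallelize(Array(("110", ("s1", "211")), ("111", ("s2", "222"))))
// join co-locates records with the same key, shuffling both sides over the network
val joined = people.join(students)  // RDD[(String, (String, (String, String)))]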
If one of the tables is small, we can perform the join ourselves on the map side and skip the shuffle of the large dataset entirely. Running time drops sharply; depending on the data, this can bring a several-fold performance improvement.
A demo is walked through below.
When to use
Matching a small amount of specific data against a massive dataset.
Principle
I previously wrote a post on optimizing Spark SQL joins with broadcast joins; the principle here is the same, and the diagrams below are redrawn from that post: http://blog.csdn.net/lsshlsw/article/details/48694893
The drawback of the reduce-side join is that it sends all records with the same key to the same partition: shuffling the large dataset costs a long stretch of IO, the number of concurrent tasks is constrained, and skewed keys can lead to data skew.
[Figure: reduce-side-join]
[Figure: map-side-join]
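For reference, Spark SQL exposes the same idea through a broadcast hint on the DataFrame API; a minimal sketch, assuming hypothetical peopleDF and studentDF DataFrames that share an idCard column:

import org.apache.spark.sql.functions.broadcast
// hint the planner to broadcast the small side instead of shuffling both
val joinedDF = studentDF.join(broadcast(peopleDF), Seq("idCard"))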
Code Description
Data 1 (partial people info):

ID card  Name
110      lsw
222      yyy

Data 2 (national student info):

ID card  School  Student no.
110      s1      211
111      s2      222
112      s3      233
113      s2      244

Expected result:

ID card  Name  School
110      lsw   s1
Collect the small table into a Map and broadcast it. The broadcast ships this Map to each node once; without broadcasting, every task would carry its own copy of the Map, wasting memory and network bandwidth.

val people_info = sc.parallelize(Array(("110","lsw"),("222","yyy"))).collectAsMap()
val people_bc = sc.broadcast(people_info)
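Note that collectAsMap first pulls the small table back to the driver, so this approach only works while that table fits comfortably in memory; on the executors the Map is then read through people_bc.value.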
When traversing the large dataset, use mapPartitions instead of map: mapPartitions runs once per partition, which cuts the space cost of repeatedly creating broadCastMap.value objects, and records with no match simply emit nothing instead of returning ().

val res = student_all.mapPartitions(iter => {
  val stuMap = people_bc.value
  val arrayBuffer = ArrayBuffer[(String, String, String)]()
  iter.foreach { case (idCard, school, sno) =>
    if (stuMap.contains(idCard)) {
      arrayBuffer += ((idCard, stuMap.getOrElse(idCard, ""), school))
    }
  }
  arrayBuffer.iterator
})
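One caveat worth noting: arrayBuffer accumulates every match in a partition before the iterator is returned, so each partition's matches must fit in memory; the lazier variant below avoids that buffer.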
The same logic can also be written with a for-comprehension guard:

val res1 = student_all.mapPartitions(iter => {
  val stuMap = people_bc.value
  for {
    (idCard, school, sno) <- iter
    if stuMap.contains(idCard)
  } yield (idCard, stuMap.getOrElse(idCard, ""), school)
})
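Because iter is an Iterator, the for-comprehension desugars to withFilter and map on it and therefore yields matches lazily, with no intermediate buffer; the resulting records are the same as res.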
Complete code
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object joinTest extends App {

  val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  val sc = new SparkContext(conf)

  /**
   * map-side-join
   * Join the users that appear in the small table against the big table
   * and keep the fields we need.
   */
  // partial people info (ID card, name)
  val people_info = sc.parallelize(Array(("110","lsw"),("222","yyy"))).collectAsMap()
  // national student details (ID card, school name, student number, ...)
  val student_all = sc.parallelize(Array(("110","s1","211"),
                                         ("111","s2","222"),
                                         ("112","s3","233"),
                                         ("113","s2","244")))

  // broadcast the small table used for the join
  val people_bc = sc.broadcast(people_info)

  /**
   * Use mapPartitions instead of map to cut the space cost of creating
   * broadCastMap.value; unmatched records also need not return ().
   */
  val res = student_all.mapPartitions(iter => {
    val stuMap = people_bc.value
    val arrayBuffer = ArrayBuffer[(String, String, String)]()
    iter.foreach { case (idCard, school, sno) =>
      if (stuMap.contains(idCard)) {
        arrayBuffer += ((idCard, stuMap.getOrElse(idCard, ""), school))
      }
    }
    arrayBuffer.iterator
  })

  /**
   * An alternative implementation using a for-comprehension guard.
   */
  val res1 = student_all.mapPartitions(iter => {
    val stuMap = people_bc.value
    for {
      (idCard, school, sno) <- iter
      if stuMap.contains(idCard)
    } yield (idCard, stuMap.getOrElse(idCard, ""), school)
  })

  res.foreach(println)
}
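With the sample data only ID card 110 appears in both datasets, so res.foreach(println) prints a single record, (110,lsw,s1), matching the expected result above.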