Spark map-side-join optimization for data association

Joining multiple datasets is a very common operation in data processing, but in a distributed computing framework it tends to become expensive: the join provided by the framework generally ships all records with the same key to the reduce-side partitions, i.e. a shuffle, which incurs heavy network and disk IO and runs slowly. This process is usually called a reduce-side join. If one of the tables is small, we can perform the association ourselves on the map side and skip shuffling the large dataset entirely; the running time drops sharply, and depending on the data the speed-up can be several-fold or more.
The following demo illustrates this.

When to use

Matching a small amount of specific data against a massive dataset.

Principle

I previously wrote an article about using broadcast join to optimize joins in spark-sql; the principle is the same, and the diagrams are redrawn here:
http://blog.csdn.net/lsshlsw/article/details/48694893
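For reference, Spark SQL exposes the same idea through a broadcast hint. The sketch below is only an illustration, assuming a Spark 2.x SparkSession and hypothetical DataFrames people_df and student_df that mirror the sample data further down; none of these names come from the referenced article.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().master("local[2]").appName("broadcastJoinDemo").getOrCreate()
import spark.implicits._

//Hypothetical DataFrames mirroring the sample data used later in this post
val people_df  = Seq(("110","lsw"), ("222","yyy")).toDF("idCard","name")
val student_df = Seq(("110","s1","211"), ("111","s2","222")).toDF("idCard","school","sno")

//broadcast() hints the optimizer to ship the small table to every executor
//instead of shuffling both sides of the join
val joined_df = student_df.join(broadcast(people_df), "idCard")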
The drawback of a reduce-side join is that it sends all records with the same key to the same partition: shipping the large dataset costs a lot of IO time, task parallelism is constrained, and the join may suffer from data skew. (A code sketch of a plain reduce-side join appears after the figures below.)
A reduce-side join is illustrated below:
[figure: reduce-side-join]
A map-side join is illustrated below:
[figure: map-side-join]
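For contrast, a plain reduce-side join on similar data might look like the sketch below (the names people_rdd and student_rdd are illustrative, not part of the demo; sc is the SparkContext from the full code further down). Both RDDs are repartitioned by key, which is exactly the shuffle that the map-side join avoids.

//small table: (idCard, name); large table: (idCard, (school, sno))
val people_rdd  = sc.parallelize(Array(("110","lsw"), ("222","yyy")))
val student_rdd = sc.parallelize(Array(("110",("s1","211")), ("111",("s2","222"))))

//join() shuffles both RDDs by idCard across the cluster
val joined = student_rdd.join(people_rdd)
                        .map{ case (idCard, ((school, sno), name)) => (idCard, name, school) }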

Code Description

Data 1 (partial person information):

ID card   Name   ...
110       lsw
222       yyy

Data 2 (nationwide student information):

ID card   School   Student no.   ...
110       s1       211
111       s2       222
112       s3       233
113       s2       244

Expected output:

ID card   Name   School
110       lsw    s1
Collect the small dataset into a Map (it must be small enough to fit in memory) and broadcast it; the broadcast ships the Map to each node once. Without broadcasting, every task would carry its own copy of the Map, wasting memory and network.
val people_info = sc.parallelize(Array(("110","lsw"),("222","yyy"))).collectAsMap()
val people_bc = sc.broadcast(people_info)
When traversing the large dataset, use mapPartitions instead of map: mapPartitions runs once per partition, so the broadcast value (broadCastMap.value) is fetched once per partition rather than once per record, reducing space consumption; records that do not match simply return nothing instead of an empty ().

val res = student_all.mapPartitions(iter =>{
    val stuMap = people_bc.value
    val arrayBuffer = ArrayBuffer[(String,String,String)]()
    iter.foreach{case (idCard,school,sno) =>{
        if(stuMap.contains(idCard)){
            arrayBuffer.+=((idCard, stuMap.getOrElse(idCard,""), school))
        }
    }}
    arrayBuffer.iterator
})
 
The same logic can also be written with a for-comprehension guard:
val res1 = student_all.mapPartitions(iter => {
    val stuMap = people_bc.value
    for{
        (idCard, school, sno) <- iter
        if(stuMap.contains(idCard))
    } yield (idCard, stuMap.getOrElse(idCard,""), school)
})
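Both variants produce the same result. The for-comprehension yields lazily from the partition iterator, so it avoids materializing the intermediate ArrayBuffer that the first version builds for each partition.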

Complete code 

import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer

object joinTest extends App{

  val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  val sc = new SparkContext(conf)

  /**
   * map-side-join
   * Take the users that appear in the small table, join them with the large table,
   * and extract the required information.
   * */
  //Partial person information (ID card, name)
  val people_info = sc.parallelize(Array(("110","lsw"),("222","yyy"))).collectAsMap()
  //Nationwide student details (ID card, school name, student number, ...)
  val student_all = sc.parallelize(Array(("110","s1","211"),
                                              ("111","s2","222"),
                                              ("112","s3","233"),
                                              ("113","s2","244")))

  //Broadcast the small table that needs to be joined
  val people_bc = sc.broadcast(people_info)

  /**
   * Use mapPartitions instead of map to reduce the space cost of creating
   * broadCastMap.value for every record; unmatched records also do not need to return ().
   * */
  val res = student_all.mapPartitions(iter =>{
    val stuMap = people_bc.value
    val arrayBuffer = ArrayBuffer[(String,String,String)]()
    iter.foreach{case (idCard,school,sno) =>{
      if(stuMap.contains(idCard)){
        arrayBuffer.+= ((idCard, stuMap.getOrElse(idCard,""),school))
      }
    }}
    arrayBuffer.iterator
  })

  /**
   * An alternative implementation
   * using a for-comprehension guard.
   * */
  val res1 = student_all.mapPartitions(iter => {
    val stuMap = people_bc.value
    for{
      (idCard, school, sno) <- iter
      if(stuMap.contains(idCard))
    } yield (idCard, stuMap.getOrElse(idCard,""),school)
  })

  res.foreach(println)
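  //With the sample data above, only idCard "110" appears in both datasets, so this prints (110,lsw,s1)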
}