Spark join broadcast optimization

Sometimes a small table has to be joined against a very large dataset on a shared field.

For example (ref: http://blog.csdn.net/lsshlsw/article/details/48694893):

IpTable: the IP data to be joined, tens of thousands of rows (70k)
Hist: historical data, on the order of billions of rows
A direct join shuffles all of the data, which requires a lot of I/O. Rows with the same key are handled in the same partition, which also limits how many tasks can run concurrently.
With a broadcast, the small table is distributed to every executor node, so the join is completed locally. The shuffle is essentially eliminated, and the job runs substantially faster.
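The local join described above can be sketched in plain Scala, without Spark. This is only an illustration of the idea, not Spark's actual implementation: the row types `IpRow` and `HistRow`, the `joinPartition` helper, and the sample values are all hypothetical. The small table is turned into a hash map once, and each partition of the large table is matched against it with local lookups.

```scala
object LocalHashJoin {
  // Hypothetical row types standing in for the IpTable and Hist tables.
  final case class IpRow(ip: String, region: String)
  final case class HistRow(ip: String, bytes: Long)

  // Join one partition of the large table against the full small table.
  // Every large-table row is matched with a local map lookup, so no row
  // has to move to another node.
  def joinPartition(small: Map[String, IpRow],
                    partition: Iterator[HistRow]): Iterator[(HistRow, IpRow)] =
    partition.flatMap(h => small.get(h.ip).map(ip => (h, ip)))

  def main(args: Array[String]): Unit = {
    val ipTable = Seq(IpRow("10.0.0.1", "north"), IpRow("10.0.0.2", "south"))
    // The copy of the small table that every node would receive.
    val lookup = ipTable.map(r => r.ip -> r).toMap

    // Two partitions of the large table, as they might sit on two executors.
    val partitions = Seq(
      Seq(HistRow("10.0.0.1", 100L), HistRow("10.0.0.9", 5L)),
      Seq(HistRow("10.0.0.2", 200L))
    )

    // Each partition is joined independently; unmatched rows are dropped (inner join).
    val joined = partitions.flatMap(p => joinPartition(lookup, p.iterator))
    joined.foreach(println)
  }
}
```

Because each partition only needs its own rows plus the shared map, the large table never has to be repartitioned by key.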

Performance comparison on sample data (20 million records)

Without broadcasting the small table:
[Figure: no broadcast]
With broadcasting, even the shuffle stage is omitted:
[Figure: with broadcast]

Implementation code (Spark 1.5)

 

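// Read the IP table
val df = ...

// If the table is smaller than the configured broadcast size (10 MB by default), it is broadcast.
// cache + count materializes the table in memory.
df.cache.count

// Register the table
df.registerTempTable("ipTable")

// Join
sqlContext.sql("select * from (select * from ipTable) a join (select * from hist) b on a.ip = b.ip")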
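As an alternative to relying on the size threshold, the DataFrame API has offered an explicit hint, `org.apache.spark.sql.functions.broadcast`, since Spark 1.5. The sketch below is not from the original post: it assumes Spark is on the classpath, runs in local mode, and uses small made-up tables in place of the real ipTable and hist.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.broadcast

object BroadcastHintExample {
  // Builds two tiny hypothetical tables, joins them with a broadcast hint,
  // and returns the number of joined rows.
  def run(): Long = {
    val conf = new SparkConf().setAppName("broadcast-hint").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      // Made-up stand-ins for the small and large tables.
      val ipTable = Seq(("10.0.0.1", "north"), ("10.0.0.2", "south")).toDF("ip", "region")
      val hist = Seq(("10.0.0.1", 100L), ("10.0.0.2", 200L), ("10.0.0.9", 5L)).toDF("ip", "bytes")

      // broadcast(...) marks ipTable as the side to ship to every executor.
      val joined = hist.join(broadcast(ipTable), "ip")
      joined.explain() // prints the physical plan chosen for the join
      joined.count()
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"joined rows: ${run()}")
}
```

The hint is useful when the planner cannot estimate the small table's size, although the table still has to fit in memory on every executor.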
 
 

The table broadcast threshold is configurable. If necessary, and if memory is sufficient, you can raise it above the 10 MB default:

spark.sql.autoBroadcastJoinThreshold
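A minimal sketch of changing this setting at runtime, assuming the same `sqlContext` as in the implementation code above. The value is given in bytes, and the 50 MB figure is an arbitrary illustration, not a recommendation.

```scala
// Raise the automatic broadcast threshold to 50 MB (the value is in bytes).
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

// Setting it to -1 disables automatic broadcasting altogether.
// sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
```

The same key can also be supplied when submitting the job, for example with `--conf spark.sql.autoBroadcastJoinThreshold=52428800`.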
