Spark join broadcast optimization
Some joins involve keys that are associated with very large amounts of data. Example (ref: http://blog.csdn.net/lsshlsw/article/details/48694893):

IpTable: an IP lookup table with tens of thousands of rows (~70k) that needs to be joined
Hist: historical data (billions of rows)

A direct join shuffles all of the data, which requires a lot of I/O; rows with the same key must be processed in the same partition, so task parallelism is also limited.

Broadcasting instead distributes the small table to every executor node, so the join is completed locally. The shuffle stage is essentially eliminated, which greatly improves performance.

Performance comparison on sample data (20 million rows):
- Small table not broadcast: a full shuffle occurs
- Small table broadcast: even the shuffle stage is omitted
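The mechanics of a broadcast (map-side hash) join can be sketched in plain Scala, independent of Spark: every task holds a full copy of the small table as a hash map, and each partition of the large table is joined locally against it, so no shuffle of the large table is needed. The table contents below are made up for illustration.

```scala
object BroadcastJoinSketch {
  // Small table: ip -> region, analogous to IpTable (contents are illustrative).
  val ipTable: Map[String, String] = Map(
    "1.1.1.1" -> "regionA",
    "2.2.2.2" -> "regionB"
  )

  // One partition of the large historical table: (ip, event) rows.
  val histPartition: Seq[(String, String)] = Seq(
    ("1.1.1.1", "click"),
    ("2.2.2.2", "view"),
    ("3.3.3.3", "click") // no match in the small table; dropped by an inner join
  )

  // Map-side inner join: probe the broadcast hash map for each row,
  // entirely within the partition -- the large table is never shuffled.
  def joinLocally(part: Seq[(String, String)],
                  small: Map[String, String]): Seq[(String, String, String)] =
    part.flatMap { case (ip, event) =>
      small.get(ip).map(region => (ip, event, region))
    }

  def main(args: Array[String]): Unit =
    joinLocally(histPartition, ipTable).foreach(println)
}
```

This is why broadcasting only pays off when one side is small: the hash map must fit in each executor's memory.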
Implementation code (Spark 1.5):
// read the IP table
val df = ...
// if the table is smaller than the configured broadcast size (default 10 MB),
// it will be broadcast; cache and materialize it so its size is known
df.cache.count
// register a temp table
df.registerTempTable("ipTable")
// join
sqlContext.sql("select * from (select * from ipTable) a join (select * from hist) b on a.ip = b.ip")
To adjust the table broadcast threshold: if needed and memory is sufficient, you can increase this value above the default of 10 MB:
spark.sql.autoBroadcastJoinThreshold
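As a configuration sketch, the threshold can be raised at runtime through the SQLContext; the 20 MB value here is an illustrative choice, and the setting is expressed in bytes:

```scala
// Raise the auto-broadcast threshold to 20 MB (the value is in bytes).
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "20971520")
// Setting it to -1 disables automatic broadcast joins entirely.
```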