Spark join broadcast optimization

A common task is joining a very large table to a small reference table on some field.

For example:

IpTable: the IP reference table to join against, tens of thousands of rows (about 70k)
Hist: historical data, on the order of billions of rows
A direct join shuffles all of the data, which costs a great deal of I/O. Rows with the same key must be processed in the same partition, which also limits how many tasks can run concurrently.
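To make the cost of a plain join concrete, here is a minimal single-process sketch in plain Scala (no Spark dependency) of a shuffle hash join. It assumes string keys and uses hypothetical names (ShuffleJoinSketch, partitionOf, shuffleJoin). Both tables are redistributed by key hash so that equal keys meet in the same partition, and every row of both tables is counted as moved. Spark's real shuffle join operators differ in detail.

```scala
// Simplified, single-process model of a shuffle (hash-partitioned) join.
// Hypothetical names: ShuffleJoinSketch, partitionOf, shuffle, shuffleJoin.
object ShuffleJoinSketch {
  // The same key always maps to the same partition index, on both sides.
  def partitionOf(key: String, numPartitions: Int): Int = {
    val h = key.hashCode % numPartitions
    if (h < 0) h + numPartitions else h
  }

  // "Shuffle": redistribute every row of a table into numPartitions buckets by key.
  def shuffle[V](rows: Seq[(String, V)], numPartitions: Int): Map[Int, Seq[(String, V)]] =
    rows.groupBy { case (k, _) => partitionOf(k, numPartitions) }

  // Both sides are shuffled, then each partition is hash-joined independently.
  // Returns the joined rows and the number of rows that had to be moved.
  def shuffleJoin[A, B](small: Seq[(String, A)], big: Seq[(String, B)],
                        numPartitions: Int): (Seq[(String, A, B)], Int) = {
    val left = shuffle(small, numPartitions)
    val right = shuffle(big, numPartitions)
    val moved = small.size + big.size // every row of BOTH tables crosses the network
    val joined = (0 until numPartitions).flatMap { p =>
      // Build a hash table from this partition's slice of the small side.
      val index = left.getOrElse(p, Seq.empty).groupBy(_._1)
      right.getOrElse(p, Seq.empty).flatMap { case (k, b) =>
        index.getOrElse(k, Seq.empty).map { case (_, a) => (k, a, b) }
      }
    }
    (joined, moved)
  }
}
```

Even though IpTable is tiny, the moved count grows with the size of Hist, which is exactly the I/O the paragraph above describes.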
Broadcasting instead ships a copy of the small table to every executor node, so the join is completed locally on each node. This essentially eliminates the shuffle and greatly improves performance.
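For contrast, a minimal single-process sketch in plain Scala of a broadcast (map-side) hash join, under the same assumptions (string keys, no Spark dependency, hypothetical names BroadcastJoinSketch and broadcastJoin). The small table is turned into one hash map, which stands in for the broadcast copy, and each existing partition of the big table probes it locally, so no row of the big table is moved and its partitioning is preserved.

```scala
// Simplified, single-process model of a broadcast (map-side) hash join.
// Hypothetical names: BroadcastJoinSketch, broadcastJoin.
object BroadcastJoinSketch {
  // bigPartitions: the large table, already split into partitions.
  // small: shipped whole to every executor as a lookup table.
  def broadcastJoin[A, B](small: Seq[(String, A)],
                          bigPartitions: Seq[Seq[(String, B)]]): Seq[Seq[(String, A, B)]] = {
    // Build the lookup table once; in Spark, this copy is what gets broadcast.
    val lookup: Map[String, Seq[A]] =
      small.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
    // Each partition probes its local copy; output partition i comes from input partition i.
    bigPartitions.map { part =>
      part.flatMap { case (k, b) => lookup.getOrElse(k, Seq.empty).map(a => (k, a, b)) }
    }
  }
}
```

The trade-off is memory: every executor must hold the whole small table, which is why this only pays off when one side is genuinely small.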

Performance comparison on sample data (20 million rows)

Without broadcasting the small table:
With the small table broadcast, the shuffle stage disappears entirely:

Implementation code (Spark 1.5):


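val df = ...

// Join the small ipTable with the large hist table on ip; ipTable is broadcast if its estimated size is under the threshold below
val joined = sqlContext.sql("select * from (select * from ipTable) a join (select * from hist) b on a.ip = b.ip")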

Set the table broadcast threshold with spark.sql.autoBroadcastJoinThreshold (default 10 MB). If needed, and if executor memory allows, you can raise this value.
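As a rough model of what the threshold controls, here is a plain-Scala sketch of the planner's decision. It is a simplification, not Spark's planner code, and the names JoinPlanSketch and chooseStrategy are hypothetical. A side is broadcast only when the threshold is positive and that side's estimated size is at or below it; setting the threshold to -1 disables broadcast joins. In a real Spark 1.5 job the value is set with something like sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "104857600") or the SQL statement SET spark.sql.autoBroadcastJoinThreshold=104857600 (100 MB here is only an illustrative value). Spark can only make this decision when it has a size estimate for the table; for Hive metastore tables that typically means running ANALYZE TABLE ... COMPUTE STATISTICS noscan.

```scala
// Simplified model of how spark.sql.autoBroadcastJoinThreshold is used when
// planning an equi-join. Hypothetical names: JoinPlanSketch, chooseStrategy.
object JoinPlanSketch {
  sealed trait Strategy
  case object BroadcastSmallSide extends Strategy
  case object ShuffleBothSides extends Strategy

  // Default value of spark.sql.autoBroadcastJoinThreshold: 10 MB, in bytes.
  val DefaultThreshold: Long = 10L * 1024 * 1024

  // Broadcast only if the threshold is enabled (> 0) and the small side's
  // estimated size is at or below it; a threshold of -1 disables broadcasting.
  def chooseStrategy(smallSideBytes: Long, threshold: Long = DefaultThreshold): Strategy =
    if (threshold > 0 && smallSideBytes <= threshold) BroadcastSmallSide
    else ShuffleBothSides
}
```

For example, a 50 MB lookup table falls back to a shuffle at the default threshold but is broadcast once the threshold is raised to 100 MB.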


