Spark performance optimization: shuffle tuning

Tuning overview

Most of the cost of a Spark job is concentrated in the shuffle stage, because shuffle involves a large amount of disk I/O, serialization, and network data transfer. To take a job's performance to a higher level, you therefore need to tune the shuffle process. Keep in mind, however, that the main factors affecting Spark job performance are code development, resource parameters, and data skew; shuffle tuning is only a small part of overall Spark performance tuning. So hold on to the basic tuning principles and do not neglect the fundamentals in favor of the details. Below we explain in detail how shuffle works, describe the relevant parameters, and give tuning recommendations for each.

Overview of ShuffleManager Development

In the Spark source code, the component responsible for executing, computing, and processing the shuffle is the ShuffleManager. As Spark has evolved, the ShuffleManager has been iterated on continually and has become steadily more advanced.
Before Spark 1.2, the default shuffle engine was HashShuffleManager. HashShuffleManager has a serious drawback: it produces a large number of intermediate disk files, and the resulting volume of disk I/O hurts performance.
Starting with Spark 1.2, the default ShuffleManager changed to SortShuffleManager, which improves on HashShuffleManager in one key respect: although each task still produces several temporary disk files during the shuffle, it eventually merges them all into a single disk file, so each task ends up with only one data file. When a shuffle read task in the next stage pulls its data, it only needs to use the index to read its own portion of each of these files.
Let's take a closer look at the principles of HashShuffleManager and SortShuffleManager.

HashShuffleManager operating principle

Unoptimized HashShuffleManager

The following figure illustrates how the unoptimized HashShuffleManager works. We first make one assumption: each Executor has only 1 CPU core, so no matter how many task threads are assigned to an Executor, only one task runs at a time.
Let's start with shuffle write. Shuffle write happens at the end of a stage's computation, so that the next stage can run a shuffle operator (such as reduceByKey). Each task groups the data it processes by key: it applies a hash algorithm to each key so that records with the same key are written to the same disk file, and each disk file belongs to exactly one task of the downstream stage. Before data is written to disk, it is first written to an in-memory buffer; when the buffer fills up, its contents are spilled to the disk file.
How many disk files does each shuffle write task create for the next stage? Simple: each task in the current stage creates one disk file per task in the next stage. For example, if the next stage has 100 tasks, every task in the current stage creates 100 disk files. If the current stage has 50 tasks spread across 10 Executors, each Executor running 5 tasks, then each Executor creates 500 disk files and all Executors together create 5,000. The number of disk files produced by the unoptimized shuffle write is therefore staggering.
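As a rough illustration of the write side described above, the following Python sketch models one map task of the unoptimized HashShuffleManager. It is not Spark code: the `FakeDiskFile` class, the functions `hash_shuffle_write` and `total_files_unoptimized`, the buffer size counted in records, and the use of Python's built-in `hash` are all hypothetical stand-ins. Each task opens one file per downstream task, routes records by `hash(key) % num_reducers`, and flushes a per-file buffer when it fills.

```python
from collections import defaultdict

class FakeDiskFile:
    """In-memory stand-in for one shuffle output file (hypothetical)."""
    def __init__(self):
        self.records = []
        self.flushes = 0

def hash_shuffle_write(records, num_reducers, buffer_records=2):
    """Unoptimized hash shuffle write for a single map task.

    Opens one file per downstream task and sends each record to file
    hash(key) % num_reducers through a small per-file buffer.
    """
    files = [FakeDiskFile() for _ in range(num_reducers)]
    buffers = defaultdict(list)
    for key, value in records:
        pid = hash(key) % num_reducers
        buffers[pid].append((key, value))
        if len(buffers[pid]) >= buffer_records:   # buffer full: flush to "disk"
            files[pid].records.extend(buffers[pid])
            files[pid].flushes += 1
            buffers[pid] = []
    for pid, buf in buffers.items():              # flush leftover records
        if buf:
            files[pid].records.extend(buf)
            files[pid].flushes += 1
    return files

def total_files_unoptimized(num_map_tasks, num_reduce_tasks):
    """Every map task writes one file per downstream task."""
    return num_map_tasks * num_reduce_tasks
```

With the numbers from the example, `total_files_unoptimized(50, 100)` gives 5,000 files.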
Now for shuffle read. Shuffle read is usually the first thing a stage does. Each task in this stage must pull all records for the keys it is responsible for from the previous stage's output, over the network from every node to its own node, and then perform key-based aggregation or joins. Because shuffle write created one disk file for each downstream task, a shuffle read task only needs to pull its own disk file from every upstream task.
Shuffle read aggregates while it pulls. Each shuffle read task has its own buffer and can pull only as much data as fits in that buffer at a time; it then aggregates that data in memory using a Map. After one batch has been aggregated, it pulls the next batch into the buffer and aggregates it, and so on until all the data has been pulled and the final result is produced.
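The pull-and-aggregate loop can be sketched in Python as follows. This is a simplified model under stated assumptions: `shuffle_read_aggregate` is a hypothetical helper, map outputs are plain dicts keyed by reducer id, the buffer is measured in records rather than bytes, and each upstream block is fetched separately (real Spark batches and parallelizes remote fetches).

```python
def shuffle_read_aggregate(map_outputs, reducer_id, buffer_records, combine):
    """Pull this reducer's data from every map output and aggregate it.

    map_outputs: one dict per map task, {reducer_id: [(key, value), ...]}.
    At most `buffer_records` records are pulled per fetch, and each batch is
    merged into an in-memory dict before the next batch is fetched.
    """
    result = {}
    fetches = 0
    for output in map_outputs:
        block = output.get(reducer_id, [])
        for start in range(0, len(block), buffer_records):
            batch = block[start:start + buffer_records]   # one buffer-sized pull
            fetches += 1
            for key, value in batch:                       # aggregate in memory
                result[key] = combine(result[key], value) if key in result else value
    return result, fetches
```

For example, with `combine=lambda a, b: a + b` this behaves like the reduce side of `reduceByKey(_ + _)`.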

Optimized HashShuffleManager

The following figure illustrates how the optimized HashShuffleManager works. "Optimized" here means setting the parameter spark.shuffle.consolidateFiles. Its default value is false; setting it to true enables the optimization. If you use HashShuffleManager, enabling this option is generally recommended.
With the consolidate mechanism enabled, a shuffle write task no longer creates a separate disk file for every downstream task. Instead, Spark introduces the shuffleFileGroup: each shuffleFileGroup contains as many disk files as there are tasks in the downstream stage. An Executor can run as many tasks in parallel as it has CPU cores, and each task in the first batch creates a shuffleFileGroup and writes its data to the corresponding disk files.
When an Executor's CPU cores finish one batch of tasks and start the next, the tasks in the next batch reuse the existing shuffleFileGroup and the disk files in it. In other words, they write data into the existing disk files instead of creating new ones. The consolidate mechanism thus lets different tasks reuse the same set of disk files, which effectively merges many disk files, greatly reduces their number, and improves shuffle write performance.
Assume again that the second stage has 100 tasks and the first stage has 50 tasks, running on 10 Executors with 5 tasks each. With the unoptimized HashShuffleManager, each Executor produces 500 disk files and all Executors together produce 5,000. With the optimization, the number of disk files each Executor creates is: number of CPU cores × number of tasks in the next stage. That is, each Executor (with its single core) creates only 100 disk files, and all Executors together create only 1,000.
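The following Python sketch simulates shuffleFileGroup reuse on a single Executor. The function `consolidated_file_groups` and the file-naming scheme are hypothetical; tasks are assumed to run in waves, one task per CPU core at a time, and the first task on each core creates a group of files that later tasks on that core reuse.

```python
from collections import defaultdict

def consolidated_file_groups(num_tasks, cores, reduce_tasks):
    """Simulate one Executor running `num_tasks` map tasks on `cores` slots
    with consolidateFiles enabled (hypothetical model, not Spark code)."""
    groups = {}                  # core slot -> file names of its shuffleFileGroup
    writers = defaultdict(list)  # file name -> ids of tasks that appended to it
    for task_id in range(num_tasks):
        slot = task_id % cores   # tasks run in waves of `cores` at a time
        if slot not in groups:   # first wave on this core: create a new group
            groups[slot] = [f"group{slot}-reduce{r}" for r in range(reduce_tasks)]
        for name in groups[slot]:  # every task reuses its slot's files
            writers[name].append(task_id)
    total_files = sum(len(files) for files in groups.values())
    return total_files, writers
```

With one core and 100 downstream tasks, each Executor ends up with 100 files no matter how many map tasks it runs; across 10 Executors that is 1,000 files, compared with 5,000 without consolidation.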

SortShuffleManager operating principle

SortShuffleManager has two modes of operation: the normal mode and the bypass mode. The bypass mode is used when the number of shuffle read tasks is less than or equal to the value of spark.shuffle.sort.bypassMergeThreshold (default 200) and the shuffle operator does not aggregate on the map side.

Normal operation mechanism

The following figure illustrates how SortShuffleManager works in normal mode. In this mode, data is first written into an in-memory data structure, and which structure is used depends on the shuffle operator. For an aggregating shuffle operator such as reduceByKey, a Map is used: records are aggregated in the Map as they are written into memory. For an ordinary shuffle operator such as join, an Array is used and records are simply appended to it. After each record is written into the in-memory structure, Spark checks whether a threshold has been reached. If it has, Spark attempts to spill the data in the structure to disk and then clears the structure.
Before spilling to a disk file, the data already in the in-memory structure is sorted by key. After sorting, the data is written to the disk file in batches; the default batch size is 10,000 records, so the sorted data is written 10,000 records at a time. Writes go through Java's BufferedOutputStream, a buffered output stream that accumulates data in memory and writes to the disk file only when its buffer is full. This reduces the number of disk I/O operations and improves performance.
While a task writes all its data into the in-memory structure, several spills may occur, producing several temporary files. At the end, all the temporary disk files are merged: in this merge step, the data in every temporary file is read back and written into one final disk file. Because a task produces only one disk file, which holds all the data it has prepared for the downstream stage's tasks, it also writes a separate index file recording the start and end offsets of each downstream task's data within that file.
Because of this merge step, SortShuffleManager greatly reduces the number of files. For example, if the first stage has 50 tasks on 10 Executors with 5 tasks each, and the second stage has 100 tasks, each task ends up with only one disk file (plus its index file). Each Executor therefore holds only 5 data files, and all Executors together only 50.
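Below is a simplified Python model of the normal-mode write path for one map task. The function `sort_shuffle_write` is hypothetical; the spill threshold is counted in records, spilled runs are kept in memory instead of on disk, and records are ordered by (target partition, key), which is how this sketch keeps each downstream task's data contiguous. Keys must be mutually comparable.

```python
import heapq
from itertools import groupby

def sort_shuffle_write(records, num_partitions, spill_threshold, combine=None):
    """Simplified SortShuffleManager write path for one map task.

    Returns (data, index): `data` is the single merged output "file" as a list
    of (partition, key, value) tuples, and data[index[p]:index[p + 1]] holds
    the records destined for downstream task p.
    """
    spills = []                              # sorted runs, one per spill "file"
    buffer = {} if combine else []           # Map for aggregation, Array otherwise

    def spill():
        nonlocal buffer
        items = buffer.items() if combine else buffer
        run = sorted(((hash(k) % num_partitions, k, v) for k, v in items),
                     key=lambda r: (r[0], r[1]))
        spills.append(run)
        buffer = {} if combine else []       # clear the in-memory structure

    for k, v in records:
        if combine:
            buffer[k] = combine(buffer[k], v) if k in buffer else v
        else:
            buffer.append((k, v))
        if len(buffer) >= spill_threshold:   # threshold reached: spill to "disk"
            spill()
    if buffer:
        spill()

    # Merge step: k-way merge of all spill runs into one output.
    merged = heapq.merge(*spills, key=lambda r: (r[0], r[1]))
    data = []
    for (p, k), group in groupby(merged, key=lambda r: (r[0], r[1])):
        values = [r[2] for r in group]
        if combine:                          # same key may appear in several spills
            acc = values[0]
            for v in values[1:]:
                acc = combine(acc, v)
            data.append((p, k, acc))
        else:
            data.extend((p, k, v) for v in values)

    # Index: start offset of each partition, plus the final end offset.
    index = [0] * (num_partitions + 1)
    for p, _, _ in data:
        index[p + 1] += 1
    for p in range(num_partitions):
        index[p + 1] += index[p]
    return data, index
```

The returned `index` plays the role of the index file: downstream task `p` reads only the slice `data[index[p]:index[p + 1]]`.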

Bypass operating mechanism

The following figure illustrates how SortShuffleManager works in bypass mode. The bypass mode is triggered when both of the following conditions hold:
1. The number of shuffle read tasks is less than or equal to the value of spark.shuffle.sort.bypassMergeThreshold.
2. The shuffle operator is not a map-side aggregation operator (such as reduceByKey).
In this mode, each task creates one temporary disk file per downstream task, hashes each record's key, and writes the record to the disk file corresponding to that hash value. As before, records are first written to an in-memory buffer and spilled to the disk file when the buffer is full. Finally, all the temporary disk files are merged into one disk file and a separate index file is created.
The disk write mechanism here is actually the same as that of the unoptimized HashShuffleManager, since it also creates a startling number of disk files; the only difference is the final merge into one file. Because there are far fewer final disk files, shuffle read performs better than with the unoptimized HashShuffleManager.
This mode differs from the normal SortShuffleManager mode in two ways: first, the disk write mechanism is different; second, no sorting takes place. In other words, the biggest advantage of the bypass mode is that shuffle write does not need to sort the data, which saves that part of the cost.
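The trigger condition and the unsorted write path can be sketched in Python. Both `should_bypass` and `bypass_write` are hypothetical helpers that mirror the description above rather than Spark's actual classes; temporary files are modelled as in-memory lists, and the final merge is a simple concatenation in partition order.

```python
def should_bypass(num_reduce_partitions, map_side_combine, threshold=200):
    """Bypass applies when there is no map-side aggregation and the number
    of shuffle read tasks is at most the threshold
    (spark.shuffle.sort.bypassMergeThreshold, default 200)."""
    return (not map_side_combine) and num_reduce_partitions <= threshold

def bypass_write(records, num_partitions):
    """Bypass write for one map task: hash each record into a per-partition
    temporary file without sorting, then concatenate and build an index."""
    temp_files = [[] for _ in range(num_partitions)]
    for k, v in records:
        temp_files[hash(k) % num_partitions].append((k, v))  # arrival order kept
    data, index = [], [0]
    for part in temp_files:          # merge step: plain concatenation
        data.extend(part)
        index.append(len(data))
    return data, index
```

Note that within each partition the records keep their arrival order: nothing is sorted, which is exactly the cost the bypass mode avoids.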

Shuffle related parameter tuning

The following are the main parameters of the shuffle process, with a detailed explanation of each parameter's function, its default value, and tuning recommendations based on practical experience.

spark.shuffle.file.buffer

Default: 32k
Parameter description: Sets the buffer size of the BufferedOutputStream used by a shuffle write task. Data is written to this buffer before it goes to a disk file, and the buffer is flushed to disk only when it is full.
Tuning advice: If memory is plentiful, you can increase this value (for example, to 64k) to reduce how often the buffer is flushed to disk files during shuffle write, which reduces disk I/O and improves performance. In practice, tuning this parameter sensibly has yielded performance gains of 1% to 5%.
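A back-of-the-envelope Python sketch shows why a larger buffer helps and what it costs. The helper names are hypothetical and the model ignores OS page caching, so treat the results as rough counts of write calls, not measured I/O. A task that writes one file per downstream task (the hash and bypass paths) holds one such buffer per open file, so buffer memory grows with the number of downstream tasks.

```python
import math

def buffer_flushes(bytes_written, buffer_kb):
    """Approximate number of buffer flushes needed to write `bytes_written`."""
    return math.ceil(bytes_written / (buffer_kb * 1024))

def buffer_memory_kb(open_files, buffer_kb):
    """Memory held by write buffers when each open file has its own buffer."""
    return open_files * buffer_kb
```

For 100 MB of shuffle output, going from 32k to 64k halves the flushes from 3,200 to 1,600, while 100 open files at 64k pin 6,400 KB of buffer memory per task.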

spark.reducer.maxSizeInFlight

Default: 48m
Parameter description: Sets the buffer size of a shuffle read task; this buffer determines how much data can be pulled at a time.
Tuning advice: If memory is plentiful, you can increase this value (for example, to 96m) to reduce the number of pulls, which reduces the number of network transfers and improves performance. In practice, tuning this parameter sensibly has yielded performance gains of 1% to 5%.
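To see how the read buffer size changes the number of pulls, here is a greedy Python model. `plan_fetch_rounds` is hypothetical, block sizes are given in megabytes, and a block larger than the limit is fetched on its own; Spark's real fetcher also issues requests to several nodes concurrently, so this only approximates the effect.

```python
def plan_fetch_rounds(block_sizes_mb, max_in_flight_mb=48):
    """Group remote blocks into rounds whose total size stays within the limit."""
    rounds, current, current_size = [], [], 0
    for size in block_sizes_mb:
        if current and current_size + size > max_in_flight_mb:
            rounds.append(current)           # limit would be exceeded: new round
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        rounds.append(current)
    return rounds
```

For twenty 10 MB blocks, a 48 MB limit needs 5 rounds and a 96 MB limit needs 3.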

spark.shuffle.io.maxRetries

Default: 3
Parameter description: When a shuffle read task pulls its data from the node where the shuffle write task ran and the pull fails because of a network error, it retries automatically. This parameter sets the maximum number of retries. If the pull still has not succeeded after that many attempts, the job may fail.
Tuning advice: For jobs with very time-consuming shuffle operations, increase the maximum number of retries (for example, to 60) to avoid fetch failures caused by JVM full GC pauses or network instability. In practice, for shuffles over very large data volumes (billions to tens of billions of records), adjusting this parameter greatly improves stability.

spark.shuffle.io.retryWait

Default: 5s
Parameter description: Works together with the previous parameter: it sets the wait between successive retries of a data pull. The default is 5s.
Tuning advice: Increase the interval (for example, to 60s) to improve the stability of the shuffle operation.
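The interplay of the two retry parameters can be sketched in Python. `fetch_with_retries` and `worst_case_wait_s` are hypothetical helpers, not Spark APIs, and the model assumes the first attempt is not counted as a retry. The worst-case wait grows as maxRetries × retryWait: 3 × 5s = 15s with the defaults, versus 60 × 60s = 3,600s (one hour) with the suggested values, which is what lets a job ride out a long full GC or a network hiccup.

```python
import time

def fetch_with_retries(fetch, max_retries=3, retry_wait_s=5.0, sleep=time.sleep):
    """Call `fetch`, retrying up to `max_retries` times on IOError and
    waiting `retry_wait_s` seconds between attempts."""
    for attempt in range(max_retries + 1):
        try:
            return fetch()
        except IOError:
            if attempt == max_retries:   # retries exhausted: give up
                raise
            sleep(retry_wait_s)

def worst_case_wait_s(max_retries, retry_wait_s):
    """Total time spent waiting if every retry is needed."""
    return max_retries * retry_wait_s
```

The `sleep` argument is injectable so the retry logic can be exercised without actually waiting.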

spark.shuffle.memoryFraction

Default: 0.2
Parameter description: The fraction of Executor memory allocated to shuffle read tasks for aggregation. The default is 20%. (Since Spark 1.6 this parameter is deprecated and is only honored when spark.memory.useLegacyMode is enabled.)
Tuning advice: This parameter was also discussed under resource parameter tuning. If memory is plentiful and the job rarely uses persistence, increase this fraction to give shuffle read aggregation more memory, so that a lack of memory does not force frequent spills to disk during aggregation. In practice, tuning this parameter sensibly has improved performance by about 10%.
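Under the legacy (pre-1.6) memory model, the memory available for shuffle aggregation can be estimated as in the Python sketch below. The helper `legacy_shuffle_memory_mb` is hypothetical; it assumes the default spark.shuffle.safetyFraction of 0.8 and an even split among concurrently running tasks (Spark actually lets each of N running tasks use between 1/(2N) and 1/N of the pool).

```python
def legacy_shuffle_memory_mb(executor_heap_mb, memory_fraction=0.2,
                             safety_fraction=0.8, concurrent_tasks=1):
    """Estimate the shuffle aggregation pool and an even per-task share (MB)."""
    pool = executor_heap_mb * memory_fraction * safety_fraction
    return pool, pool / concurrent_tasks
```

For a 10,240 MB heap and 4 concurrent tasks, the pool is about 1,638 MB and each task gets roughly 410 MB; raising the fraction to 0.4 doubles both.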

spark.shuffle.manager

Default: sort
Parameter description: Sets the type of ShuffleManager. Since Spark 1.5 there are three options: hash, sort, and tungsten-sort. HashShuffleManager was the default before Spark 1.2; from Spark 1.2 onward, SortShuffleManager is the default. tungsten-sort is similar to sort but uses the off-heap memory management from Project Tungsten, which makes memory use more efficient. (The hash option was removed in Spark 2.0, together with HashShuffleManager.)
Tuning advice: SortShuffleManager sorts data by default. If your business logic needs that sorting, keep the default SortShuffleManager. If it does not, use the parameters described below to avoid sorting, either through the bypass mechanism or through the optimized HashShuffleManager, while also getting better disk read and write performance. Use tungsten-sort with caution, because some bugs have been found in it in the past.

spark.shuffle.sort.bypassMergeThreshold

Default: 200
Parameter description: When the ShuffleManager is SortShuffleManager and the number of shuffle read tasks is less than or equal to this threshold (default 200), and the operator does not aggregate on the map side, shuffle write does not sort. Instead it writes data the way the unoptimized HashShuffleManager does, except that at the end all the temporary disk files produced by each task are merged into a single file and a separate index file is created.
Tuning advice: When using SortShuffleManager, if you do not need sorting, set this parameter to at least the number of shuffle read tasks. The bypass mechanism is then enabled automatically and the map side does not sort, which removes the sorting overhead. However, this mode still produces a large number of disk files, so shuffle write performance still has room for improvement.

spark.shuffle.consolidateFiles

Default: false
Parameter description: Takes effect only with HashShuffleManager. If set to true, the consolidate mechanism is enabled and shuffle write output files are heavily merged. When there is a very large number of shuffle read tasks, this can greatly reduce disk I/O overhead and improve performance. (This parameter no longer exists in Spark 2.0 and later, since HashShuffleManager was removed.)
Tuning advice: If you do not need SortShuffleManager's sorting, then besides using the bypass mechanism you can also try setting spark.shuffle.manager to hash to use HashShuffleManager, with the consolidate mechanism enabled. In our tests, its performance was 10% to 30% higher than SortShuffleManager with the bypass mechanism enabled.
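The recommendations in this section can be collected into spark-submit arguments. The Python helper `shuffle_conf_flags` below is a hypothetical convenience that renders a dict of settings as `--conf key=value` pairs. It includes only parameters that still exist in current Spark releases, and the values are the examples suggested above, not universal defaults.

```python
def shuffle_conf_flags(overrides):
    """Render Spark configuration overrides as spark-submit --conf arguments."""
    args = []
    for key in sorted(overrides):            # stable order for reproducibility
        args += ["--conf", f"{key}={overrides[key]}"]
    return args

# Example values from the tuning advice above; adjust them to your cluster.
recommended = {
    "spark.shuffle.file.buffer": "64k",
    "spark.reducer.maxSizeInFlight": "96m",
    "spark.shuffle.io.maxRetries": "60",
    "spark.shuffle.io.retryWait": "60s",
}
```

The resulting list can be appended to a spark-submit command line, for example via `subprocess.run(["spark-submit", *shuffle_conf_flags(recommended), "app.py"])`, where `app.py` is a placeholder.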

Final words

This series has covered optimization principles for development, resource parameter tuning before a job runs, solutions for data skew at run time, and, as a finishing touch, shuffle tuning. We hope that after reading it you will remember these tuning principles and techniques and try them out while developing, testing, and running Spark jobs. That is how we build better Spark jobs and keep improving their performance.
