Spark optimization

I. Operation and maintenance

1. Master crashes, and restarting the standby master does not help

The Master has 512 MB of memory by default. When the cluster runs a particularly large number of jobs, the Master can crash: it reads each application's event log to generate the Spark UI, so its memory naturally runs out (OOM). The Master's log shows this, and a standby Master started through HA fails for the same reason.


  1. Increase the Master's memory; on the master node set (in conf/spark-env.sh):
     export SPARK_DAEMON_MEMORY=10g # adjust to your actual situation
  2. Reduce the job information the Master keeps in memory:
     spark.ui.retainedJobs 500 # both default to 1000
     spark.ui.retainedStages 500

2. Worker crashes or hangs

Sometimes a worker node disappears from the web UI or shows as DEAD, and the tasks running on that node report various lost-worker errors. The cause is the same as above: the worker keeps a lot of UI information in memory, and the resulting GC pauses make the heartbeat between the worker and the master get lost.


  1. Increase the Worker's memory; on the worker node set:
     export SPARK_DAEMON_MEMORY=2g # adjust to your actual situation
  2. Reduce the Driver and Executor information the Worker keeps in memory:
     spark.worker.ui.retainedExecutors 200 # both default to 1000
     spark.worker.ui.retainedDrivers 200

II. Runtime errors

1. Shuffle FetchFailedException

Spark Shuffle FetchFailedException solution

Error message

  1. Missing output location
     org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
  2. Shuffle fetch failed
     org.apache.spark.shuffle.FetchFailedException: Failed to connect to spark047215/
    The configuration at the time: each executor used 1 core and 5 GB RAM, and 20 executors were started.


This problem usually appears with heavy shuffle operations: a task fails, is re-executed, fails again, and the cycle repeats until the application fails.
The usual fix is to increase executor memory and, at the same time, the number of cores per executor, so that task parallelism does not drop.
  • spark.executor.memory 15G
  • spark.executor.cores 3
  • spark.cores.max 21
Number of executors started: 7
 executorNum = spark.cores.max / spark.executor.cores
Configuration per executor:
 3 cores, 15 GB RAM
Total memory consumed: 105 GB RAM
Total resource usage barely increased, yet the same job, which had been stuck for hours with the original configuration, finished in a few minutes after the change.
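The arithmetic above can be checked with a small plain-Scala sketch (no Spark dependency). It assumes executors = spark.cores.max / spark.executor.cores and ignores memory overhead; modeling the original setup as spark.cores.max = 20 with 1 core per executor is an assumption based on the 20 single-core executors mentioned earlier.

```scala
// Plain-Scala check of the executor arithmetic above (no Spark dependency).
// Assumption: executors = spark.cores.max / spark.executor.cores,
// and total memory = executors * spark.executor.memory (overhead ignored).
final case class ExecutorLayout(coresMax: Int, executorCores: Int, executorMemoryGb: Int) {
  val executors: Int = coresMax / executorCores
  val totalCores: Int = executors * executorCores
  val totalMemoryGb: Int = executors * executorMemoryGb
}

object ExecutorLayoutDemo {
  // Original setup, assumed to be spark.cores.max = 20 with 1 core and 5 GB per executor.
  val before: ExecutorLayout = ExecutorLayout(coresMax = 20, executorCores = 1, executorMemoryGb = 5)
  // Tuned setup from the bullets above.
  val after: ExecutorLayout = ExecutorLayout(coresMax = 21, executorCores = 3, executorMemoryGb = 15)

  def main(args: Array[String]): Unit =
    for ((label, layout) <- Seq("before" -> before, "after" -> after))
      println(s"$label: ${layout.executors} executors, ${layout.totalCores} cores, ${layout.totalMemoryGb} GB")
}
```

Under these assumptions the original layout uses 20 executors, 20 cores and 100 GB, and the tuned one 7 executors, 21 cores and 105 GB.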

2. Executor & Task Lost

Error message

  1. Executor lost
     WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local): ExecutorLostFailure (executor lost)
  2. Task lost
     WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, Connection from / closed
  3. Various timeouts
     java.util.concurrent.TimeoutException: Futures timed out after [120 second]
     ERROR TransportChannelHandler: Connection to / has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust timeout if this is wrong


These errors are caused by network problems or GC pauses: the worker or executor does not receive heartbeats or responses from executors or tasks in time.
Increase spark.network.timeout to 300 (5 min) or higher.
Its default is 120 (120s). It sets the timeout for all network interactions, and the following parameters fall back to it when they are not set explicitly:
  • spark.core.connection.ack.wait.timeout
  • spark.akka.timeout
  • spark.rpc.askTimeout or spark.rpc.lookupTimeout
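The fallback rule can be sketched in plain Scala. This is an illustration, not Spark's configuration code: the `TimeoutFallback` object and its simplified unit parser (bare seconds, `s` and `min` only) are hypothetical.

```scala
// Hypothetical model of the timeout fallback described above (not Spark code):
// a specific key wins if set, otherwise spark.network.timeout, otherwise 120s.
object TimeoutFallback {
  val NetworkTimeoutKey = "spark.network.timeout"
  val DefaultSeconds = 120

  // Simplified parser: accepts bare seconds ("300"), "300s" or "5min" only.
  def toSeconds(value: String): Int = {
    val v = value.trim.toLowerCase
    if (v.endsWith("min")) v.stripSuffix("min").toInt * 60
    else if (v.endsWith("s")) v.stripSuffix("s").toInt
    else v.toInt
  }

  // Resolves the effective timeout in seconds for `key`.
  def effectiveTimeout(conf: Map[String, String], key: String): Int =
    conf.get(key)
      .orElse(conf.get(NetworkTimeoutKey))
      .map(toSeconds)
      .getOrElse(DefaultSeconds)
}
```

For example, with only spark.network.timeout set to 300, a lookup of spark.rpc.askTimeout resolves to 300 seconds, while an explicitly set spark.rpc.askTimeout still wins.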


3. Data skew and task skew

Symptoms

  1. Data skew
  2. Task skew: the gap is not in data volume, yet a few tasks run particularly slowly.


Most tasks have finished, but one or two are still running, or running very slowly. This falls into two kinds: data skew and task skew.
  1. Data skew
    Data skew is mostly caused by large amounts of invalid data, such as null or "". There may also be abnormal data: for example, when computing user login statistics, one user may have logged in over a million times. Invalid data should be filtered out before the computation.
    A principle of data processing: filter as much as possible, so that the data you actually need to analyze is smaller and processing is faster.
     sqlContext.sql("...where col is not null and col != ''")
    For details, see:
    Solving data skew problems encountered in Spark
  2. Task skew
    Task skew has many causes: network I/O, CPU, or memory pressure can all slow down execution on a node, so check the node's performance monitoring to find the cause. A colleague once ran R jobs on one Spark worker, which made the Spark tasks on that node slow.
    Alternatively, enable Spark's speculation mechanism: if a few tasks on one machine are particularly slow, speculation launches copies of those tasks on other machines, and Spark keeps the result of whichever copy finishes first.
    • spark.speculation true
    • spark.speculation.interval 100 - check interval in milliseconds
    • spark.speculation.quantile 0.75 - fraction of tasks that must finish before speculation starts
    • spark.speculation.multiplier 1.5 - how many times slower than the median a task must be before it is speculated
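The speculation rule can be modeled in a few lines of plain Scala. This is a simplified sketch, not Spark's scheduler code: it assumes a running task becomes a candidate once `quantile` of all tasks have finished and its elapsed time exceeds `multiplier` times the median duration of the finished tasks (Spark's own check differs in details).

```scala
// Simplified model of speculative execution (not Spark's implementation).
object SpeculationModel {
  // Exact median of the finished task durations.
  def median(xs: Seq[Long]): Double = {
    val s = xs.sorted
    val n = s.length
    if (n % 2 == 1) s(n / 2).toDouble else (s(n / 2 - 1) + s(n / 2)) / 2.0
  }

  /** Returns the ids of running tasks that would be speculated. */
  def speculatable(
      totalTasks: Int,
      finishedDurationsMs: Seq[Long],
      runningElapsedMs: Map[Int, Long],
      quantile: Double = 0.75,
      multiplier: Double = 1.5): Set[Int] = {
    // Speculation is only considered after `quantile` of the tasks have finished.
    val minFinished = math.floor(quantile * totalTasks).toInt
    if (finishedDurationsMs.isEmpty || finishedDurationsMs.size < minFinished) Set.empty
    else {
      val threshold = multiplier * median(finishedDurationsMs)
      runningElapsedMs.collect { case (id, elapsed) if elapsed > threshold => id }.toSet
    }
  }
}
```

With 4 tasks, finished durations of 100, 100 and 120 ms give a threshold of 150 ms, so a task still running after 400 ms would be speculated.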


4. OOM

Error message

Heap out of memory
 java.lang.OutOfMemoryError: Java heap space 


When memory is insufficient and there is too much data, an OOM exception is thrown. There are two main kinds: driver OOM and executor OOM.
  1. Driver OOM
    Usually caused by using collect to gather all executor data onto the driver. Avoid collect where possible.
  2. Executor OOM
    Apply the following memory optimizations to give the code more memory:
    • Increase the total executor memory, i.e. raise spark.executor.memory
    • Increase task parallelism (split large tasks into smaller ones); see the parallelism tuning below

5. Task not serializable

Error message

 org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: ... 


If code inside an operator uses a variable defined in the driver, Spark ships that variable to the Worker; if the variable is not serializable, you get the error above.
 val x = new X() // variable defined in the driver
 rdd.map { r => x.doSomething(r) }.collect() // the code inside map runs on the worker (executor)
Besides map, the same applies to filter, foreach, foreachPartition and other operators. A typical case is a database connection created outside foreachPartition and used inside it. Such non-serializable variables make the task fail with this error.
There are three solutions:
  1. Create the external variables directly inside the operators above; foreachPartition is the best choice, because the objects are then created once per partition instead of once per record.
  2. Mark external objects that do not need to be shipped, such as SparkConf and SparkContext, with @transient.
  3. Put the external variable into a class and make that class serializable.
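The failure can be reproduced without Spark, because Spark serializes task closures with Java serialization. This plain-Scala sketch serializes objects directly; `DbConnection`, `EagerHelper` and `LazyHelper` are hypothetical names. It also shows a common variant of solution 2: a lazily created field marked `@transient`, which is skipped during serialization and built on first use.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable resource, standing in for a database connection.
final class DbConnection {
  def query(r: Int): Int = r * 2
}

// Fails to serialize: the connection is an ordinary field and is serialized too.
final class EagerHelper extends java.io.Serializable {
  val conn: DbConnection = new DbConnection
}

// Serializes: the @transient field is skipped and created lazily on first use.
final class LazyHelper extends java.io.Serializable {
  @transient lazy val conn: DbConnection = new DbConnection
}

object SerializationCheck {
  // Tries Java serialization, the mechanism Spark uses for task closures.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

Here `EagerHelper` fails to serialize because its connection field is serialized with it, while `LazyHelper` serializes and still creates its connection when first used.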

6. spark.driver.maxResultSize is too small

Error message

 Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 374 tasks (1026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) 


spark.driver.maxResultSize (default 1g) limits the total size of the serialized results of all partitions for each Spark action (such as collect). In short, the results the executors return to the driver are too large. Either raise this value, or avoid actions that return large results to the driver, such as collect, countByValue and countByKey.
Raise the value, for example:
 spark.driver.maxResultSize 2g
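A plain-Scala sketch of the check behind this error, assuming the driver keeps a running total of result sizes as tasks finish and fails once it exceeds the limit, with 0 meaning unlimited. The object and method names are illustrative, not Spark APIs.

```scala
// Simplified model of the driver-side result-size limit (not Spark code).
object MaxResultSizeModel {
  val MiB: Long = 1024L * 1024L

  /** Returns the 1-based number of the task whose result breaks the limit, if any. */
  def firstViolation(resultSizesBytes: Seq[Long], maxResultSizeBytes: Long): Option[Int] = {
    if (maxResultSizeBytes <= 0) None // 0 disables the limit
    else {
      // Running total after each finished task.
      val totals = resultSizesBytes.scanLeft(0L)(_ + _).tail
      val idx = totals.indexWhere(_ > maxResultSizeBytes)
      if (idx < 0) None else Some(idx + 1)
    }
  }
}
```

For example, results of 600 MiB and 426 MiB against a 1024 MiB limit fail on the second task, mirroring the 1026.0 MB > 1024.0 MB error above.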

7. Task too large

Error message

 WARN TaskSetManager: Stage 198 contains a task of very large size (5953 KB). The maximum recommended task size is 100 KB.
This WARN can also lead to an ERROR:
 Caused by: java.lang.RuntimeException: Failed to commit task
 Caused by: org.apache.spark.executor.CommitDeniedException: attempt_201603251514_0218_m_000245_0: Not committed because the driver did not authorize commit


If you know how Spark divides a job into stages, this problem is fairly simple.
A stage contains oversized tasks usually because its chain of transformations is too long, so the tasks the driver sends to the executors become very large.
The fix is to split the stage: partway through the pipeline, call cache followed by count to cache some intermediate data and cut the long stage.

8. Driver did not authorize commit

Driver did not authorize commit

9. Environment errors

  1. The driver node is out of memory
    The driver cannot get enough memory, so the application cannot start. Place the driver on a machine with more memory, or reduce driver-memory.
     Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000680000000, 4294967296, 0) failed; error='Cannot allocate memory' (errno=12)
  2. Not enough HDFS space
    When HDFS runs out of space, the event log cannot be written, so the ListenerBus reports errors. Increase the HDFS space (delete unused data or add nodes).
     Caused by: org.apache.hadoop.ipc.RemoteException( File /tmp/spark-history/app-20151228095652-0072.inprogress could only be replicated to 0 nodes instead of minReplication (=1)
     ERROR LiveListenerBus: Listener EventLoggingListener threw an exception java.lang.reflect.InvocationTargetException
  3. The Spark build does not match the Hadoop version
    Download the Spark package built for your Hadoop version, or compile it yourself.
     org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID
  4. Too many ports in use on one machine: this happens when more than 15 applications are submitted on the same machine without specifying a port.
     16/03/16 16:03:17 ERROR SparkUI: Failed to bind SparkUI Address already in use: Service 'SparkUI' failed after 16 retries!
    When submitting, specify the application's web UI port:
     --conf spark.ui.port=xxxx
  5. Garbled Chinese text
    Chinese text written to HDFS files with write.csv and similar methods comes out garbled. If no character set is specified, the JVM uses the system's default, and because not every node uses UTF-8 as its system character set, the problem appears. Specify the JVM character set explicitly:
     spark.executor.extraJavaOptions -Dfile.encoding=UTF-8
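The effect behind item 5 can be reproduced in plain Scala: bytes encoded with one character set and decoded with another come out garbled. ISO-8859-1 stands in here for a node whose default charset is not UTF-8; that choice is only an assumption for the demo.

```scala
import java.nio.charset.{Charset, StandardCharsets}

// Shows why an unspecified (system default) charset can garble Chinese text.
object EncodingDemo {
  val text = "中文测试"

  // Encodes with one charset and decodes with another.
  def roundTrip(s: String, encode: Charset, decode: Charset): String =
    new String(s.getBytes(encode), decode)

  def main(args: Array[String]): Unit = {
    println(s"JVM default charset: ${Charset.defaultCharset()}")
    println(roundTrip(text, StandardCharsets.UTF_8, StandardCharsets.UTF_8))      // intact
    println(roundTrip(text, StandardCharsets.UTF_8, StandardCharsets.ISO_8859_1)) // garbled
  }
}
```

Fixing the charset on both sides, as -Dfile.encoding=UTF-8 does for the executors, keeps the round trip intact.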

III. Some Python errors

1. Python version too old
 Cannot run program "python2.7": error=2, No such file or directory
Spark is configured to use Python 2.7, but the default Python on CentOS is 2.6; upgrade Python.

2. Insufficient permissions for Python

Error message

Some nodes report: Cannot run program "python2.7": error=13, Permission denied


Python 2.7 was installed on the new nodes and the python command works, but python2.7 cannot be invoked; fixing the environment variables is enough.

3. Loading a pickle fails

Error message

 TypeError: ('__cinit__() takes exactly 8 positional arguments (11 given)', <type 'sklearn.tree._tree.Tree'>, (10, array([1], dtype=int32), 1, <sklearn.tree._tree.RegressionCriterion object at 0x100077480>, 50.0, 2, 1, 0.1, 10, 1, <mtrand.RandomState object at 0x10a55da08>)) 


The pickle file was trained with scikit-learn 0.17, but some machines have 0.14 installed. The version mismatch causes the error, and upgrading resolves it. Remember to clean up the old version's files, or you will get various Cannot import xxx errors.

IV. Some optimizations

1. Some executors run no tasks

Sometimes some executors run no tasks at all. Why?
(1) The number of partitions is too small
Each partition is processed by exactly one task. You can change the number of partitions with repartition, but even then the data is first read from the source before repartitioning, so the initial partitioning is decided by the data source:
 hdfs - the number of blocks is the number of partitions
 mysql - partitioned according to the partitioning rule used when reading
 es - the number of partitions equals the number of ES shards
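A rough plain-Scala model of point (1), assuming one partition per 128 MB HDFS block (real input-split sizing can differ) and the layout of 7 executors with 3 cores each from the shuffle section. The object name and the file sizes in the usage below are made up for illustration.

```scala
// Rough model: with HDFS input, partitions ~ blocks, so small inputs leave cores idle.
object PartitionModel {
  val BlockSizeMb = 128L // assumed HDFS block size

  /** Approximate partition count: ceil(size / blockSize) per file, at least 1. */
  def partitionsFor(fileSizesMb: Seq[Long], blockSizeMb: Long = BlockSizeMb): Long =
    fileSizesMb.map(size => math.max(1L, (size + blockSizeMb - 1) / blockSizeMb)).sum

  /** Task slots left idle in the first wave when partitions < total cores. */
  def idleSlots(partitions: Long, executors: Int, coresPerExecutor: Int): Long =
    math.max(0L, executors.toLong * coresPerExecutor - partitions)
}
```

For example, files of 300 MB and 100 MB give 3 + 1 = 4 partitions, leaving 17 of the 21 task slots idle.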
(2) Side effects of data locality
Before distributing tasks, TaskSetManager computes the data locality of each task. The priority is:
 PROCESS_LOCAL (same executor) -> NODE_LOCAL (same node) -> RACK_LOCAL (same rack) -> ANY (any node)
Spark prefers to launch tasks at the highest locality level. If those tasks keep finishing quickly (within spark.locality.wait), tasks at the next locality level are not started. This is Spark's delay scheduling mechanism.
An extreme example: run a count job whose data all sits on one node. Only that machine computes for a long time, while the other machines in the cluster wait (for the locality level to degrade) instead of running tasks, wasting a lot of resources.
The condition is:
 curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)
where curTime is the current system time and lastLaunchTime is the last time a task was launched at the current locality level.
If the condition holds, the scheduler moves on to the next locality level and checks again, until it reaches a level where the condition no longer holds (or ANY), and assigns tasks at that level.
The locality-based task assignment is implemented in TaskSetManager.scala.
If many executors are waiting, you can lower the following parameters (even to 0); the default is 3s.
 spark.locality.wait
 spark.locality.wait.process
 spark.locality.wait.node
 spark.locality.wait.rack
When data locality is poor, you may raise these values somewhat; alternatively, rebalance the data across the cluster directly.
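The loop that applies this condition can be sketched in plain Scala. It loosely follows the logic of TaskSetManager.getAllowedLocalityLevel but omits the checks for pending tasks at each level, so treat it as a model rather than Spark's code; the object name is hypothetical.

```scala
// Simplified model of delay scheduling (pending-task checks omitted).
object DelayScheduling {
  val Levels: Vector[String] = Vector("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")

  /**
   * Returns the allowed level index and the updated lastLaunchTime (ms).
   * While the wait at the current level has expired, move one level down.
   */
  def allowedLevel(
      curTime: Long,
      lastLaunchTime: Long,
      currentIndex: Int,
      localityWaits: Vector[Long]): (Int, Long) = {
    var idx = currentIndex
    var last = lastLaunchTime
    while (idx < Levels.length - 1 && curTime - last >= localityWaits(idx)) {
      last += localityWaits(idx)
      idx += 1
    }
    (idx, last)
  }
}
```

With the 3s defaults, a check 3.5s after the last launch moves from PROCESS_LOCAL to NODE_LOCAL, and setting every wait to 0 jumps straight to ANY.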

2. Spark job fails after repeated retries

A worker node may be faulty: a failed task keeps being retried on the same executor, and once the maximum number of retries is reached the entire application fails. We can make a failed task retry on a different node instead; in the source code this setting defaults to 0:
 private val EXECUTOR_TASK_BLACKLIST_TIMEOUT = conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)
Set:
 spark.scheduler.executorTaskBlacklistTime 30000
When a task fails on an executor, it is then launched on other executors, and that executor is blacklisted for 30s (the task is not sent to it).
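A plain-Scala sketch of the per-task blacklist rule described above, assuming an executor stays excluded for a task until the configured time has passed since that task failed there, with 0 disabling the rule. Executor ids such as "exec-1" and the object name are hypothetical.

```scala
// Simplified model of the per-task executor blacklist (not Spark code).
object TaskBlacklist {
  // An executor is blacklisted if the task failed there less than blacklistTimeMs ago.
  def isBlacklisted(failedAtMs: Option[Long], nowMs: Long, blacklistTimeMs: Long): Boolean =
    failedAtMs.exists(t => blacklistTimeMs > 0 && nowMs - t < blacklistTimeMs)

  /** Picks the first executor not blacklisted for this task, if any. */
  def pickExecutor(
      executors: Seq[String],
      failures: Map[String, Long],
      nowMs: Long,
      blacklistTimeMs: Long): Option[String] =
    executors.find(e => !isBlacklisted(failures.get(e), nowMs, blacklistTimeMs))
}
```

With a 30000 ms timeout, a task that failed on exec-1 at t = 1s is routed to exec-2 at t = 5s but may return to exec-1 after t = 31s; with the default of 0 it is retried on exec-1 immediately.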

3. Memory

If your job shuffles a particularly large amount of data while caching few RDDs, you can change the following parameters to speed it up further.
spark.storage.memoryFraction - the fraction of memory assigned to the RDD cache, default 0.6 (60%); if you cache little data, you can lower it.
spark.shuffle.memoryFraction - the fraction of memory assigned to shuffle data, default 0.2 (20%)
The remaining 20% of memory is left for objects created by user code.
If tasks run slowly, the JVM GCs frequently, or memory runs short, you can also lower the two values above.
"spark.rdd.compress","true" - default false; compressing cached RDDs spends some CPU to reduce space usage
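A back-of-the-envelope split of an executor heap under these fractions, in plain Scala. It ignores the extra safety fractions Spark applies to each region, and from Spark 1.6 these two settings only take effect in legacy memory mode (spark.memory.useLegacyMode); the object and class names are hypothetical.

```scala
// Approximate heap split under the static memory model (safety fractions ignored).
final case class MemorySplit(storageGb: Double, shuffleGb: Double, otherGb: Double)

object StaticMemoryModel {
  def split(heapGb: Double, storageFraction: Double = 0.6, shuffleFraction: Double = 0.2): MemorySplit = {
    require(storageFraction + shuffleFraction <= 1.0, "fractions must not exceed 1.0")
    val storage = heapGb * storageFraction // RDD cache
    val shuffle = heapGb * shuffleFraction // shuffle buffers
    MemorySplit(storage, shuffle, heapGb - storage - shuffle) // left for user code
  }
}
```

For the 15 GB executors used earlier, this gives roughly 9 GB for the cache, 3 GB for shuffle, and 3 GB for everything else.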

4. Parallelism

MySQL read parallelism optimization
spark.default.parallelism - the parallelism when a shuffle occurs. In standalone mode it defaults to the number of cores and can be adjusted manually. Set too high, it creates too many small tasks and increases task launch overhead; set too low, tasks that process large amounts of data run slowly.
spark.sql.shuffle.partitions - the parallelism when a shuffle occurs in SQL aggregations, default 200. If the value is too small, it can cause OOM, lost executors, and overly long task execution times.
The same job was run with two settings:
spark.sql.shuffle.partitions = 300
spark.sql.shuffle.partitions = 500
The faster run gained its speed mainly by spending much less time in GC.
But setting it too high also degrades performance: too many fragmented tasks add a lot of unnecessary task launch and teardown overhead, and some tasks may hang and never finish.
To change the parallelism of the map phase, call rdd.repartition(partitionNum) in the code.
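To make the trade-off concrete, this plain-Scala sketch computes the average amount of shuffle data per reduce task for a few partition counts. The 50 GB shuffle size and the object name are made up for illustration, not taken from the runs above.

```scala
// Average shuffle data per reduce task for a given partition count.
object ShufflePartitionSizing {
  def mbPerTask(shuffleGb: Double, partitions: Int): Double = {
    require(partitions > 0, "partitions must be positive")
    shuffleGb * 1024 / partitions
  }

  def main(args: Array[String]): Unit =
    for (p <- Seq(200, 300, 500))
      println(f"$p%4d partitions -> ${mbPerTask(50.0, p)}%.1f MB per task")
}
```

For a hypothetical 50 GB shuffle, 200 partitions means about 256 MB per task and 500 partitions about 102 MB, which is consistent with larger partition counts easing per-task memory pressure and GC.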


5. Join

Spark SQL join optimization
Map-side join optimization

6. Disk

Disk I/O optimization

7. Serialization

Kryo Serialization

8. Data locality

Spark data locality under different cluster managers
Spark data locality anomalies when reading HDFS

9. Code

Several optimization points for writing Spark programs

