One, Operations and maintenance
1. Master hangs, and restarting the standby does not help
The Master has 512M of memory by default. When the cluster runs a particularly large number of jobs, the Master hangs: it reads each job's event log to generate the Spark UI, so it naturally runs out of memory (OOM). The run log shows this, and a standby Master started through HA fails for the same reason.
- Increase the Master's memory
Set it in spark-env.sh on the master node
- Reduce the job information stored in the Master memory
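Here is a minimal sketch of the Master-side settings, rendered as spark-env.sh lines by a small Python helper. SPARK_DAEMON_MEMORY (the heap of the standalone daemons) and SPARK_MASTER_OPTS are standard spark-env.sh variables. spark.deploy.retainedApplications and spark.deploy.retainedDrivers cap how many finished applications and drivers the Master keeps in memory. The helper name and the values (4g, 50) are assumptions for illustration, not tuned recommendations.

```python
def master_env_lines(daemon_memory="4g", retained_apps=50, retained_drivers=50):
    """Render spark-env.sh lines for a standalone Master (illustrative values)."""
    opts = (
        f"-Dspark.deploy.retainedApplications={retained_apps} "
        f"-Dspark.deploy.retainedDrivers={retained_drivers}"
    )
    return [
        # Heap size of the standalone daemons (Master/Worker) started on this host.
        f"export SPARK_DAEMON_MEMORY={daemon_memory}",
        # Keep fewer finished applications and drivers in Master memory.
        f'export SPARK_MASTER_OPTS="{opts}"',
    ]


if __name__ == "__main__":
    print("\n".join(master_env_lines()))
```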
2. Worker hangs or becomes unresponsive
Sometimes a worker node disappears from the web UI or shows as DEAD, and tasks running on that node report various lost worker errors. The cause is the same as above: the worker keeps a lot of UI information in memory, which leads to GC pauses long enough that heartbeats between the worker and the master are lost.
- Increase the Worker's memory
Set it in spark-env.sh on the Worker node
- Reduce the Driver, Executor information stored in the Worker memory
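For the Worker, the daemon's own heap is also set by SPARK_DAEMON_MEMORY. Do not confuse it with SPARK_WORKER_MEMORY, which is the memory the worker offers to executors. The UI state the Worker retains can be limited through SPARK_WORKER_OPTS with spark.worker.ui.retainedExecutors and spark.worker.ui.retainedDrivers. The helper below is a hypothetical sketch that only renders that line. The value 100 is an assumption, not a recommendation.

```python
def worker_env_line(retained_executors=100, retained_drivers=100):
    """Render a SPARK_WORKER_OPTS line limiting executor/driver info kept by a Worker."""
    opts = (
        f"-Dspark.worker.ui.retainedExecutors={retained_executors} "
        f"-Dspark.worker.ui.retainedDrivers={retained_drivers}"
    )
    return f'export SPARK_WORKER_OPTS="{opts}"'


if __name__ == "__main__":
    print(worker_env_line())
```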
Two, Runtime errors
1. Shuffle FetchFailedException
(See also: Spark Shuffle FetchFailedException solution.)
Error messages:
- Missing output location
- Shuffle fetch failed
The configuration at the time: each executor used 1 core and 5G RAM, with 20 executors started.
Solution: This problem usually occurs with heavy shuffle operations. A task fails and is re-executed, fails again, and the cycle repeats until the application fails.
In general, when you hit this problem, increase executor memory and at the same time increase the number of cores per executor, so that task parallelism is not reduced.
- spark.executor.memory 15G
- spark.executor.cores 3
- spark.cores.max 21
Resulting configuration: 21 / 3 = 7 executors, each with 3 cores and 15G.
Memory consumed: 7 × 15G = 105G RAM
Total resource usage barely changed (20 cores and 100G before, 21 cores and 105G after). Yet the same job, which was still stuck after several hours with the original configuration, finished in a few minutes with the new one.
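The resource comparison above is simple arithmetic. Here it is as a short Python sketch. It assumes that in standalone mode spark.cores.max divided by spark.executor.cores gives the number of executors, provided the workers have enough free resources. The function names are hypothetical.

```python
def executors_for(cores_max, cores_per_executor):
    """Executors granted when spark.cores.max caps total cores (assumes free workers)."""
    return cores_max // cores_per_executor


def footprint(num_executors, cores_per_executor, mem_gb_per_executor):
    """Total (cores, memory in GB) used by a set of identical executors."""
    return num_executors * cores_per_executor, num_executors * mem_gb_per_executor


old = footprint(20, 1, 5)                     # 20 executors x (1 core, 5G)
new = footprint(executors_for(21, 3), 3, 15)  # 7 executors x (3 cores, 15G)
print(old, new)                               # (20, 100) (21, 105)
```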
2. Executor & Task Lost
Error messages:
- Executor lost
- Task lost
- Various timeouts
Solution: This is caused by the network or by GC: the worker or executor did not receive heartbeat feedback from the executor or task.
Increase spark.network.timeout to 300 (5 min) or higher.
The default is 120 (120s). It sets the timeout for all network interactions. The following parameters default to its value if they are not set:
- spark.rpc.askTimeout, spark.rpc.lookupTimeout
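As a sketch, the timeout can be passed as spark-submit --conf arguments. The helper also sets spark.executor.heartbeatInterval (a real setting, default 10s). Spark's documentation advises keeping it significantly below spark.network.timeout, and the helper rejects values that violate this. The function name and default values are assumptions for illustration.

```python
def timeout_conf_args(network_timeout_s=300, heartbeat_interval_s=10):
    """Build spark-submit --conf arguments for the network timeout (illustrative)."""
    # The executor heartbeat interval must stay well below the network timeout,
    # otherwise healthy executors could be treated as lost.
    if heartbeat_interval_s >= network_timeout_s:
        raise ValueError("heartbeat interval must be less than spark.network.timeout")
    return [
        "--conf", f"spark.network.timeout={network_timeout_s}s",
        "--conf", f"spark.executor.heartbeatInterval={heartbeat_interval_s}s",
    ]


if __name__ == "__main__":
    print(" ".join(timeout_conf_args()))
```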
3. Skew
Symptoms:
- Data skew
- Task skew: among several tasks of similar size, some run particularly slowly.
Solution: Most tasks have completed, but one or two are still running, or running very slowly. There are two cases: data skew and task skew.
Data skew
Most data skew is caused by a large amount of invalid data, such as null or "", or by abnormal data. For example, in user login statistics, one user may have logged in more than a million times. Invalid data needs to be filtered out before the computation.
A principle of data processing: filter early and often. The less data you actually need to analyze, the faster the processing.
(See also: Solving data skew problems in Spark.)
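The filtering principle can be sketched in plain Python, with no Spark required. The sketch drops invalid keys before aggregating, then drops abnormal users. In a Spark job, the same predicates would go into a filter placed before the shuffle (for example before reduceByKey), so that invalid keys never pile up on a single reducer. The names login_counts and INVALID_KEYS and the 1,000,000 threshold are assumptions taken from the example above.

```python
from collections import Counter

INVALID_KEYS = {None, ""}  # keys treated as invalid, per the example above


def login_counts(user_ids, max_logins=1_000_000):
    """Count logins per user after dropping invalid and abnormal records."""
    # Filter invalid keys first so they never pile up under one key.
    counts = Counter(u for u in user_ids if u not in INVALID_KEYS)
    # Drop abnormal users (the threshold is an assumption from the example).
    return {u: c for u, c in counts.items() if c <= max_logins}


if __name__ == "__main__":
    print(login_counts(["a", "", None, "a", "b"]))
```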
Task skew
Task skew has many possible causes. Network I/O, CPU, and memory pressure can all slow down execution on a node, so monitor the node's performance to analyze the cause. A colleague once ran R jobs on one of the Spark workers, which made the Spark tasks on that node run slowly.
Alternatively, you can enable Spark's speculation mechanism. If a few tasks on one machine are particularly slow, speculation launches copies of them on other machines, and Spark takes the result of whichever copy finishes first.
- spark.speculation true
- spark.speculation.interval 100 - how often, in milliseconds, to check for tasks to speculate;
- spark.speculation.quantile 0.75 - fraction of a stage's tasks that must be complete before speculation starts;
- spark.speculation.multiplier 1.5 - how many times slower than the median a task must be to be speculated.
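The following simplified model, written in plain Python, shows how the quantile and multiplier settings interact. It follows the documented meaning of the two settings. Speculation only starts once the quantile fraction of a stage's tasks has finished. A running task then qualifies once it has run longer than multiplier times the median duration of the finished tasks. The model ignores other details of Spark's real scheduler, and the function name is hypothetical.

```python
from statistics import median


def speculatable(finished_durations, running_elapsed, total_tasks,
                 quantile=0.75, multiplier=1.5):
    """Return indices of running tasks that would be speculated (simplified model)."""
    # Not enough tasks finished yet: no speculation for this stage.
    if len(finished_durations) < quantile * total_tasks:
        return []
    threshold = multiplier * median(finished_durations)
    # Running tasks slower than multiplier x median become candidates.
    return [i for i, elapsed in enumerate(running_elapsed) if elapsed > threshold]


if __name__ == "__main__":
    # 3 of 4 tasks done (median 10s); the task running for 20s exceeds 15s.
    print(speculatable([10, 10, 12], [5, 20], total_tasks=4))
```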
4. OOM
Error message: heap memory overflow
Solution: There is not enough memory. Too much data throws an OOM exception. There are two main kinds: driver OOM and executor OOM.
- Driver OOM
This is usually caused by using collect to gather all the executors' data onto the driver. Avoid collect where possible.
- Executor OOM
You can apply the following memory optimizations to increase the memory available to your code:
- Increase total executor memory, i.e. increase the value of spark.executor.memory
- Increase task parallelism (split large tasks into smaller ones); see the parallelism optimization later in this article
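Raising parallelism usually means choosing a partition count so that each task handles a bounded amount of data. The Python sketch below computes such a count. The 128 MB per-task target is an assumption (it matches the default HDFS block size), and the function name is hypothetical. The result would then be passed to something like rdd.repartition(n).

```python
import math


def partitions_for(total_input_mb, target_mb_per_task=128):
    """Number of partitions so that each task handles at most ~target_mb_per_task."""
    return max(1, math.ceil(total_input_mb / target_mb_per_task))


if __name__ == "__main__":
    print(partitions_for(100_000))  # ~100 GB of input
```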
5. Task not serializable
Solution: If an operator references variables defined in the driver, Spark passes these variables to the workers. If they are not serializable, you will see the error above.
There are three solutions:
- Create the external variables directly inside the operators mentioned above. Preferably use foreachPartition, which reduces the cost of creating them.
- Mark external variables that do not need to be serialized with @transient.
- Put the external variables into a class and make the class serializable.
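PySpark ships closures with pickle (via cloudpickle), so in Python the same problem shows up as a pickling error. The sketch below is a Python analogue of @transient, not Spark's Scala mechanism. The unpicklable field (a lock, chosen only as an example) is skipped when serializing and rebuilt on the receiving side. The class name Enricher is hypothetical.

```python
import pickle
import threading


class Enricher:
    def __init__(self, factor):
        self.factor = factor            # plain data: safe to serialize
        self.lock = threading.Lock()    # not picklable on its own

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["lock"]               # "transient": leave it out of the pickle
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.lock = threading.Lock()    # rebuild it where the object is used

    def apply(self, x):
        with self.lock:
            return x * self.factor


clone = pickle.loads(pickle.dumps(Enricher(3)))
print(clone.apply(2))  # 6
```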
6. driver.maxResultSize is too small
Solution: spark.driver.maxResultSize (default 1G) limits the total size of the serialized results of all partitions for each Spark action (such as collect). In short, this error is raised when the results the executors return to the driver are too large. Either increase this value, or avoid methods that return large results, such as countByValue and countByKey.
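The following Python sketch is a quick back-of-the-envelope check of whether an action's results would exceed the limit. The size parser is a simplified, hypothetical one that handles only k/m/g suffixes, not Spark's own. It treats 0 as unlimited, which matches the documented meaning of spark.driver.maxResultSize=0. The average result size per partition is an input you would have to estimate.

```python
UNITS = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}


def parse_size(text):
    """Parse sizes like '1g' or '512m' into bytes (simplified parser)."""
    text = text.strip().lower()
    if text and text[-1] in UNITS:
        return int(text[:-1]) * UNITS[text[-1]]
    return int(text)


def exceeds_max_result(num_partitions, avg_result_bytes, max_result_size="1g"):
    """True if the collected results would exceed spark.driver.maxResultSize."""
    limit = parse_size(max_result_size)
    if limit == 0:  # 0 means unlimited
        return False
    return num_partitions * avg_result_bytes > limit


if __name__ == "__main__":
    # 2000 partitions returning ~1 MiB each is ~2 GiB, over the 1g default.
    print(exceeds_max_result(2000, 1024 ** 2))
```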
7. TaskSet too large
Solution: If you know how Spark divides stages, this problem is relatively simple.
When the tasks in a stage are very large, it is usually because the transformation chain is too long, so the tasks the driver sends to the executors become very large.
We can solve this by splitting the stage: call cache followed by count on the data to cut off the long stage.
8. Driver did not authorize commit
9. Environment errors
- The driver node is out of memory
With insufficient driver memory, the application cannot start. Move the driver to a machine with enough memory, or reduce driver-memory. A typical message (here a 4 GiB allocation fails):
os::commit_memory(0x0000000680000000, 4294967296, 0) failed;
error='Cannot allocate memory' (errno=12)
- Insufficient HDFS space
With insufficient HDFS space, event_log cannot be written, so ListenerBus reports an error. Increase HDFS space (delete unneeded data or add nodes).
- The Spark build and the Hadoop version do not match
Download the Spark package built for your Hadoop version, or compile Spark yourself.
- Too many ports in use: when more than 15 applications are submitted on one machine without specifying a port, the machine runs out of ports.
When submitting an application, specify its web UI port number.
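One way to pick an explicit UI port is to probe for a free one before submitting. The result is then passed as --conf spark.ui.port=<port>, which is a real setting with a default of 4040. The Python helper below is a hypothetical sketch. Note the race: another process can grab the port between the probe and Spark binding it, so treat the result as a hint.

```python
import socket


def free_port(start=4040, attempts=100):
    """Return the first port in [start, start + attempts) that can be bound."""
    for port in range(start, start + attempts):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind(("", port))  # all interfaces, like the Spark UI by default
            except OSError:
                continue            # already in use, try the next one
            return port             # socket is closed when the with-block exits
    raise RuntimeError("no free port found")


if __name__ == "__main__":
    print(["--conf", f"spark.ui.port={free_port()}"])
```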
- Garbled Chinese characters
When writing files to HDFS with write.csv and similar methods, Chinese text comes out garbled. If no character set is specified, the JVM uses the system's default. Not all nodes use UTF-8 as their system character set, so this problem occurs. Specifying the JVM's character set explicitly fixes it.
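One common way to specify the JVM character set is the file.encoding system property, passed to both the driver and the executors through spark.driver.extraJavaOptions and spark.executor.extraJavaOptions. Both are real Spark settings. The Python helper below is a hypothetical sketch that only builds the spark-submit arguments. UTF-8 is assumed as the target encoding.

```python
def utf8_jvm_args(encoding="UTF-8"):
    """spark-submit --conf arguments that pin the JVM default charset."""
    opt = f"-Dfile.encoding={encoding}"
    return [
        "--conf", f"spark.driver.extraJavaOptions={opt}",
        "--conf", f"spark.executor.extraJavaOptions={opt}",
    ]


if __name__ == "__main__":
    print(" ".join(utf8_jvm_args()))
```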
Three, Python errors
1. Python version too low
Spark here uses Python 2.7, while CentOS ships Python 2.6 by default, so upgrade it.
2. Insufficient Python permissions
Error message: some nodes report an error
Solution: Python 2.7 was installed on the newly maintained nodes. The python command works, but python2.7 cannot be invoked. Fixing the environment variables is enough.
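Both problems can be checked on each node with a small script. The interpreter PySpark uses for executors is chosen by the PYSPARK_PYTHON environment variable, so that name must resolve on every node. The Python helpers below are a hypothetical sketch. shutil.which confirms that a command name is on PATH, and meets_minimum compares an interpreter version against 2.7.

```python
import shutil
import sys


def find_interpreter(candidates):
    """Return the full path of the first command name found on PATH, else None."""
    for name in candidates:
        path = shutil.which(name)
        if path:
            return path
    return None


def meets_minimum(version_info=sys.version_info, minimum=(2, 7)):
    """True if the (major, minor) version is at least `minimum`."""
    return tuple(version_info[:2]) >= minimum


if __name__ == "__main__":
    print(find_interpreter(["python2.7", "python"]), meets_minimum())
```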
3. pickle fails to load
Solution: The pickle file was trained with scikit-learn 0.17, while some machines had 0.14 installed. Upgrading resolves the version mismatch. Remember to clean up the old version's files, otherwise you will get various Cannot import xxx errors.
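One way to catch this early is to store the library version next to the pickled model and compare it before unpickling the model itself. In practice the version string would come from something like sklearn.__version__. The Python sketch below uses hypothetical function names. It pickles the model separately inside the envelope, so the version check runs before any model class has to be imported.

```python
import pickle


def dump_with_version(model, lib_version):
    """Pickle the model separately so its version can be checked before loading."""
    return pickle.dumps({"lib_version": lib_version, "model": pickle.dumps(model)})


def load_checked(blob, installed_version):
    """Refuse to unpickle a model built with a different library version."""
    # The envelope holds only a str and bytes, so this step imports no model classes.
    envelope = pickle.loads(blob)
    if envelope["lib_version"] != installed_version:
        raise RuntimeError(
            f"model was pickled with version {envelope['lib_version']}, "
            f"but {installed_version} is installed"
        )
    return pickle.loads(envelope["model"])


if __name__ == "__main__":
    blob = dump_with_version({"weights": [1, 2]}, "0.17")
    print(load_checked(blob, "0.17"))
```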
Four, Performance tuning
1. Some executors do not run tasks
Sometimes some executors do not run any tasks. Why?
(1) Too few task partitions
Each partition is processed by exactly one task. To change the number of partitions, use the repartition method. Even so, the parallelism before repartition, or when reading from the data source, is determined by the data source.
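The effect of too few partitions can be shown with a toy model in Python. Each partition becomes one task, and tasks are dealt round-robin onto executors. The real scheduler also weighs locality and free cores, so this is an illustration only, and the function name is hypothetical.

```python
def busy_executors(num_partitions, num_executors):
    """Toy model: one task per partition, dealt round-robin; count busy executors."""
    load = [0] * num_executors
    for p in range(num_partitions):
        load[p % num_executors] += 1
    return sum(1 for tasks in load if tasks > 0)


if __name__ == "__main__":
    # 3 partitions on 10 executors: 7 executors sit idle.
    print(busy_executors(3, 10))
```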
(2) Side effects of data locality
Before distributing tasks, TaskSetManager computes each task's data locality level. From highest to lowest priority, the levels are: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY.
Spark runs tasks at the highest locality level first. If those tasks finish quickly (within spark.locality.wait), tasks at the next locality level are not started. This is Spark's delay scheduling mechanism.
An extreme example: run a count job whose data all sits on one node. Only that machine computes for a long time, while the other machines in the cluster wait (for the locality level to degrade) instead of running tasks, wasting a lot of resources.
The condition checked is:
curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)
where curTime is the current system time, and lastLaunchTime is the last time a task was launched at the current locality level.
If the condition holds, the scheduler moves down to the next locality level and checks again. This repeats until the condition fails or the lowest level (ANY) is reached.
The source code for locality-aware task assignment is in TaskSetManager.
If many executors are sitting idle and waiting, you can reduce spark.locality.wait (it can also be set to 0). The default is 3s.
When data locality is poor, you may instead raise this value appropriately, or balance the data across the cluster directly.
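The fallback rule above can be modeled in a few lines of Python. This is a simplified sketch. Spark's real loop also skips levels that have no pending tasks. The waits list mimics spark.locality.wait=3s, with NO_PREF and ANY using a wait of 0, and the function name is hypothetical. Setting every wait to 0 shows why spark.locality.wait=0 sends tasks straight to ANY.

```python
LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY"]


def allowed_level(cur_time, last_launch_time, index, waits):
    """Simplified delay scheduling: return (level index, new last_launch_time).

    waits[i] is how long (ms) to wait at level i before falling back to i + 1.
    """
    while index < len(waits) - 1 and cur_time - last_launch_time >= waits[index]:
        last_launch_time += waits[index]  # charge the elapsed wait to this level
        index += 1                        # fall back to the next, less local level
    return index, last_launch_time


if __name__ == "__main__":
    waits = [3000, 3000, 0, 3000, 0]
    level, _ = allowed_level(3500, 0, 0, waits)
    print(LEVELS[level])  # NODE_LOCAL
```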
2. Spark job fails after repeated retries
A worker node may have failed. A failed task keeps being retried on an executor, and reaching the maximum number of retries fails the entire application. We can configure how the application handles task failures, so that a task that fails on one node is retried on a different node. The source code shows that by default a task that fails on one executor is started on another executor. Meanwhile the failing executor is blacklisted for 30s, during which no tasks are sent to it.
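The retry limit can be illustrated with a toy model in Python. It assumes the limit in question is spark.task.maxFailures, a real setting with a default of 4 that counts the total attempts of one task before the job is given up. The model ignores which executor each attempt runs on, and the function name is hypothetical.

```python
def run_with_retries(attempt_results, max_failures=4):
    """Toy model: each entry says whether an attempt succeeds."""
    failures = 0
    for ok in attempt_results:
        if ok:
            return "succeeded"
        failures += 1
        if failures >= max_failures:
            return "application failed"  # retry limit reached
    return "pending"  # ran out of simulated attempts before hitting the limit


if __name__ == "__main__":
    print(run_with_retries([False, False, True]))
```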
3. Memory
If your job's shuffle volume is very large and it caches few RDDs, you can change the following parameters to speed up the tasks further.
spark.storage.memoryFraction - fraction of memory assigned to the RDD cache, default 0.6 (60%). If you cache little data, lower it.
spark.shuffle.memoryFraction - fraction of memory assigned to shuffle data, default 0.2 (20%).
The remaining 20% of memory is used for objects created by user code.
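The split above works out as follows for a given executor heap. This is a simplified Python sketch. spark.storage.memoryFraction and spark.shuffle.memoryFraction belong to Spark's legacy (pre-1.6) memory manager. The sketch ignores that manager's safety fractions and any reserved memory, so real usable sizes are smaller. The function name is hypothetical.

```python
def legacy_memory_split(executor_memory_gb, storage_fraction=0.6, shuffle_fraction=0.2):
    """Return (storage, shuffle, other) in GB under the simplified legacy model."""
    storage = executor_memory_gb * storage_fraction  # RDD cache
    shuffle = executor_memory_gb * shuffle_fraction  # shuffle buffers
    other = executor_memory_gb - storage - shuffle   # objects created by user code
    return storage, shuffle, other


if __name__ == "__main__":
    print(legacy_memory_split(10))
    # Caching little data: shrink the cache share, give more room to user code.
    print(legacy_memory_split(10, storage_fraction=0.3))
```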
If tasks run slowly and the JVM GCs frequently or runs short of memory, you can also reduce both values.
spark.rdd.compress true - defaults to false. Enabling it spends some CPU to reduce space usage.
4. Parallelism
(See also: MySQL read concurrency optimization.)
spark.default.parallelism is the parallelism when a shuffle occurs. In standalone mode it defaults to the number of cores, and it can be adjusted manually. Setting it too high creates too many small tasks and increases task startup overhead. Setting it too low makes tasks that process large amounts of data run slowly.
spark.sql.shuffle.partitions is the shuffle parallelism for SQL aggregations, default 200. If the value is too small, it can cause OOM, lost executors, and very long task execution times.
The same job was run twice:
spark.sql.shuffle.partitions = 300
spark.sql.shuffle.partitions = 500
The faster run gained its speed mainly by spending much less time in GC.
But setting the value too high degrades performance. Too many fragmented tasks add a lot of unnecessary task startup and teardown overhead, and some tasks may hang and never run.
To change parallelism in the map phase, the code mainly uses rdd.repartition(partitionNum).
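A common starting point is the Spark tuning guide's recommendation of roughly 2-3 tasks per CPU core in the cluster. Tune from there, using GC time and task counts as described above. The Python helper below is a hypothetical sketch of that rule of thumb. Its output would feed spark.sql.shuffle.partitions or rdd.repartition(n).

```python
def suggested_partitions(total_cores, tasks_per_core=3):
    """Rule-of-thumb partition count: 2-3 tasks per CPU core."""
    if not 2 <= tasks_per_core <= 3:
        raise ValueError("the tuning guide suggests 2-3 tasks per core")
    return total_cores * tasks_per_core


if __name__ == "__main__":
    n = suggested_partitions(21)
    print(["--conf", f"spark.sql.shuffle.partitions={n}"])
```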
5. Shuffle
Spark SQL join optimization
Map-side join optimization
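Here is a map-side (broadcast) join in miniature. The small table is turned into an in-memory hash map that every task can hold, so the large side streams through lookups with no shuffle. Spark SQL does this automatically for tables below spark.sql.autoBroadcastJoinThreshold, or when a broadcast hint is given. The plain-Python function below only illustrates the technique (inner-join semantics). It assumes unique keys on the small side, and its name is hypothetical.

```python
def map_side_join(large_rows, small_table):
    """Broadcast-style inner hash join: look each large-side row up in a dict."""
    lookup = dict(small_table)  # the "broadcast" side; assumes unique keys
    for key, value in large_rows:
        if key in lookup:
            yield key, value, lookup[key]


if __name__ == "__main__":
    orders = [(1, "a"), (2, "b"), (3, "c")]
    users = [(1, "x"), (3, "z")]
    print(list(map_side_join(orders, users)))
```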
6. Disk
Disk IO optimization
7. Serialization
Kryo serialization
8. Data locality
Spark not achieving data locality under the Cluster Manager
Data locality anomalies when Spark reads HDFS data
9. Code
Several optimization points for writing Spark programs