Article Directory
- 1. Data format
  - 1. Object
  - 2. Collection Type
  - 3. String
- 2. Operator optimization
  - 1. reduceByKey/aggregateByKey replaces Group By
  - 2. repartitionAndSortWithinPartitions instead of repartition + sortByKey
  - 3. mapPartitions instead of map
  - 4. foreachPartition instead of foreach
  - 5. Use coalesce after filter
- 3. Join optimization
  - 1. Remove useless data
  - 2. Broadcast big data, mapJoin replaces reduceJoin
  - 3. semi join replaces in / left join
  - 4. Use >= & <= to replace between and
  - 5. Repartition before join
- 4. Data skew optimization
  - 1. Locating data skew
  - 2. The principle of data skew
  - 1. Filter useless data
  - 2. Improve shuffle parallelism
  - 3. Local aggregation + global aggregation
  - 4. mapJoin replaces reduceJoin
  - 5. Random sampling split Join + mapJoin
- 5. Parameter tuning
  - 1. Adjust the number of executors
  - 2. Adjust the executor memory
  - 3. Adjust the number of executor cores
  - 4. Adjust the driver process memory
  - 5. Adjust shuffle and cache ratio
  - 6. Adjust the RDD cache ratio
  - 7. Replace GC parameters
  - 8. Adjust the heartbeat time
  - 9. Set the number of retries
  - 10. Set the retry waiting interval
  - 11. Set the number of tasks in each stage
- 6. SQL optimization
  - 1. Reduce the select level
  - 2. Double group by instead of distinct
- 7. Other optimizations
  - 1. Cache reused RDD/DataFrame data first
  - 2. Unpersist data that is no longer used
    - 1. Clear a DataFrame/RDD
    - 2. Clear all cached RDDs
    - 3. Drop tables that are no longer used
1. Data format
- Spark jobs usually process very large volumes of data. To use memory sensibly and efficiently, prefer data structures with as little wrapping overhead as possible.
1. Object
Use as few objects as possible: each object carries extra information such as an object header and references, which takes up a lot of memory.
2. Collection Type
Try to avoid HashMap, LinkedList, and similar types. They wrap every collection element in internal classes, which adds per-element overhead.
- A HashMap can be converted into a String for storage, for example: Map("age" -> 10, "name" -> "Xiao Ming", "sex" -> "Male") -> "10_Xiao Ming_Male", which can greatly reduce memory usage.
- Replacing collection types with arrays also reduces memory usage, GC frequency, and the number of Full GCs.
3. String
Use strings as little as possible. Although string operations are fast, each String carries extra information such as a character array and a length, so strings are relatively memory-hungry.
Prefer primitive data types such as Int and Long over strings.
- In practice, these types cannot be avoided entirely in everyday code, so the goal is simply to use them sparingly and thereby speed up the job.
2. Operator optimization
1. reduceByKey/aggregateByKey replaces Group By
- In one of my earlier jobs, a DataFrame Group By got stuck at 99% of the tasks every time. I rewrote the operation on the RDD with the aggregateByKey operator, which solved the problem completely.
- reduceByKey and aggregateByKey pre-aggregate on the map side: a combine step runs first, which reduces the amount of data shuffled and the amount of computation in the final aggregation.
If the code does things like concatenating String values into an Array[String], prefer the aggregateByKey operator, which is more flexible and uses less memory.
- reduceByKey is not recommended here because of its memory usage. It can only merge values of the same type, so every value must already be an Array[String].
- With reduceByKey, each record has to be converted to Array[String] beforehand (for example in mapPartitions), which increases memory consumption and the number of GCs.
- aggregateByKey is more flexible: a String is only merged into the Array[String] during pre-aggregation, which reduces memory usage.
For details, see: A brief discussion on Spark groupBy, reduceByKey and aggregateByKey to solve frequent Full GC problems
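The difference described above can be sketched as follows. This is a minimal illustration, not the code from the original job: the input pairs, the local master, and the use of ArrayBuffer as the accumulator are all assumptions made for the example.

```scala
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer

// Local session for illustration only; a real job gets its master from spark-submit
val spark = SparkSession.builder().master("local[2]").appName("aggregateByKey-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical input: (userId, tag) pairs whose values are plain Strings
val pairs = sc.parallelize(Seq(("u1", "a"), ("u2", "b"), ("u1", "c"), ("u1", "d")), numSlices = 2)

// The zero value is an empty buffer, so records do not have to be wrapped
// into a one-element collection before the aggregation starts
val grouped = pairs.aggregateByKey(ArrayBuffer.empty[String])(
  (buf, value) => buf += value,    // seqOp: add one String to the partition-local buffer
  (left, right) => left ++= right  // combOp: merge buffers coming from different partitions
)

// Sort the values only to make the result easy to compare
val result = grouped.mapValues(_.sorted.toArray).collectAsMap()
```

With reduceByKey, every value would first have to be mapped to its own one-element collection, which is the extra allocation the text warns about.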
2. repartitionAndSortWithinPartitions instead of repartition + sortByKey
repartition + sortByKey shuffles twice, which is slow, whereas the repartitionAndSortWithinPartitions operator shuffles only once and is much faster. There used to be few examples of it online, and it took me a long time to work out; it is one of the more complex operators in Spark.
For details, see: A brief discussion on Spark repartitionAndSortWithinPartitions
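A small sketch of the operator, assuming integer keys and a HashPartitioner with two partitions (both chosen only for illustration):

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

// Local session for illustration only
val spark = SparkSession.builder().master("local[2]").appName("repartition-sort-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical key/value data in arbitrary order
val pairs = sc.parallelize(Seq((5, "e"), (2, "b"), (4, "d"), (1, "a"), (3, "c"), (6, "f")))

// A single shuffle: records are routed by the partitioner and
// sorted by key inside each target partition while they are shuffled
val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))

// glom() turns each partition into an array so the per-partition order can be inspected
val keysPerPartition = sorted.keys.glom().collect().map(_.toList).toList
```

Each inner list in keysPerPartition is sorted, but the data is only ordered within a partition, not across partitions.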
3. mapPartitions instead of map
- mapPartitions iterates over the data of a partition: the function is called once per partition and processes all of its records, unlike map, which is called once per record. This avoids the GC pressure of very frequent map calls and improves performance.
- However, when a partition holds a very large amount of data, an OOM (out of memory) error may occur: memory runs short and garbage collection cannot reclaim the many objects quickly enough.
If the cause is an uneven data volume across partitions, it can be solved with a repartition operation.
If a previous step already uses hash partitioning or a similar operation, you can avoid an extra repartition and only need to adjust the partitioning logic there.
For code examples, please refer to: Record and resolve Hash conflicts in HashMap and HashPartition
- When you need to connect to MySQL, Redis, or another database, prefer mapPartitions. A connection only has to be created once per partition instead of once per record, which also reduces the pressure on the database.
- For the simplest operations, such as swapping key and value, mapPartitions is not recommended because it only adds overhead. Just use map directly.
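The one-connection-per-partition pattern can be sketched like this. FakeConnection is a hypothetical stand-in for a real MySQL or Redis client, and the input ids are made up for the example.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for a database client; a real job would open a JDBC or Redis connection
class FakeConnection {
  def lookup(id: Int): String = s"user-$id"
  def close(): Unit = ()
}

// Local session for illustration only
val spark = SparkSession.builder().master("local[2]").appName("mapPartitions-sketch").getOrCreate()
val sc = spark.sparkContext

val ids = sc.parallelize(1 to 6, numSlices = 3)

val names = ids.mapPartitions { iter =>
  val conn = new FakeConnection()         // created once per partition, not once per record
  val out = iter.map(conn.lookup).toList  // materialize before closing, because iterators are lazy
  conn.close()
  out.iterator
}

val result = names.collect().toList
```

Materializing the partition with toList is what makes very large partitions risky, which matches the OOM caveat above.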
4. foreachPartition instead of foreach
This is analogous to mapPartitions instead of map. The difference is that foreachPartition and foreach return nothing, while mapPartitions and map return a value.
5. Use coalesce after filter
- When a filter removes a lot of data (more than 20%), follow it with the coalesce operator to reduce the number of partitions and pack the remaining data into fewer of them. After filtering, each partition holds little data, and too many nearly empty partitions increase the running time. More partitions are not always better; the right number grows with the amount of data.
- When coalesce is given fewer partitions than the current number, no shuffle occurs. When it is given more, the partition count only increases if shuffle = true is passed; otherwise it stays unchanged.
- repartition always shuffles; repartition(n) is equivalent to coalesce(n, shuffle = true).
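The partition counts can be checked with a short sketch. The data and the numbers 100, 10, and 200 are arbitrary values chosen for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only
val spark = SparkSession.builder().master("local[2]").appName("coalesce-sketch").getOrCreate()
val sc = spark.sparkContext

val numbers = sc.parallelize(1 to 1000, numSlices = 100)

// The filter keeps 10% of the records but still leaves 100 partitions
val kept = numbers.filter(_ % 10 == 0)

val compacted = kept.coalesce(10)               // fewer partitions, no shuffle
val unchanged = kept.coalesce(200)              // cannot grow without a shuffle
val grown = kept.coalesce(200, shuffle = true)  // same effect as repartition(200)

val partitionCounts = (compacted.getNumPartitions, unchanged.getNumPartitions, grown.getNumPartitions)
val remaining = compacted.count()
```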
3. Join optimization
1. Remove useless data
Filter out unneeded data before joining; otherwise it only increases the amount of shuffled data.
2. Broadcast big data, mapJoin replaces reduceJoin
When two large tables are joined and the smaller one does not exceed 1 GB, you can broadcast the smaller table and turn the reduceJoin into a mapJoin, which avoids the shuffle and makes the join more efficient.
When a small table is joined with a large table, you can likewise broadcast the small table directly and avoid the shuffle.
Broadcasting means each Executor pulls a copy of the data remotely from the Driver or from another Executor node and keeps it in its local memory.
This way each Executor holds only one copy of the broadcast variable in memory, and no shuffle is needed afterwards.
// `spark` is assumed to be the active SparkSession
val accSkewBroadCast: Broadcast[Dataset[Row]] = spark.sparkContext.broadcast(accSkewDf)
val broadCastValue = accSkewBroadCast.value
If you want to broadcast a table, you can try the following approach:
("broadcast_table")
("broadcast_table")
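One common way to request a map-side join for DataFrames is the broadcast function from org.apache.spark.sql.functions. The sketch below is an assumption about how that could look here, not the author's original snippet; the table contents and column names are made up.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Local session for illustration only
val spark = SparkSession.builder().master("local[2]").appName("broadcast-join-sketch").getOrCreate()
import spark.implicits._

// Hypothetical large fact table and small dimension table
val large = Seq((1, "click"), (2, "view"), (1, "buy"), (3, "view")).toDF("user_id", "event")
val small = Seq((1, "Alice"), (2, "Bob")).toDF("user_id", "name")

// broadcast(...) marks the small side so the planner can ship it to every
// executor and join locally instead of shuffling both sides
val joined = large.join(broadcast(small), Seq("user_id"))

joined.explain()  // prints the physical plan, where the chosen join strategy can be checked
val rows = joined.count()
```

Spark also broadcasts small tables automatically when they fall below spark.sql.autoBroadcastJoinThreshold, so the explicit hint matters most for tables above that threshold.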
3. semi join replaces in / left join
- When a join only needs data from the left table and none from the right table, use left semi join instead of IN or Left Join.
- Left Semi Join is more efficient than an IN/EXISTS subquery. Its result resembles that of an Inner Join restricted to the columns of the left table.
- When the right table contains duplicate records for a key, left semi join stops at the first match and moves on to the next left row, so it performs better; Left Join keeps traversing all matches.
- However, only columns of the left table can be selected, because only the join key of the right table takes part in the join.
4. Use >= & <= to replace between and (inclusive on both ends)
When joining or fetching data, the underlying layer converts a between ... and ... condition into >= & <= and then filters. Using >= & <= directly can improve efficiency.
5. Repartition before join
If you suspect data skew (the data is unevenly distributed and much of it lands in a single partition), perform a repartition before the join and increase the number of partitions. This redistributes the data across partitions and avoids a single oversized partition stalling the job for a long time.
4. Data skew optimization
1. Locating data skew
- When running Spark jobs, we sometimes find that most tasks finish very quickly while a few run very slowly. For example, out of 3,000 tasks, 2,995 finish within a few minutes, but the remaining 5 take nearly an hour or even 2-3 hours.
- Data skew only occurs during a shuffle, so we need to locate the operators that trigger one, such as distinct, groupByKey, reduceByKey, aggregateByKey, join, cogroup, and repartition.
2. The principle of data skew
- During a shuffle, the shuffle read side pulls the output of the previous stage's shuffle write to a task on some node for further processing, such as a join or group by. If one or a few keys each have millions or tens of millions of records while the other keys have only tens or hundreds, data skew occurs. Partitioning is hash-based and all records with the same key land in the same partition, so one or a few partitions end up with a huge amount of data. That is why most tasks finish in a few minutes while one or a few tasks run for hours.
- The progress of the whole Spark job is determined by its longest-running task, which is why the job as a whole feels slow and runs for too long.
1. Filter useless data
If the skewed keys have little impact on the business, you can consider filtering them out directly to avoid the skew. In practice, however, this is usually not allowed.
2. Improve shuffle parallelism
The simplest and crudest approach is to set the parallelism when using a shuffle operator. Increasing the parallelism spreads the data of each partition over more partitions, which relieves the skewed partitions and speeds up the job.
- For example, spark.sql.shuffle.partitions controls the parallelism of join and group by, that is, the number of shuffle read tasks. The default is 200; it can be set to 1000, 2000, and so on. A value of 2-3 times num-executors * executor-cores is recommended.
- For example, reduceByKey(func, 1000), repartition(1000), or aggregateByKey(zeroValue, 1000)(seqOp, combOp): the numeric argument is also the number of shuffle read tasks.
With the default parallelism of 200, each task had to process 1,000,000 records. With the parallelism set to 1000, each task only needs to process 1,000,000 / (1000 / 200) = 200,000 records, so the task time is shortened.
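Both ways of raising the parallelism, plus the arithmetic from the example, can be sketched as follows. The input data is made up; 1000 is the value used in the text.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only
val spark = SparkSession.builder().master("local[2]").appName("shuffle-parallelism-sketch").getOrCreate()
val sc = spark.sparkContext

// DataFrame / SQL shuffles (join, group by) read this setting
spark.conf.set("spark.sql.shuffle.partitions", "1000")

// RDD shuffle operators take the number of partitions as an argument
val pairs = sc.parallelize(1 to 10000).map(i => (i % 100, 1))
val counts = pairs.reduceByKey(_ + _, 1000)
val readTasks = counts.getNumPartitions

// The arithmetic from the text: 5x more tasks means 1/5 of the records per task
val recordsPerTaskBefore = 1000000L
val recordsPerTaskAfter = recordsPerTaskBefore / (1000 / 200)
```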
3. Local aggregation + global aggregation
This is usually used for grouped aggregations such as reduceByKey or a group by in SQL.
- First, add a random prefix (usually a number below 10) to each key, which scatters the records of a key.
- Then run reduceByKey or a similar operation to aggregate the records that share the same prefixed key.
- Finally, remove the prefix and aggregate again.
This approach generally applies to aggregation-type shuffle operations.
4. mapJoin replaces reduceJoin
This was covered above and is not repeated here.
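The three steps above can be sketched as follows, assuming a sum per key. The hot/cold input, the "_" separator, and the 10 salt buckets are illustrative choices.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Random

// Local session for illustration only
val spark = SparkSession.builder().master("local[2]").appName("two-stage-aggregation-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical skewed input: one hot key with 1000 records, one cold key with 2
val pairs = sc.parallelize(Seq.fill(1000)(("hot", 1)) ++ Seq(("cold", 1), ("cold", 1)), numSlices = 4)

val saltBuckets = 10

// Step 1 and 2: prefix each key with a random number, then aggregate locally on the salted key
val partial = pairs
  .map { case (key, value) => (s"${Random.nextInt(saltBuckets)}_$key", value) }
  .reduceByKey(_ + _)

// Step 3: strip the prefix and aggregate globally on the original key
val total = partial
  .map { case (salted, value) => (salted.substring(salted.indexOf('_') + 1), value) }
  .reduceByKey(_ + _)

val result = total.collectAsMap()
```

After the first stage the hot key is reduced to at most 10 partial sums, so the second shuffle moves very little data.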
5. Random sampling split Join + mapJoin
Assume: leftRDD left join rightRDD
- Sample the leftRDD that needs to be split with the sample operator, count the occurrences of each key, and work out which keys have the largest data volume.
("key","view_name", "view_value", "valid_feaids")
// Data sampling, 0.1 means 10% sampling, can be customized
.sample(false, 0.1)
.rdd
.map(k => (k, 1))
// Count how many times each key appears
.reduceByKey(_ + _)
// Keep only keys whose count reaches the specified threshold, which can be customized
.filter(_._2 >= 20000)
// Sort by the number of occurrences of each key
.map(k => (k._2, k._1))
// false sorts in descending order, true in ascending order
.sortByKey(false)
// Take the first N
.take(1000)
- Then split these keys out of leftRDD to form a separate leftSkewRDD; the rest, which is not skewed, becomes leftUnSkewRDD.
- Split the same keys out of rightRDD to form rightSkewRDD; the non-skewed rest becomes rightUnSkewRDD.
- Broadcast leftSkewRDD to turn the reduceJoin into a mapJoin, join leftSkewRDD with rightSkewRDD, and obtain skewedJoinRDD.
- Join leftUnSkewRDD with rightUnSkewRDD to obtain unSkewedJoinRDD.
- Finally, union unSkewedJoinRDD with skewedJoinRDD.
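The whole procedure can be sketched with DataFrames as below. This is one possible reading, not the author's code: the data, the threshold of 10, and the sampling fraction are made up, and the sketch broadcasts the right-hand rows of the skewed keys (an assumption on my part, since a left join can only build its broadcast side from the right table).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, col}

// Local session for illustration only
val spark = SparkSession.builder().master("local[2]").appName("skew-split-join-sketch").getOrCreate()
import spark.implicits._

// Hypothetical tables: "hot" is the skewed key on the left side
val left = (Seq.fill(50)(("hot", "l")) ++ Seq(("a", "l"), ("b", "l"))).toDF("key", "lval")
val right = Seq(("hot", "r"), ("a", "r")).toDF("key", "rval")

// 1. Sample the left side and find the keys that occur most often
val skewedKeys: Array[String] = left.select("key")
  .sample(withReplacement = false, fraction = 0.5, seed = 42L)
  .rdd.map(row => (row.getString(0), 1L))
  .reduceByKey(_ + _)
  .filter(_._2 >= 10L)               // illustrative threshold
  .sortBy(_._2, ascending = false)
  .keys
  .take(1000)

// 2. Split both sides into a skewed part and a non-skewed part
val leftSkew = left.filter(col("key").isin(skewedKeys: _*))
val leftUnSkew = left.filter(!col("key").isin(skewedKeys: _*))
val rightSkew = right.filter(col("key").isin(skewedKeys: _*))
val rightUnSkew = right.filter(!col("key").isin(skewedKeys: _*))

// 3. Map-side join for the skewed keys, regular join for the rest, then union
val skewedJoin = leftSkew.join(broadcast(rightSkew), Seq("key"), "left")
val unSkewedJoin = leftUnSkew.join(rightUnSkew, Seq("key"), "left")
val result = unSkewedJoin.union(skewedJoin)

val totalRows = result.count()
val hotRows = result.filter(col("key") === "hot").count()
```

The union returns the same rows as a plain left join, whichever keys the sample classifies as skewed.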
5. Parameter tuning
1. Adjust the number of executors
This sets how many executors each Spark job requests. It depends on the available resources: if resources are plentiful and the job processes a lot of data, set it higher; otherwise reduce it as appropriate.
num-executors: 100
2. Adjust the executor memory
The memory of each executor process directly affects the speed of the Spark job. If OOM errors show up regularly, check whether the executor memory is set too low.
executor-memory: 10G
3. Adjust the number of executor cores
This sets the number of CPU cores of each executor process. Each core can run only one task at a time, so the more cores there are, the faster all tasks can be completed. Generally, 2-3 are recommended.
executor-cores
4. Adjust the driver process memory
The driver process memory defaults to 1g, which is usually sufficient. If you use the collect operator, the driver needs enough memory to hold the result, otherwise OOM will occur; in that case, increase the memory.
driver-memory
5. Adjust shuffle and cache ratio
Within executor memory, 20% is allocated by default to shuffle read tasks for aggregation. The shuffle memory ratio can be increased as appropriate, for example to 70%.
"spark.shuffle.memoryFraction": "0.7"
6. Adjust the RDD cache ratio
Persisted RDD data takes up 60% of executor memory by default; when there is more data than fits, it spills to disk. If the job persists many RDDs, increase this parameter so that data is not forced onto disk for lack of memory. If the job persists little data, or if you find that frequent GC is slowing it down, reduce this ratio as appropriate.
"spark.storage.memoryFraction": "0.3"
7. Replace GC parameters
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:ParallelGCThreads=3"
8. Adjust the heartbeat time
When the data volume is very large, executors are under heavy load and communication sometimes fails, producing errors such as the following.
Futures timed out after [300 seconds]
Executor heartbeat timed out after xxx ms
Spark's default network timeout is 120s, so these errors occur often and the timeouts need to be increased.
"": "3000000"
"": "1200000"
"": "10000000"
9. Set the number of retries
The number of retries after a stage fails in Spark defaults to 3. You can increase it as appropriate to avoid failures caused by Full GC, network instability, and the like.
10. Set the retry waiting interval
The default wait before a retry after a failure is 5s; it can be set to 60s to make the shuffle more stable.
11. Set the number of tasks in each stage
This parameter is very important; leaving it unset will generally hurt the job's performance.
It is generally chosen based on the amount of data. If the data volume is very large, set it higher, for example 5000; if the data volume is modest, reduce it as appropriate, for example to 500~1000.
Generally, a value of 2~3 times num-executors * executor-cores is appropriate.
For example, with 200 executors of 2 cores each, setting the number of tasks to 1000 is reasonable.
spark.default.parallelism: 5000
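As a rough sketch, the settings from this section could be collected in a SparkConf as shown below. Several parameter names are missing from the text above, so the timeout and retry keys here (spark.network.timeout, spark.executor.heartbeatInterval, spark.shuffle.io.maxRetries, spark.shuffle.io.retryWait) are my assumption about which settings are meant, and their values are illustrative.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.instances", "100")  // same as --num-executors 100
  .set("spark.executor.memory", "10g")     // same as --executor-memory 10G
  .set("spark.executor.cores", "2")        // same as --executor-cores 2
  // Driver memory is normally passed to spark-submit (--driver-memory),
  // because in client mode the driver JVM has already started at this point
  .set("spark.driver.memory", "1g")
  .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:ParallelGCThreads=3")
  // Assumed keys for the timeout settings; the heartbeat interval must stay below the network timeout
  .set("spark.network.timeout", "300s")
  .set("spark.executor.heartbeatInterval", "60s")
  // Assumed keys for the retry count (default 3) and retry wait (default 5s)
  .set("spark.shuffle.io.maxRetries", "6")
  .set("spark.shuffle.io.retryWait", "60s")
  // 200 executors * 2 cores * 2.5 = 1000 tasks per stage, as in the example above
  .set("spark.default.parallelism", "1000")

val parallelism = conf.get("spark.default.parallelism")
```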
6. SQL optimization
1. Reduce the select level
Keep the number of nested selects to a minimum. If something can be done in one pass, do not select multiple times: each select traverses the data of the subquery, which costs a lot of resources and memory.
If necessary, use SQL instead of DataFrame operations; it reads more intuitively and often avoids complicated DataFrame manipulation.
2. Double group by instead of distinct
With distinct, data skew occurs often: because distinct only needs the different values, it reads all records and then deduplicates them in a single global reduce task, which very easily leads to data skew.
Group by involves grouping and aggregation and does far more work than distinct, but it runs multiple reduce tasks in parallel. Each reduce task processes part of the data and then aggregates, so it is much more efficient than distinct.
distinct:
select count(distinct uid) uv, name, age
from A
group by name, age
group by:
select count(uid) uv, name, age
from (
  select uid, name, age
  from A
  group by uid, name, age
) a
group by name, age
7. Other optimizations
1. Cache reused RDD/DataFrame data first
If an RDD or DataFrame is reused in the code, cache it. Otherwise Spark recomputes it from scratch every time it is used, which costs a lot of resources and time.
cache is lazy, so it must be followed by an action operator (count, etc.); otherwise nothing is actually cached. Then continue with the other operations.
val cachedDF = testDF.cache()
cachedDF.count()
(xxx)
(xxx)
The operations above use the cached DataFrame, so testDF does not have to be recomputed from scratch each time.
2. Unpersist data that is no longer used
- When an RDD/DataFrame or table is no longer used, clean it up promptly; otherwise it keeps occupying memory/disk. Cached data needs special attention, since it takes up memory and must be released in time.
1. Clear a DataFrame/RDD
(true)
(true)
()
2. Clear all cached RDDs
()
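def unpersistRdds(sc: SparkContext): Unit = {
  // Obtain all persisted RDDs and release the ones whose name matches
  val rdds = sc.getPersistentRDDs
  rdds.filter(_._2.name != null)
    // Matching the name with contains is an assumption
    .filter(_._2.name.contains("rdd"))
    .foreach(_._2.unpersist())
}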
3. Drop tables that are no longer used
("temp")
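The full cache lifecycle described in this section can be sketched as follows. The names testDF, namedRdd, and the view "temp" are illustrative, and the calls shown (unpersist, getPersistentRDDs, spark.catalog.clearCache, spark.catalog.dropTempView) are one plausible combination rather than the author's exact code.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Local session for illustration only
val spark = SparkSession.builder().master("local[2]").appName("cache-lifecycle-sketch").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

// Cache a DataFrame and materialize it with an action
val testDF = Seq((1, "a"), (2, "b")).toDF("id", "value")
val cachedDF = testDF.cache()
cachedDF.count()
cachedDF.createOrReplaceTempView("temp")

// Cache a named RDD as well
val namedRdd = sc.parallelize(1 to 10).setName("rdd_numbers").cache()
namedRdd.count()

// 1. Release a single DataFrame; blocking = true waits until the blocks are removed
cachedDF.unpersist(blocking = true)
val dfReleased = cachedDF.storageLevel == StorageLevel.NONE

// 2. Release every persisted RDD whose name contains "rdd"
sc.getPersistentRDDs.values
  .filter(rdd => rdd.name != null && rdd.name.contains("rdd"))
  .foreach(_.unpersist())
val rddReleased = namedRdd.getStorageLevel == StorageLevel.NONE

// Remove everything that is still held in the SQL cache
spark.catalog.clearCache()

// 3. Drop the temporary view; returns true if the view existed
val dropped = spark.catalog.dropTempView("temp")
```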