Spark-Based System Performance Optimization - Advanced Level

Preface

Following the basic chapter, which covered the development tuning and resource tuning that every Spark developer must be familiar with, this article serves as the advanced chapter of the "Spark Guide to Optimization". It analyzes data skew tuning and shuffle tuning in depth, to address more challenging performance issues.


Data Skew tuning

Tuning overview

Sometimes we encounter one of the most difficult problems in big data computation - data skew - where Spark jobs perform far worse than expected. Data skew tuning is the use of various technical solutions to resolve different types of data skew, in order to guarantee the performance of Spark jobs.

How data skew manifests itself

Most tasks execute very quickly, but a few individual tasks execute extremely slowly. For example, out of 1,000 tasks, 997 finish within a minute, while the remaining two or three take an hour or two. This is very common.

A Spark job that used to run normally suddenly reports an OOM (memory overflow) exception one day; examining the exception stack shows that it was thrown from code we wrote. This is rare.

The principle of data skew

The principle of data skew is simple: during a shuffle, all records with the same key must be pulled from every node to a single task on one node for processing, for example for an aggregation or a join on that key. If one particular key has far more records than the others, data skew occurs. For example, most keys correspond to 10 records each, but one key corresponds to 1 million records; most tasks are then assigned only about 10 records and finish in a second, while the single task assigned the million records runs for an hour or two. The running time of the entire Spark job is therefore determined by the task that takes the longest to run.

As a result of the data skew, the Spark job appears to run very slowly, and may even fail with an OOM (memory overflow) error because of the sheer amount of data a single task has to process.

The following figure is a clear example: hello is the key, and it corresponds to a total of seven records spread across three nodes, which are all pulled to the same task for processing; the keys world and you each correspond to only one record, so the other two tasks each handle only one record. The first task may therefore run seven times longer than the other two, and the running time of the whole stage is determined by the slowest task.


How to locate the code that causes data skew

Data skew only occurs during a shuffle. Here are some commonly used operators that may trigger a shuffle: distinct, groupByKey, reduceByKey, aggregateByKey, join, cogroup, repartition, and so on. When data skew appears, it is likely that one of these operators is used in your code.

1.      A task executes particularly slowly

The first thing to figure out is which stage the data skew occurs in.

If the job is submitted in yarn-client mode, the log can be viewed directly on the local machine, and the log shows which stage is currently running; if it is submitted in yarn-cluster mode, the Spark Web UI can be used to check which stage is currently running. In addition, whether you use yarn-client or yarn-cluster mode, you can take a closer look on the Spark Web UI at the amount of data allocated to each task of that stage, to further determine whether an uneven distribution of data across tasks is causing the skew.

For example, in the figure below, the third-to-last column shows the run time of each task. You can clearly see that some tasks run very fast, finishing in just a few seconds, while others run very slowly, taking several minutes; from the run times alone you can already tell that data skew has occurred. In addition, the last column shows the amount of data processed by each task. The tasks with very short run times only process a few hundred KB of data, while the tasks with particularly long run times process several thousand KB, a difference of roughly a factor of 10. In that case it is even more likely that data skew has occurred.

Once we know which stage the data skew occurs in, we need to work out which part of the code that stage corresponds to; that part of the code will definitely contain a shuffle operator. Precisely mapping a stage to its corresponding code requires a deep understanding of the Spark source code, but here is a relatively simple and practical estimation method: wherever a shuffle-class operator appears in the Spark code, or a shuffle-producing statement (such as group by) appears in a Spark SQL query, you can treat that point as the boundary that divides the job into the stage before it and the stage after it.

Here we take Spark's most basic entry-level program, word count, as an example of how to roughly estimate which code a stage corresponds to using this simple method. In the code below, there is only one shuffle operator, reduceByKey, so we can treat this operator as the boundary that divides the job into two stages, one before and one after.

Stage 0 mainly covers the operations from textFile to map, as well as the shuffle write operation. The shuffle write operation can be understood simply as partitioning the data in the pairs RDD: within the data processed by each task, records with the same key are written to the same disk file.

Stage 1 mainly covers the operations from reduceByKey to collect. When each task of stage 1 starts running, it first performs a shuffle read operation: it pulls the keys it is responsible for from the nodes where the stage 0 tasks ran, and then performs a global aggregation or join on the records with the same key; in this case, the values of the same key are accumulated. After stage 1 has executed the reduceByKey operator and computed the final wordCounts RDD, it executes the collect operator, pulling all the data onto the driver so that we can traverse and print it.

val conf = new SparkConf()
val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.collect().foreach(println(_))


Through this analysis of the word count program, we hope you now understand the basic principle of stage division and how the shuffle operation is performed at the boundary between two stages. You also know how to quickly locate which part of the code a skewed stage corresponds to. For example, if we find in the Spark Web UI or the local log that a few tasks of stage 1 run particularly slowly, we can conclude that stage 1 has data skew, go back to the code, and see that stage 1 contains the shuffle operator reduceByKey; at this point we can basically determine that the data skew is caused by the reduceByKey operator. For example, if one word appears 1,000,000 times while other words appear only 10 times, then one task of stage 1 has to process 1,000,000 records, and the speed of the whole stage is held back by that task.

2.      A task unexpectedly throws an OOM (memory overflow) exception

In this case it is relatively easy to locate the problematic code. We recommend looking directly at the exception stack in the local log in yarn-client mode, or at the exception stack in the logs via YARN in yarn-cluster mode. In general, the exception stack tells you which line of your code caused the memory overflow. Then look around that line of code; there will usually be a shuffle operator, and it is likely that this operator caused the data skew.

However, note that you cannot conclude that data skew has occurred simply because of an occasional memory overflow: a bug in the code, or an occasional anomaly in the data, can also cause a memory overflow. Therefore, you should still follow the method above and use the Spark Web UI to check the run times and data volumes of the tasks in the failing stage, to determine whether the memory overflow was really caused by data skew.

Check the data distribution of the keys that cause the data skew

Once you know where the data skew occurs, you typically need to analyze the RDD or Hive table on which the skew-causing shuffle operation is performed, and look at how its keys are distributed. This mainly informs the choice of technical solution later on; different combinations of key distribution and shuffle operator may require different solutions.

Depending on the operation you are performing, there are several ways to view the key distribution:

1)     If the data skew is caused by a Spark SQL statement such as group by or join, check the key distribution of the tables used in the SQL (a sketch is shown after this list).

2)     If the data skew is caused by a shuffle operator applied to a Spark RDD, you can add code to the Spark job that counts the key distribution, for example RDD.countByKey(), then collect/take the per-key counts to the client and print them to see the distribution.
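For case 1), here is a minimal sketch of what such a check could look like, assuming the skewed SQL runs against a Hive table; the table name user_visits and the key column user_id are hypothetical placeholders for whatever table and group-by/join column your statement actually uses:

import org.apache.spark.sql.hive.HiveContext

// Reuse the SparkContext of the job (sc) to build a HiveContext.
val sqlContext = new HiveContext(sc)

// Count how many rows each key has and keep only the heaviest keys.
// user_visits and user_id are hypothetical names.
val keyDistribution = sqlContext.sql(
  "SELECT user_id, COUNT(*) AS cnt FROM user_visits " +
  "GROUP BY user_id ORDER BY cnt DESC LIMIT 100")

// Pull only these 100 rows back to the driver and print them.
keyDistribution.collect().foreach(println(_))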


For example, for the word count program above, if we have determined that the reduceByKey operator of stage 1 causes the data skew, then we should look at the key distribution in the RDD on which reduceByKey operates, in this case the pairs RDD. In the example below, we first sample 10% of the pairs data, then use the countByKey operator to count the occurrences of each key, and finally traverse and print the per-key counts of the sample on the client. Here is the code:

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))


Data skew solution

Solution 1: Use Hive ETL to preprocess the data


Use case: the data skew is caused by a Hive table. If the data in the Hive table itself is very uneven (for example, one key corresponds to 1,000,000 records while other keys correspond to only 10), and the business scenario requires frequently running Spark analysis jobs against that Hive table, then this solution is worth considering.


Solution: evaluate whether the data can be preprocessed in Hive (that is, a Hive ETL job pre-aggregates the data by key, or pre-joins it with the other tables), so that the source of the Spark job is no longer the original Hive table but the preprocessed one. Because the aggregation or join has already been done, the Spark job no longer needs to run shuffle operators to perform those operations on the original data.
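A rough sketch of what this could look like, using the old SparkContext/HiveContext API that matches the rest of this article; the table names user_visits and user_visits_agg and the column user_id are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf()
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)

// Hive ETL step, run once per day by a scheduler: pre-aggregate the raw
// table by key, so the expensive (and possibly skewed) group by happens
// here rather than inside every Spark job.
hiveContext.sql(
  "INSERT OVERWRITE TABLE user_visits_agg " +
  "SELECT user_id, COUNT(*) AS visit_cnt FROM user_visits GROUP BY user_id")

// The frequently invoked Spark jobs then read only the pre-aggregated
// table and need no shuffle of their own on this data.
val aggregated = hiveContext.sql("SELECT user_id, visit_cnt FROM user_visits_agg")
aggregated.collect().foreach(println(_))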


Principle: this solution eliminates data skew at the root as far as Spark is concerned, because no shuffle operators need to run in Spark, so data skew simply cannot occur there. However, keep in mind that this only moves the problem: since the data itself is unevenly distributed, data skew will still occur when the Hive ETL performs shuffle operations such as group by or join, which makes the Hive ETL slow. We merely push the data skew forward into the Hive ETL to avoid it in the Spark program.


Pros: easy to implement and very effective; data skew is completely avoided on the Spark side, and Spark job performance improves greatly.


Cons: it treats the symptom rather than the cause; the data skew still occurs in the Hive ETL.


Practical experience: in projects where Spark is used together with a Java system, a scenario in which Java code frequently calls Spark jobs and has high performance requirements for them is a good fit for this solution. The data skew is pushed upstream into the Hive ETL, which runs only once a day and is therefore the only slow step; every subsequent Spark job invoked from Java then runs fast enough to provide a good user experience.

Project experience: this solution was used in an interactive user behavior analysis system. Users submit data analysis tasks through a Java web system, and the back end submits Spark jobs through Java to do the analysis. The Spark jobs must be fast, ideally finishing within 10 minutes, otherwise the user experience suffers. So we moved some shuffle operations out of the Spark jobs and into an upstream Hive ETL, so that Spark could use the preprocessed Hive intermediate tables directly, reducing Spark's shuffle operations as much as possible and greatly improving performance; some jobs became more than 6 times faster.


Solution 2: filter out the few keys that cause the skew


Use case: if only a few keys cause the data skew and they have little impact on the computation itself, then this solution is suitable. For example, 99% of the keys correspond to 10 records each, but a single key corresponds to one million records and causes the skew.

Principle: filter out the keys that cause the data skew. Since those keys no longer participate in the computation, they naturally cannot produce data skew.

Pros: very simple to implement, and the effect is also very good; the data skew can be avoided completely.

Cons: the use case is rare. In most scenarios there are many keys causing the skew, not just a few.

Practical experience: we also used this solution in a project to resolve data skew. We once found that a Spark job suddenly ran out of memory one day; after investigation it turned out that one key in the Hive table had abnormal data that day, causing its data volume to surge. So before each run we take a sample, find the key with the largest count in the sample, and filter that key out directly in the program, as sketched below.
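A minimal sketch of that sample-then-filter idea, reusing the pairs RDD from the word count example; the 10% sampling rate and the decision to drop only the single most frequent key are assumptions, not fixed rules:

// Sample roughly 10% of the data and count occurrences per key in the sample.
val sampledPairs = pairs.sample(false, 0.1)
val sampleCounts = sampledPairs.countByKey()

// The key that dominates the sample is the likely cause of the skew.
val skewedKey = sampleCounts.maxBy(_._2)._1

// Filter that key out before the shuffle so it never reaches reduceByKey.
val filteredPairs = pairs.filter(_._1 != skewedKey)
val filteredWordCounts = filteredPairs.reduceByKey(_ + _)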

Solution 3: increase the parallelism of shuffle operations

Use case: if we must deal with the data skew head-on, this solution is recommended as the first thing to try, because it is the simplest way to handle data skew.

Principle: when executing a shuffle operator on an RDD, pass a parameter to the operator, such as reduceByKey(_ + _, 1000); this parameter sets the number of shuffle read tasks for that shuffle operator. For shuffle-producing Spark SQL statements such as group by and join, set the parameter spark.sql.shuffle.partitions, which controls the parallelism of the shuffle read tasks; its default value is 200, which is a bit too small for many scenarios.

Increasing the number of shuffle read tasks allows the multiple keys originally assigned to one task to be spread across several tasks, so each task processes less data than before. For example, if there were originally five keys, each corresponding to 10 records, and all five keys were assigned to one task, that task would have to process 50 records. After increasing the number of shuffle read tasks, each task is assigned one key, i.e. each task processes 10 records, so the execution time of each task naturally becomes shorter. The specific workflow is shown in the figure below, and the corresponding code change is sketched right after.
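In code this is only a small change; the value 1000 below is purely illustrative, and the snippet assumes the pairs RDD from the word count example and an existing SQLContext instance named sqlContext:

// RDD API: run reduceByKey with 1000 shuffle read tasks instead of the default.
val wordCounts = pairs.reduceByKey(_ + _, 1000)

// Spark SQL: raise the parallelism of shuffle-producing statements such as
// group by and join from the default of 200.
sqlContext.setConf("spark.sql.shuffle.partitions", "1000")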

Pros: relatively simple to implement, and it can effectively mitigate the impact of data skew.

Cons: it only alleviates the data skew, it does not eliminate it; in practical experience its effect is limited.

Practical experience: this solution usually cannot solve data skew completely, because in extreme cases, such as a single key corresponding to 1 million records, no matter how much you increase the number of tasks, that key and its 1 million records will still be assigned to a single task, so data skew will still occur. This solution is therefore just the first thing to try when you encounter data skew, as a simple attempt to alleviate it, or something to use in combination with other solutions.


Solution 4: Two-stage aggregation (partial aggregation + global aggregation)

Use case: this solution is suitable for aggregation-type shuffle operators on RDDs, such as reduceByKey, or group by statements in Spark SQL.

Principle: the core idea is to aggregate in two stages. The first stage is local aggregation: each key is first given a random prefix, such as a random number below 10, so that identical keys become different keys. For example, (hello, 1) (hello, 1) (hello, 1) (hello, 1) becomes (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1). Then an aggregation operator such as reduceByKey is executed on the prefixed data to perform the local aggregation, giving local results such as (1_hello, 2) (2_hello, 2). Finally the prefix is removed from each key, giving (hello, 2) (hello, 2), and the aggregation is executed again globally to obtain the final result, such as (hello, 4).


Implementation: by adding a random prefix, the same key is turned into several different keys, so the data that was originally processed by a single task can be spread across multiple tasks for local aggregation, which solves the problem of one task processing too much data. Then the random prefix is removed and a global aggregation is performed again to obtain the final result. The specific principle is shown in the figure below.

Pros: for data skew caused by aggregation-type shuffle operations, this solution works well. It can usually eliminate the data skew, or at least drastically reduce it, improving Spark job performance by more than a factor of two.

Cons: it only applies to aggregation-type shuffle operations, so its scope is relatively narrow. If the skew is caused by a join-type shuffle operation, other solutions have to be used.

Source Code:

// Step 1: prefix each key in the RDD with a random number.
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
        new PairFunction<Tuple2<Long, Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple) throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(10);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// Step 2: locally aggregate by the prefixed key.
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

// Step 3: remove the random prefix from each key.
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
        new PairFunction<Tuple2<String, Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple) throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });

// Step 4: globally aggregate on the un-prefixed keys.
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

...

(Copyright by the author, Mophy Huang. Please do not forward or share. This is not the full article; to read more, please contact the author.)

