Handling data skewness

One of the most common problems in distributed computing systems is data skew. In Apache Spark, data skew is typically caused by transformations that change the partitioning, such as join, group by and order by.

For example, when joining two or more datasets on a key that is not evenly distributed across the nodes of the cluster, some partitions become very large and prevent Spark from processing the data in parallel.

There are a couple of solutions available for this problem.

Input: Consider two dataframes, dataframe1 which is 1 TB in size and dataframe2 which is 1 GB in size.

Goal: One of the steps in our data pipeline is data enrichment. The idea is to perform some transformations on these two dataframes to produce a final dataset, which in turn is used for reporting purposes.
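To make the later snippets easier to follow, here is a minimal sketch of how the two dataframes might be loaded; the parquet format, the paths and the commonKey column name are assumptions for illustration only, not part of the original pipeline.

// Hypothetical inputs; adjust format, paths and key to your own pipeline
val dataframe1 = spark.read.parquet("/data/large_input")  // ~1 TB, the large side
val dataframe2 = spark.read.parquet("/data/small_input")  // ~1 GB, the small side
val commonKey  = "commonKey"                              // name of the join key column present in both dataframes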

In Apache Spark we have three main join strategies: broadcast join, sort-merge join and shuffle hash join, and the most commonly used ones are the sort-merge join and the broadcast join.

Let's assume we use a sort-merge join to join these two dataframes.

val dataframe3 = dataframe1.join(dataframe2, Seq(commonKey), "left_outer")

After you submit the job containing the above transformation to the Spark cluster, you might see that some tasks run much longer than others, and that is due to the data skew caused by the sort-merge join.
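If you want to confirm which strategy Spark picked, you can print the physical plan; with the join above you should see a SortMergeJoin operator in the output. This is just a quick check on the dataframe3 defined earlier.

// The physical plan of a sort-merge join contains a SortMergeJoin operator
dataframe3.explain()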


Due to Apache Spark's distributed nature, Spark splits the input data into multiple chunks and places them on multiple nodes in the cluster so that it can perform operations in parallel.

When we perform a sort-merge join, all records with the same key are sent to the same partition, which causes a lot of data movement (shuffling). Because of this, records may end up unevenly distributed across partitions: some partitions get many records and some get few.

Partitions with fewer records are processed quickly by the executors, while partitions with more records take longer to process, which increases the overall execution time.
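One quick way to see this imbalance is to count how many records land in each partition of the joined dataframe; the sketch below is only an illustration, assuming the dataframe3 produced by the join above.

import org.apache.spark.sql.functions.{spark_partition_id, desc}

// Records per partition of the joined dataframe; a few very large counts
// next to many small ones is a typical sign of skew
dataframe3
  .groupBy(spark_partition_id().as("partitionId"))
  .count()
  .sort(desc("count"))
  .show(20, truncate = false)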

If you would like to know which join keys are causing the skew, you can run a query like the one below.

import org.apache.spark.sql.functions.{count, desc}

// Keys with the highest record counts are the skew candidates
dataframe1
  .select("commonKey")
  .groupBy("commonKey")
  .agg(count("commonKey").as("recCount"))
  .sort(desc("recCount"))
  .show(20, truncate = false)

In the sort-merge join there are two factors that increase the overall execution time:

1. Data shuffling, which is one of the costliest operations in Apache Spark, because there is a huge amount of data movement across the network.

2. Data skew, which causes longer processing times for some of the partitions.

As mentioned earlier, there are a couple of solutions for this problem; let's try to solve the above issue using a broadcast join.

val dataframe3 = dataframe1.join(broadcast(dataframe2), Seq(commonKey), "left_outer")

Here I am using broadcast (from org.apache.spark.sql.functions) as a hint, telling Apache Spark to broadcast the right side of the join.

Please note that Apache Spark automatically broadcasts one of the datasets if its size is smaller than the value of spark.sql.autoBroadcastJoinThreshold (10 MB by default), and you can disable this behaviour by setting spark.sql.autoBroadcastJoinThreshold to -1.
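For reference, this threshold is an ordinary session configuration; the values below are only examples.

// Raise the automatic broadcast threshold to 100 MB (the default is 10 MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)

// Or disable automatic broadcasting entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)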

After you submit the job containing the above transformation to the Spark cluster, you will see that the overall execution time drops drastically. This is because:

Data shuffling is avoided: in a broadcast join, a full copy of the smaller dataset is sent to every executor, so rows with matching keys are already co-located and no data movement is needed when performing the join.
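As with the sort-merge case, you can confirm this from the physical plan: with the hint in place, the plan for dataframe3 should show a BroadcastHashJoin (preceded by a BroadcastExchange) instead of a SortMergeJoin.

dataframe3.explain()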

The question here is whether such an approach works well all the time. What if the size of the right-side dataset is 10 GB, or 1 TB?

What is the restriction on the size of the broadcasted dataframe? Unfortunately, broadcast joins suffer from the following problems.

1. Scalability. Broadcast join has a hardcoded limit on the size of the broadcasted table: 8 GB or 512,000,000 records, which makes this solution non-scalable.

2. High memory footprint. The broadcasted data must fully fit into the memory of both the driver and every executor.

For example, when a job has 50 executors and the broadcasted dataframe is 1 GB, the price of using a broadcast join is an additional 50 * 1 GB, i.e. 50 GB of memory. Also, if the size of the broadcasted table tends to grow, you will see the following exception very often and will need to keep adjusting the driver and executor memory.

java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:122)

3. Performance. A broadcast join can be expensive when the input dataset is big and the job needs a lot of executors: the broadcast time keeps growing and may become higher than the cost of the join itself.

Handling data skew depends on many factors, like data volume, data variety, cluster configuration and processing needs, so there is no simple recipe for it.

Let's try to solve the data skew using the key salting technique in the next post.
