Just Enough Spark! Core Concepts Revisited
Apache Spark™ is a unified analytics engine for large-scale data processing. It's a lightning-fast engine for big data and machine learning, and one of the largest open source projects in data processing. It integrates with most open source big data technologies.
Many of us already know about Spark, but we often get confused by some of the key terms associated with it. In this article, we will quickly visit all the "must know" concepts and terms associated with Apache Spark.
Spark Basic Architecture
A cluster, or group of machines, pools the resources of many machines together, allowing us to use all the cumulative resources as if they were one. A group of machines sitting somewhere on its own is not powerful, though; you need a framework to coordinate work across them. Spark is an engine tailor-made for exactly this: managing and coordinating the execution of tasks on data across a cluster of computers.
The cluster of machines that Spark uses to execute tasks is managed by a cluster manager such as Spark's standalone cluster manager, YARN (Yet Another Resource Negotiator), or Kubernetes. We then submit Spark applications to the cluster manager, which grants resources to our application so that we can complete our work.
Spark applications consist of a driver process and a set of executor processes. In the illustration above, the driver is on the left and four executors are on the right.
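As a minimal, hedged sketch (assuming a local PySpark installation; the application name is made up), this is how a Spark application comes to life: creating a SparkSession starts the driver process, which then negotiates executors from the cluster manager. In local mode everything runs inside a single JVM.

```python
# Minimal sketch: starting a Spark application from Python.
# "just-enough-spark" and local[*] are illustrative assumptions.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("just-enough-spark")   # hypothetical application name
    .master("local[*]")             # local mode: driver and executors share one JVM
    .getOrCreate()                  # creating the session launches the driver process
)

print(spark.version)                # confirm the session is up
```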
What is a JVM?
The JVM manages system memory and provides a portable execution environment for Java-based applications.
Technical definition: The JVM is the specification for a software program that executes code and provides the runtime environment for that code.
Everyday definition: The JVM is how we run our Java programs. We configure the JVM's settings and then rely on it to manage program resources during execution.
The Java Virtual Machine (JVM) is a program whose purpose is to execute other programs.
The JVM has two primary functions:
- To allow Java programs to run on any device or operating system (known as the "Write once, run anywhere" principle)
- To manage and optimize program memory
What is a Driver?
The driver process runs your main() function, sits on a node in the cluster, and is responsible for three main things:
- Maintaining information about the Spark application. The driver is the heart of a Spark application and maintains all relevant information during the lifetime of the application.
- Responding to the user's program or input.
- Analyzing, distributing, and scheduling work across the executors.
What is an Executor?
The executors are responsible for carrying out the work that the driver assigns them.
- Executing the code assigned to it by the driver.
- Reporting the state of the computation on that executor back to the driver.
What are Cores/Slots/Threads?
Spark parallelizes at two levels. One is splitting the work among executors. The other is the slot: each executor has a number of slots, and each slot can be assigned a Task.
For example, the diagram below shows 2-Core Executor nodes:
- The JVM is naturally multithreaded, but a single JVM, such as our Driver, has a finite upper limit.
- By creating Tasks, the Driver can assign units of work to Slots on each Executor for parallel execution.
- Additionally, the Driver must also decide how to partition the data so that it can be distributed for parallel processing (see below).
- Consequently, the Driver assigns a Partition of data to each Task - in this way each Task knows which piece of data it is to process.
- Once started, each Task will fetch the Partition of data assigned to it from the original data source (e.g. an Azure Storage Account).
You can set the number of task slots to two or three times (i.e. to a multiple of) the number of CPU cores. Although these task slots are often referred to as CPU cores in Spark, they are implemented as threads and do not need to correspond one-to-one to the physical CPU cores on the machine (since different CPU manufacturers can architect multi-threaded chips differently).
In other words:
- Practically all processors today have multiple cores (e.g. 1 CPU = 8 Cores)
- Most processors today are multi-threaded (e.g. 1 Core = 2 Threads, so 8 Cores = 16 Threads)
- A Spark Task runs on a Slot. One Thread can work on one Task at a time. To make use of all the threads on the CPU, we assign the number of Slots to a multiple of the number of Cores (which translates into the available Threads).
- By doing this, the Driver can break a given command down into Tasks and Partitions that are tailor-made to fit our particular cluster configuration (say 4 nodes - 1 driver and 3 executors, 8 cores per node, 2 threads per core). By using the cluster at maximum efficiency like this (utilizing all available threads), we get the command executed as fast as possible (for this cluster: 3 * 8 * 2 Threads --> 48 Tasks, 48 Partitions, i.e. 1 Partition per Task).
- If we don't, then even with a 100-executor cluster the entire burden could fall on a single executor while the other 99 sit idle - i.e. slow execution.
- If we instead assign 49 Tasks and 49 Partitions, the first pass executes 48 Tasks in parallel across the executor cores (say in 10 minutes), and the 1 remaining Task then runs on a single core for another 10 minutes while the other 47 cores sit idle - so the whole job takes double the time, 20 minutes. This is an inefficient use of the available resources; it is fixed by setting the number of tasks/partitions to a multiple of the number of cores (in this setup, 48, 96, etc.), as in the sketch after this list.
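As a rough sketch of the sizing above (the executor and core counts mirror the 3 x 8 x 2 example and are assumptions, not recommendations), this is one way to line up slots and partitions when submitting to a cluster manager such as YARN; in local mode most of these settings have no effect.

```python
# Hedged sketch: sizing slots and partitions as a multiple of the available threads.
# 3 executors x 16 slots (8 physical cores x 2 hardware threads) = 48 task slots.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("parallelism-sketch")                  # hypothetical name
    .config("spark.executor.instances", "3")        # 3 executors (assumed cluster size)
    .config("spark.executor.cores", "16")           # 16 slots per executor (8 cores x 2 threads)
    .config("spark.default.parallelism", "48")      # RDD operations: one task per slot
    .config("spark.sql.shuffle.partitions", "48")   # DataFrame shuffles: 48 partitions
    .getOrCreate()
)
```

The same values can also be passed on the command line with spark-submit via --num-executors and --executor-cores.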
What is a Partition?
In order to allow every executor to perform work in parallel, Spark breaks up the data into chunks, called partitions.
A partition is a collection of rows that sit on one physical machine in our cluster. A DataFrame’s partitions represent how the data is physically distributed across your cluster of machines during execution:
- If you have one partition, Spark will only have a parallelism of one, even if you have thousands of executors.
- If you have many partitions, but only one executor, Spark will still only have a parallelism of one because there is only one computation resource.
An important thing to note is that with DataFrames we do not (for the most part) manipulate partitions manually or individually. We simply specify high-level transformations of data in the physical partitions, and Spark determines how that work will actually execute on the cluster.
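A small sketch (using a synthetic DataFrame, since no real dataset is assumed) of how to see the partitioning Spark chose and how to change it:

```python
# Hedged sketch: inspecting and changing a DataFrame's partition count.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitions-sketch").getOrCreate()

df = spark.range(0, 1_000_000)        # a simple one-column DataFrame
print(df.rdd.getNumPartitions())      # how many partitions Spark chose by default

df48 = df.repartition(48)             # spread the rows over 48 partitions instead
print(df48.rdd.getNumPartitions())    # 48
```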
What is a DAG?
A Directed Acyclic Graph (DAG) in Apache Spark is a set of vertices and edges, where the vertices represent the RDDs and the edges represent the operations to be applied to those RDDs.
DAGScheduler is the scheduling layer of Apache Spark that implements stage-oriented scheduling. It transforms a logical execution plan to a physical execution plan (using stages).
After an action has been called, the SparkContext hands a logical plan to the DAGScheduler, which in turn translates it into a set of stages that are submitted as sets of tasks for execution.
The fundamental concepts of DAGScheduler are jobs and stages that it tracks through internal registries and counters.
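One way to peek at the plan Spark builds (a hedged sketch on synthetic data): explain() prints the physical plan, and when an action runs, the DAGScheduler cuts that plan's lineage into stages at the shuffle boundaries.

```python
# Hedged sketch: viewing the plan that the DAGScheduler will turn into stages.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("dag-sketch").getOrCreate()

df = spark.range(0, 1000)
agg = df.withColumn("bucket", F.col("id") % 10).groupBy("bucket").count()

agg.explain()        # the Exchange node in the physical plan marks a stage boundary
```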
What is a Job?
A Job is a sequence of stages, triggered by an action such as count(), collect(), read() or write().
- Each parallelized action is referred to as a Job.
- The results of each Job (parallelized/distributed action) is returned to the Driver from the Executor.
- Depending on the work required, multiple Jobs will be required.
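A quick sketch (synthetic data; the Spark UI is assumed at its usual local port): each action below launches its own Job, so the Spark UI should list two Jobs for this snippet.

```python
# Hedged sketch: one job per action.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jobs-sketch").getOrCreate()

df = spark.range(0, 100_000).filter("id % 2 = 0")   # transformations only: no job yet

n = df.count()      # action -> job 1
rows = df.take(5)   # action -> job 2

print(n, rows)      # the Spark UI (usually http://localhost:4040) shows both jobs
```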
What is a Stage?
Each Job gets divided into smaller sets of tasks called Stages.
A Stage is a sequence of Tasks that can all be run together - i.e. in parallel - without a shuffle. For example, using .read to read a file from disk and then running .filter can be done without a shuffle, so both fit in a single Stage (see the sketch below). The number of Tasks in a Stage also depends on the number of Partitions your datasets have.
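To make that concrete, here is a hedged sketch (a tiny CSV is written to an arbitrary temporary path so the example is self-contained): the read plus filter stays in one Stage, while adding a groupBy introduces a shuffle and therefore a second Stage.

```python
# Hedged sketch: narrow work stays in one stage; a shuffle starts a new one.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("stages-sketch").getOrCreate()

# Write a small throwaway CSV so the read below has something to read (path is arbitrary).
spark.createDataFrame(
    [("red", 1), ("blue", 2), ("green", 3)], ["color", "amount"]
).write.mode("overwrite").csv("/tmp/colors_csv", header=True)

df = spark.read.csv("/tmp/colors_csv", header=True)

one_stage = df.filter(df["color"] == "red")   # narrow: same stage as the read
two_stages = df.groupBy("color").count()      # wide: the shuffle starts a second stage

one_stage.explain()
two_stages.explain()
```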
What is a Task?
A task is a unit of work that is sent to the executor. Each stage has some tasks, one task per partition. The same task is done over different partitions of the RDD.
In the example of Stages above, each Step is a Task.
What is Caching?
In applications that reuse the same datasets over and over, one of the most useful optimizations is caching. Caching will place a DataFrame or table into temporary storage across the executors in your cluster and make subsequent reads faster.
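A small, hedged sketch of that idea (synthetic data; the sizes are arbitrary): mark the DataFrame for caching, materialize it with a first action, then reuse it.

```python
# Hedged sketch: caching a DataFrame that is reused several times.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("cache-sketch").getOrCreate()

df = spark.range(0, 5_000_000).withColumn("squared", F.col("id") * F.col("id"))

df.cache()                              # mark for caching (lazy: nothing is stored yet)
df.count()                              # first action populates the cache on the executors

df.filter("squared % 3 = 0").count()    # subsequent actions read from the cache
df.unpersist()                          # release the cached blocks when done
```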
What is Shuffling?
A Shuffle refers to an operation where data is re-partitioned across a Cluster - i.e. when data needs to move between executors.
A join, and any operation that ends with ByKey, will trigger a Shuffle. It is a costly operation because a lot of data may need to be sent across the network.
For example, to group by color, it will serve us best if...
- All the reds are in one partition
- All the blues are in a second partition
- All the greens are in a third
From there we can easily sum/count/average all of the reds, blues, and greens.
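Here is a hedged sketch of that colour example (the sample rows are made up): groupBy("color") shuffles the data so that all rows of a given colour land in the same partition before the aggregates are computed.

```python
# Hedged sketch: a groupBy forces a shuffle so each colour ends up together.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("shuffle-sketch").getOrCreate()

df = spark.createDataFrame(
    [("red", 3), ("blue", 5), ("red", 7), ("green", 1), ("blue", 2)],
    ["color", "amount"],
)

summary = df.groupBy("color").agg(
    F.count("*").alias("rows"),
    F.sum("amount").alias("total"),
    F.avg("amount").alias("average"),
)

summary.show()      # the Exchange step in summary.explain() is the shuffle
```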
What is Partitioning?
A Partition is a logical chunk of your DataFrame.
Data is split into Partitions so that each Executor can operate on a single part, enabling parallelization.
It can be processed by a single Executor core/thread.
For example: If you have 4 data partitions and you have 4 executor cores/threads, you can process everything in parallel, in a single pass.
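A hedged sketch of that 4-partitions / 4-threads case (local[4] and the row count are assumptions): with one partition per slot, everything is processed in a single pass.

```python
# Hedged sketch: matching the number of partitions to the number of task slots.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("partitioning-sketch")
    .master("local[4]")                   # 4 local threads = 4 task slots
    .getOrCreate()
)

df = spark.range(0, 1_000_000).repartition(4)   # 4 partitions -> 4 parallel tasks
print(df.rdd.getNumPartitions())                # 4
print(df.count())                               # processed in a single pass
```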
Transformations Vs Actions
Transformations
In Spark, the core data structures are immutable, meaning they cannot be changed once created. In order to "change" a DataFrame, you have to instruct Spark how you would like to modify the DataFrame you have into the one that you want. These instructions are called transformations. As a simple example, the sketch below defines a transformation that finds all even numbers in a DataFrame. Examples: Select, Filter, GroupBy, Join, Union, Repartition, etc.
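A minimal sketch of that even-numbers transformation (the DataFrame here is synthetic): note that nothing is computed yet, because transformations are lazy.

```python
# Hedged sketch: a transformation only records the instruction; nothing runs yet.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("transformations-sketch").getOrCreate()

my_range = spark.range(0, 1000)                 # DataFrame with a single "id" column
even_numbers = my_range.where("id % 2 = 0")     # transformation: lazily evaluated
```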
Actions
Transformations allow us to build up our logical transformation plan. To trigger the computation, we run an action. An action instructs Spark to compute a result from a series of transformations. The simplest action is count, which gives us the total number of records in the DataFrame (see the sketch below).
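Continuing the sketch above (recreated here so it stands alone): calling count() is the action that finally triggers the computation.

```python
# Hedged sketch: an action triggers the computation of the preceding transformations.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("actions-sketch").getOrCreate()

even_numbers = spark.range(0, 1000).where("id % 2 = 0")   # transformations: still lazy
print(even_numbers.count())                               # action: computes and returns 500
```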
Narrow Transformations Vs Wide Transformations
There are two types of transformations: Narrow and Wide.
For narrow transformations, the data required to compute the records in a single partition reside in at most one partition of the parent dataset.
Examples include:
- filter(..)
- drop(..)
- coalesce()
For wide transformations, the data required to compute the records in a single partition may reside in many partitions of the parent dataset.
Examples include:
- distinct()
- groupBy(..).sum()
- repartition(n)
Remember, Spark partitions are collections of rows that sit on physical machines in the cluster. Narrow transformations mean that the work can be computed within each partition without changing the way data is partitioned across the system. Wide transformations require that data be redistributed across the system. This is called a shuffle.
Shuffles are triggered when data needs to move between executors.
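To see the difference in practice, here is a hedged sketch contrasting coalesce (narrow) with repartition (wide) on a synthetic DataFrame; comparing the two physical plans should show an Exchange (shuffle) only for the wide version.

```python
# Hedged sketch: coalesce merges partitions in place (narrow, no shuffle),
# while repartition redistributes every row across the cluster (wide, shuffle).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("narrow-vs-wide-sketch").getOrCreate()

df = spark.range(0, 1_000_000)

df.coalesce(2).explain()      # narrow: no Exchange node in the plan
df.repartition(2).explain()   # wide: an Exchange (shuffle) appears in the plan
```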
Conclusion
So, we have quickly revisited all the key concepts and terms. From a developer's perspective, I believe these are the concepts we need to understand at least at a basic level. Do let me know if you have any questions. Thank you for taking the time to read this; I hope you found it informative. Please click the "like" button if you like the content.
Credits: Azure Databricks Documentation