Key Techniques in Data Engineering with PySpark: Data Transformations
PySpark is a popular open-source framework for big data processing, machine learning, and graph processing. This blog post provides a comprehensive guide to PySpark DataFrame operations, starting from basic DataFrame manipulations and working up to advanced concepts like UDFs and partitioning.
Creating / updating columns using withColumn():
withColumn() is a convenient method to create a new column or update an existing column in a PySpark DataFrame. The method takes two parameters: the name of the column and a column expression that computes its values.
Example:
from pyspark.sql.functions import *
df = spark.createDataFrame([(1, "John", 25), (2, "Jane", 30), (3, "Jim", 35)], ["id", "name", "age"])
df = df.withColumn("salary", lit(5000))
df.show()
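withColumn() also updates an existing column when the given name already exists. For example, building on the DataFrame above, this small sketch increments the age column in place:
# Overwrite the existing "age" column by reusing its name
df = df.withColumn("age", col("age") + 1)
df.show()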
Renaming columns using withColumnRenamed():
withColumnRenamed() is used to change the name of a column in a PySpark DataFrame. The method takes two parameters: the old name of the column and the new name of the column.
Example:
df = df.withColumnRenamed("age", "years_old")
df.show()
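withColumnRenamed() renames one column per call, so renaming several columns is usually done by chaining calls. A small sketch (assigned to a new variable here so it does not affect the running example):
# Chain withColumnRenamed() calls to rename more than one column
renamed_df = df.withColumnRenamed("salary", "monthly_salary").withColumnRenamed("id", "employee_id")
renamed_df.show()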
Text and date transformations (substring, initcap, date_format, to_date, etc.):
PySpark provides many functions for text and date transformations on DataFrames. Some of the commonly used ones are substring(), initcap(), to_date(), and date_format().
Example:
df = df.withColumn("first_name", substring(col("name"), 1, 4))
df = df.withColumn("capitalized_name", initcap(col("name")))
df = df.withColumn("birth_date", to_date(col("birthdate"), "yyyy-MM-dd"))
df = df.withColumn("formatted_date", date_format(col("birth_date"), "dd-MMM-yyyy"))
df.show()
Filtering dataframes using boolean operators:
DataFrames can be filtered on the values of one or more columns using boolean operators like "==", ">=", "<=", etc.
Example:
df = df.filter(col("age") >= 30)
df.show()
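Multiple conditions can be combined with the & (and) and | (or) operators; each condition must be wrapped in parentheses. A small sketch:
# Parentheses are required around each condition when combining with & or |
df = df.filter((col("age") >= 30) & (col("name") != "Jim"))
df.show()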
Sorting dataframes using asc() and desc():
Dataframes can be sorted in ascending or descending order based on one or multiple columns using the asc() and desc() functions.
Example:
df = df.sort(asc("age"))
df.show()
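To sort in descending order, or on several columns at once, desc() can be combined with additional sort keys. For example:
# Sort by age descending, then by name ascending
df = df.sort(desc("age"), asc("name"))
df.show()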
Handling null values (using coalesce):
PySpark provides the coalesce function to handle null values. coalesce takes two or more columns and, for each row, returns the first non-null value among them.
Example:
df = df.withColumn("coalesced_column", coalesce(col("column1"), col("column2"), col("column3"))
df.show()
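A common variant is to pass a literal as the last argument so the result is never null; a small sketch:
# Fall back to a literal default when all of the listed columns are null
df = df.withColumn("column_with_default", coalesce(col("column1"), col("column2"), lit(0)))
df.show()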
Dropping single and multiple columns:
PySpark provides the drop method to drop one or multiple columns from a DataFrame. The drop method takes one or more column names as input and returns a new DataFrame with the specified columns removed.
Example:
df = df.drop("column1", "column2")
df.show()
Grouping aggregations using agg():
Aggregations can be performed on a DataFrame by grouping the data on one or multiple columns with groupBy. The agg method then takes one or more aggregate expressions and returns a new DataFrame with the aggregated values.
Example:
from pyspark.sql.functions import avg
df = df.groupBy("column1").agg(avg("column2").alias("average_column2"))
df.show()
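Several aggregations can be computed in a single agg() call. A small sketch:
from pyspark.sql.functions import avg, max, count
# Compute multiple aggregates per group in one pass
df = df.groupBy("column1").agg(
    avg("column2").alias("average_column2"),
    max("column2").alias("max_column2"),
    count("*").alias("row_count")
)
df.show()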
Dataframe joins:
Dataframes can be joined on one or multiple columns to combine the data from multiple dataframes into a single dataframe.
Example:
df1 = spark.createDataFrame([(1, "John", 25), (2, "Jane", 30), (3, "Jim", 35)], ["id", "name", "age"])
df2 = spark.createDataFrame([(1, 5000), (2, 6000), (3, 7000)], ["id", "salary"])
df = df1.join(df2, "id")
df.show()
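By default join() performs an inner join; the join type can be passed as a third argument, and a list of column names can be used to join on multiple keys. For example:
# Left join keeps all rows from df1 even when there is no matching salary in df2
df = df1.join(df2, "id", "left")
df.show()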
Reading and Writing dataframes from a storage location:
DataFrames can be read from and written to storage locations like HDFS, S3, or local file systems. The read method takes the format and path of the storage location as input and returns a DataFrame, while the write method on a DataFrame takes the format and path and writes the DataFrame to the specified location.
Example:
df.write.format("parquet").mode("overwrite").save("/tmp/dataframe.parquet")
df = spark.read.format("parquet").load("/tmp/dataframe.parquet")
UDFs (to develop custom logic for data transformations):
PySpark provides the ability to create custom functions using UDFs (User-Defined Functions). UDFs can be used to perform complex data transformations that cannot be achieved using built-in functions.
Example:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
def square(x):
return x * x
square_udf = udf(square, IntegerType())
df = df.withColumn("squared_column", square_udf(col("column1")))
df.show()
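The same UDF can also be written with the decorator form of udf(), which some find more readable; an equivalent sketch is below. Keep in mind that UDFs run Python code row by row, so built-in functions are usually faster when they can express the same logic.
# Decorator form: the decorated function itself becomes the UDF
@udf(returnType=IntegerType())
def square_udf(x):
    return x * x

df = df.withColumn("squared_column", square_udf(col("column1")))
df.show()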
Partitioning data frames:
DataFrames can be partitioned when written out to improve the performance of later processing by dividing the data into smaller chunks, one directory per partition value. Partitioning can be performed based on one or multiple columns.
Example:
df.write.format("parquet").mode("overwrite").partitionBy("column1").save("/tmp/partitioned_dataframe.parquet")
df = spark.read.format("parquet").load