Identify corrupted records in a dataset using pyspark
Last Updated: 07 Apr, 2025
Datasets can contain corrupt records: records that do not follow the format the correct records follow. For example, a corrupt record may be delimited with a pipe ("|") character while the rest of the records are delimited by a comma (","), and the file is read with the comma separator.
This article demonstrates the three read modes PySpark provides for identifying and handling corrupt records:
- PERMISSIVE
- DROPMALFORMED
- FAILFAST
Let's discuss the modes one by one with examples, but before that, make sure to set up the environment. We are going to use Google Colab; here's how to set it up:
Step 1: Install Java and Spark in your Colab environment:
Python
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark
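Note that the URL above pins a specific Spark release, which may eventually disappear from the Apache download mirror. As an alternative (an assumption on our part, not the setup used in the rest of this article), you can install the pip-packaged PySpark, which bundles Spark so no manual download, environment variables, or findspark are needed:
Python
# Alternative setup (hypothetical): the PyPI 'pyspark' package bundles Spark itself,
# so Steps 2 and 3 below can be skipped if you go this route.
!pip install -q pyspark==3.4.1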
Step 2: Set up environment variables:
Python
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"
Step 3: Initialize Spark:
Python
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("CorruptedRecords").getOrCreate()
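As a quick sanity check (not part of the original walkthrough), you can confirm the session is live by printing its version:
Python
# Should print the Spark release installed above, e.g. 3.4.1
print(spark.version)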
Now that we have set up the Colab environment, let's explore how each of the three modes works.
To download the csv file used in this article, click here.
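If the download link is unavailable, the snippet below writes a small, purely hypothetical stand-in with the same file name and column layout used in the examples: pipe-delimited rows plus one deliberately corrupted, comma-delimited row. The record counts mentioned later in this article refer to the original file, not this sketch.
Python
# Hypothetical stand-in for the article's CSV: pipe-delimited rows plus one corrupt,
# comma-delimited row (its first token cannot be cast to INT, so Spark flags it).
sample = """customer_id|customer_fname|customer_lname|customer_email|customer_password|customer_street|customer_city|customer_state|customer_zipcode
1|John|Doe|XXXXXXXXX|XXXXXXXXX|123 Main St|Springfield|IL|62701
2|Jane|Roe|XXXXXXXXX|XXXXXXXXX|456 Oak Ave|Madison|WI|53703
3,Sam,Poe,XXXXXXXXX,XXXXXXXXX,789 Pine Rd,Austin,TX,73301
"""

with open("/content/corrupted_customer_details.csv", "w") as f:
    f.write(sample)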
PERMISSIVE
PERMISSIVE is the default mode. In this mode, NULLs are inserted for the fields that could not be parsed correctly, and the row is kept. If you want to retain the raw text of bad records in the DataFrame, use the columnNameOfCorruptRecord option to capture them in a dedicated column.
Example
This PySpark code sets a schema for the data, reads the CSV with the required options (including a column for corrupted records), filters and displays the corrupted records, and prints the total record count.
Python
from pyspark.sql.functions import col
customers_schema = '''
customer_id INT,
customer_fname STRING,
customer_lname STRING,
customer_email STRING,
customer_password STRING,
customer_street STRING,
customer_city STRING,
customer_state STRING,
customer_zipcode INT,
_corrupt_record STRING
'''
customers_df = spark.read.schema(customers_schema).format("csv").options(
header=True,
delimiter="|",
mode="PERMISSIVE",
columnNameOfCorruptRecord="_corrupt_record"
).load("/content/corrupted_customer_details.csv")
customers_df.filter(col("_corrupt_record").isNotNull()).show()
print(f"Total number of records while reading in PERMISSIVE mode: {customers_df.count()}")
In the above code, the "_corrupt_record" column is used to store the raw text of the corrupted records.
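One caveat before looking at the output: Spark does not allow queries over a raw CSV read that reference only the corrupt-record column (for example, counting just the bad rows); the documented workaround is to cache the parsed DataFrame first. A minimal sketch, worth verifying on your Spark version:
Python
# Referencing ONLY _corrupt_record on the raw read (e.g. filtering and counting just that
# column) is disallowed by Spark; cache the parsed DataFrame first as a workaround.
customers_df.cache()
customers_df.filter(col("_corrupt_record").isNotNull()).select("_corrupt_record").show(truncate=False)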
Output:
[Output: Permissive Mode]
DROPMALFORMED
This mode drops corrupted records while reading the dataset.
Example
This PySpark code reads the CSV file and drops any malformed or corrupted records. It sets a schema for the data, reads the CSV with the DROPMALFORMED option, displays the cleaned dataset, and prints the total record count.
Python
from pyspark.sql.functions import col
customers_schema = '''
customer_id INT,
customer_fname STRING,
customer_lname STRING,
customer_email STRING,
customer_password STRING,
customer_street STRING,
customer_city STRING,
customer_state STRING,
customer_zipcode INT,
_corrupt_record STRING
'''
customers_df = spark.read.schema(customers_schema).format("csv").options(
header=True,
delimiter=",", # Ensure this matches your CSV
mode="DROPMALFORMED" # Drop corrupt rows
).load("/content/corrupted_customer_details.csv")
customers_df.show()
print(f"Total number of records after DROPMALFORMED mode: {customers_df.count()}")
In the output you will find 9 records. Note, however, that calling count() on 'customers_df' may still report the original total: counting does not need to parse any columns, so the malformed row is not necessarily detected and dropped at that point.
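If you do want count() to reflect only the rows that survived, one workaround (a sketch based on commonly reported behaviour, worth verifying on your Spark version) is to cache the DataFrame so counting runs over the fully parsed data:
Python
# Caching forces full parsing of every column, so malformed rows are dropped
# before the count is taken (workaround sketch; behaviour may vary by version).
customers_df.cache()
print(f"Record count after caching: {customers_df.count()}")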
Output:
[Output: Dropmalformed Mode]
FAILFAST
This mode throws an error as soon as a malformed record is detected while reading the dataset.
Example
This PySpark code reads the CSV file in "FAILFAST" mode, which means it fails and raises an exception as soon as it encounters a malformed record that does not adhere to the specified schema. It sets a schema for the data, reads the CSV with the specified options, displays the dataset, and prints the total record count; if a malformed record is encountered, the exception message is printed instead.
Python
from pyspark.sql import SparkSession

# Reuse the existing session (getOrCreate returns it if one is already running)
spark = SparkSession.builder.appName("Read data in FAILFAST mode").getOrCreate()

customers_schema = '''
customer_id INT,
customer_fname STRING,
customer_lname STRING,
customer_email STRING,
customer_password STRING,
customer_street STRING,
customer_city STRING,
customer_state STRING,
customer_zipcode INT
'''

try:
    # FAILFAST raises an exception as soon as a malformed record is encountered
    customers_df = spark.read.schema(customers_schema).format("csv").options(
        header=True,
        delimiter="|",
        mode="FAILFAST"
    ).load("/content/corrupted_customer_details.csv")
    customers_df.show()
    print(f"Total number of records while reading in FAILFAST mode: {customers_df.count()}")
except Exception as e:
    print(e)
Since there is one corrupt record in the dataset, the read raises an exception. The advantage of FAILFAST mode is that it does not let you proceed with a dataset that contains corrupted records.
Output:
[Output: Failfast Mode]
To download the jupyter notebook for the entire code, click here.