From Minutes to Seconds: Supercharging Python for the Billion Row Challenge
Introduction
As data volumes continue to grow exponentially, data professionals are constantly seeking ways to efficiently process and analyze massive datasets. The One Billion Row Challenge, a fun exploration of how far modern data processing tools can be pushed, presents an opportunity to test the limits of various technologies. In this article, I share my experience tackling this challenge using Python and several popular data processing libraries, including Pandas, Dask, Polars, and DuckDB.
The Challenge
The One Billion Row Challenge involves processing a text file containing temperature measurements from various weather stations. Each row in the file follows the format <station name>;<temperature>, and the goal is to calculate the minimum, mean, and maximum temperature for each unique weather station. The catch? The file contains a staggering one billion rows, amounting to over 10 GB of data!
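To make the input format concrete, a handful of rows might look like the following (the temperature values here are invented purely for illustration; the station names appear in the dataset used later in this article):
Helsinki;12.3
Guatemala City;24.1
Helsinki;-3.4
For each station, the task is to reduce all of its rows to three numbers; for the two Helsinki rows above, that would be min = -3.4, mean = 4.45, max = 12.3.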
Approach and Implementations
To tackle this challenge, I implemented solutions using five different approaches: pure Python, Pandas, Dask, Polars, and DuckDB. Let's dive into each implementation and analyze their performance.
1. Pure Python
I started with a straightforward pure Python implementation, reading the file line by line, parsing the data, and maintaining a dictionary to store the statistics for each weather station. This approach took approximately 594.89 seconds (nearly 10 minutes) to process the entire dataset, serving as a baseline for comparison.
import time
from tqdm import tqdm

def read_and_calculate_stats(file_name):
    city_stats = {}
    with open(file_name, 'r') as file:
        for line in tqdm(file, desc="Processing data"):
            line = line.strip()  # Strip whitespace
            if not line or ';' not in line:  # Skip empty lines or lines without ;
                continue
            city, temp = line.split(';', maxsplit=1)  # Split only at the first ;
            try:
                temp = float(temp)
            except ValueError:  # Handle invalid temperature values
                print(f"Warning: Invalid temperature value '{temp}' for city '{city}'")
                continue
            if city in city_stats:
                stats = city_stats[city]
                stats['count'] += 1
                stats['total'] += temp
                if temp < stats['min']:
                    stats['min'] = temp
                if temp > stats['max']:
                    stats['max'] = temp
            else:
                city_stats[city] = {
                    'min': temp,
                    'max': temp,
                    'total': temp,
                    'count': 1
                }
    # Calculate the mean from the running total and count
    for city, stats in city_stats.items():
        stats['mean'] = stats['total'] / stats['count']
        del stats['total'], stats['count']  # Optional: remove intermediates once the mean is computed
    return city_stats

# Main execution
start_time = time.time()
city_stats = read_and_calculate_stats('data/measurements.txt')
end_time = time.time()

print('Helsinki', city_stats['Helsinki'])
print('Guatemala City', city_stats['Guatemala City'], '\n')
print(f"Time elapsed: {end_time - start_time:.2f} seconds")
2. Pandas
Next, I leveraged the power of Pandas, a popular data manipulation library in Python. By reading the data in chunks using pd.read_csv() with a specified chunksize, I was able to process the data more efficiently. Partial statistics from each chunk were accumulated and then aggregated to obtain the final results. The Pandas implementation significantly reduced the processing time to 155.37 seconds.
import pandas as pd
import time
from tqdm import tqdm

def process_data(file_path, chunk_size=1000000):
    start_time = time.time()  # Start timing
    # Initialize an empty DataFrame to accumulate per-chunk results
    accumulated_results = pd.DataFrame()
    # Initialize the chunked reader
    reader = pd.read_csv(file_path, sep=';', header=None, names=['city', 'temp'], chunksize=chunk_size)
    # Process each chunk
    for chunk in tqdm(reader, desc="Processing chunks"):
        # Group by 'city' and collect min, max, sum, and count for the chunk.
        # Keeping sum and count (rather than a per-chunk mean) lets us compute an exact
        # overall mean later; averaging chunk means would be skewed whenever a city's
        # row count differs between chunks.
        results = chunk.groupby('city')['temp'].agg(['min', 'max', 'sum', 'count']).rename(columns={
            'min': 'temperature_min',
            'max': 'temperature_max'
        })
        # Append chunk results to the accumulated results
        accumulated_results = pd.concat([accumulated_results, results])
    # Final aggregation to combine the per-chunk statistics across all chunks
    final_results = accumulated_results.groupby(accumulated_results.index).agg({
        'temperature_min': 'min',
        'temperature_max': 'max',
        'sum': 'sum',
        'count': 'sum'
    })
    final_results['temperature_mean'] = final_results['sum'] / final_results['count']
    final_results = final_results.drop(columns=['sum', 'count'])
    end_time = time.time()  # End timing
    elapsed_time = end_time - start_time  # Calculate elapsed time
    print(f"Elapsed Time: {elapsed_time:.2f} seconds")  # Print the elapsed time
    return final_results

# Specify your file path
file_path = 'data/measurements.txt'
city_stats = process_data(file_path)
print(city_stats)
3. Dask
To further optimize the processing, I turned to Dask, a flexible library for parallel computing in Python. Dask allows for easy scaling of Pandas-like code to multiple cores or even clusters. By initializing a Dask client and using dd.read_csv() to load the data in parallel, I achieved a notable improvement in performance. The Dask implementation completed the processing in just 57.12 seconds.
import dask.dataframe as dd
import time
from dask.distributed import Client

def process_data_dask(file_path, chunk_size=1000000):
    start_time = time.time()
    # Initialize a Dask client for parallel processing
    client = Client()
    # Load data with Dask. Note that blocksize is specified in bytes,
    # so this value produces partitions of roughly 1 MB each.
    ddf = dd.read_csv(file_path, sep=';', header=None, names=['city', 'temp'], blocksize=chunk_size)
    # Group by 'city' and calculate aggregations
    # Note: Dask automatically parallelizes these operations
    results = ddf.groupby('city')['temp'].agg(['min', 'max', 'mean']).rename(columns={
        'min': 'temperature_min',
        'max': 'temperature_max',
        'mean': 'temperature_mean'  # Keep column names consistent with the Pandas implementation
    })
    # Trigger the computation and collect the final DataFrame
    final_results = results.compute()
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Elapsed Time (Dask): {elapsed_time:.2f} seconds")
    # Close the Dask client to release resources
    client.close()
    return final_results

# Guard the entry point so the Dask worker processes can be spawned safely
if __name__ == "__main__":
    # Specify your file path
    file_path = 'data/measurements.txt'
    # Start a Dask cluster and process data
    city_stats_dask = process_data_dask(file_path)
    print(city_stats_dask)
4. Polars
Polars, a lightning-fast DataFrame library for Rust and Python, offered another promising approach. By leveraging Polars' lazy evaluation and streaming capabilities, I was able to efficiently process the data using pl.scan_csv() and perform aggregations on the fly. The Polars implementation showcased impressive performance, completing the challenge in a mere 27.80 seconds.
import polars as pl
import time

def process_data_polars_streaming(file_path):
    start_time = time.time()
    # Create a lazy scan of the CSV file (scan_csv already returns a LazyFrame)
    lazy_df = pl.scan_csv(file_path, separator=";", has_header=False, new_columns=["city", "temp"])
    # Define aggregations, group by 'city', and collect with streaming enabled
    results = (
        lazy_df
        .group_by("city")
        .agg(
            [
                pl.col("temp").min().alias("temperature_min"),
                pl.col("temp").max().alias("temperature_max"),
                pl.col("temp").mean().alias("temperature_mean"),
            ]
        )
        .collect(streaming=True)  # Enable streaming so the full file never has to fit in memory
    )
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Elapsed Time: {elapsed_time:.2f} seconds")
    return results

# Specify your file path
file_path = "data/measurements.txt"
city_stats = process_data_polars_streaming(file_path)
print(city_stats)
5. DuckDB
Finally, I explored DuckDB, an embedded analytical database that seamlessly integrates with Python. By first converting the text file to the columnar Parquet format and then aggregating with DuckDB's SQL engine, I achieved the fastest processing time of just 7.18 seconds for the query itself (the one-time conversion is not included in this timing). DuckDB's columnar storage and query optimization techniques proved to be highly effective for this challenge.
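The one-time conversion from the raw text file to Parquet isn't shown in the listing below; a minimal sketch of how it could be done with DuckDB's own CSV reader follows (the city/temp column names and types are assumptions chosen to match the schema used elsewhere in this article):
import duckdb

con = duckdb.connect()
# A possible one-time conversion of the semicolon-delimited text file to Parquet.
# The 'city'/'temp' column names and types are assumptions matching the rest of this article.
con.execute("""
    COPY (
        SELECT *
        FROM read_csv('data/measurements.txt',
                      delim=';',
                      header=false,
                      columns={'city': 'VARCHAR', 'temp': 'DOUBLE'})
    ) TO 'data/measurements.parquet' (FORMAT PARQUET)
""")
With the Parquet file in place, the aggregation itself reduces to a single SQL query: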
import duckdb
import time
con = duckdb.connect()
# Start time tracking
start_time = time.time()
# Aggregate directly in DuckDB
final_results = con.execute("""
    SELECT
        city,
        MIN(temp) AS min_temp,
        MAX(temp) AS max_temp,
        AVG(temp) AS avg_temp
    FROM 'data/measurements.parquet'
    GROUP BY city
""").fetchdf()
# End time tracking and calculate elapsed time
end_time = time.time()
elapsed_time = end_time - start_time
# Print the results and elapsed time
print(final_results)
print(f"\nTime elapsed: {elapsed_time:.2f} seconds")
Results
Here is a summary of the processing times for each approach:
Approach       Time (seconds)
Pure Python    594.89
Pandas         155.37
Dask           57.12
Polars         27.80
DuckDB         7.18
Conclusion
The One Billion Row Challenge provided an excellent opportunity to explore the capabilities of various data processing tools in Python. The results demonstrate the significant performance gains that can be achieved by leveraging specialized libraries and techniques.
While pure Python serves as a simple starting point, libraries like Pandas and Dask offer substantial improvements in processing speed. Polars and DuckDB push the boundaries even further, showcasing the power of modern data processing technologies. It's important to note that the choice of tool depends on the specific requirements of the project, such as data size, available resources, and desired level of abstraction.
Above all, this challenge highlights the importance of exploring and benchmarking different options to find the most suitable solution for large-scale data processing tasks. As data volumes continue to grow, staying up to date with the latest advancements in data processing technologies is crucial. By leveraging the right tools and techniques, data professionals can efficiently tackle even the most demanding challenges and unlock valuable insights from massive datasets.