From Minutes to Seconds: Supercharging Python for the Billion Row Challenge

See the GitHub repository.

Introduction

As data volumes continue to grow exponentially, data professionals are constantly seeking ways to efficiently process and analyze massive datasets. The One Billion Row Challenge, a fun exploration of how far modern data processing tools can be pushed, presents an opportunity to test the limits of various technologies. In this article, I share my experience tackling this challenge using Python and several popular data processing libraries, including Pandas, Dask, Polars, and DuckDB.

The Challenge

The One Billion Row Challenge involves processing a text file containing temperature measurements from various weather stations. Each row in the file follows the format <station name>;<temperature>, and the goal is to calculate the minimum, mean, and maximum temperature for each unique weather station. The catch? The file contains a staggering one billion rows, amounting to over 10 GB of data!
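
For illustration, a few input rows might look like this (the station names appear later in this article; the temperature values are invented for the example):

Helsinki;12.3
Guatemala City;24.9
Helsinki;-3.4

The expected output is one record per station: for these three rows, Helsinki would report min -3.4, mean 4.45, and max 12.3, while Guatemala City would report 24.9 for all three statistics.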

Approach and Implementations

To tackle this challenge, I implemented solutions using five different approaches: pure Python, Pandas, Dask, Polars, and DuckDB. Let's dive into each implementation and analyze their performance.

1. Pure Python

I started with a straightforward pure Python implementation, reading the file line by line, parsing the data, and maintaining a dictionary to store the statistics for each weather station. This approach took approximately 594.89 seconds (nearly 10 minutes) to process the entire dataset, serving as a baseline for comparison.


import time
from tqdm import tqdm

def read_and_calculate_stats(file_name):
    city_stats = {}
    with open(file_name, 'r') as file:
        for line in tqdm(file, desc="Processing data"):
            line = line.strip()  # Strip whitespace
            if not line or ';' not in line:  # Skip empty lines or lines without ;
                continue
            city, temp = line.split(';', maxsplit=1)  # Split only at first ;
            try:
                temp = float(temp)
            except ValueError:  # Handle invalid temperature values
                print(f"Warning: Invalid temperature value '{temp}' for city '{city}'")
                continue  

            if city in city_stats:
                stats = city_stats[city]
                stats['count'] += 1
                stats['total'] += temp
                if temp < stats['min']:
                    stats['min'] = temp
                if temp > stats['max']:
                    stats['max'] = temp
            else:
                city_stats[city] = {
                    'min': temp,
                    'max': temp,
                    'total': temp,
                    'count': 1
                }

    # Calculate mean from total and count
    for city, stats in city_stats.items():
        stats['mean'] = stats['total'] / stats['count']
        del stats['total'], stats['count']  # Optional: Remove these if no longer needed

    return city_stats

# Main execution
start_time = time.time()
city_stats = read_and_calculate_stats('data/measurements.txt')
end_time = time.time()

print('Helsinki', city_stats['Helsinki'])
print('Guatemala City', city_stats['Guatemala City'], '\n')

print(f"Time elapsed: {end_time - start_time:.2f} seconds")
        


(Screenshot: the kernel kept crashing when I tried to load the entire 13 GB dataset into my 16 GB RAM system.)
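
For context, the crash comes from trying to pull the whole file into memory at once. A naive full load into a single DataFrame, roughly like the sketch below (illustrative only, not the exact code), exhausts RAM on a 16 GB machine:

import pandas as pd

# Naive approach (for illustration): this materializes all one billion rows
# in memory at once, which is what crashes the kernel on a 16 GB machine.
df = pd.read_csv('data/measurements.txt', sep=';', header=None, names=['city', 'temp'])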

2. Pandas

Next, I leveraged the power of Pandas, a popular data manipulation library in Python. By reading the data in chunks using pd.read_csv() with a specified chunksize, I was able to process the data more efficiently. The results from each chunk were accumulated and then aggregated to obtain the final statistics. The Pandas implementation significantly reduced the processing time to 155.37 seconds.

import pandas as pd
import time
from tqdm import tqdm

def process_data(file_path, chunk_size=1000000):
    start_time = time.time()  # Start timing

    # Initialize an empty DataFrame to accumulate per-chunk results
    accumulated_results = pd.DataFrame()

    # Initialize reader object
    reader = pd.read_csv(file_path, sep=';', header=None, names=['city', 'temp'], chunksize=chunk_size)

    # Process each chunk
    for chunk in tqdm(reader, desc="Processing chunks"):
        # Group by 'city' and calculate min, max, sum, and count for the chunk.
        # Keeping sum and count (instead of a per-chunk mean) lets the final mean
        # be weighted correctly when a station's rows are split across chunks.
        results = chunk.groupby('city')['temp'].agg(['min', 'max', 'sum', 'count']).rename(columns={
            'min': 'temperature_min',
            'max': 'temperature_max'
        })
        # Append chunk results to the accumulated results
        accumulated_results = pd.concat([accumulated_results, results])

    # Final aggregation to combine the per-chunk statistics across all chunks
    final_results = accumulated_results.groupby(accumulated_results.index).agg({
        'temperature_min': 'min',
        'temperature_max': 'max',
        'sum': 'sum',
        'count': 'sum'
    })
    final_results['temperature_mean'] = final_results['sum'] / final_results['count']
    final_results = final_results.drop(columns=['sum', 'count'])

    end_time = time.time()  # End timing
    elapsed_time = end_time - start_time  # Calculate elapsed time

    print(f"Elapsed Time: {elapsed_time:.2f} seconds")  # Print the elapsed time
    return final_results

# Specify your file path
file_path = 'data/measurements.txt'
city_stats = process_data(file_path)
print(city_stats)
        

3. Dask

To further optimize the processing, I turned to Dask, a flexible library for parallel computing in Python. Dask allows for easy scaling of Pandas-like code to multiple cores or even clusters. By initializing a Dask client and using dd.read_csv() to load the data in parallel, I achieved a notable improvement in performance. The Dask implementation completed the processing in just 57.12 seconds.

import dask.dataframe as dd
import time
from dask.distributed import Client

def process_data_dask(file_path, chunk_size=1000000):
    start_time = time.time()

    # Initialize a Dask client for parallel processing
    client = Client()

    # Load data with Dask; note that blocksize is measured in bytes (not rows),
    # so it controls the size of each partition read from the file
    ddf = dd.read_csv(file_path, sep=';', header=None, names=['city', 'temp'], blocksize=chunk_size)

    # Group by 'city' and calculate aggregations, renaming the columns to match
    # the Pandas implementation
    # Note: Dask builds a lazy task graph and parallelizes these operations
    results = ddf.groupby('city')['temp'].agg(['min', 'max', 'mean']).rename(columns={
        'min': 'temperature_min',
        'max': 'temperature_max',
        'mean': 'temperature_mean'
    })

    # Trigger the computation and collect the final DataFrame
    final_results = results.compute()

    end_time = time.time()
    elapsed_time = end_time - start_time

    print(f"Elapsed Time (Dask): {elapsed_time:.2f} seconds")
    
    # Close the Dask Client to release resources
    client.close()
    
    return final_results

# Specify your file path
file_path = 'data/measurements.txt'

# Start a Dask cluster and process data
city_stats_dask = process_data_dask(file_path)
print(city_stats_dask)
        

4. Polars

Polars, a lightning-fast DataFrame library for Rust and Python, offered another promising approach. By leveraging Polars' lazy evaluation and streaming capabilities, I was able to efficiently process the data using pl.scan_csv() and perform aggregations on the fly. The Polars implementation showcased impressive performance, completing the challenge in a mere 27.80 seconds.

import polars as pl
import time


def process_data_polars_streaming(file_path):
    start_time = time.time()

    # Create a lazy scan of the CSV file
    df = pl.scan_csv(file_path, separator=";", has_header=False, new_columns=["city", "temp"])

    # Define aggregations and group by 'city'; scan_csv already returns a
    # LazyFrame, so no extra .lazy() call is needed
    results = (
        df.group_by("city")
        .agg(
            [
                pl.col("temp").min().alias("temperature_min"),
                pl.col("temp").max().alias("temperature_max"),
                pl.col("temp").mean().alias("temperature_mean"),
            ]
        )
        .collect(streaming=True)  # Enable streaming execution
    )

    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Elapsed Time: {elapsed_time:.2f} seconds")
    return results

# Specify your file path
file_path = "data/measurements.txt"
city_stats = process_data_polars_streaming(file_path)
print(city_stats)
        

5. DuckDB

Finally, I explored DuckDB, an embedded analytical database that seamlessly integrates with Python. By converting the text file to the efficient Parquet format and utilizing DuckDB's SQL querying capabilities, I achieved the fastest processing time of just 7.18 seconds. DuckDB's columnar storage and query optimization techniques proved to be highly effective for this challenge.
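
The aggregation code below assumes data/measurements.parquet already exists; the text-to-Parquet conversion is a one-off preprocessing step. A minimal sketch of that conversion using DuckDB's CSV reader (the exact conversion used for the benchmark may differ) looks like this:

import duckdb

con = duckdb.connect()

# One-off conversion: read the semicolon-delimited text file and write it out as Parquet.
# The column names and delimiter match the measurements format used throughout this article.
con.execute("""
    COPY (
        SELECT *
        FROM read_csv('data/measurements.txt',
                      delim=';',
                      header=false,
                      columns={'city': 'VARCHAR', 'temp': 'DOUBLE'})
    ) TO 'data/measurements.parquet' (FORMAT PARQUET)
""")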

import duckdb
import time

con = duckdb.connect()

# Start time tracking
start_time = time.time()

# Aggregate directly in DuckDB
final_results = con.execute("""
    SELECT 
        city, 
        MIN(temp) AS min_temp, 
        MAX(temp) AS max_temp, 
        AVG(temp) AS avg_temp
    FROM 'data/measurements.parquet'
    GROUP BY city
""").fetchdf()

# End time tracking and calculate elapsed time
end_time = time.time()
elapsed_time = end_time - start_time

# Print the results and elapsed time
print(final_results)
print(f"\nTime elapsed: {elapsed_time:.2f} seconds")        

Results

Approach        Time (seconds)
Pure Python         594.89
Pandas              155.37
Dask                 57.12
Polars               27.80
DuckDB                7.18

DuckDB finished in ~7 seconds, roughly 83x faster than the pure Python baseline (594.89 s vs. 7.18 s).

Conclusion

The One Billion Row Challenge provided an excellent opportunity to explore the capabilities of various data processing tools in Python. The results demonstrate the significant performance gains that can be achieved by leveraging specialized libraries and techniques.

While pure Python serves as a simple starting point, libraries like Pandas and Dask offer substantial improvements in processing speed. Polars and DuckDB push the boundaries even further, showcasing the power of modern data processing technologies. It's important to note that the choice of tool depends on the specific requirements of the project, such as data size, available resources, and desired level of abstraction.

That said, this challenge highlights the importance of exploring and benchmarking different options to find the most suitable solution for handling large-scale data processing tasks. As data volumes continue to grow, staying up to date with the latest advancements in data processing technologies is crucial. By leveraging the right tools and techniques, data professionals can efficiently tackle even the most demanding challenges and unlock valuable insights from massive datasets.
