From Real-Time Streams to Perpetual Insights: Zalando Case Study, Part 2
In my previous article, we explored why Kafka is essential for data science, using Zalando as an example of its transformative potential in handling real-time data streams. While Kafka excels in managing real-time events and streaming pipelines, it’s important to address its limitations. In this follow-up, we delve into Hadoop, its integration with Python, and its relevance in the AI-driven data science landscape, all through the lens of Zalando’s data ecosystem.
Modern e-commerce companies like Zalando operate in a data landscape where both speed and depth of analysis are crucial (Davenport & Harris, 2017). Kafka’s power lies in its ability to handle high-velocity data streams such as user clicks, inventory changes, and order placements in real time (Apache Kafka, 2024; Zalando Tech Blog, 2023). However, the road to comprehensive, AI-driven insights does not end with Kafka. Its limitations around batch processing, complex analytics, and long-term storage call for additional technologies that can extend its capabilities.
Here, we examine these limitations and discuss how Hadoop and Python form a complementary ecosystem that supports large-scale analytics, long-term trend analysis, and the deployment of sophisticated AI models. By doing so, we aim to illustrate how Zalando’s data infrastructure can evolve from simply processing real-time events to generating actionable insights at scale.
Kafka's shortcomings in batch processing, complex analytics, and long-term storage highlight the need for an integrated ecosystem: one where Hadoop picks up where Kafka leaves off, and where Python bridges the gap to advanced AI workflows.
Hadoop: Powering Large-Scale Batch Processing and Storage
Apache Hadoop is a foundational technology for big data storage and batch analytics (White, 2015). It provides a distributed, fault-tolerant framework that can process petabytes of data using commodity hardware.
For Zalando, Hadoop's core strengths can be critical: distributed, fault-tolerant storage in HDFS; cost-efficient scaling on commodity hardware; and the capacity to run large-scale batch analytics over years of historical events.
By integrating Hadoop, Zalando complements Kafka's real-time ingestion with robust, long-term storage and large-scale analytical capacity, as the short example below illustrates.
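To make this concrete, here is a small sketch of a batch analytics job that reads historical click events from HDFS and aggregates daily product views with PySpark. The namenode address, path, and field names are illustrative assumptions (the same placeholders used later in this article), not Zalando's actual layout.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

# Hypothetical batch job: long-term trend analysis over events stored in HDFS
spark = SparkSession.builder \
    .appName("HistoricalProductViews") \
    .getOrCreate()

# Path and field names are placeholders for illustration only
events = spark.read.parquet("hdfs://namenode:8020/data/zalando_events/")

# Count product views per day across the full stored history
daily_views = events \
    .filter(col("event_type") == "view") \
    .withColumn("event_date", to_date(col("timestamp"))) \
    .groupBy("event_date", "product_id") \
    .count() \
    .orderBy("event_date")

daily_views.show(20, truncate=False)

Because the history sits in HDFS as columnar Parquet, a query like this can scan months of events without touching the real-time Kafka path.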
Python: The Bridge to AI-Driven Insights
While Hadoop solves the batch and scale problem, Python enables a smooth integration of advanced analytics and AI models into the data pipeline (Van Rossum & Drake, 2009). Its rich ecosystem of libraries and frameworks makes Python a go-to language for data science teams.
Key Python tools that enhance Hadoop's capabilities include PySpark for distributed data processing, Hadoop Streaming for writing MapReduce jobs in plain Python, and MLlib for cluster-scale machine learning, all of which appear in the examples that follow.
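As a small illustration of that bridge, the sketch below pulls a cluster-side Spark aggregate into a local pandas DataFrame, where the single-machine Python toolkit can take over on the already-reduced result. Paths and column names are the same hypothetical placeholders used elsewhere in this article.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("SparkToPandasBridge").getOrCreate()

# Heavy lifting happens on the cluster: filter and aggregate the full event history
top_products = spark.read.parquet("hdfs://namenode:8020/data/zalando_events/") \
    .filter(col("event_type") == "view") \
    .groupBy("product_id") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(100)

# Only the small aggregated result is collected to the driver as a pandas DataFrame
top_products_pdf = top_products.toPandas()
print(top_products_pdf.head())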
Leveraging AI Within Hadoop's Infrastructure
Integrating AI into Hadoop's ecosystem can yield powerful competitive advantages for Zalando, most notably personalized recommendations trained on the full history of user-product interactions, as the ALS example later in this article demonstrates.
Integrating Kafka and Hadoop for Optimal Results
A well-orchestrated data architecture pairs Kafka's real-time streams with Hadoop's batch capabilities: Kafka ingests and buffers events the moment they happen, while Hadoop stores them durably and processes them at scale.
Below is a detailed, technical deep-dive into how the integrated Hadoop, Kafka, and Python ecosystem can work within Zalando’s data infrastructure. This includes architectural explanations, example workflows, and sample code snippets. Though these examples are illustrative and may need adaptation to Zalando’s real environment, they capture the essence of the technologies in use.
Technical Implementation Details
A typical data pipeline leveraging Kafka, Hadoop, and Python follows these steps:
1. Events such as clicks, orders, and inventory changes are published to Kafka topics as they occur.
2. A PySpark Structured Streaming job consumes those topics and lands the events in HDFS as Parquet.
3. Batch jobs (Hadoop Streaming or Spark) aggregate the stored history and prepare features.
4. PySpark MLlib trains models, such as the ALS recommender, on the prepared data.
5. Trained models and aggregates are persisted back to HDFS for downstream use.
Example: Consuming Kafka Data with PySpark
The following Python snippet (using PySpark Structured Streaming) demonstrates how to read from Kafka topics and write the data into HDFS. Adjust broker addresses, topics, and file paths as needed.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Create the SparkSession
spark = SparkSession.builder \
    .appName("KafkaToHDFSExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

# Define the Kafka brokers and topic
kafka_brokers = "kafka-broker1:9092,kafka-broker2:9092"
kafka_topic = "zalando_events"

# Define the event schema
event_schema = StructType([
    StructField("event_type", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Read streaming data from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "latest") \
    .load()

# Convert the value column from binary to string and parse the JSON payload
events = df.selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json(col("json_str"), event_schema).alias("data")) \
    .select("data.*")

# Write the data to HDFS in Parquet format, partitioned by event_type,
# with a checkpoint location so the stream can recover after failures
query = events.writeStream \
    .format("parquet") \
    .option("path", "hdfs://namenode:8020/data/zalando_events/") \
    .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/zalando_events/") \
    .partitionBy("event_type") \
    .outputMode("append") \
    .start()

query.awaitTermination()
What this does: the job subscribes to the zalando_events topic, parses each message's JSON payload against the declared schema, and continuously appends the structured events to HDFS as Parquet files partitioned by event_type, with a checkpoint directory so the stream can recover after failures.
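Once the stream has been landing data for a while, the Parquet output doubles as a regular batch source. The short sketch below, using the same illustrative paths and columns, shows how a downstream job or notebook could sanity-check the landed events.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LandedEventsCheck").getOrCreate()

# Read back what the streaming job wrote; Spark discovers the event_type partitions automatically
landed = spark.read.parquet("hdfs://namenode:8020/data/zalando_events/")

# Quick sanity check: how many events of each type have been stored so far?
landed.groupBy("event_type").count().show()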
Batch Processing with Hadoop MapReduce (Python Example)
For legacy batch jobs, Hadoop Streaming allows using Python scripts as mappers and reducers. Below is a simple MapReduce job that counts product views per product_id using Hadoop’s streaming interface.
mapper.py (reads lines of JSON, extracts product_id, and emits intermediate counts)
#!/usr/bin/env python3
import sys
import json

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    try:
        event = json.loads(line)
        if event.get("event_type") == "view":
            product_id = event.get("product_id", "unknown")
            print(f"{product_id}\t1")
    except Exception:
        # Malformed line or JSON - skip
        pass
reducer.py (aggregates counts by product_id)
#!/usr/bin/env python3
import sys

current_product = None
current_count = 0

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    product_id, count_str = line.split("\t", 1)
    count = int(count_str)
    if current_product == product_id:
        current_count += count
    else:
        if current_product is not None:
            print(f"{current_product}\t{current_count}")
        current_product = product_id
        current_count = count

# Don't forget to emit the last product
if current_product is not None:
    print(f"{current_product}\t{current_count}")
Running the Job:
hadoop jar /usr/local/hadoop/hadoop-streaming.jar \
    -input hdfs://namenode:8020/data/zalando_events/ \
    -output hdfs://namenode:8020/output/product_view_counts \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py
What this does: Hadoop Streaming ships mapper.py and reducer.py to the cluster nodes, runs the mapper over every event file under the input path, sorts and shuffles the intermediate product_id counts, and has the reducer write the total number of views per product to the output directory in HDFS.
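Before submitting to the cluster, it can help to smoke-test the two scripts locally. The sketch below, using made-up sample events, pipes a few JSON lines through the mapper, sorts the intermediate output to mimic Hadoop's shuffle-and-sort phase, and feeds it to the reducer; it assumes mapper.py and reducer.py sit in the current directory.

import json
import subprocess

# Hypothetical sample events for a quick local test
sample_events = [
    {"event_type": "view", "product_id": "P100", "user_id": "U1"},
    {"event_type": "view", "product_id": "P100", "user_id": "U2"},
    {"event_type": "order", "product_id": "P200", "user_id": "U1"},
]
raw_input = "\n".join(json.dumps(e) for e in sample_events)

# Run the mapper over the sample events
mapped = subprocess.run(["python3", "mapper.py"], input=raw_input,
                        capture_output=True, text=True, check=True).stdout

# Sort the intermediate key/value pairs, as Hadoop's shuffle-and-sort would
shuffled = "\n".join(sorted(mapped.strip().splitlines()))

# Run the reducer over the sorted output
reduced = subprocess.run(["python3", "reducer.py"], input=shuffled,
                         capture_output=True, text=True, check=True).stdout
print(reduced)  # expected: P100 followed by a tab and the count 2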
Training a Machine Learning Model with PySpark
Once large-scale aggregation or feature engineering is complete, data scientists can use PySpark’s MLlib to train models distributed across the cluster. For example, training a recommendation model (e.g., ALS for collaborative filtering):
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder \
    .appName("ZalandoRecommendationModel") \
    .getOrCreate()

# Load aggregated data (user_id, product_id, rating or implicit feedback) from HDFS.
# Note: ALS expects numeric user and item IDs; string IDs must first be mapped to
# integers (for example with StringIndexer).
ratings = spark.read.parquet("hdfs://namenode:8020/processed_data/user_product_interactions/")

# Split the data into training and test sets
train, test = ratings.randomSplit([0.8, 0.2], seed=42)

# Configure the ALS model (implicitPrefs=True treats the rating column as implicit feedback strength)
als = ALS(
    userCol="user_id",
    itemCol="product_id",
    ratingCol="rating",
    implicitPrefs=True,
    coldStartStrategy="drop",
    maxIter=10,
    rank=20,
    regParam=0.1
)

# Train the model
model = als.fit(train)

# Evaluate the model on held-out data
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE on test data: {rmse}")

# Save the trained model to HDFS
model.save("hdfs://namenode:8020/models/als_recommendation_model")
What this does: the job loads aggregated user-product interactions from HDFS, splits them into training and test sets, fits an ALS collaborative-filtering model across the cluster, reports the RMSE on held-out data, and persists the trained model back to HDFS.
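A natural follow-on, sketched below under the same illustrative paths, is to reload the persisted model in a batch job and precompute top-N recommendations per user, which can then be written back to HDFS (or published to a Kafka topic) for serving.

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALSModel

spark = SparkSession.builder.appName("ZalandoBatchRecommendations").getOrCreate()

# Reload the model trained in the previous step
model = ALSModel.load("hdfs://namenode:8020/models/als_recommendation_model")

# Precompute the top 10 product recommendations for every user
user_recs = model.recommendForAllUsers(10)

# Persist the recommendations for downstream serving (illustrative output path)
user_recs.write.mode("overwrite") \
    .parquet("hdfs://namenode:8020/processed_data/user_recommendations/")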
Performance and Scaling Tips
A few practices already reflected in the snippets above are worth calling out: partition HDFS output on a frequently filtered column (event_type here) so batch jobs can prune scans; store data as columnar Parquet to cut I/O; always set a checkpointLocation for streaming jobs so they recover cleanly; and tune Spark serialization (the Kryo serializer configured earlier) and executor resources as data volumes grow.
Bringing It All Together
This integrated environment, with Kafka handling real-time data, Hadoop/HDFS providing durable storage and batch analytics, and Python with PySpark powering scalable AI/ML modeling, enables Zalando's data science teams to handle data end-to-end: from real-time ingestion, to long-term storage and batch aggregation, to model training and batch recommendation serving.
References