Operational & Analytical Workloads All In One Place - Modernising Messaging Workflows - Part II

In the last edition we saw how messaging workflows on existing tools such as RabbitMQ can be brought over to Kafka using self-managed connectors. In the process we developed and applied a custom Kafka Connect SMT that infers a schema from the raw bytes and serialises the messages to Avro, after which encryption rules are applied to specific fields on the fly.

Python (app) --> RabbitMQ --> Orders (ByteArray) --> Custom Kafka Connect SMT (Record with Schema) --> AvroConverter --> Encryption Rules --> Messages in the Kafka Topic

a) RabbitMQ Source Connector to offload the messages from the existing workflow

b) Custom Kafka Connect Single Message Transformation (SMT) to transform the raw bytes, apply a schema and serialise messages to Avro going into Kafka

c) Encrypt the account/user-identifiable fields using Client-Side Field Level Encryption (CSFLE)

d) Stream Processing to perform real-time BUY/SELL pattern analysis based on the incoming order flow

e) Tableflow to analyse historical orders that are stored long-term in Kafka topics

f) And over time, moving completely from RabbitMQ to Kafka, and porting the application to Golang while supporting the above pipeline

In this edition, let's turn to steps d) and e), analysing the orders raised in real time with stream processing and historically with Tableflow.

D. Real-time BUY/SELL Pattern Analysis using ksqlDB

Analysing trade orders in real time requires some sort of aggregation/windowed logic that looks at each new event coming into the stream to understand BUY/SELL patterns, either at an account level or at an item level. In most cases regulations require that activity not be traceable to a specific user/account, hence the use of non-deterministic encryption rules to encrypt the account IDs as seen previously. From orders with a payload of the format {"quantity":55,"instrument":"JKL","id":2,"event":"SELL","account":"<encrypted-value>"}, the following could be of interest.
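The aggregations below assume a ksqlDB stream has been declared over the orders topic. A minimal sketch, assuming the topic is named rabbitmq_trades and the Avro schema is resolved from Schema Registry (both names are assumptions, not from the original pipeline config):

```sql
-- Assumed: orders land in the 'rabbitmq_trades' topic as Avro;
-- the value schema is picked up from Schema Registry automatically.
CREATE STREAM RABBITMQ_TRADES WITH (
  KAFKA_TOPIC='rabbitmq_trades',
  VALUE_FORMAT='avro'
);
```
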

a) Quantity per Item being Traded over a specific Time Window

CREATE TABLE TRADE_STATS_PER_INSTRUMENT
  WITH (KAFKA_TOPIC='trade_stats_per_instrument', PARTITIONS=1, VALUE_FORMAT='avro') AS
SELECT
  RABBITMQ_TRADES.INSTRUMENT INSTRUMENT,
  RABBITMQ_TRADES.EVENT EVENT,
  COUNT(RABBITMQ_TRADES.ID) TRADED_LAST_30_SECONDS
FROM RABBITMQ_TRADES
WINDOW TUMBLING (SIZE 30 SECONDS)
GROUP BY RABBITMQ_TRADES.INSTRUMENT, RABBITMQ_TRADES.EVENT
EMIT CHANGES;
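Each row of the resulting table is keyed by (INSTRUMENT, EVENT) plus the tumbling window; the window bounds can be surfaced via ksqlDB's WINDOWSTART and WINDOWEND pseudo-columns. A sketch of such a push query:

```sql
-- Inspect the per-window counts; WINDOWSTART/WINDOWEND are
-- pseudo-columns available on windowed sources in ksqlDB.
SELECT INSTRUMENT, EVENT, TRADED_LAST_30_SECONDS,
       WINDOWSTART AS WS, WINDOWEND AS WE
FROM TRADE_STATS_PER_INSTRUMENT
EMIT CHANGES;
```
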

b) Top 'K' Traded Items within a specific Time Window

CREATE TABLE TOP_TRADED_INSTRUMENT_LAST_5S
  WITH (FORMAT='avro', KAFKA_TOPIC='top_traded_instrument_last_5s', PARTITIONS=1) AS
SELECT
  RABBITMQ_TRADES.EVENT EVENT,
  TOPKDISTINCT(RABBITMQ_TRADES.INSTRUMENT, 3) TOP_TRADED_INSTRUMENT
FROM RABBITMQ_TRADES
WINDOW TUMBLING (SIZE 5 SECONDS)
GROUP BY RABBITMQ_TRADES.EVENT
EMIT CHANGES;

c) Top 'K' Traded Items over entirety

CREATE TABLE TOP_TRADED_INSTRUMENT
  WITH (FORMAT='avro', KAFKA_TOPIC='top_traded_instrument', PARTITIONS=1) AS
SELECT
  RABBITMQ_TRADES.EVENT EVENT,
  TOPKDISTINCT(RABBITMQ_TRADES.INSTRUMENT, 3) TOP_TRADED_INSTRUMENT
FROM RABBITMQ_TRADES
GROUP BY RABBITMQ_TRADES.EVENT
EMIT CHANGES;

d) Histogram of Buy/Sell Events per Item

CREATE TABLE INSTRUMENT_HISTOGRAM
  WITH (KAFKA_TOPIC='instrument-histogram', PARTITIONS=1, VALUE_FORMAT='avro') AS
SELECT
  RABBITMQ_TRADES.INSTRUMENT INSTRUMENT,
  HISTOGRAM(RABBITMQ_TRADES.EVENT) PATTERN
FROM RABBITMQ_TRADES
GROUP BY RABBITMQ_TRADES.INSTRUMENT
EMIT CHANGES;

With all of this in place, a flow visualisation can be seen in the ksqlDB editor, starting from the source stream of orders raised through the resulting divergent streams that analyse the orders in real time as per the ksqlDB queries above.

[Screenshot: ksqlDB flow visualisation]

E. Leave what's happening in Real-time, I'm interested in the History

It is often the case that the data flowing through Kafka topics (either as-is in raw form or after being transformed/enriched) is sunk to an external data lake or warehouse for historical analysis.

Don't fancy another hop in your data pipeline? Enter Tableflow, which materialises Kafka topics as Iceberg tables almost as soon as the topic starts to fill up, and these tables can then be queried via engines like Amazon Athena. So what kind of questions do you start to ask when you have all of the data from Kafka at your fingertips?

a) First up, I'd want to know if the Kafka topic has been materialised to an Iceberg table

Tableflow materialisation of Avro-based Kafka topics as Iceberg tables can be verified via SQL as below.

[Screenshot: Materialised Kafka topics as Iceberg tables]
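A check of this shape works in Athena once the Tableflow catalog is registered; the database and table names below are assumptions for illustration:

```sql
-- List the Iceberg tables materialised from Kafka topics
-- (database name 'orders_cluster' is assumed)
SHOW TABLES IN orders_cluster;

-- Confirm a specific topic's table exists and is queryable
SELECT COUNT(*) FROM orders_cluster.rabbitmq_trades;
```
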

b) Next, I'd like to see if the account IDs (with field level encryption) are identifiable or not

The account IDs are indeed encrypted and, thanks to the non-deterministic encryption rule, the ciphertexts are distinct even for orders carrying the same account ID, which in turn means two or more trades from the same account cannot be traced back to the same user.

[Screenshot: Encrypted field inspection]
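A spot check of the encrypted field might look like the following sketch (table name assumed, as above):

```sql
-- Orders from the same account should show different ciphertexts
-- in the 'account' column under non-deterministic encryption.
SELECT id, event, account
FROM orders_cluster.rabbitmq_trades
ORDER BY id
LIMIT 10;
```
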

c) Item that has been traded the most

[Screenshot: Aggregation based on a single field]
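A query of this shape answers it, aggregating on the single instrument field (table name assumed):

```sql
-- Total quantity traded per instrument, most traded first
SELECT instrument, SUM(quantity) AS total_quantity
FROM orders_cluster.rabbitmq_trades
GROUP BY instrument
ORDER BY total_quantity DESC
LIMIT 1;
```
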

d) Item that has been bought or sold the most

[Screenshot: Aggregation based on multiple fields]
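The same idea extends to multiple fields by grouping on instrument and event together; a sketch (table name assumed):

```sql
-- Split the totals by BUY/SELL as well as by instrument
SELECT instrument, event, SUM(quantity) AS total_quantity
FROM orders_cluster.rabbitmq_trades
GROUP BY instrument, event
ORDER BY total_quantity DESC;
```
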

To summarise, the previous flow

Python (app) --> RabbitMQ --> Orders (ByteArray) --> Custom Kafka Connect SMT (Record with Schema) --> AvroConverter --> Encryption Rules --> Messages in the Kafka Topic

has been extended to

Python (app) --> RabbitMQ --> Orders (ByteArray) --> Custom Kafka Connect SMT (Record with Schema) --> AvroConverter --> Encryption Rules --> Messages in the Kafka Topic --> ksqlDB (real-time analysis) --> Iceberg Tables (historical analysis)

And below is all of that in action.

In the next edition, let's see how the Python (app) --> RabbitMQ --> Kafka portion of the stack can be switched over to a Golang (app) --> Kafka instead.

More articles by Naveen Nandan
