Replicating Kafka topics between clusters with Kafka Connect and MirrorMaker 2
In this article we are going to lay the foundations of replicating Kafka topics between Kafka clusters using MirrorMaker 2.
To demonstrate the concept, a “batteries included” prototype is provided with Docker Compose, which sets up a ready-to-run configuration with two single-node Kafka clusters (kafka1 and kafka2), a Kafka Connect cluster running the MirrorMaker 2 connectors, and a Kafka UI for monitoring and administration.
Use-Cases of Kafka Topic Replication
Kafka topic replication between clusters addresses several key use-cases. One prominent use-case is disaster recovery, where replication ensures data continuity in the event of a failure or disaster in the primary Kafka cluster. By continuously replicating data from the primary cluster to a backup cluster, data loss and downtime can be minimized, allowing applications to switch to the backup cluster and continue operating there.
Another important use-case is data isolation, which enhances data security and privacy by isolating sensitive data. Replication can be configured to replicate only selected topics from the source (private) cluster to the target (shared) cluster, ensuring that sensitive data remains within the private cluster while exposing only necessary data to the shared cluster.
The data aggregation use-case consolidates data from multiple Kafka clusters into a single aggregation cluster, facilitating easier data analysis and processing. This is useful for organizations that need to aggregate data from various sources for comprehensive insights.
Cloud migration is another scenario where Kafka topic replication proves invaluable. It enables the migration of data from on-premises Kafka clusters to cloud-based clusters, or from one cloud provider’s cluster to another, by replicating topic data between the clusters. This transition helps organizations leverage the scalability and flexibility of cloud infrastructure, or migrate workloads to a more affordable or maintainable cloud provider.
Lastly, in geo-replication use-cases data is distributed across geographically dispersed clusters. By replicating topics between clusters located in different regions, this approach ensures data availability and redundancy across locations, and can help maintain optimal network latency and throughput for consumers.
Replication Topologies
Replication can be implemented in various topologies to fulfill different architectural needs. The active-passive topology involves one cluster acting as the primary (active) cluster, while the other serves as the backup (passive) cluster. This setup is ideal for disaster recovery scenarios where the passive cluster is used only when the active cluster fails.
In contrast, the active-active topology allows both clusters to be active, serving read and write operations simultaneously. This topology is suitable for geo-replication and load balancing, ensuring high availability and redundancy.
The hub-and-spoke topology features a central hub cluster that aggregates data from multiple spoke clusters. This configuration is useful for data aggregation scenarios where data from various source clusters is consolidated into a central cluster.
Finally, the multi-cluster topology interconnects multiple Kafka clusters, allowing data replication between any pair of clusters. This topology provides flexibility for complex architectures requiring data replication across several clusters.
Capabilities of MirrorMaker 2
MirrorMaker 2 (MM2) is a tool included in Kafka designed for replicating Kafka topic data (topic offset replication) between clusters. Besides topic offset replication it also supports consumer offset synchronization, ensuring that consumer offsets are accurately replicated between clusters. MM2 allows administrators to specify which topics should be replicated (filtering) and to rename topics during replication (by default, prefixing the target topic’s name with the source Kafka cluster’s alias). This feature enhances data management and security by enabling selective replication and topic customization.
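For illustration, a MirrorSourceConnector configuration fragment such as the hypothetical one below (the topic names are made up for the example) replicates only the topics matching the given patterns; with the default replication policy, a topic named ‘orders’ on a source cluster aliased ‘kafka-1’ appears as ‘kafka-1.orders’ on the target:
{
  "topics": "orders, payments.*",
  "topics.exclude": ".*-internal, __.*"
}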
Besides unidirectional replication (which copies data from a source cluster to a target cluster), bidirectional replication can also be achieved. In this case each of the clusters contains multiple topics for the same type of data: the topic that contains data written by the producers of the given cluster (read+write), and additional (read-only) topic(s) of the same data type that are replicated from the other cluster(s). The consumers are configured to consume from each of these topics, therefore they get both the data produced on the given cluster and the data produced on the corresponding replicated cluster(s); a minimal consumer-side sketch is shown below. This approach enables data to be mirrored in both directions between clusters, which is particularly useful for scenarios requiring high availability and disaster recovery.
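The sketch (assuming cluster aliases kafka-1/kafka-2, a shared topic named ‘test’, and a made-up consumer group name): the consumer subscribes with a regular expression matching both the local topic and its replicated counterpart, the same pattern used later in the demo:
# consume both the local 'test' topic and the replicated 'kafka-1.test' topic on kafka-2
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "^(kafka-1\.)?test$" --group my-cg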
Be mindful that consumer group offsets are replicated with a delay (by default 60 seconds). If events should be consumed only once, the consumers should be connected to only one Kafka cluster at a time. This also means that if replication is used for disaster recovery, extra steps are involved: first disable the consumers connected to the impaired cluster, and only afterwards enable the consumers connected to the fail-over cluster. If only-once consumption is not required, use different consumer groups on the two clusters and/or do not enable consumer group offset replication.
Deployment Options for MirrorMaker 2
MirrorMaker 2 offers several flexible deployment options to suit various operational needs. It can be deployed as a standalone process or a dedicated cluster, or it can be deployed into a distributed Kafka Connect cluster. This latter option provides the most flexibility and operational benefits (such as monitoring and on-the-fly configuration changes), so this deployment option has been chosen for my configuration setup. Many organizations may already have a running distributed Connect cluster available, which makes this a convenient choice for introducing MirrorMaker into their infrastructure.
MirrorMaker 2 uses three connectors for data replication between Kafka clusters: the MirrorSourceConnector (which replicates topic data), the MirrorCheckpointConnector (which replicates consumer group offsets), and the MirrorHeartbeatConnector (which monitors connectivity between the clusters).
In the upcoming example we demonstrate how to configure these to replicate a single topic and the consumer group offsets from a source Kafka cluster to a target cluster.
Demo setup
We are building a Docker Compose stack, which starts up and configures the services needed to demonstrate replication with MirrorMaker 2.
Let’s pin the used container image versions in a .env file:
# file: ./.env
KAFKA_VERSION=3.9.0
KAFKAUI_VERSION=v0.7.2
In order to configure connectors during the initialization phase, we need curl, which is not installed by default in the standard Kafka container image. The below Dockerfile solves this problem:
# file: ./connect/Dockerfile
ARG KAFKA_VERSION
FROM apache/kafka:${KAFKA_VERSION}
USER root
RUN set -eux; \
apk update; \
apk upgrade; \
apk add --no-cache curl bash; \
apk cache clean;
USER appuser
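As an optional sanity check (assuming the Compose file from the next section is already in place), we can build the image and verify that curl is available:
docker compose build connect
docker compose run --rm --no-deps connect curl --version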
The below Docker Compose file sets up a Kafka environment with two single-node Kafka clusters (kafka1 and kafka2), a Kafka Connect service (which will run the MirrorMaker connectors), and a Kafka UI for monitoring and administration.
Each service is bootstrapped with its initial configuration, so the user does not need to do that themselves:
# file: ./docker-compose.yaml
x-kafka-broker-common: &kafka-broker-common
image: apache/kafka:${KAFKA_VERSION}
command: >
/bin/bash -c '
/etc/kafka/docker/run &
kafka_pid=$$!
set -e # exit on errors
timeout 30s bash -c "
until printf \"\" > /dev/tcp/localhost/9092; do
echo \"[INFO] Waiting for Kafka to be available...\"
sleep 5
done
"
echo "[INFO] Creating topics..."
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create --if-not-exists --topic test \
--replication-factor 1 --partitions 2 \
--command-config /opt/kafka/config/client.properties
echo "[INFO] Kafka is set up and running!"
wait $$kafka_pid
'
healthcheck:
test: "bash -c 'printf \"\" > /dev/tcp/127.0.0.1/9092; exit $$?;'"
interval: 3s
timeout: 3s
retries: 30
x-kafka-broker-common-environment-vars: &kafka-broker-common-environment-vars
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERBROKER'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERBROKER:PLAINTEXT,LOCALHOST:SASL_PLAINTEXT,DOCKERINTERNAL:SASL_PLAINTEXT'
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAINTEXT
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_LOG_DIRS: '/var/lib/kafka/data'
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
services:
kafka1:
<<: *kafka-broker-common
environment:
<<: *kafka-broker-common-environment-vars
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
KAFKA_LISTENERS: 'INTERBROKER://kafka1:29092,CONTROLLER://kafka1:29093,LOCALHOST://0.0.0.0:9092,DOCKERINTERNAL://0.0.0.0:39092'
KAFKA_ADVERTISED_LISTENERS: 'INTERBROKER://kafka1:29092,LOCALHOST://localhost:9092,DOCKERINTERNAL://kafka1:39092' # could be localhost:9092 for docker-host
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093'
volumes:
- ./kafka/kafka1_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf:ro
- ./kafka/client.properties:/opt/kafka/config/client.properties:ro
- kafka1_data:/var/lib/kafka/data
kafka2:
<<: *kafka-broker-common
environment:
<<: *kafka-broker-common-environment-vars
CLUSTER_ID: '5qwDQrHsT9OpHgPFpiS7cg'
KAFKA_LISTENERS: 'INTERBROKER://kafka2:29092,CONTROLLER://kafka2:29093,LOCALHOST://0.0.0.0:9092,DOCKERINTERNAL://0.0.0.0:39092'
KAFKA_ADVERTISED_LISTENERS: 'INTERBROKER://kafka2:29092,LOCALHOST://localhost:9092,DOCKERINTERNAL://kafka2:39092' # could be localhost:9092 for docker-host
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka2:29093'
volumes:
- ./kafka/kafka2_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf:ro
- ./kafka/client.properties:/opt/kafka/config/client.properties:ro
- kafka2_data:/var/lib/kafka/data
connect:
build:
context: ./connect # build our own image for connect
args:
KAFKA_VERSION: ${KAFKA_VERSION}
ports:
- "8083:8083" # Connect REST API
command: >
/bin/bash -c '
/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect.properties &
connect_pid=$$!
/opt/kafka/custom-scripts/deploy-connectors.sh
wait $$connect_pid
'
environment:
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/custom-config/restapi-jaas.conf"
CONNECT_REST_API: http://localhost:8083
CONNECT_USERNAME: init
CONNECT_PASSWORD: init-secret
connector_config_dir: /opt/kafka/connector-config
volumes:
- ./connect/connect.properties:/opt/kafka/config/connect.properties:ro
- ./connect/kafka.properties:/opt/kafka/custom-config/kafka.properties:ro
- ./connect/restapi-users.txt:/opt/kafka/custom-config/restapi-users.txt:ro
- ./connect/restapi-jaas.conf:/opt/kafka/custom-config/restapi-jaas.conf:ro
- ./connect/connectors:/opt/kafka/connector-config:ro
- ./connect/scripts:/opt/kafka/custom-scripts:ro
healthcheck:
test: curl -IL --silent -u health:health-secret --insecure http://localhost:8083 | grep HTTP | grep -q 200
interval: 3s
timeout: 3s
retries: 40
start_period: 5s
depends_on:
kafka1:
condition: service_healthy
kafka2:
condition: service_healthy
kafka-ui:
image: provectuslabs/kafka-ui:${KAFKAUI_VERSION}
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: kafka1
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:39092
KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN
KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="kafkaui" password="kafkaui-secret";'
KAFKA_CLUSTERS_1_NAME: kafka2
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka2:39092
KAFKA_CLUSTERS_1_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
KAFKA_CLUSTERS_1_PROPERTIES_SASL_MECHANISM: PLAIN
KAFKA_CLUSTERS_1_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="kafkaui" password="kafkaui-secret";'
KAFKA_CLUSTERS_1_KAFKACONNECT_0_NAME: connect
KAFKA_CLUSTERS_1_KAFKACONNECT_0_ADDRESS: http://connect:8083
KAFKA_CLUSTERS_1_KAFKACONNECT_0_USERNAME: kafkaui
KAFKA_CLUSTERS_1_KAFKACONNECT_0_PASSWORD: kafkaui-secret
LOGGING_LEVEL_ROOT: 'warn'
LOGGING_LEVEL_COM_PROVECTUS: 'warn'
depends_on:
kafka1:
condition: service_healthy
kafka2:
condition: service_healthy
healthcheck:
test: wget --no-verbose --tries=1 --spider localhost:8080 || exit 1
interval: 5s
timeout: 5s
retries: 3
start_period: 30s
volumes:
kafka1_data:
kafka2_data:
Kafka broker configuration files
As authentication is being enforced for all of the Kafka clients, we supply two separate “username-password” files for the two clusters:
# file: ./kafka/kafka1_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin" password="admin-secret"
user_admin="admin-secret"
user_kafkaui="kafkaui-secret"
user_connect="connect1-secret"
user_mirrormaker="mirrormaker1-secret";
};
# file: ./kafka/kafka2_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin" password="admin-secret"
user_admin="admin-secret"
user_kafkaui="kafkaui-secret"
user_connect="connect2-secret"
user_mirrormaker="mirrormaker2-secret";
};
For the Kafka admin CLI clients we set up a configuration too:
# file: ./kafka/client.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
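With this file mounted into the broker containers, the admin CLI tools can authenticate against the brokers; for example, once the stack is running, the topics of kafka1 can be listed like this:
docker compose exec kafka1 /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list --command-config /opt/kafka/config/client.properties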
Connect cluster configuration files
The following file configures the Connect cluster:
# file: ./connect/connect.properties
config.providers=file,env
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider
bootstrap.servers=${file:/opt/kafka/custom-config/kafka.properties:bootstrap.servers}
sasl.mechanism=${file:/opt/kafka/custom-config/kafka.properties:broker.sasl.mechanism}
security.protocol=${file:/opt/kafka/custom-config/kafka.properties:broker.security.protocol}
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="${file:/opt/kafka/custom-config/kafka.properties:broker.username}" \
password="${file:/opt/kafka/custom-config/kafka.properties:broker.password}";
producer.sasl.mechanism=${file:/opt/kafka/custom-config/kafka.properties:broker.sasl.mechanism}
producer.security.protocol=${file:/opt/kafka/custom-config/kafka.properties:broker.security.protocol}
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="${file:/opt/kafka/custom-config/kafka.properties:broker.username}" \
password="${file:/opt/kafka/custom-config/kafka.properties:broker.password}";
consumer.sasl.mechanism=${file:/opt/kafka/custom-config/kafka.properties:broker.sasl.mechanism}
consumer.security.protocol=${file:/opt/kafka/custom-config/kafka.properties:broker.security.protocol}
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="${file:/opt/kafka/custom-config/kafka.properties:broker.username}" \
password="${file:/opt/kafka/custom-config/kafka.properties:broker.password}";
group.id=connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
offset.storage.partitions=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
status.storage.partitions=1
listeners=HTTP://0.0.0.0:8083
rest.advertised.host.name=connect
rest.advertised.port=8083
rest.extension.classes=org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension
We need configuration files to establish HTTP Basic Auth for the Connect REST API (used for submitting connector configuration during bootstrapping, and to allow access for the KafkaUI):
# file: ./connect/restapi-jaas.conf
KafkaConnect {
org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required
file="/opt/kafka/custom-config/restapi-users.txt";
};
# file: ./connect/restapi-users.txt
health: health-secret
kafkaui: kafkaui-secret
init: init-secret
vscode: vscode-secret
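With these files in place, any of the listed users can authenticate against the Connect REST API; for example, once the stack is running, the deployed connectors can be listed with:
curl -s -u kafkaui:kafkaui-secret http://localhost:8083/connectors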
Kafka broker connection properties are put into the below configuration file, and are referenced by the Connect cluster’s main configuration as well as by the individual connector configuration files.
# file: ./connect/kafka.properties
bootstrap.servers=kafka2:39092
broker.sasl.mechanism=PLAIN
broker.security.protocol=SASL_PLAINTEXT
broker.username=connect
broker.password=connect2-secret
mirrormaker.username=mirrormaker
mirrormaker1.password=mirrormaker1-secret
mirrormaker2.password=mirrormaker2-secret
Kafka Connector configurations for MirrorMaker 2
The following shell script is invoked when Connect is started. It submits the connector configuration files to the Connect cluster via its REST API, ensuring that the MirrorMaker 2 connector configurations supplied as JSON files (see later) are applied when the Connect cluster starts up. This is practical, as we get a ready-to-use MirrorMaker right after startup. If we modify the config files in the filesystem, restarting Connect will update the connector configs from the JSON files automatically.
#!/bin/bash
# file: ./connect/scripts/deploy-connectors.sh
set -e # exit on errors
echo "[INFO] Waiting for Kafka Connect to become available..."
timeout 30s bash -c "
until curl -IL --silent -u ${CONNECT_USERNAME}:${CONNECT_PASSWORD} --insecure ${CONNECT_REST_API} | grep HTTP | grep -q 200; do
echo \"[INFO] Waiting for Kafka Connect to become available...\"
sleep 2
done
"
for connector_config_file in /opt/kafka/connector-config/*.json; do
  connector_name=$(basename "$connector_config_file" .json)
echo "[INFO] Deploying connector: $connector_name"
curl -X PUT -H "Content-Type: application/json" \
-u ${CONNECT_USERNAME}:${CONNECT_PASSWORD} \
-s -S -i \
--fail-with-body --retry 3 --retry-delay 5 --retry-connrefused --connect-timeout 10 \
--data @${connector_config_file} \
${CONNECT_REST_API}/connectors/${connector_name}/config || \
(echo "[ERROR] Failed to deploy connector \'$connector_name\'"; exit 1)
done
echo "[INFO] Finished deploying the connectors"
The source connector configuration ensures that kafka-1 cluster’s “test” topic will be replicated to the kafka-2 cluster:
// file: ./connect/connectors/mirror-source-connector.json
{
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"topics": "test",
"source.cluster.alias": "kafka-1",
"source.cluster.bootstrap.servers": "kafka1:39092",
"source.cluster.security.protocol": "SASL_PLAINTEXT",
"source.cluster.sasl.mechanism": "PLAIN",
"source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker1.password}\";",
"source.cluster.admin.sasl.mechanism": "${file:/opt/kafka/custom-config/kafka.properties:broker.sasl.mechanism}",
"source.cluster.admin.security.protocol": "${file:/opt/kafka/custom-config/kafka.properties:broker.security.protocol}",
"source.cluster.admin.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker1.password}\";",
"target.cluster.alias": "kafka-2",
"target.cluster.bootstrap.servers": "kafka2:39092",
"target.cluster.security.protocol": "SASL_PLAINTEXT",
"target.cluster.sasl.mechanism": "PLAIN",
"target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker2.password}\";",
"sync.topic.acls.enabled": false,
"sync.topic.configs.enabled": false,
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"replication.factor": "1",
"replication.policy.class": "org.apache.kafka.connect.mirror.DefaultReplicationPolicy",
"offset-syncs.topic.replication.factor": "1",
"offset-syncs.topic.location": "source",
"tasks.max": "1"
}
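Once the stack is running (see the startup section below), we can verify that the replicated topic has been created on the kafka-2 cluster:
docker compose exec kafka2 /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic kafka-1.test --command-config /opt/kafka/config/client.properties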
The Checkpoint connector is responsible for replicating the consumer group offsets from kafka-1 to kafka-2.
// file: ./connect/connectors/mirror-checkpoint-connector.json
{
"connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"source.cluster.alias": "kafka-1",
"source.cluster.bootstrap.servers": "kafka1:39092",
"source.cluster.security.protocol": "SASL_PLAINTEXT",
"source.cluster.sasl.mechanism": "PLAIN",
"source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker1.password}\";",
"target.cluster.alias": "kafka-2",
"target.cluster.bootstrap.servers": "kafka2:39092",
"target.cluster.security.protocol": "SASL_PLAINTEXT",
"target.cluster.sasl.mechanism": "PLAIN",
"target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker2.password}\";",
"replication.policy.class": "org.apache.kafka.connect.mirror.DefaultReplicationPolicy",
"emit.checkpoint.enabled": "true",
"emit.checkpoints.interval.seconds": "60",
"sync.group.offsets.enabled": "true",
"sync.group.offsets.interval.seconds": "60",
"group.filter.class": "org.apache.kafka.connect.mirror.DefaultGroupFilter",
"groups": ".*",
"groups.exclude": "console-consumer-.*, connect-.*, __.*",
"refresh.groups.enabled": "true",
"refresh.groups.interval.seconds": "60"
}
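The emitted checkpoints land in an internal topic on the target cluster (named kafka-1.checkpoints.internal under the default replication policy). Once the stack is running, they can be inspected with the console consumer and the checkpoint formatter class shipped with the MirrorMaker module (a sketch):
docker compose exec kafka2 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-1.checkpoints.internal --from-beginning --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter --consumer.config /opt/kafka/config/client.properties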
The use of the Heartbeat connector is optional. It helps monitor the connectivity between the clusters:
// file: ./connect/connectors/mirror-heartbeat-connector.json
{
"connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"source.cluster.alias": "kafka-1",
"source.cluster.bootstrap.servers": "kafka1:39092",
"source.cluster.security.protocol": "SASL_PLAINTEXT",
"source.cluster.sasl.mechanism": "PLAIN",
"source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker1.password}\";",
"target.cluster.alias": "kafka-2",
"target.cluster.bootstrap.servers": "kafka2:39092",
"target.cluster.security.protocol": "SASL_PLAINTEXT",
"target.cluster.sasl.mechanism": "PLAIN",
"target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker2.password}\";",
"emit.heartbeat.enabled": "true",
"emit.heartbeat.interval.seconds": "1",
"heartbeats.topic.replication.factor": "1"
}
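Once running, the heartbeats emitted to the target cluster’s ‘heartbeats’ topic can be observed in a similar way (a sketch, using the corresponding heartbeat formatter):
docker compose exec kafka2 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heartbeats --formatter org.apache.kafka.connect.mirror.formatters.HeartbeatFormatter --consumer.config /opt/kafka/config/client.properties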
Starting up and testing the prototype
To start the stack, use docker compose up in the CLI.
It may take a minute or two to start up the stack.
Opening the Kafka UI at http://localhost:8080 gives us the opportunity to explore and verify the Kafka clusters, their topics, and the deployed connectors.
Testing topic offset replication
To verify the replication, we are going to produce some test data on the first Kafka cluster’s ‘test’ topic, and consume it on the second Kafka cluster. MirrorMaker makes sure that the events of the first cluster’s ‘test’ topic are replicated to the second cluster’s ‘kafka-1.test’ topic. MirrorMaker automatically creates the replication target topic on the target cluster, prefixed with the name of the source cluster (kafka-1).
For simplicity, we are going to use the CLI to run Kafka data producers and consumers to validate that the replication works as expected.
Open a new Git Bash console and start a consumer with the below command to fetch data from the kafka-2 cluster, both from the ‘kafka-1.test’ topic (which will contain events replicated from kafka-1) and from the ‘test’ topic. Consuming from multiple topics within the same consumer group can be realized using a regular expression in the whitelist argument.
docker compose exec -it kafka2 //bin/sh -c '/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "^(kafka-1\\.)?test$" --consumer.config /opt/kafka/config/client.properties --property "print.timestamp=true" --property "print.key=true" --property "key.separator=:" --property "print.partition=true" --property "print.offset=true" --group my-cg'
Next, produce some test data to kafka-1’s ‘test’ topic with the below command. Open a new Git Bash console to execute it:
docker compose exec -it kafka1 //bin/sh -c 'echo "$RANDOM:Hello, Kafka!" | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config /opt/kafka/config/client.properties --property "parse.key=true" --property "key.separator=:"'
As a result, MirrorMaker will replicate the data from kafka1’s ‘test’ topic to kafka2’s ‘kafka-1.test’ topic, and the CLI-based consumer will print the event to the console.
Stop the consumer by pressing Ctrl+C.
Testing consumer group offset replication
In this test, we are going to validate that consumer group offset replication works as expected.
Imagine that we have a high-availability setup: kafka-1 will be our primary active cluster, and kafka-2 will be the disaster recovery cluster. With our current configuration, topic offsets and consumer group offsets are replicated from kafka-1 to kafka-2.
Normal workflow: the producers write to kafka-1’s ‘test’ topic, and the consumers also consume from there.
Open a new Git Bash console to start a consumer attached to kafka-1’s ‘test’ topic:
docker compose exec -it kafka1 //bin/sh -c '/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "test" --consumer.config /opt/kafka/config/client.properties --property "print.timestamp=true" --property "print.key=true" --property "key.separator=:" --property "print.partition=true" --property "print.offset=true" --group my-cg'
Start a new Git Bash console to produce test data to kafka-1’s ‘test’ topic:
docker compose exec -it kafka1 //bin/sh -c 'echo "$RANDOM:Hello, Kafka!" | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config /opt/kafka/config/client.properties --property "parse.key=true" --property "key.separator=:"'
As a result, we will see that the event is consumed by the CLI client and printed to the console.
Failover: let’s say our consumer clients lose network connectivity to kafka-1 and are no longer able to process data from it. To emulate this, stop the previously started consumer client attached to kafka-1’s ‘test’ topic by pressing Ctrl+C in the corresponding console window.
Produce some more data with the above CLI producer script, simulating additional data arriving in kafka-1’s ‘test’ topic.
Wait at least 2 minutes to make sure that the periodic consumer group offset replication between kafka-1 and kafka-2 has taken place. This ensures that consumers attached to kafka-2 as part of the same consumer group will be able to continue processing data where they left off on kafka-1 before their operation was stopped.
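Optionally, before starting the fail-over consumer, we can verify that the offsets of the ‘my-cg’ group have indeed been synchronized to kafka-2:
docker compose exec kafka2 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-cg --command-config /opt/kafka/config/client.properties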
Start a consumer client pointed at kafka-2’s replicated topic, using the same consumer group name as previously used on kafka-1:
docker compose exec -it kafka2 //bin/sh -c '/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "^(kafka-1\\.)?test$" --consumer.config /opt/kafka/config/client.properties --property "print.timestamp=true" --property "print.key=true" --property "key.separator=:" --property "print.partition=true" --property "print.offset=true" --group my-cg'
On the console you will see only the new data, and not the data that had already been consumed by the client when it was consuming from kafka-1.
Conclusion
In this article, we have established the foundational concepts and practical implementation of replicating Kafka topics between clusters using Kafka Connect and MirrorMaker 2. By leveraging a comprehensive prototype setup with Docker Compose, we demonstrated the configuration and deployment of two single-node Kafka instances, a Kafka Connect instance with the MM2 connector configuration, and a Kafka UI for administration and monitoring.
MM2's capabilities, including topic offset replication, consumer offset synchronization, and flexible deployment options, were thoroughly explored. We discussed the use of MirrorSourceConnector, MirrorCheckpointConnector, and MirrorHeartbeatConnector for effective data replication and monitoring between Kafka clusters. Additionally, the article highlighted the importance of consumer offset synchronization to ensure consistent data consumption across replicated clusters.
Practical guidance was provided on configuring and testing MM2, with emphasis on topic and consumer group offset replication. This article serves as a guide for system architects and DevOps engineers looking to implement Kafka data replication solutions using MirrorMaker 2.