Replicating Kafka topics between clusters with Kafka Connect and MirrorMaker 2


In this article we lay the foundations of replicating Kafka topics between Kafka clusters using MirrorMaker 2.

To demonstrate the concept, a “batteries included” prototype is provided with Docker Compose, which sets up a ready-to-run configuration with:

  • two Kafka single-node instances (authentication enabled, test topics automatically created)
  • a Kafka Connect instance with MirrorMaker 2 connector configuration (the connector configuration is set up automatically, and a REST API with authentication is configured to allow bootstrapping, administration and monitoring)
  • Kafka-UI for a convenient web-UI experience to administer and monitor Kafka, its topics and the connectors.


Use-Cases of Kafka Topic Replication

Kafka topic replication between clusters addresses several key use-cases. One prominent use-case is disaster recovery, where replication ensures data continuity in the event of a failure or disaster in the primary Kafka cluster. By continuously replicating data from the primary cluster to a backup cluster, data loss and downtime can be minimized, allowing applications to switch to the backup cluster and continue operating there.

Another important use-case is data isolation, which enhances data security and privacy by isolating sensitive data. Replication can be configured to replicate only selected topics from the source (private) cluster to the target (shared) cluster, ensuring that sensitive data remains within the private cluster while exposing only necessary data to the shared cluster.

The data aggregation use-case consolidates data from multiple Kafka clusters into a single aggregation cluster, facilitating easier data analysis and processing. This is useful for organizations that need to aggregate data from various sources for comprehensive insights.

Cloud migration is another scenario where Kafka topic replication proves invaluable. It enables the migration of data from on-premises Kafka clusters to cloud-based clusters, or from one cloud provider’s cluster to another, by replicating topic data between the clusters. This transition helps organizations leverage the scalability and flexibility of cloud infrastructure, or migrate workloads to a more affordable or maintainable cloud provider.

Lastly, in geo-replication use-cases data is distributed across geographically dispersed clusters. By replicating topics between clusters located in different regions, this approach ensures data availability and redundancy across locations, and can help maintain optimal network latency and throughput for consumers.

Replication Topologies

Replication can be implemented in various topologies to fulfill different architectural needs. The active-passive topology involves one cluster acting as the primary (active) cluster, while the other serves as the backup (passive) cluster. This setup is ideal for disaster recovery scenarios where the passive cluster is used only when the active cluster fails.

In contrast, the active-active topology allows both clusters to be active, serving read and write operations simultaneously. This topology is suitable for geo-replication and load balancing, ensuring high availability and redundancy.

The hub-and-spoke topology features a central hub cluster that aggregates data from multiple spoke clusters. This configuration is useful for data aggregation scenarios where data from various source clusters is consolidated into a central cluster.

Finally, the multi-cluster topology interconnects multiple Kafka clusters, allowing data replication between any pair of clusters. This topology provides flexibility for complex architectures requiring data replication across several clusters.

Capabilities of MirrorMaker 2

MirrorMaker 2 (MM2) is a tool included in Kafka, designed for replicating Kafka topic data (topic offset replication) between clusters. Besides topic data it also supports consumer offset synchronization, ensuring that consumer group offsets are accurately replicated between clusters. MM2 allows administrators to specify which topics should be replicated (filtering) and to rename topics during replication (by default, prefixing the target topic’s name with the source Kafka cluster’s alias). These features enhance data management and security by enabling selective replication and topic customization.
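
As an illustration of filtering and renaming, here is a minimal sketch of how the same concepts look in MM2’s dedicated mode (an mm2.properties file passed to connect-mirror-maker.sh). The cluster aliases, bootstrap servers and topic match the demo later in this article; the rest is illustrative only, as the demo deploys MM2 on Kafka Connect instead:

# hypothetical mm2.properties sketch (dedicated mode, for illustration only)
clusters = kafka-1, kafka-2
kafka-1.bootstrap.servers = kafka1:39092
kafka-2.bootstrap.servers = kafka2:39092

# enable one replication flow and filter the topics it carries (regex allowed)
kafka-1->kafka-2.enabled = true
kafka-1->kafka-2.topics = test

# with the default replication policy, topic "test" from kafka-1
# appears on kafka-2 renamed to "kafka-1.test"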

Besides unidirectional replication (which copies data from a source cluster to a target cluster), bidirectional replication can also be achieved. In this case each of the clusters contains multiple topics for the same type of data: the topic that contains data written by the producers of the given cluster (read+write), and additional (read-only) topic(s) of the same data type that are replicated from the other cluster(s). The consumers are configured to consume from each of these topics, so they receive both the data produced on the given cluster and the data produced on the corresponding replicated cluster(s). This approach enables data to be mirrored in both directions between clusters, which is particularly useful for scenarios requiring high availability and disaster recovery.


Be mindful that consumer group offsets are replicated with a delay (60 seconds by default). If events should be consumed only once, the consumers must be connected to only one Kafka cluster at a time. This also means that if replication is used for disaster recovery, extra steps are involved: first disable the consumers connected to the impaired cluster, and only afterwards enable the consumers connected to the fail-over cluster. If once-only consumption is not required, use different consumer groups on the two clusters and/or do not enable consumer group offset replication.
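
In the Connect-based setup shown later in this article, the latter corresponds to turning off offset sync in the Checkpoint connector; a fragment of its configuration (the full file appears in a later section):

// fragment of the MirrorCheckpointConnector configuration (full file shown later)
"sync.group.offsets.enabled": "false"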

 

Deployment Options for MirrorMaker 2

MirrorMaker 2 offers several flexible deployment options to suit various operational needs. It can be deployed as a standalone or dedicated cluster, or it can be deployed into a distributed Kafka Connect cluster. This latter option provides the most flexibility and operational benefits (such as monitoring and on-the-fly configuration changes), so this deployment option has been chosen for the setup in this article. Many organizations may already have a running distributed Connect cluster available, which makes this a convenient choice for introducing MirrorMaker into their infrastructure.

MirrorMaker 2 uses three connectors for data replication between Kafka clusters:


  1. MirrorSourceConnector: Replicates topics from the source cluster to the target cluster
  2. MirrorCheckpointConnector: Tracks and synchronizes consumer group offsets between clusters
  3. MirrorHeartbeatConnector: Monitors connectivity and health between the source and target clusters

 

In the upcoming example we demonstrate how to configure these connectors to replicate a single topic and the consumer group offsets from a source Kafka cluster to a target cluster.

Demo setup

We are building a Docker Compose stack which starts up and configures the services needed to demonstrate replication with MirrorMaker 2.

Let’s pin the used container image versions in a .env file:

# file: ./.env
KAFKA_VERSION=3.9.0
KAFKAUI_VERSION=v0.7.2        

In order to configure connectors during the initialization phase, we will need curl, which is not installed by default in the standard Kafka container image. The Dockerfile below solves this problem:

# file: ./connect/Dockerfile
ARG KAFKA_VERSION

FROM apache/kafka:${KAFKA_VERSION}

USER root
RUN set -eux; \
    apk update; \
    apk upgrade; \
    apk add --no-cache curl bash; \
    apk cache clean;

USER appuser        

The Docker Compose file below sets up a Kafka environment with two single-node Kafka clusters (kafka1 and kafka2), a Kafka Connect service (which will run the MirrorMaker connectors), and a Kafka UI for monitoring and administration.

Each service is bootstrapped with its initial configuration, so the user does not need to do this manually:

  • Kafka brokers: authentication (users and passwords), creation of the “test” topic
  • Kafka Connect: submitting the MirrorMaker configuration files to the cluster
  • Kafka UI: setup to access the Kafka clusters and the Kafka Connect

# file: ./docker-compose.yaml

x-kafka-broker-common: &kafka-broker-common
    image: apache/kafka:${KAFKA_VERSION}
    command: >
      /bin/bash -c '
        /etc/kafka/docker/run &
        kafka_pid=$$!

        set -e  # exit on errors
        timeout 30s bash -c "
          until printf \"\" > /dev/tcp/localhost/9092; do 
            echo \"[INFO] Waiting for Kafka to be available...\" 
            sleep 5
          done
        "
        echo "[INFO] Creating topics..."
        /opt/kafka/bin/kafka-topics.sh \
          --bootstrap-server localhost:9092 \
          --create --if-not-exists --topic test \
          --replication-factor 1 --partitions 2 \
          --command-config /opt/kafka/config/client.properties

        echo "[INFO] Kafka is set up and running!"
        wait $$kafka_pid
      '
    healthcheck:
      test: "bash -c 'printf \"\" > /dev/tcp/127.0.0.1/9092; exit $$?;'"
      interval: 3s
      timeout: 3s
      retries: 30

x-kafka-broker-common-environment-vars: &kafka-broker-common-environment-vars
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERBROKER'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERBROKER:PLAINTEXT,LOCALHOST:SASL_PLAINTEXT,DOCKERINTERNAL:SASL_PLAINTEXT'    
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAINTEXT

      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: '/var/lib/kafka/data'
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

services:

  kafka1:
    <<: *kafka-broker-common
    environment:
      <<: *kafka-broker-common-environment-vars
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
      KAFKA_LISTENERS: 'INTERBROKER://kafka1:29092,CONTROLLER://kafka1:29093,LOCALHOST://0.0.0.0:9092,DOCKERINTERNAL://0.0.0.0:39092'
      KAFKA_ADVERTISED_LISTENERS: 'INTERBROKER://kafka1:29092,LOCALHOST://localhost:9092,DOCKERINTERNAL://kafka1:39092'  # could be localhost:9092 for docker-host
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093'
    volumes:
      - ./kafka/kafka1_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf:ro
      - ./kafka/client.properties:/opt/kafka/config/client.properties:ro
      - kafka1_data:/var/lib/kafka/data

  kafka2:
    <<: *kafka-broker-common
    environment:
      <<: *kafka-broker-common-environment-vars
      CLUSTER_ID: '5qwDQrHsT9OpHgPFpiS7cg'
      KAFKA_LISTENERS: 'INTERBROKER://kafka2:29092,CONTROLLER://kafka2:29093,LOCALHOST://0.0.0.0:9092,DOCKERINTERNAL://0.0.0.0:39092'
      KAFKA_ADVERTISED_LISTENERS: 'INTERBROKER://kafka2:29092,LOCALHOST://localhost:9092,DOCKERINTERNAL://kafka2:39092'  # could be localhost:9092 for docker-host
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka2:29093'

    volumes:
      - ./kafka/kafka2_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf:ro
      - ./kafka/client.properties:/opt/kafka/config/client.properties:ro
      - kafka2_data:/var/lib/kafka/data

  
  connect:
    build: 
      context: ./connect    # build our own image for connect
      args:
        KAFKA_VERSION: ${KAFKA_VERSION}
    ports:
      - "8083:8083"         # Connect REST API
    command: >
      /bin/bash -c '
        /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect.properties &
        connect_pid=$$!
        /opt/kafka/custom-scripts/deploy-connectors.sh
        wait $$connect_pid
      '

    environment:
      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/custom-config/restapi-jaas.conf"
      CONNECT_REST_API: http://localhost:8083
      CONNECT_USERNAME: init
      CONNECT_PASSWORD: init-secret
      connector_config_dir: /opt/kafka/connector-config
      
    volumes:
      - ./connect/connect.properties:/opt/kafka/config/connect.properties:ro
      - ./connect/kafka.properties:/opt/kafka/custom-config/kafka.properties:ro
      - ./connect/restapi-users.txt:/opt/kafka/custom-config/restapi-users.txt:ro
      - ./connect/restapi-jaas.conf:/opt/kafka/custom-config/restapi-jaas.conf:ro

      - ./connect/connectors:/opt/kafka/connector-config:ro
      - ./connect/scripts:/opt/kafka/custom-scripts:ro
    healthcheck:
      test: curl -IL --silent -u health:health-secret --insecure http://localhost:8083 | grep HTTP | grep -q 200   # curl --fail http://localhost || exit 1
      interval: 3s
      timeout: 3s
      retries: 40
      start_period: 5s
    depends_on:
      kafka1:
        condition: service_healthy
      kafka2:
        condition: service_healthy

  kafka-ui:
    image: provectuslabs/kafka-ui:${KAFKAUI_VERSION}
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: kafka1
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:39092
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="kafkaui" password="kafkaui-secret";'

      KAFKA_CLUSTERS_1_NAME: kafka2
      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka2:39092
      KAFKA_CLUSTERS_1_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
      KAFKA_CLUSTERS_1_PROPERTIES_SASL_MECHANISM: PLAIN
      KAFKA_CLUSTERS_1_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="kafkaui" password="kafkaui-secret";'

      KAFKA_CLUSTERS_1_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_1_KAFKACONNECT_0_ADDRESS: http://connect:8083
      KAFKA_CLUSTERS_1_KAFKACONNECT_0_USERNAME: kafkaui
      KAFKA_CLUSTERS_1_KAFKACONNECT_0_PASSWORD: kafkaui-secret

      LOGGING_LEVEL_ROOT: 'warn'
      LOGGING_LEVEL_COM_PROVECTUS: 'warn'
    depends_on:
      kafka1:
        condition: service_healthy
      kafka2:
        condition: service_healthy
    healthcheck:
      test: wget --no-verbose --tries=1 --spider localhost:8080 || exit 1
      interval: 5s
      timeout: 5s
      retries: 3
      start_period: 30s

volumes:
  kafka1_data:
  kafka2_data:        

Kafka broker configuration files

As authentication is enforced for all of the Kafka clients, we supply two separate “username-password” files for the two clusters:

# file: ./kafka/kafka1_server_jaas.conf
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin" password="admin-secret"
  user_admin="admin-secret"  
  user_kafkaui="kafkaui-secret"
  user_connect="connect1-secret"
  user_mirrormaker="mirrormaker1-secret";
};        
# file: ./kafka/kafka2_server_jaas.conf
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin" password="admin-secret"
  user_admin="admin-secret"  
  user_kafkaui="kafkaui-secret"
  user_connect="connect2-secret"
  user_mirrormaker="mirrormaker2-secret";
};        

For the Kafka admin CLI clients we set up a client configuration too:

# file: ./kafka/client.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";        

Connect cluster configuration files

The following file configures the Connect cluster:

# file: ./connect/connect.properties

config.providers=file,env
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider

bootstrap.servers=${file:/opt/kafka/custom-config/kafka.properties:bootstrap.servers}
sasl.mechanism=${file:/opt/kafka/custom-config/kafka.properties:broker.sasl.mechanism}
security.protocol=${file:/opt/kafka/custom-config/kafka.properties:broker.security.protocol}
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="${file:/opt/kafka/custom-config/kafka.properties:broker.username}" \
  password="${file:/opt/kafka/custom-config/kafka.properties:broker.password}";

producer.sasl.mechanism=${file:/opt/kafka/custom-config/kafka.properties:broker.sasl.mechanism}
producer.security.protocol=${file:/opt/kafka/custom-config/kafka.properties:broker.security.protocol}
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="${file:/opt/kafka/custom-config/kafka.properties:broker.username}" \
  password="${file:/opt/kafka/custom-config/kafka.properties:broker.password}";

consumer.sasl.mechanism=${file:/opt/kafka/custom-config/kafka.properties:broker.sasl.mechanism}
consumer.security.protocol=${file:/opt/kafka/custom-config/kafka.properties:broker.security.protocol}
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="${file:/opt/kafka/custom-config/kafka.properties:broker.username}" \
  password="${file:/opt/kafka/custom-config/kafka.properties:broker.password}";

group.id=connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
offset.storage.partitions=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
status.storage.partitions=1

listeners=HTTP://0.0.0.0:8083
rest.advertised.host.name=connect
rest.advertised.port=8083
rest.extension.classes=org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension        

We need configuration files to establish HTTP Basic Auth for the Connect REST API (used for submitting connector configurations during bootstrapping, and to allow access for the Kafka UI):

# file: ./connect/restapi-jaas.conf
KafkaConnect {
    org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required
    file="/opt/kafka/custom-config/restapi-users.txt";
};        
# file: ./connect/restapi-users.txt
health: health-secret
kafkaui: kafkaui-secret
init: init-secret
vscode: vscode-secret        
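
With these users in place, the Connect REST API can be exercised with HTTP Basic Auth once the stack is running. For example, listing the deployed connectors (a sketch using the kafkaui credentials defined above):

curl -s -u kafkaui:kafkaui-secret http://localhost:8083/connectors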

The Kafka broker connection properties are put into the configuration file below; they are referenced by the Connect cluster’s main configuration and also by the individual connector configuration files.

# file: ./connect/kafka.properties
bootstrap.servers=kafka2:39092
broker.sasl.mechanism=PLAIN
broker.security.protocol=SASL_PLAINTEXT

broker.username=connect
broker.password=connect2-secret

mirrormaker.username=mirrormaker
mirrormaker1.password=mirrormaker1-secret
mirrormaker2.password=mirrormaker2-secret        

Kafka Connector configurations for MirrorMaker 2

The following shell script is invoked when Connect starts. It submits the connector configuration files to the Connect cluster via its REST API, ensuring that the MirrorMaker 2 connector configurations supplied as JSON files (see later) are applied while the Connect cluster starts up. This is practical, as we get a ready-to-use MirrorMaker right after startup. If we modify the config files on the filesystem, restarting Connect will update the connector configs from the JSON files automatically.

#!/bin/bash
# file: ./connect/scripts/deploy-connectors.sh

set -e  # exit on errors

echo "[INFO] Waiting for Kafka Connect to become available..."
timeout 30s bash -c "
    until curl -IL --silent -u ${CONNECT_USERNAME}:${CONNECT_PASSWORD} --insecure ${CONNECT_REST_API} | grep HTTP | grep -q 200; do 
        echo \"[INFO] Waiting for Kafka Connect to become available...\" 
        sleep 2
    done
"

for connector_config_file in /opt/kafka/connector-config/*json; do

    # strip the ".json" suffix to get the connector name
    connector_name=$(basename "$connector_config_file" .json)
    echo "[INFO] Deploying connector: $connector_name"

    curl -X PUT -H "Content-Type: application/json" \
        -u ${CONNECT_USERNAME}:${CONNECT_PASSWORD} \
        -s -S -i \
        --fail-with-body --retry 3 --retry-delay 5 --retry-connrefused --connect-timeout 10 \
        --data @${connector_config_file} \
        ${CONNECT_REST_API}/connectors/${connector_name}/config || \
        { echo "[ERROR] Failed to deploy connector '$connector_name'" >&2; exit 1; }
done

echo "[INFO] Finished deploying the connectors"        
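
Once the script reports success, the state of an individual connector and its tasks can be checked through the same REST API; a quick sketch (the connector names are derived from the JSON file names shown below):

curl -s -u init:init-secret http://localhost:8083/connectors/mirror-source-connector/status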

The source connector configuration ensures that the kafka-1 cluster’s “test” topic is replicated to the kafka-2 cluster:

// file: ./connect/connectors/mirror-source-connector.json
{
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "topics": "test",

    "source.cluster.alias": "kafka-1",
    "source.cluster.bootstrap.servers": "kafka1:39092",
    "source.cluster.security.protocol": "SASL_PLAINTEXT",
    "source.cluster.sasl.mechanism": "PLAIN",
    "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker1.password}\";",
    
    "source.cluster.admin.sasl.mechanism": "${file:/opt/kafka/custom-config/kafka.properties:broker.sasl.mechanism}",
    "source.cluster.admin.security.protocol": "${file:/opt/kafka/custom-config/kafka.properties:broker.security.protocol}",
    "source.cluster.admin.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker1.password}\";",
    
    "target.cluster.alias": "kafka-2",
    "target.cluster.bootstrap.servers": "kafka2:39092",
    "target.cluster.security.protocol": "SASL_PLAINTEXT",
    "target.cluster.sasl.mechanism": "PLAIN",
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker2.password}\";",

    "sync.topic.acls.enabled": false,
    "sync.topic.configs.enabled": false,

    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",

    "replication.factor": "1",
    "replication.policy.class": "org.apache.kafka.connect.mirror.DefaultReplicationPolicy",
    "offset-syncs.topic.replication.factor": "1",
    "offset-syncs.topic.location": "source", 
    "tasks.max": "1"
  }        
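
The DefaultReplicationPolicy configured above prefixes replicated topic names with the source cluster alias (yielding kafka-1.test). If the original topic names should be preserved on the target cluster, Kafka also ships an identity policy; a fragment (note that without the prefix, local and replicated topics are no longer distinguishable by name):

// fragment: keep the original topic names on the target cluster
"replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"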

The Checkpoint connector is responsible for replicating the consumer group offsets from kafka-1 to kafka-2:

// file: ./connect/connectors/mirror-checkpoint-connector.json
{
    "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",

    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",

    "source.cluster.alias": "kafka-1",
    "source.cluster.bootstrap.servers": "kafka1:39092",
    "source.cluster.security.protocol": "SASL_PLAINTEXT",
    "source.cluster.sasl.mechanism": "PLAIN",
    "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker1.password}\";",

    "target.cluster.alias": "kafka-2",
    "target.cluster.bootstrap.servers": "kafka2:39092",
    "target.cluster.security.protocol": "SASL_PLAINTEXT",
    "target.cluster.sasl.mechanism": "PLAIN",
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker2.password}\";",

    "replication.policy.class": "org.apache.kafka.connect.mirror.DefaultReplicationPolicy",
    "emit.checkpoints.enabled": "true",
    "emit.checkpoints.interval.seconds": "60",
    "sync.group.offsets.enabled": "true",
    "sync.group.offsets.interval.seconds": "60",

    "group.filter.class": "org.apache.kafka.connect.mirror.DefaultGroupFilter",
    "groups": ".*",
    "groups.exclude": "console-consumer-.*, connect-.*, __.*",
    "refresh.groups.enabled": "true",
    "refresh.groups.interval.seconds": "60"
}        

Using the Heartbeat connector is optional; it helps monitor the connectivity between the clusters:

// file: ./connect/connectors/mirror-heartbeat-connector.json
{
    "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    
    "source.cluster.alias": "kafka-1",
    "source.cluster.bootstrap.servers": "kafka1:39092",
    "source.cluster.security.protocol": "SASL_PLAINTEXT",
    "source.cluster.sasl.mechanism": "PLAIN",
    "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker1.password}\";",

    "target.cluster.alias": "kafka-2",
    "target.cluster.bootstrap.servers": "kafka2:39092",
    "target.cluster.security.protocol": "SASL_PLAINTEXT",
    "target.cluster.sasl.mechanism": "PLAIN",
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker.username}\" password=\"${file:/opt/kafka/custom-config/kafka.properties:mirrormaker2.password}\";",

    "emit.heartbeats.enabled": "true",
    "emit.heartbeats.interval.seconds": "1",
    "heartbeats.topic.replication.factor": "1"
  }        
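
Since MirrorHeartbeatConnector is a Connect source connector, its heartbeats are produced to the cluster our Connect worker is attached to (kafka2 in this setup), into a topic named heartbeats by default. A sketch for inspecting them with the console consumer, assuming the default topic name:

docker compose exec -it kafka2 //bin/sh -c '/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heartbeats --from-beginning --consumer.config /opt/kafka/config/client.properties'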

Starting up and testing the prototype

To start the stack, run the command below in the CLI (the --build flag ensures that our custom Connect image is built first):
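
docker compose up --build -d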

It may take a minute or two to start up the stack.

Opening the Kafka UI at http://localhost:8080 gives us the opportunity to explore and verify the Kafka clusters, their topics and the deployed connectors.

Figure: Kafka Brokers on the Kafka UI Dashboard view

Figure: The topics on the first Kafka cluster

Figure: Topics of the second Kafka cluster

Figure: The MirrorMaker connectors

Testing topic offset replication

To verify the replication, we are going to produce some test data on the first Kafka cluster’s ‘test’ topic, and consume it on the second Kafka cluster. MirrorMaker makes sure that the events of the first cluster’s ‘test’ topic are replicated to the second cluster’s ‘kafka-1.test’ topic: it automatically creates the replication target topic on the target cluster, prefixed with the source cluster’s alias (kafka-1).
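
Before producing anything, we can confirm that the replicated topic has been created by listing kafka2’s topics; ‘kafka-1.test’ should appear in the output:

docker compose exec -it kafka2 //bin/sh -c '/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list --command-config /opt/kafka/config/client.properties'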


For simplicity, we are going to use the CLI to run Kafka producers and consumers to validate that the replication works as expected.

Open a new Git Bash console and start a consumer with the command below to fetch data from the kafka-2 cluster, both from the ‘kafka-1.test’ topic (which will contain events replicated from kafka-1) and from the ‘test’ topic. Consuming from multiple topics within the same consumer group can be achieved using a regular expression in the whitelist argument.

docker compose exec -it kafka2 //bin/sh -c '/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "^(kafka-1\\.)?test$" --consumer.config /opt/kafka/config/client.properties --property "print.timestamp=true" --property "print.key=true" --property "key.separator=:" --property "print.partition=true" --property "print.offset=true" --group my-cg'        

Next, produce some test data to kafka-1’s ‘test’ topic with the below command. Open a new Git Bash console to execute it:

docker compose exec -it kafka1 //bin/sh -c 'echo "$RANDOM:Hello, Kafka!" | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config /opt/kafka/config/client.properties --property "parse.key=true" --property "key.separator=:"'        

As a result, MirrorMaker will replicate the data from kafka1’s ‘test’ topic to kafka2’s ‘kafka-1.test’ topic, and the CLI-based consumer will print the event to the console:


Stop the consumer by pressing Ctrl+C.

Testing consumer group offset replication

In this test, we are going to validate that consumer group offset replication works as expected.

We can imagine that we have a high-availability setup: kafka-1 is our primary (active) cluster, and kafka-2 is the disaster recovery cluster. With our current configuration, topic offsets and consumer group offsets are replicated from kafka-1 to kafka-2.



Normal workflow: the producers are writing to kafka-1’s ‘test’ topic, and the consumers are consuming from there as well.

Open a new Git Bash console to start a consumer attached to kafka-1’s ‘test’ topic:

docker compose exec -it kafka1 //bin/sh -c '/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "test" --consumer.config /opt/kafka/config/client.properties --property "print.timestamp=true" --property "print.key=true" --property "key.separator=:" --property "print.partition=true" --property "print.offset=true" --group my-cg'          

Start a new Git Bash console to produce test data to kafka-1’s ‘test’ topic:

docker compose exec -it kafka1 //bin/sh -c 'echo "$RANDOM:Hello, Kafka!" | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config /opt/kafka/config/client.properties --property "parse.key=true" --property "key.separator=:"'        

As a result, we will see that the event is consumed by the CLI client and printed to the console.

Failover: let’s say our consumer clients lose network connectivity to kafka-1, and are no longer able to process data from it. To emulate this, stop the previously started consumer attached to kafka-1’s ‘test’ topic by pressing Ctrl+C in the corresponding console window.

Produce some more data with the CLI producer command above, to simulate additional data arriving on kafka-1’s ‘test’ topic.

Wait at least 2 minutes to make sure that the periodic consumer group offset replication between kafka-1 and kafka-2 takes place. This ensures that consumers attached to kafka-2 as part of the same consumer group will be able to continue processing data where they left off on kafka-1 before they were stopped.
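
The replicated offsets can also be inspected on kafka-2 with the consumer groups CLI; the my-cg group should show committed offsets on the kafka-1.test topic:

docker compose exec -it kafka2 //bin/sh -c '/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-cg --command-config /opt/kafka/config/client.properties'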

Start a consumer client pointed at kafka-2’s replicated topic, using the same consumer group name as previously used on kafka-1:

docker compose exec -it kafka2 //bin/sh -c '/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "^(kafka-1\\.)?test$" --consumer.config /opt/kafka/config/client.properties --property "print.timestamp=true" --property "print.key=true" --property "key.separator=:" --property "print.partition=true" --property "print.offset=true" --group my-cg'        

On the console you will see only the new data, not the data that was already consumed by the client while it was attached to kafka-1.

 

Conclusion

In this article, we have established the foundational concepts and practical implementation of replicating Kafka topics between clusters using Kafka Connect and MirrorMaker 2. By leveraging a comprehensive prototype setup with Docker Compose, we demonstrated the configuration and deployment of two Kafka single-node instances, a Kafka Connect instance with MM2 connector configuration, and a Kafka-UI for administration and monitoring.

MM2's capabilities, including topic offset replication, consumer offset synchronization, and flexible deployment options, were thoroughly explored. We discussed the use of MirrorSourceConnector, MirrorCheckpointConnector, and MirrorHeartbeatConnector for effective data replication and monitoring between Kafka clusters. Additionally, the article highlighted the importance of consumer offset synchronization to ensure consistent data consumption across replicated clusters.

Practical guidance was provided on configuring and testing MM2, with emphasis on topic and consumer group offset replication. This article serves as a guide for system architects and DevOps engineers looking to implement Kafka data replication solutions using MirrorMaker 2.
