Hands-On Implementation Guide:

This is part 2 of the Kafka Connect series. This article will benefit those who are looking for code and simple, straightforward steps to get their first Kafka Connect deployment running.

Description:

The use case I am going to describe is the implementation of a Kafka sink connector with an Oracle database on Red Hat OpenShift, without Strimzi.

The steps are as follows:

  • Kafka Connect YAML
  • Kafka Connector YAML
  • Deployment YAML
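Once the three files are written, they can be applied in order with `oc` (or `kubectl`). A minimal sketch, assuming the filenames below and the `es` namespace used in the manifests:

```shell
# filenames are assumptions; use whatever you saved the manifests as
oc apply -f kafka-connect.yaml -n es
oc apply -f kafka-connector.yaml -n es
oc apply -f deployment-connect.yaml -n es
```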

Kafka Connect YAML

apiVersion: eventstreams.ibm.com/v1beta1
kind: KafkaConnect
metadata:
  annotations:
    eventstreams.ibm.com/use-connector-resources: 'true'
  name: name for the kafka connect
  namespace: es
spec:
  authentication:
    certificateAndKey:
      certificate: your certificate
      key: your key
      secretName: name for the secret
    type: tls
  bootstrapServers: 'your server with the port'
  config:
    plugin.path: /opt/kafka/plugins
    group.id: name_of_group
    group.instance.id: group-01
    config.storage.topic: name for topic you choose
    config.storage.replication.factor: 1
    offset.storage.topic: topic for your offsets
    offset.storage.replication.factor: 1
    status.storage.topic: topic for your status
    status.storage.replication.factor: 1
  image: 'your image from your registry'
  replicas: 1
  resources:
    limits:
      cpu: 2000m
      memory: 2Gi
    requests:
      cpu: 1000m
      memory: 2Gi
  tls:
    trustedCertificates:
      - certificate: certificate
        secretName: secret_name

        



Kafka Connector YAML

apiVersion: eventstreams.ibm.com/v1alpha1
kind: KafkaConnector
metadata:
  labels:
    # must match the name of the KafkaConnect resource above
    eventstreams.ibm.com/cluster: name of cluster
  name: name of your connector that you want to choose
  namespace: your_namespace
spec:
  class: io.aiven.connect.jdbc.JdbcSinkConnector
  tasksMax: 2
  config:
    topics: YOUR TOPIC
    value.converter.schemas.enable: true
    connection.url: >-
      jdbc:oracle:thin:@localhost:port_number/connection_string
    connection.user: user
    connection.password: password
    insert.mode: insert
    table.name.format: your_table_name
    auto.create: false
    auto.evolve: false

        

Deployment YAML

kind: Deployment
apiVersion: apps/v1
metadata:
  name: deployment-connect
  namespace: es
spec:
  replicas: 1
  selector:
    # the selector must match the pod template labels below; adjust to your own labels
    matchLabels:
      eventstreams.ibm.com/cluster: name_of_kafka_connect
  template:
    metadata:
      labels:
        eventstreams.ibm.com/cluster: name_of_kafka_connect
      annotations:
        productMetric: VIRTUAL_PROCESSOR_CORE
    spec:
      restartPolicy: Always
      imagePullSecrets:
        - name: ibm-entitlement-key
      schedulerName: default-scheduler
      affinity: {}
      terminationGracePeriodSeconds: 30
      securityContext: {}
      containers:
        - name: name_deployment
          image: 'image from registry'
          imagePullPolicy: IfNotPresent
          command:
            - /opt/kafka/kafka_connect_run.sh
          resources:
            limits:
              cpu: '2'
              memory: 2Gi
            requests:
              cpu: '1'
              memory: 2Gi
          ports:
            - name: rest-api
              containerPort: 8083
              protocol: TCP
          readinessProbe:
            httpGet:
              path: /
              port: rest-api
              scheme: HTTP
            initialDelaySeconds: 60
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 3
          livenessProbe:
            httpGet:
              path: /
              port: rest-api
              scheme: HTTP
            initialDelaySeconds: 60
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 3
          env:
            - name: KAFKA_CONNECT_CONFIGURATION
              value: |
                offset.storage.topic=name_topic
            - name: KAFKA_CONNECT_METRICS_ENABLED
              value: 'false'
            - name: KAFKA_CONNECT_BOOTSTRAP_SERVERS
              value: 'value'
            - name: STRIMZI_KAFKA_GC_LOG_ENABLED
              value: 'false'
            - name: DYNAMIC_HEAP_FRACTION
              value: '1.0'
            - name: KAFKA_CONNECT_TLS
              value: 'true'
            - name: KAFKA_CONNECT_TRUSTED_CERTS
              value: path/certificate.crt
            - name: KAFKA_CONNECT_TLS_AUTH_KEY
              value: path/user.key
            - name: KAFKA_CONNECT_TLS_AUTH_CERT
              value: path/certificate.crt
          volumeMounts:
            - name: strimzi-tmp
              mountPath: /tmp
            - name: kafka-metrics-and-logging
              mountPath: /opt/kafka/custom-config/
            - name: es-cluster-ca-cert
              mountPath: /opt/kafka/your_path
            - name: your_name
              mountPath: /opt/kafka/your_path
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      volumes:
        # every volumeMount above needs a matching volume; names here are placeholders
        - name: strimzi-tmp
          emptyDir:
            medium: Memory
        - name: kafka-metrics-and-logging
          configMap:
            name: your_logging_configmap
        - name: es-cluster-ca-cert
          secret:
            secretName: es-cluster-ca-cert
        - name: your_name
          secret:
            secretName: your_secret
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 0
      maxSurge: 1
  revisionHistoryLimit: 10
  progressDeadlineSeconds: 600
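After applying the Deployment, you can watch it come up; a sketch using the `deployment-connect` name and `es` namespace from the manifest above:

```shell
# wait for the Connect worker pod to become ready, then list the pods
oc rollout status deployment/deployment-connect -n es
oc get pods -n es
```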


        

Explanation

Kafka Connect is considered the mother of all the files; the Kafka connector is considered a plugin that integrates with Kafka Connect.

The Kafka connector is where you write the configuration: the Oracle database URL, password, and username.
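The same settings can also be expressed as the JSON payload accepted by Kafka Connect's REST API (exposed on port 8083 in the Deployment above). A sketch with placeholder values, in case you prefer to register the connector without a KafkaConnector resource:

```shell
# sketch: the KafkaConnector settings as a Connect REST payload; all values are placeholders
cat > oracle-sink.json <<'EOF'
{
  "name": "oracle-sink",
  "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "2",
    "topics": "YOUR_TOPIC",
    "connection.url": "jdbc:oracle:thin:@localhost:port_number/connection_string",
    "connection.user": "user",
    "connection.password": "password",
    "insert.mode": "insert",
    "table.name.format": "your_table_name",
    "auto.create": "false",
    "auto.evolve": "false"
  }
}
EOF

# to register it, POST the file to the worker's REST API:
#   curl -X POST -H "Content-Type: application/json" --data @oracle-sink.json http://localhost:8083/connectors
```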


Steps needed

1- You need to download the JDBC connector JAR files suitable for Oracle databases, as the version of your Oracle database determines which connector and driver you need.

2- The line that makes the magic happen:

 class: io.aiven.connect.jdbc.JdbcSinkConnector
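That class name must match a plugin that Kafka Connect actually loaded from its plugin.path. A quick way to confirm it is visible (a sketch; run it from somewhere that can reach the worker's REST port, e.g. with `oc exec` into the pod):

```shell
# list the loaded connector plugins and look for the Aiven JDBC sink
curl -s http://localhost:8083/connector-plugins | grep -i JdbcSinkConnector
```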

        

What is Aiven?

Aiven for Apache Kafka® Connect is a managed service that allows you to separate your integrations' workloads from the Apache Kafka cluster, resulting in more stable, scalable, and performant data flows.

They provide open-source documentation and help online; you can find a lot of useful information in the Aiven GitHub repository.


Aiven will do the magic beneath, but first you need to have the JDBC JAR downloaded (we are assuming that the image you build includes the JDBC JAR under /opt/kafka/plugins, the plugin.path configured above).
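One way to bake the JARs into the image, since this environment uses Podman (a sketch; the base image name and JAR filenames are assumptions, so adjust them to your registry and driver version):

```shell
# Containerfile sketch: copy the Aiven JDBC connector and the Oracle driver into the plugin path
cat > Containerfile <<'EOF'
FROM your-registry/your-connect-base-image:latest
COPY jdbc-connector-for-apache-kafka-*.jar /opt/kafka/plugins/
COPY ojdbc8.jar /opt/kafka/plugins/
EOF

# build and push (requires podman and access to your registry):
#   podman build -t your-registry/your-connect-image:latest -f Containerfile .
#   podman push your-registry/your-connect-image:latest
```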


In the Connect log, you should find that the connector has subscribed to the topic, and you will also see a thread related to the SQL dialect (Oracle, in this case).
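A quick way to look for both messages (a sketch, using the Deployment name from above):

```shell
# look for the topic subscription and the SQL dialect thread in the worker log
oc logs deployment/deployment-connect -n es | grep -iE "subscribed|dialect"
```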

Conclusion:

This article should help you take your first baby steps in implementing Kafka Connect. I explained it in a specific environment (OpenShift, Podman, and no Strimzi).

If you have any further questions, please don't hesitate to contact me directly and send me a message; I will be glad to help and learn together.
