Hands-On Implementation Guide:
This is part 2 of the Kafka Connect series. It will benefit readers who are looking for code and simple, straightforward steps to get their first Kafka Connect running.
Description:
The use case I am going to describe is the implementation of a Kafka sink connector with an Oracle database on Red Hat OpenShift, without Strimzi.
The steps are as follows:
Kafka Connect YAML
When you create the resource, you only need the fields shown below; read-only metadata such as managedFields, uid, resourceVersion, generation, and status is populated by the cluster and can be left out.

apiVersion: eventstreams.ibm.com/v1beta1
kind: KafkaConnect
metadata:
  name: <name for the Kafka Connect>
  namespace: es
  annotations:
    # required so the operator manages connectors from KafkaConnector resources
    eventstreams.ibm.com/use-connector-resources: 'true'
spec:
  authentication:
    type: tls
    certificateAndKey:
      certificate: <your certificate>
      key: <your key>
      secretName: <name for the secret>
  bootstrapServers: '<your server with the port>'
  config:
    group.id: <name of group>
    group.instance.id: group-01
    plugin.path: /opt/kafka/plugins
    config.storage.topic: <name for the topic you choose>
    config.storage.replication.factor: 1
    offset.storage.topic: <topic for your offsets>
    offset.storage.replication.factor: 1
    status.storage.topic: <topic for your status>
    status.storage.replication.factor: 1
  image: '<your image from your registry>'
  replicas: 1
  resources:
    limits:
      cpu: 2000m
      memory: 2Gi
    requests:
      cpu: 1000m
      memory: 2Gi
  tls:
    trustedCertificates:
      - certificate: <certificate>
        secretName: <secret name>
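With the YAML saved (here assumed to be kafka-connect.yaml; the resource name is a placeholder), you can apply it and check that the operator reports the Connect cluster as ready:

oc apply -f kafka-connect.yaml -n es
oc get kafkaconnect -n es
oc describe kafkaconnect <name for the Kafka Connect> -n es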
Kafka Connector YAML

apiVersion: eventstreams.ibm.com/v1alpha1
kind: KafkaConnector
metadata:
  name: <name of your connector>
  namespace: <your namespace>
  labels:
    # must match the name of the KafkaConnect resource above
    eventstreams.ibm.com/cluster: <name of cluster>
spec:
  class: io.aiven.connect.jdbc.JdbcSinkConnector
  tasksMax: 2
  config:
    topics: <YOUR TOPIC>
    connection.url: >-
      jdbc:oracle:thin:@<host>:<port>/<service name>
    connection.user: <user>
    connection.password: <password>
    insert.mode: insert
    table.name.format: <your table name>
    auto.create: false
    auto.evolve: false
    value.converter.schemas.enable: true

Note that tasksMax belongs at the spec level of a KafkaConnector, not inside config.
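A quick way to apply the connector and confirm it was picked up (assuming the file is saved as kafka-connector.yaml; names are placeholders). The /connectors/<name>/status endpoint is the standard Kafka Connect REST API on port 8083, and the oc exec line only works if curl is available in the image:

oc apply -f kafka-connector.yaml -n <your namespace>
oc get kafkaconnector -n <your namespace>

# query the Connect REST API from inside the Connect pod
oc exec deployment/deployment-connect -n es -- curl -s http://localhost:8083/connectors/<name of your connector>/status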
Deployment YAML file

kind: Deployment
apiVersion: apps/v1
metadata:
  name: deployment-connect
  namespace: es
  ownerReferences:
    - apiVersion: eventstreams.ibm.com/v1beta1
      kind: KafkaConnect
      name: <name of the Kafka Connect>
      controller: false
      blockOwnerDeletion: false
spec:
  replicas: 1
  revisionHistoryLimit: 10
  progressDeadlineSeconds: 600
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 0
      maxSurge: 1
  template:
    metadata:
      annotations:
        productMetric: VIRTUAL_PROCESSOR_CORE
    spec:
      restartPolicy: Always
      schedulerName: default-scheduler
      terminationGracePeriodSeconds: 30
      imagePullSecrets:
        - name: ibm-entitlement-key
      affinity: {}
      securityContext: {}
      containers:
        - name: <container name>
          image: '<image from registry>'
          imagePullPolicy: IfNotPresent
          command:
            - /opt/kafka/kafka_connect_run.sh
          ports:
            - name: rest-api
              containerPort: 8083
              protocol: TCP
          resources:
            limits:
              cpu: '2'
              memory: 2Gi
            requests:
              cpu: '1'
              memory: 2Gi
          readinessProbe:
            httpGet:
              path: /
              port: rest-api
              scheme: HTTP
            initialDelaySeconds: 60
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 3
          livenessProbe:
            httpGet:
              path: /
              port: rest-api
              scheme: HTTP
            initialDelaySeconds: 60
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 3
          env:
            - name: KAFKA_CONNECT_CONFIGURATION
              value: |
                offset.storage.topic=<name of topic>
            - name: KAFKA_CONNECT_METRICS_ENABLED
              value: 'false'
            - name: KAFKA_CONNECT_BOOTSTRAP_SERVERS
              value: '<your bootstrap servers>'
            - name: STRIMZI_KAFKA_GC_LOG_ENABLED
              value: 'false'
            - name: DYNAMIC_HEAP_FRACTION
              value: '1.0'
            - name: KAFKA_CONNECT_TLS
              value: 'true'
            - name: KAFKA_CONNECT_TRUSTED_CERTS
              value: <path>/certificate.crt
            - name: KAFKA_CONNECT_TLS_AUTH_KEY
              value: neoleap-user/user.key
            - name: KAFKA_CONNECT_TLS_AUTH_CERT
              value: <path>/certificate.crt
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - name: strimzi-tmp
              mountPath: /tmp
            - name: kafka-metrics-and-logging
              mountPath: /opt/kafka/custom-config/
            - name: es-cluster-ca-cert
              mountPath: /opt/kafka/<your path>
            - name: <your volume name>
              mountPath: /opt/kafka/<your path>
      volumes:
        - name: strimzi-tmp
          emptyDir:
            medium: Memory
        # the kafka-metrics-and-logging, es-cluster-ca-cert, and custom volumes
        # mounted above must also be defined here (ConfigMap/Secret volumes)
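After applying the Deployment, you can watch the rollout and verify the pod is running (a sketch using the names from the YAML above):

oc rollout status deployment/deployment-connect -n es
oc get pods -n es -l eventstreams.ibm.com/cluster=<name of cluster>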
Explanation
Kafka Connect is considered the mother of all the files; the Kafka connector is a plugin that integrates with Kafka Connect.
The Kafka connector is where you write the configuration: the Oracle database URL, password, and username.
Steps needed
1- Download the JDBC connector JAR files that suit your Oracle database; the Oracle version you run determines which driver you need (a sketch for baking the JARs into the image follows this list).
2- The line that makes the magic happen:
class: io.aiven.connect.jdbc.JdbcSinkConnector
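For step 1, a minimal sketch of baking both JARs into the image with Podman (as used in this setup). The base image, file names, and driver version are assumptions you should adapt; the target directory matches the plugin.path configured in the Kafka Connect YAML:

cat > Containerfile <<'EOF'
FROM <your-kafka-connect-base-image-from-registry>
# Aiven JDBC connector, unpacked into the configured plugin.path
COPY aiven-connect-jdbc/ /opt/kafka/plugins/aiven-connect-jdbc/
# Oracle JDBC driver matching your database version (e.g. ojdbc8.jar)
COPY ojdbc8.jar /opt/kafka/plugins/aiven-connect-jdbc/
EOF

podman build -t <your-registry>/kafka-connect-jdbc:1.0 -f Containerfile .
podman push <your-registry>/kafka-connect-jdbc:1.0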
What is Aiven?
Aiven for Apache Kafka® Connect is a managed offering that lets you separate your integration workloads from the Apache Kafka cluster, resulting in more stable, scalable, and performant data flows.
They provide open-source documentation and help online; you can find a lot of useful information in the Aiven GitHub repository.
Aiven's connector will do the magic beneath, but you first need the JDBC JAR downloaded (we are assuming that the image you build includes the JDBC JAR under /opt/kafka).
In the log you should find that the connector has subscribed to the topic, and you will also see a thread related to the SQL dialect.
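One way to look for these entries (the grep patterns are only a guess at the exact log wording, so adjust as needed):

oc logs deployment/deployment-connect -n es | grep -i -e 'subscribed' -e 'dialect'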
Conclusion:
This article should help you take your first baby steps in implementing Kafka Connect. I explained it in a specific environment: OpenShift, Podman, and no Strimzi.
If you have any further questions, please don't hesitate to contact me directly and send me a message. I will be glad to help and to learn together.