Deploy Debezium MySQL Connector
Prerequisites
All steps were carried out on Rancher Desktop, a local Kubernetes environment.
Deploy Kafka
See my earlier post, Deploy Kafka Cluster using Strimzi!
helm install strimzi-cluster-operator oci://quay.io/strimzi-helm/strimzi-kafka-operator -n strimzi
# @kafka.kraft.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: strimzi
  annotations:
    strimzi.io/kraft: enabled
    strimzi.io/node-pools: enabled
spec:
  kafka:
    version: 4.0.0
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: controller-nodes
  namespace: strimzi
  labels:
    strimzi.io/cluster: my-cluster # must match the Kafka cluster name
spec:
  replicas: 3 # number of controllers
  roles:
    - controller
  storage:
    type: ephemeral
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker-nodes
  namespace: strimzi
  labels:
    strimzi.io/cluster: my-cluster # must match the Kafka cluster name
spec:
  replicas: 3 # number of brokers
  roles:
    - broker
  storage:
    type: ephemeral
Deploy MySQL
Sadly, the Bitnami MySQL Helm chart can no longer be used, so I wrote the manifest by hand as shown below.
# @mysql.yaml
apiVersion: v1
kind: Pod
metadata:
  name: mysql
  labels:
    app: mysql
spec:
  containers:
    - image: mysql:8
      name: mysql
      env:
        - name: MYSQL_ROOT_PASSWORD
          value: hello_debezium!
        - name: MYSQL_USER
          value: admin
        - name: MYSQL_PASSWORD
          value: hello_debezium!
      ports:
        - containerPort: 3306
          name: mysql
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: mysql
  name: mysql
spec:
  ports:
    - name: mysql
      port: 3306
      targetPort: 3306
  selector:
    app: mysql
  type: ClusterIP
Connect to the Pod as follows.
$ kubectl exec -it mysql -- bash
# inside the Pod
$ mysql -uroot -p"$MYSQL_ROOT_PASSWORD"
# inside the mysql client
mysql> CREATE DATABASE public;
mysql> USE public;
mysql> CREATE TABLE user (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    money INT NOT NULL DEFAULT 0,
    created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
Data Generator
Set up a Python script that periodically generates data and writes it to the table (every 0.3 seconds with the settings below).
$ python -m venv venv
$ source venv/bin/activate
$ pip install mysql-connector-python==9.4.0
# @mysql-cdc-generator.py
# Written with GPT :wink:
import random
import string
import time

import mysql.connector

# -------------------------------
# 1. DB connection settings
# -------------------------------
config = {
    "host": "localhost",  # reached through `kubectl port-forward mysql 3306`
    "user": "root",
    "password": "hello_debezium!",  # MYSQL_ROOT_PASSWORD from mysql.yaml
    "database": "public",  # the database created above
    "autocommit": True,
}
conn = mysql.connector.connect(**config)
cursor = conn.cursor()

# -------------------------------
# 2. Helper functions
# -------------------------------
def random_name(length=6):
    return ''.join(random.choices(string.ascii_letters, k=length))

def random_money():
    return random.randint(0, 10000)

def user_exists(user_id):
    cursor.execute("SELECT 1 FROM user WHERE id=%s", (user_id,))
    return cursor.fetchone() is not None

def create_user(user_id):
    name = random_name()
    money = random_money()
    try:
        cursor.execute(
            "INSERT INTO user (id, name, money) VALUES (%s, %s, %s)",
            (user_id, name, money)
        )
        print(f"[CREATE] id={user_id}, name={name}, money={money}")
    except mysql.connector.Error as err:
        # Typically a duplicate primary key when the id already exists.
        print(f"[CREATE ERROR] id={user_id}: {err.msg}")

def update_user(user_id):
    # Existence check intentionally disabled ("allow upsert!").
    # Note that a plain UPDATE on a missing id just affects 0 rows.
    # if not user_exists(user_id):
    #     print(f"[SKIP UPDATE] id={user_id} not found.")
    #     return
    money = random_money()
    cursor.execute(
        "UPDATE user SET money=%s WHERE id=%s",
        (money, user_id)
    )
    print(f"[UPDATE] id={user_id}, money={money}")

def delete_user(user_id):
    if not user_exists(user_id):
        print(f"[SKIP DELETE] id={user_id} not found.")
        return
    cursor.execute("DELETE FROM user WHERE id=%s", (user_id,))
    print(f"[DELETE] id={user_id}")

# -------------------------------
# 3. Main loop
# -------------------------------
def run_cdc_simulation(iterations=1000, delay=0.5):
    for i in range(iterations):
        user_id = random.randint(1, 100)
        action = random.choice(["CREATE", "UPDATE", "DELETE"])
        if action == "CREATE":
            create_user(user_id)
        elif action == "UPDATE":
            update_user(user_id)
        elif action == "DELETE":
            delete_user(user_id)
        time.sleep(delay)

# -------------------------------
# 4. Run
# -------------------------------
if __name__ == "__main__":
    try:
        run_cdc_simulation(iterations=50, delay=0.3)
    finally:
        cursor.close()
        conn.close()
Generating CDC data
$ kubectl port-forward mysql 3306
Deploy Kafka Connect
With Strimzi, creating a KafkaConnect resource deploys a Kafka Connect cluster.
# @strimzi.kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    offset.flush.timeout.ms: 10000
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
  resources:
    requests:
      cpu: "1"
      memory: 1Gi
    limits:
      cpu: "1"
      memory: 1Gi
However, this Kafka Connect image does not ship the JAR files for the Debezium connector, so we have to add them ourselves!
For reference, in the version I tested, Kafka Connect was using the quay.io/strimzi/kafka:0.48.0-kafka-4.1.0 image. Let's use it as the base image and add the Debezium plugin as shown below. The plugin archive is downloaded from Maven.
# dbz.kafka-connect.Dockerfile
FROM quay.io/strimzi/kafka:0.48.0-kafka-4.1.0
USER root
# Download the Debezium MySQL connector plugin archive from Maven
RUN curl -L -o /tmp/debezium-connector-mysql.zip \
    https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/3.1.3.Final/debezium-connector-mysql-3.1.3.Final-plugin.zip
# Extract it under the Kafka Connect plugin directory
RUN unzip /tmp/debezium-connector-mysql.zip -d /opt/kafka/plugins/debezium-mysql
USER 1001
Build the image with nerdctl. Rancher Desktop cannot read images that were built locally with a regular Docker engine (Docker Desktop, Colima). So the image has to be built with nerdctl, and the build has to be run with --namespace k8s.io.
(I finally figured out how to do this while working on this post.)
$ nerdctl --namespace k8s.io build \
    -t my-kafka-connect-debezium:local \
    -f dbz.kafka-connect.Dockerfile .
Re-deploy Kafka Connect with Debezium
# @strimzi.kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  ...
  image: my-kafka-connect-debezium:local # add this line
  ...
$ kubectl apply -f strimzi.kafka-connect.yaml
Check that the plugin was registered correctly.
$ kubectl exec -it my-connect-cluster-connect-0 -- bash
# inside the Pod
$ curl http://localhost:8083/connector-plugins
[{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"3.1.3.Final"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"4.1.0"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"4.1.0"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"4.1.0"}]
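The raw response is hard to scan by eye, so here is a minimal Python sketch for checking it, assuming the response body has been saved as a string. The helper name `find_plugin` is hypothetical; only the JSON shape (a list of objects with `class`, `type`, `version`) comes from the output above.

```python
import json

DEBEZIUM_MYSQL_CLASS = "io.debezium.connector.mysql.MySqlConnector"


def find_plugin(response_body, plugin_class=DEBEZIUM_MYSQL_CLASS):
    """Return the plugin entry whose "class" matches, or None if it is missing."""
    plugins = json.loads(response_body)
    for plugin in plugins:
        if plugin.get("class") == plugin_class:
            return plugin
    return None


if __name__ == "__main__":
    # Shortened copy of the /connector-plugins response shown above.
    sample = (
        '[{"class":"io.debezium.connector.mysql.MySqlConnector",'
        '"type":"source","version":"3.1.3.Final"},'
        '{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",'
        '"type":"source","version":"4.1.0"}]'
    )
    plugin = find_plugin(sample)
    if plugin is None:
        print("Debezium MySQL plugin not found")
    else:
        print("Debezium MySQL plugin version:", plugin["version"])
```

If the function returns None, the plugin directory was most likely not picked up by the image that Kafka Connect is actually running.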
Finally! All the groundwork for launching the Debezium connector is done.
(Optional) Deploy Kafbat UI
To make debugging a bit easier, I also deploy Kafbat UI.
# @values.kafbat-ui.yaml
yamlApplicationConfig:
  kafka:
    clusters:
      - name: my-cluster
        bootstrapServers: my-cluster-kafka-brokers:9092
  auth:
    type: disabled
  management:
    health:
      ldap:
        enabled: false
$ helm repo add kafbat-ui https://kafbat.github.io/helm-charts
$ helm install kafbat-ui kafbat-ui/kafka-ui -f values.kafbat-ui.yaml
$ kubectl --namespace strimzi port-forward deploy/kafbat-ui-kafka-ui 8080:8080
Deploy Debezium Connector
Prepare the connector config as shown below.
# @source.debezium-mysql.json
{
  "name": "source.debezium-mysql",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql.strimzi.svc.cluster.local",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "hello_debezium!",
    "database.server.id": "20251005",
    "database.server.name": "my-mysql",
    "topic.prefix": "my-mysql",
    "database.include.list": "public",
    "table.include.list": "public.user",
    "schema.history.internal.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
    "schema.history.internal.kafka.topic": "__debezium_mysql_history",
    "include.schema.changes": "false",
    "snapshot.mode": "initial",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}
Create the JSON file with the commands below.
$ kubectl exec -it my-connect-cluster-connect-0 -- bash
$ cat <<EOF > source.debezium-mysql.json
...
EOF
First, validate the config against the config/validate endpoint below.
$ curl -X PUT localhost:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate \
    -H "Content-Type: application/json" \
    -d '{
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql.strimzi.svc.cluster.local",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "hello_debezium!",
      "database.server.id": "20251005",
      "database.server.name": "my-mysql",
      "topic.prefix": "my-mysql",
      "database.include.list": "public",
      "table.include.list": "public.user",
      "schema.history.internal.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
      "schema.history.internal.kafka.topic": "__debezium_mysql_history",
      "include.schema.changes": "false",
      "snapshot.mode": "initial",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false"
    }'
$ curl -X POST http://localhost:8083/connectors \
    -H "Content-Type: application/json" \
    -d '{
      "name": "source.debezium-mysql",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql.strimzi.svc.cluster.local",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "hello_debezium!",
        "database.server.id": "20251005",
        "database.server.name": "my-mysql",
        "topic.prefix": "my-mysql",
        "database.include.list": "public",
        "table.include.list": "public.user",
        "schema.history.internal.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
        "schema.history.internal.kafka.topic": "__debezium_mysql_history",
        "include.schema.changes": "false",
        "snapshot.mode": "initial",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false"
      }
    }'
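The validate and create requests carry the same settings in two different shapes: the validate endpoint takes the flat config map, while POST /connectors wraps it as {"name": ..., "config": ...}. A rough Python sketch of keeping a single source of truth for both bodies follows. The helper names (`base_config`, `validate_payload`, `create_payload`) are hypothetical, and the sketch only builds the JSON bodies; it does not send any request.

```python
import json


def base_config():
    """Connector settings copied from the curl commands above."""
    return {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql.strimzi.svc.cluster.local",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "hello_debezium!",
        "database.server.id": "20251005",
        "database.server.name": "my-mysql",
        "topic.prefix": "my-mysql",
        "database.include.list": "public",
        "table.include.list": "public.user",
        "schema.history.internal.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
        "schema.history.internal.kafka.topic": "__debezium_mysql_history",
        "include.schema.changes": "false",
        "snapshot.mode": "initial",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
    }


def validate_path(config):
    """Path of the validate endpoint for the connector class in the config."""
    return "/connector-plugins/" + config["connector.class"] + "/config/validate"


def validate_payload(config):
    """Body for the validate request: the flat config map."""
    return json.dumps(config, indent=2)


def create_payload(name, config):
    """Body for POST /connectors: the same config wrapped with a connector name."""
    return json.dumps({"name": name, "config": config}, indent=2)


if __name__ == "__main__":
    cfg = base_config()
    print("PUT", validate_path(cfg))
    print(validate_payload(cfg))
    print("POST /connectors")
    print(create_payload("source.debezium-mysql", cfg))
```

Generating both bodies from one dict avoids the kind of drift where the validated config and the submitted config quietly differ.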
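After several attempts, it finally worked.
- table.include.list must contain the fully qualified name in {schema}.{table} format.
- The working config uses StringConverter rather than JsonConverter. I first assumed JsonConverter needs a Schema Registry, but Kafka's built-in org.apache.kafka.connect.json.JsonConverter does not; only registry-backed converters such as Confluent's AvroConverter do.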
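The table.include.list pitfall is easy to catch before submitting. Below is a small Python sketch of such a pre-check, under the assumption that entries are plain names rather than elaborate regular expressions (the property does accept regexes, so this dot check is only a heuristic). The function name `unqualified_tables` is hypothetical.

```python
def unqualified_tables(table_include_list):
    """Return the entries of a comma-separated table.include.list that lack a
    "{schema}.{table}" separator, i.e. entries that contain no dot at all."""
    entries = [entry.strip() for entry in table_include_list.split(",")]
    return [entry for entry in entries if entry and "." not in entry]


if __name__ == "__main__":
    for value in ("user", "public.user"):
        missing = unqualified_tables(value)
        if missing:
            print(f"{value!r}: not fully qualified -> {missing}")
        else:
            print(f"{value!r}: ok")
```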
$ curl -X GET http://localhost:8083/connectors
$ curl -X GET http://localhost:8083/connectors/source.debezium-mysql/status
# for debugging
$ curl -X POST http://localhost:8083/connectors/source.debezium-mysql/restart
$ curl -X DELETE http://localhost:8083/connectors/source.debezium-mysql
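The status response is a JSON object with a `connector` state and a list of `tasks`, each with its own state. A minimal Python sketch for condensing it into a health summary follows. The function name `summarize_status` and the sample payload (including the worker_id value) are made up for illustration; only the general response shape of the Kafka Connect REST API is assumed.

```python
def summarize_status(status):
    """Condense a /connectors/{name}/status response into a small summary."""
    connector_state = status.get("connector", {}).get("state", "UNKNOWN")
    task_states = {task["id"]: task["state"] for task in status.get("tasks", [])}
    failed = sorted(tid for tid, state in task_states.items() if state == "FAILED")
    healthy = (
        connector_state == "RUNNING"
        and len(task_states) > 0
        and all(state == "RUNNING" for state in task_states.values())
    )
    return {
        "connector": connector_state,
        "tasks": task_states,
        "failed_tasks": failed,
        "healthy": healthy,
    }


if __name__ == "__main__":
    # Hypothetical sample; real responses also carry a "trace" for failed tasks.
    sample = {
        "name": "source.debezium-mysql",
        "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
        "tasks": [{"id": 0, "state": "FAILED", "worker_id": "10.0.0.1:8083"}],
        "type": "source",
    }
    print(summarize_status(sample))
```

A connector can report RUNNING while its task has FAILED, which is why the sketch looks at both levels before calling the connector healthy.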
Now, opening kafbat-ui to check...
The CDC data is coming in nicely!
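To know which topic to look for in the UI, here is a short Python sketch, assuming Debezium's default topic naming for the MySQL connector, {topic.prefix}.{database}.{table}, and the usual `op` codes carried in each change event. The function names are hypothetical.

```python
# Meaning of the `op` field in a Debezium change event.
OP_LABELS = {
    "c": "create (INSERT)",
    "u": "update (UPDATE)",
    "d": "delete (DELETE)",
    "r": "read (row captured by the initial snapshot)",
}


def change_topic(topic_prefix, database, table):
    """Topic name for a table under Debezium's default naming (assumed)."""
    return f"{topic_prefix}.{database}.{table}"


def describe_op(op):
    """Human-readable label for an `op` code, tolerant of unknown values."""
    return OP_LABELS.get(op, f"unknown op {op!r}")


if __name__ == "__main__":
    # Values taken from the connector config in this post.
    print(change_topic("my-mysql", "public", "user"))
    for code in ("r", "c", "u", "d"):
        print(code, "->", describe_op(code))
```

With snapshot.mode set to initial, rows that already existed in the table should show up first as `r` events, followed by `c`, `u`, and `d` events produced by the generator script.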