10 minute read

Prerequisites

๋ชจ๋“  ๊ณผ์ •์€ ๋กœ์ปฌ Kubernetes์˜ Ranchder Desktop ํ™˜๊ฒฝ ์œ„์—์„œ ์ง„ํ–‰ ํ•˜์˜€์Šต๋‹ˆ๋‹ค.

Deploy Kafka

์ด์ „์— ์ ์—ˆ๋˜ Deploy Kafka Cluster using Strimzi ํฌ์ŠคํŠธ ์ฐธ๊ณ !

$ helm install strimzi-cluster-operator oci://quay.io/strimzi-helm/strimzi-kafka-operator -n strimzi

# @kafka.kraft.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: strimzi
  annotations:
    strimzi.io/kraft: enabled
    strimzi.io/node-pools: enabled
spec:
  kafka:
    version: 4.0.0
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: controller-nodes
  namespace: strimzi
  labels:
    strimzi.io/cluster: my-cluster  # must match the Kafka cluster name
spec:
  replicas: 3  # number of controllers
  roles:
    - controller
  storage:
    type: ephemeral
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker-nodes
  namespace: strimzi
  labels:
    strimzi.io/cluster: my-cluster  # must match the Kafka cluster name
spec:
  replicas: 3  # number of brokers
  roles:
    - broker
  storage:
    type: ephemeral
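The controller pool runs KRaft, Kafka's Raft-based metadata quorum. The cluster stays available only while a majority of controllers is up. The sketch below shows the arithmetic behind replicas: 3. The helper names are made up for illustration.

```python
def quorum_size(voters: int) -> int:
    """Smallest majority of a voter set (Raft-style quorum)."""
    return voters // 2 + 1


def tolerated_failures(voters: int) -> int:
    """How many voters can fail while a majority is still available."""
    return voters - quorum_size(voters)


for n in (1, 3, 4, 5):
    print(f"{n} controller(s): quorum={quorum_size(n)}, tolerates {tolerated_failures(n)} failure(s)")
```

Four controllers tolerate no more failures than three. This is why odd controller counts are the usual choice.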

Deploy MySQL

Sadly, the Bitnami MySQL Helm chart can no longer be used, so I defined the Pod and Service myself as shown below.

# @mysql.yaml
apiVersion: v1
kind: Pod
metadata:
  name: mysql
  labels:
    app: mysql
spec:
  containers:
  - image: mysql:8
    name: mysql
    env:
    - name: MYSQL_ROOT_PASSWORD
      value: hello_debezium!
    - name: MYSQL_USER
      value: admin
    - name: MYSQL_PASSWORD
      value: hello_debezium!
    ports:
    - containerPort: 3306
      name: mysql
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: mysql
  name: mysql
spec:
  ports:
  - name: mysql
    port: 3306
    targetPort: 3306
  selector:
    app: mysql
  type: ClusterIP
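This is a bare Pod rather than a StatefulSet, so it has no mysql-0-style per-pod DNS name. Other workloads should reach MySQL through the Service. Kubernetes DNS names a Service <service>.<namespace>.svc.<cluster-domain>. The tiny helper below builds that name. It is hypothetical and assumes two things: the manifest is applied in the strimzi namespace (which is what the connector config later in this post expects), and the cluster uses the default cluster.local domain.

```python
def service_fqdn(service: str, namespace: str, cluster_domain: str = "cluster.local") -> str:
    """Fully qualified in-cluster DNS name of a Kubernetes Service."""
    # Pattern: <service>.<namespace>.svc.<cluster-domain>
    return f"{service}.{namespace}.svc.{cluster_domain}"


print(service_fqdn("mysql", "strimzi"))  # mysql.strimzi.svc.cluster.local
```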

์•„๋ž˜์™€ ๊ฐ™์ด ์ ‘์† ํ•ฉ๋‹ˆ๋‹ค.

$ kubectl exec -it mysql-0 -- bash

# ์ ‘์† ํ›„
$ mysql -uroot -p"$MYSQL_ROOT_PASSWORD"

# mysql ์ ‘์† ํ›„
mysql> CREATE DATABASE public;
mysql> USE public;
mysql> CREATE TABLE user (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    money INT NOT NULL DEFAULT 0,
    created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
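Debezium's MySQL connector reads changes from the binary log. The server therefore needs binary logging enabled, with binlog_format=ROW and binlog_row_image=FULL. These are MySQL 8 defaults, so the mysql:8 image should already satisfy them, but checking is cheap. The sketch below checks the values returned by SHOW VARIABLES. The function name is hypothetical.

```python
# Server settings the Debezium MySQL connector relies on.
REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def binlog_problems(variables: dict) -> list:
    """Return the settings that would stop Debezium from reading row changes."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None or str(actual).upper() != expected:
            problems.append(f"{name}={actual!r}, expected {expected}")
    return problems


# With mysql-connector-python, the input could come from:
#   cursor.execute("SHOW VARIABLES WHERE Variable_name IN "
#                  "('log_bin', 'binlog_format', 'binlog_row_image')")
#   variables = dict(cursor.fetchall())
print(binlog_problems({"log_bin": "ON", "binlog_format": "ROW", "binlog_row_image": "FULL"}))  # []
```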

Data Generator

1์ดˆ๋งˆ๋‹ค ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์„ฑํ•ด์„œ ๋„ฃ๋Š” python ์Šคํฌ๋ฆฝํŠธ๋ฅผ ๊ตฌ์„ฑ ํ•ฉ๋‹ˆ๋‹ค.

$ python -m venv venv
$ source venv/bin/activate

$ pip install mysql-connector-python==9.4.0

# @mysql-cdc-generator.py
# Generated with GPT :wink:
import random
import string
import time
import mysql.connector

# -------------------------------
# 1๏ธโƒฃ DB ์—ฐ๊ฒฐ ์„ค์ •
# -------------------------------
config = {
    "host": "localhost",
    "user": "root",
    "password": "changeme",
    "database": "testdb",
    "autocommit": True,
}

conn = mysql.connector.connect(**config)
cursor = conn.cursor()

# -------------------------------
# 2๏ธโƒฃ ํ—ฌํผ ํ•จ์ˆ˜๋“ค
# -------------------------------

def random_name(length=6):
    return ''.join(random.choices(string.ascii_letters, k=length))

def random_money():
    return random.randint(0, 10000)

def user_exists(user_id):
    cursor.execute("SELECT 1 FROM user WHERE id=%s", (user_id,))
    return cursor.fetchone() is not None

def create_user(user_id):
    name = random_name()
    money = random_money()
    try:
        cursor.execute(
            "INSERT INTO user (id, name, money) VALUES (%s, %s, %s)",
            (user_id, name, money)
        )
        print(f"[CREATE] id={user_id}, name={name}, money={money}")
    except mysql.connector.Error as err:
        print(f"[CREATE ERROR] id={user_id}: {err.msg}")

def update_user(user_id):
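    # No existence check on purpose, but note this is NOT an upsert:
    # an UPDATE on a missing id matches 0 rows, so no CDC event is emitted.
    # if not user_exists(user_id):
    #     print(f"[SKIP UPDATE] id={user_id} not found.")
    #     return
    money = random_money()
    cursor.execute(
        "UPDATE user SET money=%s WHERE id=%s",
        (money, user_id)
    )
    # rowcount shows whether the UPDATE actually touched a row
    print(f"[UPDATE] id={user_id}, money={money}, rows={cursor.rowcount}")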

def delete_user(user_id):
    if not user_exists(user_id):
        print(f"[SKIP DELETE] id={user_id} not found.")
        return
    cursor.execute("DELETE FROM user WHERE id=%s", (user_id,))
    print(f"[DELETE] id={user_id}")

# -------------------------------
# 3๏ธโƒฃ ๋ฉ”์ธ ๋ฃจํ”„
# -------------------------------

def run_cdc_simulation(iterations=1000, delay=0.5):
    for i in range(iterations):
        user_id = random.randint(1, 100)
        action = random.choice(["CREATE", "UPDATE", "DELETE"])

        if action == "CREATE":
            create_user(user_id)
        elif action == "UPDATE":
            update_user(user_id)
        elif action == "DELETE":
            delete_user(user_id)

        time.sleep(delay)

# -------------------------------
# 4๏ธโƒฃ ์‹คํ–‰
# -------------------------------
if __name__ == "__main__":
    try:
        run_cdc_simulation(iterations=50, delay=0.3)
    finally:
        cursor.close()
        conn.close()
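Not every loop iteration becomes a change event. A CREATE on an existing id fails with a duplicate-key error, and an UPDATE or DELETE on a missing id touches no rows. The sketch below is a hypothetical helper, not part of the script. It replays (action, id) pairs against an in-memory set of ids to predict which Debezium op codes (c, u, d) should show up. It assumes the table starts empty and ignores the rare UPDATE that writes an unchanged value.

```python
def predict_ops(actions, existing=None):
    """Replay (action, user_id) pairs and return the expected Debezium op codes."""
    ids = set(existing or ())
    ops = []
    for action, user_id in actions:
        if action == "CREATE":
            if user_id not in ids:  # a duplicate id raises a key error instead
                ids.add(user_id)
                ops.append("c")
        elif action == "UPDATE":
            if user_id in ids:  # UPDATE on a missing id matches 0 rows
                ops.append("u")
        elif action == "DELETE":
            if user_id in ids:  # delete_user() skips missing ids anyway
                ids.remove(user_id)
                ops.append("d")
    return ops


print(predict_ops([("CREATE", 7), ("CREATE", 7), ("UPDATE", 7), ("UPDATE", 8), ("DELETE", 7)]))
# ['c', 'u', 'd']
```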

CDC ๋ฐ์ดํ„ฐ ์ƒ์„ฑ

$ kubectl port-forward mysql 3306

Deploy Kafka Connect

Strimzi์—์„  KafkaConnect ๋ฆฌ์†Œ์Šค๋ฅผ ์ƒ์„ฑํ•˜๋ฉด Kafka Connect ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ๋””ํ”Œ๋กœ์ด ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

# @strimzi.kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    offset.flush.timeout.ms: 10000
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
  resources:
    requests:
      cpu: "1"
      memory: 1Gi
    limits:
      cpu: "1"
      memory: 1Gi

๋‹ค๋งŒ, ์ด Kafka Connect์—๋Š” Debeizum Connector์— ๋Œ€ํ•œ jar ํŒŒ์ผ์ด ์—†๊ธฐ ๋•Œ๋ฌธ์— ์ง์ ‘ ์ด๊ฑธ ๋„ฃ์–ด์ค˜์•ผ ํ•ฉ๋‹ˆ๋‹ค!

For reference, in the version I tested, Kafka Connect ran the quay.io/strimzi/kafka:0.48.0-kafka-4.1.0 image. Use it as the base image and add the Debezium MySQL plugin, downloaded from Maven Central, as shown below.

# dbz.kafka-connect.Dockerfile
FROM quay.io/strimzi/kafka:0.48.0-kafka-4.1.0

USER root

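# Download the Debezium MySQL plugin, unpack it into Strimzi's plugin directory, and clean up
RUN curl -L -o /tmp/debezium-connector-mysql.zip \
  https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/3.1.3.Final/debezium-connector-mysql-3.1.3.Final-plugin.zip \
 && unzip /tmp/debezium-connector-mysql.zip -d /opt/kafka/plugins/debezium-mysql \
 && rm /tmp/debezium-connector-mysql.zip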

USER 1001

nerdctl๋กœ ๋„์ปค ๋นŒ๋“œ๋ฅผ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค. Rancher Desktop์—์„œ๋Š” ์ผ๋ฐ˜ ๋„์ปค ์—”์ง„(docker desktop, colima)์œผ๋กœ ๋กœ์ปฌ์—์„œ ๋นŒ๋“œํ•œ ์ด๋ฏธ์ง€๋ฅผ ์ฝ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค ใ… ใ… 

๊ทธ๋ž˜์„œ nerdctl๋กœ ๋นŒ๋“œ ํ•ด์•ผ ํ•˜๊ณ , --namespace k8s.io๋ฅผ ๋ถ€์—ฌํ•˜์—ฌ ๋„์ปค ๋นŒ๋“œ๋ฅผ ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. (์ด๋ฒˆ์— ํ•˜๋ฉด์„œ ๋“œ๋””์–ด ๋ฐฉ๋ฒ•์„ ์ฐพ์•„๋ƒˆ๋„ค์š” ใ…Žใ…Ž)

$ nerdctl --namespace k8s.io build \
  -t my-kafka-connect-debezium:local \
  -f dbz.kafka-connect.Dockerfile .

Re-deploy Kafka Connect with Debezium

# @strimzi.kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  ...
  image: my-kafka-connect-debezium:local  # add this line
  ...

$ kubectl apply -f strimzi.kafka-connect.yaml

Check that the plugin was registered.

$ kubectl exec -it my-connect-cluster-connect-0 -- bash

# after connecting
$ curl http://localhost:8083/connector-plugins
[{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"3.1.3.Final"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"4.1.0"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"4.1.0"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"4.1.0"}]
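The same check can be scripted. The sketch below looks up a connector class in the GET /connector-plugins response. PLUGINS_JSON is a shortened copy of the output above, and the helper name is hypothetical.

```python
import json

# Shortened copy of the GET /connector-plugins output shown above.
PLUGINS_JSON = (
    '[{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"3.1.3.Final"},'
    '{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"4.1.0"}]'
)


def plugin_version(plugins_json: str, class_name: str):
    """Return the version of class_name if the Connect worker loaded it, else None."""
    for plugin in json.loads(plugins_json):
        if plugin.get("class") == class_name:
            return plugin.get("version")
    return None


# To use live data instead (inside the Connect pod, or through a port-forward):
#   import urllib.request
#   PLUGINS_JSON = urllib.request.urlopen("http://localhost:8083/connector-plugins").read().decode()
print(plugin_version(PLUGINS_JSON, "io.debezium.connector.mysql.MySqlConnector"))  # 3.1.3.Final
```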

Finally! All the prep work for running the Debezium connector is done… lol

(Optional) Deploy Kafbat UI

์ข€๋” ํŽธํ•˜๊ฒŒ ๋””๋ฒ„๊ทธ ํ•˜๊ธฐ ์œ„ํ•ด์„œ ใ…Žใ…Ž Kafbat UI๋„ ๋””ํ”Œ๋กœ์ด ํ•ฉ๋‹ˆ๋‹ค.

# @values.kafbat-ui.yaml
yamlApplicationConfig:
  kafka:
    clusters:
      - name: my-cluster
        bootstrapServers: my-cluster-kafka-brokers:9092
  auth:
    type: disabled
  management:
    health:
      ldap:
        enabled: false

$ helm repo add kafbat-ui https://kafbat.github.io/helm-charts
$ helm install kafbat-ui kafbat-ui/kafka-ui -n strimzi -f values.kafbat-ui.yaml
$ kubectl --namespace strimzi port-forward deploy/kafbat-ui-kafka-ui 8080:8080

Deploy Debezium Connector

์•„๋ž˜์™€ ๊ฐ™์ด Connector Config๋ฅผ ์ค€๋น„ํ•œ๋‹ค.

# @source.debezium-mysql.json
{
  "name": "source.debezium-mysql",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",

    "database.hostname": "mysql.strimzi.svc.cluster.local",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "hello_debezium!",

    "database.server.id": "20251005",
    "database.server.name": "my-mysql",
    "topic.prefix": "my-mysql",

    "database.include.list": "public",
    "table.include.list": "public.user",

    "schema.history.internal.kafka.bootstrap.servers": "my-cluster-kafka-brokers:9092",
    "schema.history.internal.kafka.topic": "__debezium_mysql_history",

    "include.schema.changes": "false",
    "snapshot.mode": "initial",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true"
  }
}

Create the JSON file inside the Connect pod with the commands below.

$ kubectl exec -it my-connect-cluster-connect-0 -- bash

$ cat <<EOF > source.debezium-mysql.json
...
EOF

์•„๋ž˜์˜ config/validate ์—”๋“œํฌ์ธํŠธ์—์„œ ๊ฒ€์ˆ˜ ๋จผ์ € ์ง„ํ–‰

$ curl -X PUT localhost:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",

    "database.hostname": "mysql.strimzi.svc.cluster.local",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "hello_debezium!",

    "database.server.id": "20251005",
    "database.server.name": "my-mysql",
    "topic.prefix": "my-mysql",

    "database.include.list": "public",
    "table.include.list": "public.user",

    "schema.history.internal.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
    "schema.history.internal.kafka.topic": "__debezium_mysql_history",

    "include.schema.changes": "false",
    "snapshot.mode": "initial",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }'
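The validate response can be long. It lists every property, and each property's value.errors array holds any problems. The sketch below pulls out only the properties that have errors. The sample response is hand-written and trimmed (not captured from this cluster), and the helper name is hypothetical.

```python
import json


def validation_errors(response_json: str) -> dict:
    """Map property name -> error messages from a config/validate response."""
    result = json.loads(response_json)
    errors = {}
    for item in result.get("configs", []):
        value = item.get("value", {})
        if value.get("errors"):
            errors[value["name"]] = value["errors"]
    return errors


# Hand-written, trimmed example of the response shape (the message is a placeholder).
sample = json.dumps({
    "name": "io.debezium.connector.mysql.MySqlConnector",
    "error_count": 1,
    "groups": [],
    "configs": [
        {"definition": {"name": "topic.prefix"},
         "value": {"name": "topic.prefix", "value": None, "errors": ["example error message"]}},
        {"definition": {"name": "tasks.max"},
         "value": {"name": "tasks.max", "value": "1", "errors": []}},
    ],
})
print(validation_errors(sample))  # {'topic.prefix': ['example error message']}
```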
$ curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "source.debezium-mysql",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",

      "database.hostname": "mysql.strimzi.svc.cluster.local",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "hello_debezium!",

      "database.server.id": "20251005",
      "database.server.name": "my-mysql",
      "topic.prefix": "my-mysql",

      "database.include.list": "public",
      "table.include.list": "public.user",

      "schema.history.internal.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
      "schema.history.internal.kafka.topic": "__debezium_mysql_history",

      "include.schema.changes": "false",
      "snapshot.mode": "initial",

      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false"
    }
  }'
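If you would rather not paste JSON into curl, the same POST /connectors call can be built from Python with only the standard library. The sketch below builds the request and leaves the actual send commented out, because sending requires the REST port (8083) to be reachable, for example through kubectl port-forward. The helper name is hypothetical, and the config dict is abbreviated.

```python
import json
import urllib.request


def create_connector_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build a POST /connectors request for the Kafka Connect REST API."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


config = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "topic.prefix": "my-mysql",
    # ... remaining keys from the curl command above
}
req = create_connector_request("http://localhost:8083", "source.debezium-mysql", config)
print(req.get_method(), req.full_url)  # POST http://localhost:8083/connectors
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode())
```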

๋ช‡ ๋ฒˆ์˜ ์‹œ๋„ ๋์— ๊ฒจ์šฐ ์„ฑ๊ณต ํ–ˆ์Šต๋‹ˆ๋‹คโ€ฆ ใ…‹ใ…‹

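  • table.include.list must use the fully qualified {schema}.{table} name; for MySQL the schema is the database, so here it is public.user.
  • topic.prefix is required: since Debezium 2.0 it replaces the old database.server.name setting.
  • I switched the converters to StringConverter. Note that Kafka's built-in JsonConverter does not itself need a Schema Registry (with schemas.enable=true it embeds the schema in every message); a registry is only required for registry-backed converters such as Avro.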
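The first two lessons can be checked before submitting a config. The sketch below is a hypothetical heuristic, not a Debezium API. Debezium treats table.include.list entries as regular expressions matched against fully qualified names, so the sketch only flags entries with no dot at all, such as a bare "user".

```python
def config_gotchas(config: dict) -> list:
    """Flag the two mistakes from the first attempt (heuristic only)."""
    problems = []
    if not config.get("topic.prefix"):
        problems.append("topic.prefix is missing")
    for entry in config.get("table.include.list", "").split(","):
        entry = entry.strip()
        # Entries are matched against <database>.<table>, so a bare table name never matches.
        if entry and "." not in entry:
            problems.append(f"table.include.list entry {entry!r} is not <database>.<table>")
    return problems


print(config_gotchas({"table.include.list": "user"}))
print(config_gotchas({"topic.prefix": "my-mysql", "table.include.list": "public.user"}))  # []
```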
$ curl -X GET http://localhost:8083/connectors
$ curl -X GET http://localhost:8083/connectors/source.debezium-mysql/status

# for debugging
$ curl -X POST http://localhost:8083/connectors/source.debezium-mysql/restart
$ curl -X DELETE http://localhost:8083/connectors/source.debezium-mysql
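A connector can report RUNNING while its task has FAILED, so the /status response is worth checking at both levels. The sketch below does that. The sample response is illustrative (made-up values, not captured from this cluster), and the helper name is hypothetical.

```python
import json


def connector_is_healthy(status_json: str) -> bool:
    """True only if the connector and every task report RUNNING."""
    status = json.loads(status_json)
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)


# Illustrative response shape (values made up).
sample = json.dumps({
    "name": "source.debezium-mysql",
    "connector": {"state": "RUNNING", "worker_id": "10.42.0.15:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "10.42.0.15:8083", "trace": "..."}],
    "type": "source",
})
print(connector_is_healthy(sample))  # False: the task failed although the connector is RUNNING
```

When a task has failed, its trace field usually explains why. A plain POST .../restart restarts only the connector instance. Since Kafka 3.0 you can add ?includeTasks=true&onlyFailed=true to restart the failed tasks too.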

Now open Kafbat UI and take a look…!

The CDC data is flowing in!
