
Required items

  • It must have an apply() function.
  • It must have a configure() function.

์ž์„ธํ•œ ์ธํ„ฐํŽ˜์ด์Šค๋Š” javadoc ์ฐธ๊ณ !

โžก๏ธ javadoc / Transformation

Background knowledge

fat-jar

์ผ๋ฐ˜ jar ํŒŒ์ผ์—๋Š” ๋‚ด๊ฐ€ ์ •์˜ํ•˜๊ณ  ๊ฐœ๋ฐœํ•œ ํด๋ž˜์Šค์— ๋Œ€ํ•œ ์ •๋ณด๋งŒ ๋“ค์–ด์žˆ์Šต๋‹ˆ๋‹ค. ๊ฐœ๋ฐœ์—์„œ ์‚ฌ์šฉํ•œ ์™ธ๋ถ€ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์˜ ์ฝ”๋“œ๋Š” ํฌํ•จ๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ๊ทธ๋ž˜์„œ ์‹คํ–‰ํ•  ๋•Œ ClassNotFoundException์ด ๋‚˜๋Š” ๊ฒฝ์šฐ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.

fat-jar๋Š” ๋‚ด ์ฝ”๋“œ์™€ ์ฝ”๋“œ์—์„œ ์‚ฌ์šฉํ•˜๋Š” ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์˜ .class๋“ค์„ ๋ชจ๋‘ ํ•˜๋‚˜์˜ jar์— ํ†ต์งธ๋กœ ๋ฌถ์€ jar๋ฅผ ๋งํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋ž˜์„œ ์ด ํŒŒ์ผ ํ•˜๋‚˜๋งŒ ์žˆ์–ด๋„ ์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜์˜ ์‹คํ–‰๊ณผ ๋ฐฐํฌ๊ฐ€ ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค!

shadowJar

https://github.com/GradleUp/shadow
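The shadowJar task used below comes from the Shadow Gradle plugin, which has to be applied in build.gradle first. A minimal sketch (the plugin id follows the GradleUp/shadow repo above; the version and Main-Class shown are examples, so check the repo for the current version and point Main-Class at your own entry point):

```groovy
plugins {
    id 'java'
    // Shadow plugin: adds the shadowJar task that produces the fat-jar
    id 'com.gradleup.shadow' version '8.3.0'
}

jar {
    manifest {
        // Example entry point; replace with your application's main class
        attributes 'Main-Class': 'com.example.App'
    }
}
```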

./gradlew clean jar
# creates build/libs/fatjar-hello-0.1.0.jar

./gradlew clean shadowJar
# creates build/libs/fatjar-hello-0.1.0-all.jar

How to tell whether a jar is a fat-jar

$ jar tf build/libs/your-app.jar
  • plain jar
    • contains only classes from your own packages, such as com/example/...
  • fat-jar
    • also contains external library classes, such as com/google/gson/ and org/apache/commons/.
$ jar tf fatjar-hello-0.1.0.jar
META-INF/
META-INF/MANIFEST.MF
com/
com/example/
com/example/App.class
$ jar tf fatjar-hello-0.1.0-all.jar
META-INF/MANIFEST.MF
com/example/App.class
META-INF/maven/com.google.code.gson/gson/pom.properties
META-INF/maven/com.google.code.gson/gson/pom.xml
META-INF/proguard/gson.pro
com/google/gson/ExclusionStrategy.class
com/google/gson/FieldAttributes.class
com/google/gson/FieldNamingPolicy$1.class
...

Echo Transform

Writing the code

For reference, the Java version of the current Strimzi Kafka Connect image is as follows.

openjdk 17.0.16 2025-07-15 LTS
OpenJDK Runtime Environment (Red_Hat-17.0.16.0.8-1) (build 17.0.16+8-LTS)

To define a custom transformer, you must implement all of the Transformation interface's methods.

public interface Transformation<R extends ConnectRecord<R>> {
    R apply(R record);
    ConfigDef config();
    void close();
    void configure(Map<String, ?> configs);
}

Let's write the code!

In Echo<R extends ConnectRecord<R>>, the generic type parameter R lets the transform handle both SourceRecord and SinkRecord.

package com.example.connect.smt;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class Echo<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger log = LoggerFactory.getLogger(Echo.class);

    @Override
    public void configure(Map<String, ?> configs) {
        log.info("Echo SMT configured");
    }

    @Override
    public R apply(R record) {
        log.info("Echo topic={} partition={} offset={} ts={} key={} value={} headers={}",
            record.topic(),
            record.kafkaPartition(),
            null, // ConnectRecord does not expose an offset accessor
            record.timestamp(),
            String.valueOf(record.key()),
            String.valueOf(record.value()),
            String.valueOf(record.headers())
        );
        return record;
    }

    @Override public ConfigDef config() { return new ConfigDef(); }
    @Override public void close() {}
}

Code for monitoring

@Override
public R apply(R record) {
    // instanceof also rejects null, so tombstone records pass through untouched
    if (!(record.value() instanceof Struct)) return record;

    Struct envelope = (Struct) record.value();
    Schema schema = envelope.schema();
    for (Field field: schema.fields()) {
        log.info("field={} type={}", field.name(), field.schema().type());
    }

    if (envelope.get("before") != null) {
        log.info("before={}", String.valueOf(envelope.get("before")));
    }
    if (envelope.get("after") != null) {
        log.info("after={}", String.valueOf(envelope.get("after")));
    }
    ...
}

Struct ํƒ€์ž…์„ Json String์œผ๋กœ ๋ณ€ํ™˜

@Override
    public R apply(R record) {
        if (!(record.value() instanceof Struct)) return record;

        Struct envelope = (Struct) record.value();
        Map<String, Object> map = structToMap(envelope);
        String json = new Gson().toJson(map);

        return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            record.key(),
            Schema.STRING_SCHEMA, // the value is now a JSON string, not a Struct
            json,
            record.timestamp()
        );
    }

    private static Map<String, Object> structToMap(Struct struct) {
        Map<String, Object> map = new HashMap<>();

        if (struct == null || struct.schema() == null) return map;

        for (Field field: struct.schema().fields()) {
            Object fieldValue = struct.get(field.name());
            if (fieldValue instanceof Struct) {
                map.put(field.name(), structToMap((Struct) fieldValue));
            } else if (fieldValue instanceof List<?>) {
                map.put(field.name(), convertList((List<?>) fieldValue));
            } else if (fieldValue instanceof Map<?, ?>) {
                map.put(field.name(), convertMap((Map<?, ?>) fieldValue));
            } else {
                map.put(field.name(), fieldValue);
            }
        }

        return map;
    }

    private static List<Object> convertList(List<?> list) {
        List<Object> newList = new ArrayList<>();
        for (Object item: list) {
            if (item instanceof Struct) {
                newList.add(structToMap((Struct) item));
            } else {
                newList.add(item);
            }
        }
        return newList;
    }

    private static Map<String, Object> convertMap(Map<?, ?> map) {
        Map<String, Object> newMap = new LinkedHashMap<>();
        for (Map.Entry<?, ?> entry: map.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Struct) {
                newMap.put(entry.getKey().toString(), structToMap((Struct) value));
            } else {
                newMap.put(entry.getKey().toString(), value);
            }
        }
        return newMap;
    }

Docker build

๋กœ์ปฌ์—์„œ jar ๋นŒ๋“œ ํ›„, ๋„์ปค ๋นŒ๋“œ ์‹œ์ ์— COPY๋กœ ์˜ฎ๊ฒจ์ค๋‹ˆ๋‹ค.

...
COPY ./build/libs/ /opt/kafka/plugins/transform
...
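For context, a fuller sketch of such a Dockerfile, following the usual Strimzi custom-image pattern (the base image tag is an example; use the one matching your Strimzi and Kafka versions):

```dockerfile
FROM quay.io/strimzi/kafka:0.43.0-kafka-3.8.0
USER root:root
# Kafka Connect scans subdirectories of the plugin path for jars
COPY ./build/libs/ /opt/kafka/plugins/transform/
USER 1001
```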

์žฌ๋ฐฐํฌ ํ›„ ์ž˜ ๋“ฑ๋ก ๋˜์—ˆ๋Š”์ง€ ํ™•์ธ

$ curl -s http://localhost:8083/connector-plugins?connectorsOnly=false
$ curl -s http://localhost:8083/connector-plugins?connectorsOnly=false | jq .

You need to add connectorsOnly=false for sources, sinks, converters, and transformations to all be listed.
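The endpoint returns a JSON array of {class, type, version} objects. The jq filter below pulls out just the transformations; it is demonstrated here on an inline sample response (the values are made up) so it runs without a live Connect cluster, assuming jq is installed:

```shell
# Stand-in for: curl -s 'http://localhost:8083/connector-plugins?connectorsOnly=false'
response='[
  {"class":"com.example.connect.smt.Echo","type":"transformation","version":"unknown"},
  {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"3.1.3.Final"}
]'
# Keep only entries whose type is "transformation" and print their class names
echo "$response" | jq -r '.[] | select(.type == "transformation") | .class'
```

Pipe the real curl output through the same filter to confirm com.example.connect.smt.Echo shows up.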

$ curl -X PUT localhost:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",

    "database.hostname": "mysql.strimzi.svc.cluster.local",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "hello_debezium!",

    "database.server.id": "20251005",
    "database.server.name": "my-mysql",
    "topic.prefix": "my-mysql",

    "database.include.list": "public",
    "table.include.list": "public.user",

    "schema.history.internal.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
    "schema.history.internal.kafka.topic": "__debezium_mysql_history",

    "include.schema.changes": "false",
    "snapshot.mode": "initial",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",


    "transforms": "echo",
    "transforms.echo.type": "com.example.connect.smt.Echo"
  }'
$ curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "source.debezium-mysql",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",

      "database.hostname": "mysql.strimzi.svc.cluster.local",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "hello_debezium!",

      "database.server.id": "20251005",
      "database.server.name": "my-mysql",
      "topic.prefix": "my-mysql",

      "database.include.list": "public",
      "table.include.list": "public.user",

      "schema.history.internal.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
      "schema.history.internal.kafka.topic": "__debezium_mysql_history",

      "include.schema.changes": "false",
      "snapshot.mode": "initial",

      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false",

      "transforms": "echo",
      "transforms.echo.type": "com.example.connect.smt.Echo"
    }
  }'

Then, checking the logs of the deployed Kafka Connect pod, you can see what the logger recorded.

$ kubectl logs my-connect-cluster-connect-0
2025-10-06 09:47:21 INFO  [task-thread-source.debezium-mysql-0] Echo:21 - Echo topic=my-mysql.public.user partition=null offset=null ts=null key=Struct{id=36} value=Struct{before=Struct{id=36,name=TcIQHp,money=8978,created_ts=2025-10-06T09:42:53Z,updated_ts=2025-10-06T09:42:53Z},after=Struct{id=36,name=TcIQHp,money=6401,created_ts=2025-10-06T09:42:53Z,updated_ts=2025-10-06T09:43:33Z},source=Struct{version=3.1.3.Final,connector=mysql,name=my-mysql,ts_ms=1759743813000,db=public,ts_us=1759743813000000,ts_ns=1759743813000000000,table=user,server_id=1,file=binlog.000002,pos=101897,row=0,thread=56},op=u,ts_ms=1759744040681,ts_us=1759744040681781,ts_ns=1759744040681781775} headers=ConnectHeaders(headers=)
2025-10-06 09:47:21 INFO  [task-thread-source.debezium-mysql-0] Echo:21 - Echo topic=my-mysql.public.user partition=null offset=null ts=null key=Struct{id=46} value=Struct{after=Struct{id=46,name=wgThpR,money=2367,created_ts=2025-10-06T09:43:33Z,updated_ts=2025-10-06T09:43:33Z},source=Struct{version=3.1.3.Final,connector=mysql,name=my-mysql,ts_ms=1759743813000,db=public,ts_us=1759743813000000,ts_ns=1759743813000000000,table=user,server_id=1,file=binlog.000002,pos=102243,row=0,thread=56},op=c,ts_ms=1759744040681,ts_us=1759744040681933,ts_ns=1759744040681933275} headers=ConnectHeaders(headers=)

๋ ˆ์ฝ”๋“œ์˜ ์Šคํ‚ค๋งˆ ์ฒดํฌ

import org.apache.kafka.connect.data.Schema

Transform Config

Transform์—์„œ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ์ „๋‹ฌ๋ฐ›์•„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ ๊ฐ€๋Šฅ ํ•ฉ๋‹ˆ๋‹ค.

SAMPLE_RATE_CONFIG๋ฅผ ์ •์˜ํ•˜๊ณ , ์ด๋ฅผ ๋ฐ”ํƒ•์œผ๋กœ Echoing์„ ์–ผ๋งˆ๋‚˜ ํ• ์ง€ ์ œ์–ด ํ•ด๋ด…์‹œ๋‹ค.

    private static final String SAMPLE_RATE_CONFIG = "sample.rate";
    private static final double DEFAULT_SAMPLE_RATE = 0.01;

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(
            SAMPLE_RATE_CONFIG,
            ConfigDef.Type.DOUBLE,
            DEFAULT_SAMPLE_RATE,
            ConfigDef.Range.between(0.0, 1.0),
            ConfigDef.Importance.LOW,
            "Sample rate for the echo transform"
        );

    private double sampleRate;
    private final Random random = new Random();


    @Override
    public void configure(Map<String, ?> configs) {
        AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs);
        this.sampleRate = config.getDouble(SAMPLE_RATE_CONFIG);

        log.info("Echo SMT configured with sample rate: {}", this.sampleRate);

    }

    @Override
    public R apply(R record) {
        if (!(record.value() instanceof Struct)) return record;

        Struct envelope = (Struct) record.value();
        Map<String, Object> map = structToMap(envelope);
        String json = new Gson().toJson(map);

        if (this.random.nextDouble() < this.sampleRate) {
            log.info("Echoing record: {}", json);
        }
        ...
    }
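The sampling gate itself is plain Java and can be sanity-checked in isolation. A standalone sketch (SampleGate is a hypothetical helper, not part of the SMT; it only mirrors the random.nextDouble() < sampleRate check above):

```java
import java.util.Random;

public class SampleGate {
    private final double sampleRate;
    private final Random random;

    public SampleGate(double sampleRate, long seed) {
        this.sampleRate = sampleRate;
        this.random = new Random(seed); // seeded here only to make the demo reproducible
    }

    // True for roughly sampleRate of calls: nextDouble() is uniform in [0.0, 1.0).
    public boolean shouldEcho() {
        return random.nextDouble() < sampleRate;
    }

    public static void main(String[] args) {
        SampleGate gate = new SampleGate(0.01, 42L);
        int echoed = 0;
        for (int i = 0; i < 100_000; i++) {
            if (gate.shouldEcho()) echoed++;
        }
        // Expect a count near 1% of 100000
        System.out.println("echoed " + echoed + " of 100000 records");
    }
}
```

Because nextDouble() never returns 1.0, a rate of 0.0 echoes nothing and a rate of 1.0 echoes every record.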

With Schema Registry

TBD
