Define Custom Kafka Connect Transform
Required pieces
- It must have an apply() method.
- It must have a configure() method.
For the full interface, see the javadoc!
➡️ javadoc / Transformation
Background
fat-jar
A plain jar contains only the classes I defined and wrote myself; the code of the external libraries used during development is not included. So there are cases where running it fails with a ClassNotFoundException.
A fat-jar is a single jar that bundles my code together with the .class files of every library that code uses. With that one file alone, the application can be run and deployed!
shadowJar
https://github.com/GradleUp/shadow
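The commands below assume the Shadow plugin is applied in the build script. A minimal sketch (the plugin version is illustrative; check the GradleUp/shadow releases for the current one):

```groovy
plugins {
    id 'java'
    // Shadow plugin adds the shadowJar task; version shown is an example
    id 'com.gradleup.shadow' version '8.3.0'
}
```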
./gradlew clean jar
# creates build/libs/fatjar-hello-0.1.0.jar
./gradlew clean shadowJar
# creates build/libs/fatjar-hello-0.1.0-all.jar
How to check whether a jar is a fat-jar
$ jar tf build/libs/your-app.jar
- plain jar: contains only my own packages and classes, e.g. com/example/...
- fat-jar: external library classes such as com/google/gson/ and org/apache/commons/ are included as well.
$ jar tf fatjar-hello-0.1.0.jar
META-INF/
META-INF/MANIFEST.MF
com/
com/example/
com/example/App.class
$ jar tf fatjar-hello-0.1.0-all.jar
META-INF/MANIFEST.MF
com/example/App.class
META-INF/maven/com.google.code.gson/gson/pom.properties
META-INF/maven/com.google.code.gson/gson/pom.xml
META-INF/proguard/gson.pro
com/google/gson/ExclusionStrategy.class
com/google/gson/FieldAttributes.class
com/google/gson/FieldNamingPolicy$1.class
...
Echo Transform
Writing the code
For reference, the Java version of the current Strimzi Kafka Connect image is as follows.
openjdk 17.0.16 2025-07-15 LTS
OpenJDK Runtime Environment (Red_Hat-17.0.16.0.8-1) (build 17.0.16+8-LTS)
To define a custom transformer, you must implement every method of the Transformation interface.
public interface Transformation<R extends ConnectRecord<R>> {
    R apply(R record);
    ConfigDef config();
    void close();
    void configure(Map<String, ?> configs);
}
Let's write the code! In Echo<R extends ConnectRecord<R>>, the generic type parameter R lets the transform handle both SourceRecord and SinkRecord.
package com.example.connect.smt;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class Echo<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger log = LoggerFactory.getLogger(Echo.class);

    @Override
    public void configure(Map<String, ?> configs) {
        log.info("Echo SMT configured");
    }

    @Override
    public R apply(R record) {
        log.info("Echo topic={} partition={} offset={} ts={} key={} value={} headers={}",
                record.topic(),
                record.kafkaPartition(),
                null, // ConnectRecord exposes no offset accessor, so this always logs offset=null
                record.timestamp(),
                String.valueOf(record.key()),
                String.valueOf(record.value()),
                String.valueOf(record.headers())
        );
        return record;
    }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public void close() {}
}
Code for monitoring
@Override
public R apply(R record) {
    if (record.value() == null) return record;
    Struct envelope = (Struct) record.value();
    Schema schema = envelope.schema();
    for (Field field : schema.fields()) {
        log.info("field={} type={}", field.name(), field.schema().type());
    }
    if (envelope.get("before") != null) {
        log.info("before={}", String.valueOf(envelope.get("before")));
    }
    if (envelope.get("after") != null) {
        log.info("after={}", String.valueOf(envelope.get("after")));
    }
    ...
}
Converting a Struct value to a JSON string
@Override
public R apply(R record) {
    if (record.value() == null) return record;
    Struct envelope = (Struct) record.value();
    Map<String, Object> map = structToMap(envelope);
    String json = new Gson().toJson(map);
    return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            record.key(),
            Schema.STRING_SCHEMA, // the new value is a String, so pass a string schema rather than the old Struct schema
            json,
            record.timestamp()
    );
}
private static Map<String, Object> structToMap(Struct struct) {
    Map<String, Object> map = new HashMap<>();
    if (struct == null || struct.schema() == null) return map;
    for (Field field : struct.schema().fields()) {
        Object fieldValue = struct.get(field.name());
        if (fieldValue instanceof Struct) {
            map.put(field.name(), structToMap((Struct) fieldValue));
        } else if (fieldValue instanceof List<?>) {
            map.put(field.name(), convertList((List<?>) fieldValue));
        } else if (fieldValue instanceof Map<?, ?>) {
            map.put(field.name(), convertMap((Map<?, ?>) fieldValue));
        } else {
            map.put(field.name(), fieldValue);
        }
    }
    return map;
}

private static List<Object> convertList(List<?> list) {
    List<Object> newList = new ArrayList<>();
    for (Object item : list) {
        if (item instanceof Struct) {
            newList.add(structToMap((Struct) item));
        } else {
            newList.add(item);
        }
    }
    return newList;
}

private static Map<String, Object> convertMap(Map<?, ?> map) {
    Map<String, Object> newMap = new LinkedHashMap<>();
    for (Map.Entry<?, ?> entry : map.entrySet()) {
        Object value = entry.getValue();
        if (value instanceof Struct) {
            newMap.put(entry.getKey().toString(), structToMap((Struct) value));
        } else {
            newMap.put(entry.getKey().toString(), value);
        }
    }
    return newMap;
}
Docker build
Build the jar locally, then COPY it into the image during the Docker build.
...
COPY ./build/libs/ /opt/kafka/plugins/transform
...
After redeploying, check that it is registered correctly
$ curl -s http://localhost:8083/connector-plugins?connectorsOnly=false
$ curl -s http://localhost:8083/connector-plugins?connectorsOnly=false | jq .
You need to add connectorsOnly=false for source, sink, converter, and transformation plugins to all appear in the output.
$ curl -X PUT localhost:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql.strimzi.svc.cluster.local",
"database.port": "3306",
"database.user": "root",
"database.password": "hello_debezium!",
"database.server.id": "20251005",
"database.server.name": "my-mysql",
"topic.prefix": "my-mysql",
"database.include.list": "public",
"table.include.list": "public.user",
"schema.history.internal.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
"schema.history.internal.kafka.topic": "__debezium_mysql_history",
"include.schema.changes": "false",
"snapshot.mode": "initial",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms": "echo",
"transforms.echo.type": "com.example.connect.smt.Echo"
}'
$ curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "source.debezium-mysql",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql.strimzi.svc.cluster.local",
"database.port": "3306",
"database.user": "root",
"database.password": "hello_debezium!",
"database.server.id": "20251005",
"database.server.name": "my-mysql",
"topic.prefix": "my-mysql",
"database.include.list": "public",
"table.include.list": "public.user",
"schema.history.internal.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
"schema.history.internal.kafka.topic": "__debezium_mysql_history",
"include.schema.changes": "false",
"snapshot.mode": "initial",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms": "echo",
"transforms.echo.type": "com.example.connect.smt.Echo"
}
}'
After redeploying, if you check the Kafka Connect Pod's logs, the messages written via the logger show up.
$ kubectl logs my-connect-cluster-connect-0
2025-10-06 09:47:21 INFO [task-thread-source.debezium-mysql-0] Echo:21 - Echo topic=my-mysql.public.user partition=null offset=null ts=null key=Struct{id=36} value=Struct{before=Struct{id=36,name=TcIQHp,money=8978,created_ts=2025-10-06T09:42:53Z,updated_ts=2025-10-06T09:42:53Z},after=Struct{id=36,name=TcIQHp,money=6401,created_ts=2025-10-06T09:42:53Z,updated_ts=2025-10-06T09:43:33Z},source=Struct{version=3.1.3.Final,connector=mysql,name=my-mysql,ts_ms=1759743813000,db=public,ts_us=1759743813000000,ts_ns=1759743813000000000,table=user,server_id=1,file=binlog.000002,pos=101897,row=0,thread=56},op=u,ts_ms=1759744040681,ts_us=1759744040681781,ts_ns=1759744040681781775} headers=ConnectHeaders(headers=)
2025-10-06 09:47:21 INFO [task-thread-source.debezium-mysql-0] Echo:21 - Echo topic=my-mysql.public.user partition=null offset=null ts=null key=Struct{id=46} value=Struct{after=Struct{id=46,name=wgThpR,money=2367,created_ts=2025-10-06T09:43:33Z,updated_ts=2025-10-06T09:43:33Z},source=Struct{version=3.1.3.Final,connector=mysql,name=my-mysql,ts_ms=1759743813000,db=public,ts_us=1759743813000000,ts_ns=1759743813000000000,table=user,server_id=1,file=binlog.000002,pos=102243,row=0,thread=56},op=c,ts_ms=1759744040681,ts_us=1759744040681933,ts_ns=1759744040681933275} headers=ConnectHeaders(headers=)
Checking the record's schema
import org.apache.kafka.connect.data.Schema
Transform Config
A transform can also receive parameters and use them. Let's define SAMPLE_RATE_CONFIG and use it to control how often records are echoed.
private static final String SAMPLE_RATE_CONFIG = "sample.rate";
private static final double DEFAULT_SAMPLE_RATE = 0.01;

private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(
                SAMPLE_RATE_CONFIG,
                ConfigDef.Type.DOUBLE,
                DEFAULT_SAMPLE_RATE,
                ConfigDef.Range.between(0.0, 1.0),
                ConfigDef.Importance.LOW,
                "Sample rate for the echo transform"
        );

private double sampleRate;
private final Random random = new Random();

@Override
public void configure(Map<String, ?> configs) {
    AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs);
    this.sampleRate = config.getDouble(SAMPLE_RATE_CONFIG);
    log.info("Echo SMT configured with sample rate: {}", this.sampleRate);
}

// config() should now return CONFIG_DEF instead of an empty ConfigDef,
// so Connect can validate sample.rate
@Override
public ConfigDef config() { return CONFIG_DEF; }
@Override
public R apply(R record) {
    if (record.value() == null) return record;
    Struct envelope = (Struct) record.value();
    Map<String, Object> map = structToMap(envelope);
    String json = new Gson().toJson(map);
    if (this.random.nextDouble() < this.sampleRate) {
        log.info("Echoing record: {}", json);
    }
    ...
}
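The sampling gate above is just a Bernoulli trial per record. A quick stdlib check (hypothetical seed, no Connect dependencies) shows that over many records the fraction logged converges to sample.rate:

```java
import java.util.Random;

public class SampleRateSketch {
    // Counts how many of n simulated records would pass the sampling gate
    // `random.nextDouble() < sampleRate`, exactly as in the Echo transform.
    static int sampledCount(long seed, int n, double sampleRate) {
        Random random = new Random(seed);
        int count = 0;
        for (int i = 0; i < n; i++) {
            if (random.nextDouble() < sampleRate) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // With sample.rate = 0.01 we expect roughly 1,000 of 100,000 records to be logged.
        int logged = sampledCount(42L, 100_000, 0.01);
        System.out.println("logged " + logged + " of 100000 records");
    }
}
```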
With Schema Registry
TBD