Define Custome Kafka Source Connector
๋ค์ด๊ฐ๋ฉฐ
์์ฆ์ Kafka์ ์์งํ ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ํ๊ฒ DB๋ก ์ ์ฌํ๋ ๊ฒ์ ๋ง์ด ์ํํ๊ณ ์์ต๋๋ค. ๊ทธ๋ฐ๋ฐ ๋ช๋ช ์คํ์์ค Connector์ ๋์์ด ๋ง์์ ์ ๋ค์ด์ ์์ ํ๊ณ ์ถ์ ์ ๋ค์ด ์๋๋ผ๊ตฌ์โฆ;; ๊ทธ๋์ ์ด๋ฒ ๊ธฐํ์ ์ปค์คํ ์ปค๋ฅํฐ๋ ์ด๋ป๊ฒ ๋ง๋๋์ง, Kafka์ Connector API๋ฅผ ์ข ์ตํ๋ณด๋ ค๊ณ ํฉ๋๋ค.
HttpPollSourceConnector
์ฃผ๊ธฐ์ ์ผ๋ก REST API๋ฅผ ํธ์ถํด ์๋ต๊ฐ์ ๋ฐ์ ๋ค์ ์ด๊ฒ์ Kafka ํ ํฝ์ ์ ์ฌํ๋ Source Connector๋ฅผ ๋ง๋ค๊ณ ์ ํฉ๋๋ค.
์ ๋ ฅ ๋ฐ๋ ๊ฐ์ผ๋ก๋
- REST API์ ์๋ํฌ์ธํธ
- REST API ํธ์ถ ์ฃผ๊ธฐ
Java์์ REST API์ ๋ํ HTTP ํธ์ถ์ด ํ์ํฉ๋๋ค. Java์์๋ okhttp
๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค๊ณ ํฉ๋๋ค.
๊ตฌ์ฑ
SourceConnector
์ ์๋ธํด๋์ค- ์ปค๋ฅํฐ ์ ์ฒด์ ์๋ช ์ฃผ๊ธฐ, Config ์ ์, ํ์คํฌ ๋ถ๋ฐฐ๋ฅผ ๋ด๋น
SourceTask
์ ์๋ธํด๋์ค- ์ค์ ์์ค ์์คํ
์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์
SourceRecord
๋ฅผ ๋ง๋๋ ์คํ ๋จ์
- ์ค์ ์์ค ์์คํ
์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์
ํ์ ๊ตฌํํด์ผ ํ๋ ํจ์๋ค
SourceConnector
String version()
void start(Map<String, String> props)
- ์ปค๋ฅํฐ ์์ ์ Config ๊ฒ์ฆ ๋ฐ ์ด๊ธฐํ
Class<? extends Task> taskClass()
- ์ปค๋ฅํฐ ์คํ์ ์ฌ์ฉํ ํ์คํฌ ํด๋์ค๋ฅผ ๋ฐํ
- ๋ณดํต ๊ตฌํํ
SourceTask
์๋ธํด๋์ค์ ๋ํด{...SourceTask}.class
๋ก ์ฒ๋ฆฌ
List<Map<String, String>> taskConfigs(int maxTasks)
- ํ์คํฌ ๊ฐฏ์ ๋งํผ config ์ฌ๋ผ์ด์ค ์์ฑ
ConfigDef config()
- ์ปค๋ฅํฐ config์ ๋ํ ์คํค๋ง (ํ์ , ํ์ ์ฌ๋ถ, ๊ธฐ๋ณธ๊ฐ, ์ค๋ช )
void stop()
- ๋ฆฌ์์ค ์ ๋ฆฌ(์ฐ๋ ๋/ํด๋ผ์ด์ธํธ ์ข ๋ฃ ๋ฑ)
SourceTask
String version()
void start(Map<String, String> props)
- ์์ค ์์คํ ์ ์ฐ๊ฒฐ, ์คํ์ ๋ณต์, ํด๋ผ์ด์ธํธ ์ค๋น
List<SourceRecord> poll() throws InterruptedException
- ์ฃผ๊ธฐ์ ์ผ๋ก ์์ค์์ ๋ฐ์ดํฐ ์ฝ์ด์ ๋ ์ฝ๋ฅด๋ฅผ ๋ญ์น๋ฅผ ๋ฐํ
- ์์ธ ์ฒ๋ฆฌ
- ์ฌ์๋ ๊ฐ๋ฅํ ์ํฉ์
RetriableException
์ผ๋ก ์ฒ๋ฆฌ
- ์ฌ์๋ ๊ฐ๋ฅํ ์ํฉ์
void stop()
- ํ๋ง ๋ฃจํ/IO ์ ๋ฆฌ
void commitRecord(SourceRecord record)
- ํด๋น ๋ ์ฝ๋๊ฐ ์นดํ์นด์ ์์ ํ ์ปค๋ฐ ๋์์ ๋ ์คํํ ์ฝ๋ฐฑ
- ์ธ๋ถ์ ์ปค๋ฐ ํด์ผ ํ๊ฑฐ๋ ์ฒดํฌํฌ์ธํธ ์ฐ๋์ด ํ์ํ ๋ ์ฌ์ฉํ ๊ฒ
void commit()
- ๋ฐฐ์น ๋จ์ ์ปค๋ฐ ํ (ํ๋ ์์ํฌ๊ฐ ํธ์ถ)
public class MySourceConnector extends SourceConnector {
@Override public String version() { return "0.1.0"; }
@Override public void start(Map<String,String> props) { /* validate/init */ }
@Override public Class<? extends Task> taskClass() { return MySourceTask.class; }
@Override public List<Map<String,String>> taskConfigs(int maxTasks) { /* shard */ }
@Override public void stop() { /* cleanup */ }
@Override public ConfigDef config() { return new ConfigDef()/* .define(...) */; }
}
public class MySourceTask extends SourceTask {
@Override public String version() { return "0.1.0"; }
@Override public void start(Map<String,String> props) { /* client + restore offset */ }
@Override public List<SourceRecord> poll() throws InterruptedException {
// fetch -> build SourceRecord(partitionMap, offsetMap, topic, key, value)
}
@Override public void commitRecord(SourceRecord record) { /* optional */ }
@Override public void stop() { /* cleanup */ }
}
์ปค๋ฅํฐ ๋ฑ๋ก ์ฒดํฌ
$ curl -s http://localhost:8083/connector-plugins
$ curl -s http://localhost:8083/connector-plugins | jq .
[
{
"class": "com.example.connect.HttpPollSourceConnector",
"type": "source",
"version": "0.1.0"
},
{
"class": "io.debezium.connector.mysql.MySqlConnector",
"type": "source",
"version": "3.1.3.Final"
},
...
]
์ปค๋ฅํฐ ๋ํ๋ก์ด
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "source.custom-http-poll",
"config": {
"connector.class": "com.example.connect.HttpPollSourceConnector",
"tasks.max": "1",
"http.url": "https://jsonplaceholder.typicode.com/todos",
"topic": "todos",
"poll.interval.ms": "500",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
}'
# ๋ํ๋ก์ด ํ
$ curl -s http://localhost:8083/connectors | jq .
$ curl -s http://localhost:8083/connectors/source.custom-http-poll | jq .
$ curl -s http://localhost:8083/connectors/source.custom-http-poll/status | jq .
taskConfigs()
ํจ์
Connector๊ฐ ์ฌ๋ฌ Task๋ก ๋๋์ด ์คํ๋ ๋, ๊ฐ Task์ ์ ๋ฌํ ์ค์ ๊ฐ์ ๋ง๋ค์ด์ฃผ๋ ๋ฉ์๋.
Kafka Connector๋ฅผ ๋ฑ๋กํ๋ฉด, ๋ด๋ถ์ ์ผ๋ก ์ฌ๋ฌ ๊ฐ์ Task๋ฅผ ๋ณ๋ ฌ๋ก ๋์์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌ ํฉ๋๋ค.
taskConfigs()
๋ ์์ฑ๋ Task๋ค์ด ๊ฐ๊ฐ ์ด๋ค ์ค์ ์ ๊ฐ์ง๊ณ ์คํ๋๋์ง๋ฅผ ์ ์ํ๋ ํจ์ ์
๋๋ค.
๋ง์ฝ tasks.max=3
์ด๋ผ๋ฉด, Connect๋ taskConfigs(3)
์ ํธ์ถํฉ๋๋ค.
Source Offset
Connect ์์ปค๋ ์์ค ์์คํ
์์ ๋ฐ์ดํฐ๋ฅผ ์ด๋๊น์ง ์ฝ์๋์ง๋ฅผ ๊ธฐ๋ก ํด๋ก๋๋ค. ์ด ๊ธฐ๋ก์ SourceRecord()
๋ฅผ ๋ง๋ค ๋ ๊ธฐ๋ก ๋๋ฉฐ, ๋ด๋ถ ํ ํฝ์ธ __connect_offsets
์ ์ปค๋ฐ ๋์ด ์ ์ฅ ๋ฉ๋๋ค.