
Introduction

์š”์ฆ˜์— Kafka์— ์ˆ˜์ง‘ํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ์—ฌ๋Ÿฌ ํƒ€๊ฒŸ DB๋กœ ์ ์žฌํ•˜๋Š” ๊ฒƒ์„ ๋งŽ์ด ์ˆ˜ํ–‰ํ•˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค. ๊ทธ๋Ÿฐ๋ฐ ๋ช‡๋ช‡ ์˜คํ”ˆ์†Œ์Šค Connector์˜ ๋™์ž‘์ด ๋งˆ์Œ์— ์•ˆ ๋“ค์–ด์„œ ์ˆ˜์ • ํ•˜๊ณ  ์‹ถ์€ ์ ๋“ค์ด ์žˆ๋”๋ผ๊ตฌ์š”โ€ฆ;; ๊ทธ๋ž˜์„œ ์ด๋ฒˆ ๊ธฐํšŒ์— ์ปค์Šคํ…€ ์ปค๋„ฅํ„ฐ๋Š” ์–ด๋–ป๊ฒŒ ๋งŒ๋“œ๋Š”์ง€, Kafka์˜ Connector API๋ฅผ ์ข€ ์ตํ˜€๋ณด๋ ค๊ณ  ํ•ฉ๋‹ˆ๋‹ค.

HttpPollSourceConnector

์ฃผ๊ธฐ์ ์œผ๋กœ REST API๋ฅผ ํ˜ธ์ถœํ•ด ์‘๋‹ต๊ฐ’์„ ๋ฐ›์€ ๋‹ค์Œ ์ด๊ฒƒ์„ Kafka ํ† ํ”ฝ์— ์ ์žฌํ•˜๋Š” Source Connector๋ฅผ ๋งŒ๋“ค๊ณ ์ž ํ•ฉ๋‹ˆ๋‹ค.

The connector takes two inputs:

  • The REST API endpoint
  • How often to call the REST API (polling interval)

Java์—์„œ REST API์— ๋Œ€ํ•œ HTTP ํ˜ธ์ถœ์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. Java์—์„œ๋Š” okhttp๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค๊ณ  ํ•ฉ๋‹ˆ๋‹ค.

Structure

  • A subclass of SourceConnector
    • Handles the connector's overall lifecycle, defines its config, and distributes work across tasks
  • A subclass of SourceTask
    • The unit of execution that actually reads data from the source system and turns it into SourceRecords

Methods to implement

  • SourceConnector
    • String version()
    • void start(Map<String, String> props)
      • Validates and initializes the config when the connector starts
    • Class<? extends Task> taskClass()
      • Returns the task class used to run the connector
      • Usually just {...SourceTask}.class for your SourceTask subclass
    • List<Map<String, String>> taskConfigs(int maxTasks)
      • Builds the list of task configs, one map per task (at most maxTasks)
    • ConfigDef config()
      • Schema for the connector config (type, whether it is required, default value, description)
    • void stop()
      • Releases resources (shuts down threads, clients, etc.)
  • SourceTask
    • String version()
    • void start(Map<String, String> props)
      • Connects to the source system, restores offsets, prepares clients
    • List<SourceRecord> poll() throws InterruptedException
      • Called repeatedly; reads data from the source and returns a batch of records
      • Exception handling
        • Throw RetriableException for conditions worth retrying
    • void stop()
      • Cleans up the polling loop and I/O
    • void commitRecord(SourceRecord record) (optional)
      • Callback invoked once the record has been acknowledged by Kafka
      • Use it when you need to commit back to the source system or sync an external checkpoint
      • Newer Kafka versions deprecate this overload in favor of commitRecord(SourceRecord, RecordMetadata)
    • void commit() (optional)
      • Offset-commit hook called by the framework; Connect records offsets itself, so most tasks can leave it empty
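import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// MySourceConnector.java
public class MySourceConnector extends SourceConnector {
  @Override public String version() { return "0.1.0"; }
  @Override public void start(Map<String, String> props) { /* validate/init config */ }
  @Override public Class<? extends Task> taskClass() { return MySourceTask.class; }
  @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
    return new ArrayList<>(); // shard: return one config Map per task (at most maxTasks)
  }
  @Override public void stop() { /* cleanup */ }
  @Override public ConfigDef config() { return new ConfigDef() /* .define(...) */; }
}

// MySourceTask.java
public class MySourceTask extends SourceTask {
  @Override public String version() { return "0.1.0"; }
  @Override public void start(Map<String, String> props) { /* create client + restore offset */ }
  @Override public List<SourceRecord> poll() throws InterruptedException {
    // fetch -> new SourceRecord(partitionMap, offsetMap, topic, keySchema, key, valueSchema, value)
    return null; // null means "no data this time"; the framework calls poll() again
  }
  @Override public void commitRecord(SourceRecord record) { /* optional */ }
  @Override public void stop() { /* cleanup */ }
}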
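Filling in the task skeleton for the HTTP poller: poll() waits out the polling interval, calls the endpoint, and wraps the whole response body in one SourceRecord. This is a minimal sketch, assuming Kafka's connect-api on the classpath and the JDK HttpClient in place of okhttp. The config keys are the ones used in the deploy request (http.url, topic, poll.interval.ms); the partition/offset keys "url" and "last_poll_ms" and the 60000 ms fallback are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Sketch only: JDK HttpClient instead of okhttp; "url"/"last_poll_ms" keys are hypothetical.
class HttpPollSourceTask extends SourceTask {
  private HttpClient client;
  private String url;
  private String topic;
  private long intervalMs;
  private long nextPollAt;

  @Override public String version() { return "0.1.0"; }

  @Override public void start(Map<String, String> props) {
    url = props.get("http.url");
    topic = props.get("topic");
    // 60000 ms fallback is an assumption; the deploy example sets 500
    intervalMs = Long.parseLong(props.getOrDefault("poll.interval.ms", "60000"));
    client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(10)).build();
    nextPollAt = 0L; // first poll() fires immediately
  }

  @Override public List<SourceRecord> poll() throws InterruptedException {
    long waitMs = nextPollAt - System.currentTimeMillis();
    if (waitMs > 0) {
      Thread.sleep(waitMs); // wait out the rest of poll.interval.ms
    }
    nextPollAt = System.currentTimeMillis() + intervalMs;

    HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
    HttpResponse<String> response;
    try {
      response = client.send(request, HttpResponse.BodyHandlers.ofString());
    } catch (IOException e) {
      // Connect treats RetriableException from poll() as retryable
      throw new RetriableException("HTTP call to " + url + " failed", e);
    }
    if (response.statusCode() / 100 != 2) {
      throw new RetriableException("HTTP " + response.statusCode() + " from " + url);
    }

    Map<String, String> partition = Collections.singletonMap("url", url);
    Map<String, Long> offset = Collections.singletonMap("last_poll_ms", System.currentTimeMillis());
    // one record per response: key = endpoint URL, value = raw response body
    return List.of(new SourceRecord(partition, offset, topic,
        Schema.STRING_SCHEMA, url, Schema.STRING_SCHEMA, response.body()));
  }

  @Override public void stop() {
    // nothing to release in this sketch
  }
}
```

Thread.sleep keeps the sketch short, but stop() may then wait up to one polling interval; that is fine at 500 ms, while a long interval would call for an interruptible wait. Treating every non-2xx status as retriable is also a simplification.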

Checking that the connector is registered

$ curl -s http://localhost:8083/connector-plugins

$ curl -s http://localhost:8083/connector-plugins | jq .
[
  {
    "class": "com.example.connect.HttpPollSourceConnector",
    "type": "source",
    "version": "0.1.0"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "3.1.3.Final"
  },
  ...
]

์ปค๋„ฅํ„ฐ ๋””ํ”Œ๋กœ์ด

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "source.custom-http-poll",
    "config": {
      "connector.class": "com.example.connect.HttpPollSourceConnector",
      "tasks.max": "1",

      "http.url": "https://jsonplaceholder.typicode.com/todos",
      "topic": "todos",
      "poll.interval.ms": "500",

      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false"
    }
  }'

# ๋””ํ”Œ๋กœ์ด ํ›„
$ curl -s http://localhost:8083/connectors | jq .
$ curl -s http://localhost:8083/connectors/source.custom-http-poll | jq .
$ curl -s http://localhost:8083/connectors/source.custom-http-poll/status | jq .
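The three connector-specific keys in that request (http.url, topic, poll.interval.ms) are what config() has to describe. A minimal sketch, assuming kafka-clients on the classpath: keys defined without a default are required, and poll.interval.ms gets a range check. The class name HttpPollConfig and the 60000 ms default are assumptions, not values from this post.

```java
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

// Hypothetical config class for the HTTP poller; key names match the deploy request.
class HttpPollConfig extends AbstractConfig {
  static final String HTTP_URL = "http.url";
  static final String TOPIC = "topic";
  static final String POLL_INTERVAL_MS = "poll.interval.ms";

  static final ConfigDef CONFIG_DEF = new ConfigDef()
      // no default value => required; a missing key raises ConfigException
      .define(HTTP_URL, Type.STRING, Importance.HIGH, "REST API endpoint to poll")
      .define(TOPIC, Type.STRING, Importance.HIGH, "Kafka topic to write responses to")
      // assumed default of 60 s, validated to be at least 1 ms
      .define(POLL_INTERVAL_MS, Type.LONG, 60_000L, ConfigDef.Range.atLeast(1),
          Importance.MEDIUM, "Interval between REST API calls, in milliseconds");

  HttpPollConfig(Map<String, String> props) {
    super(CONFIG_DEF, props); // parses strings into typed values and validates them
  }

  String url() { return getString(HTTP_URL); }
  String topic() { return getString(TOPIC); }
  long pollIntervalMs() { return getLong(POLL_INTERVAL_MS); }
}
```

The connector's config() can return HttpPollConfig.CONFIG_DEF, and start() can call new HttpPollConfig(props) so a bad config fails fast. Connect also uses the ConfigDef for its REST validation endpoint, PUT /connector-plugins/{plugin}/config/validate.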

The taskConfigs() method

This method builds the configuration handed to each Task when a Connector is split into multiple Tasks.

When you register a Kafka connector, Connect runs its work as one or more Tasks in parallel, up to tasks.max, spread across the workers.

taskConfigs() decides which settings each of those Tasks runs with: it returns a list of config maps, one per Task.

For example, with tasks.max=3 Connect calls taskConfigs(3). The connector may return fewer than three maps; Connect starts one Task per map returned.
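To make the sharding concrete, assume a hypothetical variant of this connector that takes a comma-separated list of endpoints under an http.urls key (the connector in this post takes a single http.url). taskConfigs() can deal the URLs out round-robin and never return more maps than there are URLs. The sketch is pure JDK, written as a static helper that the connector's taskConfigs() could delegate to.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TaskConfigSharding {
  // Hypothetical key: a multi-endpoint variant of the connector, not the post's "http.url".
  static final String URLS = "http.urls";

  static List<Map<String, String>> taskConfigs(Map<String, String> connectorProps, int maxTasks) {
    String[] urls = connectorProps.get(URLS).split(",");
    int numTasks = Math.min(maxTasks, urls.length); // never start tasks with nothing to poll

    List<List<String>> groups = new ArrayList<>();
    for (int i = 0; i < numTasks; i++) {
      groups.add(new ArrayList<>());
    }
    for (int i = 0; i < urls.length; i++) {
      groups.get(i % numTasks).add(urls[i].trim()); // round-robin assignment
    }

    List<Map<String, String>> configs = new ArrayList<>();
    for (List<String> group : groups) {
      Map<String, String> cfg = new HashMap<>(connectorProps); // inherit topic, poll.interval.ms, ...
      cfg.put(URLS, String.join(",", group));                  // but only this task's URLs
      configs.add(cfg);
    }
    return configs;
  }
}
```

For the single-endpoint connector in this post, the sensible taskConfigs() returns exactly one map (a copy of the connector props) whatever tasks.max is, since two tasks polling the same URL would only write duplicates. connect-api also ships ConnectorUtils.groupPartitions() for this kind of split.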

Source Offset

The Connect worker keeps track of how far it has read from the source system. That position is attached to each SourceRecord when it is built (its sourcePartition and sourceOffset maps), and the worker later commits it to the internal offsets topic, here __connect_offsets (in distributed mode the name is set by offset.storage.topic).
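A minimal sketch of both halves: attaching the offset to a record, and reading the last committed offset back when a task restarts. It assumes connect-api on the classpath. The partition key "url" and offset key "last_poll_ms" are hypothetical choices; a REST endpoint has no natural offset, so this sketch uses the time of the last poll.

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.OffsetStorageReader;

// Hypothetical offset layout: one source partition per endpoint, offset = last poll time.
class HttpPollOffsets {
  static Map<String, String> partition(String url) {
    return Collections.singletonMap("url", url);
  }

  static Map<String, Long> offset(long lastPollMs) {
    return Collections.singletonMap("last_poll_ms", lastPollMs);
  }

  // The offset travels with the record; the worker commits it to the offsets topic later.
  static SourceRecord record(String url, long lastPollMs, String topic, String body) {
    return new SourceRecord(partition(url), offset(lastPollMs), topic,
        Schema.STRING_SCHEMA, url, Schema.STRING_SCHEMA, body);
  }

  // Returns the last committed poll time for this URL, or -1 if nothing was committed yet.
  static long restoreLastPollMs(OffsetStorageReader reader, String url) {
    Map<String, Object> saved = reader.offset(partition(url));
    if (saved == null || saved.get("last_poll_ms") == null) {
      return -1L; // first run for this partition
    }
    // stored offsets are deserialized from JSON, so read the value as a generic Number
    return ((Number) saved.get("last_poll_ms")).longValue();
  }
}
```

Inside a task's start(), the reader comes from the SourceTaskContext that SourceTask exposes as its context field: restoreLastPollMs(context.offsetStorageReader(), url).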

Success!
