Deploying Kafka Connect with High Availability 👏 Registering Tasks via the REST API!


This post continues from Kafka Connector on k8s - Standalone Mode. 🙏

Why did I become interested in Distributed Mode?

At work, we use a Kafka cluster through Confluent. In most cases we had used Confluent-managed Connectors, but recently we needed to load Topic data into a database that Confluent does not support, so I ran a connector on K8s in Standalone mode.

It ran fine for about a month. Then, at a regular meeting with Confluent, I mentioned that we were running the Connector in Standalone mode. The feedback was that every piece of documentation says Standalone mode is "for development and testing purposes", and that in a Prod environment the recommendation is to run in "Distributed" mode! Anyway, that's the background! Let's look at how I implemented it.

Standalone vs. Distributed: Scalability

If you deploy Kafka Connect in Standalone mode and increase the Pod replicas from 1 to 2, each Pod processes the same data. In other words, throughput does double, but the data also arrives twice, as duplicates!!

๋ฐ˜๋ฉด์— Distributed ๋ชจ๋“œ๋กœ ๋””ํ”Œ๋กœ์ด ํ•œ ์ƒํƒœ์—์„œ Pod Replica๋ฅผ 2๋กœ ๋Š˜๋ฆฌ๊ฒŒ ๋˜๋ฉด, ๊ฐ Pod์ด ๋ฐ์ดํ„ฐ๋ฅผ โ€œ์ ˆ๋ฐ˜์”ฉ ๋‚˜๋ˆ„์–ดโ€ ์ฒ˜๋ฆฌํ•˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค. ์ฆ‰, Throughput์„ 2๋ฐฐ๋กœ ๋Š˜๋ฆฌ๋ฉด์„œ ๋ฐ์ดํ„ฐ๋„ ์ค‘๋ณต๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค!

Standalone vs. Distributed: Config Topics

In Standalone mode, you start the Connector as follows.

$ connect-standalone standalone.properties local-file-sink.properties

standalone.properties held the information for connecting to the Kafka Server, and local-file-sink.properties held the information for running the File Sink Connector.

In Distributed mode, the Connector Plugin configuration (the second argument above) is recorded in a Kafka Configuration Topic rather than in a file.

developer.confluent.io

Unlike Standalone, Distributed mode requires three topics.

  • config.storage.topic
    • Configuration for the connectors and tasks that Kafka Connect runs
    • The information that used to live in the local-file-sink.properties file goes into this topic.
  • offset.storage.topic
    • A record of how far each Kafka Connect task has processed
    • In Standalone mode, this information was stored in the file named by offset.storage.file.filename.
  • status.storage.topic
    • The status of the connectors and the individual tasks running on Kafka Connect
    • In Distributed mode, workers share this status with one another for Fault Tolerance.

Because this information is recorded in Topics outside (remote from) Kafka Connect rather than inside it, Kafka Connect can be safely restarted from the information in those Topics even if every Pod is stopped or lost 😌

Standalone → Distributed

In my case, I was converting a Kafka Connect deployed in Standalone mode to Distributed mode. Focusing on which properties I changed:

- offset.storage.file.filename=/tmp/connect.offsets
+ group.id=local-file-sink
+ config.storage.topic=_local_file_sink.config
+ offset.storage.topic=_local_file_sink.offset
+ status.storage.topic=_local_file_sink.status

First, offset information is no longer stored locally in Kafka Connect, so the offset.storage.file.filename value is no longer needed.

Then, to run in Distributed mode, specify the three Topics that will store the information for each task.

  • config.storage.topic
  • offset.storage.topic
  • status.storage.topic

When you deploy Kafka Connect in Distributed mode, you operate Kafka Connect as a cluster. Workers that share the same group.id form one cluster, so group.id is where you set the name of this cluster.
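The property changes above can be sketched as a small shell function. This is only an illustration under assumptions: to_distributed_props and its arguments are hypothetical names, and the topic suffixes (.config, .offset, .status) simply mirror the diff above.

```shell
# Sketch: derive distributed.properties from standalone.properties.
# to_distributed_props is a hypothetical helper, not part of Kafka Connect.
to_distributed_props() {
  local src="$1"     # existing standalone worker properties
  local dst="$2"     # distributed worker properties to write
  local group="$3"   # value for group.id (the Connect cluster name)
  local prefix="$4"  # common prefix for the three internal topics

  # Keep every worker setting except the local offset file (standalone only).
  grep -v '^offset\.storage\.file\.filename=' "$src" > "$dst" || true

  # Append the settings that distributed mode needs.
  {
    printf 'group.id=%s\n' "$group"
    printf 'config.storage.topic=%s.config\n' "$prefix"
    printf 'offset.storage.topic=%s.offset\n' "$prefix"
    printf 'status.storage.topic=%s.status\n' "$prefix"
  } >> "$dst"
}
```

For the values in this post, the call would look like `to_distributed_props standalone.properties distributed.properties local-file-sink _local_file_sink`.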

Update Pod Yaml

Change the Pod Yaml as follows as well.

- command:
- - "connect-standalone"
- - "/etc/kafka-connect-properties/standalone/standalone.properties"
- - "/etc/kafka-connect-properties/file-sink-connector/local-file-sink.properties"
+ command:
+ - "connect-distributed"
+ - "/etc/kafka-connect-properties/distributed/distributed.properties"

Not much changes here: when running the container, switch it to start with connect-distributed and the distributed.properties that reflects the requirements above.

Register a Task using the REST API

In Standalone mode, I wrote a .properties file describing which task to run and passed it to connect-standalone.

In Distributed mode, you register a task through the Kafka Connect REST API!

$ curl -X POST -H "Content-Type: application/json" \
  http://localhost:8083/connectors \
  --data "@/etc/kafka-connect-properties/file-sink-connector/local-file-sink.json"

The curl command above passes local-file-sink.json as the POST body. To do this, write local-file-sink.json as shown below, then, as in Standalone mode, create a K8s Secret from it and inject it into the Pod as a Volume Mount. (Oh! Note that when passing a .json file like this, I had to put @ at the very front of the path, otherwise curl sends the path string itself as the body;;)

// @./local-file-sink.json
{
  "name": "local-file-sink",
  "config":
  {
      "connector.class": "FileStreamSink",
      "tasks.max": "1",
      "topics": "szcode2.qa.avro.server",

      "file": "/tmp/test.sink.txt"
  }
}

Registering the task with curl becomes possible a few seconds after the Kafka Connector Pod is deployed and has finished starting up.
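Since registration only works once the worker is up, a small retry loop can save a manual wait. Below is a minimal sketch, assuming the REST API is reachable at http://localhost:8083 as in the curl command earlier. CONNECT_URL, wait_for_connect, register_connector, and the retry numbers are hypothetical names and values chosen for illustration.

```shell
# Sketch: wait until the Connect REST API answers, then register the connector.
# CONNECT_URL and the default retry numbers are assumptions for illustration.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Poll GET /connectors until it returns HTTP 200 or the attempts run out.
wait_for_connect() {
  local attempts="${1:-30}"
  local delay="${2:-2}"
  local i=0
  local code
  while [ "$i" -lt "$attempts" ]; do
    # curl prints only the HTTP status code; a refused connection yields 000.
    code="$(curl -s -o /dev/null -w '%{http_code}' "$CONNECT_URL/connectors" || true)"
    if [ "$code" = "200" ]; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}

# POST the JSON file as the request body; the leading @ makes curl read the file.
register_connector() {
  local json_file="$1"
  curl -s -X POST -H "Content-Type: application/json" \
    --data "@${json_file}" "$CONNECT_URL/connectors"
}
```

A possible usage is `wait_for_connect 30 2 && register_connector /etc/kafka-connect-properties/file-sink-connector/local-file-sink.json`, which gives the worker up to about a minute before giving up.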

And after registering the task, if you wait another 1 to 3 minutes, you can confirm in Confluent as well that the Distributed-mode Connector has been registered 😌
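The worker itself can also be asked directly, without waiting for the Confluent side. The Kafka Connect REST API exposes GET /connectors/<name>/status, and the sketch below pulls the state values out of that response with grep and sed. It assumes the same http://localhost:8083 address as before. connector_states and all_running are hypothetical helper names, and the text matching is a rough stand-in for a real JSON parser.

```shell
# Sketch: read connector and task states from GET /connectors/<name>/status.
# CONNECT_URL is an assumption; adjust it to wherever the worker is reachable.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print every "state" value in the status response, one per line.
connector_states() {
  curl -s "$CONNECT_URL/connectors/$1/status" \
    | grep -oE '"state"[[:space:]]*:[[:space:]]*"[A-Z_]+"' \
    | sed -E 's/.*"([A-Z_]+)"$/\1/'
}

# Succeed only if at least one state was found and all of them are RUNNING.
all_running() {
  local states
  states="$(connector_states "$1")"
  [ -n "$states" ] && ! printf '%s\n' "$states" | grep -qv '^RUNNING$'
}
```

For example, `all_running local-file-sink && echo ok` prints ok only when the connector and every one of its tasks report RUNNING.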

Closing Remarks

I vaguely remember seeing this when I first studied Kafka, but after running it myself, it really clicks why both Standalone mode and Distributed mode exist haha (you really do improve by doing it yourself)

Among the Distributed Mode properties, I haven't looked properly at rest.advertised.host.name yet, so I plan to dig into it later when I have time. (Knowing where to stop is also necessary haha)

Since I got to do quite a lot of Kafka work for my job this time, I've set the Confluent Certificate as my next certification goal! I hope to learn much, much more about this ecosystem haha. Onward! 👊
