Kafka Connector on k8s - Distributed Mode
์ด๋ฒ ํฌ์คํธ๋ Kafka Connector on k8s - Standalone Mode์์ ๋ด์ฉ์ด ์ด์ด์ง๋๋ค. ๐
์ Distributed Mode์ ๊ด์ฌ์ ๊ฐ๊ฒ ๋์๋์?
ํ์ฌ์์ Confluent๋ฅผ ํตํด Kafka ํด๋ฌ์คํฐ๋ฅผ ์ฌ์ฉํ๊ณ ์์ต๋๋ค. ๋๋ถ๋ถ์ ๊ฒฝ์ฐ Confluent-managed Connector๋ฅผ ์ฌ์ฉ ํ์์ผ๋, ์ต๊ทผ Confluent์์ ์ง์ํ์ง ์๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ก Topic์ ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํด์ผ ํ ์ผ์ด ์๊ฒผ๊ณ , ์ด๋ฅผ K8s์ Standalone mode๋ก ๋์ ์ต๋๋ค.
๊ทธ๋ ๊ฒ ํ ๋ฌ ์ ๋ ์ ์ด์ํ๊ณ ์๋ค๊ฐ Confluent ์ ๊ธฐ ๋ฏธํ ์์ Connector๋ฅผ Standalone ๋ชจ๋๋ก ์ฐ๊ณ ์๋ค๊ณ ๋ง์ ๋๋ฆฌ๋, Standalone ๋ชจ๋๋ ๋ฌธ์ ์ด๋๋ฅผ ๋ด๋ โ๊ฐ๋ฐ, ํ ์คํธ ๋ชฉ์ ์ผ๋ก ์ฐ์์คโ๋ผ๊ณ ๋์์์ง Prod ํ๊ฒฝ์์๋ โDistributedโ ๋ชจ๋๋ก ๋๋ฆฌ๋ ๊ฒ์ด ๊ถ์ฅ ์ฌํญ์ด๋ผ๋ ํผ๋๋ฐฑ์ ๋ค์์ต๋๋ค! ์ํผ ์ฌ๊ธฐ๊น์ง๊ฐ ๋ฐฐ๊ฒฝ์ด์๊ตฌ์! ์ด๋ป๊ฒ ๊ตฌํํ๋์ง ์ดํด๋ณด๊ฒ ์ต๋๋ค.
Standalone vs. Distributed: Scalability

Kafka Connect๋ฅผ Standalone ๋ชจ๋๋ก ๋ํ๋ก์ด ํ ์ํ์์ Pod Replica๋ฅผ 1์์ 2๋ก ๋๋ฆฌ๊ฒ ๋๋ค๋ฉด, ๊ฐ Pod์ด ๋์ผ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ฒ ๋ฉ๋๋ค. ์ฆ, Throughput์ด 2๋ฐฐ๊ฐ ๋๋ ๊ฒ์ ๋ง์ง๋ง ๋ฐ์ดํฐ๋ 2๋ฐฐ์ฉ ์ค๋ณตํด์ ๋ค์ด์ค๊ฒ ๋ฉ๋๋ค!!
๋ฐ๋ฉด์ Distributed ๋ชจ๋๋ก ๋ํ๋ก์ด ํ ์ํ์์ Pod Replica๋ฅผ 2๋ก ๋๋ฆฌ๊ฒ ๋๋ฉด, ๊ฐ Pod์ด ๋ฐ์ดํฐ๋ฅผ โ์ ๋ฐ์ฉ ๋๋์ดโ ์ฒ๋ฆฌํ๊ฒ ๋ฉ๋๋ค. ์ฆ, Throughput์ 2๋ฐฐ๋ก ๋๋ฆฌ๋ฉด์ ๋ฐ์ดํฐ๋ ์ค๋ณต๋์ง ์์ต๋๋ค!
Standalone vs. Distributed: Config Topics
Standalone ๋ชจ๋์์๋ Connector๋ฅผ ์คํํ ๋, ์๋์ ๊ฐ์ด ์คํํฉ๋๋ค.
$ connect-standalone standalone.properties local-file-sink.properties
standalone.properties์๋ Kafka Server์ ์ ์ํ๊ธฐ ์ํ ์ ๋ณด๊ฐ ๋ด๊ฒจ์์๊ณ , local-file-sink.properties์๋ File Sink Connector๋ฅผ ๋์ํ๊ธฐ ์ํ ์ ๋ณด๊ฐ ๋ด๊ฒจ ์์์ต๋๋ค.
Distributed ๋ชจ๋์์๋ ๋ค์ Connector Plugin ๊ตฌ์ฑ์ด ํ์ผ์ด ์๋๋ผ Kafka Configuration Topic์ ๊ธฐ๋ก๋ฉ๋๋ค.

developer.confluent.io
Distributed ๋ชจ๋๋ Standalone๊ณผ ๋ฌ๋ฆฌ 3๊ฐ์ง ํ ํฝ์ ํ์๋ก ํฉ๋๋ค.
config.storage.topic- Kafka Connector์์ ์คํํ๋ ์์ (task)์ ๋ํ ๊ตฌ์ฑ ์ ๋ณด
local-file-sink.propertiesํ์ผ์ ์๋ ์ ๋ณด๊ฐ ์ ํ ํฝ์ ๋ด๊ธด๋ค.
offset.storage.topic- Kafka Connector์ ์์ ์ด ์ด๋๊น์ง ์ฒ๋ฆฌ ํ๋์ง ๊ธฐ๋กํ ์ ๋ณด
- Standalone์์๋
offset.storage.file.filename์ ๋ช ์ํ ํ์ผ์ ํด๋น ์ ๋ณด๊ฐ ๋ด๊ฒผ๋ค.
status.storage.topic- Kafka Connector ์์์ ๋์ํ๋ ๊ฐ๋ณ ์์ (task)์ ์ํ๋ฅผ ์ ์ฅํ๋ ์ ๋ณด
- Distributed ๋ชจ๋์์๋ Fault Tolerance๋ฅผ ์ํด ๊ฐ Task๊ฐ ์๋ก ์ํ๋ฅผ ์ฒดํฌํ๋ค.
์ด๋ ๊ฒ ๊ตฌ์ฑ ์ ๋ณด๊ฐ Kafka Connect ๋ด๋ถ๊ฐ ์๋๋ผ, ์ธ๋ถ(remote)์ธ Topic์ ๊ธฐ๋ก๋๊ธฐ ๋๋ฌธ์ ๋ชจ๋ Pod์ด ์ค๋จ๋๊ฑฐ๋ ์ ์ค ๋๋๋ผ๋, Topic์ ๊ธฐ๋กํด๋์๋ ์ ๋ณด๋ฅผ ๋ฐํ์ผ๋ก Kafka Connect๋ฅผ ์์ ํ๊ฒ ๋ค์ ์คํํ ์ ์์ต๋๋ค ๐
Standalone โ Distributed
์ ๊ฒฝ์ฐ๋ Standalone ๋ชจ๋๋ก ๋ํ๋ก์ด ํ Kafka Connect๋ฅผ Distributed ๋ชจ๋๋ก ์ ํํ๋ ๊ฒฝ์ฐ ์์ต๋๋ค. ์ด๋ค properties๋ฅผ ๋ณ๊ฒฝ ํ๋์ง ์์ฃผ๋ก ์ดํด๋ณด๋ฉด
- offset.storage.file.filename=/tmp/connect.offsets
+ group.id=local-file-sink
+ config.storage.topic=_local_file_sink.config
+ offset.storage.topic=_local_file_sink.offset
+ status.storage.topic=_local_file_sink.status
์ฐ์ ๋์ด์ offset ์ ๋ณด๋ฅผ Kafka Connect์ ๋ก์ปฌ์ ์ ์ฅํ์ง ์๊ธฐ ๋๋ฌธ์ offset.storage.file.filename ๊ฐ์ด ํ์ ์์ต๋๋ค.
๊ทธ๋ฆฌ๊ณ Distributed ๋ชจ๋๋ก ๋์ํ๊ธฐ ์ํด ๊ฐ ํ์คํฌ์ ์ ๋ณด๋ฅผ ์ ์ฅํ Topic 3๊ฐ์ง๋ฅผ ์ง์ ํฉ๋๋ค.
config.storage.topicoffset.storage.topicstatus.storage.topic
Distributed ๋ชจ๋๋ก Kafka Connect๋ฅผ ๋ํ๋ก์ด ํ๊ฒ ๋๋ฉด, Kafka Connect๋ฅผ ํด๋ฌ์คํฐ(cluster) ํ์์ผ๋ก ์ด์ํ๊ฒ ๋ฉ๋๋ค. ๊ทธ๋์ ์ด ํด๋ฌ์คํฐ ์ด๋ฆ์ group.id๋ก ์ง์ ํด์ค๋๋ค.
Update Pod Yaml
Pod Yaml๋ ์๋์ ๊ฐ์ด ๋ณ๊ฒฝํฉ๋๋ค.
- command:
- - "connect-standalone"
- - "/etc/kafka-connect-properties/standalone/standalone.properties"
- - "/etc/kafka-connect-properties/file-sink-connector/local-file-sink.properties"
+ command:
+ - "connect-distributed"
+ - "/etc/kafka-connect-properties/distributed/distributed.properties"
ํฌ๊ฒ ๋ณ๊ฒฝ๋๋ ์ ์ ์๊ณ , container๋ฅผ ๋๋ฆด ๋, connect-distributed์ ์์ ์๊ตฌ์ฌํญ์ ๋ฐ์ํ distributed.properties๋ก ์คํํ๋๋ก ๋ณ๊ฒฝํฉ๋๋ค.
Registry Task using REST API
Standalone ๋ชจ๋์์๋ ์ด๋ค ์์
(task)๋ฅผ ๋๋ฆด์ง .properties ํ์ผ์ ์์ฑํ๊ณ ์ด๋ฅผ connect-standalone์ ๋๊ฒจ์ฃผ์์ต๋๋ค.
Distributed ๋ชจ๋์์๋ ์์ (task)์ ๋ฑ๋กํ ๋ Kafka Connect์ REST API๋ฅผ ์ฌ์ฉํฉ๋๋ค!
$ curl -X POST -H "Content-Type: application/json" \
http://localhost:8083/connectors \
--data "@/etc/kafka-connect-properties/file-sink-connector/local-file-sink.json"
์์ curl ๋ช
๋ น์ด์์ POST์ body๋ฅผ local-file-sink.json์ผ๋ก ์ ๋ฌ ํฉ๋๋ค.
์ด๋ฅผ ์ํด local-file-sink.json์ ์๋์ ๊ฐ์ด ์์ฑํ ํ, Standalone ๋ชจ๋์์ ํ๋ ๊ฒ์ฒ๋ผ K8s Secret์ผ๋ก ๋ง๋ค์ด Pod์ Volume Mount๋ก ์ฃผ์
ํฉ๋๋ค.
(์! ์ฐธ๊ณ ๋ก ์๋ ๊ฒ .json ํ์ผ๋ก ์ฃผ์
ํ๋ ๊ฒฝ์ฐ ๊ฒฝ๋ก ๋งจ ์์ @๋ฅผ ๊ผญ ๋ฃ์ด์ค์ผ ํ์ต๋๋ค;;)
// @./local-file-sink.json
{
"name": "local-file-sink",
"config":
{
"connector.class": "FileStreamSink",
"tasks.max": "1",
"topics": "szcode2.qa.avro.server",
"file": "/tmp/test.sink.txt"
}
}
curl์ ํตํ ์์
๋ฑ๋ก์ Kafka Connector Pod์ด ๋ํ๋ก์ด ๋๊ณ , ๋ช์ด๊ฐ์ ๋๋ฉ ํ ๋ฑ๋ก์ด ๊ฐ๋ฅํฉ๋๋ค.
๊ทธ๋ฆฌ๊ณ , ์์ ๋ฑ๋ก ํ์ ๋ 1๋ถ~3๋ถ ์ ๋ ๊ธฐ๋ค๋ฆฌ๋ฉด, Confluent์์๋ Distributed ๋ชจ๋์ Connector๊ฐ ๋ฑ๋ก๋ ๊ฒ์ ํ์ธํ ์ ์์ต๋๋ค ๐
๋งบ์๋ง
๋ญ๊ฐ Kafka๋ฅผ ์ฒ์ ๊ณต๋ถํ ๋ ๋ดค๋ ๊ธฐ์ต์ด ์ด๋ ดํ์ด ๋๋ ๊ฒ ๊ฐ์๋ฐ, ์ง์ ๋์๋ณด๋ ์ Standalone ๋ชจ๋์ Distributed ๋ชจ๋, ๋ ๋ฐฉ์์ด ์กด์ฌํ๋์ง ์ ์๋ฟ๋ ๊ฒ ๊ฐ์ต๋๋ค ใ ใ (์ญ์ ์ง์ ํด๋ด์ผ ๋์ด)
Distributed Mode ์์ฑ ์ค์ rest.advertised.host.name ์ชฝ์ ์์ง ์ ๋๋ก ๋ชป ๋ดค๋๋ฐ, ๋์ค์ ์๊ฐ์ด ๋๋ฉด ์ข๋ ์ดํด๋ณด๊ณ ์ ํฉ๋๋ค. (์ ๋นํ ๋์ด์ฃผ๋ ๊ฒ๋ ํ์ ใ
ใ
)
์ด๋ฒ์ ํ์ฌ ์ ๋ฌด๋ก Kafka ์์ ์ ๊ฝค ๋ง์ด ํด๋ณด๊ฒ ๋์ด์ ๋ค์ ์๊ฒฉ์ฆ์ผ๋ก Confluent Certificate๋ฅผ ๋ชฉํ๋ก ์ค์ ํ์ต๋๋ค! ์ด์ชฝ ์ํ๊ณ์ ๋ํด์๋ ๋๋๋ ๋ง์ด ์ ์ ์๊ฒ ๋๊ธธ ใ ใ ๊ทธ๋ผ ์์ผ๋ก๋ ์์ข์ฃ! ๐