Kafka Connector on k8s - Distributed Mode
์ด๋ฒ ํฌ์คํธ๋ Kafka Connector on k8s - Standalone Mode์์ ๋ด์ฉ์ด ์ด์ด์ง๋๋ค. ๐
์ Distributed Mode์ ๊ด์ฌ์ ๊ฐ๊ฒ ๋์๋์?
ํ์ฌ์์ Confluent๋ฅผ ํตํด Kafka ํด๋ฌ์คํฐ๋ฅผ ์ฌ์ฉํ๊ณ ์์ต๋๋ค. ๋๋ถ๋ถ์ ๊ฒฝ์ฐ Confluent-managed Connector๋ฅผ ์ฌ์ฉ ํ์์ผ๋, ์ต๊ทผ Confluent์์ ์ง์ํ์ง ์๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ก Topic์ ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํด์ผ ํ ์ผ์ด ์๊ฒผ๊ณ , ์ด๋ฅผ K8s์ Standalone mode๋ก ๋์ ์ต๋๋ค.
๊ทธ๋ ๊ฒ ํ ๋ฌ ์ ๋ ์ ์ด์ํ๊ณ ์๋ค๊ฐ Confluent ์ ๊ธฐ ๋ฏธํ ์์ Connector๋ฅผ Standalone ๋ชจ๋๋ก ์ฐ๊ณ ์๋ค๊ณ ๋ง์ ๋๋ฆฌ๋, Standalone ๋ชจ๋๋ ๋ฌธ์ ์ด๋๋ฅผ ๋ด๋ โ๊ฐ๋ฐ, ํ ์คํธ ๋ชฉ์ ์ผ๋ก ์ฐ์์คโ๋ผ๊ณ ๋์์์ง Prod ํ๊ฒฝ์์๋ โDistributedโ ๋ชจ๋๋ก ๋๋ฆฌ๋ ๊ฒ์ด ๊ถ์ฅ ์ฌํญ์ด๋ผ๋ ํผ๋๋ฐฑ์ ๋ค์์ต๋๋ค! ์ํผ ์ฌ๊ธฐ๊น์ง๊ฐ ๋ฐฐ๊ฒฝ์ด์๊ตฌ์! ์ด๋ป๊ฒ ๊ตฌํํ๋์ง ์ดํด๋ณด๊ฒ ์ต๋๋ค.
Standalone vs. Distributed: Scalability
Kafka Connect๋ฅผ Standalone ๋ชจ๋๋ก ๋ํ๋ก์ด ํ ์ํ์์ Pod Replica๋ฅผ 1์์ 2๋ก ๋๋ฆฌ๊ฒ ๋๋ค๋ฉด, ๊ฐ Pod์ด ๋์ผ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ฒ ๋ฉ๋๋ค. ์ฆ, Throughput์ด 2๋ฐฐ๊ฐ ๋๋ ๊ฒ์ ๋ง์ง๋ง ๋ฐ์ดํฐ๋ 2๋ฐฐ์ฉ ์ค๋ณตํด์ ๋ค์ด์ค๊ฒ ๋ฉ๋๋ค!!
๋ฐ๋ฉด์ Distributed ๋ชจ๋๋ก ๋ํ๋ก์ด ํ ์ํ์์ Pod Replica๋ฅผ 2๋ก ๋๋ฆฌ๊ฒ ๋๋ฉด, ๊ฐ Pod์ด ๋ฐ์ดํฐ๋ฅผ โ์ ๋ฐ์ฉ ๋๋์ดโ ์ฒ๋ฆฌํ๊ฒ ๋ฉ๋๋ค. ์ฆ, Throughput์ 2๋ฐฐ๋ก ๋๋ฆฌ๋ฉด์ ๋ฐ์ดํฐ๋ ์ค๋ณต๋์ง ์์ต๋๋ค!
Standalone vs. Distributed: Config Topics
Standalone ๋ชจ๋์์๋ Connector๋ฅผ ์คํํ ๋, ์๋์ ๊ฐ์ด ์คํํฉ๋๋ค.
$ connect-standalone standalone.properties local-file-sink.properties
standalone.properties
์๋ Kafka Server์ ์ ์ํ๊ธฐ ์ํ ์ ๋ณด๊ฐ ๋ด๊ฒจ์์๊ณ , local-file-sink.properties
์๋ File Sink Connector๋ฅผ ๋์ํ๊ธฐ ์ํ ์ ๋ณด๊ฐ ๋ด๊ฒจ ์์์ต๋๋ค.
Distributed ๋ชจ๋์์๋ ๋ค์ Connector Plugin ๊ตฌ์ฑ์ด ํ์ผ์ด ์๋๋ผ Kafka Configuration Topic์ ๊ธฐ๋ก๋ฉ๋๋ค.
developer.confluent.io
Distributed ๋ชจ๋๋ Standalone๊ณผ ๋ฌ๋ฆฌ 3๊ฐ์ง ํ ํฝ์ ํ์๋ก ํฉ๋๋ค.
config.storage.topic
- Kafka Connector์์ ์คํํ๋ ์์ (task)์ ๋ํ ๊ตฌ์ฑ ์ ๋ณด
local-file-sink.properties
ํ์ผ์ ์๋ ์ ๋ณด๊ฐ ์ ํ ํฝ์ ๋ด๊ธด๋ค.
offset.storage.topic
- Kafka Connector์ ์์ ์ด ์ด๋๊น์ง ์ฒ๋ฆฌ ํ๋์ง ๊ธฐ๋กํ ์ ๋ณด
- Standalone์์๋
offset.storage.file.filename
์ ๋ช ์ํ ํ์ผ์ ํด๋น ์ ๋ณด๊ฐ ๋ด๊ฒผ๋ค.
status.storage.topic
- Kafka Connector ์์์ ๋์ํ๋ ๊ฐ๋ณ ์์ (task)์ ์ํ๋ฅผ ์ ์ฅํ๋ ์ ๋ณด
- Distributed ๋ชจ๋์์๋ Fault Tolerance๋ฅผ ์ํด ๊ฐ Task๊ฐ ์๋ก ์ํ๋ฅผ ์ฒดํฌํ๋ค.
์ด๋ ๊ฒ ๊ตฌ์ฑ ์ ๋ณด๊ฐ Kafka Connect ๋ด๋ถ๊ฐ ์๋๋ผ, ์ธ๋ถ(remote)์ธ Topic์ ๊ธฐ๋ก๋๊ธฐ ๋๋ฌธ์ ๋ชจ๋ Pod์ด ์ค๋จ๋๊ฑฐ๋ ์ ์ค ๋๋๋ผ๋, Topic์ ๊ธฐ๋กํด๋์๋ ์ ๋ณด๋ฅผ ๋ฐํ์ผ๋ก Kafka Connect๋ฅผ ์์ ํ๊ฒ ๋ค์ ์คํํ ์ ์์ต๋๋ค ๐
Standalone โ Distributed
์ ๊ฒฝ์ฐ๋ Standalone ๋ชจ๋๋ก ๋ํ๋ก์ด ํ Kafka Connect๋ฅผ Distributed ๋ชจ๋๋ก ์ ํํ๋ ๊ฒฝ์ฐ ์์ต๋๋ค. ์ด๋ค properties๋ฅผ ๋ณ๊ฒฝ ํ๋์ง ์์ฃผ๋ก ์ดํด๋ณด๋ฉด
- offset.storage.file.filename=/tmp/connect.offsets
+ group.id=local-file-sink
+ config.storage.topic=_local_file_sink.config
+ offset.storage.topic=_local_file_sink.offset
+ status.storage.topic=_local_file_sink.status
์ฐ์ ๋์ด์ offset ์ ๋ณด๋ฅผ Kafka Connect์ ๋ก์ปฌ์ ์ ์ฅํ์ง ์๊ธฐ ๋๋ฌธ์ offset.storage.file.filename
๊ฐ์ด ํ์ ์์ต๋๋ค.
๊ทธ๋ฆฌ๊ณ Distributed ๋ชจ๋๋ก ๋์ํ๊ธฐ ์ํด ๊ฐ ํ์คํฌ์ ์ ๋ณด๋ฅผ ์ ์ฅํ Topic 3๊ฐ์ง๋ฅผ ์ง์ ํฉ๋๋ค.
config.storage.topic
offset.storage.topic
status.storage.topic
Distributed ๋ชจ๋๋ก Kafka Connect๋ฅผ ๋ํ๋ก์ด ํ๊ฒ ๋๋ฉด, Kafka Connect๋ฅผ ํด๋ฌ์คํฐ(cluster) ํ์์ผ๋ก ์ด์ํ๊ฒ ๋ฉ๋๋ค. ๊ทธ๋์ ์ด ํด๋ฌ์คํฐ ์ด๋ฆ์ group.id
๋ก ์ง์ ํด์ค๋๋ค.
Update Pod Yaml
Pod Yaml๋ ์๋์ ๊ฐ์ด ๋ณ๊ฒฝํฉ๋๋ค.
- command:
- - "connect-standalone"
- - "/etc/kafka-connect-properties/standalone/standalone.properties"
- - "/etc/kafka-connect-properties/file-sink-connector/local-file-sink.properties"
+ command:
+ - "connect-distributed"
+ - "/etc/kafka-connect-properties/distributed/distributed.properties"
ํฌ๊ฒ ๋ณ๊ฒฝ๋๋ ์ ์ ์๊ณ , container๋ฅผ ๋๋ฆด ๋, connect-distributed
์ ์์ ์๊ตฌ์ฌํญ์ ๋ฐ์ํ distributed.properties
๋ก ์คํํ๋๋ก ๋ณ๊ฒฝํฉ๋๋ค.
Registry Task using REST API
Standalone ๋ชจ๋์์๋ ์ด๋ค ์์
(task)๋ฅผ ๋๋ฆด์ง .properties
ํ์ผ์ ์์ฑํ๊ณ ์ด๋ฅผ connect-standalone
์ ๋๊ฒจ์ฃผ์์ต๋๋ค.
Distributed ๋ชจ๋์์๋ ์์ (task)์ ๋ฑ๋กํ ๋ Kafka Connect์ REST API๋ฅผ ์ฌ์ฉํฉ๋๋ค!
$ curl -X POST -H "Content-Type: application/json" \
http://localhost:8083/connectors \
--data "@/etc/kafka-connect-properties/file-sink-connector/local-file-sink.json"
์์ curl
๋ช
๋ น์ด์์ POST์ body๋ฅผ local-file-sink.json
์ผ๋ก ์ ๋ฌ ํฉ๋๋ค.
์ด๋ฅผ ์ํด local-file-sink.json
์ ์๋์ ๊ฐ์ด ์์ฑํ ํ, Standalone ๋ชจ๋์์ ํ๋ ๊ฒ์ฒ๋ผ K8s Secret์ผ๋ก ๋ง๋ค์ด Pod์ Volume Mount๋ก ์ฃผ์
ํฉ๋๋ค.
(์! ์ฐธ๊ณ ๋ก ์๋ ๊ฒ .json
ํ์ผ๋ก ์ฃผ์
ํ๋ ๊ฒฝ์ฐ ๊ฒฝ๋ก ๋งจ ์์ @
๋ฅผ ๊ผญ ๋ฃ์ด์ค์ผ ํ์ต๋๋ค;;)
// @./local-file-sink.json
{
"name": "local-file-sink",
"config":
{
"connector.class": "FileStreamSink",
"tasks.max": "1",
"topics": "szcode2.qa.avro.server",
"file": "/tmp/test.sink.txt"
}
}
curl
์ ํตํ ์์
๋ฑ๋ก์ Kafka Connector Pod์ด ๋ํ๋ก์ด ๋๊ณ , ๋ช์ด๊ฐ์ ๋๋ฉ ํ ๋ฑ๋ก์ด ๊ฐ๋ฅํฉ๋๋ค.
๊ทธ๋ฆฌ๊ณ , ์์ ๋ฑ๋ก ํ์ ๋ 1๋ถ~3๋ถ ์ ๋ ๊ธฐ๋ค๋ฆฌ๋ฉด, Confluent์์๋ Distributed ๋ชจ๋์ Connector๊ฐ ๋ฑ๋ก๋ ๊ฒ์ ํ์ธํ ์ ์์ต๋๋ค ๐
๋งบ์๋ง
๋ญ๊ฐ Kafka๋ฅผ ์ฒ์ ๊ณต๋ถํ ๋ ๋ดค๋ ๊ธฐ์ต์ด ์ด๋ ดํ์ด ๋๋ ๊ฒ ๊ฐ์๋ฐ, ์ง์ ๋์๋ณด๋ ์ Standalone ๋ชจ๋์ Distributed ๋ชจ๋, ๋ ๋ฐฉ์์ด ์กด์ฌํ๋์ง ์ ์๋ฟ๋ ๊ฒ ๊ฐ์ต๋๋ค ใ ใ (์ญ์ ์ง์ ํด๋ด์ผ ๋์ด)
Distributed Mode ์์ฑ ์ค์ rest.advertised.host.name
์ชฝ์ ์์ง ์ ๋๋ก ๋ชป ๋ดค๋๋ฐ, ๋์ค์ ์๊ฐ์ด ๋๋ฉด ์ข๋ ์ดํด๋ณด๊ณ ์ ํฉ๋๋ค. (์ ๋นํ ๋์ด์ฃผ๋ ๊ฒ๋ ํ์ ใ
ใ
)
์ด๋ฒ์ ํ์ฌ ์ ๋ฌด๋ก Kafka ์์ ์ ๊ฝค ๋ง์ด ํด๋ณด๊ฒ ๋์ด์ ๋ค์ ์๊ฒฉ์ฆ์ผ๋ก Confluent Certificate๋ฅผ ๋ชฉํ๋ก ์ค์ ํ์ต๋๋ค! ์ด์ชฝ ์ํ๊ณ์ ๋ํด์๋ ๋๋๋ ๋ง์ด ์ ์ ์๊ฒ ๋๊ธธ ใ ใ ๊ทธ๋ผ ์์ผ๋ก๋ ์์ข์ฃ! ๐