Kafka Connector on k8s - Standalone Mode
๋ค์ด๊ฐ๋ฉฐ
ํ์ฌ์์ Confluent๋ฅผ ํตํด Kafka ํด๋ฌ์คํฐ๋ฅผ ์ฌ์ฉํ๊ณ ์์ต๋๋ค. ๋๋ถ๋ถ์ ๊ฒฝ์ฐ Confluent-managed Connector๋ฅผ ์ฌ์ฉ ํ์์ผ๋, ์ต๊ทผ Confluent์์ ์ง์ํ์ง ์๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ก Topic์ ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํด์ผ ํ ์ผ์ด ์๊ฒผ์ต๋๋คโฆ! ์ ๋ Confluent๋ฅผ ์ดํดํ ์ข์ ๊ธฐํ๋ผ๊ณ ์๊ฐํ๊ณ , Kubernetes ์์์ Sink Connector๋ฅผ ์ด์ํ๋ ๊ฒฝํ์ ํด๋ณผ ์ ์์์ต๋๋ค.
๊ตฌ๊ธ์ ๊ฒ์ํด๋ณด๋ฉด, Strimzi๋ฅผ ์ฌ์ฉํด Kafka Cluster์ Connector๊น์ง ๋์ฐ๋ ๊ฒฝ์ฐ๋ฅผ ๋ง์ด ๋ดค๋๋ฐ์. Strimzi๋ ๋งค๋ ฅ์ ์ธ ๋๊ตฌ์ด์ง๋ง, ์ด๋ฒ ์์ ์์๋ ๊ทธ๊ฒ ์์ด Sink Connector๋ฅผ ๋์๋ณด๊ณ ์ถ์์ต๋๋ค! ๊ทธ๋์ ์ ๋ง๊ณ ๋ ์ด๋ฐ ๋์ฆ๋ฅผ ๊ฐ์ง ๋ถ๋ค์ด ์์๊น ํ์ฌ ๋ด์ฉ์ ์ ๋ฆฌํด๋ณด์์ต๋๋ค ใ ใ
Kafka Connector๋?
๋ด์ฉ์ ๋ค์ด๊ฐ๊ธฐ ์ ์ ์ ๊น โKafka Connectorโ์ ๋ํด ์ค๋ช ํ๊ณ ์ ํฉ๋๋ค. โKafka Connectorโ๋ Kafka Topic์ ๋ฐ์ดํฐ๋ฅผ Produceํ๊ฑฐ๋ Consume ํ๋ ํจํด๊ณผ ๊ธฐ๋ฅ์ ์ ๋ฆฌํด Interface๋ก ๋ง๋ ๊ฒ ์ ๋๋ค.
์๋ฅผ ๋ค์ด, Topic ๋ฐ์ดํฐ๋ฅผ AWS S3์ ์ ์ฌํ๊ณ ์ ํ๋ ์์ ์ Kafka๋ฅผ ์ฌ์ฉํ๋ ํ๊ณผ ๊ธฐ์ ์์ ์์ฃผ ๋น๋ฒํ๊ฒ ์ผ์ด๋๋ ํจํด์ ๋๋ค. ์ด๋ฅผ ๊ณตํต ์ฌ๋ก๋ฅผ ์ง์ํ๊ธฐ ์ํด S3 Sink Connector๋ผ๋ ํจํค์ง๊ฐ ์กด์ฌํฉ๋๋ค. ๋ง์ฝ ์ด๊ฒ ์์๋ค๋ฉด, Kafka์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด S3์ ์ ์ฌํ๋ S3 Consumer๋ฅผ ๋ชจ๋๊ฐ ์ฝ๋๋ฅผ ์ผ์ผ์ด ์์ฑํด์ผ ํ์ ๊ฒ ์ ๋๋ค.
Standalone vs. Distributed
Kafka Connector๋ Standalone ๋ชจ๋์ Distributed ๋ชจ๋, 2๊ฐ์ง ๋ชจ๋๋ฅผ ์ง์ ํฉ๋๋ค. ๊ทธ๋์ Confluent์์ ๋ฒํผ ๋ธ๊น์ผ๋ก Connector๋ฅผ ๋ํ๋ก์ด ํ๋ ์ ์๊ฒ ๋ ์ค ์ด๋ค ๊ฑธ ์ ํํ ์ง๋ ํฐ ๊ณ ๋ฏผ ํฌ์ธํธ ์์ต๋๋ค.
Confluent์ Kafka Connect ๋ฌธ์์๋ ์๋ ๊ฒ ๋์ ์์ต๋๋ค.
- Standalone workers
- Standalone mode is the simplest mode, where a single process is responsible for executing all connectors and tasks.
- Standalone mode is convenient for getting started, during development, and in certain situations where only one process makes sense, such as collecting logs from a host.
- Distributed Workers
- Distributed mode provides scalability and automatic fault tolerance for Kafka Connect. In distributed mode, you start many worker processes using the same group.id and they coordinate to schedule execution of connectors and tasks across all available workers.
์์ฝํ๋ฉด, Standalone ๋ชจ๋๋ ์ฑ๊ธ ํ๋ก์ธ์ค ๋ฐฉ์์ด๊ณ , Distributed ๋ชจ๋๋ ํ์ฅ์ฑ๊ณผ ์๋ ๋ณต๊ตฌ๋ฅผ ์ง์ํ๋ค๊ณ ํฉ๋๋ค. ์ด๋ฐ ์ ๋๋ฌธ์ Standalone ๋ชจ๋๋ ํ ์คํธ์ ๊ฐ๋ฐ ๊ณผ์ ์์ ๊ถ์ฅ ๋๋ ๋ชจ๋์ ๋๋ค.
์ผ๋จ ์ฒ์์ด๋ Standalone ๋ชจ๋๋ก ์์ํด๋ณด๊ธฐ๋ก ๋ง์ ๋จน์๊ณ , ์ถํ์ Confluent ์ชฝ ์กฐ์ธ์ ๋ฐ๋ผ Distributed ๋ชจ๋๋ก ์ ํ ํ์์ต๋๋ค. ๋น๋กฏ ์ถ๊ฐ ์์ ์ด ์์์ง๋ง, Kafka Connector๋ฅผ ์ฒ์ ๋ํ๋ก์ด ํด๋ณด๋ ์ํฉ์ด๋ผ๋ฉด Standalone ๋ชจ๋๋ถํฐ ์์ํด๋ณด๊ธธ ์ถ์ฒ๋๋ฆฝ๋๋ค ๐
File Sink Connector
File Sink Connector๋ Kafka Connect์์ ๊ธฐ๋ณธ์ผ๋ก ์ ๊ณต๋๋ Connector ์ ๋๋ค. ๊ทธ๋์ plugin ์ค์น ๊ณผ์ ์ด ๋ฐ๋ก ํ์ ์์ต๋๋ค.
์ฒ์์ ์ด๋ค Docker Image๋ฅผ ์ฌ์ฉํ ์ง ์ ํด์ผ ํ๋๋ฐ์. ์ ๋ Confluent์์ ๊ด๋ฆฌํ๋ Connector base ์ด๋ฏธ์ง์ธ confluentinc/cp-kafka-connect:7.7.1.amd64
๋ฅผ ์ฌ์ฉํ์์ต๋๋ค.
๊ทธ๋ฆฌ๊ณ ์๋์ ๊ฐ์ด K8s pod yaml์ ๊ตฌ์ฑํ์ฌ ํ ์คํธ๋ฅผ ์งํํ์์ต๋๋ค.
apiVersion: apps/v1
kind: Deployment
metadata:
name: file-sink-connector
spec:
selector:
matchLabels:
app: file-sink-connector
template:
metadata:
labels:
app: file-sink-connector
spec:
containers:
- name: file-sink-connector
image: confluentinc/cp-kafka-connect:7.7.1.amd64
command: ["sleep", "infinity"]
env:
- name: CLASSPATH
value: /usr/share/filestream-connectors/*
resources:
limits:
cpu: 1000m
memory: 2048Mi
requests:
cpu: 500m
memory: 1024Mi
ports:
- containerPort: 8083
์ฒ์์ pod์ ๋์์ kafka connect์์ ์ธ ํ์ผ๋ค์ด ์ด๋์ ์๋์ง ํ์
ํ๊ธฐ ์ํด์
pod command๋ฅผ sleep infinity
๋ก ์ฃผ๊ณ ์์ ํ์ต๋๋ค.
์์ ์ ํ๋ค๋ณด๋ CPU/Mem ๋ฆฌ์์ค๊ฐ ๋ถ์กฑํด Connector๊ฐ ์ ๋จ๋ ๊ฒฝ์ฐ๊ฐ ์์ด์ ๋ฆฌ์์ค๋ ๋๋ํ๊ฒ ํ ๋น ํ์ต๋๋ค. ์ผ์ด์ค์ ๋ฐ๋ผ ์ด๊ฒ๋ณด๋ค ์ค์ด์ ๋ ๋ฉ๋๋ค. ๐ฐ
๋ฌดํ ๋๊ธฐ ์ค์ธ k8s pod์ ์ ์ํด ์ดํด๋ณด๋ฉด ์๋ ๋ ๊ฒฝ๋ก์ ์ค์ํ ํ์ผ๊ณผ ์คํฌ๋ฆฝํธ๊ฐ ์๋ ๊ฒ์ ๋ณผ ์ ์์ต๋๋ค
/etc/kafka/
- ๊ฐ์ข default properties ํ์ผ์ด ๋ชจ์ฌ ์์ต๋๋ค.
connect-standalone.properties
,connect-distributed.properties
, โฆ
/usr/bin/
- ๊ฐ์ข binary script๋ค์ด ๋ชจ์ฌ ์์ต๋๋ค.
- Connector๋ฅผ ์คํํ๋
connect-standalone
,connect-distributed
๋ช ๋ น์ด๋ ์ด๊ณณ์ ์์น ํฉ๋๋ค.
Configure Properties
์ด์ Connector์์ Confluent์ ์ ์ํ๊ธฐ ์ํ Property๋ฅผ ๊ตฌ์ฑํด๋ด ์๋ค.
Standalone Properties
Standalone Connector๊ฐ ๋์ํ๊ณ , Confluent ํ๋ซํผ๊ณผ ํต์ ํ ์ ์๋๋ก Properties๋ฅผ ์ ์ํฉ๋๋ค. ์์ฑ ๋ณ๋ก ์์ธํ ๋ด์ฉ์ Confluent ๋ฌธ์๋ฅผ ์ฐธ๊ณ ํ์๊ธธ ๋ฐ๋๋๋ค.
# @./standalone.properties
bootstrap.servers=xxxx.aws.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx";
request.timeout.ms=20000
retry.backoff.ms=500
consumer.bootstrap.servers=xxxx.aws.confluent.cloud:9092
consumer.ssl.endpoint.identification.algorithm=https
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx";
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.auto.offset.reset=earliest
offset.flush.interval.ms=10000
offset.storage.file.filename=/tmp/connect.offsets
# Required connection configs for Confluent Cloud Schema Registry
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=xxxx:xxxx
value.converter.schema.registry.url=https://xxxx.aws.confluent.cloud
Schema Registry ๋ถ๋ถ์ ๋ณธ์ธ์ด Subscribe ํ ํ ํฝ์ key/value์ ๋ง์ถฐ ์ค์ ํ๋๋ก ํฉ๋๋ค. ์ ๋ Avro value๋ก ๊ฐ์ด ๋ด๊ธฐ๊ณ ์์ด AvroConverter
๋ฅผ ์ฌ์ฉํ์ต๋๋ค.
์์ฑํ standalone.properties
ํ์ผ์ Pod์์ ์ฌ์ฉํ ์ ์๋๋ก ์๋ ์ปค๋งจ๋๋ฅผ ํตํด K8s Secret์ผ๋ก ์์ฑํฉ๋๋ค.
$ kubectl create secret generic file-sink-standalone-properties --from-file=standalone.properties
Kafka Connect Configuration ๋ฌธ์๋ฅผ ์ดํด๋ณด๋ฉด, .properties
ํ์ผ์ด ์๋๋ผ ํ๊ฒฝ ๋ณ์(ENV)๋ฅผ ํตํด ์ธํ
ํ๋ ๋ถ๋ถ์ด ๋์ต๋๋ค. ๋ฌธ์์ ๋ฐ๋ฅด๋ฉด, CONNECT_
๋ผ๋ ์ ๋์ฌ(prefix)๋ฅผ ๋ถ์ฌ Connector Configuration์ด ๊ฐ๋ฅํฉ๋๋ค.
์ ๋ ์ฒ์์ ENV๋ฅผ ํตํด ์ธํ
ํ๋ ๋ฐฉ๋ฒ์ ์ฌ์ฉ ํ์ต๋๋ค๋ง, ์์
์ ํ๋ค๋ณด๋ ๋ณ๋ ํ์ผ๋ก ๋ถ๋ฆฌํ๋ ๊ฒ์ด ๋ ํธ๋ฆฌํ๊ณ , ๋ ์๊ฒ Standalone Properties๋ง CONNECT_
ENV๋ก ๋ฃ์ ์ ์๊ณ , Connector Plugin์ Properties๋ ๋ฐ๋ก .properties
ํ์ผ ํํ๋ก ์กด์ฌํด์ผ ํ์ต๋๋ค. ๊ทธ๋์ ํ์์ ํต์ผํ๊ณ ์ Properties ์ ์๋ฅผ ๋ชจ๋ xxxx.properties
๋ก ํต์ผํ์์ต๋๋ค ๐
File Sink Properties
์ด์ด์ File Sink Plugin์ ์ฌ์ฉํ๊ธฐ ์ํ Properties๋ฅผ ์ ์ํฉ๋๋ค. ์์ฑ ๋ณ๋ก ์์ธํ ๋ด์ฉ์ Confluent ๋ฌธ์๋ฅผ ์ฐธ๊ณ ํ์๊ธธ ๋ฐ๋๋๋ค.
file
์์ฑ์ ์ ์ํ ๊ฒฝ๋ก์ Topic ๋ฐ์ดํฐ๊ฐ ํ์ค์ฉ ์์ด๊ฒ ๋ฉ๋๋ค.
# @./local-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=YOUR_TOPIC
๊ทธ๋ฆฌ๊ณ ๋ง์ฐฌ๊ฐ์ง๋ก K8s Secret์ผ๋ก ๋ง๋ค์ด Pod์์ ์ด ํ์ผ์ ์ฌ์ฉํ ์ ์๋๋ก ํฉ๋๋ค.
$ kubectl create secret generic local-file-sink-properties --from-file=local-file-sink.properties
Deploy Connector
์ด์ ์๋์ yaml ํ์ผ์ ์ฌ์ฉํด File Sink Connector๋ฅผ ๋ํ๋ก์ด ํฉ๋๋ค.
apiVersion: apps/v1
kind: Deployment
metadata:
name: file-sink-connector
spec:
selector:
matchLabels:
app: file-sink-connector
template:
metadata:
labels:
app: file-sink-connector
spec:
containers:
- name: file-sink-connector
image: confluentinc/cp-kafka-connect:7.7.1.amd64
command: ["connect-standalone", "/etc/kafka-connect-properties/standalone/standalone.properties", "/etc/kafka-connect-properties/file-sink-connector/local-file-sink.properties"]
env:
- name: CLASSPATH
value: /usr/share/filestream-connectors/*
resources:
limits:
cpu: 1000m
memory: 2048Mi
requests:
cpu: 500m
memory: 1024Mi
ports:
- containerPort: 8083
volumeMounts:
- name: local-file-sink-properties
mountPath: /etc/kafka-connect-properties/file-sink-connector/
readOnly: true
- name: file-sink-standalone-properties
mountPath: /etc/kafka-connect-properties/standalone/
readOnly: true
volumes:
- name: local-file-sink-properties
configMap:
name: local-file-sink-properties
- name: file-sink-standalone-properties
configMap:
name: file-sink-standalone-properties
๋งบ์๋ง
File Sink ํ๋ฌ๊ทธ์ธ์ด ์๋ ๋ค๋ฅธ ํ๋ฌ๊ทธ์ธ์ ์ฌ์ฉํ๊ณ ์ถ๋ค๋ฉด, ์์ ํ์์ Connector Plugin์ ๋ํ ๋ถ๋ถ์ ํ์์ ๋ง๊ฒ ์์ ํ๋ฉด ๋ฉ๋๋ค.
๋ค์์ผ๋ก๋ ํ์ฅ์ฑ๊ณผ ์์ ์ฑ์ ๊ฐ์ถ Distributed ๋ชจ๋๋ก Connector๋ฅผ ๋์๋ด ์๋ค! ๐