Easily Delivering Topic Data with Kafka Connect ✌️ Hands-On with the File Sink Plugin!


Introduction

At work we use a Kafka cluster through Confluent. In most cases we relied on Confluent-managed Connectors, but recently I had to load Topic data into a database that Confluent does not support…! I saw it as a good chance to understand Confluent better, and it gave me hands-on experience operating a Sink Connector on Kubernetes.

If you search on Google, you will find many cases where people use Strimzi to bring up both the Kafka Cluster and the Connector. Strimzi is an appealing tool, but this time I wanted to run a Sink Connector without it! I figured I might not be the only one with this need, so I wrote up what I did.

What is a Kafka Connector?

Before getting into the details, a quick word on what a "Kafka Connector" is. A "Kafka Connector" packages the common patterns and functionality for producing data to, or consuming data from, a Kafka Topic behind a reusable interface.

For example, loading Topic data into AWS S3 is a pattern that comes up constantly in teams and companies that use Kafka. To support this common case, there is a package called the S3 Sink Connector. Without it, everyone would have to write their own S3 consumer code that reads data from Kafka and loads it into S3.

Standalone vs. Distributed

Kafka Connect supports two modes: Standalone and Distributed. Having only ever deployed Connectors on Confluent with the click of a button, I found choosing between the two a real sticking point.

Confluent's Kafka Connect documentation describes them as follows.

  • Standalone workers
    • Standalone mode is the simplest mode, where a single process is responsible for executing all connectors and tasks.
    • Standalone mode is convenient for getting started, during development, and in certain situations where only one process makes sense, such as collecting logs from a host.
  • Distributed Workers
    • Distributed mode provides scalability and automatic fault tolerance for Kafka Connect. In distributed mode, you start many worker processes using the same group.id and they coordinate to schedule execution of connectors and tasks across all available workers.

In short, Standalone mode runs everything in a single process, while Distributed mode provides scalability and automatic fault tolerance. For that reason, Standalone mode is the one recommended for testing and development.
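As a rough sketch of how the two modes differ at launch time: the file names `worker.properties`, `my-connector.properties`, and `my-connector.json` below are hypothetical placeholders, and each command would normally run in its own terminal because the worker processes stay in the foreground.

```shell
# Standalone: a single process; connector configs are passed as extra arguments.
connect-standalone worker.properties my-connector.properties

# Distributed: every worker starts with only the worker config
# (workers that share the same group.id form one Connect cluster).
connect-distributed worker.properties

# In distributed mode, connectors are then registered through the Connect REST API
# (port 8083 by default) instead of a local .properties file.
curl -X POST -H "Content-Type: application/json" \
  --data @my-connector.json http://localhost:8083/connectors
```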

Since it was my first time, I decided to start with Standalone mode, and later switched to Distributed mode on advice from Confluent. The switch did mean some extra work, but if you are deploying a Kafka Connector for the first time, I recommend starting with Standalone mode 😊

File Sink Connector

The File Sink Connector ships with Kafka Connect by default, so no separate plugin installation step is needed. (In the image used below, its jars sit under /usr/share/filestream-connectors, which is why the manifest adds that path to CLASSPATH.)

์ฒ˜์Œ์—” ์–ด๋–ค Docker Image๋ฅผ ์‚ฌ์šฉํ• ์ง€ ์ •ํ•ด์•ผ ํ–ˆ๋Š”๋ฐ์š”. ์ €๋Š” Confluent์—์„œ ๊ด€๋ฆฌํ•˜๋Š” Connector base ์ด๋ฏธ์ง€์ธ confluentinc/cp-kafka-connect:7.7.1.amd64๋ฅผ ์‚ฌ์šฉํ•˜์˜€์Šต๋‹ˆ๋‹ค.

Then I put together the following K8s Deployment manifest to bring up a test pod.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: file-sink-connector
spec:
  selector:
    matchLabels:
      app: file-sink-connector
  template:
    metadata:
      labels:
        app: file-sink-connector
    spec:
      containers:
      - name: file-sink-connector
        image: confluentinc/cp-kafka-connect:7.7.1.amd64
        command: ["sleep", "infinity"]
        env:
          - name: CLASSPATH
            value: /usr/share/filestream-connectors/*
        resources:
          limits:
            cpu: 1000m
            memory: 2048Mi
          requests:
            cpu: 500m
            memory: 1024Mi
        ports:
        - containerPort: 8083

์ฒ˜์Œ์— pod์„ ๋„์›Œ์„œ kafka connect์—์„œ ์“ธ ํŒŒ์ผ๋“ค์ด ์–ด๋””์— ์žˆ๋Š”์ง€ ํŒŒ์•…ํ•˜๊ธฐ ์œ„ํ•ด์„œ pod command๋ฅผ sleep infinity๋กœ ์ฃผ๊ณ  ์‹œ์ž‘ ํ–ˆ์Šต๋‹ˆ๋‹ค.

Along the way there were times when the Connector failed to start because of insufficient CPU/memory, so I allocated resources generously. Depending on your case, you can go lower than this. 💰

If you connect to the idling k8s pod and look around, you will find the important files and scripts under these two paths:

  • /etc/kafka/
    • ๊ฐ์ข… default properties ํŒŒ์ผ์ด ๋ชจ์—ฌ ์žˆ์Šต๋‹ˆ๋‹ค.
    • connect-standalone.properties, connect-distributed.properties, โ€ฆ
  • /usr/bin/
    • ๊ฐ์ข… binary script๋“ค์ด ๋ชจ์—ฌ ์žˆ์Šต๋‹ˆ๋‹ค.
    • Connector๋ฅผ ์‹คํ–‰ํ•˜๋Š” connect-standalone, connect-distributed ๋ช…๋ น์–ด๋„ ์ด๊ณณ์— ์œ„์น˜ ํ•ฉ๋‹ˆ๋‹ค.
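One way to do this exploration, assuming the Deployment above is running in your current namespace and kubectl is pointed at the right cluster:

```shell
# Confirm the sleeping pod is up (label taken from the manifest above).
kubectl get pods -l app=file-sink-connector

# Default properties files shipped with the image.
kubectl exec deploy/file-sink-connector -- ls /etc/kafka/

# Launcher scripts for both modes (sh -c so the glob expands inside the pod).
kubectl exec deploy/file-sink-connector -- sh -c 'ls /usr/bin/connect-*'

# Jars that the CLASSPATH entry in the manifest points at.
kubectl exec deploy/file-sink-connector -- ls /usr/share/filestream-connectors/
```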

Configure Properties

Now let's set up the properties the Connector needs in order to connect to Confluent.

Standalone Properties

Define the properties that let the Standalone worker run and communicate with the Confluent platform. See the Confluent documentation for details on each property.

# @./standalone.properties
bootstrap.servers=xxxx.aws.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx";
request.timeout.ms=20000
retry.backoff.ms=500

consumer.bootstrap.servers=xxxx.aws.confluent.cloud:9092
consumer.ssl.endpoint.identification.algorithm=https
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx";
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.auto.offset.reset=earliest

offset.flush.interval.ms=10000
offset.storage.file.filename=/tmp/connect.offsets

# Required connection configs for Confluent Cloud Schema Registry
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=xxxx:xxxx
value.converter.schema.registry.url=https://xxxx.aws.confluent.cloud

Configure the Schema Registry part to match the key/value format of the topic you are subscribing to. My values are stored as Avro, so I used AvroConverter.

To make the standalone.properties file available to the Pod, create a K8s Secret from it with the command below.

$ kubectl create secret generic file-sink-standalone-properties --from-file=standalone.properties

The Kafka Connect Configuration documentation also describes configuring the worker through environment variables (ENV) rather than a .properties file. According to the docs, you can supply the configuration by adding the CONNECT_ prefix.

์ €๋„ ์ฒ˜์Œ์—” ENV๋ฅผ ํ†ตํ•ด ์„ธํŒ…ํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์‚ฌ์šฉ ํ–ˆ์Šต๋‹ˆ๋‹ค๋งŒ, ์ž‘์—…์„ ํ•˜๋‹ค๋ณด๋‹ˆ ๋ณ„๋„ ํŒŒ์ผ๋กœ ๋ถ„๋ฆฌํ•˜๋Š” ๊ฒƒ์ด ๋” ํŽธ๋ฆฌํ•˜๊ณ , ๋˜ ์š”๊ฒŒ Standalone Properties๋งŒ CONNECT_ ENV๋กœ ๋„ฃ์„ ์ˆ˜ ์žˆ๊ณ , Connector Plugin์˜ Properties๋Š” ๋”ฐ๋กœ .properties ํŒŒ์ผ ํ˜•ํƒœ๋กœ ์กด์žฌํ•ด์•ผ ํ–ˆ์Šต๋‹ˆ๋‹ค. ๊ทธ๋ž˜์„œ ํ˜•์‹์„ ํ†ต์ผํ•˜๊ณ ์ž Properties ์ •์˜๋ฅผ ๋ชจ๋‘ xxxx.properties๋กœ ํ†ต์ผํ•˜์˜€์Šต๋‹ˆ๋‹ค ๐Ÿ™

File Sink Properties

Next, define the properties for the File Sink Plugin. See the Confluent documentation for details on each property.

Topic data is appended, one record per line, to the path set in the file property.

# @./local-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=YOUR_TOPIC

As before, create a K8s Secret from it so the Pod can use this file.

$ kubectl create secret generic local-file-sink-properties --from-file=local-file-sink.properties

Deploy Connector

Now deploy the File Sink Connector with the yaml file below.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: file-sink-connector
spec:
  selector:
    matchLabels:
      app: file-sink-connector
  template:
    metadata:
      labels:
        app: file-sink-connector
    spec:
      containers:
      - name: file-sink-connector
        image: confluentinc/cp-kafka-connect:7.7.1.amd64
        command: ["connect-standalone", "/etc/kafka-connect-properties/standalone/standalone.properties", "/etc/kafka-connect-properties/file-sink-connector/local-file-sink.properties"]
        env:
          - name: CLASSPATH
            value: /usr/share/filestream-connectors/*
        resources:
          limits:
            cpu: 1000m
            memory: 2048Mi
          requests:
            cpu: 500m
            memory: 1024Mi
        ports:
        - containerPort: 8083
        volumeMounts:
          - name: local-file-sink-properties
            mountPath: /etc/kafka-connect-properties/file-sink-connector/
            readOnly: true
          - name: file-sink-standalone-properties
            mountPath: /etc/kafka-connect-properties/standalone/
            readOnly: true
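      volumes:
        # Both objects were created with `kubectl create secret generic`,
        # so they are mounted as Secret volumes rather than ConfigMaps.
        - name: local-file-sink-properties
          secret:
            secretName: local-file-sink-properties
        - name: file-sink-standalone-properties
          secret:
            secretName: file-sink-standalone-properties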
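A possible way to roll this out and check that records are flowing. The manifest file name `file-sink-connector.yaml` is an assumption, as is the worker keeping the default REST port 8083 (the port exposed in the manifest); the connector name and sink path come from local-file-sink.properties.

```shell
# Apply the manifest (hypothetical file name) and wait for the rollout.
kubectl apply -f file-sink-connector.yaml
kubectl rollout status deployment/file-sink-connector

# Worker startup logs: look for connection or converter errors here first.
kubectl logs deploy/file-sink-connector --tail=50

# Ask the Connect REST API for the connector status via a temporary port-forward.
kubectl port-forward deploy/file-sink-connector 8083:8083 &
PF_PID=$!
sleep 2
curl -s http://localhost:8083/connectors/local-file-sink/status
kill "$PF_PID"

# Peek at the sink file the connector writes inside the pod.
kubectl exec deploy/file-sink-connector -- tail -n 5 /tmp/test.sink.txt
```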

๋งบ์Œ๋ง

If you want to use a plugin other than File Sink, take the template above and adjust the Connector Plugin parts to fit that plugin's format.

๋‹ค์Œ์œผ๋กœ๋Š” ํ™•์žฅ์„ฑ๊ณผ ์•ˆ์ •์„ฑ์„ ๊ฐ–์ถ˜ Distributed ๋ชจ๋“œ๋กœ Connector๋ฅผ ๋„์›Œ๋ด…์‹œ๋‹ค! ๐Ÿ‘Š
