Kafka Shell Script ๋๋ฌ๋ณด๊ธฐ
๋ค์ด๊ฐ๋ฉฐ
ํ์ฌ์์ Confluent๋ฅผ ํตํด Kafka ํด๋ฌ์คํฐ๋ฅผ ์ ์ฌ์ฉํ๊ณ ์์์ต๋๋ค. ๊ทธ๋ฐ๋ฐ, ์ด ์นดํ์นด์ ๋ํด์ ์ข๋ ์์ธํ ์๊ณ , ์ ๋ฌธ์ฑ์ ๊ฐ์ถ๊ณ ์ถ๋ค๋ ์๊ฐ์ด ๋ค์ด์ 2025๋ ๋ชฉํ๋ก Confluent์ Kafka ์๊ฒฉ์ฆ์ธ Confluent Certified Developer for Apache Kafkaยฎ ์๊ฒฉ์ฆ์ ์ค๋นํ๊ณ ์์ต๋๋ค โ๏ธ ์ ์ฒด ํฌ์คํธ๋ ์ฌ๊ธฐ์์ ํ์ธํ ์ ์์ต๋๋ค.
์ด๋ฒ ํฌ์คํธ๋ ์์ฑ ์์ ๊ธฐ์ค ์ต์ ๋ฒ์ ์ธ Kafka 3.9์์ ์ ๊ณตํ๋ ๋ชจ๋ Kafka Shell Script์ ๊ธฐ๋ฅ์ ํ๋ฒ์ฉ ์ดํด๋ณด๋ ๊ฑธ ๋ชฉํ๋ก ๊ธฐ๋กํ ํฌ์คํธ ์ ๋๋ค.
With Authentication
๋๋ถ๋ถ ๋ก์ปฌ K8s์ ๋ํ๋ก์ด ํ Kafka ํด๋ฌ์คํฐ์ ์คํํด๋ณด์๊ณ , ์ผ๋ถ ๊ฒฝ์ฐ๋ Confluent Cluster์์ ์คํํด๋ณด์์ต๋๋ค.
๋ก์ปฌ์ ๋ํ๋ก์ด ํ ์นดํ์นด ํด๋ฌ์คํฐ์์๋ ํธํ๊ฒ ์คํํด๋ณด๊ธฐ ์ํด์ Authentication ๊ณผ์ ์ ์์ด์ต๋๋ค! ๋ง์ฝ, SASL ์ธ์ฆ์ ํด์ผ ํ๋ค๋ฉด, Shell ์คํ ๋ ์๋์ ๊ฐ์ด ์ถ๊ฐ๋ก ์ค์ ํด์ค์๋ค.
$ kafka-xxxx.sh \
--bootstrap-servers xxxx:9092 \
--command-config /tmp/xxxx.properties
Kafka Shell์ ๋ก์ปฌ์์ ์คํํ ์๋ ์์ ๊ฒ์ด๊ณ , Kafka Shell์ ์คํํ ์ ์๋ ์ปจํ
์ด๋์ kubectl exec
๋ก ์ ์ํด ์คํํ ์๋ ์์ต๋๋ค. ์ ๊ฐ CCDAK ์๊ฒฉ์ฆ์ ์ค๋นํ๋ฉด์ ๋๋ ์ ์ โ์ปจํ
์ด๋๋ก ๊ฒฉ๋ฆฌ๋ ํ๊ฒฝ์์ Kafka Shell์ ์คํํ๋๊ฒ ๊ฐ์ฅ ์์ ํ๋คโ๋ ๊ฒ ์
๋๋ค. ๋งฅ๋ถ ๋ก์ปฌ์ Kafka Shell์์๋ Exception์ ๋ฟ์ผ๋ฉฐ ์ ๋๋ก ๋์ํ์ง ์๋ ๊ฒ์ด Kafka Broker pod์์๋ ์ ๋๋ก ๋์ํ๋ ๊ฒฝ์ฐ๊ฐ ์ข
์ข
์์์ต๋๋ค!
์ ๋ฌธ
kafka-topics.sh
- โKafka Shell:
kafka-topics.sh
โ ํฌ์คํธ๋ก ๋ถ๋ฆฌ
- โKafka Shell:
kafka-configs.sh
- ์ด๋ฏธ ์์ฑํ ์นดํ์นด ํ ํฝ์ Config๋ฅผ ๋ณ๊ฒฝํ ๋ ์ฌ์ฉํจ.
--alter
๋ฅผ ์ฌ์ฉ. - โKafka Shell:
kafka-topics.sh
โ ํฌ์คํธ์ ํจ๊ป ๊ธฐ์ ํจ.
- ์ด๋ฏธ ์์ฑํ ์นดํ์นด ํ ํฝ์ Config๋ฅผ ๋ณ๊ฒฝํ ๋ ์ฌ์ฉํจ.
kafka-console-producer.sh
- โKafka Shell: Console Produce/Consumeโ ํฌ์คํธ๋ก ๋ถ๋ฆฌ
kafka-console-consumer.sh
- โKafka Shell: Console Produce/Consumeโ ํฌ์คํธ๋ก ๋ถ๋ฆฌ
kafka-consumer-groups.sh
--list
- ์ปจ์๋จธ ๊ทธ๋ฃน ๋ฆฌ์คํ
--describe --group xxxx
- ์ปจ์๋จธ ๊ทธ๋ฃน ์์ธ ๋ณด๊ธฐ
--execute --to-latest/--to-earlist/...
- ์ปจ์๋จธ ๊ทธ๋ฃน์ offset์ ์ด๊ธฐํ ํ๊ฑฐ๋ ์ต์ ์ผ๋ก ์ด๋ ์ํค๋ ค๊ณ ํ ๋
Performance Test
์นดํ์นด ํด๋ฌ์คํฐ์ ํผํฌ๋จผ์ค๋ฅผ ์ธก์ ํ๊ธฐ ์ํ ๋๊ตฌ ์ ๋๋ค.
$ kafka-producer-perf-test.sh \
--producer-props bootstrap.servers=bitnami-kafka:9092 \
--topic test \
--num-records 100 \
--throughput 10 \
--record-size 100
52 records sent, 10.3 records/sec (0.00 MB/sec), 8.3 ms avg latency, 316.0 ms max latency.
100 records sent, 10.027073 records/sec (0.00 MB/sec), 4.84 ms avg latency, 316.00 ms max latency, 1 ms 50th, 13 ms 95th, 316 ms 99th, 316 ms 99.9th.
$ kafka-consumer-perf-test.sh \
--bootstrap-server bitnami-kafka:9092 \
--topic test \
--messages 10
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2025-01-15 09:26:16:817, 2025-01-15 09:26:20:246, 0.0096, 0.0028, 105, 30.6212, 3411, 18, 0.5312, 5833.3333
Reassign Partitions
ํ ํฝ์ ์ด๋ฃจ๋ ๋ฆฌ๋ ํํฐ์
์ด ํ๋์ ๋ธ๋ก์ปค์ ์ฌ๋ฆฌ๋ ํซ์คํ(Hot spot) ํ์์ ๋ฐฉ์งํ๊ธฐ ์ํ ๋๊ตฌ ์
๋๋ค. kafka-reassign-partitions.sh
๋ช
๋ น์ด๋ฅผ ์ฌ์ฉํ๋ฉด, ๋ฆฌ๋ ํํฐ์
๊ณผ ํ๋ก์ ํํฐ์
์์น๋ฅผ ์ฌ์ค์ ํฉ๋๋ค.
๋จผ์ ์ฌ๋ถ๋ฐฐ ์์ ์ ์ํด ์ ๋นํ ํํฐ์ ๊ณผ Replication Factor๋ฅผ ๊ฐ์ง ํ ํฝ์ ํ๋ ์์ฑํฉ๋๋ค.
$ kafka-topics.sh \
--bootstrap-server bitnami-kafka:9092 \
--topic test2 \
--partitions 3 \
--replication-factor 3 \
--create
$ kafka-topics.sh \
--bootstrap-server bitnami-kafka:9092 \
--topic test2 \
--describe
Topic: test2 TopicId: a3vf0OqaSN6sqjtAVfrOhw PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test2 Partition: 0 Leader: 101 Replicas: 101,100,102 Isr: 101,100,102 Elr: N/A LastKnownElr: N/A
Topic: test2 Partition: 1 Leader: 100 Replicas: 100,102,101 Isr: 100,102,101 Elr: N/A LastKnownElr: N/A
Topic: test2 Partition: 2 Leader: 102 Replicas: 102,101,100 Isr: 102,101,100 Elr: N/A LastKnownElr: N/A
์ด๊ฒ ํ์ฌ ๋ฆฌ๋ ํํฐ์
์ด ๊ฐ ๋ธ๋ก์ปค์ ๊ณ ๋ฅด๊ฒ ๋ถ์ฐ ๋์ด ์์ต๋๋ค. ์ด๋ฅผ 102
๋ฒ ๋ธ๋ก์ปค์ ์ง์ค์์ผ ๊ฐ์ ๋ก ํซ์คํ์ผ๋ก ๋ง๋ค์ด๋ด
์๋ค!
์ผ๋จ ๋ช
๋ น์ด๋ฅผ ์คํํ๊ธฐ ์ํด์ --reassignment-json-file
์ ์ ๋ฌํ json ํ์ผ์ด ํ์ํฉ๋๋ค.
$ cat <<EOF > /tmp/partitions.json
{
"partitions":
[
{ "topic": "test2", "partition": 0, "replicas": [102,100,101] }
],
"version": 1
}
EOF
$ kafka-reassign-partitions.sh \
--bootstrap-server bitnami-kafka:9092 \
--reassignment-json-file /tmp/partitions.json \
--execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test2","partition":0,"replicas":[100,102,101],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignment for test2-0
$ kafka-reassign-partitions.sh \
--bootstrap-server bitnami-kafka:9092 \
--reassignment-json-file /tmp/partitions.json \
--verify
Status of partition reassignment:
Reassignment of partition test2-0 is completed.
Clearing broker-level throttles on brokers 100,101,102
Clearing topic-level throttles on topic test2
๊ทธ๋ฆฌ๊ณ ๋ค์ --describe
๋ฅผ ํด๋ณด๋ฉด
$ kafka-topics.sh \
--bootstrap-server bitnami-kafka:9092 \
--topic test2 \
--describe
Topic: test2 TopicId: a3vf0OqaSN6sqjtAVfrOhw PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test2 Partition: 0 Leader: 101 Replicas: 101,102,100 Isr: 101,100,102 Elr: N/A LastKnownElr: N/A
Topic: test2 Partition: 1 Leader: 100 Replicas: 100,101,102 Isr: 100,102,101 Elr: N/A LastKnownElr: N/A
Topic: test2 Partition: 2 Leader: 102 Replicas: 102,101,100 Isr: 102,101,100 Elr: N/A LastKnownElr: N/A
์์ง ๋ฐ์์ด ์ ๋์๋ค. ๋ฆฌ๋ ์ฌ์ ์ถ์ ์ํด์๋ kafka-leader-election.sh
๋ช
๋ น์ด๋ฅผ ์ถ๊ฐ๋ก ์คํํด์ผ ํฉ๋๋ค.
$ kafka-leader-election.sh \
--bootstrap-server bitnami-kafka:9092 \
--topic test2 \
--partition 0 \
--election-type preferred
๊ทธ๋ฆฌ๊ณ ๋ค์ --describe
๋ฅผ ํด๋ณด๋ฉด
$ kafka-topics.sh \
--bootstrap-server bitnami-kafka:9092 \
--topic test2 \
--describe
Topic: test2 TopicId: a3vf0OqaSN6sqjtAVfrOhw PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test2 Partition: 0 Leader: 102 Replicas: 102,100,101 Isr: 101,100,102 Elr: N/A LastKnownElr: N/A
Topic: test2 Partition: 1 Leader: 100 Replicas: 101,100,102 Isr: 100,102,101 Elr: N/A LastKnownElr: N/A
Topic: test2 Partition: 2 Leader: 102 Replicas: 102,101,100 Isr: 102,101,100 Elr: N/A LastKnownElr: N/A
0
๋ฒ ํํฐ์
์ด ๋ธ๋ก์ปค 102
๋ฒ์ผ๋ก ์ฎ๊ฒจ์ก๋ค!! ใ
ใ
๊ฐ์ ๋ก ํซ์คํ์ผ๋ก ๋ง๋ค์ด์ก์ต๋๋ค ใ
ใ
๊ทธ๋ฌ๋ ์ด๋ฐ ํํฐ์ ์ฌ์ค์ ์ ์นดํ์นด ๋ด๋ถ์ ๋ง์ ์์ ์ ์๊ตฌํ๊ธฐํด ํด๋ฌ์คํฐ์ ๋ถํ๊ฐ ์ฌ ์ ์๊ณ , ๋ ํด๋น ํ ํฝ์ ๊ตฌ๋ ํ๋ ์ปจ์๋จธ์๊ฒ๋ ๋ฆฌ๋ฐธ๋ฐ์ฑ์ ํธ๋ฆฌ๊ฑฐ ํ๊ฒ ๋ฉ๋๋ค. ๊ทธ๋์ ์ด์ ํ๊ฒฝ์์๋ ์ฌ์ค์ ์ ์์ฃผ ์กฐ์ฌํ ์งํํด์ผ ํ๋ฉฐ, ๋ง์ฝ ์ฅ์ ๊ฐ ์์๋๋ค๋ฉด ํํฐ์ ์ ์ ๊ท ์์ฑํด ํด๊ฒฐํ๋ ๊ฒ์ด ๋ฐฉ๋ฒ์ผ ์๋ ์์ต๋๋ค.
Kafka Delete record
Kafka Dump Logs
Kafka Cluster
kafka-broker-api-versions.sh
๋ก ์นดํ์นด ํด๋ฌ์คํฐ๋ฅผ ๊ตฌ์ฑํ๋ ๋ธ๋ก์ปค ๋ชฉ๋ก๊ณผ ๊ฐ ๋ธ๋ก์ปค๊ฐ ์ง์ํ๋ API ๋ฒ์ ์ ํ์ธํ ์ ์์ต๋๋ค.
$ kafka-broker-api-versions.sh \
--bootstrap-server bitnami-kafka:9092
bitnami-kafka-broker-1.bitnami-kafka-broker-headless.kafka.svc.cluster.local:9092 (id: 101 rack: null) -> (
Produce(0): 0 to 11 [usable: 11],
Fetch(1): 0 to 17 [usable: 17],
ListOffsets(2): 0 to 9 [usable: 9],
Metadata(3): 0 to 12 [usable: 12],
LeaderAndIsr(4): 0 to 7 [usable: 7],
StopReplica(5): 0 to 4 [usable: 4],
UpdateMetadata(6): 0 to 8 [usable: 8],
...
)
...
๋ธ๋ก์ปค์ ๊ฐ API๋ ๊ณ ์ ํ API Key ๊ฐ๊ณผ Versioning์ด ๋๋ค๊ณ ํฉ๋๋ค. ํ์ ๋ธ๋ก์ปค์์ ์ง์ํ๋ API ๋ฒ์ ์ ๋ฒ์๋ฅผ ํ์ธํ ์ ์์ต๋๋ค.
Connectors
connect-distributed.sh
connect-standalone.sh
๊ณ ๊ธ
kafka-e2e-latency.sh
kafka-mirror-maker.sh
kafka-streams-application-reset.sh
zookeeper-server-start.sh
connect-mirror-maker.sh
kafka-features.sh
kafka-producer-perf-test.sh
zookeeper-server-stop.sh
connect-plugin-path.sh
kafka-get-offsets.sh
kafka-reassign-partitions.sh
kafka-transactions.sh
zookeeper-shell.sh
kafka-jmx.sh
kafka-replica-verification.sh
kafka-verifiable-consumer.sh
kafka-acls.sh
kafka-consumer-perf-test.sh
kafka-leader-election.sh
kafka-run-class.sh
kafka-verifiable-producer.sh
kafka-broker-api-versions.sh
kafka-delegation-tokens.sh
kafka-log-dirs.sh
kafka-server-start.sh
trogdor.sh
kafka-client-metrics.sh
kafka-delete-records.sh
kafka-metadata-quorum.sh
kafka-server-stop.sh
kafka-dump-log.sh
kafka-metadata-shell.sh
kafka-storage.sh
zookeeper-security-migration.sh
kafka-cluster.sh