Kafka Shell Script ๋๋ฌ๋ณด๊ธฐ
๋ค์ด๊ฐ๋ฉฐ
ํ์ฌ์์ Confluent๋ฅผ ํตํด Kafka ํด๋ฌ์คํฐ๋ฅผ ์ ์ฌ์ฉํ๊ณ ์์์ต๋๋ค. ๊ทธ๋ฐ๋ฐ, ์ด ์นดํ์นด์ ๋ํด์ ์ข๋ ์์ธํ ์๊ณ , ์ ๋ฌธ์ฑ์ ๊ฐ์ถ๊ณ ์ถ๋ค๋ ์๊ฐ์ด ๋ค์ด์ 2025๋ ์ฒซ ๋ชฉํ๋ก Confluent์ Kafka ์๊ฒฉ์ฆ์ธ Confluent Certified Developer for Apache Kafkaยฎ ์๊ฒฉ์ฆ์ ์ค๋นํ๊ณ ์์ต๋๋ค โ๏ธ
์ด๋ฒ ํฌ์คํธ๋ ์์ฑ ์์ ๊ธฐ์ค ์ต์ ๋ฒ์ ์ธ Kafka 3.9์์ ์ ๊ณตํ๋ ๋ชจ๋ Kafka Shell Script์ ๊ธฐ๋ฅ์ ํ๋ฒ์ฉ ์ดํด๋ณด๋ ๊ฑธ ๋ชฉํ๋ก ๊ธฐ๋กํ ํฌ์คํธ ์ ๋๋ค.
With Authentication
๋๋ถ๋ถ ๋ก์ปฌ K8s์ ๋ํ๋ก์ด ํ Kafka ํด๋ฌ์คํฐ์ ์คํํด๋ณด์๊ณ , ์ผ๋ถ ๊ฒฝ์ฐ๋ Confluent Cluster์์ ์คํํด๋ณด์์ต๋๋ค.
๋ก์ปฌ์ ๋ํ๋ก์ด ํ ์นดํ์นด ํด๋ฌ์คํฐ์์๋ ํธํ๊ฒ ์คํํด๋ณด๊ธฐ ์ํด์ Authentication ๊ณผ์ ์ ์์ด์ต๋๋ค! ๋ง์ฝ, SASL ์ธ์ฆ์ ํด์ผ ํ๋ค๋ฉด, Shell ์คํ ๋ ์๋์ ๊ฐ์ด ์ถ๊ฐ๋ก ์ค์ ํด์ค์๋ค.
$ kafka-xxxx.sh \
--bootstrap-servers xxxx:9092 \
--command-config /tmp/xxxx.properties
Kafka Shell์ ๋ก์ปฌ์์ ์คํํ ์๋ ์์ ๊ฒ์ด๊ณ , Kafka Shell์ ์คํํ ์ ์๋ ์ปจํ
์ด๋์ kubectl exec
๋ก ์ ์ํด ์คํํ ์๋ ์์ต๋๋ค. ์ ๊ฐ CCDAK ์๊ฒฉ์ฆ์ ์ค๋นํ๋ฉด์ ๋๋ ์ ์ โ์ปจํ
์ด๋๋ก ๊ฒฉ๋ฆฌ๋ ํ๊ฒฝ์์ Kafka Shell์ ์คํํ๋๊ฒ ๊ฐ์ฅ ์์ ํ๋คโ๋ ๊ฒ ์
๋๋ค. ๋งฅ๋ถ ๋ก์ปฌ์ Kafka Shell์์๋ Exception์ ๋ฟ์ผ๋ฉฐ ์ ๋๋ก ๋์ํ์ง ์๋ ๊ฒ์ด Kafka Broker pod์์๋ ์ ๋๋ก ๋์ํ๋ ๊ฒฝ์ฐ๊ฐ ์ข
์ข
์์์ต๋๋ค!
์ ๋ฌธ
kafka-topics.sh
- โKafka Shell:
kafka-topics.sh
โ ํฌ์คํธ๋ก ๋ถ๋ฆฌ
- โKafka Shell:
kafka-configs.sh
- ์ด๋ฏธ ์์ฑํ ์นดํ์นด ํ ํฝ์ Config๋ฅผ ๋ณ๊ฒฝํ ๋ ์ฌ์ฉํจ.
--alter
๋ฅผ ์ฌ์ฉ. - โKafka Shell:
kafka-topics.sh
โ ํฌ์คํธ์ ํจ๊ป ๊ธฐ์ ํจ.
- ์ด๋ฏธ ์์ฑํ ์นดํ์นด ํ ํฝ์ Config๋ฅผ ๋ณ๊ฒฝํ ๋ ์ฌ์ฉํจ.
kafka-console-producer.sh
- โKafka Shell: Console Produce/Consumeโ ํฌ์คํธ๋ก ๋ถ๋ฆฌ
kafka-console-consumer.sh
- โKafka Shell: Console Produce/Consumeโ ํฌ์คํธ๋ก ๋ถ๋ฆฌ
kafka-consumer-groups.sh
--list
- ์ปจ์๋จธ ๊ทธ๋ฃน ๋ฆฌ์คํ
--describe --group xxxx
- ์ปจ์๋จธ ๊ทธ๋ฃน ์์ธ ๋ณด๊ธฐ
--execute --to-latest/--to-earlist/...
- ์ปจ์๋จธ ๊ทธ๋ฃน์ offset์ ์ด๊ธฐํ ํ๊ฑฐ๋ ์ต์ ์ผ๋ก ์ด๋ ์ํค๋ ค๊ณ ํ ๋
Performance Test
์นดํ์นด ํด๋ฌ์คํฐ์ ํผํฌ๋จผ์ค๋ฅผ ์ธก์ ํ๊ธฐ ์ํ ๋๊ตฌ ์ ๋๋ค.
$ kafka-producer-perf-test.sh \
--producer-props bootstrap.servers=bitnami-kafka:9092 \
--topic test \
--num-records 100 \
--throughput 10 \
--record-size 100
52 records sent, 10.3 records/sec (0.00 MB/sec), 8.3 ms avg latency, 316.0 ms max latency.
100 records sent, 10.027073 records/sec (0.00 MB/sec), 4.84 ms avg latency, 316.00 ms max latency, 1 ms 50th, 13 ms 95th, 316 ms 99th, 316 ms 99.9th.
$ kafka-consumer-perf-test.sh \
--bootstrap-server bitnami-kafka:9092 \
--topic test \
--messages 10
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2025-01-15 09:26:16:817, 2025-01-15 09:26:20:246, 0.0096, 0.0028, 105, 30.6212, 3411, 18, 0.5312, 5833.3333
Reassign Partitions
ํ ํฝ์ ์ด๋ฃจ๋ ๋ฆฌ๋ ํํฐ์
์ด ํ๋์ ๋ธ๋ก์ปค์ ์ฌ๋ฆฌ๋ ํซ์คํ(Hot spot) ํ์์ ๋ฐฉ์งํ๊ธฐ ์ํ ๋๊ตฌ ์
๋๋ค. kafka-reassign-partitions.sh
๋ช
๋ น์ด๋ฅผ ์ฌ์ฉํ๋ฉด, ๋ฆฌ๋ ํํฐ์
๊ณผ ํ๋ก์ ํํฐ์
์์น๋ฅผ ์ฌ์ค์ ํฉ๋๋ค.
๋จผ์ ์ฌ๋ถ๋ฐฐ ์์ ์ ์ํด ์ ๋นํ ํํฐ์ ๊ณผ Replication Factor๋ฅผ ๊ฐ์ง ํ ํฝ์ ํ๋ ์์ฑํฉ๋๋ค.
$ kafka-topics.sh \
--bootstrap-server bitnami-kafka:9092 \
--topic test2 \
--partitions 3 \
--replication-factor 3 \
--create
$ kafka-topics.sh \
--bootstrap-server bitnami-kafka:9092 \
--topic test2 \
--describe
Topic: test2 TopicId: a3vf0OqaSN6sqjtAVfrOhw PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test2 Partition: 0 Leader: 101 Replicas: 101,100,102 Isr: 101,100,102 Elr: N/A LastKnownElr: N/A
Topic: test2 Partition: 1 Leader: 100 Replicas: 100,102,101 Isr: 100,102,101 Elr: N/A LastKnownElr: N/A
Topic: test2 Partition: 2 Leader: 102 Replicas: 102,101,100 Isr: 102,101,100 Elr: N/A LastKnownElr: N/A
์ด๊ฒ ํ์ฌ ๋ฆฌ๋ ํํฐ์
์ด ๊ฐ ๋ธ๋ก์ปค์ ๊ณ ๋ฅด๊ฒ ๋ถ์ฐ ๋์ด ์์ต๋๋ค. ์ด๋ฅผ 102
๋ฒ ๋ธ๋ก์ปค์ ์ง์ค์์ผ ๊ฐ์ ๋ก ํซ์คํ์ผ๋ก ๋ง๋ค์ด๋ด
์๋ค!
์ผ๋จ ๋ช
๋ น์ด๋ฅผ ์คํํ๊ธฐ ์ํด์ --reassignment-json-file
์ ์ ๋ฌํ json ํ์ผ์ด ํ์ํฉ๋๋ค.
$ cat <<EOF > /tmp/partitions.json
{
"partitions":
[
{ "topic": "test2", "partition": 0, "replicas": [102,100,101] }
],
"version": 1
}
EOF
$ kafka-reassign-partitions.sh \
--bootstrap-server bitnami-kafka:9092 \
--reassignment-json-file /tmp/partitions.json \
--execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test2","partition":0,"replicas":[100,102,101],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignment for test2-0
$ kafka-reassign-partitions.sh \
--bootstrap-server bitnami-kafka:9092 \
--reassignment-json-file /tmp/partitions.json \
--verify
Status of partition reassignment:
Reassignment of partition test2-0 is completed.
Clearing broker-level throttles on brokers 100,101,102
Clearing topic-level throttles on topic test2
๊ทธ๋ฆฌ๊ณ ๋ค์ --describe
๋ฅผ ํด๋ณด๋ฉด
$ kafka-topics.sh \
--bootstrap-server bitnami-kafka:9092 \
--topic test2 \
--describe
Topic: test2 TopicId: a3vf0OqaSN6sqjtAVfrOhw PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test2 Partition: 0 Leader: 101 Replicas: 101,102,100 Isr: 101,100,102 Elr: N/A LastKnownElr: N/A
Topic: test2 Partition: 1 Leader: 100 Replicas: 100,101,102 Isr: 100,102,101 Elr: N/A LastKnownElr: N/A
Topic: test2 Partition: 2 Leader: 102 Replicas: 102,101,100 Isr: 102,101,100 Elr: N/A LastKnownElr: N/A
์์ง ๋ฐ์์ด ์ ๋์๋ค. ๋ฆฌ๋ ์ฌ์ ์ถ์ ์ํด์๋ kafka-leader-election.sh
๋ช
๋ น์ด๋ฅผ ์ถ๊ฐ๋ก ์คํํด์ผ ํฉ๋๋ค.
$ kafka-leader-election.sh \
--bootstrap-server bitnami-kafka:9092 \
--topic test2 \
--partition 0 \
--election-type preferred
๊ทธ๋ฆฌ๊ณ ๋ค์ --describe
๋ฅผ ํด๋ณด๋ฉด
$ kafka-topics.sh \
--bootstrap-server bitnami-kafka:9092 \
--topic test2 \
--describe
Topic: test2 TopicId: a3vf0OqaSN6sqjtAVfrOhw PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test2 Partition: 0 Leader: 102 Replicas: 102,100,101 Isr: 101,100,102 Elr: N/A LastKnownElr: N/A
Topic: test2 Partition: 1 Leader: 100 Replicas: 101,100,102 Isr: 100,102,101 Elr: N/A LastKnownElr: N/A
Topic: test2 Partition: 2 Leader: 102 Replicas: 102,101,100 Isr: 102,101,100 Elr: N/A LastKnownElr: N/A
0
๋ฒ ํํฐ์
์ด ๋ธ๋ก์ปค 102
๋ฒ์ผ๋ก ์ฎ๊ฒจ์ก๋ค!! ใ
ใ
๊ฐ์ ๋ก ํซ์คํ์ผ๋ก ๋ง๋ค์ด์ก์ต๋๋ค ใ
ใ
๊ทธ๋ฌ๋ ์ด๋ฐ ํํฐ์ ์ฌ์ค์ ์ ์นดํ์นด ๋ด๋ถ์ ๋ง์ ์์ ์ ์๊ตฌํ๊ธฐํด ํด๋ฌ์คํฐ์ ๋ถํ๊ฐ ์ฌ ์ ์๊ณ , ๋ ํด๋น ํ ํฝ์ ๊ตฌ๋ ํ๋ ์ปจ์๋จธ์๊ฒ๋ ๋ฆฌ๋ฐธ๋ฐ์ฑ์ ํธ๋ฆฌ๊ฑฐ ํ๊ฒ ๋ฉ๋๋ค. ๊ทธ๋์ ์ด์ ํ๊ฒฝ์์๋ ์ฌ์ค์ ์ ์์ฃผ ์กฐ์ฌํ ์งํํด์ผ ํ๋ฉฐ, ๋ง์ฝ ์ฅ์ ๊ฐ ์์๋๋ค๋ฉด ํํฐ์ ์ ์ ๊ท ์์ฑํด ํด๊ฒฐํ๋ ๊ฒ์ด ๋ฐฉ๋ฒ์ผ ์๋ ์์ต๋๋ค.
Kafka Delete record
Kafka Dump Logs
Kafka Cluster
kafka-broker-api-versions.sh
๋ก ์นดํ์นด ํด๋ฌ์คํฐ๋ฅผ ๊ตฌ์ฑํ๋ ๋ธ๋ก์ปค ๋ชฉ๋ก๊ณผ ๊ฐ ๋ธ๋ก์ปค๊ฐ ์ง์ํ๋ API ๋ฒ์ ์ ํ์ธํ ์ ์์ต๋๋ค.
$ kafka-broker-api-versions.sh \
--bootstrap-server bitnami-kafka:9092
bitnami-kafka-broker-1.bitnami-kafka-broker-headless.kafka.svc.cluster.local:9092 (id: 101 rack: null) -> (
Produce(0): 0 to 11 [usable: 11],
Fetch(1): 0 to 17 [usable: 17],
ListOffsets(2): 0 to 9 [usable: 9],
Metadata(3): 0 to 12 [usable: 12],
LeaderAndIsr(4): 0 to 7 [usable: 7],
StopReplica(5): 0 to 4 [usable: 4],
UpdateMetadata(6): 0 to 8 [usable: 8],
...
)
...
๋ธ๋ก์ปค์ ๊ฐ API๋ ๊ณ ์ ํ API Key ๊ฐ๊ณผ Versioning์ด ๋๋ค๊ณ ํฉ๋๋ค. ํ์ ๋ธ๋ก์ปค์์ ์ง์ํ๋ API ๋ฒ์ ์ ๋ฒ์๋ฅผ ํ์ธํ ์ ์์ต๋๋ค.
Connectors
connect-distributed.sh
connect-standalone.sh
๊ณ ๊ธ
kafka-e2e-latency.sh
kafka-mirror-maker.sh
kafka-streams-application-reset.sh
zookeeper-server-start.sh
connect-mirror-maker.sh
kafka-features.sh
kafka-producer-perf-test.sh
zookeeper-server-stop.sh
connect-plugin-path.sh
kafka-get-offsets.sh
kafka-reassign-partitions.sh
kafka-transactions.sh
zookeeper-shell.sh
kafka-jmx.sh
kafka-replica-verification.sh
kafka-verifiable-consumer.sh
kafka-acls.sh
kafka-consumer-perf-test.sh
kafka-leader-election.sh
kafka-run-class.sh
kafka-verifiable-producer.sh
kafka-broker-api-versions.sh
kafka-delegation-tokens.sh
kafka-log-dirs.sh
kafka-server-start.sh
trogdor.sh
kafka-client-metrics.sh
kafka-delete-records.sh
kafka-metadata-quorum.sh
kafka-server-stop.sh
kafka-dump-log.sh
kafka-metadata-shell.sh
kafka-storage.sh
zookeeper-security-migration.sh
kafka-cluster.sh