Hello, Python Producers!
์ Python Producer์ ๊ด์ฌ์ ๊ฐ๊ฒ ๋์๋์?
โKafka ๊ฐ๋ฐ์ Java๊ฐ ์ผ๋ฐ์ ์ด์ฃ โ๋ผ๋ ๋ง์ ์ฐธ ๋ง์ด ๋ค์ ๊ฒ ๊ฐ๋ค. ๊ทธ๋ฐ๋ฐ, ํ์ฌ์์์ Kafka Producer Application์ Python์ผ๋ก ๊ฐ๋ฐ ๋์ด ์๊ณ , ์ด๊ฑธ๋ก ๊ฝค ๋ง์ ๋ฐ์ดํฐ๊ฐ ์ฒ๋ฆฌ๋๊ณ ์๋ค!! (์ง๊ธ๋!)
๊ทธ๋ ๊ฒ ์๊ฐํ๋ โPython Producer๋ ๋์์ง ์๋๋ฐ?โ๋ผ๋ ์๊ฐ์ด ๋ค๊ธฐ๋ ํ๊ณ , ํ๋ฒ ์ ๋๋ก ์ ๋ฆฌํด๋ณด๋ฉด ์ข์ ๊ฒ ๊ฐ๋ค๋ ์๊ฐ์ด ๋ค์๋ค. ์ด ๊ธ์ ๊ทธ๋ฐ ์๊ฐ์ด ๋ค์ ๋ด๊ฐ ํ ์คํธ ํด๋ณธ Python Producer ์ฝ๋๋ค์ ์์นด์ด๋ธ ํ๊ณ ๊นจ๋ฌ์ ์ ์ ์ ์ ๊ธ์ด๋ค.
๊ธ์ ์์ํ๊ธฐ ์ ์ ๋ฏธ๋ฆฌ ๋ฐํ์๋ฉด, ์ง๊ธ ํ์ฌ์์ Confluent Cloud๋ฅผ ์ฌ์ฉํ๊ธฐ ์๊ณ , ์๋ ์์ ์ฝ๋๋ค ์ญ์ confluent_kafka
pip ํจํค์ง๋ฅผ ์ฌ์ฉํจ์ ๋ฐํ๋๋ค.
Simple Producer
key์ value ๊ฐ๋ง ์ ํ๊ณ Kafka ํ ํฝ์ ๋ฐ์ดํฐ๋ฅผ ์๋ ์์ ์ ๋๋ค.
from confluent_kafka import SerializingProducer
producer = SerializingProducer({
'bootstrap.servers': 'xxxx.confluent.cloud:9092',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': 'xxxx',
'sasl.password': 'xxxx',
})
def acked(err, msg):
if err is not None:
print(f'Failed to deliver message: {str(msg)}: {str(err)}')
else:
print(f'Message produced: {str(msg)}, offset: {msg.offset()}')
producer.produce(
topic='my_topic',
key='my_key',
value='my_value',
on_delivery=acked
)
producer.poll(1)
producer.flush()
Kafka Topic์์๋ ์ ๋ฌ ๋ฐ์ my_value
๊ทธ๋๋ก ๋ฐ์ดํฐ๊ฐ ์ ์ฌ๋ฉ๋๋ค. ํฌ์คํธ ์ ๋ฐ์์ ์ค์ํ๊ฒ ๋ณผ ์ ์ value.serializer
์
๋๋ค. ์ฌ๊ธฐ์์ Serializer๋ฅผ ๋ฐ๋ก ์ค์ ํ์ง ์์๋๋ฐ, ์ด ๊ฒฝ์ฐ Value Serializer๊ฐ ์๋ฌด๊ฒ๋ ์ง์ ๋์ง ์๊ณ None
์ผ๋ก ์ธํ
๋ฉ๋๋ค.
์ด ๊ฒฝ์ฐ, ์๋ฌด๋ฐ Serialization ์์ด ๋ฐ๋ก Kafka๋ก produce ๋ฉ๋๋ค. confluentinc/confluent-kafka-python
๋จ, str
์ด ์๋ int
, dict
, list
๋ฑ๋ฑ ๋ค๋ฅธ ๋ชจ๋ ํ์
์ ์๋์ ๊ฐ์ ์ค๋ฅ์ ํจ๊ป Kafka์ ๋ฐ์ดํฐ๊ฐ ์ ์ฌ๋์ง ์๊ณ ์คํจํฉ๋๋ค.
TypeError: a bytes-like object is required, not 'int'
String Producer
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
producer = SerializingProducer({
'bootstrap.servers': 'xxxx.confluent.cloud:9092',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': 'xxxx',
'sasl.password': 'xxxx',
'key.serializer': StringSerializer('utf_8'),
'value.serializer': StringSerializer('utf_8'),
})
producer.produce(
topic='my_topic',
key='my_key',
value='my_value',
on_delivery=acked
)
์ฌ๊ธฐ์๋ถํฐ acked()
์ poll()
, flush()
์ ๋ํ ๋ถ๋ถ์ ๋นผ๊ณ ์์ฑํ๊ฒ ์ต๋๋ค. String Producer๋ StringSerializer
๋ฅผ Serializer๋ก ์ฌ์ฉํฉ๋๋ค.
StringSerializer('utf_8')
๋ก ์ค์ ํ๋๋ฐ, utf_8
์ ์ฝ๋ฑ(codec) ๊ฐ์ ๋งํฉ๋๋ค. ๊ธฐ๋ณธ๊ฐ์ด utf_8
๋ก ์ค์ ๋์ด ์์ต๋๋ค.
๋ง์ฝ string์ด ์๋ int(12345)
์ ๊ฐ์ ๊ฐ์ ์ค๋ค๋ฉด ์๋์ ๊ฐ์ด SerializationError๋ฅผ ๊ฒช์ต๋๋ค.
confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="'int' object has no attribute 'encode'"}
Confluent Schema Registry
์ฌ๊ธฐ์ ์ ๊น! ๋ค์ ์ด์ด์ง Json Producer์ Avro Producer๋ฅผ ํ
์คํธ ํ๊ธฐ ์ ์ ๋ฐ๋์ Schema Registry์ Schema๊ฐ ์ธํ
๋์ด ์์ด์ผ ํฉ๋๋ค. ๊ตฌ์ฒด์ ์ผ๋ก๋ Serializer์ schema_registry
client ๊ฐ์ ๋๊ฒจ์ค์ผ ํฉ๋๋ค.
from confluent_kafka.schema_registry import SchemaRegistryClient
sr_client = SchemaRegistryClient({
'url': 'https://xxxx.confluent.cloud',
'basic.auth.user.info': '{Key}:{Secret}'.format(Key='xxxx', Secret='xxxx'),
})
์ฌ๊ธฐ์๋ถํฐ ๋ค๋ฃจ๋ ๋ถ๋ถ์ด ์ด ๊ธ์ ์ฐ๊ฒ ๋ ๊ฐ์ฅ ํฐ ๊ณ๊ธฐ ์ ๋๋ค. ์๊ฐ๋ณด๋ค ๋ณต์ก ํ๊ฑฐ๋ ์. ์ผ๋จ Serializer ์ข ๋ฅ๋ณ๋ก ์ฌ๋ ๋ฅผ ์ดํด๋ด ์๋ค.
Json Producer w/ Json Schema
Json Schema
์ผ๋จ โUserโ๋ผ๋ Json Schema ์์๋ถํฐ ์ดํด๋ด ๋๋ค.
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "User",
"description": "A Confluent Kafka Python User",
"type": "object",
"properties": {
"name": {
"description": "User's name",
"type": "string"
},
"favorite_number": {
"description": "User's favorite number",
"type": "number",
"exclusiveMinimum": 0
},
"favorite_color": {
"description": "User's favorite color",
"type": "string"
}
},
"required": [ "name", "favorite_number", "favorite_color" ]
}
์ฒซ์ค๋ถํฐ $schema
๋ผ๋ ์์ํ ๋
์์ด ๋ฑ์ฅํ๋๋ฐ, Json Schema์ ๋ํ ์คํค๋ง ํ์ค ๋ฒ์ (draft)์ ์ ์ด์ฃผ๋ ๋ถ๋ถ์
๋๋ค. Json ์คํค๋ง์ ํ์ค์ ์๊ฐ์ด ์ง๋๋ฉด์ ์
๋ฐ์ดํธ ๋๊ณ ์๋ก์ด ๋ฒ์ (draft)๊ฐ ๋์ค๋ฉด์ ๊ธฐ๋ฅ๊ณผ ๋ฌธ๋ฒ์ด ๋ณํ ํฉ๋๋ค. $schema
๋ถ๋ถ์ ์ ์ํ ์คํค๋ง๊ฐ ์ด๋ค ํ์ค ๋ฒ์ ์ ๊ธฐ๋ฐ์ผ๋ก ํ๊ณ ์๋์ง ๋ช
์ํ๋ ๊ฒ์
๋๋ค. ํน์ ๋ฒ์ ์์๋ง ์ง์ํ๋ ๊ธฐ๋ฅ๋ค์ด ์๋ค๊ณ ํ๋ ๋ฒ์ ์ ์ ์ํด์ ์ฌ์ฉํฉ๋๋ค.
์ด์ด์ง๋ title
, description
๋ฑ์ ๊ฐ์ ๋น์ฐํ ๊ฐ๋ค์ ๋์ด์ด๋ผ ๋ณ๋๋ก ์ค๋ช
ํ์ง ์๊ฒ ์ต๋๋ค.
์! ๊ทธ๋ฆฌ๊ณ JSON schema๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด ์๋์ ํจํค์ง๋ฅผ ์ค์นํด์ค๋๋ค.
$ pip install jsonschema
Json Producer
Json Producer ์ฝ๋๋ฅผ ์๋์ ๊ฐ์ด ์์ฑํฉ๋๋ค.
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry.json_schema import JSONSerializer
schema_str = """
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "User",
"description": "A Confluent Kafka Python User",
"type": "object",
"properties": {
"name": {
"description": "User's name",
"type": "string"
},
"favorite_number": {
"description": "User's favorite number",
"type": "number",
"exclusiveMinimum": 0
},
"favorite_color": {
"description": "User's favorite color",
"type": "string"
}
},
"required": [ "name", "favorite_number", "favorite_color" ]
}
"""
producer = SerializingProducer({
'bootstrap.servers': 'xxxx.confluent.cloud:9092',
...
'key.serializer': StringSerializer('utf_8'),
'value.serializer': JSONSerializer(
schema_registry_client=sr_client,
schema_str=schema_str,
),
})
producer.produce(
topic='my_topic,
key='my_user',
value={
'name': 'Gildong-Hong',
'favorite_number': 18,
'favorite_color': 'blue',
},
on_delivery=acked
)
Serializer Configs
๋ช๊ฐ์ง Configuration์ด ์์ต๋๋ค.
auto.register.schemas
(default: True)- ๋ง์ฝ Topic์ Schema๊ฐ ๋ฑ๋ก๋์ง ์์๋ค๋ฉด ์๋์ผ๋ก ๋ฑ๋กํด์ฃผ๋ ์ต์
- Schema ์ด๋ฆ์ ์๋์
subject.name.strategy
์ ๋ฐ๋ผ ๊ฒฐ์ ๋๋๋ฐ- Default ๊ฐ์ธ TopicName Strategy๋ฅผ ๋ฐ๋ฅธ๋ค๋ฉด,
{TopicName}-key
,{TopicName}-value
์ ๊ฐ์ ํ์์ผ๋ก ๋ฑ๋ก๋ฉ๋๋ค.
- Default ๊ฐ์ธ TopicName Strategy๋ฅผ ๋ฐ๋ฅธ๋ค๋ฉด,
subject.name.strategy
(default: topic_subject_name_strategy)normalize.schemas
(default: False)- ๋์ถฉ ์คํค๋ง์ ์ ์๋ ์ปฌ๋ผ ์์๊น์ง ๋ฐ๋ฅด๋๋ก validation์ ์ํํ ๊ฒ์ธ๊ฐ์ ๋ํ ์ต์ ์ ๋๋ค.
use.latest.version
(default: False)- ํ ํฝ์ ๋ฑ๋ก๋ Schema์ ์ต์ ๋ฒ์ ์ผ๋ก Serialization์ ํ ๊ฒ์ด๋์ ๋ํ ์ต์ ์ ๋๋ค.
- ์์์ ์ค๋ช
ํ
auto.register.schemas
์ต์ ๊ณผ ํจ๊ป ์ฌ์ฉํ ์ ์์ต๋๋ค. - Confluent์์ ํ ํฝ์ ๋ฑ๋ก๋ ์ต์ ์คํค๋ง๋ฅผ ์ฐพ์์ ๊ทธ๊ฑธ ๊ทธ๋๋ก ์ฌ์ฉํฉ๋๋ค. [confluent-kafka-python] json_schema.py
producer = SerializingProducer({
'bootstrap.servers': 'xxxx.cloud:9092',
...
'key.serializer': StringSerializer('utf_8'),
'value.serializer': JSONSerializer(
schema_registry_client=sr_client,
schema_str=schema_str,
conf = {
'auto.register.schemas': False,
'use.latest.version': True,
}
),
})
use.latest.version
์ต์
๊ด๋ จํด์ ํ
์คํธ๋ฅผ ์ข๋ ํด๋ณด๋, schema_str
๊ฐ์ ์ฌ์ ํ ๋ฃ์ด์ค์ผ ํ์ต๋๋ค. None
์ด๋ฐ ๊ฐ์ ๋ฃ์ด์๋ ์ ๋ฉ๋๋ค. ์์์ ์ ์ํ ๋๋ก schema_str
๊ฐ์ ๋ฃ์ด์ผ ํ๋ฉฐ, ์ ๊ณตํ schema_str
๊ฐ์ Json ์คํค๋ง์ ๋ฐ๋ผ validation๋ ์ด๋ค์ง๋๋ค. ๊ทธ์ , Schema Registry์ ์ ๊ท๋ก ๋ฑ๋กํ์ง ์๊ณ ๊ธฐ์กด ๊ฒ์ ์ฌ์ฉํ๋ค์ ์ผ์ญ?๋ง์ ์ ๊ณตํ๋ ์ต์
์ธ ๊ฒ ๊ฐ์ต๋๋ค. (์์ฝ)
Avro Producer w/ Avro Schema
Avro Schema
Avro ํฌ๋งท์ ๋ํด์ ์ง๋ ํฌ์คํธ์ธ โHello, Avro!โ ํฌ์คํธ์์ ๋ค๋ค์ผ๋ ์์ธํ ์ฃผ์ ์ค๋ช ์ ์๋ฝํ๊ฒ ์ต๋๋ค ใ ใ ์ ์ฐ๋๋๊ณ ๋๊ตฐ๊ฐ ๋ฌป๋๋ค๋ฉด Json ๋ณด๋ค ์์ถ๋ฅ ์ด ์ข์์ ์ฌ์ฉํ๋ค๋ก ๋ต๋ณํ ์ ์์ ๋ฏ ํฉ๋๋ค. (๊ทธ๋ฆฌ๊ณ Union Schema ๊ธฐ๋ฅ๋์!)
{
"name": "User",
"type": "record",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "favorite_number",
"type": "long"
},
{
"name": "favorite_color",
"type": "string"
}
]
}
์ฐ์ ์์ ๊ฐ์ด ์ ์ํ Avro Schema๋ฅผ ์ฌ์ฉํฉ๋๋ค.
Avro Producer
์์ ์ดํด๋ณธ JSON Producer์ ์ฌ๋ก์ ์ ๋ง ๋น์ทํฉ๋๋ค. Serializer์ Schema Registry Client๋ฅผ ์ ๋ฌํด์ค์ผ ํฉ๋๋ค.
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry.avro import AvroSerializer
schema_str = """
...
"""
producer = SerializingProducer({
'bootstrap.servers': 'xxxx.confluent.cloud:9092',
...
'key.serializer': StringSerializer('utf_8'),
'value.serializer': AvroSerializer(
schema_registry_client=sr_client,
schema_str=schema_str,
),
})
producer.produce(
topic='my_topic,
key='my_user',
value={
'name': 'Gildong-Hong',
'favorite_number': 18,
'favorite_color': 'blue',
},
on_delivery=acked
)
Serializer Config
Producer config์ ๋ํ ๋ถ๋ถ๋ JsonSerializer
์ ๊ฒ๊ณผ ๋์ผํฉ๋๋ค. auto.register.schemas
๋ก Topic์ ์ ๊ท/๋ณ๊ฒฝ๋ ์คํค๋ง๋ฅผ ์๋์ผ๋ก ๋ฐ์ํ ์ ์์ต๋๋ค.
๋งบ์๋ง
Kafka ๊ทธ๋ฆฌ๊ณ Confluent ํ๋ซํผ์ ๋ํด ์ด๋ ดํ์ด ์๋ ๊ฒ๋ค์ด ์ง์ ์ฝ๋๋ฅผ ๊ตฌ์ฑํ๊ณ ์ต์ ๋ค์ ์คํํด๋ณด๋ฉด์ ์ป๊ฒ ๋๋๊ฒ ๋ง์์ง๋ ๊ฒ ๊ฐ์ต๋๋ค.
์์ฆ์๋ ์ด๋ก ์ ๊ณต๋ถํ๋ ๊ฒ๋ณด๋ค ๊ทธ๊ฑธ ํ์ฉํ๊ณ ์ ์ฉํ๊ณ , ํ๋ก๋์ ๋ง๋ค์ด ๊ฐ๋ ๊ฒ์ด ๋ ๊ฐ์น ์๋ค๊ณ ๋๋ผ๊ฒ ๋๋ ๊ฒ ๊ฐ์ต๋๋ค. ๋น ๋ฅด๊ฒ ๋ฐฐ์์ ๋น ๋ฅด๊ฒ ์ ์ฉํ๊ณ , ๊ทธ๋ฆฌ๊ณ ๋น ๋ฅด๊ฒ ์คํจํ๊ณ (ใ ใ ). ๊ธ์ ์ฐ๋ ์์ ์๋ 2024๋ ์ด ์ด์ ํ ๋ฌ๋ ์ฑ ๋จ์ง ์์์ต๋๋ค. ๋ด๋ ์๋ ๋ญ ํด๋ณผ๊น? ์๊ฐ์ ์ด๋ป๊ฒ ์จ๋ณผ๊น? ๊ณ ๋ฏผํ๊ณ ์์ ํด๋ณด๊ณ ์์ต๋๋ค. ๋ด๋ ์ ์ฌ๋ฐ๋ ๊ฒ๋ค์ ์์ฐฝ ํ ์ ์๊ธธ!! ๐
์ด๊ฒ์ ๊ฒ Serializer๋ฅผ ๋ฐ๊ฟ๊ฐ๋ฉฐ ํ
์คํธ ํด๋ณผ ๋, confluentinc/confluent-kafka-python
์ ์๋ examples ํด๋์ ์์ ์ฝ๋๋ค์ด ๋ง์ด ๋์์ด ๋์์ต๋๋ค. Producer ๋ง๊ณ ๋ Consumer ์ชฝ ์์ ์ฝ๋๋ ์์ผ๋ ์ฐธ๊ณ ํ์๊ธธ!!