๊ผญ Java๋กœ Kafka Producer ๊ฐœ๋ฐœํ•  ํ•„์š˜ ์—†์ž–์•„ ยฏ\ย (ใƒ„)_/ยฏ ๊ทธ๋ฆฌ๊ณ  Confluent Schema Registry๋กœ ์Šคํ‚ค๋งˆ ์ •์˜ํ•˜๊ณ  ๋ฐ์ดํ„ฐ ์ ์žฌ ํ•˜๊ธฐ

9 minute read

์™œ Python Producer์— ๊ด€์‹ฌ์„ ๊ฐ–๊ฒŒ ๋˜์—ˆ๋‚˜์š”?

โ€œKafka ๊ฐœ๋ฐœ์€ Java๊ฐ€ ์ผ๋ฐ˜์ ์ด์ฃ โ€๋ผ๋Š” ๋ง์„ ์ฐธ ๋งŽ์ด ๋“ค์€ ๊ฒƒ ๊ฐ™๋‹ค. ๊ทธ๋Ÿฐ๋ฐ, ํšŒ์‚ฌ์—์„œ์˜ Kafka Producer Application์€ Python์œผ๋กœ ๊ฐœ๋ฐœ ๋˜์–ด ์žˆ๊ณ , ์ด๊ฑธ๋กœ ๊ฝค ๋งŽ์€ ๋ฐ์ดํ„ฐ๊ฐ€ ์ฒ˜๋ฆฌ๋˜๊ณ  ์žˆ๋‹ค!! (์ง€๊ธˆ๋„!)

๊ทธ๋ ‡๊ฒŒ ์ƒ๊ฐํ•˜๋‹ˆ โ€˜Python Producer๋„ ๋‚˜์˜์ง€ ์•Š๋Š”๋ฐ?โ€™๋ผ๋Š” ์ƒ๊ฐ์ด ๋“ค๊ธฐ๋„ ํ•˜๊ณ , ํ•œ๋ฒˆ ์ œ๋Œ€๋กœ ์ •๋ฆฌํ•ด๋ณด๋ฉด ์ข‹์„ ๊ฒƒ ๊ฐ™๋‹ค๋Š” ์ƒ๊ฐ์ด ๋“ค์—ˆ๋‹ค. ์ด ๊ธ€์€ ๊ทธ๋Ÿฐ ์ƒ๊ฐ์ด ๋“ค์€ ๋‚ด๊ฐ€ ํ…Œ์ŠคํŠธ ํ•ด๋ณธ Python Producer ์ฝ”๋“œ๋“ค์„ ์•„์นด์ด๋ธŒ ํ•˜๊ณ  ๊นจ๋‹ฌ์€ ์ ์„ ์ ์€ ๊ธ€์ด๋‹ค.

๊ธ€์„ ์‹œ์ž‘ํ•˜๊ธฐ ์ „์— ๋ฏธ๋ฆฌ ๋ฐํžˆ์ž๋ฉด, ์ง€๊ธˆ ํšŒ์‚ฌ์—์„œ Confluent Cloud๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ์žˆ๊ณ , ์•„๋ž˜ ์˜ˆ์‹œ ์ฝ”๋“œ๋“ค ์—ญ์‹œ confluent_kafka pip ํŒจํ‚ค์ง€๋ฅผ ์‚ฌ์šฉํ•จ์„ ๋ฐํž™๋‹ˆ๋‹ค.

Simple Producer

key์™€ value ๊ฐ’๋งŒ ์ •ํ•˜๊ณ  Kafka ํ† ํ”ฝ์— ๋ฐ์ดํ„ฐ๋ฅผ ์˜๋Š” ์˜ˆ์ œ ์ž…๋‹ˆ๋‹ค.

from confluent_kafka import SerializingProducer

producer = SerializingProducer({
  'bootstrap.servers': 'xxxx.confluent.cloud:9092',
  'security.protocol': 'SASL_SSL',
  'sasl.mechanism': 'PLAIN',
  'sasl.username': 'xxxx',
  'sasl.password': 'xxxx',
})

def acked(err, msg):
    if err is not None:
        print(f'Failed to deliver message: {str(msg)}: {str(err)}')
    else:
        print(f'Message produced: {str(msg)}, offset: {msg.offset()}')

producer.produce(
  topic='my_topic',
  key='my_key',
  value='my_value',
  on_delivery=acked
)

producer.poll(1)
producer.flush()

Kafka Topic์—์„œ๋Š” ์ „๋‹ฌ ๋ฐ›์€ my_value ๊ทธ๋Œ€๋กœ ๋ฐ์ดํ„ฐ๊ฐ€ ์ ์žฌ๋ฉ๋‹ˆ๋‹ค. ํฌ์ŠคํŠธ ์ „๋ฐ˜์—์„œ ์ค‘์š”ํ•˜๊ฒŒ ๋ณผ ์ ์€ value.serializer ์ž…๋‹ˆ๋‹ค. ์—ฌ๊ธฐ์—์„œ Serializer๋ฅผ ๋”ฐ๋กœ ์„ค์ •ํ•˜์ง€ ์•Š์•˜๋Š”๋ฐ, ์ด ๊ฒฝ์šฐ Value Serializer๊ฐ€ ์•„๋ฌด๊ฒƒ๋„ ์ง€์ •๋˜์ง€ ์•Š๊ณ  None์œผ๋กœ ์„ธํŒ… ๋ฉ๋‹ˆ๋‹ค.

์ด ๊ฒฝ์šฐ, ์•„๋ฌด๋Ÿฐ Serialization ์—†์ด ๋ฐ”๋กœ Kafka๋กœ produce ๋ฉ๋‹ˆ๋‹ค. confluentinc/confluent-kafka-python

๋‹จ, str์ด ์•„๋‹Œ int, dict, list ๋“ฑ๋“ฑ ๋‹ค๋ฅธ ๋ชจ๋“  ํƒ€์ž…์€ ์•„๋ž˜์™€ ๊ฐ™์€ ์˜ค๋ฅ˜์™€ ํ•จ๊ป˜ Kafka์— ๋ฐ์ดํ„ฐ๊ฐ€ ์ ์žฌ๋˜์ง€ ์•Š๊ณ  ์‹คํŒจํ•ฉ๋‹ˆ๋‹ค.

TypeError: a bytes-like object is required, not 'int'

String Producer

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer

producer = SerializingProducer({
  'bootstrap.servers': 'xxxx.confluent.cloud:9092',
  'security.protocol': 'SASL_SSL',
  'sasl.mechanism': 'PLAIN',
  'sasl.username': 'xxxx',
  'sasl.password': 'xxxx',
  'key.serializer': StringSerializer('utf_8'),
  'value.serializer': StringSerializer('utf_8'),

})

producer.produce(
  topic='my_topic',
  key='my_key',
  value='my_value',
  on_delivery=acked
)

์—ฌ๊ธฐ์„œ๋ถ€ํ„ฐ acked()์™€ poll(), flush()์— ๋Œ€ํ•œ ๋ถ€๋ถ„์€ ๋นผ๊ณ  ์ž‘์„ฑํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค. String Producer๋Š” StringSerializer๋ฅผ Serializer๋กœ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

StringSerializer('utf_8')๋กœ ์„ค์ •ํ•˜๋Š”๋ฐ, utf_8์€ ์ฝ”๋ฑ(codec) ๊ฐ’์„ ๋งํ•ฉ๋‹ˆ๋‹ค. ๊ธฐ๋ณธ๊ฐ’์ด utf_8๋กœ ์„ค์ •๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

๋งŒ์•ฝ string์ด ์•„๋‹Œ int(12345)์™€ ๊ฐ™์€ ๊ฐ’์„ ์ค€๋‹ค๋ฉด ์•„๋ž˜์™€ ๊ฐ™์ด SerializationError๋ฅผ ๊ฒช์Šต๋‹ˆ๋‹ค.

confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="'int' object has no attribute 'encode'"}

Confluent Schema Registry

์—ฌ๊ธฐ์„œ ์ž ๊น! ๋’ค์— ์ด์–ด์งˆ Json Producer์™€ Avro Producer๋ฅผ ํ…Œ์ŠคํŠธ ํ•˜๊ธฐ ์ „์— ๋ฐ˜๋“œ์‹œ Schema Registry์— Schema๊ฐ€ ์„ธํŒ… ๋˜์–ด ์žˆ์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ๊ตฌ์ฒด์ ์œผ๋กœ๋Š” Serializer์— schema_registry client ๊ฐ’์„ ๋„˜๊ฒจ์ค˜์•ผ ํ•ฉ๋‹ˆ๋‹ค.

from confluent_kafka.schema_registry import SchemaRegistryClient

sr_client = SchemaRegistryClient({
    'url': 'https://xxxx.confluent.cloud',
    'basic.auth.user.info': '{Key}:{Secret}'.format(Key='xxxx', Secret='xxxx'),
})

์—ฌ๊ธฐ์„œ๋ถ€ํ„ฐ ๋‹ค๋ฃจ๋Š” ๋ถ€๋ถ„์ด ์ด ๊ธ€์„ ์“ฐ๊ฒŒ ๋œ ๊ฐ€์žฅ ํฐ ๊ณ„๊ธฐ ์ž…๋‹ˆ๋‹ค. ์ƒ๊ฐ๋ณด๋‹ค ๋ณต์žก ํ–ˆ๊ฑฐ๋“ ์š”. ์ผ๋‹จ Serializer ์ข…๋ฅ˜๋ณ„๋กœ ์‚ฌ๋ ˆ๋ฅผ ์‚ดํŽด๋ด…์‹œ๋‹ค.

Json Producer w/ Json Schema

Json Schema

์ผ๋‹จ โ€œUserโ€๋ผ๋Š” Json Schema ์˜ˆ์‹œ๋ถ€ํ„ฐ ์‚ดํŽด๋ด…๋‹ˆ๋‹ค.

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "User",
  "description": "A Confluent Kafka Python User",
  "type": "object",
  "properties": {
    "name": {
      "description": "User's name",
      "type": "string"
    },
    "favorite_number": {
      "description": "User's favorite number",
      "type": "number",
      "exclusiveMinimum": 0
    },
    "favorite_color": {
      "description": "User's favorite color",
      "type": "string"
    }
  },
  "required": [ "name", "favorite_number", "favorite_color" ]
}

์ฒซ์ค„๋ถ€ํ„ฐ $schema๋ผ๋Š” ์š”์ƒํ•œ ๋…€์„์ด ๋“ฑ์žฅํ•˜๋Š”๋ฐ, Json Schema์— ๋Œ€ํ•œ ์Šคํ‚ค๋งˆ ํ‘œ์ค€ ๋ฒ„์ „(draft)์„ ์ ์–ด์ฃผ๋Š” ๋ถ€๋ถ„์ž…๋‹ˆ๋‹ค. Json ์Šคํ‚ค๋งˆ์˜ ํ‘œ์ค€์€ ์‹œ๊ฐ„์ด ์ง€๋‚˜๋ฉด์„œ ์—…๋ฐ์ดํŠธ ๋˜๊ณ  ์ƒˆ๋กœ์šด ๋ฒ„์ „(draft)๊ฐ€ ๋‚˜์˜ค๋ฉด์„œ ๊ธฐ๋Šฅ๊ณผ ๋ฌธ๋ฒ•์ด ๋ณ€ํ™” ํ•ฉ๋‹ˆ๋‹ค. $schema ๋ถ€๋ถ„์€ ์ •์˜ํ•œ ์Šคํ‚ค๋งˆ๊ฐ€ ์–ด๋–ค ํ‘œ์ค€ ๋ฒ„์ „์„ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•˜๊ณ  ์žˆ๋Š”์ง€ ๋ช…์‹œํ•˜๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค. ํŠน์ • ๋ฒ„์ „์—์„œ๋งŒ ์ง€์›ํ•˜๋Š” ๊ธฐ๋Šฅ๋“ค์ด ์žˆ๋‹ค๊ณ  ํ•˜๋‹ˆ ๋ฒ„์ „์— ์œ ์˜ํ•ด์„œ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

์ด์–ด์ง€๋Š” title, description ๋“ฑ์˜ ๊ฐ’์€ ๋‹น์—ฐํ•œ ๊ฐ’๋“ค์˜ ๋‚˜์—ด์ด๋ผ ๋ณ„๋„๋กœ ์„ค๋ช…ํ•˜์ง„ ์•Š๊ฒ ์Šต๋‹ˆ๋‹ค.

์•„! ๊ทธ๋ฆฌ๊ณ  JSON schema๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด ์•„๋ž˜์˜ ํŒจํ‚ค์ง€๋ฅผ ์„ค์น˜ํ•ด์ค๋‹ˆ๋‹ค.

$ pip install jsonschema

Json Producer

Json Producer ์ฝ”๋“œ๋ฅผ ์•„๋ž˜์™€ ๊ฐ™์ด ์ž‘์„ฑํ•ฉ๋‹ˆ๋‹ค.

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry.json_schema import JSONSerializer

schema_str = """
  {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "User",
    "description": "A Confluent Kafka Python User",
    "type": "object",
    "properties": {
      "name": {
        "description": "User's name",
        "type": "string"
      },
      "favorite_number": {
        "description": "User's favorite number",
        "type": "number",
        "exclusiveMinimum": 0
      },
      "favorite_color": {
        "description": "User's favorite color",
        "type": "string"
      }
    },
    "required": [ "name", "favorite_number", "favorite_color" ]
  }
"""

producer = SerializingProducer({
  'bootstrap.servers': 'xxxx.confluent.cloud:9092',
  ...
  'key.serializer': StringSerializer('utf_8'),
  'value.serializer': JSONSerializer(
    schema_registry_client=sr_client,
    schema_str=schema_str,
  ),
})

producer.produce(
  topic='my_topic,
  key='my_user',
  value={
    'name': 'Gildong-Hong',
    'favorite_number': 18,
    'favorite_color': 'blue',
  },
  on_delivery=acked
)

Serializer Configs

๋ช‡๊ฐ€์ง€ Configuration์ด ์žˆ์Šต๋‹ˆ๋‹ค.

  • auto.register.schemas (default: True)
    • ๋งŒ์•ฝ Topic์— Schema๊ฐ€ ๋“ฑ๋ก๋˜์ง€ ์•Š์•˜๋‹ค๋ฉด ์ž๋™์œผ๋กœ ๋“ฑ๋กํ•ด์ฃผ๋Š” ์˜ต์…˜
    • Schema ์ด๋ฆ„์€ ์•„๋ž˜์˜ subject.name.strategy์— ๋”ฐ๋ผ ๊ฒฐ์ •๋˜๋Š”๋ฐ
      • Default ๊ฐ’์ธ TopicName Strategy๋ฅผ ๋”ฐ๋ฅธ๋‹ค๋ฉด, {TopicName}-key, {TopicName}-value์™€ ๊ฐ™์€ ํ˜•์‹์œผ๋กœ ๋“ฑ๋ก๋ฉ๋‹ˆ๋‹ค.
  • subject.name.strategy (default: topic_subject_name_strategy)
  • normalize.schemas (default: False)
    • ๋Œ€์ถฉ ์Šคํ‚ค๋งˆ์— ์ •์˜๋œ ์ปฌ๋Ÿผ ์ˆœ์„œ๊นŒ์ง€ ๋”ฐ๋ฅด๋„๋ก validation์„ ์ˆ˜ํ–‰ํ•  ๊ฒƒ์ธ๊ฐ€์— ๋Œ€ํ•œ ์˜ต์…˜์ž…๋‹ˆ๋‹ค.
  • use.latest.version (default: False)
    • ํ† ํ”ฝ์— ๋“ฑ๋ก๋œ Schema์˜ ์ตœ์‹  ๋ฒ„์ „์œผ๋กœ Serialization์„ ํ•  ๊ฒƒ์ด๋ƒ์— ๋Œ€ํ•œ ์˜ต์…˜์ž…๋‹ˆ๋‹ค.
    • ์œ„์—์„œ ์„ค๋ช…ํ•œ auto.register.schemas ์˜ต์…˜๊ณผ ํ•จ๊ป˜ ์‚ฌ์šฉํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.
    • Confluent์—์„œ ํ† ํ”ฝ์— ๋“ฑ๋ก๋œ ์ตœ์‹  ์Šคํ‚ค๋งˆ๋ฅผ ์ฐพ์•„์„œ ๊ทธ๊ฑธ ๊ทธ๋Œ€๋กœ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. [confluent-kafka-python] json_schema.py
producer = SerializingProducer({
  'bootstrap.servers': 'xxxx.cloud:9092',
  ...
  'key.serializer': StringSerializer('utf_8'),
  'value.serializer': JSONSerializer(
    schema_registry_client=sr_client,
    schema_str=schema_str,
    conf = {
        'auto.register.schemas': False,
        'use.latest.version': True,
    }
  ),
})

use.latest.version ์˜ต์…˜ ๊ด€๋ จํ•ด์„œ ํ…Œ์ŠคํŠธ๋ฅผ ์ข€๋” ํ•ด๋ณด๋‹ˆ, schema_str ๊ฐ’์€ ์—ฌ์ „ํžˆ ๋„ฃ์–ด์ค˜์•ผ ํ–ˆ์Šต๋‹ˆ๋‹ค. None์ด๋Ÿฐ ๊ฐ’์„ ๋„ฃ์–ด์„œ๋„ ์•ˆ ๋ฉ๋‹ˆ๋‹ค. ์•ž์—์„œ ์ •์˜ํ•œ ๋Œ€๋กœ schema_str ๊ฐ’์„ ๋„ฃ์–ด์•ผ ํ•˜๋ฉฐ, ์ œ๊ณตํ•œ schema_str ๊ฐ’์˜ Json ์Šคํ‚ค๋งˆ์— ๋”ฐ๋ผ validation๋„ ์ด๋ค„์ง‘๋‹ˆ๋‹ค. ๊ทธ์ €, Schema Registry์— ์‹ ๊ทœ๋กœ ๋“ฑ๋กํ•˜์ง€ ์•Š๊ณ  ๊ธฐ์กด ๊ฒƒ์„ ์‚ฌ์šฉํ•œ๋‹ค์˜ ์ผ„์„ญ?๋งŒ์„ ์ œ๊ณตํ•˜๋Š” ์˜ต์…˜์ธ ๊ฒƒ ๊ฐ™์Šต๋‹ˆ๋‹ค. (์•„์‰ฝ)

Avro Producer w/ Avro Schema

Avro Schema

Avro ํฌ๋งท์— ๋Œ€ํ•ด์„  ์ง€๋‚œ ํฌ์ŠคํŠธ์ธ โ€œHello, Avro!โ€ ํฌ์ŠคํŠธ์—์„œ ๋‹ค๋ค˜์œผ๋‹ˆ ์ž์„ธํ•œ ์ฃผ์†Œ ์„ค๋ช…์€ ์ƒ๋ฝํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค ใ…Žใ…Ž ์™œ ์“ฐ๋Š๋ƒ๊ณ  ๋ˆ„๊ตฐ๊ฐ€ ๋ฌป๋Š”๋‹ค๋ฉด Json ๋ณด๋‹ค ์••์ถ•๋ฅ ์ด ์ข‹์•„์„œ ์‚ฌ์šฉํ•œ๋‹ค๋กœ ๋‹ต๋ณ€ํ•  ์ˆ˜ ์žˆ์„ ๋“ฏ ํ•ฉ๋‹ˆ๋‹ค. (๊ทธ๋ฆฌ๊ณ  Union Schema ๊ธฐ๋Šฅ๋„์š”!)

{
    "name": "User",
    "type": "record",
    "fields": [
        {
            "name": "name",
            "type": "string"
        },
        {
            "name": "favorite_number",
            "type": "long"
        },
        {
            "name": "favorite_color",
            "type": "string"
        }
    ]
}

์šฐ์„  ์œ„์™€ ๊ฐ™์ด ์ •์˜ํ•œ Avro Schema๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

Avro Producer

์•ž์„œ ์‚ดํŽด๋ณธ JSON Producer์˜ ์‚ฌ๋ก€์™€ ์ •๋ง ๋น„์Šทํ•ฉ๋‹ˆ๋‹ค. Serializer์— Schema Registry Client๋ฅผ ์ „๋‹ฌํ•ด์ค˜์•ผ ํ•ฉ๋‹ˆ๋‹ค.

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_str = """
...
"""

producer = SerializingProducer({
  'bootstrap.servers': 'xxxx.confluent.cloud:9092',
  ...
  'key.serializer': StringSerializer('utf_8'),
  'value.serializer': AvroSerializer(
    schema_registry_client=sr_client,
    schema_str=schema_str,
  ),
})

producer.produce(
  topic='my_topic,
  key='my_user',
  value={
    'name': 'Gildong-Hong',
    'favorite_number': 18,
    'favorite_color': 'blue',
  },
  on_delivery=acked
)

Serializer Config

Producer config์— ๋Œ€ํ•œ ๋ถ€๋ถ„๋„ JsonSerializer์˜ ๊ฒƒ๊ณผ ๋™์ผํ•ฉ๋‹ˆ๋‹ค. auto.register.schemas๋กœ Topic์— ์‹ ๊ทœ/๋ณ€๊ฒฝ๋œ ์Šคํ‚ค๋งˆ๋ฅผ ์ž๋™์œผ๋กœ ๋ฐ˜์˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋งบ์Œ๋ง

Kafka ๊ทธ๋ฆฌ๊ณ  Confluent ํ”Œ๋žซํผ์— ๋Œ€ํ•ด ์–ด๋ ดํ’‹์ด ์•Œ๋˜ ๊ฒƒ๋“ค์ด ์ง์ ‘ ์ฝ”๋“œ๋ฅผ ๊ตฌ์„ฑํ•˜๊ณ  ์˜ต์…˜๋“ค์„ ์‹คํ—˜ํ•ด๋ณด๋ฉด์„œ ์–ป๊ฒŒ ๋˜๋Š”๊ฒŒ ๋งŽ์•„์ง€๋Š” ๊ฒƒ ๊ฐ™์Šต๋‹ˆ๋‹ค.

์š”์ฆ˜์—๋Š” ์ด๋ก ์„ ๊ณต๋ถ€ํ•˜๋Š” ๊ฒƒ๋ณด๋‹ค ๊ทธ๊ฑธ ํ™œ์šฉํ•˜๊ณ  ์ ์šฉํ•˜๊ณ , ํ”„๋กœ๋•์„ ๋งŒ๋“ค์–ด ๊ฐ€๋Š” ๊ฒƒ์ด ๋” ๊ฐ€์น˜ ์žˆ๋‹ค๊ณ  ๋Š๋ผ๊ฒŒ ๋˜๋Š” ๊ฒƒ ๊ฐ™์Šต๋‹ˆ๋‹ค. ๋น ๋ฅด๊ฒŒ ๋ฐฐ์›Œ์„œ ๋น ๋ฅด๊ฒŒ ์ ์šฉํ•˜๊ณ , ๊ทธ๋ฆฌ๊ณ  ๋น ๋ฅด๊ฒŒ ์‹คํŒจํ•˜๊ณ (ใ…‹ใ…‹). ๊ธ€์„ ์“ฐ๋Š” ์‹œ์ ์—๋Š” 2024๋…„์ด ์ด์ œ ํ•œ ๋‹ฌ๋„ ์ฑ„ ๋‚จ์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค. ๋‚ด๋…„์—๋Š” ๋ญ˜ ํ•ด๋ณผ๊นŒ? ์‹œ๊ฐ„์„ ์–ด๋–ป๊ฒŒ ์จ๋ณผ๊นŒ? ๊ณ ๋ฏผํ•˜๊ณ  ์ƒ์ƒ ํ•ด๋ณด๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค. ๋‚ด๋…„์— ์žฌ๋ฐŒ๋Š” ๊ฒƒ๋“ค์€ ์™•์ฐฝ ํ•  ์ˆ˜ ์žˆ๊ธธ!! ๐Ÿ„

์ด๊ฒƒ์ €๊ฒƒ Serializer๋ฅผ ๋ฐ”๊ฟ”๊ฐ€๋ฉฐ ํ…Œ์ŠคํŠธ ํ•ด๋ณผ ๋•Œ, confluentinc/confluent-kafka-python์— ์žˆ๋Š” examples ํด๋”์˜ ์˜ˆ์‹œ ์ฝ”๋“œ๋“ค์ด ๋งŽ์ด ๋„์›€์ด ๋˜์—ˆ์Šต๋‹ˆ๋‹ค. Producer ๋ง๊ณ ๋„ Consumer ์ชฝ ์˜ˆ์‹œ ์ฝ”๋“œ๋„ ์žˆ์œผ๋‹ˆ ์ฐธ๊ณ ํ•˜์‹œ๊ธธ!!

Categories:

Updated: