Meet Avro, a Format Faster and More Compact than JSON 👋

Why did I end up using Avro?

I work as a data engineer and currently own our event pipeline. It receives roughly 300 million records a day and pushes them into Kafka, encoding each one with AvroSerializer along the way.

Then, at some point, a question hit me: why are we using the Avro format at all? I had been running the event pipeline for over a year, and I suddenly realized I was using Avro without really knowing what it is. So I'm writing this post with two thoughts in mind: "I still have a lot to learn" and "there is still plenty of fun stuff left!"

What is Avro

Avro is a data format in the same vein as JSON and CSV. What sets it apart is that the schema is stored in the data file together with the data. In other words, a file named xxxx.avro can carry schema information like the following.

{
  "namespace": "NAMESPACE",
  "name": "NAME",
  "doc": "Any Commentary",
  "type": "record",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}

The schema itself is written in the Avro Schema format, which conventionally uses the .avsc extension.

O'Reilly - Operationalizing the Data Lake

Let's Start Avro

There are several ways to work with the Avro format, but this post uses Python. For that, install the fastavro package. I used 1.9.7, which was the latest version at the time of writing.

pip install fastavro==1.9.7

Avro Write

from fastavro import parse_schema, writer

schema = {
  "namespace": "earth",
  "name": "person",
  "doc": "person information",
  "type": "record",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
  ]
}

parsed_schema = parse_schema(schema)

message = {
  "name": "Jobs",
  "age": 26,
}

with open("./hello.avro", "wb") as f:
  writer(f, parsed_schema, [message] * 100)

That's all there is to it: pass the schema and the records to fastavro's writer() function.

Avro Read

from fastavro import reader

with open("hello.avro", "rb") as f:
  _reader = reader(f)
  print(_reader)
  print(_reader.writer_schema)
  print(_reader.reader_schema)

  for record in _reader:
    print(record)
    break

An Avro file already contains its schema, so there is no need to specify one when reading.

Avro Schema

An Avro schema is written in JSON.

{
  "namespace": "earth",
  "name": "person",
  "doc": "person information",
  "type": "record",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}

If you dump the contents of a .avro file, you can see that the Avro schema is not binary-encoded; it sits in the file as plain JSON text.

The name and namespace fields in an Avro schema are identifiers for the schema. I find it easiest to think of name as a table in an RDB and namespace as a database in an RDB.

Rejecting Schema Mismatches on Avro Write

message = {
  "name": "Jobs",
  # no `age` field
}

When you call fastavro.writer(), the write fails if a message does not match the schema. For example, if the schema defines an age field but the message lacks it, you get ValueError: no value and no default for age.

Conversely, when a message carries an extra field that the schema does not define,

message = {
  "name": "Jobs",
  "age": 26,
  "address": "USA" # new field, but not defined
}

the Avro write succeeds, but if you read the data back, the undefined field has not been stored in the .avro file.

Rejection also happens on type mismatches:

message = {
  "name": "Jobs",
  "age": "26" # string, but defined as int
}

Passing age as a string instead of an int raises TypeError: an integer is required on field age.

Nullable Field

In Avro, a nullable field is expressed with a schema like the following.

{
  "namespace": "earth",
  "name": "person",
  "doc": "person information",
  "type": "record",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": ["null", "int"]}
  ]
}

Here age is nullable, so the Avro write succeeds even when the field has no value.

Union Field

The way we expressed a nullable field above is actually an application of the “Union” type in Avro schemas. A union lets a single field accept several data types.

{
  "namespace": "earth",
  "name": "person",
  "doc": "person information",
  "type": "record",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": ["null", "int", "string"]}
  ]
}

So a field like age can be an int, a string, or null!

Schema Reference

Avro also lets you use one schema as the type of a field in another schema. The example below defines a schema called earth.animal and uses it as the type of the pet field in the earth.person schema.

schema = [{
  "namespace": "earth",
  "name": "animal",
  "doc": "animal information",
  "type": "record",
  "fields": [
    {"name": "pet_name", "type": "string"},
    {"name": "type", "type": ["null", "string"]},
  ]
}, {
  "namespace": "earth",
  "name": "person",
  "doc": "person information",
  "type": "record",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": ["null", "int", "string"]},
    {"name": "pet", "type": ["null", "earth.animal"]},
  ]
}]

message = [{
  "name": "John",
  "age": "30",
  "pet": {
    "pet_name": "Maru",
    "type": "dog"
  }
}]

Union Schema

In the Schema Reference case above, a single xxxx.avro file ends up with two Avro schemas in its header: earth.animal and earth.person. The example above only stored earth.person records, but it is also possible to store both earth.animal and earth.person records in one Avro file, as shown below.

...
message = [{
  "name": "John",
  "age": "30",
  "pet": {
    "pet_name": "Maru",
    "type": "dog"
  }
}, {
  "pet_name": "Maru-2",
  "type": "human?"
}]

Anyway, the point is that Avro supports multiple schemas too!

Closing Thoughts

Avro is clearly a schema-based data format, yet the Union Field and Union Schema cases show that it is also flexible enough to support multiple types and multiple schemas.

Writing this post gave me a chance to look closely at why our current event pipeline code was customized the way it was. From time to time, I'll keep writing up what I learn while studying Kafka :)
