Hello, Avro!
์ Avro๋ฅผ ์ฐ๊ฒ ๋์๋์??
๋ฐ์ดํฐ ์์ง๋์ด๋ก ์ผ์ ํ๋ฉด์ ํ์ฌ ์ด๋ฒคํธ ํ์ดํ๋ผ์ธ์ ๋๋งก์์ ๊ด๋ฆฌํ๊ณ ์์ต๋๋ค. ํ๋ฃจ 3์ต ๊ฑด ์ ๋์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์์ Kafka์ ๋ฐ์ด๋ฃ๊ณ ์๋๋ฐ์! ์ด ๊ณผ์ ์์ AvroSerializer
๋ก ๋ฐ์ดํฐ๋ฅผ ์ธ์ฝ๋ฉํ์ฌ Kafka์ ๋ณด๋ด๊ณ ์์ต๋๋ค.
๊ทธ๋ฐ๋ฐ, ์ด๋ ์๊ฐ!! ์ Avro ํฌ๋งท์ ์ฐ๋ ๊ฑธ๊น?๋ผ๋ ์๊ฐ์ด ๋ค์์ต๋๋ค. 1๋ ๋๊ฒ ์ด๋ฒคํธ ํ์ดํ๋ผ์ธ์ ๊ด๋ฆฌํ๊ณ ์์๋๋ฐ, Avro๊ฐ ๋ญ์ง ์ ๋๋ก ์์ง๋ ๋ชปํ๋ ์ํ์์ Avro๋ฅผ ์ฐ๊ณ ์๋ค๋ ๊ฑธ ๋ฌธ๋ ๊นจ๋ซ๊ฒ ๋์์ต๋๋ค. ์ด๋ฒ ํฌ์คํธ๋ โ์์ง ๋ถ์กฑํ๊ฒ ๋ง๊ตฌ๋โ๋ผ๋ ๋ง์, ๊ทธ๋ฆฌ๊ณ โ์ฌ์ ํ ์ฌ๋ฐ๋ ๊ฒ๋ค์ด ๋จ์๊ตฐ!โ๋ผ๋ ์๊ฐ์ผ๋ก ์ ๋ฆฌํด๋ณด๊ฒ ์ต๋๋ค.
What is Avro
JSON, CSV๋ฅผ ๋ด์ ๋ฐ์ดํฐ ํ์์
๋๋ค. ํน์ดํ ์ ์ ์คํค๋ง ์ ๋ณด๊ฐ ๋ฐ์ดํฐ ํ์ผ์ ํจ๊ป ๋ด๊ธด๋ค๋ ์ ์
๋๋ค. ์ฆ, xxxx.avro
๋ผ๋ ํ์ผ ์์ ์๋์ ๊ฐ์ ์คํค๋ง ์ ๋ณด๋ฅผ ๋ด์ ์ ์์ต๋๋ค.
{
"namespace": "NAMESPACE",
"name": "NAME",
"doc": "Any Commentary",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
]
}
์ด๋, avro์ ์คํค๋ง ์ ๋ณด๋ .avsc
๋ผ๋ Avro Schema๋ผ๋ ํฌ๋งท์ ๋ง์ถฐ ์์ฑ๋ฉ๋๋ค.
OโReilly - Operationalizing the Data Lake
Letโs Start Avro
Avro ํฌ๋งท์ ์ฌ์ฉํ๋ ์ฌ๋ฌ ๋ฐฉ๋ฒ์ด ์์ง๋ง, ์ด๋ฒ ํฌ์คํธ์์๋ Python์ผ๋ก ์งํํฉ๋๋ค. ์ด๋ฅผ ์ํด fastavro
๋ผ๋ ํจํค์ง๋ฅผ ์ค์นํฉ๋๋ค. ์ ๋ ์์ฑ ์์ ๊ธฐ์ค์ผ๋ก ์ต์ ๋ฒ์ ์ธ 1.9.7
๋ฒ์ ์ผ๋ก ์งํํ์ต๋๋ค.
pip install fastavro==1.9.7
Avro Write
from fastavro import parse_schema, writer
schema = {
"namespace": "earth",
"name": "person",
"doc": "person information",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
]
}
parsed_schema = parse_schema(schema)
message = {
"name": "Jobs",
"age": 26,
}
with open("./hello.avro", "wb") as f:
writer(f, parsed_schema, [message] * 100)
๊ฐ๋จํฉ๋๋ค. fastavro
์ writer()
ํจ์์ ์คํค๋ง์ ์ ์ฅํ ๋ฐ์ดํฐ๋ฅผ ์ ๋ฌํฉ๋๋ค.
Avro Read
from fastavro import reader
with open("hello.avro", "rb") as f:
_reader = reader(f)
print(_reader)
print(_reader.writer_schema)
print(_reader.reader_schema)
for record in _reader:
print(record)
break
Avro ํ์ผ์๋ ์คํค๋ง ์ ๋ณด๊ฐ ์ด๋ฏธ ๋ด๊ฒจ ์๊ธฐ ๋๋ฌธ์, ์ฝ์ ๋๋ ์คํค๋ง๋ฅผ ์ง์ ํ ํ์๊ฐ ์์ต๋๋ค.
Avro Schema
Avro ์คํค๋ง๋ JSON ํฌ๋งท์ผ๋ก ์์ฑ๋ฉ๋๋ค.
{
"namespace": "earth",
"name": "person",
"doc": "person information",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
]
}
.avro
ํ์ผ์ ์ถ๋ ฅํด๋ณด๋ฉด, Avro Schema๋ ๋ฐ์ด๋๋ฆฌ๊ฐ ์ง๋ ฌํ๋์ง ์๊ณ , ๊ทธ ๊ฐ ๊ทธ๋๋ก ์กด์ฌํฉ๋๋ค.
Avro Schema์ ์กด์ฌํ๋ name
๊ณผ namespace
ํ๋๋ ์คํค๋ง๋ฅผ ์๋ณํ๊ธฐ ์ํ ์๋ณ์ ์
๋๋ค. ์ ๋ ์ฝ๊ฒ ์๊ฐํด name
์ด RDB์ Table, namespace
๊ฐ RBD์ database์ ๋์๋๋ ๊ฐ๋
์ด๋ผ๊ณ ์ดํดํ์ต๋๋ค.
Reject Schema Mismatch when AVRO Write
message = {
"name": "Jobs",
# no `age` field
}
fastavro.write()
๋ฅผ ํ ๋, ์ ์ฅํ ๋ฉ์์ง๊ฐ ์ค์ ํ ์คํค๋ง์ ๋ถ์ผ์น ํ๋ค๋ฉด, avro write์ด ์ผ์ด๋์ง ์์ต๋๋ค. ์๋ฅผ ๋ค์ด, ์คํค๋ง์๋ age
ํ๋๊ฐ ์ ์๋์ด ์๋๋ฐ, ๋ฉ์์ง์๋ ์๋ค๋ฉด ValueError: no value and no default for age
Exception์ ๋ณด๊ฒ ๋ฉ๋๋ค.
๋ฐ๋๋ก, ๋ฉ์์ง์ ์ ์ํ์ง ์์ ์ปฌ๋ผ์ด ์ถ๊ฐ๋ก ๋ค์ด์ค๋ ๊ฒฝ์ฐ๋
message = {
"name": "Jobs",
"age": 26,
"address": "USA" # new field, but not defined
}
Avro write๋ ์ฑ๊ณตํ์ง๋ง, ์ ์ฅ๋ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด๋ณด๋ฉด, ์ ์ํ์ง ์์ ํ๋๋ .avro
์ ์ ์ฅ๋์ง ์์ต๋๋ค.
Rejection์ ํ์ ์ ๋ํด์๋ ์ผ์ด๋๋๋ฐ,
message = {
"name": "Jobs",
"age": "26" # string, but defined as int
}
age
๋ฅผ int๊ฐ ์๋ string์ผ๋ก ์ ๋ฌํ๋ฉด TypeError: an integer is required on field age
๋ผ๋ Exception์ ๋ณด๊ฒ ๋ฉ๋๋ค.
Nullable Field
Avro์์ ์ด๋ค ํ๋๊ฐ Nullable ํ์ง๋ ์๋์ ๊ฐ์ ์คํค๋ง๋ก ํํํ ์ ์์ต๋๋ค.
{
"namespace": "earth",
"name": "person",
"doc": "person information",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": ["null", "int"]},
]
}
์ด๋ฐ ๊ฒฝ์ฐ, age
ํ๋๊ฐ nullable ์ด๊ธฐ ๋๋ฌธ์, ํด๋น ํ๋์ ๊ฐ์ด ์์ด๋ avro write๊ฐ ๊ฐ๋ฅํฉ๋๋ค.
Union Field
์ฌ์ค ์์์ nullable ํ๋๋ฅผ ํํํ๋ ๋ฐฉ๋ฒ์ Avro ์คํค๋ง์ โUnionโ ํ์ ์ ํ์ฉํ ๊ฒ์ ๋๋ค. Union ํ์ ์ ํ๋์ ํ๋์ ์ฌ๋ฌ ๋ฐ์ดํฐ ํ์ ์ ์ธ ์ ์๋๋ก ํ์ฉํฉ๋๋ค.
{
"namespace": "earth",
"name": "person",
"doc": "person information",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": ["null", "int", "string"]},
]
}
๊ทธ๋์ ์ด๋ ๊ฒ age๊ฐ int
์ด๋ฉด์, string
์ด๋ฉด์, nullable์ธ ๊ฒฝ์ฐ๋ ๊ฐ๋ฅํฉ๋๋ค!!
Schema Reference
Avro์์๋ ์ ์ํ ์คํค๋ง๋ฅผ ๋ค๋ฅธ ์คํค๋ง์ ํ๋ ๊ฐ์ฒด๋ก ์ฌ์ฉํ๋ ๊ฒ๋ ๊ฐ๋ฅํ๋ค. ์๋ ์์ ๋ earth.animal
๋ผ๋ ์คํค๋ง๋ฅผ ์ ์ํ๊ณ , ์ด๋ฅผ earth.person
๋ผ๋ ์คํค๋ง์ pet
ํ๋์ ํ์
์ผ๋ก ์ฌ์ฉํ๋ ์์ ์ด๋ค.
schema = [{
"namespace": "earth",
"name": "animal",
"doc": "animal information",
"type": "record",
"fields": [
{"name": "pet_name", "type": "string"},
{"name": "type", "type": ["null", "string"]},
]
}, {
"namespace": "earth",
"name": "person",
"doc": "person information",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": ["null", "int", "string"]},
{"name": "pet", "type": ["null", "earth.animal"]},
]
}]
message = [{
"name": "John",
"age": "30",
"pet": {
"pet_name": "Maru",
"type": "dog"
}
}]
Union Schema
์์ Schema Reference ์ผ์ด์ค์์๋ xxxx.avro
ํ์ผ์ด ํ๋์ earth.pet
๊ณผ earth.person
, ๋ ๊ฐ์ avro ์คํค๋ง๊ฐ ํค๋์ ์ ์ฅ๋ ๊ฒ ์
๋๋ค. ์์ ์์์์๋ earth.person
์ ๋ํ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ ๊ฒ์ฒ๋ผ ์ ์์ง๋ง, ์๋์ ๊ฐ์ด earth.pet
๊ณผ earth.person
๋ฐ์ดํฐ๋ฅผ ํ๋์ avro ํ์ผ์ ์ ์ฅํ๋ ๊ฒ๋ ๊ฐ๋ฅํฉ๋๋ค.
...
message = [{
"name": "John",
"age": "30",
"pet": {
"name": "Maru",
"type": "dog"
}
}, {
"pet_name": "Maru-2",
"type": "human?"
}]
์ํผ ์ ํ๊ณ ์ถ์ ๋ง์ Avro๋ ๋ค์ค ์คํค๋ง๋ ์ง์ํฉ๋๋ค!
๋งบ์๋ง
Avro๋ ๋ช ๋ฐฑํ Schema๊ฐ ์๋ ์๋ฃ๊ตฌ์กฐ์ง๋ง, Union Field๋ Union Schema์ ์ผ์ด์ค๋ฅผ ๋ณด๋ฉด ๋ค์ค ํ์ ์ด๋ ๋ค์ค ์คํค๋ง๋ฅผ ์ง์ํ๋ค๋ ์ ์ฐ์ฑ๋ ๊ฐ์ถ๊ณ ์์ต๋๋ค.
ํฌ์คํธ๋ฅผ ์์ฑํ๋ฉด์ ์ง๊ธ์ ์ด๋ฒคํธ ํ์ดํ๋ผ์ธ ์ฝ๋๊ฐ ์ด๋ค ์๋๋ก ์ปค์คํ ์ ํ๋์ง ์์ธํ ์ดํด๋ณด๋ ๊ณ๊ธฐ๊ฐ ๋ ๊ฒ ๊ฐ์ต๋๋ค. ์์ผ๋ก ์ข ์ข Kafka๋ฅผ ๊ณต๋ถํ๋ฉฐ ์ ํ๋ ์ง์๋ค์ ์ ๋ฆฌํด๋ณด๊ฒ ์ต๋๋ค ใ ใ