Use Delta table versioning to time travel to a specific version or point in time, and even restore the table to an earlier version!


At work we run our Spark clusters on Databricks, so note up front that this post is written with Databricks in mind.

Time Travel

A Delta table can time travel to a specific version or a specific point in time. Thanks to this, even if you delete data by mistake, you can go back to the previous version and inspect the data as it was.

Checking the current version

The SQL statement below shows the change history of a Delta table.

DESC HISTORY <schema>.<table>

version  timestamp             userId  operation
10       2024-06-14T01:12:19Z  xxxx    MERGE
9        2024-06-14T00:54:37Z  xxxx    UPDATE
8        2024-06-14T00:47:27Z  xxxx    DELETE
7        2024-06-14T00:24:52Z  xxxx    DELETE
6        2024-06-13T15:06:12Z  xxxx    WRITE

In Scala, the code below returns the same information.

import io.delta.tables.DeltaTable

// Look up the Delta table by name, then fetch its commit history as a DataFrame.
val deltaTable = DeltaTable.forName("<schema>.<table>")
val historyDf = deltaTable.history()
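Since the history comes back newest first, the current version can be read off the first row. The following is a minimal sketch, assuming a Databricks notebook with an active Spark session; the table name my_schema.my_table is a placeholder, not something from the original post.

```scala
import io.delta.tables.DeltaTable

// Hypothetical table name; replace it with your own <schema>.<table>.
val deltaTable = DeltaTable.forName("my_schema.my_table")

// history(1) limits the result to the single most recent commit.
val latest = deltaTable.history(1).select("version", "operation").head()

val currentVersion = latest.getLong(0)  // the version column is a Long
val lastOperation = latest.getString(1) // e.g. MERGE, UPDATE, DELETE, WRITE
println(s"current version: $currentVersion (last operation: $lastOperation)")
```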

Moving to a specific version

In SQL, add a VERSION AS OF clause to the query.

SELECT
  *
FROM
  <schema>.<table>
  VERSION AS OF 1

You can also use @ to write the same query more concisely.

SELECT * FROM <schema>.<table>@v1

To query a specific version, append @v followed by the version number. To query a specific point in time, append @ followed by a timestamp in yyyyMMddHHmmssSSS format (for example, @20240606000000000).
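The timestamp form of the @ suffix is easy to get wrong by hand, so it can help to build it from a date-time value. The sketch below is plain Scala with no Spark dependency; it assumes the full yyyyMMddHHmmssSSS form described in the Databricks documentation. The helper names atVersion and atTimestamp and the table name are hypothetical.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Pattern for the timestamp form of the @ suffix (assumed: yyyyMMddHHmmssSSS).
val suffixFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")

// Hypothetical helper: table identifier pinned to a version, e.g. my_schema.my_table@v1
def atVersion(table: String, version: Long): String = s"$table@v$version"

// Hypothetical helper: table identifier pinned to a point in time.
def atTimestamp(table: String, ts: LocalDateTime): String =
  s"$table@${ts.format(suffixFormat)}"

val byVersion = atVersion("my_schema.my_table", 1L)
val byTimestamp = atTimestamp("my_schema.my_table", LocalDateTime.of(2024, 6, 6, 0, 0))

println(byVersion)   // my_schema.my_table@v1
println(byTimestamp) // my_schema.my_table@20240606000000000
```

The resulting strings can be passed wherever the @ syntax is accepted, for example in a SELECT statement.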

In Scala, pass the versionAsOf option when reading the table.

val df = spark.read.option("versionAsOf", 1).table("<schema>.<table>")
println(df.count())

The @ syntax above also works in both Scala and Python.

spark.table("<schema>.<table>@v1")
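One practical use of reading by version is checking what an accidental delete removed. The sketch below compares the snapshots on either side of a DELETE commit. It assumes a Databricks notebook where spark is already defined; the table name and the version numbers 7 and 8 are placeholders that you would take from your own DESC HISTORY output.

```scala
// Hypothetical table name and version numbers.
val tableName = "my_schema.my_table"

// Snapshot just before the DELETE and snapshot just after it.
val before = spark.read.option("versionAsOf", 7L).table(tableName)
val after = spark.read.option("versionAsOf", 8L).table(tableName)

// Rows present in the earlier snapshot but missing from the later one.
// except() returns distinct rows; use exceptAll() to keep duplicates.
val removedRows = before.except(after)
println(removedRows.count())
```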

Moving to a specific point in time

In SQL, add a TIMESTAMP AS OF clause to the query.

SELECT
  *
FROM
  <schema>.<table>
  TIMESTAMP AS OF '2024-06-06'

The timestamp can be given as a string in yyyy-MM-dd HH:mm:ss format, or as an expression built from SQL functions, such as DATE_SUB(CURRENT_DATE(), 7).

In Scala, pass the timestampAsOf option.

val df = spark.read.option("timestampAsOf", "2024-06-06").table("<schema>.<table>")
println(df.count())

Restoring to a specific point in time

In SQL, restore the table with the RESTORE command.

RESTORE TABLE <schema>.<table> TO VERSION AS OF 1

This restores the table by adding one new version, numbered current version + 1, with the operation RESTORE. The restore is only recorded in the Delta history; it does not create any new .parquet files.

In Scala, run the following.

import io.delta.tables.DeltaTable

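val deltaTable = DeltaTable.forName("<schema>.<table>")

// Use one of the two calls below, not both in sequence.
deltaTable.restoreToVersion(1)              // restore by version number
deltaTable.restoreToTimestamp("2024-06-06") // restore by timestamp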
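To confirm that a restore was recorded as a new version, you can compare the latest history entry before and after the call. This is a rough sketch, assuming a Databricks notebook, a placeholder table name, and no other writers committing to the table at the same time.

```scala
import io.delta.tables.DeltaTable

// Hypothetical table name; replace it with your own <schema>.<table>.
val deltaTable = DeltaTable.forName("my_schema.my_table")

// Latest version before the restore.
val versionBefore = deltaTable.history(1).select("version").head().getLong(0)

// Restore to version 1 (any earlier version whose data files still exist).
deltaTable.restoreToVersion(1)

// Latest history entry after the restore.
val latest = deltaTable.history(1).select("version", "operation").head()
println(s"before: $versionBefore, after: ${latest.getLong(0)}, operation: ${latest.getString(1)}")
```

If the behavior described above holds, the version after the restore is one higher than before and the operation reads RESTORE.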

References