Delta Lake Time Travel
ํ์ฌ์์ Databricks๋ฅผ ํตํด Spark Cluster๋ฅผ ์ด์ํ๊ณ ์์ต๋๋ค. ๋ณธ ๊ธ์ Databricks๋ฅผ ๊ธฐ์ค์ผ๋ก ์์ฑํ์์ ๋ฏธ๋ฆฌ ๋ฐํ๋๋ค.
Time Travel

Delta ํ ์ด๋ธ์ ํน์  ์์ ๊ณผ ํน์  ๋ฒ์ ์ผ๋ก ํ ์ด๋ธ ์๊ฐ ์ฌํ(Time Travel) ํ ์ ์๋ค. ๋๋ถ์ ๋ฐ์ดํฐ๋ฅผ ์ค์๋ก ์ญ์ ํ๋๋ผ๋, ์ง์  ๋ฒ์ ์ผ๋ก ๋์๊ฐ ๋ฐ์ดํฐ๋ฅผ ํ์ธํ ์ ์๋ค.
ํ์ฌ ์ด๋ค ๋ฒ์ ์ธ์ง ํ์ธํ๊ธฐ
์๋์ SQL ๊ตฌ๋ฌธ์ผ๋ก Delta ํ ์ด๋ธ์ ์์  ๊ธฐ๋ก์ ํ์ธํ ์ ์๋ค.
DESC HISTORY <schema>.<table>
| version | timestamp | userId | operation | 
|---|---|---|---|
| 10 | 2024-06-14T01:12:19Z | xxxx | MERGE | 
| 9 | 2024-06-14T00:54:37Z | xxxx | UPDATE | 
| 8 | 2024-06-14T00:47:27Z | xxxx | DELETE | 
| 7 | 2024-06-14T00:24:52Z | xxxx | DELETE | 
| 6 | 2024-06-13T15:06:12Z | xxxx | WRITE | 
Scala์์ ์๋์ ์ฝ๋๋ก ๊ฐ์ ์ ๋ณด๋ฅผ ํ์ธํ ์ ์๋ค.
import io.delta.tables.DeltaTable
val df = DeltaTable.forName("<schema>.<table>")
val history_df = df.history()
ํน์  ๋ฒ์ ์ผ๋ก ์ด๋
SQL์์  VERSION AS OF ๋ช
๋ น์ด๋ฅผ ๋ถ์ฌ์ ์ฟผ๋ฆฌํ๋ฉด ๋๋ค.
SELECT
  *
FROM
  <schema>.<table>
  VERSION AS OF 1
์ด๋, @๋ฅผ ์ฌ์ฉํด์ ๋ ์งง๊ฒ SQL ์ฝ๋๋ฅผ ์์ฑํ  ์๋ ์๋ค.
SELECT * FROM <schema>.<table>@v1
ํน์  ๋ฒ์ ์ ์ฟผ๋ฆฌํ๋ ค๋ฉด @v๋ฅผ ๋ถ์ด๊ณ , ํน์  ์์ ์ผ๋ก ์ฟผ๋ฆฌํ๋ ค๋ฉด @yyyyMMdd ํฌ๋งท์ผ๋ก ์ ๊ทผํ๋ฉด ๋๋ค.
Scala์์๋ ํ
์ด๋ธ์ ์ฝ์ ๋ versionAsOf ์ต์
์ ์ฃผ๋ฉด ๋๋ค.
val df = spark.read.option("versionAsOf", 1).table("<schema>.<table>")
println(df.count())
์์ @ ๋ฌธ๋ฒ์ Scala, Python์์๋ ๋ชจ๋ ํตํ๋ค.
spark.table("<schema>.<table>@v1")
ํน์  ์์ ์ผ๋ก ์ด๋
SQL์์  TIMESTAMP AS OF ๋ช
๋ น์ด๋ฅผ ๋ถ์ฌ์ ์ฟผ๋ฆฌํ๋ฉด ๋๋ค.
SELECT
  *
FROM
  <schema>.<table>
  TIMESTAMP AS OF '2024-06-06'
timestamp ํ์์ yyyy-MM-DD hh:mm:ss.0์ผ๋ก ์ค ์๋ ์๊ณ , DATE_SUB(CURRENT_DATE(), 7)์ ๊ฐ์ด SQL ํจ์๋ฅผ ํตํด ๊ฐ์ ์ ๋ฌํ  ์๋ ์๋ค.
Scala์์  timestampAsOf ์ต์
์ ์ฃผ๋ฉด ๋๋ค.
val df = spark.read.option("timestampAsOf", "2024-06-06").table("<schema>.<table>")
println(df.count())
ํน์  ์์ ์ผ๋ก ๋ณต๊ตฌ
SQL์์  RESTORE ๋ช
๋ น์ด๋ก ํ
์ด๋ธ์ ๋ณต๊ตฌํ๋ฉด ๋๋ค.
RESTORE TABLE <schema>.<table> TO VERSION AS OF 1
์ด ๊ฒฝ์ฐ, ํ์ฌ ๋ฒ์ ์์ +1์ ํด์ RESTORE ๋ฒ์ ์ ํ๋ ์ถ๊ฐํ์ฌ ํ
์ด๋ธ์ ๋ณต๊ตฌํ๋ค. ๋จ, Delta ํ์คํ ๋ฆฌ์ ์ถ๊ฐ๋  ๋ฟ ์๋ก์ด .parquet ํ์ผ์ด ์๊ธฐ๊ฑฐ๋ ํ์ง ์๋๋ค.
Scala์์ ์๋์ ๊ฐ์ด ์คํํ๋ฉด ๋๋ค.
import io.delta.tables.DeltaTable
val df = DeltaTable.forName("<schema>.<table>")
df.restoreToVersion(1)
df.restoreToTimestamp("2024-06-06")
