Delta Lake Time Travel
ํ์ฌ์์ Databricks๋ฅผ ํตํด Spark Cluster๋ฅผ ์ด์ํ๊ณ ์์ต๋๋ค. ๋ณธ ๊ธ์ Databricks๋ฅผ ๊ธฐ์ค์ผ๋ก ์์ฑํ์์ ๋ฏธ๋ฆฌ ๋ฐํ๋๋ค.
Time Travel
Delta ํ ์ด๋ธ์ ํน์ ์์ ๊ณผ ํน์ ๋ฒ์ ์ผ๋ก ํ ์ด๋ธ ์๊ฐ ์ฌํ(Time Travel) ํ ์ ์๋ค. ๋๋ถ์ ๋ฐ์ดํฐ๋ฅผ ์ค์๋ก ์ญ์ ํ๋๋ผ๋, ์ง์ ๋ฒ์ ์ผ๋ก ๋์๊ฐ ๋ฐ์ดํฐ๋ฅผ ํ์ธํ ์ ์๋ค.
ํ์ฌ ์ด๋ค ๋ฒ์ ์ธ์ง ํ์ธํ๊ธฐ
์๋์ SQL ๊ตฌ๋ฌธ์ผ๋ก Delta ํ ์ด๋ธ์ ์์ ๊ธฐ๋ก์ ํ์ธํ ์ ์๋ค.
DESC HISTORY <schema>.<table>
version | timestamp | userId | operation |
---|---|---|---|
10 | 2024-06-14T01:12:19Z | xxxx | MERGE |
9 | 2024-06-14T00:54:37Z | xxxx | UPDATE |
8 | 2024-06-14T00:47:27Z | xxxx | DELETE |
7 | 2024-06-14T00:24:52Z | xxxx | DELETE |
6 | 2024-06-13T15:06:12Z | xxxx | WRITE |
Scala์์ ์๋์ ์ฝ๋๋ก ๊ฐ์ ์ ๋ณด๋ฅผ ํ์ธํ ์ ์๋ค.
import io.delta.tables.DeltaTable
val df = DeltaTable.forName("<schema>.<table>")
val history_df = df.history()
ํน์ ๋ฒ์ ์ผ๋ก ์ด๋
SQL์์ VERSION AS OF
๋ช
๋ น์ด๋ฅผ ๋ถ์ฌ์ ์ฟผ๋ฆฌํ๋ฉด ๋๋ค.
SELECT
*
FROM
<schema>.<table>
VERSION AS OF 1
์ด๋, @
๋ฅผ ์ฌ์ฉํด์ ๋ ์งง๊ฒ SQL ์ฝ๋๋ฅผ ์์ฑํ ์๋ ์๋ค.
SELECT * FROM <schema>.<table>@v1
ํน์ ๋ฒ์ ์ ์ฟผ๋ฆฌํ๋ ค๋ฉด @v
๋ฅผ ๋ถ์ด๊ณ , ํน์ ์์ ์ผ๋ก ์ฟผ๋ฆฌํ๋ ค๋ฉด @yyyyMMdd
ํฌ๋งท์ผ๋ก ์ ๊ทผํ๋ฉด ๋๋ค.
Scala์์๋ ํ
์ด๋ธ์ ์ฝ์ ๋ versionAsOf
์ต์
์ ์ฃผ๋ฉด ๋๋ค.
val df = spark.read.option("versionAsOf", 1).table("<schema>.<table>")
println(df.count())
์์ @
๋ฌธ๋ฒ์ Scala, Python์์๋ ๋ชจ๋ ํตํ๋ค.
spark.table("<schema>.<table>@v1")
ํน์ ์์ ์ผ๋ก ์ด๋
SQL์์ TIMESTAMP AS OF
๋ช
๋ น์ด๋ฅผ ๋ถ์ฌ์ ์ฟผ๋ฆฌํ๋ฉด ๋๋ค.
SELECT
*
FROM
<schema>.<table>
TIMESTAMP AS OF '2024-06-06'
timestamp ํ์์ yyyy-MM-DD hh:mm:ss.0
์ผ๋ก ์ค ์๋ ์๊ณ , DATE_SUB(CURRENT_DATE(), 7)
์ ๊ฐ์ด SQL ํจ์๋ฅผ ํตํด ๊ฐ์ ์ ๋ฌํ ์๋ ์๋ค.
Scala์์ timestampAsOf
์ต์
์ ์ฃผ๋ฉด ๋๋ค.
val df = spark.read.option("timestampAsOf", "2024-06-06").table("<schema>.<table>")
println(df.count())
ํน์ ์์ ์ผ๋ก ๋ณต๊ตฌ
SQL์์ RESTORE
๋ช
๋ น์ด๋ก ํ
์ด๋ธ์ ๋ณต๊ตฌํ๋ฉด ๋๋ค.
RESTORE TABLE <schema>.<table> TO VERSION AS OF 1
์ด ๊ฒฝ์ฐ, ํ์ฌ ๋ฒ์ ์์ +1
์ ํด์ RESTORE
๋ฒ์ ์ ํ๋ ์ถ๊ฐํ์ฌ ํ
์ด๋ธ์ ๋ณต๊ตฌํ๋ค. ๋จ, Delta ํ์คํ ๋ฆฌ์ ์ถ๊ฐ๋ ๋ฟ ์๋ก์ด .parquet
ํ์ผ์ด ์๊ธฐ๊ฑฐ๋ ํ์ง ์๋๋ค.
Scala์์ ์๋์ ๊ฐ์ด ์คํํ๋ฉด ๋๋ค.
import io.delta.tables.DeltaTable
val df = DeltaTable.forName("<schema>.<table>")
df.restoreToVersion(1)
df.restoreToTimestamp("2024-06-06")