Delta Lake 데이터 처리에 대한 고찰
회사에서 Databricks를 통해 Spark Cluster를 운영하고 있습니다. 본 글은 Databricks를 기준으로 작성했음을 미리 밝힙니다.
Create Delta Table
With Specific Name
CREATE TABLE `sample_schema`.`sample_table` (
id INT,
name STRING
) USING delta
만약 LOCATION
을 따로 지정하지 않으면, Databricks의 기본 경로에 _delta_log/
폴더와 함께 테이블이 생성된다. 웬만하면 아래와 같이 LOCATION
을 지정하자.
CREATE TABLE `sample_schema`.`sample_table` (
id INT,
name STRING
) USING delta
LOCATION `s3a://{BUCKET_NAME}/{PATH}`
Without Name
CREATE TABLE delta.`s3://{BUCKET_NAME}/{PATH}` (
id INT,
name STRING
) USING delta;
SELECT * FROM delta.`s3://{BUCKET_NAME}/{PATH}`
이것이 가능한 이유는 Delta 테이블은 스키마 정보가 Hive Metastore에 저장되는 것이 아닌 _delta_log/
폴더에 담기기 때문이다. 즉, 스키마 정보를 Hive Metastore에는 등록하지 않아도 데이터를 S3에서 읽을 수 있다.
앞에 붙은 delta
는 스키마 이름이 아니라 delta
테이블을 읽거나 만든다는 예약 키워드다.
Read Delta Table
With Table Name
테이블 이름을 지정했을 경우에는 {SCHEMA_NAME}.{TABLE_NAME}
으로 읽으면 된다.
SELECT * FROM `sample_schema`.`sample_table`
val df = spark.table("sample_schema.sample_table")
Scala에선 spark.read.table()
로 접근하는 것도 가능하다.
val df = spark.read.table("sample_schema.sample_table")
요 방식은 Delta 테이블에 Option을 붙일 수 있다.
val df = spark.read.option("versionAsOf", 1).table("sample_schema.sample_table")
Without Table Name
SELECT * FROM delta.`s3://{BUCKET_NAME}/{PATH}`
val df = spark.read.format("delta").load("s3://{BUCKET_NAME}/{PATH}")
Write to Delta Table
save vs. saveAsTable
// 테이블 이름으로 Write
df.write.format("delta").mode("append").saveAsTable("sample_schema.sample_table")
// S3에 바로 Write
df.write.format("delta").mode("append").save("s3://{BUCKET_NAME}/{PATH}")
append vs overwrite
// 기존 데이터는 남기고 append
df.write.format("delta").mode("append").saveAsTable("sample_schema.sample_table")
// 기존 데이터 모두 drop 하고 overwrite
df.write.format("delta").mode("overwrite").saveAsTable("sample_schema.sample_table")
replaceWhere
기존 데이터 중 일부를 대체하고 싶을 때 사용.
df.write.format("delta")
.mode("overwrite")
.option("replaceWhere", "id = 1")
.saveAsTable("sample_schema.sample_table")
with Commit Message
df.write.format("delta")
.mode("overwrite")
.option("userMetadata", "HAHA HOHO!")
.saveAsTable("sample_schema.sample_table")
이렇게 userMetadata
에 값을 주면, 이후에 Delta의 Version History를 검색할 때 유용하게 쓸 수 있다.
import io.delta.tables.DeltaTable
val d = DeltaTable.forName("example_schema.example_table")
val d_history = d.history.select("version", "operation", "userMetadata").show()
---
+-------+------------+------------+
|version| operation|userMetadata|
+-------+------------+------------+
| 6| WRITE|comment-test|
| 5| WRITE| null|
| 4| WRITE| null|
| 3| WRITE| null|
| 2| WRITE| null|
| 1| WRITE| null|
| 0|CREATE TABLE| null|
+-------+------------+------------+
Delete
Parquet 기반 테이블과 달리 DML 연산이 가능하다!! 다만, 삭제의 경우 데이터가 물리적으로 삭제 되는 것이 아니다! 기존에 읽던 .parquet
파일이 그대로 존재한다.
예를 들어, id
가 1부터 1000까지 있는 테이블에 아래와 같은 DELETE
연산을 해보자.
DELETE FROM example_schema.example_table
WHERE id BETWEEN 1 AND 500
이 경우, id=1~1000
데이터가 들어있던 .parquet
파일에서 삭제된 부분을 제외한 id=501~1000
데이터가 담긴 .parquet
파일이 새로 생성된다. 데이터를 절반쯤 날렸으니, 새로 생성된 .parquet
는 기존 파일의 절반 쯤의 크기를 갖게 된다.
이럼 점 때문에, DELETE
연산은 오히려 스토리지 용량이 늘어난다. 또, DELETE
연산을 할수록 여분 데이터가 복제되어 신규 버전에 쌓이는 것이기 때문에 DELETE
연산을 여러번 할수록 Delta에 담긴 .parquet
파일들과 크기의 총합이 횟수만큼 증가하게 된다.
단, 이때 변경되는 .parquet
파일은 DELETE
연산으로 영향을 받는 .parquet
파일만으로 한정 된다. 예를 들어, 데이터가 아래와 같이 2개의 Repartition으로 저장 되어 있었다고 하자.
id | repartition |
---|---|
1~50 | parquet-1 |
51~100 | parquet-2 |
만약, DELETE
연산으로 id 1부터 10까지의 데이터를 기준다고 해보자. 그러면, Delta는 parquet-1
파일을 읽어서 DELETE
연산을 처리한 새로운 .parquet
파일을 만든다. 연산 범위 밖에 있는 parquet-2
파일은 아무 영향 없이 그대로 존재한다!
Update
UPDATE example_schema.example_table
SET name = "hahaha"
WHERE id >= 150
UPDATE
도 마찬가지로 기존 parquet 데이터가 SET
내용을 적용한 후에, 새로운 .parquet
파일을 만든다. 이때, UPDATE
전의 기존 .parquet
와 적용 후의 .parquet
파일은 행 갯수가 같다.
즉, Delta의 DELETE
/UPDATE
연산 둘다 데이터의 온전한 스냅샷을 만든다는 걸 알 수 있다!!
Merge Into
Delta에서는 MERGE
라는 Upsert 연산을 지원한다. 만약 조건에 맞으면 기존 데이터를 갱신하고, 조건에 맞지 않으면 신규 데이터를 삽입한다.
MERGE INTO example_schema.example_table AS target
USING example_schema.product AS source
ON
target.id = source.id
WHEN MATCHED THEN
UPDATE SET
target.id = source.id,
target.name = source.name
WHEN NOT MATCHED THEN
INSERT *
위의 쿼리에는 UPDATE
, INSERT
동작을 수행하지만, DELETE
동작도 할 수 있다! 예를 들어, WHEN MATCHED THEN DELETE
로 만약 조건에 맞는 데이터가 있다면, 삭제할 수도 있다!!