Spark Adaptive Query Execution
Databricks Certification ์ทจ๋์ ๋ชฉํ๋ก Apache Spark๋ฅผ โ์ ๋๋กโ ๊ณต๋ถํด๋ณด๊ณ ์์ต๋๋ค. ํ์ฌ์์ Databricks Unity Catalog๋ฅผ ๋์ ํ๋ ค๊ณ ๋ถํฌํ๊ณ ์๋๋ฐ์. Spark์ ์ข ์นํด์ง ์ ์์๊น์? ๐ ์ ์ฒด ํฌ์คํธ๋ Development - Spark์์ ํ์ธํด์ค ์ ์์ต๋๋ค.
Adaptive Query Execution์ด๋?
๋ณธ์ธ ํ์ฌ๋ 2018๋
๋ถํฐ Databricks๋ฅผ ๋์
ํด Spark๋ฅผ ์ฌ์ฉํ๊ณ ์์๋ค. ์ฌ์ฉํ์ง ์ค๋ ๋์ด์ ๊ทธ๋ฐ์ง Databricks Job๋ค์ ์ ๋ง ๋ค์ํ Spark Config๋ค์ด ์ธํ
๋์ด ์์๋ค. ์์ฆ ํ์ฌ์์ ์ฌ์ฉํ๋ Spark Config๋ฅผ ์ ๋ฆฌํด์ ๋ฐํ๋ฅผ ์ค๋นํ๊ณ ์๋๋ฐ, spark.sql.adaptive.enabled
๋ผ๋ config๊ฐ ๋ด ๋๊ธธ์ ๋๊ฒ ๋์๊ณ , ๊ทธ๋ ๊ฒ ์ค๋์ ์ฃผ์ ์ธ Adaptive Query Execution(์ดํ AQE)์ ๋ํด ์ดํด๋ณด๊ฒ ๋์๋ค.
AQE ๊ธฐ๋ฅ์ Spark 3.0๋ถํฐ ๋์ ๋ Spark Optimization ํ ํฌ๋์ด๋ค. ์ฐธ๊ณ ๋ก Spark 3.0์ 2020๋ 6์์ ๊ณต๊ฐ ๋์๊ณ , Spark 3.2๋ถํฐ๋ ๋ฐ๋ก ์ค์ ํ์ง ์์ผ๋ฉด AQE enabled๊ฐ ๊ธฐ๋ณธ๊ฐ์ด๋ค.
Dynamically coalescing shuffle partitions
Spark์์ ๊ฑฐ๋ํ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃฐ ๋, ์
ํ๋ง(shuffling)์ ์ฟผ๋ฆฌ ํผํฌ๋จผ์ค์ ํฐ ์ํฅ์ ๋ผ์น๋ ์์์ด๋ค. ์
ํ๋ง์ Executor๊ฐ ์ฒ๋ฆฌํ๋ ํํฐ์
๋ฐ์ดํฐ๋ฅผ ๋ค์ ์ฌ์ ๋ ฌ ํ๋ ๊ณผ์ ์ผ๋ก groupByKey
, reduceByKey
, join
, distinct
, repartition
๋ฑ์ ์ฐ์ฐ์ ์ํํ ๋ ๋ฐ์ํ๋ค.
์์ ์ฌ์ง์ด Partition์ด Shuffling ๋๋ ๊ณผ์ ์ ํํํ ๊ฒ์ผ๋ก ์๊น๋ณ๋ก ๋ฐ์ดํฐ๊ฐ ์ฌ์ ๋ ฌ ๋๋ ๊ฑธ ๋ณผ ์ ์๋ค. ์์ ๊ฒฝ์ฐ, ์ ํ๋ ํํฐ์ (์ดํ ์ ํ ํํฐ์ )์ด 5๊ฐ๋ก ์ ํ๋ง ๋์๋๋ฐ, [2, 3, 4]๋ฒ ๋ฐ์ดํฐ ๊ฒฝ์ฐ ํฌ๊ธฐ๊ฐ ์๊ณ , [1, 4]๋ฒ ๋ฐ์ดํฐ๋ ํฌ๊ธฐ๊ฐ ์๋์ ์ผ๋ก ํฐ ๊ฒ์ ๋ณผ ์ ์๋ค. ์ด ๊ฒฝ์ฐ, [2, 3, 4]๋ฒ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ ๊ธ๋ฐฉ ๋๋๋ผ๋ [1, 4]๋ฒ ๋ฐ์ดํฐ์ ์ฒ๋ฆฌ๊ฐ ์๋ฃ ๋ ๋๊น์ง ๊ธฐ๋ค๋ ค์ผ ํ๋ค.
Spark AQE๋ ์ด์งํผ ์ ์ฒด ์์ ์ด ์๋ฃ ๋๋ ค๋ฉด, [1, 4]๋ฒ ๋ฐ์ดํฐ์ ์์ ์ ๊ธฐ๋ค๋ ค์ผ ํ๋๋ฐ, ์ด๋ด ๊บผ๋ฉด [2, 3, 4]๋ฒ ๋ฐ์ดํฐ๋ฅผ ๋ญ์ณ์ [1, 4]๊ณผ ๋น์ทํ๊ฒ ๋ง๋ค๊ณ , ์ฐ์ฐ์ ์ฌ์ฉํ๋ Task ๊ฐฏ์๋ ์ค์ด๋๊ฒ ํจ์จ์ ์ด๋ผ๊ณ ํ๋จํ๋ค.
๋ณธ๋ Spark์๋ ์
ํ๋ง ํ์ ํํฐ์
๊ฐฏ์๋ฅผ ์ง์ ํ๋ spark.sql.shuffle.partitions
๊ฐ์ด ์๊ณ , ๋ฐ๋ก ์ค์ ํ์ง ์๋๋ค๋ฉด โ200โ์ด ๊ธฐ๋ณธ๊ฐ์ด๋ค. ๋ง์ฝ ์ด ๊ฐ์ ๊ทธ๋๋ก ๋ฐ๋ฅธ๋ค๋ฉด, ์
ํ ํํฐ์
๊ฐฏ์๊ฐ ํญ์ 200๊ฐ์ฉ ๋ง๋ค์ด์ง๋ค. ๋๋ฌด ๋ง์ ํํฐ์
์ ๋ง์ ํ์คํฌ๋ฅผ ๋ง๋ค๊ณ , ์ด๋ ์์
๋ถํ๋ก ์ด์ด์ง๋ฏ๋ก ์
ํ ํํฐ์
์ ์ ์ ํ ์ธํ
ํด์ผ ํ๋ค.
์ Dynamic coalescing์ด ์ด๋ป๊ฒ ๊ตฌํ๋์ด ์๋์ง๋ ๋ชจ๋ฅด๊ฒ ์ง๋ง, spark.sql.adaptive.coalescePartitions.initialPartitionNum
์ผ๋ก ์ด๊ธฐ ์
ํ ํํฐ์
์ ์ง์ ํด์ผ ํ๋ ๊ฑธ๋ก ๋ด์๋ ์๋ง ์ฒ์์ ๋นํจ์จ์ ์ธ ์
ํ ํํฐ์
์ ํ๋ฒ ์ํํ๊ณ , ๊ทธ ์ดํ์ AQE Optimizer๊ฐ ๋๋ฉด์, Coalescing์ ์ํํด ์
ํ ํํฐ์
๊ฐฏ์๋ฅผ ์ค์ฌ์ฃผ๋ ๊ฒ ๊ฐ๋ค. ์ฆ, ์
ํ๋ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ํ์ ์์
์ด ๋ ์ ์ ๊ฐฏ์์ Task๋ฅผ ์ฌ์ฉํ๊ฒ ๋๋ ๊ฒ.
์ฐธ๊ณ ๋ก coalescePartitions.initialPartitionNum
์ ๋ฐ๋ก ์ง์ ํ์ง ์์ผ๋ฉด, shuffle.partitions
๊ฐ์ ๊ทธ๋๋ก ์ฐ๊ณ , ๊ทธ๊ฒ์ ๊ธฐ๋ณธ๊ฐ์ 200์ด๋ค.
์
ํ ํํฐ์
์ ์ผ๋งํผ ๋ณํฉํ๋์ง๋ 2๊ฐ์ง ๊ธฐ์ค์ด ์กด์ฌํ๋ค. spark.sql.adaptive.coalescePartitions.parallelismFirst
๋ผ๋ ๊ฐ์ด ์ฐธ/๊ฑฐ์ง์ธ์ง์ ๋ฐ๋ผ ๋ค๋ฅด๋ค.
parallelismFirst = true
- ๋์์ฒ๋ฆฌ๋ฅผ ์ต๋ํ ํ๊ธฐ ์ํด, ๋ฐ์ดํฐ๋ฅผ ๋๋ฌด ๋ง์ด ๋ณํฉํ์ง ์๊ณ ํํฐ์ ์ ์ ๋นํ ๋ณํฉํ๋ค.
- ๊ทธ๋์ ์
ํ ํํฐ์
์ ๋ณํฉ์ด
spark.sql.adaptive.coalescePartitions.minPartitionSize
์ ๋ช ์๋ ๊ฐ(default:1Mb
)์ ํฌ๊ธฐ ๊ฐ ๋๋ฉด, ๋์ด์ ๋ณํฉํ์ง ์๋๋ค. ๋,coalescePartitions.minPartitionSize
๊ฐ์ ์๋์ ๋์ฌadvisoryPartitionSizeInBytes
๊ฐ์ ์ต๋ 20% ์ ๋๋ก๋ง ์ค์ ๊ฐ๋ฅํ๋ค. - Spark 3.2.0๋ถํฐ ๋์ ๋ ์ค์ ๊ฐ์ผ๋ก, Dynamic Coalescing์ผ๋ก ์ธํ ์ฑ๋ฅ ์ ํ๋ฅผ ๋ฐฉ์งํ๊ธฐ ์ํจ.
parallelismFirst = false
- ๋ณธ๋ Dynamic Coalescing์ ๋์ ๋ฐฉ์์ ์ฌ์ฉํ๋ค.
advisoryPartitionSizeInBytes
์ ๊ฐ(default:64MB
)์ ๊ฐ๊น๋๋ก ์ ํ ํํฐ์ ์ ๋ณํฉํ๋ค.
Dynamically switching join strategies
์ผ๋จ ์๊ฑธ ์ดํดํ๊ธฐ ์ ์ Spark์ Join ๋ฐฉ์ 2๊ฐ์ง๋ฅผ ์ดํดํด์ผ ํ๋ค.
Sort Merge Join
์์ ๊ทธ๋ฆผ๊ณผ ๊ฐ์ด Customer
ํ
์ด๋ธ๊ณผ Cities_Dictionary
ํ
์ด๋ธ์ cityID
๋ผ๋ ์ปฌ๋ผ์ ๊ธฐ์ค์ผ๋ก join ํ๋ ๊ฒฝ์ฐ๋ฅผ ์ดํด๋ณด์.
๋จผ์ , ๋ ํ
์ด๋ธ์ cityID
๋ฅผ ๊ธฐ์ค์ผ๋ก ์ ๋ ฌ(Sort) ํ๋ค. ์์ ๊ทธ๋ฆผ ์์ผ๋ก๋ ์ด๋ฏธ ์ ๋ ฌ๋ ์ํ๋ก ๋์ ์๋ค.
๊ทธ ๋ค์์ ๋ณํฉ(Merge) ํ๋ค. ๊ฐ ํ
์ด๋ธ์์ cityID
์ ๊ฐ์ฅ ์ฒ์ ๊ฐ์ ํฌ์ธํฐ(Pointer)๋ฅผ ๋๊ณ , ํ๋์ฉ ํ๋์ฉ ํ์ ์ํํ๋ค. ๋ง์ฝ ๋ ํ
์ด๋ธ์ ๊ฐ์ด ์ผ์น(=
) ํ๋ค๋ฉด ๋ณํฉ์ ์ํํ๊ณ , ๋งค์นญ ๋์ง ์๋ ๋ค๋ฉด, ๋น๊ต ํ๋ ์ค๋ฅธ์ชฝ ํ
์ด๋ธ์ ํฌ์ธํฐ๊ฐ ๋ค์ ํ์ ๊ฐ๋ฆฌํค๊ฒ ํ๋ค.
์๊ฐ ๋ณต์ก๋๋ฅผ ์๊ธฐํ์๋ฉด, $O(A \log A + B \log B)$.
Broadcast Hash Join
์ Join ๋ฐฉ๋ฒ์ Broadcast Join๊ณผ Hash Join, ๋ ๋ฐฉ์์ด ๊ฒฐํฉํ ๋ฐฉ์์ด๋ค.
Hash Join์ ๋ํด์ ๋จผ์ ์ค๋ช ํ๋ฉด, ์๋์ ๊ฐ์ pseudo-code๋ฅผ ๊ฐ์ง๋ค.
def hash_join(left: Table, right: Table, join_key: Column):
hash_table = {}
# Generate hash table
for idx, row in left:
hash_key = row[join_key]
if hash_key in hash_table:
hash_table[hash_key].append(idx)
else:
hash_table[hash_key] = [idx]
ret = []
# Do join using hash table
for idx, row in right:
hash_key = row[join_key]
if hash_key in hash_table:
for idx in hash_table[hash_key]:
ret.append(row.join(left[idx]))
else:
continue
return ret
ํ ์ด๋ธ ์ค ํ๋๋ก Hash Table์ ๋ง๋ค๊ณ , ๋ฐ๋ ํ ์ด๋ธ์์ ๊ทธ Hash Table๊ณผ ๋งค์นญํ๋ ํ์ด ์๋์ง ์ฐพ์์ Join ํ๋ ๊ธฐ๋ฒ์ด๋ค. ์ฒ์์ Hash Table์ ๋ง๋๋ ๋ฐ์ ์ ํ ์๊ฐ์ด, ๊ทธ๋ฆฌ๊ณ ๋ฐ๋ ํ ์ด๋ธ์ Hash Table์ ์ ์ฉํด ๊ฒฐ๊ณผ๋ฅผ ๋ง๋๋ ๋ฐ์ ์ ํ ์๊ฐ์ด ๊ฑธ๋ฆฐ๋ค. ์๊ฐ ๋ณต์ก๋๋ $O(A + B)$.
Broadcast Join์ ์ฐ์ฐ์ ๋์์ด ๋๋ ํ ์ด๋ธ ๋ ์ค ํ๋๊ฐ ์ถฉ๋ถํ ์์ ๋ ์ฌ์ฉํ๋ ๊ธฐ๋ฒ์ผ๋ก, Small ํ ์ด๋ธ ์ ์ฒด๋ฅผ ๋ชจ๋ Worker ๋ ธ๋์ ๋ฟ๋ฆฌ๊ณ , ๊ทธ ์์์ Join์ ์ํํ๋ ๊ธฐ๋ฒ์ด๋ค.
๋ณธ๋ Spark์์์ Join์ ๋ ํ ์ด๋ธ์ ์ ํ๋ง ํ ํ์, ๊ฐ์ ํค๋ฅผ ๊ฐ์ง ๋ ์ฝ๋๊ฐ ๊ฐ์ ํํฐ์ ์ ๋ฐฐ์น ํ๋ค. ๊ทธ๋ฆฌ๊ณ , ๊ฐ์ ํค๋ฅผ ๊ฐ์ง ๋ ํ ์ด๋ธ์ ํํฐ์ ์ ๊ฐ์ Worker ๋ ธ๋์ ๋ฐฐ์นํ๊ณ , ๊ทธ ์์์ ๋ก์ปฌ Join์ ์ํํ๋ค.
๋ฐ๋ฉด, Broadcast Join์ ์ ํ๋ง ๊ณผ์ ์ด ์๋ค. Small ํ ์ด๋ธ๊ณผ Large ํ ์ด๋ธ ๋๋ค ์ ํ๋ง ํ์ง ์๊ณ , ์ฒ์ ํํฐ์ ๊ทธ๋๋ก ๊ฐ ์์ปค์ ๋ถ๋ฐฐ๋์ด ๊ทธ๊ณณ์์ ๋ก์ปฌ Join ๋๋ค. ์ ํ๋ง์ด๋ผ๋ ์ฐ์ฐ์ ํ์ง ์๊ธฐ ๋๋ฌธ์ ๊ณ์ฐ๊ณผ ๋คํธ์ํฌ ์ด๋์์ ์ด๋์ ์ป๋๋ค.
์ํผ ์ ๋ฆฌํ๋ฉด, Broadcast Hash Join์ Broadcast Join์ธ๋ฐ, ๋ก์ปฌ Join์ ๋ฐฉ์์ด Hash Join์ด๋ผ๋ ๋ง!!
Dynamically switching join strategies: Sort-Merge โ Broadcast Hash
Spark AQE๋ Join ๋๋ ๋ ํ ์ด๋ธ์ ์ฌ์ด์ฆ๋ฅผ runtime์์ ํ์ธํ๊ณ , ๋ง์ฝ ํ์ชฝ ํ ์ด๋ธ์ ํฌ๊ธฐ๊ฐ ์ถฉ๋ถํ ์๋ค๋ฉด, Broadcast Hash Join์ผ๋ก ๋ณ๊ฒฝ ํ ์ฒ๋ฆฌํ๋ค.
ํ
์ด๋ธ ํฌ๊ธฐ๊ฐ ์ถฉ๋ถํ ์์์ง๋ spark.sql.adaptive.autoBroadcastJoinThreshold
์ ๋ช
์๋ ๊ฐ์ผ๋ก ํ๋จํ๋ค. ๋ง์ฝ, ๋ฐ๋ก ์ค์ ํด์ฃผ์ง ์๋๋ค๋ฉด, spark.sql.autoBroadcastJoinThreshold
์ ๊ฐ์ ๊ทธ๋๋ก ์ฌ์ฉํ๋ฉฐ, ๊ทธ๋์ ๊ธฐ๋ณธ๊ฐ์ 10Mb
์ด๋ค.
Dynamically optimizing skew joins
๋ฐ์ดํฐ์ ๋ทธ๊ท ํ(skewing)์ ํํฐ์ ๋ ๋ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃฐ ๋, ์ฑ๋ฅ ์ ํ๋ฅผ ์ผ๊ธฐํ ์ ์๋ค.
์์ ๊ทธ๋ฆผ์ ๊ฒฝ์ฐ, A0
๋ฐ์ดํฐ๊ฐ ํน๋ณํ ๋ง์ ์ํ๋ผ A0
์ B0
๋ฅผ Join ํ๋ ๊ฒ์ด ๋ณ๋ชฉ์ผ๋ก ์์ฉํ๋ค. Spark AQE์์ ์ด๋ ๊ฒ ํน๋ณํ ํ๋์ ํํฐ์
์ ๋ฐ์ดํฐ๊ฐ ๋ง์ ๊ฒฝ์ฐ๋ฅผ ํธ๋ค๋ง ํ๊ธฐ ์ํด, ๊ทธ ํํฐ์
์ ์ ๋ฐ ์ ๋๋ก ๋ถํ ํ์ฌ Join์ ์ํํ๋ค.
์์ ๊ทธ๋ฆผ์์ A0-0
, A0-1
์ด ๋ถํ ๋ ํํฐ์
์ ํด๋นํ๊ณ , Right Table์ $B0$๋ ๋ถํ ๋ $A0$ ํํฐ์
๊ณผ Join ํ๊ธฐ ์ํด ๋ณต์ ๋์๋ค.
ํํฐ์ ๊ฐฏ์๊ฐ ๋ง์์ง์ ๋ฐ๋ผ Task ์๋ ๋ง์์ก์ง๋ง, Executor ์๊ฐ ์ถฉ๋ถํ๋ค๋ฉด ์ ์ฒด ์์ ์ ์ฒ๋ฆฌ ์๊ฐ์ด ๋ ๊ฐ์ ๋ ๊ฒ์ด๋ค.
ํน์ ํํฐ์ ์ด skew ๋์ด์๋์ง๋ ์๋ ๋ ์์๋ฅผ ๊ธฐ์ค์ผ๋ก ํ๋จํ๋ค.
spark.sql.adaptive.skewJoin.skewedPartitionFactor
- ๋ฐ์ดํฐ์ ํํฐ์
์ฌ์ด์ฆ์ ์ค์๊ฐ(median)์ ๊ธฐ์ค์ผ๋ก ์ด ๊ฐ(default:
5.0
)์ ๋ฐฐ์๋ณด๋ค ํฌ๋ค๋ฉด, Skewed Partition์ด๋ผ๊ณ ํ๋จํ๋ค. - ๋จ,
skewedPartitionFactor
์กฐ๊ฑด๊ณผ ์๋์skewedPartitionThresholdInBytes
์กฐ๊ฑด์ ๋์์ ๋ง์กฑํด์ผ ํจ.
- ๋ฐ์ดํฐ์ ํํฐ์
์ฌ์ด์ฆ์ ์ค์๊ฐ(median)์ ๊ธฐ์ค์ผ๋ก ์ด ๊ฐ(default:
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
- ๋ฐ์ดํฐ์ ํํฐ์
์ฌ์ด์ฆ๊ฐ ์ด ๊ฐ(default:
256Mb
)๋ณด๋ค ํฌ๋ค๋ฉด, Skewed Partition์ด๋ผ๊ณ ํ๋จํ๋ค. - ์ด๋, Dynamic Coalescing์ ๊ด์ฌํ๋
spark.sql.adaptive.advisoryPartitionSizeInBytes
๊ฐ๋ณด๋ค๋ ํฌ๋๋ก ์ค์ ํ ๊ฒ์ ๊ถ์ฅํ๋ค. ๊ทธ๋ ์ง ์์ผ๋ฉด, Skewed Partition์ด ๋ถํ ๋ ํ์ Dynamic Coalescing์ด ๋ฐ์ํ๊ธฐ ๋๋ฌธ.
- ๋ฐ์ดํฐ์ ํํฐ์
์ฌ์ด์ฆ๊ฐ ์ด ๊ฐ(default:
Performance Improvement
Databricks์ ๋ฒค์น๋งํฌ์ ๋ฐ๋ฅด๋ฉด, Spark AQE๋ฅผ ํ์ฑํ ํ์ ๋ ๋๋ถ๋ถ์ ์ฟผ๋ฆฌ์์ ์ฑ๋ฅ ๊ฐ์ ์ด ์์๋ค๊ณ ํ๋ค. ๋ค๋ง, ๊ตฌ์ฒด์ ์ผ๋ก ์ด๋ค ์ฟผ๋ฆฌ์ธ์ง๋ ๋ฐํ์ง ์์๋ค. (๋ ๊ทธ๋ ๋ ๋ฒค์น๋งํฌ๋ ๋ค ๋ฏฟ์ผ๋ฉด ์ ๋๋ค.)
Spark AQE ๊ธฐ๋ฅ์ ์ฟผ๋ฆฌ๊ฐ ์๋์ ์กฐ๊ฑด์ ๋ง์กฑํ ๋ ์ฌ์ฉํ ๊ฒ์ ๊ถ์ฅํ๋ค๊ณ ํ๋ค.
- Streaming ์ฒ๋ฆฌ๋ฅผ ํ๋ ์ฟผ๋ฆฌ๊ฐ ์๋ ๊ฒ
- ์ ์ด๋ ํ๋์ ๋ฐ์ดํฐ ์
ํ๋ง์ ๊ฐ์ง ๊ฒ
- Join, Aggregation, Window Query
- ๋๋ ์ ์ด๋ ํ๋์ ์๋ธ ์ฟผ๋ฆฌ๊ฐ ์์ ๊ฒ
๋งบ์๋ง
Spark 2.x ์ธ๋์์ ๊ฐ๋ฐ๋ Spark Query Optimization ๊ธฐ๋ฒ์ Static Optimization์ผ๋ก Spark SQL ์ฟผ๋ฆฌ๋ฅผ ๋ถ์ํด ์ฟผ๋ฆฌ ์์๋ฅผ ์ฌ๋ฐฐ์ดํ์ฌ ๊ฐ์ ํ๊ฑฐ๋, ์ด๋ค ์ต์ ํ ๊ท์น์ ๋ฐ๋ผ ์ฟผ๋ฆฌ๋ฅผ ์ต์ ํ ํ์๋ค.
๋ฐ๋ฉด์ Spark AQE๋ ์์ ํ runtime์ ์ด๋ค์ง๋ ์ฟผ๋ฆฌ ์ต์ ํ ๊ธฐ๋ฒ์ด๋ค. ๋ฐ์ดํฐ๊ฐ ์ ํ๋ง ๋ ์ดํ์ ์ด๋ค์ง๋ฉฐ, ์ ํ ํํฐ์ ์ ํต๊ณ๊ฐ(statistics)๋ฅผ ๊ธฐ์ค์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ณํฉํ๊ธฐ๋ ํ๊ณ , ๋ถํ ํ๊ธฐ๋ ํ๋ฉฐ ๋ ์ ๋ ดํ Join(Broadcast Join)์ผ๋ก ๋ฐ๊พธ๊ธฐ๋ ํ๋ค.
์ด๋ฐ ๋ณํ๋ ๋์ด์ ์ต์ ํ๋ฅผ ์ํด ๋ฐ์ดํฐ ์์ฒด์ ํน์ฑ์ ์กฐ์ฌ ํ ํ์๋ ์์ด๋ค๊ณ ํ๋คโฆ๊ณ ๋ ํ์ง๋ง ๊ทธ๋ผ ๋ฐ์ดํฐ ์์ง๋์ด๊ฐ ํ๋ ์ผ์ด ์์ด์ง๋ ๊ฑธ ใ ใ ใ Spark AQE๊ฐ ์ฐ๋ฆฌ์ ๋ถ๋ด์ ๋์ด์คฌ์ง๋ง, ์ฒ๋ฆฌํ๋ ๋ฐ์ดํฐ์ Spark SQL์ ๋ค์ ๊ฒํ ํ๋ ๊ณผ์ ์ ๋ ํ์ํ ๊ฒ ๊ฐ๋ค.
์ํผ, ์ด๋ ๊ฒ Spark Config ์ค ํ๋์ธ spark.sql.adaptive.enabled
์ ๋ํด ๊ผผ๊ผผํ ์ดํด๋ณด์๊ณ , ๋ค๋ฅธ ์ฃผ์ ๋ค์ ๋ ์ดํด๋ณด์ ใ
ใ