Let's take a look at AQE, a core feature of Spark 3.


I'm studying Apache Spark “properly” with the goal of earning a Databricks Certification. At work, we're wrestling with adopting Databricks Unity Catalog. Will I manage to get a little closer to Spark? 🎇 You can find all posts in this series under Development - Spark.

What is Adaptive Query Execution?

๋ณธ์ธ ํšŒ์‚ฌ๋Š” 2018๋…„๋ถ€ํ„ฐ Databricks๋ฅผ ๋„์ž…ํ•ด Spark๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ์žˆ์—ˆ๋‹ค. ์‚ฌ์šฉํ•œ์ง€ ์˜ค๋ž˜ ๋˜์–ด์„œ ๊ทธ๋Ÿฐ์ง€ Databricks Job๋“ค์— ์ •๋ง ๋‹ค์–‘ํ•œ Spark Config๋“ค์ด ์„ธํŒ… ๋˜์–ด ์žˆ์—ˆ๋‹ค. ์š”์ฆ˜ ํšŒ์‚ฌ์—์„œ ์‚ฌ์šฉํ•˜๋Š” Spark Config๋ฅผ ์ •๋ฆฌํ•ด์„œ ๋ฐœํ‘œ๋ฅผ ์ค€๋น„ํ•˜๊ณ  ์žˆ๋Š”๋ฐ, spark.sql.adaptive.enabled๋ผ๋Š” config๊ฐ€ ๋‚ด ๋ˆˆ๊ธธ์„ ๋Œ๊ฒŒ ๋˜์—ˆ๊ณ , ๊ทธ๋ ‡๊ฒŒ ์˜ค๋Š˜์˜ ์ฃผ์ œ์ธ Adaptive Query Execution(์ดํ•˜ AQE)์— ๋Œ€ํ•ด ์‚ดํŽด๋ณด๊ฒŒ ๋˜์—ˆ๋‹ค.

AQE is a Spark optimization technique introduced in Spark 3.0. For reference, Spark 3.0 was released in June 2020, and starting with Spark 3.2, AQE is enabled by default unless you configure otherwise.

Dynamically coalescing shuffle partitions

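When handling huge amounts of data in Spark, shuffling is one of the biggest factors in query performance. Shuffling is the process of redistributing the partition data that executors were working on across a new set of partitions (for key-based operations, so that all records with the same key end up in the same partition), which usually means moving data over the network between executors. It happens during operations such as groupByKey, reduceByKey, join, distinct, and repartition.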
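To make "redistribution" concrete, here is a minimal pure-Python sketch of a key-based shuffle: every row is sent to output partition `hash(key) % num_partitions`, so all rows sharing a key meet in the same partition. The function name `shuffle_by_key` and the sample rows are hypothetical, and `zlib.crc32` is only a deterministic stand-in for Spark's own hash function; real Spark also writes shuffle files and transfers them over the network.

```python
import zlib
from typing import Any, Dict, List

Row = Dict[str, Any]


def shuffle_by_key(partitions: List[List[Row]], key: str, num_partitions: int) -> List[List[Row]]:
    """Redistribute rows so that all rows with the same key land in one output partition."""
    output: List[List[Row]] = [[] for _ in range(num_partitions)]
    for partition in partitions:  # each input partition, e.g. one per map task
        for row in partition:
            # Deterministic hash of the key (stand-in for Spark's own hash function).
            h = zlib.crc32(str(row[key]).encode("utf-8"))
            output[h % num_partitions].append(row)
    return output


# Two input partitions whose "color" keys are mixed together.
inputs = [
    [{"color": "red", "v": 1}, {"color": "blue", "v": 2}],
    [{"color": "red", "v": 3}, {"color": "green", "v": 4}],
]
shuffled = shuffle_by_key(inputs, "color", num_partitions=3)
print(shuffled)
```

Both "red" rows always end up in the same output partition, no matter which input partition they started in. That co-location is what groupByKey or a join needs, and it is also why a shuffle is expensive.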

Databricks: Adaptive Query Execution

์œ„์˜ ์‚ฌ์ง„์ด Partition์ด Shuffling ๋˜๋Š” ๊ณผ์ •์„ ํ‘œํ˜„ํ•œ ๊ฒƒ์œผ๋กœ ์ƒ‰๊น”๋ณ„๋กœ ๋ฐ์ดํ„ฐ๊ฐ€ ์žฌ์ •๋ ฌ ๋˜๋Š” ๊ฑธ ๋ณผ ์ˆ˜ ์žˆ๋‹ค. ์œ„์˜ ๊ฒฝ์šฐ, ์…”ํ”Œ๋œ ํŒŒํ‹ฐ์…˜(์ดํ•˜ ์…”ํ”Œ ํŒŒํ‹ฐ์…˜)์ด 5๊ฐœ๋กœ ์…”ํ”Œ๋ง ๋˜์—ˆ๋Š”๋ฐ, [2, 3, 4]๋ฒˆ ๋ฐ์ดํ„ฐ ๊ฒฝ์šฐ ํฌ๊ธฐ๊ฐ€ ์ž‘๊ณ , [1, 4]๋ฒˆ ๋ฐ์ดํ„ฐ๋Š” ํฌ๊ธฐ๊ฐ€ ์ƒ๋Œ€์ ์œผ๋กœ ํฐ ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค. ์ด ๊ฒฝ์šฐ, [2, 3, 4]๋ฒˆ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋Š” ๊ธˆ๋ฐฉ ๋˜๋”๋ผ๋„ [1, 4]๋ฒˆ ๋ฐ์ดํ„ฐ์˜ ์ฒ˜๋ฆฌ๊ฐ€ ์™„๋ฃŒ ๋  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ ค์•ผ ํ•œ๋‹ค.

Databricks: Adaptive Query Execution

Since the whole stage has to wait for the large partitions anyway, Spark AQE decides that it is more efficient to coalesce the small partitions [2, 3, 4] into a single partition comparable in size to the large ones, which also reduces the number of tasks used for the computation.

Spark has always had the setting spark.sql.shuffle.partitions, which determines the number of partitions after a shuffle. If you don't set it, the default is "200". If you just stick with this value, every shuffle produces 200 shuffle partitions regardless of how much data there is. Too many partitions create too many tiny tasks, which adds scheduling overhead, while too few create large partitions that may spill to disk. That's why the number of shuffle partitions needs to be set appropriately.


I don't know exactly how dynamic coalescing is implemented. However, spark.sql.adaptive.coalescePartitions.initialPartitionNum specifies the initial number of shuffle partitions, which suggests the following: the shuffle is first written out with that (possibly inefficiently large) number of partitions, and afterwards the AQE optimizer coalesces them to reduce the number of shuffle partitions. In other words, the downstream work that processes the shuffled data ends up using fewer tasks.

Note that if coalescePartitions.initialPartitionNum is not set, the value of spark.sql.shuffle.partitions is used instead, and its default is 200.
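The coalescing idea can be sketched as a simplified greedy model. Walk the shuffle partitions in order, and keep appending *contiguous* partitions to the current group until adding the next one would exceed a target size. Each resulting group is then read by a single task. The function `coalesce_partitions` and the example sizes are hypothetical. This illustrates the idea under those assumptions and is not Spark's actual implementation, which also handles several shuffles at once and other edge cases.

```python
from typing import List

MB = 1024 * 1024


def coalesce_partitions(sizes: List[int], target_size: int) -> List[List[int]]:
    """Group contiguous shuffle partitions so each group's total stays <= target_size.

    A partition that is already larger than target_size simply ends up alone.
    Returns lists of 0-based partition indices; one list = one post-coalesce task.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for idx, size in enumerate(sizes):
        # Close the current group if adding this partition would overshoot the target.
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Five shuffle partitions: two large ones around three small ones.
sizes = [60 * MB, 10 * MB, 12 * MB, 8 * MB, 55 * MB]
print(coalesce_partitions(sizes, target_size=64 * MB))  # [[0], [1, 2, 3], [4]]
```

Five tasks become three. The three small partitions in the middle are read by one task, and the two large ones keep a task each.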


How much shuffle partitions get merged follows one of two criteria, depending on whether spark.sql.adaptive.coalescePartitions.parallelismFirst is true or false.

  • parallelismFirst = true (the default)
    • To maximize parallelism, partitions are merged only moderately rather than aggressively.
    • Spark ignores the target size given by advisoryPartitionSizeInBytes (described below) and only respects the minimum size given by spark.sql.adaptive.coalescePartitions.minPartitionSize (default: 1MB). The value of minPartitionSize can be at most 20% of advisoryPartitionSizeInBytes.
    • This setting was introduced in Spark 3.2.0 to prevent performance regressions caused by Dynamic Coalescing.
  • parallelismFirst = false
    • Uses the original Dynamic Coalescing behavior.
    • Merges shuffle partitions so that each one comes close to spark.sql.adaptive.advisoryPartitionSizeInBytes (default: 64MB).
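The difference between the two modes can be modelled by how the coalescing target size is chosen. The sketch below follows my reading of Spark's coalescing code (`ShufflePartitionsUtil`), so treat it as an approximation that may differ between versions. With parallelismFirst on, Spark aims for about "total size / default parallelism" per partition, capped by the advisory size and floored by minPartitionSize. With it off, the advisory size wins. The function `coalesce_target_size` and `default_parallelism=8` are hypothetical values for illustration.

```python
import math

MB = 1024 * 1024


def coalesce_target_size(
    total_bytes: int,
    advisory: int = 64 * MB,            # spark.sql.adaptive.advisoryPartitionSizeInBytes
    min_partition_size: int = 1 * MB,   # spark.sql.adaptive.coalescePartitions.minPartitionSize
    parallelism_first: bool = True,     # spark.sql.adaptive.coalescePartitions.parallelismFirst
    default_parallelism: int = 8,       # hypothetical cluster parallelism (e.g. total cores)
) -> int:
    # Per the docs, minPartitionSize can be at most 20% of the advisory size (modelled as a clamp).
    min_partition_size = min(min_partition_size, int(advisory * 0.2))
    # parallelismFirst: aim for ~default_parallelism partitions; otherwise a single one is fine.
    min_num_partitions = default_parallelism if parallelism_first else 1
    max_target = math.ceil(total_bytes / min_num_partitions)
    # Never above the advisory size, never below the minimum partition size.
    return max(min(max_target, advisory), min_partition_size)


total = 145 * MB
print(coalesce_target_size(total, parallelism_first=True) // 1024)  # 18560 (KB, about 18MB)
print(coalesce_target_size(total, parallelism_first=False) // MB)   # 64
```

With 8-way parallelism, 145MB of shuffle output gets a target of about 18MB, which is roughly 8 tasks. With parallelismFirst=false, the same data is packed into partitions of about 64MB. The first setting keeps more tasks running in parallel, and the second produces fewer, fuller partitions.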

Dynamically switching join strategies

Before getting into this, we first need to understand two of Spark's join strategies.

Sort Merge Join

Prof. Dr. Jens Dittrich: Sort-Merge Join, Co-Grouping

์œ„์˜ ๊ทธ๋ฆผ๊ณผ ๊ฐ™์ด Customer ํ…Œ์ด๋ธ”๊ณผ Cities_Dictionary ํ…Œ์ด๋ธ”์„ cityID๋ผ๋Š” ์ปฌ๋Ÿผ์„ ๊ธฐ์ค€์œผ๋กœ join ํ•˜๋Š” ๊ฒฝ์šฐ๋ฅผ ์‚ดํŽด๋ณด์ž.

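First, both tables are sorted by cityID. In Spark, both sides are first shuffled by the join key so that matching keys land in the same partition, and then each partition is sorted. In the figure, the tables are already shown in sorted order.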

๊ทธ ๋‹ค์Œ์€ ๋ณ‘ํ•ฉ(Merge) ํ•œ๋‹ค. ๊ฐ ํ…Œ์ด๋ธ”์—์„œ cityID์˜ ๊ฐ€์žฅ ์ฒ˜์Œ ๊ฐ’์— ํฌ์ธํ„ฐ(Pointer)๋ฅผ ๋‘๊ณ , ํ•˜๋‚˜์”ฉ ํ•˜๋‚˜์”ฉ ํ–‰์„ ์ˆœํšŒํ•œ๋‹ค. ๋งŒ์•ฝ ๋‘ ํ…Œ์ด๋ธ”์˜ ๊ฐ’์ด ์ผ์น˜(=) ํ•œ๋‹ค๋ฉด ๋ณ‘ํ•ฉ์„ ์ˆ˜ํ–‰ํ•˜๊ณ , ๋งค์นญ ๋˜์ง€ ์•Š๋Š” ๋‹ค๋ฉด, ๋น„๊ต ํ•˜๋Š” ์˜ค๋ฅธ์ชฝ ํ…Œ์ด๋ธ”์˜ ํฌ์ธํ„ฐ๊ฐ€ ๋‹ค์Œ ํ–‰์„ ๊ฐ€๋ฆฌํ‚ค๊ฒŒ ํ•œ๋‹ค.

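In terms of time complexity (with $A$ and $B$ the number of rows in the two tables), sorting costs $O(A \log A + B \log B)$ and the merge pass is linear, $O(A + B)$, so the total is $O(A \log A + B \log B)$.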
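The sort-and-merge procedure can be written as a small pure-Python function. Tables are modelled as lists of dicts, and the customer and city rows are made-up sample data. The function also handles duplicate keys by pairing up the runs of equal keys on both sides, a detail the two-pointer description glosses over.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def sort_merge_join(left: List[Row], right: List[Row], key: str) -> List[Row]:
    # Sort phase: O(A log A + B log B)
    left = sorted(left, key=lambda r: r[key])
    right = sorted(right, key=lambda r: r[key])
    out: List[Row] = []
    i = j = 0
    # Merge phase: one pointer per sorted table
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1  # left key is smaller: advance the left pointer
        elif lk > rk:
            j += 1  # right key is smaller: advance the right pointer
        else:
            # Equal keys: find the run of this key on both sides and emit every pair.
            i_end = i
            while i_end < len(left) and left[i_end][key] == lk:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end][key] == rk:
                j_end += 1
            for l_row in left[i:i_end]:
                for r_row in right[j:j_end]:
                    out.append({**l_row, **r_row})
            i, j = i_end, j_end
    return out


customers = [
    {"name": "Ann", "cityID": 2},
    {"name": "Bob", "cityID": 1},
    {"name": "Cid", "cityID": 2},
    {"name": "Dee", "cityID": 3},  # no matching city: dropped by the inner join
]
cities = [{"cityID": 1, "city": "Seoul"}, {"cityID": 2, "city": "Busan"}]
joined = sort_merge_join(customers, cities, "cityID")
print([row["name"] for row in joined])  # ['Bob', 'Ann', 'Cid']
```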

Broadcast Hash Join

This join method combines two techniques: Broadcast Join and Hash Join.

Starting with Hash Join, it looks roughly like the following Python-style code.

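from typing import Any, Dict, List

Row = Dict[str, Any]   # a row maps column name -> value
Table = List[Row]

def hash_join(left: Table, right: Table, join_key: str) -> Table:
  hash_table: Dict[Any, List[int]] = {}

  # Build phase: map each join-key value to the indices of the left rows holding it
  for idx, row in enumerate(left):
    hash_key = row[join_key]
    if hash_key in hash_table:
      hash_table[hash_key].append(idx)
    else:
      hash_table[hash_key] = [idx]

  ret: Table = []

  # Probe phase: look up each right row's key and join it with every matching left row
  for row in right:
    hash_key = row[join_key]
    for left_idx in hash_table.get(hash_key, []):
      ret.append({**left[left_idx], **row})  # merged row; right-side values win on name clashes

  return ret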

ํ…Œ์ด๋ธ” ์ค‘ ํ•˜๋‚˜๋กœ Hash Table์„ ๋งŒ๋“ค๊ณ , ๋ฐ˜๋Œ€ ํ…Œ์ด๋ธ”์—์„œ ๊ทธ Hash Table๊ณผ ๋งค์นญํ•˜๋Š” ํ–‰์ด ์žˆ๋Š”์ง€ ์ฐพ์•„์„œ Join ํ•˜๋Š” ๊ธฐ๋ฒ•์ด๋‹ค. ์ฒ˜์Œ์— Hash Table์„ ๋งŒ๋“œ๋Š” ๋ฐ์— ์„ ํ˜• ์‹œ๊ฐ„์ด, ๊ทธ๋ฆฌ๊ณ  ๋ฐ˜๋Œ€ ํ…Œ์ด๋ธ”์— Hash Table์„ ์ ์šฉํ•ด ๊ฒฐ๊ณผ๋ฅผ ๋งŒ๋“œ๋Š” ๋ฐ์— ์„ ํ˜• ์‹œ๊ฐ„์ด ๊ฑธ๋ฆฐ๋‹ค. ์‹œ๊ฐ„ ๋ณต์žก๋„๋Š” $O(A + B)$.


Broadcast Join is used when one of the two tables being joined is small enough. The entire small table is sent to every worker node, and the join is performed there.

Normally, a join in Spark shuffles both tables so that records with the same key are placed in the same partition. The partitions of the two tables that hold the same keys are then placed on the same worker node, where a local join is performed.

A Broadcast Join, in contrast, has no shuffle step. The large table is never shuffled. Each of its partitions stays on the worker where it already is and is joined locally against the full copy of the small table. Skipping the shuffle saves both computation and network traffic. The price is shipping the small table to every worker, which is why it has to be small.


In short, a Broadcast Hash Join is a Broadcast Join whose local join is a Hash Join!
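Putting the two together, here is a pure-Python simulation of a Broadcast Hash Join. The list of partitions stands in for large-table data already sitting on different workers, and "broadcasting" is modelled simply by every partition reading the same hash table. The function name `broadcast_hash_join` and the sample data are hypothetical. This is an illustrative sketch, not Spark's code.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def broadcast_hash_join(large_partitions: List[List[Row]], small_table: List[Row], key: str) -> List[List[Row]]:
    # Build the hash table from the small table once; in Spark this is the
    # structure that gets broadcast, i.e. copied to every executor.
    broadcast_table: Dict[Any, List[Row]] = {}
    for row in small_table:
        broadcast_table.setdefault(row[key], []).append(row)

    # Each large-table partition is joined where it is: no shuffle, no repartitioning.
    result: List[List[Row]] = []
    for partition in large_partitions:
        local: List[Row] = []
        for row in partition:
            for match in broadcast_table.get(row[key], []):
                local.append({**row, **match})
        result.append(local)
    return result


customers = [  # large table, already split into two partitions
    [{"id": 1, "cityID": 1}, {"id": 2, "cityID": 3}],
    [{"id": 3, "cityID": 2}],
]
cities = [{"cityID": 1, "city": "Seoul"}, {"cityID": 2, "city": "Busan"}]
print(broadcast_hash_join(customers, cities, "cityID"))
# [[{'id': 1, 'cityID': 1, 'city': 'Seoul'}], [{'id': 3, 'cityID': 2, 'city': 'Busan'}]]
```

The output has exactly as many partitions as the large input had, which reflects the point above: the large side's partitioning is left untouched.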

Dynamically switching join strategies: Sort-Merge โ†’ Broadcast Hash

Databricks: Adaptive Query Execution

At runtime, Spark AQE checks the actual sizes of the two tables being joined, using statistics from query stages that have already finished. If one side turns out to be small enough, it switches the join to a Broadcast Hash Join.

ํ…Œ์ด๋ธ” ํฌ๊ธฐ๊ฐ€ ์ถฉ๋ถ„ํžˆ ์ž‘์€์ง€๋Š” spark.sql.adaptive.autoBroadcastJoinThreshold์— ๋ช…์‹œ๋œ ๊ฐ’์œผ๋กœ ํŒ๋‹จํ•œ๋‹ค. ๋งŒ์•ฝ, ๋”ฐ๋กœ ์„ค์ •ํ•ด์ฃผ์ง€ ์•Š๋Š”๋‹ค๋ฉด, spark.sql.autoBroadcastJoinThreshold์˜ ๊ฐ’์„ ๊ทธ๋Œ€๋กœ ์‚ฌ์šฉํ•˜๋ฉฐ, ๊ทธ๋•Œ์˜ ๊ธฐ๋ณธ๊ฐ’์€ 10Mb์ด๋‹ค.

Dynamically optimizing skew joins

๋ฐ์ดํ„ฐ์˜ ๋ทธ๊ท ํ˜•(skewing)์€ ํŒŒํ‹ฐ์…”๋‹ ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค๋ฃฐ ๋•Œ, ์„ฑ๋Šฅ ์ €ํ•˜๋ฅผ ์•ผ๊ธฐํ•  ์ˆ˜ ์žˆ๋‹ค.

Databricks: Adaptive Query Execution

์œ„์˜ ๊ทธ๋ฆผ์˜ ๊ฒฝ์šฐ, A0 ๋ฐ์ดํ„ฐ๊ฐ€ ํŠน๋ณ„ํžˆ ๋งŽ์€ ์ƒํƒœ๋ผ A0์™€ B0๋ฅผ Join ํ•˜๋Š” ๊ฒƒ์ด ๋ณ‘๋ชฉ์œผ๋กœ ์ž‘์šฉํ•œ๋‹ค. Spark AQE์—์„  ์ด๋ ‡๊ฒŒ ํŠน๋ณ„ํžˆ ํ•˜๋‚˜์˜ ํŒŒํ‹ฐ์…˜์— ๋ฐ์ดํ„ฐ๊ฐ€ ๋งŽ์€ ๊ฒฝ์šฐ๋ฅผ ํ•ธ๋“ค๋ง ํ•˜๊ธฐ ์œ„ํ•ด, ๊ทธ ํŒŒํ‹ฐ์…˜์„ ์ ˆ๋ฐ˜ ์ •๋„๋กœ ๋ถ„ํ• ํ•˜์—ฌ Join์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.

Databricks: Adaptive Query Execution

์œ„์˜ ๊ทธ๋ฆผ์—์„œ A0-0, A0-1์ด ๋ถ„ํ• ๋œ ํŒŒํ‹ฐ์…˜์— ํ•ด๋‹นํ•˜๊ณ , Right Table์˜ $B0$๋Š” ๋ถ„ํ• ๋œ $A0$ ํŒŒํ‹ฐ์…˜๊ณผ Join ํ•˜๊ธฐ ์œ„ํ•ด ๋ณต์ œ๋˜์—ˆ๋‹ค.

Splitting increases the number of partitions, and therefore the number of tasks. As long as there are enough executors, though, the overall processing time should improve.

Whether a partition is skewed is decided by the following two settings.

  • spark.sql.adaptive.skewJoin.skewedPartitionFactor
    • A partition is considered skewed if its size is larger than this value (default: 5.0) multiplied by the median partition size.
    • However, this condition and the skewedPartitionThresholdInBytes condition below must both hold.
  • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
    • A partition is considered skewed if its size is larger than this value (default: 256MB).
    • Ideally, set this larger than spark.sql.adaptive.advisoryPartitionSizeInBytes, the value that also drives Dynamic Coalescing. Otherwise, a partition smaller than the coalescing target could be flagged as skewed and split, even though Dynamic Coalescing would rather merge partitions of that size.
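The two conditions and the split-and-duplicate step can be modelled in a few lines of Python. This is a simplified sketch under several assumptions. Only the left side is checked for skew. A skewed partition is split evenly by bytes, whereas Spark picks split points along the shuffle map outputs, so real pieces are only roughly equal. Per the Spark docs, the split target comes from advisoryPartitionSizeInBytes; here `target=500 * MB` is picked only to reproduce the two pieces in the figure. All function names and sizes are hypothetical.

```python
import math
from statistics import median
from typing import List, Tuple

MB = 1024 * 1024


def find_skewed(sizes: List[int], factor: float = 5.0, threshold: int = 256 * MB) -> List[int]:
    """Indices of partitions larger than factor * median AND larger than threshold."""
    med = median(sizes)
    return [i for i, s in enumerate(sizes) if s > factor * med and s > threshold]


def split_evenly(size: int, target: int) -> List[int]:
    """Split one partition into ceil(size / target) roughly equal pieces."""
    n = max(1, math.ceil(size / target))
    base, rem = divmod(size, n)
    return [base + (1 if i < rem else 0) for i in range(n)]


def plan_join_tasks(left_sizes: List[int], target: int) -> List[Tuple[str, int, str]]:
    """One (left piece, bytes, right partition) tuple per join task."""
    skewed = set(find_skewed(left_sizes))
    tasks = []
    for i, size in enumerate(left_sizes):
        if i in skewed:
            # Each piece of A_i is joined with its own copy of B_i.
            for k, piece in enumerate(split_evenly(size, target)):
                tasks.append((f"A{i}-{k}", piece, f"B{i}"))
        else:
            tasks.append((f"A{i}", size, f"B{i}"))
    return tasks


left = [1000 * MB, 100 * MB, 120 * MB, 90 * MB, 110 * MB]
for task in plan_join_tasks(left, target=500 * MB):
    print(task[0], task[1] // MB, "MB with", task[2])
# A0-0 500 MB with B0
# A0-1 500 MB with B0
# A1 100 MB with B1
# A2 120 MB with B2
# A3 90 MB with B3
# A4 110 MB with B4
```

A0 (1000MB) exceeds both 5 × median (5 × 110MB) and 256MB, so it is split, and B0 is read once per piece. A small partition that is merely 10× the median but under 256MB would not be treated as skewed.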

Performance Improvement

Databricks: Adaptive Query Execution

According to Databricks' benchmark, enabling Spark AQE improved performance for most queries. That said, they didn't disclose exactly which queries were involved. (As always, don't take benchmarks at face value.)

According to Databricks, AQE is applied to queries that meet the following conditions.

  • Not a streaming query
  • Contains at least one data shuffle (exchange)
    • typically a Join, Aggregation, or Window query
  • Or contains at least one subquery

๋งบ์Œ๋ง

The query optimization techniques developed in the Spark 2.x era were static optimizations. They analyzed Spark SQL queries before execution and improved them by reordering operations or by applying a set of optimization rules.

Spark AQE, in contrast, re-optimizes the query at runtime. It kicks in after data has been shuffled. Based on statistics of the shuffle partitions, it coalesces or splits partitions and switches to a cheaper join (Broadcast Join).

Supposedly, with these changes we no longer need to study the characteristics of the data itself in order to optimize... but then data engineers would be out of work, haha. Spark AQE has certainly lightened our load, but I think re-examining the data being processed and the Spark SQL will always be necessary.

Anyway, that's a close look at one Spark config, spark.sql.adaptive.enabled. Let's dig into other topics next time, hehe.

์ฐธ๊ณ ์ž๋ฃŒ

Categories:

Updated: