How Spark carries out lazy evaluation. Narrow and wide transformations. RDD partitions and Spark tasks.


I am studying Apache Spark "properly" this time, with the goal of earning a Databricks certification. At work we are struggling to adopt Databricks Unity Catalog. Will Spark and I finally get along? 🎇 The full series of posts is available under Development - Spark.

Introduction

Databricks

Using Spark on Databricks at work, I have always wondered what this Spark Jobs view and the Stages in it actually mean.

Spark Action and Spark Job

An Action is an operation that runs a computation on an RDD or DataFrame and produces a result. When an Action is called, Spark builds a DAG of the required operations, runs it as a Spark Job, and returns the result to the driver program (for example, into a local variable) or writes it to external storage.

Put differently, a Spark Action is what actually executes the query execution plan Spark has built up; it is the trigger that ends the "lazy" part of lazy evaluation. Examples of Spark Actions include:

  • .collect()
  • .count()
  • .take(n)
  • .reduce(func)
  • .saveAsTextFile(path)

Spark Transformation and Spark Stage

A Spark Transformation is an operation that creates a new RDD from an existing one. No Transformation runs immediately. It is only recorded in the query plan, and it is computed when a Spark Action is triggered. Examples of RDD Transformations include:

  • .map()
  • .filter()
  • .groupByKey()
  • .sortByKey()
  • .join()
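The record-now, run-later behavior can be sketched without Spark at all. The toy class below is not Spark's API; `LazyDataset` and its methods are hypothetical names used only to illustrate that transformations extend a plan while an action executes it.

```python
class LazyDataset:
    """Toy stand-in for an RDD: transformations are recorded, not executed."""

    def __init__(self, data, plan=()):
        self._data = data
        self._plan = plan  # tuple of (kind, func) steps, in call order

    # Transformations: return a new dataset with a longer plan; no work is done.
    def map(self, func):
        return LazyDataset(self._data, self._plan + (("map", func),))

    def filter(self, func):
        return LazyDataset(self._data, self._plan + (("filter", func),))

    # Action: only now is the recorded plan run over the data.
    def collect(self):
        rows = iter(self._data)
        for kind, func in self._plan:
            rows = map(func, rows) if kind == "map" else filter(func, rows)
        return list(rows)


calls = []

def double(x):
    calls.append(x)  # side effect so we can observe when the function runs
    return x * 2

ds = LazyDataset([1, 2, 3, 4]).map(double).filter(lambda x: x > 4)
calls_before_action = len(calls)  # nothing has been computed yet
result = ds.collect()             # the action triggers the whole plan
```

Here `calls_before_action` is 0 and `double` only runs once `collect()` is called, which mirrors how Spark defers work until an Action.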

Transformations are further divided into narrow and wide. The distinction is whether the operation can be carried out in place, within each partition on its own node, or whether the data it needs is spread across several nodes and must first be moved between them.

Narrow Transformation

Databricks: Data Engineers Guide to Apache Spark and Delta Lake

A narrow transformation is one in which each input partition contributes to exactly one output partition. .filter() and .where() are typical examples.

The RDD .map() function is also a one-to-one operation, so it is a narrow transformation as well.

.union() is classified as narrow too, because each input partition is carried over unchanged into one partition of the union RDD.

A join can also be narrow: if the two datasets being joined are "co-partitioned", no shuffle is needed for the join. Data is "co-partitioned" when both sides are already partitioned by the join key in the same way, so that rows with the same key sit in corresponding partitions.
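A rough sketch of why co-partitioning removes the shuffle: if both sides were split with the same rule, partition i of one side only ever needs partition i of the other. The helper names and the `key % num_partitions` rule below are assumptions made for illustration; this is plain Python, not Spark code.

```python
def partition_by_key(pairs, num_partitions):
    """Place each (key, value) pair in partition key % num_partitions."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        parts[key % num_partitions].append((key, value))
    return parts

def join_co_partitioned(left_parts, right_parts):
    """Join partition i of the left side with partition i of the right side only."""
    out = []
    for left, right in zip(left_parts, right_parts):
        lookup = {}
        for key, value in right:
            lookup.setdefault(key, []).append(value)
        out.append([(k, (lv, rv)) for k, lv in left for rv in lookup.get(k, [])])
    return out

# Both sides use the same partitioning rule and the same partition count.
users = partition_by_key([(1, "ann"), (2, "bob"), (3, "cho")], 2)
orders = partition_by_key([(1, "book"), (3, "pen"), (3, "ink")], 2)
joined = join_co_partitioned(users, orders)
```

No row ever leaves its partition index, which is the property that lets a co-partitioned join count as narrow.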

Wide Transformation

Databricks: Data Engineers Guide to Apache Spark and Delta Lake

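A wide transformation is the opposite of a narrow one: the operation requires data to move between partitions. This movement is called "shuffling". Because shuffled data has to be written out and fetched again, each wide transformation marks a Stage boundary, while consecutive narrow transformations are pipelined within a single Stage.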

Shuffling happens when .groupByKey() groups data that is scattered across partitions, and also when two datasets that are not co-partitioned are joined. Both are classified as wide transformations.

.sortByKey(), which sorts the data, is also a wide transformation, because rows from one partition can end up spread over N other partitions.
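The difference between the two kinds can be made concrete with partitions modeled as plain lists. This is a simplified sketch, not Spark's implementation; the function names and the `key % num_out` routing rule are assumptions for the example.

```python
def narrow_map(partitions, func):
    """Narrow: each output partition is built from exactly one input partition."""
    return [[func(row) for row in part] for part in partitions]

def wide_group_by_key(partitions, num_out):
    """Wide: rows are re-routed by key, so one input partition feeds many outputs."""
    out = [{} for _ in range(num_out)]
    for part in partitions:
        for key, value in part:
            out[key % num_out].setdefault(key, []).append(value)
    return out

parts = [[(1, "a"), (2, "b")], [(1, "c"), (3, "d")]]
mapped = narrow_map(parts, lambda kv: (kv[0], kv[1].upper()))
grouped = wide_group_by_key(parts, 2)
```

In `mapped`, the rows stay in the partition they started in. In `grouped`, key 1 is assembled from rows that lived in two different input partitions, which is exactly the data movement a shuffle performs.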

Skipped Stage

The grayed boxes represents skipped stages. Spark is smart enough to skip some stages if they don't need to be recomputed. If the data is checkpointed or cached, then Spark would skip recomputing those stages. - Databricks

In other words, when the output of a Stage has already been computed and then checkpointed or cached, Spark reuses that data instead of computing it again.
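As a loose analogy only, the reuse can be pictured as memoizing a stage's output. `CachedStage` below is a hypothetical toy, not how Spark tracks cached or checkpointed data internally.

```python
class CachedStage:
    """Toy stage whose output is kept so later jobs can skip recomputing it."""

    def __init__(self, compute):
        self._compute = compute
        self._cached = None
        self.runs = 0   # how many times the stage actually executed
        self.skips = 0  # how many times the stored output was reused

    def output(self):
        if self._cached is not None:
            self.skips += 1
            return self._cached
        self.runs += 1
        self._cached = self._compute()
        return self._cached

stage = CachedStage(lambda: sorted([3, 1, 2]))
first_job = sum(stage.output())   # runs the stage
second_job = max(stage.output())  # reuses the stored output; the stage is skipped
```

After both jobs, `stage.runs` is 1 and `stage.skips` is 1, which corresponds to one executed stage and one grayed-out stage in the UI.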

RDD Partitions and Spark Tasks

A Partition is an RDD concept: it is the smallest unit of data that a Spark Executor processes. It can be looked at from two points of view, logical and physical.

[Logical view]

  • RDD data is split into partitions and processed partition by partition.
  • Each partition is immutable; a Transformation does not modify it but produces a new RDD from the existing one.
  • Transformations are classified as narrow or wide according to how the partitions are processed.

[Physical view]

  • On a Spark Executor, the data physically exists in units of partitions.
  • The logical view says nothing about partition size, but in a physical environment it is important to choose the partition size appropriately.
  • Even for the same data:
    • if partitions are too small, the frequent I/O adds overhead, and
    • if partitions are too large, load can concentrate on particular nodes, and the larger memory requirement can cause spill.
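A back-of-the-envelope way to reason about this trade-off is to derive a partition count from the data size and a target partition size. The 128 MiB target below is an assumption chosen for illustration, not a recommendation from this post, and `estimate_partitions` is a hypothetical helper.

```python
import math

def estimate_partitions(total_bytes, target_partition_bytes=128 * 1024**2):
    """Number of partitions needed so that none exceeds the target size."""
    return max(1, math.ceil(total_bytes / target_partition_bytes))

ten_gib = 10 * 1024**3
with_128_mib_target = estimate_partitions(ten_gib)        # 80 partitions
with_1_mib_target = estimate_partitions(ten_gib, 1024**2)  # 10240 partitions
```

The same 10 GiB becomes 80 tasks with a 128 MiB target but 10240 tasks with a 1 MiB target, which shows how quickly per-task overhead can pile up when partitions are too small.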


ํ•˜๋‚˜์˜ Partition์€ ํ•˜๋‚˜์˜ Task์— ๋Œ€์‘๋˜๊ณ , ํ•˜๋‚˜์˜ Task๋Š” Executor์˜ CPU 1 core๋ฅผ ์‚ฌ์šฉํ•ด ์ฒ˜๋ฆฌํ•œ๋‹ค. ์ •ํ™•ํžˆ ๋งํ•˜์ž๋ฉด, spark.task.cpus์— ๋ช…์‹œ๋œ ๊ฐ’๋งŒํผ์˜ CPU core๋ฅผ ์‚ฌ์šฉํ•ด ์ฒ˜๋ฆฌํ•˜๋Š”๋ฐ, ์ด๊ฒƒ์˜ default ๊ฐ’์ด 1์ด๊ธฐ ๋•Œ๋ฌธ์—, ํ•˜๋‚˜์˜ Task๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด 1 core๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค๊ณ  ๋งํ•œ๋‹ค. ์ฐธ๊ณ ๋กœ spark.task.cpus ๊ฐ’์€ ์ •์ˆ˜ ๋‹จ์œ„๋กœ๋งŒ ์กฐ์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

There are also a few settings to tune on the Executor side:

  • spark.executor.cores
    • The number of CPU cores each Executor can use.
    • In Standalone mode, the default is all the available cores on the worker.
  • spark.executor.memory
    • The amount of memory per Executor process. This memory is shared by the Tasks running concurrently on that Executor, so it is not a per-Task amount.
    • The default is 1g.


The Stage we looked at above is a group of N Tasks. Each Task processes a different partition, so the Tasks can run independently of one another. As a result, the Tasks in the same Stage run in parallel across the nodes.
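How much of a Stage really runs at the same time follows from the settings above. The arithmetic below is a simplified sketch that assumes every Executor is identical and ignores scheduling details; the function names and the cluster sizes are made up for the example.

```python
import math

def task_slots(num_executors, executor_cores, task_cpus=1):
    """Tasks that can run at once: cores per executor divided by cores per task."""
    return num_executors * (executor_cores // task_cpus)

def waves(num_partitions, slots):
    """Rounds needed to run one task per partition with the given slots."""
    return math.ceil(num_partitions / slots)

slots = task_slots(num_executors=4, executor_cores=4)   # 16 concurrent tasks
stage_waves = waves(200, slots)                         # 13 rounds for 200 partitions
slots_2cpu = task_slots(num_executors=4, executor_cores=4, task_cpus=2)
stage_waves_2cpu = waves(200, slots_2cpu)               # 25 rounds
```

With 16 slots, a 200-task Stage needs 13 rounds. Doubling spark.task.cpus halves the slots and roughly doubles the rounds.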

Because Spark Tasks and RDD Partitions are in a one-to-one relationship, the number of Tasks ultimately depends on how many partitions the RDD is split into. That number is mainly governed by the following two Spark configs.

  • spark.default.parallelism
    • The default number of partitions in the RDD returned by a Transformation, when the user does not specify one.
    • When it is not set, the default follows these rules:
      • Operations such as .reduceByKey() and .join() use the largest partition count among the parent RDDs.
      • Operations with no parent RDD, such as .parallelize(), use
        • the number of cores on the local machine in Local mode,
        • the total number of cores across all executor nodes, or 2 if that is larger, in Standalone mode.
        • For Mesos and other modes, see the documentation.
  • spark.sql.shuffle.partitions
    • The default number of partitions used when data is shuffled for joins and aggregations in Spark SQL and DataFrames.
    • The default is 200.
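The default rules for spark.default.parallelism can be written down as a small function. This sketch only covers the case where the setting is not set explicitly; `default_parallelism` and its parameters are hypothetical names, and Mesos fine-grained mode, which the Spark documentation treats separately, is left out.

```python
def default_parallelism(parent_partitions=(), mode="local",
                        local_cores=1, total_executor_cores=0):
    """Partition count Spark falls back to when spark.default.parallelism is unset."""
    if parent_partitions:
        # reduceByKey, join, ...: the largest partition count among the parents.
        return max(parent_partitions)
    if mode == "local":
        # parallelize in Local mode: cores on the local machine.
        return local_cores
    # parallelize on a cluster: total executor cores, but never fewer than 2.
    return max(total_executor_cores, 2)

joined = default_parallelism(parent_partitions=[4, 16, 8])                  # 16
local = default_parallelism(mode="local", local_cores=8)                    # 8
small = default_parallelism(mode="standalone", total_executor_cores=1)      # 2
cluster = default_parallelism(mode="standalone", total_executor_cores=48)   # 48
```

Since each partition becomes one Task, the returned number is also the Task count of the Stage that reads that RDD.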


Databricks

This is one of the screens in the Spark UI, and one I looked at a lot while optimizing Spark jobs. When comparing two runs with the same data I/O and the same Spark config, I checked whether the changes were moving toward fewer Tasks and a lower Total Task Time.
