Spark Jobs, Stages and Tasks
Databricks Certification ์ทจ๋์ ๋ชฉํ๋ก Apache Spark๋ฅผ โ์ ๋๋กโ ๊ณต๋ถํด๋ณด๊ณ ์์ต๋๋ค. ํ์ฌ์์ Databricks Unity Catalog๋ฅผ ๋์ ํ๋ ค๊ณ ๋ถํฌํ๊ณ ์๋๋ฐ์. Spark์ ์ข ์นํด์ง ์ ์์๊น์? ๐ ์ ์ฒด ํฌ์คํธ๋ Development - Spark์์ ํ์ธํด์ค ์ ์์ต๋๋ค.
๋ค์ด๊ฐ๋ฉฐ
ํ์ฌ์์ Spark๋ฅผ Databricks์์ ์ฌ์ฉํ๋ฉด์, ์๋ฐ Spark Jobs ํ๋ฉด๊ณผ Stage ๋ค์ด ์ด๋ค ์๋ฏธ๋ฅผ ๊ฐ์ง๋์ง ๋ ๊ถ๊ธํ๋ค.
Spark Action๊ณผ Spark Job
RDD ๋๋ DataFrame์ ์ด๋ค ์ฐ์ฐ์ ์ํํด ๊ฒฐ๊ณผ๋ฅผ ์์ฑํ๋ ์ฐ์ฐ์ด๋ค. Action์ ํธ์ถํ๋ฉด, Spark๊ฐ ์ฐ์ฐ์ ๋ํ DAG ๊ทธ๋ํ๋ฅผ ์์ฑํด ์คํํ๊ณ , ๊ทธ ๊ฒฐ๊ณผ๋ฅผ ๋ก์ปฌ ๋ณ์์ ๋ด๋๋ค.
Spark Action์ Spark์์ ๋ง๋ Query Execution Plan์ด ์ค์ ๋ก ์คํ๋๋ ์ฆ, โLazy Evaluationโ์ ์คํํ๋ ํ๋์ด๋ค. Spark Action์ ์์๋ก๋ ์๋ ์์ ๋ค์ด ์๋ค.
.collect()
.count()
.take(n)
.reduce(func)
.saveAsTextFile(path)
Spark Transformation๊ณผ Spark Stage
Spark Transformation์ ๊ธฐ์กด RDD์์ ์๋ก์ด RDD๋ฅผ ์์ฑํ๋ ์ฐ์ฐ์ด๋ค. ๋ชจ๋ Transformation์ ์ฆ์ ์คํ๋์ง ์๋๋ค. Query Plan์ ๋ฐ์๋ง ๋๊ณ , Spark Action์ด ํธ๋ฆฌ๊ฑฐ ๋ ๋์ ๊ณ์ฐ๋๋ ๊ฒ์ด๋ค. Spark Transformation์ RDD ์ฐ์ฐ์ ์์๋ก๋ ์๋ ์ฐ์ฐ๋ค์ด ์๋ค.
.map()
.filter()
.groupByKey()
.sortByKey()
.join()
๊ทธ๋ฆฌ๊ณ Transformation ์ฐ์ฐ์, ๊ทธ๊ฒ์ด ํ๋์ ๋ ธ๋์์ ๋ฐ๋ก ์ํ ํ ์ ์๋์ง, ์๋๋ฉด ์ฒ๋ฆฌํ ๋ฐ์ดํฐ๊ฐ ์ฌ๋ฌ ๋ ธ๋์ ๋ถ์ฐ๋์ด ์์ด ์ฐ์ฐ์ ์ฒ๋ฆฌํ๊ธฐ ์ํด ๋ ธ๋ ์ฌ์ด์ ๋ฐ์ดํฐ ์ด๋์ด ํ์ํ์ง์ ๋ฐ๋ผ Narrow Trans.์ Wide Trans.๋ก ๋๋๋ค.
Narrow Transformation
์ฐ์ฐ์์ ์ฒ๋ฆฌํ๋ ๊ฐ input ํํฐ์
์ ์ค์ง ํ๋์ output ํํฐ์
์ ๊ฒฐ๊ณผ์๋ง ๊ธฐ์ฌํ๋ ์ฐ์ฐ์ด๋ค. ๋ํ์ ์ผ๋ก .filter()
๋๋ .where()
์ฐ์ฐ์ด ์ด์ ํด๋นํ๋ค.
RDD์ .map()
ํจ์๋ 1-to-1 ์ฐ์ฐ์ด๊ธฐ ๋๋ฌธ์ Narrow Trans.์ ํด๋นํ๋ค.
.union()
์ฐ์ฐ๋ ๊ฐ input ํํฐ์
์ด union RDD์ ํ ํํฐ์
์ผ๋ก ๊ทธ๋๋ก ์ด๋ํ๊ธฐ ๋๋ฌธ์ Narrow Trans.๋ก ๋ถ๋ฅํ๋ค.
๋, ๋ง์ฝ Join ํ๋ ๋ ํํฐ์ ์ด ์๋ก โco-partitionedโ ๋์ด ์๋ค๋ฉด, Join์ ์ํ ์ ํ๋ง์ด ๋ฐ์ํ์ง ์๊ธฐ ๋๋ฌธ์ Narrow Trans.๊ฐ ๋๋ค. ๋ฐ์ดํฐ๊ฐ โco-partitionedโ ๋์ด ์๋ค๋ ๋ง์ Join ํ๋ ๋ฐ์ดํฐ๊ฐ Join Jey๋ฅผ ๊ธฐ์ค์ผ๋ก ํํฐ์ ๋์ด ์๋ ๊ฒฝ์ฐ๋ฅผ ๋งํ๋ค.
Wide Transformation
Wide๋ Narrow์ ๋ฐ๋๋ก ์ฐ์ฐ์ ์ํด ๋ฐ์ดํฐ์ ์ด๋์ด ๋ฐ์ํ๋ค. ์ด๊ฒ์ โ์ ํ๋ง(Shuffling)โ์ด๋ผ๊ณ ํ๋ค.
.groupByKey()
๋ก ๊ฐ ํํฐ์
์ ํฉ์ด์ง ๋ฐ์ดํฐ๋ฅผ ๊ทธ๋ฃนํ ํ๊ฑฐ๋, ์๋ก co-partitioned ๋์ง ์์ ๋ ๋ฐ์ดํฐ๋ฅผ Join ํ ๋๋ ์
ํ๋ง์ด ๋ฐ์ํ๋ฉฐ, ์ด๋ค์ Wide Trans.๋ก ๋ถ๋ฅํ๋ค.
๋, ๋ฐ์ดํฐ๋ฅผ ์ ๋ ฌํ๋ .sortByKey()
์ญ์ ํ ํํฐ์
์ ์๋ ๋ฐ์ดํฐ๊ฐ ๋ค๋ฅธ N๊ฐ ํํฐ์
์ผ๋ก ํฉ์ด์ง ์ ์๊ธฐ ๋๋ฌธ์, Wide Trans.์ด๋ค.
Skipped Stage
The grayed boxes represents skipped stages. Spark is smart enough to skip some stages if they donโt need to be recomputed. If the data is checkpointed or cached, then Spark would skip recomputing those stages. - Databricks
์ฆ, Spark๋ ์ด๋ฏธ ๊ณ์ฐ๋์ด checkpoint์ ๋ด์๊ฑฐ๋ ์บ์ฑํ Stage์ ๋ฐ์ดํฐ๋ ๋ค์ ๊ณ์ฐํ์ง ์๊ณ ์ฌ์ฌ์ฉํ๋ค.
RDD์ Partition๊ณผ Spark Task
Partition์ RDD์ ๊ฐ๋ ์ผ๋ก, Spark Executor๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ์ต์ ๋จ์์ด๋ค. ๊ด์ ์ ๋ ผ๋ฆฌ์ , ๋ฌผ๋ฆฌ์ 2๊ฐ์ง ๊ด์ ์์ ๋ชจ๋ ํด์ํ๋๋ฐ,
[๋ ผ๋ฆฌ์ ๊ด์ ]
- RDD ๋ฐ์ดํฐ๋ ํํฐ์ ๋จ์๋ก ๋ถํ ๋์ด ์ฒ๋ฆฌ๋๋ค.
- ๊ฐ ํํฐ์ ์ ๋ถ๋ณ์ฑ์ ๊ฐ์ง๊ณ , Transformation์ ์ํํ๋ฉด ๊ธฐ์กด RDD์์ ์๋ก์ด RDD๊ฐ ์๊ธฐ๋ ๊ฒ์ด๋ค.
- ํํฐ์ ์ด ์ฒ๋ฆฌ๋๋ ๋ฐฉ์์ ๋ฐ๋ผ Narrow Trans.์ Wide Trans.๋ก ๋ถ๋ฅํ๋ค.
[๋ฌผ๋ฆฌ์ ๊ด์ ]
- Spark Executor์๋ ๋ฐ์ดํฐ๊ฐ Partition ๋จ์๋ก ๋ฌผ๋ฆฌ์ ์ผ๋ก ์กด์ฌํ๋ค.
- ๋ ผ๋ฆฌ์ ๊ด์ ์์๋ ํํฐ์ ํฌ๊ธฐ๋ฅผ ๋ ผํ์ง ์์์ง๋ง, ๋ฌผ๋ฆฌ์ ํ๊ฒฝ์์๋ ํํฐ์ ํฌ๊ธฐ๋ฅผ ์ ์ ํ ์ค์ ํ๋ ๊ฒ์ด ์ค์ํ๋ค.
- ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋๋ผ๋
- ํํฐ์ ์ฌ์ด์ฆ๊ฐ ๋๋ฌด ์์ผ๋ฉด ์ฆ์ I/O๋ก ์ค๋ฒํค๋๊ฐ ๋ฐ์ํ๋ฉฐ
- ํํฐ์ ์ฌ์ด์ฆ๊ฐ ๋๋ฌด ํฌ๋ฉด, ํน์ ๋ ธ๋์ ๋ถํ๊ฐ ์ปค์ง ์๋ ์๊ณ , ๋ ๋ง์ Memory๊ฐ ํ์ํด Spill์ด ๋ฐ์ํ ์ ์๋ค.
ํ๋์ Partition์ ํ๋์ Task์ ๋์๋๊ณ , ํ๋์ Task๋ Executor์ CPU 1 core๋ฅผ ์ฌ์ฉํด ์ฒ๋ฆฌํ๋ค. ์ ํํ ๋งํ์๋ฉด, spark.task.cpus
์ ๋ช
์๋ ๊ฐ๋งํผ์ CPU core๋ฅผ ์ฌ์ฉํด ์ฒ๋ฆฌํ๋๋ฐ, ์ด๊ฒ์ default ๊ฐ์ด 1
์ด๊ธฐ ๋๋ฌธ์, ํ๋์ Task๋ฅผ ์ฒ๋ฆฌํ๊ธฐ ์ํด 1 core๋ฅผ ์ฌ์ฉํ๋ค๊ณ ๋งํ๋ค. ์ฐธ๊ณ ๋ก spark.task.cpus
๊ฐ์ ์ ์ ๋จ์๋ก๋ง ์กฐ์ ํ ์ ์๋ค.
Executor ์ชฝ์๋ ํ๋ํ ์ ์๋ ๊ฐ๋ค์ด ์ข ์๋๋ฐ,
spark.executor.cores
- Executor์์ ์ฌ์ฉํ ์ ์๋ ์ด CPU ์ฝ์ด ๊ฐฏ์.
- Standalone ๋ชจ๋๋ฅผ ์ฌ์ฉํ๋ค๋ฉด, Executor์ ๊ฐ์ฉ ์ฝ์ด๋ฅผ ๋ชจ๋ ์ฌ์ฉํ๋ค.
spark.executor.memory
- ํ๋์ Task๊ฐ ์ฌ์ฉ ๊ฐ๋ฅํ Memory ์ฌ์ด์ฆ
- ๊ธฐ๋ณธ๊ฐ์
1g
์์์ ์ดํด๋ณธ Stage๋ N๊ฐ Task๋ฅผ ๋ฌถ์ ๊ฐ๋ ์ด๋ค. ๊ทธ๋ฆฌ๊ณ ๊ฐ Task๋ ์๋ก ๋ค๋ฅธ ํํฐ์ ์ ์ฒ๋ฆฌํ๊ธฐ ๋๋ฌธ์ ์๋ก ๋ ๋ฆฝ์ ์ผ๋ก ์คํ ๊ฐ๋ฅํ๋ค. ๋ฐ๋ผ์ ๊ฐ์ Stage์ ๋ฌถ์ธ Task๋ ๊ฐ ๋ ธ๋์์ ๋ณ๋ ฌ๋ก ์คํ๋๋ค.
Spark Task์ RDD Partition์ 1-to-1 ๊ด๊ณ์ด๊ธฐ ๋๋ฌธ์, ๊ฒฐ๊ตญ Task ๊ฐฏ์๋ RDD๋ฅผ ์ผ๋ง๋ ํํฐ์ ํ๋๋์ ๋ฌ๋ ธ๋ค. ๊ทธ๋ฆฌ๊ณ ์ด ๊ฐ์ ์๋์ ๋ Spark Config์ ์ํด ๊ฒฐ์ ๋๋ค.
spark.default.parallelism
- RDD ๋ฐ์ดํฐ์ Transformation ์ฐ์ฐ์ ํ์ ๋, ๋ฆฌํด๋๋ ํํฐ์ ๊ฐฏ์.
- ๊ธฐ๋ณธ์ผ๋ก ์ธํ
๋ ๊ฐ์ ์๋์ ๊ท์น์ ๋ฐ๋ฅธ๋ค.
.reduceByKey()
,.join()
๊ณผ ๊ฐ์ ์ฐ์ฐ์ parent RDD ์ค ๊ฐ์ฅ ํฐ ํํฐ์ ๊ฐฏ์๋ฅผ parallelism์ผ๋ก ๊ฐ๋๋ค..parallelize()
์ ๊ฐ์ด parent RDD๊ฐ ์๋ ๊ฒฝ์ฐ๋- Local Mode์์๋ local machine์ ์ฝ์ด ๊ฐฏ์๋ก
- Standalone Mode์์๋ Worker Node์ ์ฝ์ด ๊ฐฏ์๋ก
- Mesos์ ๊ทธ์ธ ๋ชจ๋์์๋ ๋ฌธ์๋ฅผ ์ฐธ๊ณ .
spark.sql.shuffle.partitions
- ๋ฐ์ดํฐ๋ฅผ ์ ํ๋ง ํ ๋์ ๊ธฐ๋ณธ ํํฐ์ ๊ฐฏ์์ด๋ค.
- ๊ธฐ๋ณธ๊ฐ์
200
Spark UI์ ํ ํ๋ฉด ์ค ํ๋์ธ๋ฐ, Spark ์์ ์ ์ต์ ํ ํ๋ฉด์ ๋ง์ด ๋ณด๋ ํ๋ฉด์ด๋ค. ๋ ํ์คํฌ๋ฅผ ๋น๊ตํ์ฌ, ๋ฐ์ดํฐ I/O๊ณผ Spark Config๊ฐ ๋ชจ๋ ๊ฐ๋ค๋ฉด, Task ๊ฐฏ์๊ฐ ์ ์ด์ง๋ ๋ฐฉํฅ์ผ๋ก, ๊ทธ๋ฆฌ๊ณ Total Task Time์ด ์ค์ด๋๋ ๋ฐฉํฅ์ผ๋ก ์ต์ ํ ๋๊ณ ์๋์ง ์ฒดํฌ ํ์๋ค.