A Close Look at Spark, First Steps ๐Ÿƒ Running a Spark Application with spark-submit


I've been studying Apache Spark "properly" with the goal of earning a Databricks Certification. At work we're in the middle of a struggle to adopt Databricks Unity Catalog. Could I get a little friendlier with Spark? ๐ŸŽ‡ You can find the whole series under Development - Spark.

Just as the first step in learning an instrument is buying the instrument (hmm.!?), and picking up a new programming language starts with setting up the toolchain, I figure there's a lot to learn from setting Spark up locally. Since I use Databricks at work, I had never once set up Spark myself...! Databricks is awfully convenient, but using it without knowing the underlying principles, I couldn't shake the feeling that I was building on sand. And so began my Spark study...!

์ง€๊ธˆ๊นŒ์ง€ ๋งŽ์€ ๊ฒƒ๋“ค์„ ์ƒˆ๋กœ ์ตํžˆ๊ณ  ๋ฐฐ์› ์ง€๋งŒโ€ฆ!? ์ƒˆ๋กœ์šด ๊ฑธ ๋ฐฐ์šฐ๋Š” ๊ฑธ ๋Š˜ ํž˜๋“ค๊ณ  ๊ดด๋กœ์› ๋‹ค. ์ฒ˜์Œ์—” ์–ด๋ ต๊ฒ ์ง€๋งŒ ๊ณง ์ต์ˆ™ํ•ด์ง€๊ฒ ์ง€ ๋ญ~~ ์•”ํŠผ ์ผ๋‹จ ๋กœ์ปฌ์— pyspark๋ฅผ ์„ค์น˜ํ•ด์„œ Spark๋ฅผ ๋Œ๋ ค๋ณด์ž!!

Install Spark

There are several ways to install Spark; here I went with pip3 install pyspark. The pyspark package ships not just the Python library but all of the Spark CLIs as well. If you'd rather not go through pyspark, you can also install just Spark with its CLIs like this:

$ brew install apache-spark

Just Install PySpark

# set up a venv
$ python3 -m venv venv
$ source venv/bin/activate

# install pyspark
$ pip3 install pyspark==3.5.2

pyspark CLI

$ pyspark
Python 3.12.4 (main, Jun  6 2024, 18:26:44) [Clang 15.0.0 (clang-1500.3.9.4)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
24/08/18 17:06:43 WARN Utils: Your hostname, Seokyunui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.21 instead (on interface en0)
24/08/18 17:06:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/18 17:06:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.2
      /_/

Using Python version 3.12.4 (main, Jun  6 2024 18:26:44)
Spark context Web UI available at http://192.168.0.21:4040
Spark context available as 'sc' (master = local[*], app id = local-1723968404162).
SparkSession available as 'spark'.
>>> 

์œ„์˜ ์ถœ๋ ฅ์—์„œ ์ฃผ๋ชฉํ•  ๋ถ€๋ถ„์ด ์žˆ๋Š”๋ฐ,

Spark context Web UI available at http://192.168.0.21:4040
Spark context available as sc (master = local[*], app id = local-1723968404162).
SparkSession available as spark.

If you open http://localhost:4040/jobs/, you'll see the Spark UI come up.

ํ•ด๋‹น ์ฝ˜์†”์—์„œ Spark Context์™€ Spark Session์— ์ ‘๊ทผํ•˜๋ ค๋ฉด ๊ฐ๊ฐ spark์™€ sc๋กœ ์ ‘๊ทผํ•˜๋ฉด ๋œ๋‹ค. (์ฐธ๊ณ ๋กœ Databricks๋„ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ๋ถ™์ด๋ฉด spark, sc ๋ณ€์ˆ˜๊ฐ€ ํ•ญ์ƒ ์กด์žฌํ•œ๋‹ค.)

>>> spark
<pyspark.sql.session.SparkSession object at 0x10cc3b6b0>

>>> sc
<SparkContext master=local[*] appName=PySparkShell>

Spark Context์—์„œ master=local[*]๋กœ ๋˜์–ด ์žˆ๋Š” ๋ถ€๋ถ„์€ 2๊ฐ€์ง€ ์˜๋ฏธ๋ฅผ ๋‹ด๊ณ  ์žˆ๋Š”๋ฐ,

  • local: the Spark application runs in local mode. Local mode needs no cluster; all work happens on a single machine.
  • [*]: use every available core. Spark will grab all of the machine's available cores.

If you want to pin the number of cores instead, you can write something like local[4].
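
For example, here's a minimal sketch of pinning the master to 4 cores when you build the session yourself (the app name local-four-cores is just a label I made up):

from pyspark.sql import SparkSession

# build a session pinned to 4 local cores; appName is an arbitrary label
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("local-four-cores")
    .getOrCreate()
)

print(spark.sparkContext.master)  # prints: local[4]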

import pyspark

It's also possible to import pyspark and run pyspark code directly. Let's run a really simple pyspark script. Unless there's a special reason not to, I'll always name the SparkSession and SparkContext spark and sc.

from pyspark.sql import SparkSession

# create (or reuse) a SparkSession
spark = SparkSession.builder.getOrCreate()
print(spark)

# the SparkContext hangs off the session
sc = spark.sparkContext
print(sc)

# distribute 10,000 numbers as an RDD and count them
rdd = sc.parallelize(range(10000))
cnt = rdd.count()
print(cnt)
---
<pyspark.sql.session.SparkSession object at 0x105bbe3c0>
<SparkContext master=local[*] appName=pyspark-shell>
10000 

If you add time.sleep(60 * 60 * 2) at the end of the script, the driver stays alive and you can check the Spark UI. The address is the same as before: http://localhost:4040/jobs/.
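
Concretely, the tail of the script would look something like this:

import time

# ... the pyspark code from above ...

# keep the driver process (and with it the Spark UI) alive for 2 hours
time.sleep(60 * 60 * 2)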

์‹คํ–‰ ํ–ˆ๋˜ ๊ฒƒ๋“ค์ด Spark Jobs๋“ค๋กœ ๋งŒ๋“ค์–ด์ ธ์„œ ์ž˜ ๋‚˜์˜จ๋‹ค!!

What Is Spark Local Mode?

์ง€๊ธˆ๊นŒ์ง€ ๋กœ์ปฌ์—์„œ pyspark๋ฅผ ์„ค์น˜ํ•ด ์‹คํ–‰ํ•œ ๋ฐฉ๋ฒ•์€ Spark๋ฅผ Local Mode๋กœ ์‚ฌ์šฉํ•œ ๊ฒƒ์ด๋‹ค. Spark๋ฅผ ์‹คํ–‰ํ•˜๋Š” ๋ฐฉ๋ฒ•์€ ํฌ๊ฒŒ โ€œLocal Modeโ€์™€ โ€œCluster Modeโ€๋กœ ๋‚˜๋‰œ๋‹ค. ์‰ฝ๊ฒŒ ์ƒ๊ฐํ•˜๋ฉด ๋‹จ์ผ ๋จธ์‹ ์ด๋ƒ ๋“œ๋ผ์ด๋ฒ„-์›Œ์ปค๋กœ ๊ตฌ์„ฑ๋œ ํด๋Ÿฌ์Šคํ„ฐ๋กœ ๋Œ๋ฆฌ๋ƒ์˜ ์ฐจ์ด.

Spark Local Mode

  • Local Mode (what we've done so far)
  • Cluster Mode (to be covered someday)
    • Client Deploy Mode
    • Cluster Deploy Mode

Anyway, the way we've been running things so far is Local Mode! This mode runs everything on a single machine. Even so, one Driver and one Executor each still come up.
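
A quick way to peek at this setup, using the spark and sc variables the pyspark shell provides (master and defaultParallelism are standard SparkContext properties):

# the master URL tells you which mode you're in; local mode looks like local[*]
print(sc.master)              # local[*]

# default parallelism reflects how many cores the local master grabbed
print(sc.defaultParallelism)  # e.g. 8 on an 8-core machine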

spark-submit

spark-submit์„ ํ†ตํ•ด Spark ๋จธ์‹ ์— python ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•˜๊ฒŒ ๋งŒ๋“ค ์ˆ˜ ์žˆ๋‹ค. Local Mode์—์„œ spark-submit์„ ํ•ด๋ณด์ž.

$ spark-submit \
  --master "local[2]" \
  ./hello-spark.py
---
...
24/08/22 01:15:54 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks resource profile 0
24/08/22 01:15:54 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (172.30.1.16, executor driver, partition 0, PROCESS_LOCAL, 8979 bytes) 
24/08/22 01:15:54 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (172.30.1.16, executor driver, partition 1, PROCESS_LOCAL, 8979 bytes) 
24/08/22 01:15:54 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
24/08/22 01:15:54 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
24/08/22 01:15:55 INFO PythonRunner: Times: total = 329, boot = 285, init = 44, finish = 0
24/08/22 01:15:55 INFO PythonRunner: Times: total = 329, boot = 283, init = 46, finish = 0
24/08/22 01:15:55 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1324 bytes result sent to driver
24/08/22 01:15:55 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1324 bytes result sent to driver
24/08/22 01:15:55 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 390 ms on 172.30.1.16 (executor driver) (1/2)
24/08/22 01:15:55 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 389 ms on 172.30.1.16 (executor driver) (2/2)
24/08/22 01:15:55 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
24/08/22 01:15:55 INFO PythonAccumulatorV2: Connected to AccumulatorServer at host: 127.0.0.1 port: 51645
24/08/22 01:15:55 INFO DAGScheduler: ResultStage 0 (count at /Users/seokyunha/xxxx/hello-spark.py:10) finished in 0.832 s
24/08/22 01:15:55 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
24/08/22 01:15:55 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
24/08/22 01:15:55 INFO DAGScheduler: Job 0 finished: count at /Users/seokyunha/xxxx/hello-spark.py:10, took 0.848853 s
10000

์•„์ง ๋กœ๊ทธ ์ฝ๋Š” ๋ฒ•์€ ์ž˜ ๋ชจ๋ฅด๊ฒ ๋‹ค. ๋‚˜์ค‘์— ์ฐพ์•„๋ณผ ๊ฒƒ!!

Wrapping Up

That covers Local Mode, running Spark work on a single machine. I've split this post in two; in part 2, let's look at running Spark work in Cluster Mode, across multiple machines!!

๐Ÿ‘‰ Running Spark on a Local MacBook - Part 2: Cluster Mode
