Spark Session์ด๋ž€ ๋ฌด์—‡์ธ๊ฐ€? Spark Session์— ๋“ค์–ด์žˆ๋Š”, SparkContext, SQLContext ๋“ฑ์— ๋Œ€ํ•ด ๐ŸŽ‡

11 minute read

I'm studying Apache Spark "properly" this time, aiming for a Databricks Certification. At work we're in the thick of adopting Databricks Unity Catalog. Can Spark and I get a little closer? 🎇 You can find the full series under Development - Spark.

SparkSession? SparkContext?

์ด ๊ธ€์€ Apache Spark๋ฅผ ๊ณต๋ถ€ํ•˜๋ฉด์„œ SparkSession๊ณผ SparkContext๋ฅผ ํ—ท๊ฐˆ๋ ค ํ•˜๋˜ ๋‚˜์˜ ์ด์•ผ๊ธฐ๋ฅผ ๋‹ด์€ ํฌ์ŠคํŠธ์ด๋‹ค.

Spark๋ฅผ ์ฒ˜์Œ ๊ณต๋ถ€ํ•˜๋ฉด, ๊ฐ•์˜๋‚˜ ๊ณต์‹๋ฌธ์„œ๋ฅผ ํ†ตํ•ด์„  SparkContext๋กœ RDD๋ฅผ ๋งŒ๋“œ๋Š” ๊ฑธ ๋จผ์ € ๋ฐฐ์› ๋‹ค. ๊ทธ๋Ÿฐ๋ฐ, ํšŒ์‚ฌ์—์„  Databricks๋ฅผ ํ†ตํ•ด โ€œsparkโ€œ๋ž€ ์ด๋ฆ„์˜ ๋ณ€์ˆ˜๋กœ ์‚ฌ์šฉํ•˜๋Š” SparkSession๋ฅผ ๋จผ์ € ์‚ฌ์šฉํ–ˆ๋‹ค.

์ตœ๊ทผ์—” ํšŒ์‚ฌ์—์„œ Databricks์—์„œ Unity Catalog๋กœ ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜์„ ํ–ˆ๋Š”๋ฐ, UC์—์„œ๋Š” โ€œSparkContext๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ๋ชป ํ•œ๋‹คโ€ Limitation๋„ ์žˆ์—ˆ๋‹ค.

Databricks: Shared access mode limitations on Unity Catalog

๋ฌธ์„œ๋ฅผ ์ฝ์–ด๋ณด๋ฉด SparkContext, SQLContext์™€ ๊ฐ™์ด Context API๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ๋ชปํ•˜๊ณ , ๊ทธ์™€ ๊ด€๋ จ๋œ .parallelize() ํ•จ์ˆ˜๋ฅผ ํฌํ•จํ•ด RDD API์™€ Dataset API๋„ ์‚ฌ์šฉ ๋ชป ํ•˜๊ฒŒ ๋œ๋‹ค;;

sparkContext์— ์ ‘๊ทผํ•˜๋Š” ๊ฑด ๊ฐ€๋Šฅํ•˜์ง€๋งŒ, .parallelize() ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜๋Š” ๊ฑด ์•ˆ ๋œ๋‹ค.

์ด๋Ÿฐ ๋‚˜์˜ ๊ฒฝํ—˜๋“ค์€ SparkContext์™€ RDD๊ฐ€ โ€˜๊ตฌ์‹œ๋Œ€์˜ ์œ ๋ฌผโ€™์ด ์•„๋‹Œ์ง€ ์ƒ๊ฐํ•˜๊ฒŒ ํ–ˆ๊ณ , ๊ด€๋ จ๋œ ๊ฐœ๋…๋“ค์„ ๋ฐ›์•„ ๋“ค์ด๋Š” ๋ฐ์—, ์•Œ๊ฒŒ๋ชจ๋ฅด๊ฒŒ ์žฅ๋ฒฝ์ด ๋˜์—ˆ๋˜ ๊ฒƒ ๊ฐ™๋‹ค.

์•”ํŠผ ์ง€๊ธˆ์€ Databricks๋Š” Databricks, Spark๋Š” Spark๋ผ๋Š” ์ƒ๊ฐ์œผ๋กœ SparkContext์— ๋Œ€ํ•ด ๋ฐ›์•„๋“ค์ด๊ณ , ์‚ดํŽด๋ณด๋ ค๊ณ  ํ•œ๋‹ค.

Jump into SparkSession

ํšŒ์‚ฌ์—์„œ ์ œ์ผ ๋จผ์ € ์ตํ˜”๋˜ ๋ฐฉ์‹์ด ์š” SparkSession์ด๋‹ˆ ์š”๊ฑธ ๋จผ์ € ์‚ดํŽด๋ณด์ž.

Databricks์—์„  spark๋ผ๋Š” ๋ณ€์ˆ˜๋กœ pyspark๋“  scala spark๋“  ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ๋‹ค. ์š”๊ธฐ์—์„œ ์ฃผ๋กœ ์“ฐ๋˜ ๊ธฐ๋Šฅ๋“ค์€

  • spark.sql()
    • Used to run SQL queries
  • spark.udf.register()
    • Used to register Spark UDFs
  • spark.table()
    • Returns a Spark DataFrame that can access a Hive table.
    • I haven't properly looked into the Spark-Hive relationship yet. TODO!
  • spark.catalog
    • An API for managing every database the Spark Session can reach and the objects inside them (tables, views, functions)
      • spark.catalog.listTables(): list the tables
      • spark.catalog.setCurrentDatabase(): set the current database
  • spark.read
    • Reads data in various formats (CSV, JSON, Parquet, etc.) and returns it as a Spark DataFrame.
  • df.write
    • Saves a Spark DataFrame in the same variety of formats. (Strictly speaking, write hangs off the DataFrame rather than the session, but it pairs naturally with spark.read.)
  • spark.conf
    • An API for managing the current Spark Session's configuration
  • spark.streams
    • APIs related to Spark Structured Streaming
  • spark.sparkContext, spark.sqlContext
    • The Spark/SQL Contexts embedded in the SparkSession
  • …and so on

๋ณธ์ธ์ด ์ง€๊ธˆ Databricks์—์„œ spark๋กœ ์‚ฌ์šฉํ•˜๋Š” ์ฝ”๋“œ๋“ค์„ ๋ณธ ๊ฒƒ๋งŒ ์ด ์ •๋„๋‹ค. ์—ฌ๊ธฐ์— ๋ช‡ ๊ฐ€์ง€๋ฅผ ๊ธฐ๋Šฅ์„ ๋” ์ œ๊ณตํ•˜๋Š”๋ฐ, ๋‚˜๋จธ์ง€ ๊ธฐ๋Šฅ๋“ค์€ Spark ๊ณต์‹ ๋ฌธ์„œ ์ฐธ๊ณ .

Unified entry point for interacting with Spark

Spark Session์„ ์„ค๋ช…ํ•˜๋Š” ๊ฐ€์žฅ ๊ฐ„๋‹จํ•œ ๋ฌธ์žฅ์ด๋‹ค. spark๋ผ๋Š” ๋ณ€์ˆ˜๋ฅผ ํ†ตํ•ด์„œ SQL๋กœ ์‹คํ–‰ํ•˜์ง€, Hive ํ…Œ์ด๋ธ” ๋ชฉ๋ก๋„ ์กฐํšŒํ•˜์ง€, UDF๋„ ๋“ฑ๋กํ•˜์ง€, Parquet ํŒŒ์ผ์„ read/write ํ•˜๊ธฐ๋„ ํ•˜์ง€โ€ฆ ์ •๋ง ๋งŽ์€ ์ž‘์—…์„ Spark Session์„ ํ†ตํ•ด์„œ ์ˆ˜ํ–‰ํ•œ๋‹ค.

์•”ํŠผ ์ง€๊ธˆ๊นŒ์ง€ Spark Session์— ๋Œ€ํ•ด ์ฃผ์ €๋ฆฌ ์ฃผ์ €๋ฆฌ ๋– ๋“ค์—ˆ๋Š”๋ฐ, ๋‹ค์Œ์—” ์ด ํฌ์ŠคํŠธ์˜ ๋‘๋ฒˆ์งธ ์ฃผ์ œ์ธ SparkContext๋กœ ๋„˜์–ด๊ฐ€๋ณด์ž.

Jump into SparkContext

Spark ๊ณต๋ถ€๋ฅผ ์‹œ์ž‘ํ•  ๋•Œ, RDD์™€ ํ•จ๊ป˜ ๊ฐ€์žฅ ๋จผ์ € ๋งˆ์ฃผ์น˜๋Š” ๋…€์„์ธ SparkContext๋‹ค. ์˜ˆ์ „์—๋Š” SparkContext๋กœ Spark๋ฅผ ์ ‘ํ•˜๋Š”๋ฐ, ๋‹น์—ฐ ํ–ˆ์„ ์ง€๋„ ๋ชจ๋ฅธ๋‹ค. ์™œ๋ƒํ•˜๋ฉด, SparkSession์€ spark 2.0๋ถ€ํ„ฐ ๋„์ž…๋œ ๋ฐฉ์‹์ด๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.

  • SparkContext
    • Spark 1.x
  • SparkSession
    • Spark 2.0 (released 2016-07-26)

์ฐธ๊ณ ๋กœ ์ž‘์„ฑ์ผ(24.08.27) ๊ธฐ์ค€ Spark๋Š” 3.5.2 ๋ฒ„์ „๊นŒ์ง€ ๋‚˜์™”๋‹ค. Spark 3.0 ๋ฒ„์ „์—์„œ ๊ฐ€์žฅ ์ฃผ์š”ํ•œ ๋ณ€๊ฒฝ์ ์€ AQE(Adaptive Query Execution)์ด๊ณ  ๋‹ค์Œ ํฌ์ŠคํŠธ์—์„œ ๋‹ค๋ค„๋ณผ ์˜ˆ์ •์ด๋‹ค. link

SparkContext๋Š” RDD(Resilient Distributed Data)๋ฅผ ๋‹ค๋ฃจ๊ธฐ ์œ„ํ•œ entry point์ด๋‹ค. RDD๋Š” ์ดˆ๊ธฐ Spark๋ฅผ ์ด๋ฃจ๋Š” ๊ฐ€์žฅ ๊ธฐ์ดˆ์ ์ธ ๋ฐ์ดํ„ฐ ๊ตฌ์กฐ์ด๋‹ค. RDD์— ๋Œ€ํ•ด์„œ๋„ ์ง€๊ธˆ ์ž์„ธํžˆ ์–ธ๊ธ‰ํ•˜๊ธฐ๋Š” ์–ด๋ ค์›Œ์„œ ๋ณ„๋„ ํฌ์ŠคํŠธ์—์„œ ๋‹ค๋ค„๋ณด๊ฒ ๋‹ค. ๋Œ€์ถฉ ์›์‹œ์ ์ธ ํ˜•ํƒœ์˜ DataFrame์ด๋ผ๊ณ  ๋ณด๋ฉด ๋  ๊ฒƒ ๊ฐ™๋‹ค.

๋ช‡๊ฐ€์ง€ ์ฝ”๋“œ๋ฅผ ํ†ตํ•ด SparkSession์™€ SparkContext์˜ ์ฐจ์ด๋ฅผ ์‚ดํŽด๋ณด์ž.

Empty Data

Spark Session์—์„œ๋Š”โ€ฆ

scala> spark.emptyDataFrame
org.apache.spark.sql.DataFrame = []

Spark Context์—์„œ๋Š”โ€ฆ

scala> sc.emptyRDD
org.apache.spark.rdd.RDD[Nothing] = EmptyRDD[0] at emptyRDD at <console>:24

Range

Spark Session์—์„œ๋Š”โ€ฆ

scala> spark.range(10)
org.apache.spark.sql.Dataset[Long] = [id: bigint]

Spark Context์—์„œ๋Š”โ€ฆ

scala> sc.parallelize(1 to 9)
org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

Spark Session(์ดํ•˜ SS)์˜ .range()๊ฐ€ Spark Context(์ดํ•˜ SC)์˜ .parallelize()์™€ ๋Œ€์‘ํ•œ๋‹ค.

์ฐธ๊ณ ๋กœ SC์—๋„ .range() ํ•จ์ˆ˜๊ฐ€ ์žˆ๋Š”๋ฐ, ๋ฐ˜ํ™˜ํ•˜๋Š” RDD๊ฐ€ .parallelize()์™€ ์กฐ๊ธˆ ๋‹ค๋ฅด๋‹ค.

scala> sc.range(0, 10)
org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[3] at range at <console>:24

Read CSV

Spark Session์—์„œ๋Š”โ€ฆ

val df = spark.read
  .option("header", "true")  // use the first row as the header
  .option("inferSchema", "true")  // infer column types automatically
  .csv("/path/to/your/file.csv")

// print the DataFrame's contents
df.show()

Spark Context ์—์„ โ€ฆ

// CSV ํŒŒ์ผ์„ ํ…์ŠคํŠธ ํŒŒ์ผ๋กœ ์ฝ๊ธฐ
val rdd = sc.textFile("/path/to/your/file.csv")

// CSV ํŒŒ์ผ์˜ ์ฒซ ๋ฒˆ์งธ ํ–‰์„ ํ—ค๋”๋กœ ์ฒ˜๋ฆฌํ•˜๊ณ , ๋ฐ์ดํ„ฐ๋ฅผ ๋ถ„ํ• 
val header = rdd.first()  // ์ฒซ ๋ฒˆ์งธ ํ–‰์„ ํ—ค๋”๋กœ ์ถ”์ถœ
val data = rdd.filter(row => row != header)  // ํ—ค๋”๋ฅผ ์ œ์™ธํ•œ ๋ฐ์ดํ„ฐ๋งŒ ํ•„ํ„ฐ๋ง

// ๊ฐ ํ–‰์„ ์ฝค๋งˆ๋กœ ๋ถ„ํ• ํ•˜์—ฌ RDD๋กœ ๋ณ€ํ™˜
val parsedData = data.map(line => line.split(","))

// RDD ๋‚ด์šฉ ์ถœ๋ ฅ
parsedData.collect().foreach(row => println(row.mkString(", ")))

Spark Context ๋ฐฉ์‹์€ low-level API๋ผ์„œ ๊ทธ๋Ÿฐ์ง€ CSV ํŒŒ์ผ์„ ์ฝ๋Š” ๊ฐ„๋‹จํ•œ ์ž‘์—…๋„, ๋ช‡๋ฒˆ์˜ ์ฒ˜๋ฆฌ๋ฅผ ๊ฑฐ์ณ์„œ ์ˆ˜ํ–‰๋˜๋Š” ๋ชจ์Šต์ด๋‹ค.

SQLContext

scala> val sqlContext = spark.sqlContext
res3: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@59aba9b3

scala> val df = sqlContext.read.json("./people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.createOrReplaceTempView("people")

scala> sqlContext.sql("SELECT name, age FROM people WHERE age > 21").show()
+----+---+
|name|age|
+----+---+
|Andy| 30|
+----+---+

SQL Context๋Š” spark๊ฐ€ ์ฝ์€ ๋ฐ์ดํ„ฐ๋ฅผ SQL์„ ์‚ฌ์šฉํ•ด ์ฟผ๋ฆฌํ•˜๊ฑฐ๋‚˜ ์กฐ์ž‘ํ•˜๋Š” ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•œ๋‹ค. ์ฝ”๋“œ์— ์‚ฌ์šฉํ•œ json ํŒŒ์ผ์€ spark example์—์„œ ํ™•์ธ ๊ฐ€๋Šฅํ•˜๋‹ค.

HiveContext

์‚ฌ์‹ค ์•„์ง Hive๋ฅผ ์ œ๋Œ€๋กœ ๊ณต๋ถ€ํ•˜์ง€ ์•Š์•˜์•„์„œ, ์ด ๋ถ€๋ถ„์„ ์ œ๋Œ€๋กœ ์ดํ•ดํ•˜์ง„ ๋ชป ํ–ˆ์ง€๋งŒ, ํ˜„์žฌ ํŒ€์—์„œ Databricks์—์„œ Hive Metastore๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์„ ๋ณด๋ฉด, ๋Œ€์ถฉ Spark SQL ์ฟผ๋ฆฌ๊ฐ€ Hive ํ…Œ์ด๋ธ”์„ ์ฟผ๋ฆฌํ•˜๋Š” ๊ฑฐ๋ผ๊ณ  ์ง์ž‘ํ•˜๊ณ  ์žˆ๋‹ค.

Spark์˜ HiveContext๋Š” Hive์— ์ €์žฅ๋œ ํ…Œ์ด๋ธ”์„ Spark์—์„œ ์ฟผ๋ฆฌํ•˜๋Š” ์—”๋“œํฌ์ธํŠธ๋ผ๊ณ  ์ดํ•ดํ•˜๊ณ  ์žˆ๋‹ค. ์•„๋ž˜ ์ฝ”๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด, spark-shell์„ ์‹คํ–‰ํ•œ ๊ฒฝ๋กœ์— Hive metastore๋ฅผ ๋กœ์ปฌ ์„ธํŒ… ํ•ด๋ณผ ์ˆ˜ ์žˆ๋‹ค.

scala> val df = spark.read.json("./people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.write.mode("overwrite").saveAsTable("people")
24/08/29 17:54:56 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/08/29 17:54:56 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/08/29 17:54:57 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
24/08/29 17:54:57 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore seokyunha@127.0.2.2
24/08/29 17:54:57 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
24/08/29 17:54:58 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
24/08/29 17:54:58 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
24/08/29 17:54:58 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/08/29 17:54:58 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/08/29 17:54:58 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

์œ„์˜ ๋ช…๋ น์–ด๋ฅผ ์‹คํ–‰ํ•˜๋ฉด, ๋กœ์ปฌ์— ๋‘ ํด๋”๊ฐ€ ์ž๋™์œผ๋กœ ์ƒ์„ฑ๋œ๋‹ค.

  • ./metastore_db/
    • The automatically created Hive Metastore.
    • It's Apache Derby, an RDBMS implemented in pure Java.
      • Kind of like SQLite.
    • Databases, tables, column information, and so on are stored here.
  • ./spark-warehouse/people
    • The path where the Hive table saved with .saveAsTable() is physically stored

๋กœ์ปฌ์— Hive Metastore๋ฅผ ๊ตฌ์ถ•ํ–ˆ๊ธฐ ๋•Œ๋ฌธ์—, spark-shell์„ ์ข…๋ฃŒํ•˜๊ณ  ๋‹ค์‹œ ์‹คํ–‰ํ•ด๋„ ๊ฐ™์€ ๊ฒฝ๋กœ์—์„œ ์‹คํ–‰๋งŒ ํ•œ๋‹ค๋ฉด, metastore_db ํด๋”๋ฅผ ํ†ตํ•ด ์ด์ „์— ์ €์žฅํ–ˆ๋˜ Hive ํ…Œ์ด๋ธ” ์ •๋ณด๋ฅผ ๋‹ค์‹œ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

scala> spark.sql("SELECT * FROM default.people").show
+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

HiveContext๋ฅผ ์‚ฌ์šฉํ•ด์„œ HiveQL์„ ๋‚ ๋ฆฌ๋ ค๋ฉด ์•„๋ž˜์™€ ๊ฐ™์ด ํ•˜๋ฉด ๋œ๋‹ค.

scala> import org.apache.spark.sql.hive.HiveContext
scala> val hiveContext = new HiveContext(sc)
scala> hiveContext.sql("SELECT * FROM default.people")
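Note that HiveContext (like SQLContext) has been deprecated since Spark 2.0; the modern way is to build the SparkSession itself with Hive support. A minimal sketch:

import org.apache.spark.sql.SparkSession

// since Spark 2.0, Hive integration hangs off the SparkSession
val spark = SparkSession.builder()
  .appName("hive-example")  // hypothetical application name
  .enableHiveSupport()      // wire the session to the Hive metastore
  .getOrCreate()

spark.sql("SELECT * FROM default.people").show()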

Multiple SparkSession, but only one SparkContext

ํ•˜๋‚˜์˜ Spark Cluster ์•ˆ์— SparkSession์€ ๋™์‹œ์— ์—ฌ๋Ÿฌ ๊ฐœ ์กด์žฌํ•˜๊ฑฐ๋‚˜ ์ƒ์„ฑํ•  ์ˆ˜ ์žˆ์ง€๋งŒ, SparkContext์€ ์˜ค์ง ํ•˜๋‚˜๋งŒ ์กด์žฌํ•œ๋‹ค.

์ฆ‰, ์—ฌ๋Ÿฌ SparkSession์ด ํ•˜๋‚˜์˜ ๊ฐ™์€ SparkContext๋ฅผ ์„œ๋กœ ๊ณต์œ ํ•œ๋‹ค๋Š” ๋œป์ด๋‹ค. Spark์—์„œ Multiple Spark Session์„ ์ง€์›ํ•˜๊ธฐ ๋•Œ๋ฌธ์—, ๊ฐ์ž์˜ Databricks ๋…ธํŠธ๋ถ(๋˜๋Š” Jupyter ๋…ธํŠธ๋ถ)์—์„œ ์ƒ์„ฑํ•˜๋Š” TEMP VIEW์˜ ์ด๋ฆ„์ด ๊ฐ™์•„๋„ ์„œ๋กœ ๊ฐ„์„ญ ํ•˜์ง€ ์•Š๋Š”๋‹ค.

Databricks์—์„œ ์„œ๋กœ ๋‹ค๋ฅธ ๋‘ ๋…ธํŠธ๋ถ์—์„œ ๊ฐ™์€ ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ๋ถ™์—ฌ ํ™•์ธํ•ด๋ณด๋ฉด

Spark Session์€ ์„œ๋กœ ๋‹ค๋ฅธ ID๋ฅผ ๊ฐ™์ง€๋งŒ, Spark Context๋Š” ๊ฐ™์€ ID๋ฅผ ๊ฐ–๋Š”๋‹ค.

์œ„์™€ ๊ฐ™์ด sc์˜ ID๋Š” ๋™์ผํ•œ ๊ฑธ ๋ณผ ์ˆ˜ ์žˆ๋‹ค. ์ด๋•Œ ID๋Š” ์–ด๋–ค YYYYMMDDHHmmss ํฌ๋งท์ธ๋ฐ, Databricks ํด๋Ÿฌ์Šคํ„ฐ๊ฐ€ ์‹œ์ž‘๋œ UTC ์‹œ๊ฐ„์ด ID ๊ฐ’์ด ๋˜์—ˆ๋‹ค.

๋งบ์Œ๋ง

์ด๋ฒˆ ํฌ์ŠคํŠธ๋ฅผ ์ž‘์„ฑํ•˜๋ฉด์„œ, SparkSession๊ณผ SparkContext, SQLContext, HiveContext๊นŒ์ง€, ๋ชจํ˜ธํ•˜๊ฒŒ ์•Œ๋˜ ๊ฐœ๋…์„ ์ดํ•ดํ•˜๊ฒŒ ๋œ ๊ฒƒ ๊ฐ™๋‹ค. Spark์™€ Hive๋„ ๋‘˜์ด ์™œ ๋งจ๋‚  ์—ฎ์ด๋Š”์ง€ ๊ถ๊ธˆํ–ˆ๋Š”๋ฐ, ์ด๊ฒƒ๋„ ์–ด๋ ดํ’‹์ด ์•Œ๊ฒŒ ๋œ ๊ฒƒ ๊ฐ™๋‹ค. โ€œRDD์™€ DataFrame์˜ ์ฐจ์ด์ ์€?โ€ ๊ฐ™์€ ์งˆ๋ฌธ๋„ ์ƒˆ๋กญ๊ฒŒ ์ƒ๊ฒจ๋‚ฌ๋‹ค.

๋ฌผ๋ก  ์•„์ง๋„ Databricks์˜ Shared-mode์—์„œ Unity Catalog์—์„œ ์™œ SparkContext, SQLContext, RDD๋ฅผ ์ œํ•œํ•˜๊ฒŒ ๋˜์—ˆ๋Š”์ง€๋Š” ์ž˜ ๋ชจ๋ฅด๊ฒ ์ง€๋งŒ, Spark๋ฅผ ๊ณต๋ถ€ํ•˜๋‹ค๋ณด๋ฉด ๊ณง ์•Œ๊ฒŒ ๋˜๊ฒ ์ง€โ€ฆ!?
