Jump into Spark Sessions and Contexts
Databricks Certification ์ทจ๋์ ๋ชฉํ๋ก Apache Spark๋ฅผ โ์ ๋๋กโ ๊ณต๋ถํด๋ณด๊ณ ์์ต๋๋ค. ํ์ฌ์์ Databricks Unity Catalog๋ฅผ ๋์ ํ๋ ค๊ณ ๋ถํฌํ๊ณ ์๋๋ฐ์. Spark์ ์ข ์นํด์ง ์ ์์๊น์? ๐ ์ ์ฒด ํฌ์คํธ๋ Development - Spark์์ ํ์ธํด์ค ์ ์์ต๋๋ค.
SparkSession? SparkContext?
์ด ๊ธ์ Apache Spark๋ฅผ ๊ณต๋ถํ๋ฉด์ SparkSession
๊ณผ SparkContext
๋ฅผ ํท๊ฐ๋ ค ํ๋ ๋์ ์ด์ผ๊ธฐ๋ฅผ ๋ด์ ํฌ์คํธ์ด๋ค.
Spark๋ฅผ ์ฒ์ ๊ณต๋ถํ๋ฉด, ๊ฐ์๋ ๊ณต์๋ฌธ์๋ฅผ ํตํด์ SparkContext
๋ก RDD๋ฅผ ๋ง๋๋ ๊ฑธ ๋จผ์ ๋ฐฐ์ ๋ค. ๊ทธ๋ฐ๋ฐ, ํ์ฌ์์ Databricks๋ฅผ ํตํด โsparkโ๋ ์ด๋ฆ์ ๋ณ์๋ก ์ฌ์ฉํ๋ SparkSession
๋ฅผ ๋จผ์ ์ฌ์ฉํ๋ค.
์ต๊ทผ์ ํ์ฌ์์ Databricks์์ Unity Catalog๋ก ๋ง์ด๊ทธ๋ ์ด์
์ ํ๋๋ฐ, UC์์๋ โSparkContext
๋ฅผ ์ฌ์ฉํ์ง ๋ชป ํ๋คโ Limitation๋ ์์๋ค.
๋ฌธ์๋ฅผ ์ฝ์ด๋ณด๋ฉด SparkContext
, SQLContext
์ ๊ฐ์ด Context API๋ฅผ ์ฌ์ฉํ์ง ๋ชปํ๊ณ , ๊ทธ์ ๊ด๋ จ๋ .parallelize()
ํจ์๋ฅผ ํฌํจํด RDD API์ Dataset API๋ ์ฌ์ฉ ๋ชป ํ๊ฒ ๋๋ค;;
์ด๋ฐ ๋์ ๊ฒฝํ๋ค์ SparkContext
์ RDD๊ฐ โ๊ตฌ์๋์ ์ ๋ฌผโ์ด ์๋์ง ์๊ฐํ๊ฒ ํ๊ณ , ๊ด๋ จ๋ ๊ฐ๋
๋ค์ ๋ฐ์ ๋ค์ด๋ ๋ฐ์, ์๊ฒ๋ชจ๋ฅด๊ฒ ์ฅ๋ฒฝ์ด ๋์๋ ๊ฒ ๊ฐ๋ค.
์ํผ ์ง๊ธ์ Databricks๋ Databricks, Spark๋ Spark๋ผ๋ ์๊ฐ์ผ๋ก SparkContext
์ ๋ํด ๋ฐ์๋ค์ด๊ณ , ์ดํด๋ณด๋ ค๊ณ ํ๋ค.
Jump into SparkSession
ํ์ฌ์์ ์ ์ผ ๋จผ์ ์ตํ๋ ๋ฐฉ์์ด ์ SparkSession
์ด๋ ์๊ฑธ ๋จผ์ ์ดํด๋ณด์.
Databricks์์ spark
๋ผ๋ ๋ณ์๋ก pyspark๋ scala spark๋ ์ ๊ทผํ ์ ์๋ค. ์๊ธฐ์์ ์ฃผ๋ก ์ฐ๋ ๊ธฐ๋ฅ๋ค์
spark.sql()
- SQL ์ฟผ๋ฆฌ๋ฅผ ์คํ์ํฌ ๋ ์ฌ์ฉ
spark.udf.register()
- Spark UDF ํจ์๋ฅผ ๋ฑ๋กํ ๋ ์ฌ์ฉ
spark.table()
- Hive ํ ์ด๋ธ์ ์ ๊ทผํ ์ ์๋ Spark DataFrame์ ๋ฐํ.
- ์์ง Spark-Hive์ ๊ด๊ณ์ ๋ํด์ ์ ๋๋ก ์ดํด๋ณด์ง ๋ชป ํ์. TODO!
spark.catalog
- Spark Session์์ ์ ๊ทผํ ์ ์๋ ๋ชจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๊ทธ ์์ ๊ฐ์ฒด๋ค(Table, View, Function)์ ๊ด๋ฆฌํ๋ API
spark.catalog.listTables()
: ํ ์ด๋ธ ๋ชฉ๋ก์ ์กฐํspark.catalog.setCurrentDatabase()
: ํ์ฌ ์ฌ์ฉํ๋ Database๋ฅผ ์ง์
- Spark Session์์ ์ ๊ทผํ ์ ์๋ ๋ชจ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๊ทธ ์์ ๊ฐ์ฒด๋ค(Table, View, Function)์ ๊ด๋ฆฌํ๋ API
spark.read
- ๋ฐ์ดํฐ๋ฅผ ๋ค์ํญ ํ์(csv, jso, parquet ๋ฑ)์์ ์ฝ์ด๋ค์ฌ Spark DataFrame์ผ๋ก ๋ฐํ.
spark.write
- Spark DataFrame์ ๋ค์ํ ํ์(์์ ๋์ผ)์ผ๋ก ์ ์ฅ.
spark.conf
- ํ์ฌ Spark Session์ ์ค์ ์ ๊ด๋ฆฌํ๋ API
spark.streams
- Spark Structured Streaming ๊ด๋ จ API
spark.SparkContext
,spark.sqlContext
- SparkSession์ ๋ด์ฅ๋ Spark/Sql Context
- ๊ทธ์ธ ๋ฑ๋ฑ
๋ณธ์ธ์ด ์ง๊ธ Databricks์์ spark
๋ก ์ฌ์ฉํ๋ ์ฝ๋๋ค์ ๋ณธ ๊ฒ๋ง ์ด ์ ๋๋ค. ์ฌ๊ธฐ์ ๋ช ๊ฐ์ง๋ฅผ ๊ธฐ๋ฅ์ ๋ ์ ๊ณตํ๋๋ฐ, ๋๋จธ์ง ๊ธฐ๋ฅ๋ค์ Spark ๊ณต์ ๋ฌธ์ ์ฐธ๊ณ .
Unified entry point for interacting with Spark
Spark Session์ ์ค๋ช
ํ๋ ๊ฐ์ฅ ๊ฐ๋จํ ๋ฌธ์ฅ์ด๋ค. spark
๋ผ๋ ๋ณ์๋ฅผ ํตํด์ SQL๋ก ์คํํ์ง, Hive ํ
์ด๋ธ ๋ชฉ๋ก๋ ์กฐํํ์ง, UDF๋ ๋ฑ๋กํ์ง, Parquet ํ์ผ์ read/write ํ๊ธฐ๋ ํ์งโฆ ์ ๋ง ๋ง์ ์์
์ Spark Session์ ํตํด์ ์ํํ๋ค.
์ํผ ์ง๊ธ๊น์ง Spark Session์ ๋ํด ์ฃผ์ ๋ฆฌ ์ฃผ์ ๋ฆฌ ๋ ๋ค์๋๋ฐ, ๋ค์์ ์ด ํฌ์คํธ์ ๋๋ฒ์งธ ์ฃผ์ ์ธ SparkContext
๋ก ๋์ด๊ฐ๋ณด์.
Jump into SparkContext
Spark ๊ณต๋ถ๋ฅผ ์์ํ ๋, RDD์ ํจ๊ป ๊ฐ์ฅ ๋จผ์ ๋ง์ฃผ์น๋ ๋
์์ธ SparkContext
๋ค. ์์ ์๋ SparkContext
๋ก Spark๋ฅผ ์ ํ๋๋ฐ, ๋น์ฐ ํ์ ์ง๋ ๋ชจ๋ฅธ๋ค. ์๋ํ๋ฉด, SparkSession
์ spark 2.0๋ถํฐ ๋์
๋ ๋ฐฉ์์ด๊ธฐ ๋๋ฌธ์ด๋ค.
SparkContext
- spark 1.x
SparkSession
- spark 2.0 (2016-07-26 ์ถ์)
์ฐธ๊ณ ๋ก ์์ฑ์ผ(24.08.27) ๊ธฐ์ค Spark๋ 3.5.2
๋ฒ์ ๊น์ง ๋์๋ค. Spark 3.0 ๋ฒ์ ์์ ๊ฐ์ฅ ์ฃผ์ํ ๋ณ๊ฒฝ์ ์ AQE(Adaptive Query Execution)์ด๊ณ ๋ค์ ํฌ์คํธ์์ ๋ค๋ค๋ณผ ์์ ์ด๋ค. link
SparkContext
๋ RDD(Resilient Distributed Data)๋ฅผ ๋ค๋ฃจ๊ธฐ ์ํ entry point์ด๋ค. RDD๋ ์ด๊ธฐ Spark๋ฅผ ์ด๋ฃจ๋ ๊ฐ์ฅ ๊ธฐ์ด์ ์ธ ๋ฐ์ดํฐ ๊ตฌ์กฐ์ด๋ค. RDD์ ๋ํด์๋ ์ง๊ธ ์์ธํ ์ธ๊ธํ๊ธฐ๋ ์ด๋ ค์์ ๋ณ๋ ํฌ์คํธ์์ ๋ค๋ค๋ณด๊ฒ ๋ค. ๋์ถฉ ์์์ ์ธ ํํ์ DataFrame์ด๋ผ๊ณ ๋ณด๋ฉด ๋ ๊ฒ ๊ฐ๋ค.
๋ช๊ฐ์ง ์ฝ๋๋ฅผ ํตํด SparkSession
์ SparkContext
์ ์ฐจ์ด๋ฅผ ์ดํด๋ณด์.
Empty Data
Spark Session์์๋โฆ
scala> spark.emptyDataFrame
org.apache.spark.sql.DataFrame = []
Spark Context์์๋โฆ
scala> sc.emptyRDD
org.apache.spark.rdd.RDD[Nothing] = EmptyRDD[0] at emptyRDD at <console>:24
Range
Spark Session์์๋โฆ
scala> spark.range(10)
org.apache.spark.sql.Dataset[Long] = [id: bigint]
Spark Context์์๋โฆ
scala> sc.parallelize(1 to 9)
org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
Spark Session(์ดํ SS)์ .range()
๊ฐ Spark Context(์ดํ SC)์ .parallelize()
์ ๋์ํ๋ค.
์ฐธ๊ณ ๋ก SC์๋ .range()
ํจ์๊ฐ ์๋๋ฐ, ๋ฐํํ๋ RDD๊ฐ .parallelize()
์ ์กฐ๊ธ ๋ค๋ฅด๋ค.
scala> sc.range(0, 10)
org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[3] at range at <console>:24
Read CSV
Spark Session์์๋โฆ
val df = spark.read
.option("header", "true") // ์ฒซ ๋ฒ์งธ ํ์ ํค๋๋ก ์ฌ์ฉ
.option("inferSchema", "true") // ๋ฐ์ดํฐ ํ์
์ ์๋์ผ๋ก ์ถ๋ก
.csv("/path/to/your/file.csv")
// DataFrame ๋ด์ฉ ์ถ๋ ฅ
df.show()
Spark Context ์์ โฆ
// CSV ํ์ผ์ ํ
์คํธ ํ์ผ๋ก ์ฝ๊ธฐ
val rdd = sc.textFile("/path/to/your/file.csv")
// CSV ํ์ผ์ ์ฒซ ๋ฒ์งธ ํ์ ํค๋๋ก ์ฒ๋ฆฌํ๊ณ , ๋ฐ์ดํฐ๋ฅผ ๋ถํ
val header = rdd.first() // ์ฒซ ๋ฒ์งธ ํ์ ํค๋๋ก ์ถ์ถ
val data = rdd.filter(row => row != header) // ํค๋๋ฅผ ์ ์ธํ ๋ฐ์ดํฐ๋ง ํํฐ๋ง
// ๊ฐ ํ์ ์ฝค๋ง๋ก ๋ถํ ํ์ฌ RDD๋ก ๋ณํ
val parsedData = data.map(line => line.split(","))
// RDD ๋ด์ฉ ์ถ๋ ฅ
parsedData.collect().foreach(row => println(row.mkString(", ")))
Spark Context ๋ฐฉ์์ low-level API๋ผ์ ๊ทธ๋ฐ์ง CSV ํ์ผ์ ์ฝ๋ ๊ฐ๋จํ ์์ ๋, ๋ช๋ฒ์ ์ฒ๋ฆฌ๋ฅผ ๊ฑฐ์ณ์ ์ํ๋๋ ๋ชจ์ต์ด๋ค.
SQLContext
scala> val sqlContext = spark.sqlContext
res3: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@59aba9b3
scala> val df = sqlContext.read.json("./people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.createOrReplaceTempView("people")
scala> sqlContext.sql("SELECT name, age FROM people WHERE age > 21").show()
+----+---+
|name|age|
+----+---+
|Andy| 30|
+----+---+
SQL Context๋ spark๊ฐ ์ฝ์ ๋ฐ์ดํฐ๋ฅผ SQL์ ์ฌ์ฉํด ์ฟผ๋ฆฌํ๊ฑฐ๋ ์กฐ์ํ๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค. ์ฝ๋์ ์ฌ์ฉํ json ํ์ผ์ spark example์์ ํ์ธ ๊ฐ๋ฅํ๋ค.
HiveContext
์ฌ์ค ์์ง Hive๋ฅผ ์ ๋๋ก ๊ณต๋ถํ์ง ์์์์, ์ด ๋ถ๋ถ์ ์ ๋๋ก ์ดํดํ์ง ๋ชป ํ์ง๋ง, ํ์ฌ ํ์์ Databricks์์ Hive Metastore๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ ๋ณด๋ฉด, ๋์ถฉ Spark SQL ์ฟผ๋ฆฌ๊ฐ Hive ํ ์ด๋ธ์ ์ฟผ๋ฆฌํ๋ ๊ฑฐ๋ผ๊ณ ์ง์ํ๊ณ ์๋ค.
Spark์ HiveContext
๋ Hive์ ์ ์ฅ๋ ํ
์ด๋ธ์ Spark์์ ์ฟผ๋ฆฌํ๋ ์๋ํฌ์ธํธ๋ผ๊ณ ์ดํดํ๊ณ ์๋ค. ์๋ ์ฝ๋๋ฅผ ์ฌ์ฉํ๋ฉด, spark-shell
์ ์คํํ ๊ฒฝ๋ก์ Hive metastore๋ฅผ ๋ก์ปฌ ์ธํ
ํด๋ณผ ์ ์๋ค.
scala> val df = spark.read.json("./people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.write.mode("overwrite").saveAsTable("people")
24/08/29 17:54:56 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/08/29 17:54:56 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/08/29 17:54:57 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
24/08/29 17:54:57 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore seokyunha@127.0.2.2
24/08/29 17:54:57 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
24/08/29 17:54:58 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
24/08/29 17:54:58 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
24/08/29 17:54:58 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/08/29 17:54:58 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/08/29 17:54:58 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
์์ ๋ช ๋ น์ด๋ฅผ ์คํํ๋ฉด, ๋ก์ปฌ์ ๋ ํด๋๊ฐ ์๋์ผ๋ก ์์ฑ๋๋ค.
./metastore_db/
- ์๋์ผ๋ก ์์ฑ๋ Hive Metastore๋ค.
- Apache Derby๋ผ๋ ์์ ์๋ฐ๋ก ๊ตฌํ๋ RDBMS๋ค.
- ์ฝ๊ฐ sqlite ๊ฐ์ ๋ ์์.
- ์ด๊ณณ์ Database, Table ๊ทธ๋ฆฌ๊ณ ์ปฌ๋ผ ์ ๋ณด ๋ฑ์ด ์ ์ฅ๋๋ค.
./spark-warehouse/people
.saveAsTable()
๋ก ์ ์ฅํ Hive ํ ์ด๋ธ์ด ๋ฌผ๋ฆฌ์ ์ผ๋ก ์ ์ฅ๋ ๊ฒฝ๋ก
๋ก์ปฌ์ Hive Metastore๋ฅผ ๊ตฌ์ถํ๊ธฐ ๋๋ฌธ์, spark-shell
์ ์ข
๋ฃํ๊ณ ๋ค์ ์คํํด๋ ๊ฐ์ ๊ฒฝ๋ก์์ ์คํ๋ง ํ๋ค๋ฉด, metastore_db
ํด๋๋ฅผ ํตํด ์ด์ ์ ์ ์ฅํ๋ Hive ํ
์ด๋ธ ์ ๋ณด๋ฅผ ๋ค์ ์ฌ์ฉํ ์ ์๋ค.
scala> spark.sql("SELECT * FROM default.people").show
+----+-------+
| age| name|
+----+-------+
|NULL|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
HiveContext
๋ฅผ ์ฌ์ฉํด์ HiveQL์ ๋ ๋ฆฌ๋ ค๋ฉด ์๋์ ๊ฐ์ด ํ๋ฉด ๋๋ค.
scala> import org.apache.spark.sql.hive.HiveContext
scala> val hiveContext = new HiveContext(sc)
scala> hiveContext.sql("SELECT * FROM default.people")
Multiple SparkSession, but only one SparkContext
ํ๋์ Spark Cluster ์์ SparkSession์ ๋์์ ์ฌ๋ฌ ๊ฐ ์กด์ฌํ๊ฑฐ๋ ์์ฑํ ์ ์์ง๋ง, SparkContext์ ์ค์ง ํ๋๋ง ์กด์ฌํ๋ค.
์ฆ, ์ฌ๋ฌ SparkSession์ด ํ๋์ ๊ฐ์ SparkContext๋ฅผ ์๋ก ๊ณต์ ํ๋ค๋ ๋ป์ด๋ค. Spark์์ Multiple Spark Session์ ์ง์ํ๊ธฐ ๋๋ฌธ์, ๊ฐ์์ Databricks ๋
ธํธ๋ถ(๋๋ Jupyter ๋
ธํธ๋ถ)์์ ์์ฑํ๋ TEMP VIEW
์ ์ด๋ฆ์ด ๊ฐ์๋ ์๋ก ๊ฐ์ญ ํ์ง ์๋๋ค.
Databricks์์ ์๋ก ๋ค๋ฅธ ๋ ๋ ธํธ๋ถ์์ ๊ฐ์ ํด๋ฌ์คํฐ๋ฅผ ๋ถ์ฌ ํ์ธํด๋ณด๋ฉด
์์ ๊ฐ์ด sc
์ ID๋ ๋์ผํ ๊ฑธ ๋ณผ ์ ์๋ค. ์ด๋ ID๋ ์ด๋ค YYYYMMDDHHmmss
ํฌ๋งท์ธ๋ฐ, Databricks ํด๋ฌ์คํฐ๊ฐ ์์๋ UTC ์๊ฐ์ด ID ๊ฐ์ด ๋์๋ค.
๋งบ์๋ง
์ด๋ฒ ํฌ์คํธ๋ฅผ ์์ฑํ๋ฉด์, SparkSession
๊ณผ SparkContext
, SQLContext
, HiveContext
๊น์ง, ๋ชจํธํ๊ฒ ์๋ ๊ฐ๋
์ ์ดํดํ๊ฒ ๋ ๊ฒ ๊ฐ๋ค. Spark์ Hive๋ ๋์ด ์ ๋งจ๋ ์ฎ์ด๋์ง ๊ถ๊ธํ๋๋ฐ, ์ด๊ฒ๋ ์ด๋ ดํ์ด ์๊ฒ ๋ ๊ฒ ๊ฐ๋ค. โRDD
์ DataFrame
์ ์ฐจ์ด์ ์?โ ๊ฐ์ ์ง๋ฌธ๋ ์๋กญ๊ฒ ์๊ฒจ๋ฌ๋ค.
๋ฌผ๋ก ์์ง๋ Databricks์ Shared-mode์์ Unity Catalog์์ ์ SparkContext
, SQLContext
, RDD
๋ฅผ ์ ํํ๊ฒ ๋์๋์ง๋ ์ ๋ชจ๋ฅด๊ฒ ์ง๋ง, Spark๋ฅผ ๊ณต๋ถํ๋ค๋ณด๋ฉด ๊ณง ์๊ฒ ๋๊ฒ ์งโฆ!?