Spark SQL 是 Spark 中的一個(gè)子模塊,主要用于操作結(jié)構(gòu)化數(shù)據(jù)。它具有以下特點(diǎn):
創(chuàng)新互聯(lián)建站,是成都地區(qū)的互聯(lián)網(wǎng)解決方案提供商,用心服務(wù)為企業(yè)提供網(wǎng)站建設(shè)、重慶APP開發(fā)、微信小程序開發(fā)、系統(tǒng)定制網(wǎng)站建設(shè)和微信代運(yùn)營(yíng)服務(wù)。經(jīng)過(guò)數(shù)10多年的沉淀與積累,沉淀的是技術(shù)和服務(wù),讓客戶少走彎路,踏實(shí)做事,誠(chéng)實(shí)做人,用情服務(wù),致力做一個(gè)負(fù)責(zé)任、受尊敬的企業(yè)。對(duì)客戶負(fù)責(zé),就是對(duì)自己負(fù)責(zé),對(duì)企業(yè)負(fù)責(zé)。
為了支持結(jié)構(gòu)化數(shù)據(jù)的處理,Spark SQL 提供了新的數(shù)據(jù)結(jié)構(gòu) DataFrame。DataFrame 是一個(gè)由具名列組成的數(shù)據(jù)集。它在概念上等同于關(guān)系數(shù)據(jù)庫(kù)中的表或 R/Python 語(yǔ)言中的 data frame
。 由于 Spark SQL 支持多種語(yǔ)言的開發(fā),所以每種語(yǔ)言都定義了 DataFrame
的抽象,主要如下:
語(yǔ)言 | 主要抽象 |
---|---|
Scala | Dataset[T] & DataFrame (Dataset[Row] 的別名) |
Java | Dataset[T] |
Python | DataFrame |
R | DataFrame |
DataFrame 和 RDDs 最主要的區(qū)別在于一個(gè)面向的是結(jié)構(gòu)化數(shù)據(jù),一個(gè)面向的是非結(jié)構(gòu)化數(shù)據(jù),它們內(nèi)部的數(shù)據(jù)結(jié)構(gòu)如下:
DataFrame 內(nèi)部的有明確 Scheme 結(jié)構(gòu),即列名、列字段類型都是已知的,這帶來(lái)的好處是可以減少數(shù)據(jù)讀取以及更好地優(yōu)化執(zhí)行計(jì)劃,從而保證查詢效率。
DataFrame 和 RDDs 應(yīng)該如何選擇?
Dataset 也是分布式的數(shù)據(jù)集合,在 Spark 1.6 版本被引入,它集成了 RDD 和 DataFrame 的優(yōu)點(diǎn),具備強(qiáng)類型的特點(diǎn),同時(shí)支持 Lambda 函數(shù),但只能在 Scala 和 Java 語(yǔ)言中使用。在 Spark 2.0 后,為了方便開發(fā)者,Spark 將 DataFrame 和 Dataset 的 API 融合到一起,提供了結(jié)構(gòu)化的 API(Structured API),即用戶可以通過(guò)一套標(biāo)準(zhǔn)的 API 就能完成對(duì)兩者的操作。
這里注意一下:DataFrame 被標(biāo)記為 Untyped API,而 DataSet 被標(biāo)記為 Typed API,后文會(huì)對(duì)兩者做出解釋。
靜態(tài)類型 (Static-typing) 與運(yùn)行時(shí)類型安全 (runtime type-safety) 主要表現(xiàn)如下:
在實(shí)際使用中,如果你用的是 Spark SQL 的查詢語(yǔ)句,則直到運(yùn)行時(shí)你才會(huì)發(fā)現(xiàn)有語(yǔ)法錯(cuò)誤,而如果你用的是 DataFrame 和 Dataset,則在編譯時(shí)就可以發(fā)現(xiàn)錯(cuò)誤 (這節(jié)省了開發(fā)時(shí)間和整體代價(jià))。DataFrame 和 Dataset 主要區(qū)別在于:
在 DataFrame 中,當(dāng)你調(diào)用了 API 之外的函數(shù),編譯器就會(huì)報(bào)錯(cuò),但如果你使用了一個(gè)不存在的字段名字,編譯器依然無(wú)法發(fā)現(xiàn)。而 Dataset 的 API 都是用 Lambda 函數(shù)和 JVM 類型對(duì)象表示的,所有不匹配的類型參數(shù)在編譯時(shí)就會(huì)被發(fā)現(xiàn)。
以上這些最終都被解釋成關(guān)于類型安全圖譜,對(duì)應(yīng)開發(fā)中的語(yǔ)法和分析錯(cuò)誤。在圖譜中,Dataset 最嚴(yán)格,但對(duì)于開發(fā)者來(lái)說(shuō)效率最高。
這里一個(gè)可能的疑惑是 DataFrame 明明是有確定的 Scheme 結(jié)構(gòu) (即列名、列字段類型都是已知的),但是為什么還是無(wú)法對(duì)列名進(jìn)行推斷和錯(cuò)誤判斷,這是因?yàn)?DataFrame 是 Untyped 的。
在上面我們介紹過(guò) DataFrame API 被標(biāo)記為 Untyped API
,而 DataSet API 被標(biāo)記為 Typed API
。DataFrame 的 Untyped
是相對(duì)于語(yǔ)言或 API 層面而言,它確實(shí)有明確的 Scheme 結(jié)構(gòu),即列名,列類型都是確定的,但這些信息完全由 Spark 來(lái)維護(hù),Spark 只會(huì)在運(yùn)行時(shí)檢查這些類型和指定類型是否一致。這也就是為什么在 Spark 2.0 之后,官方推薦把 DataFrame 看做是 DatSet[Row]
,Row 是 Spark 中定義的一個(gè) trait
,其子類中封裝了列字段的信息。
相對(duì)而言,DataSet 是 Typed
的,即強(qiáng)類型。如下面代碼,DataSet 的類型由 Case Class(Scala) 或者 Java Bean(Java) 來(lái)明確指定的,在這里即每一行數(shù)據(jù)代表一個(gè) Person
,這些信息由 JVM 來(lái)保證正確性,所以字段名錯(cuò)誤和類型錯(cuò)誤在編譯的時(shí)候就會(huì)被 IDE 所發(fā)現(xiàn)。
case class Person(name: String, age: Long)
val dataSet: Dataset[Person] = spark.read.json("people.json").as[Person]
這里對(duì)三者做一下簡(jiǎn)單的總結(jié):
DataFrame、DataSet 和 Spark SQL 的實(shí)際執(zhí)行流程都是相同的:
執(zhí)行的第一個(gè)階段是將用戶代碼轉(zhuǎn)換成一個(gè)邏輯計(jì)劃。它首先將用戶代碼轉(zhuǎn)換成 unresolved logical plan
(未解決的邏輯計(jì)劃),之所以這個(gè)計(jì)劃是未解決的,是因?yàn)楸M管您的代碼在語(yǔ)法上是正確的,但是它引用的表或列可能不存在。 Spark 使用 analyzer
(分析器) 基于 catalog
(存儲(chǔ)的所有表和 DataFrames
的信息) 進(jìn)行解析。解析失敗則拒絕執(zhí)行,解析成功則將結(jié)果傳給 Catalyst
優(yōu)化器 (Catalyst Optimizer
),優(yōu)化器是一組規(guī)則的集合,用于優(yōu)化邏輯計(jì)劃,通過(guò)謂詞下推等方式進(jìn)行優(yōu)化,最終輸出優(yōu)化后的邏輯執(zhí)行計(jì)劃。
得到優(yōu)化后的邏輯計(jì)劃后,Spark 就開始了物理計(jì)劃過(guò)程。 它通過(guò)生成不同的物理執(zhí)行策略,并通過(guò)成本模型來(lái)比較它們,從而選擇一個(gè)最優(yōu)的物理計(jì)劃在集群上面執(zhí)行的。物理規(guī)劃的輸出結(jié)果是一系列的 RDDs 和轉(zhuǎn)換關(guān)系 (transformations)。
在選擇一個(gè)物理計(jì)劃后,Spark 運(yùn)行其 RDDs 代碼,并在運(yùn)行時(shí)執(zhí)行進(jìn)一步的優(yōu)化,生成本地 Java 字節(jié)碼,最后將運(yùn)行結(jié)果返回給用戶。
更多大數(shù)據(jù)系列文章可以參見 GitHub 開源項(xiàng)目: 大數(shù)據(jù)入門指南