cdn.xitu.io/2018/11/21/1673560dca70a6b7?w=1433&h=534&f=jpeg&s=309760">?
?
spark sql 可以說(shuō)是 spark 中的精華部分了,我感覺(jué)整體復(fù)雜度是 spark streaming 的 5 倍以上,現(xiàn)在 spark 官方主推 structed streaming, spark streaming ?維護(hù)的也不積極了, 我們基于 spark 來(lái)構(gòu)建大數(shù)據(jù)計(jì)算任務(wù),重心也要向 DataSet 轉(zhuǎn)移,原來(lái)基于 RDD 寫(xiě)的代碼遷移過(guò)來(lái),好處是非常大的,尤其是在性能方面,有質(zhì)的提升, ?spark sql 中的各種內(nèi)嵌的性能優(yōu)化是比人裸寫(xiě) RDD 遵守各種所謂的最佳實(shí)踐更靠譜的,尤其對(duì)新手來(lái)講, 比如有些最佳實(shí)踐講到先 filter 操作再 map 操作,這種 spark sql 中會(huì)自動(dòng)進(jìn)行謂詞下推,比如盡量避免使用 shuffle 操作,spark sql 中如果你開(kāi)啟了相關(guān)的配置,會(huì)自動(dòng)使用 broadcast join 來(lái)廣播小表,把 shuffle join 轉(zhuǎn)化為 map join 等等,真的能讓我們省很多心。?
?
spark sql 的代碼復(fù)雜度是問(wèn)題的本質(zhì)復(fù)雜度帶來(lái)的,spark sql 中的?Catalyst 框架大部分邏輯是在一個(gè) Tree 類型的數(shù)據(jù)結(jié)構(gòu)上做各種折騰,基于 scala 來(lái)實(shí)現(xiàn)還是很優(yōu)雅的,scala 的偏函數(shù)和強(qiáng)大的 Case 正則匹配,讓整個(gè)代碼看起來(lái)還是清晰的, 這篇文章簡(jiǎn)單的描述下 spark sql 中的一些機(jī)制和概念。
?
SparkSession 是我們編寫(xiě) spark 應(yīng)用代碼的入口,啟動(dòng)一個(gè) spark-shell 會(huì)提供給你一個(gè)創(chuàng)建 SparkSession, 這個(gè)對(duì)象是整個(gè) spark 應(yīng)用的起始點(diǎn),我們來(lái)看下 sparkSession 的一些重要的變量和方法:
?
?
?
上面提到的 sessionState 是一個(gè)很關(guān)鍵的東西,維護(hù)了當(dāng)前 session 使用的所有的狀態(tài)數(shù)據(jù),有以下各種需要維護(hù)的東西:
?
?
?
spark sql 內(nèi)部使用 dataFrame 和 Dataset 來(lái)表示一個(gè)數(shù)據(jù)集合,然后你可以在這個(gè)數(shù)據(jù)集合上應(yīng)用各種統(tǒng)計(jì)函數(shù)和算子,有人可能對(duì) ?DataFrame 和 Dataset 分不太清,其實(shí)?DataFrame 就是一種類型為 Row 的 DataSet,
?
type?DataFrame?=?Dataset[Row]
?
這里說(shuō)的 Row 類型在 Spark sql 對(duì)外暴露的 API 層面來(lái)說(shuō)的, 然而 DataSet 并不要求輸入類型為 Row,也可以是一種強(qiáng)類型的數(shù)據(jù),DataSet 底層處理的數(shù)據(jù)類型為 Catalyst 內(nèi)部?InternalRow?或者 UnsafeRow?類型, 背后有一個(gè)?Encoder 進(jìn)行隱式轉(zhuǎn)換,把你輸入的數(shù)據(jù)轉(zhuǎn)換為內(nèi)部的?InternalRow,那么這樣推論,DataFrame 就對(duì)應(yīng) RowEncoder。
?
在?Dataset 上進(jìn)行?transformations 操作就會(huì)生成一個(gè)元素為 LogicalPlan 類型的樹(shù)形結(jié)構(gòu), 我們來(lái)舉個(gè)例子,假如我有一張學(xué)生表,一張分?jǐn)?shù)表,需求是統(tǒng)計(jì)所有大于 11 歲的學(xué)生的總分。
?
?
?
?
這個(gè)?queryExecution 就是整個(gè)執(zhí)行計(jì)劃的執(zhí)行引擎, 里面有執(zhí)行過(guò)程中,各個(gè)中間過(guò)程變量,整個(gè)執(zhí)行流程如下
?
?
?
那么我們上面例子中的 sql 語(yǔ)句經(jīng)過(guò) Parser 解析后就會(huì)變成一個(gè)抽象語(yǔ)法樹(shù),對(duì)應(yīng)解析后的邏輯計(jì)劃 AST 為
?
?
?
形象一點(diǎn)用圖來(lái)表示
?
?
?
我們可以看到過(guò)濾條件變?yōu)榱?Filter 節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)是 UnaryNode 類型, 也就是只有一個(gè)孩子,兩個(gè)表中的數(shù)據(jù)變?yōu)榱?UnresolvedRelation 節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)是?LeafNode 類型, 顧名思義,葉子節(jié)點(diǎn), JOIN 操作就表位了 Join 節(jié)點(diǎn), 這個(gè)是一個(gè)?BinaryNode 節(jié)點(diǎn),有兩個(gè)孩子。
?
上面說(shuō)的這些節(jié)點(diǎn)都是 LogicalPlan 類型的, 可以理解為進(jìn)行各種操作的 Operator, spark sql 對(duì)應(yīng)各種操作定義了各種 Operator。
?
?
?
這些 operator 組成的抽象語(yǔ)法樹(shù)就是整個(gè) Catatyst 優(yōu)化的基礎(chǔ),Catatyst 優(yōu)化器會(huì)在這個(gè)樹(shù)上面進(jìn)行各種折騰,把樹(shù)上面的節(jié)點(diǎn)挪來(lái)挪去來(lái)進(jìn)行優(yōu)化。
?
現(xiàn)在經(jīng)過(guò) Parser 有了抽象語(yǔ)法樹(shù),但是并不知道 score,sum 這些東西是啥,所以就需要 analyer 來(lái)定位,?analyzer 會(huì)把 AST 上所有 Unresolved 的東西都轉(zhuǎn)變?yōu)?resolved 狀態(tài),sparksql 有很多resolve 規(guī)則,都很好理解,例如 ResolverRelations 就是解析表(列)的基本類型等信息,ResolveFuncions 就是解析出來(lái)函數(shù)的基本信息,比如例子中的sum 函數(shù),ResolveReferences 可能不太好理解,我們?cè)?sql 語(yǔ)句中使用的字段比如 Select name 中的 name 對(duì)應(yīng)一個(gè)變量, 這個(gè)變量在解析表的時(shí)候就作為一個(gè)變量(Attribute 類型)存在了,那么 Select 對(duì)應(yīng)的 Project 節(jié)點(diǎn)中對(duì)應(yīng)的相同的變量就變成了一個(gè)引用,他們有相同的 ID,所以經(jīng)過(guò) ResolveReferences 處理后,就變成了?AttributeReference 類型???,保證在最后真正加載數(shù)據(jù)的時(shí)候他們被賦予相同的值,就跟我們寫(xiě)代碼的時(shí)候定義一個(gè)變量一樣,這些 Rule 就反復(fù)作用在節(jié)點(diǎn)上,指定樹(shù)節(jié)點(diǎn)趨于穩(wěn)定,當(dāng)然優(yōu)化的次數(shù)多了會(huì)浪費(fèi)性能,所以有的 rule??作用 Once, 有的 rule 作用 FixedPoint, 這都是要取舍的。好了,?不說(shuō)廢話,我們做個(gè)小實(shí)驗(yàn)。
?
?
?
我們使用?ResolverRelations 對(duì)我們的 AST 進(jìn)行解析,解析后可以看到原來(lái)的?UnresolvedRelation 變成了 LocalRelation,這個(gè)表示一個(gè)本地內(nèi)存中的表,這個(gè)表是我們使用 createOrReplaceTempView 的時(shí)候注冊(cè)在 catalog 中的,這個(gè) relove 操作無(wú)非就是在 catalog 中查表,找出這個(gè)表的 schema,?而且解析出來(lái)相應(yīng)的字段,把外層用戶定義的 各個(gè) StructField 轉(zhuǎn)變?yōu)?AttibuteReference,使用 ID 進(jìn)行了標(biāo)記。
?
?
?
我們?cè)偈褂?ResolveReferences 來(lái)搞一下,你會(huì)發(fā)現(xiàn)上層節(jié)點(diǎn)中的相同的字段都變成了擁有相同 ID 的引用,他們的類型都是?AttibuteReference。最終所有的 rule 都應(yīng)用后,整個(gè) AST 就變?yōu)榱?
?
?
?
下面重點(diǎn)來(lái)了,要進(jìn)行邏輯優(yōu)化了,我們看下邏輯優(yōu)化有哪些:
?
?
?
sparksql 中的邏輯優(yōu)化種類繁多,spark sql 中的?Catalyst 框架大部分邏輯是在一個(gè) Tree 類型的數(shù)據(jù)結(jié)構(gòu)上做各種折騰,基于 scala 來(lái)實(shí)現(xiàn)還是很優(yōu)雅的,scala 的偏函數(shù) 和 強(qiáng)大的 Case 正則匹配,讓整個(gè)代碼看起來(lái)還是清晰的,廢話少說(shuō),我們來(lái)搞個(gè)小實(shí)驗(yàn)。
?
?
?
看到了沒(méi),把我的 (100 + 10) 換成了 110。
?
?
?
使用?PushPredicateThroughJoin 把一個(gè)單單對(duì) stu 表做過(guò)濾的 Filter 給下推到 Join 之前了,會(huì)少加載很多數(shù)據(jù),性能得到了優(yōu)化,我們來(lái)看下最終的樣子。
?
?
?
至少用了?ColumnPruning,PushPredicateThroughJoin,ConstantFolding,RemoveRedundantAliases 邏輯優(yōu)化手段,現(xiàn)在我的小樹(shù)變成了:
?
?
做完邏輯優(yōu)化,畢竟只是抽象的邏輯層,還需要先轉(zhuǎn)換為物理執(zhí)行計(jì)劃,將邏輯上可行的執(zhí)行計(jì)劃變?yōu)?Spark 可以真正執(zhí)行的計(jì)劃。
?
?
?
spark sql 把邏輯節(jié)點(diǎn)轉(zhuǎn)換為了相應(yīng)的物理節(jié)點(diǎn),?比如 Join 算子,Spark 根據(jù)不同場(chǎng)景為該算子制定了不同的算法策略,有BroadcastHashJoin、ShuffleHashJoin 以及 SortMergeJoin 等, 當(dāng)然這里面有很多優(yōu)化的點(diǎn),spark 在轉(zhuǎn)換的時(shí)候會(huì)根據(jù)一些統(tǒng)計(jì)數(shù)據(jù)來(lái)智能選擇,這就涉及到基于代價(jià)的優(yōu)化,這也是很大的一塊,后面可以開(kāi)一篇文章單講, 我們例子中的由于數(shù)據(jù)量小于 10M, 自動(dòng)就轉(zhuǎn)為了?BroadcastHashJoin,眼尖的同學(xué)可以看到好像多了一些節(jié)點(diǎn),我們來(lái)解釋下,?BroadcastExchange 節(jié)點(diǎn)繼承 Exchage 類,用來(lái)在節(jié)點(diǎn)間交換數(shù)據(jù),這里的BroadcastExchange 就是會(huì)把 LocalTableScan出來(lái)的數(shù)據(jù) broadcast 到每個(gè) executor 節(jié)點(diǎn),用來(lái)做 map-side join。最后的 Aggregate 操作被分為了兩步,第一步先進(jìn)行并行聚合,然后對(duì)聚合后的結(jié)果,再進(jìn)行 Final 聚合,這個(gè)就類似域名 map-reduce? 里面的 combine 和最后的 reduce, 中間加上了一個(gè)?Exchange hashpartitioning, 這個(gè)是為了保證相同的 key shuffle 到相同的分區(qū),當(dāng)前物理計(jì)劃的 Child 輸出數(shù)據(jù)的 Distribution 達(dá)不到要求的時(shí)候需要進(jìn)行Shuffle,這個(gè)是在最后的?EnsureRequirement 階段插入的交換數(shù)據(jù)節(jié)點(diǎn),在數(shù)據(jù)庫(kù)領(lǐng)域里面,有那么一句話,叫得 join 者得天下,我們重點(diǎn)講一些 spark sql 在 join 操作的時(shí)候做的一些取舍。
?
Join 操作基本上能上會(huì)把兩張 Join 的表分為大表和小表,大表作為流式遍歷表,小表作為查找表,然后對(duì)大表中的每一條記錄,根據(jù) Key 來(lái)取查找表中取相同 Key 的記錄。
?
spark 支持所有類型的 Join:
?
?
?
spark sql 中 join 操作根據(jù)各種條件選擇不同的 join 策略,分為?BroadcastHashJoin,?SortMergeJoin, ShuffleHashJoin。
?
站在用戶的角度思考問(wèn)題,與客戶深入溝通,找到天河網(wǎng)站設(shè)計(jì)與天河網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶體驗(yàn)好的作品,建站類型包括:成都網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計(jì)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、域名注冊(cè)、虛擬主機(jī)、企業(yè)郵箱。業(yè)務(wù)覆蓋天河地區(qū)。
可以看到這個(gè)最終執(zhí)行的時(shí)候分分成了兩個(gè) stage, 把小表 broeadcastExechage 到了大表上做?BroadcastHashJoin, 沒(méi)有進(jìn)化 shuffle 操作,然后最后一步聚合的時(shí)候,先在 map 段進(jìn)行了一次 HashAggregate sum 函數(shù), 然后 Exchage 操作根據(jù) name 把相同 key 的數(shù)據(jù) shuffle 到同一個(gè)分區(qū),然后做最終的 HashAggregate sum?操作,這里有個(gè) WholeStageCodegen 比較奇怪,這個(gè)是干啥的呢,因?yàn)槲覀冊(cè)趫?zhí)行 Filter ,Project 這些 operator 的時(shí)候,這些 operator 內(nèi)部包含很多? Expression, 比如?SELECT sum(v),name, 這里的 sum 和 v 都是 Expression,這里面的 v 屬于 Attribute 變量表達(dá)式,表達(dá)式也是樹(shù)形數(shù)據(jù)結(jié)構(gòu),sum(v) ?就是 sum 節(jié)點(diǎn)和 sum 的子節(jié)點(diǎn) v 組成的一個(gè)樹(shù)形結(jié)構(gòu),這些表達(dá)式都是可以求值和生成代碼的,表達(dá)式最基本的功能就是求值,對(duì)輸入的 Row 進(jìn)行計(jì)算 , Expression 需要實(shí)現(xiàn)?def eval(input: InternalRow = null): Any?函數(shù)來(lái)實(shí)現(xiàn)它的功能。
?
表達(dá)式是對(duì) Row 進(jìn)行加工,輸出的可以是任意類型,但是 Project 和 Filter 這些 Plan 輸出的類型是?def output: Seq[Attribute], 這個(gè)就是代表一組變量,比如我們例子中的?Filter (age >= 11) 這個(gè)plan, 里面的?age>11 就是一個(gè)表達(dá)式,這個(gè) > 表達(dá)式依賴兩個(gè)子節(jié)點(diǎn), 一個(gè)Literal常量表達(dá)式求值出來(lái)就是 11, 另外一個(gè)是?Attribute 變量表達(dá)式 age, 這個(gè)變量在 analyze 階段轉(zhuǎn)變?yōu)榱?AttributeReference 類型,但是它是Unevaluable,為了獲取屬性在輸入 Row 中對(duì)應(yīng)的值, 還得根據(jù) schema 關(guān)聯(lián)綁定一下這個(gè)變量在一行數(shù)據(jù)的 index, 生成 BoundReference,然后?BoundReference 這種表達(dá)式在 eval 的時(shí)候就可以根據(jù) index 來(lái)獲取 Row 中的值。??age>11 這個(gè)表達(dá)式最終輸出類型為 boolean 類型,但是 Filter 這個(gè) Plan 輸出類型是?Seq[Attribute] 類型。
?
可以想象到,數(shù)據(jù)在一個(gè)一個(gè)的 plan 中流轉(zhuǎn),然后每個(gè) plan 里面表達(dá)式都會(huì)對(duì)數(shù)據(jù)進(jìn)行處理,就相當(dāng)于經(jīng)過(guò)了一個(gè)個(gè)小函數(shù)的調(diào)用處理,這里面就有大量的函數(shù)調(diào)用開(kāi)銷(xiāo),那么我們是不是可以把這些小函數(shù)內(nèi)聯(lián)一下,當(dāng)成一個(gè)大函數(shù),WholeStageCodegen 就是干這事的。
?
?
可以看到最終執(zhí)行計(jì)劃每個(gè)節(jié)點(diǎn)前面有個(gè) * 號(hào),說(shuō)明整段代碼生成被啟用,在我們的例子中,F(xiàn)ilter, Project,BroadcastHashJoin,Project,HashAggregate 這一段都啟用了整段代碼生成,級(jí)聯(lián)為了兩個(gè)大函數(shù),有興趣可以使用 a.queryExecution.debug.codegen 看下生成后的代碼長(zhǎng)什么樣子。然而 Exchange 算子并沒(méi)有實(shí)現(xiàn)整段代碼生成,因?yàn)樗枰ㄟ^(guò)網(wǎng)絡(luò)發(fā)送數(shù)據(jù)。
?
我今天的分享就到這里,其實(shí) spark sql 里面有很多有意思的東西,但是因?yàn)閱?wèn)題的本質(zhì)復(fù)雜度,導(dǎo)致需要高度抽象才能把這一切理順,這樣就給代碼閱讀者帶來(lái)了理解困難, 但是你如果真正看進(jìn)去了,就會(huì)有很多收獲。如果對(duì)本文有任何見(jiàn)解,歡迎在文末留言說(shuō)出你的想法。??
**牛人說(shuō)**
?
「牛人說(shuō)」專欄致力于技術(shù)人思想的發(fā)現(xiàn),其中包括技術(shù)實(shí)踐、技術(shù)干貨、技術(shù)見(jiàn)解、成長(zhǎng)心得,還有一切值得被發(fā)現(xiàn)的內(nèi)容。我們希望集合最優(yōu)秀的技術(shù)人,挖掘獨(dú)到、犀利、具有時(shí)代感的聲音。
?
投稿郵箱:marketing@qiniu.com
br/>??
**牛人說(shuō)**
?
「牛人說(shuō)」專欄致力于技術(shù)人思想的發(fā)現(xiàn),其中包括技術(shù)實(shí)踐、技術(shù)干貨、技術(shù)見(jiàn)解、成長(zhǎng)心得,還有一切值得被發(fā)現(xiàn)的內(nèi)容。我們希望集合最優(yōu)秀的技術(shù)人,挖掘獨(dú)到、犀利、具有時(shí)代感的聲音。
?
投稿郵箱:marketing@qiniu.com
?
?