這篇文章主要介紹了spark 3.0中如何實現查詢計劃,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
成都創(chuàng)新互聯專注于企業(yè)營銷型網站建設、網站重做改版、定陶網站定制設計、自適應品牌網站建設、HTML5、商城開發(fā)、集團公司官網建設、成都外貿網站制作、高端網站制作、響應式網頁設計等建站業(yè)務,價格優(yōu)惠性價比高,為定陶等各大城市提供網站開發(fā)制作服務。
我們考慮一個簡單的例子,一個查詢中涉及到filter以及aggregation,join操作的語句:
# in PySpark API: query = ( questionsDF .filter(col('year') == 2019) .groupBy('user_id') .agg( count('*').alias('cnt') ) .join(usersDF, 'user_id') )
我們把例子中的usersDF是一組問問題的用戶,這些問題用questionsDF來表示。這些問題用year的這一列來進行分區(qū),代表著哪一年問的問題。在這個查詢里,我們對2019年問問題的用戶感興趣,并且想知道每個人問了多少問題,而且我們想知道在輸出中我們想知道一些額外信息,這就是為什么我們在聚合之后進行了usersDF的join操作。
這里有兩種基本的方式去查看物理計劃。第一種是在DataFrame上調用explain函數,該函數展現這個計劃的文本化的展示:
這在spark 3.0有了一些優(yōu)化,explain函數帶有了一個新參數 mode,這個參數的值可以是:formatted,cost,codegen。使用formatted模式將會把查詢計劃轉化為更加有組織的輸出(這里之展現了一部分): 在formatted計劃中,我們能看到裸數,改裸數只是展現了操作的名字并帶有一個括號的數字。在數的下面,這里有一些數字對應的細節(jié)描述。cost模式將會展示除了物理計劃之外的優(yōu)化的邏輯計劃,這些邏輯計劃帶有每個操作的統計信息,所以我們能看到在不同執(zhí)行階段的數據大小。最終codegen模式展現了將會執(zhí)行的生成的java代碼。
第二種方式是查看spark ui中的sql tab,這里有正在跑的和已經完成了的查詢。通過點擊你要查看的查詢,我們可以看到物理計劃的文本表示。在下面這個圖片中,我們結合圖形表示,文本表示以及它們之間的對應關系: 不同點是圖形表示的葉子節(jié)點在上面,根節(jié)點在下面,而文本表示的是反過來的。
在物理計劃的圖形表示中,你能看到一些操作被組織成了一大塊藍色的矩形。這些大矩形對應著codegen階段。這是發(fā)生在物理計劃的優(yōu)化階段。這個是叫做CollapseCodegenStages來負責優(yōu)化的,原理是把支持代碼生成的操作聚合到一起,通過消除虛擬函數的調用來加速。但是并不是所有的操作支持代碼生成。所以一些操作(如exchange操作)并不是大矩形的一部分。在我們的例子中,這里有三個codegen stages,對應著三個大矩形,你能在操作的括號中看到codegen stage的id。從這個樹我們也可以分辨出一個操作是夠支持代碼生成,因為加入支持代碼生成的話,這里將會在對應的操作的括號里有個星號。
我們簡單的分析一下在我們查詢中的每一個操作。
scan parquet操作代表著從parquet文件中讀取數據。從明細信息中,我們能直接看到從這個數據源中我們選擇了哪些列。雖然我們沒指定具體的字段,但是這里也會應用ColumnPruning規(guī)則,這個規(guī)則會確保只有真正字段才會從這個數據源中提取出來。我們也能看到有兩種filters:PartitionFilters和PushFilters。PartitionFilters應用在數據源分區(qū)的字段上。這是非常重要的因為我們能跳過我們不需要的數據。檢查對應的filters是否傳播到正確的位置總是沒錯的。這是因為我們盡可能讀取少量的數據,因為IO是比較費時的。在spark 2.4,這里還有一個代表實際讀取到的分區(qū)的partitionCount字段,這個字段在spark 3.0已經去掉了。
PushFilters把字段直接下推到parquet文件中去,假如parquet文件過濾的列是按照過濾字段排序的話,這個規(guī)則就很有用了,因為這種情況下,我們能利用parquet內部結構去過濾數據。parquet文件是按照行組和每個行組的元數據文件組成的。這個元數據包含了每個行組的最大最小值,基于這個信息,我們就能判斷是否讀取這個行組。
Filter操作佷容易理解。它僅僅是代表過濾條件。但是這個操作怎么創(chuàng)建的并不是很明顯,因為在查詢中它并不是直接對應著過濾條件。因為所有的filters首先被Catalyst optimzer處理,改規(guī)則可能修改或者重新移動她們。這里有好幾個規(guī)則在她們轉換為物理計劃前的邏輯計劃。我們列舉了一下:
PushDownPredicates-這個規(guī)則通過其他的操作把filter下推到離數據源更近的地方,但不是所有的操作都支持。比如,如果表達式不是確定性的,這就不行,假如我們使用類似first,last,collect_set,collect_list,rand等,filters操作就不能通過這些操作而進行下推,因為這些函數是不確定性的。
CombineFilters-結合兩個臨近的操作合成一個(收集兩個filters條件合成一個更為復雜的的條件)
InferFiltersFromConstraints-這個規(guī)則實際上會創(chuàng)建新的filter操作,如從join操作(從inner join中創(chuàng)建一個joining key is not null)
PruneFilters-移除多余的filters(比如一個filters總是true)
Exchange操作代表著shuffle操作,意味著物理數據的集群范圍內的移動。這個操作是很費時的,因為它會通過網絡移動數據。查詢計劃的信息也包含了一些數據重新分區(qū)的細節(jié)。在我們的例子中,是hashPartitioning(user_id,200): 這意味著數據將會根據user_id列重新分區(qū)為200個分區(qū),有著同樣user_id的行將會屬于同一個分區(qū),將會分配到同一個executor上。為了確保只有200分區(qū),spark將會計算user_id的hashcode并且對200取模。這個結果就是不同的user_ids就會分到同一個分區(qū)。同時有些分區(qū)可能是空的。這里也有其他類型的分區(qū)值的去留意一下:
RoundRobinPartitioning-數據將會隨機分配到n個分區(qū)中,n在函數repartition(n)中指定
SinglePartition-所有數據將會分配到一個分區(qū)中,進而到一個executor中。
RangePartitioning-這個用在對數據排序中,用在orderBy或者sort操作中
這個代表著數據聚合,這個經常是兩個操作,要么被Exchange分開或者不分開: 為什么這里有兩個HashAggregate操作的原因是第一個是部分聚合,它在每個executor上每個分區(qū)分別進行聚合。在我們的例子中,你能看到partial_count(1)的function字段,最終的部分聚合結果就是第二個聚合。這個操作也展示了數據按照哪個分組的Keys字段。results字段展示了在聚合以后的可用的列。
BroadcastHashJoin(BHJ)代表著join算法的操作,除了這個,還有SortMergeJoin和ShuffleHashJoin。BHJ總是伴隨著BroadcastExchange,這個代表著廣播shuffle-數據將會收集到driver端并且會被傳播到需要的executor上。
這是在spark 3.0引入的新操作,用于列行之間的轉換
感謝你能夠認真閱讀完這篇文章,希望小編分享的“spark 3.0中如何實現查詢計劃”這篇文章對大家有幫助,同時也希望大家多多支持創(chuàng)新互聯,關注創(chuàng)新互聯行業(yè)資訊頻道,更多相關知識等著你來學習!