這篇文章將為大家詳細(xì)講解有關(guān)如何進(jìn)行EMR Spark Relational Cache的執(zhí)行計(jì)劃重寫(xiě),文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
成都創(chuàng)新互聯(lián)致力于互聯(lián)網(wǎng)網(wǎng)站建設(shè)與網(wǎng)站營(yíng)銷,提供成都網(wǎng)站設(shè)計(jì)、成都做網(wǎng)站、網(wǎng)站開(kāi)發(fā)、seo優(yōu)化、網(wǎng)站排名、互聯(lián)網(wǎng)營(yíng)銷、小程序制作、公眾號(hào)商城、等建站開(kāi)發(fā),成都創(chuàng)新互聯(lián)網(wǎng)站建設(shè)策劃專家,為不同類型的客戶提供良好的互聯(lián)網(wǎng)應(yīng)用定制解決方案,幫助客戶在新的全球化互聯(lián)網(wǎng)環(huán)境中保持優(yōu)勢(shì)。
EMR Spark提供的Relational Cache功能,可以通過(guò)對(duì)數(shù)據(jù)模型進(jìn)行預(yù)計(jì)算和高效地存儲(chǔ),加速Spark SQL,為客戶實(shí)現(xiàn)利用Spark SQL對(duì)海量數(shù)據(jù)進(jìn)行即時(shí)查詢的目的。Relational Cache的工作原理類似物化視圖,在用戶提交SQL語(yǔ)句時(shí)對(duì)語(yǔ)句進(jìn)行分析,并選出可用的預(yù)計(jì)算結(jié)果來(lái)加速查詢。為了實(shí)現(xiàn)高效地預(yù)計(jì)算結(jié)果復(fù)用,我們構(gòu)建的預(yù)計(jì)算緩存一般都較為通用,因此對(duì)于用戶query,還需進(jìn)行進(jìn)一步的計(jì)算方能獲得最終結(jié)果。因此,如何快速地找出匹配的緩存,并構(gòu)建出準(zhǔn)確的新執(zhí)行計(jì)劃,就顯得尤為重要。
在Hive 3.x中支持的Materialized View,利用了Apache Calcite對(duì)執(zhí)行計(jì)劃進(jìn)行重寫(xiě)??紤]到Spark SQL使用Catalyst進(jìn)行執(zhí)行計(jì)劃優(yōu)化,引入Calcite太重,因此EMR Spark中的Relational Cache實(shí)現(xiàn)了自己的Catalyst規(guī)則,用于重寫(xiě)執(zhí)行計(jì)劃。下面將介紹執(zhí)行計(jì)劃重寫(xiě)的相關(guān)內(nèi)容。
Spark會(huì)把用戶查詢語(yǔ)句進(jìn)行解析,依次轉(zhuǎn)化為Unresolved Logical Plan(未綁定的邏輯計(jì)劃)、Resolved Logical Plan(綁定的邏輯計(jì)劃)、Optimized Logical Plan(優(yōu)化的邏輯計(jì)劃)、Physical Plan(物理計(jì)劃)。其中,未優(yōu)化的邏輯計(jì)劃根據(jù)用戶查詢語(yǔ)句不同,會(huì)有較大區(qū)別,而Relational Cache作為優(yōu)化的一部分,放在邏輯計(jì)劃優(yōu)化過(guò)程中也較為合適,因此我們拿到的用戶查詢計(jì)劃會(huì)是優(yōu)化中的邏輯計(jì)劃。要與優(yōu)化中的邏輯計(jì)劃匹配,我們選擇把這個(gè)重寫(xiě)過(guò)程放在Spark優(yōu)化器比較靠后的步驟中,同時(shí),預(yù)先將Relational Cache的邏輯計(jì)劃進(jìn)行解析,獲得優(yōu)化后的Cache計(jì)劃,減小匹配時(shí)的復(fù)雜程度。這樣,我們只需匹配做完了謂詞下推、謂詞合并等等優(yōu)化之后的兩個(gè)邏輯計(jì)劃。
在匹配時(shí),我們希望能盡可能多得匹配計(jì)算和IO操作,因此,我們對(duì)目標(biāo)計(jì)劃進(jìn)行前序遍歷,依次進(jìn)行匹配,嘗試找到最多的匹配節(jié)點(diǎn)。而在判斷兩個(gè)節(jié)點(diǎn)是否匹配時(shí),我們采用后序遍歷的方式,希望盡快發(fā)現(xiàn)不匹配的情況,減少計(jì)劃匹配的執(zhí)行時(shí)間。然后我們會(huì)根據(jù)匹配結(jié)果,對(duì)計(jì)劃進(jìn)行重寫(xiě),包括對(duì)于Cache數(shù)據(jù)進(jìn)行進(jìn)一步的Filter、Project、Sort甚至Aggregate等操作,使其與匹配節(jié)點(diǎn)完全等價(jià),然后更新邏輯計(jì)劃節(jié)點(diǎn)的引用綁定,無(wú)縫替換到邏輯計(jì)劃中,這樣就能輕松獲得最終的重寫(xiě)后的計(jì)劃。
Spark中的Join都是二元操作,而實(shí)際的Join順序可能根據(jù)一些策略會(huì)有很大區(qū)別,因此對(duì)于Join節(jié)點(diǎn),必須進(jìn)行特殊處理。我們會(huì)首先將邏輯計(jì)劃進(jìn)行處理,根據(jù)緩存計(jì)劃的Join順序進(jìn)行Join重排。這一步在樹(shù)狀匹配之前就進(jìn)行了,避免不斷重復(fù)Join重排帶來(lái)的時(shí)間浪費(fèi)。重排后的Join可以更大概率地被我們匹配到。
為了實(shí)現(xiàn)Cache的通用性,根據(jù)星型數(shù)據(jù)模型的特點(diǎn),我們引入了Record Preserve的概念。這和傳統(tǒng)數(shù)據(jù)庫(kù)中的Primary Key/Foreign Key的關(guān)系較為類似,當(dāng)有主鍵的表與非空外鍵指向的表在外鍵上進(jìn)行Join時(shí),記錄的條數(shù)不會(huì)變化,不會(huì)膨脹某條記錄,也不會(huì)丟失某條記錄。PK/FK的語(yǔ)意在大數(shù)據(jù)處理框架中經(jīng)常缺失,我們引入了新的DDL讓用戶自定義Record Preserve Join的關(guān)系。當(dāng)用戶定義A Inner Join B是對(duì)于A表Record Preserve時(shí),我們也會(huì)把A Inner Join B和A的關(guān)系匹配起來(lái)。有了PK/FK的幫助,我們能匹配上的情況大大增加了,一個(gè)Relational Cache可以被更多看似區(qū)別巨大的查詢共享,這可以很好的為用戶節(jié)約額外的存儲(chǔ)開(kāi)銷和預(yù)計(jì)算開(kāi)銷。
一般的Aggregate匹配較為簡(jiǎn)單,而Spark支持的Grouping Set操作,會(huì)構(gòu)建出Expand邏輯計(jì)劃節(jié)點(diǎn),相當(dāng)于把一條記錄轉(zhuǎn)為多條,使用Grouping ID進(jìn)行標(biāo)記。由于Expand的子節(jié)點(diǎn)是所有Grouping的情況共用的,這里我們只對(duì)子節(jié)點(diǎn)進(jìn)行一次匹配,再分別進(jìn)行上面的Grouping屬性和Aggregate屬性的匹配。主要是驗(yàn)證目標(biāo)聚合所需的屬性或者聚合函數(shù)都能從某個(gè)Grouping ID對(duì)應(yīng)的聚合結(jié)果中計(jì)算出來(lái),比如粗粒度的Sum可以對(duì)細(xì)粒度的Sum進(jìn)行二次Sum求和,而粗粒度的Count對(duì)細(xì)粒度的Count也應(yīng)通過(guò)二次Sum求和,粗粒度的Average無(wú)法僅從細(xì)粒度的Average中還原出來(lái)等等。
找出匹配的邏輯計(jì)劃之后,就是重寫(xiě)邏輯計(jì)劃的過(guò)程。對(duì)于無(wú)需二次聚合的邏輯計(jì)劃,直接根據(jù)緩存數(shù)據(jù)的schema,從緩存數(shù)據(jù)的Relation中選擇所需列,根據(jù)條件過(guò)濾后,進(jìn)行后續(xù)操作。如果還需二次聚合,選擇所需列時(shí)需保留外部要用的所有列,以及聚合時(shí)需要的列,還有聚合函數(shù)需要的數(shù)據(jù)。二次聚合的聚合函數(shù)需要根據(jù)實(shí)際情況進(jìn)行重寫(xiě),確保能使用Relational Cache中已經(jīng)初步聚合的結(jié)果。這里面需要根據(jù)聚合的語(yǔ)意判斷是否能夠二次聚合。如果時(shí)Grouping Set的聚合,二次聚合之前還需選擇正確的Grouping ID進(jìn)行過(guò)濾。經(jīng)過(guò)二次聚合后,步驟大體和普通的重寫(xiě)一致,只需替換到目標(biāo)計(jì)劃中即可。
我們以一個(gè)例子來(lái)具體說(shuō)明邏輯計(jì)劃的重寫(xiě)結(jié)果。Star Schema Benchmark(論文鏈接https://www.cs.umb.edu/~poneil/StarSchemaB.pdf)是星型模型數(shù)據(jù)分析的一個(gè)標(biāo)準(zhǔn)Benchmark,其結(jié)構(gòu)定義如圖所示:
我們構(gòu)建Relational Cache的SQL語(yǔ)句如下:
SELECT GROUPING_ID() AS grouping_id, lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum, SUM(lo_revenue) AS lo_revenue_SUM, SUM(lo_supplycost) AS lo_supplycost_SUM, SUM(V_REVENUE) AS V_REVENUE_SUMFROM supplier, p_lineorder, dates, customer, partWHERE lo_orderdate = d_datekey AND lo_custkey = c_custkey AND lo_suppkey = s_suppkey AND lo_partkey = p_partkeyGROUP BY lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum GROUPING SETS ((d_year, d_weeknuminyear, lo_discount, lo_quantity), (d_year, lo_discount, lo_quantity), (lo_discount, lo_quantity), (d_yearmonthnum, lo_discount, lo_quantity), (d_year, p_category, p_brand, s_region), (d_year, p_category, s_region), (d_year, s_region), (d_year, s_region, c_region, s_nation, c_nation), (d_year, s_city, c_city, s_nation, c_nation), (d_year, s_city, c_city), (d_year, d_yearmonth, s_city, c_city), (d_year, s_region, c_region, c_nation, p_mfgr), (d_year, s_region, s_nation, c_region, p_mfgr, p_category), (d_year, s_nation, s_city, c_region, p_brand, p_category, p_brand), (d_year, s_nation, s_city, c_region, p_brand, p_category), (d_year, s_nation, s_city, c_region, p_category, p_brand))
我們從中選出一條查詢作為示例。具體查詢語(yǔ)句:
select c_city, s_city, d_year, sum(lo_revenue) as revenue from customer, lineorder, supplier, dates where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey and c_nation = 'UNITED KINGDOM' and (c_city='UNITED KI1' or c_city='UNITED KI5') and (s_city='UNITED KI1' or s_city='UNITED KI5') and s_nation = 'UNITED KINGDOM' and d_yearmonth = 'Dec1997' group by c_city, s_city, d_year order by d_year asc, revenue desc
原始邏輯計(jì)劃如下所示:
Sort [d_year#47 ASC NULLS FIRST, revenue#558L DESC NULLS LAST], true+- Aggregate [c_city#22, s_city#39, d_year#47], [c_city#22, s_city#39, d_year#47, sum(cast(lo_revenue_SUM#773L as bigint)) AS revenue#558L] +- Filter ((((((((isnotnull(s_nation#40) && ((s_city#39 = UNITED KI1) || (s_city#39 = UNITED KI5))) && (s_nation#40 = UNITED KINGDOM)) && isnotnull(d_yearmonth#49)) && (d_yearmonth#49 = Dec1997)) && isnotnull(c_nation#23)) && (c_nation#23 = UNITED KINGDOM)) && ((c_city#22 = UNITED KI1) || (c_city#22 = UNITED KI5))) && (grouping_id#662 = 19322)) +- Relation[grouping_id#662,lo_discount#759,s_city#39,c_city#22,p_category#762,lo_quantity#763,d_weeknuminyear#764,s_nation#40,s_region#766,p_mfgr#767,c_region#768,p_brand1#769,c_nation#23,d_yearmonthnum#771,d_yearmonth#49,lo_revenue_SUM#773L,lo_supplycost_SUM#774L,V_REVENUE_SUM#775L,d_year#47] parquet
由此可見(jiàn),執(zhí)行計(jì)劃大大簡(jiǎn)化,我們可以做到亞秒級(jí)響應(yīng)用戶的命中查詢。
在實(shí)際測(cè)試過(guò)程中,我們發(fā)現(xiàn)當(dāng)多個(gè)Relational Cache存在時(shí),匹配時(shí)間線性增長(zhǎng)明顯。由于我們?cè)趍etastore中存儲(chǔ)的是Cache的SQL語(yǔ)句,取SQL語(yǔ)句和再次解析的時(shí)間都不容小覷,這就使得匹配過(guò)程明顯增長(zhǎng),背離了我們追求亞秒級(jí)響應(yīng)的初衷。因此我們?cè)赟park中構(gòu)建了邏輯計(jì)劃緩存,將解析過(guò)的Relational Cache的計(jì)劃緩存在內(nèi)存中,每個(gè)Relational Cache只緩存一份,計(jì)劃本身占用空間有限,因此我們可以緩存住幾乎所有的Relational Cache的優(yōu)化后的邏輯計(jì)劃,從而在第一次查詢之后,所有查詢都不再收到取SQL語(yǔ)句和再次解析的延遲困擾。經(jīng)過(guò)這樣的優(yōu)化,匹配時(shí)間大幅減少到100ms的量級(jí)。
關(guān)于如何進(jìn)行EMR Spark Relational Cache的執(zhí)行計(jì)劃重寫(xiě)就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。