本篇內(nèi)容介紹了“怎么掌握Flink on YARN應(yīng)用啟動(dòng)流程”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
墨玉ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場(chǎng)景,ssl證書未來(lái)市場(chǎng)廣闊!成為創(chuàng)新互聯(lián)的ssl證書銷售渠道,可以享受市場(chǎng)價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:18980820575(備注:SSL證書合作)期待與您的合作!Flink on YARN 流程圖
Flink on YARN集群部署模式涉及YARN和Flink兩大開源框架,應(yīng)用啟動(dòng)流程的很多環(huán)節(jié)交織在一起,為了便于大家理解,在一張圖上畫出了Flink on YARN基礎(chǔ)架構(gòu)和應(yīng)用啟動(dòng)全流程,并對(duì)關(guān)鍵角色和流程進(jìn)行了介紹說明,整個(gè)啟動(dòng)流程又被劃分成客戶端提交(流程標(biāo)注為紫色)、Flink Cluster啟動(dòng)和Job提交運(yùn)行(流程標(biāo)注為橙色)兩個(gè)階段分別闡述,由于分支和細(xì)節(jié)太多,本文會(huì)忽略掉一些,只介紹關(guān)鍵流程(基于Flink開源1.9版本源碼整理)。
客戶端提交流程
1.執(zhí)行命令:bin/flink run -d -m yarn-cluster ...或bin/yarn-session.sh ...來(lái)提交per-job運(yùn)行模式或session運(yùn)行模式的應(yīng)用;
2.解析命令參數(shù)項(xiàng)并初始化,啟動(dòng)指定運(yùn)行模式,如果是per-job運(yùn)行模式將根據(jù)命令行參數(shù)指定的Job主類創(chuàng)建job graph;
如果可以從命令行參數(shù)(-yid )或YARN properties臨時(shí)文件(${java.io.tmpdir}/.yarn-properties-${user.name})中獲取應(yīng)用ID,向指定的應(yīng)用提交Job;
否則當(dāng)命令行參數(shù)中包含 -d(表示detached模式)和 -m yarn-cluster(表示指定YARN集群模式),啟動(dòng)per-job運(yùn)行模式;
否則當(dāng)命令行參數(shù)項(xiàng)不包含 -yq(表示查詢YARN集群可用資源)時(shí),啟動(dòng)session運(yùn)行模式;
3.獲取YARN集群信息、新應(yīng)用ID并啟動(dòng)運(yùn)行前檢查;
通過YarnClient向YARN ResourceManager(下文縮寫為:YARN RM,YARN Master節(jié)點(diǎn),負(fù)責(zé)整個(gè)集群資源的管理和調(diào)度)請(qǐng)求創(chuàng)建一個(gè)新應(yīng)用(YARN RM收到創(chuàng)建應(yīng)用請(qǐng)求后生成新應(yīng)用ID和container申請(qǐng)的資源上限后返回),并且獲取YARN Slave節(jié)點(diǎn)報(bào)告(YARN RM返回全部slave節(jié)點(diǎn)的ID、狀態(tài)、rack、http地址、總資源、已使用資源等信息);
運(yùn)行前檢查:(1) 簡(jiǎn)單驗(yàn)證YARN集群能否訪問;(2) 大node資源能否滿足flink JobManager/TaskManager vcores資源申請(qǐng)需求;(3) 指定queue是否存在(不存在也只是打印WARN信息,后續(xù)向YARN提交時(shí)排除異常并退出);(4)當(dāng)預(yù)期應(yīng)用申請(qǐng)的Container資源會(huì)超出YARN資源限制時(shí)拋出異常并退出;(5) 當(dāng)預(yù)期應(yīng)用申請(qǐng)不能被滿足時(shí)(例如總資源超出YARN集群可用資源總量、Container申請(qǐng)資源超出NM可用資源大值等)提供一些參考信息。
4.將應(yīng)用配置(flink-conf.yaml、logback.xml、log4j.properties)和相關(guān)文件(flink jars、ship files、user jars、job graph等)上傳至分布式存儲(chǔ)(例如HDFS)的應(yīng)用暫存目錄(/user/${user.name}/.flink/);
5.準(zhǔn)備應(yīng)用提交上下文(ApplicationSubmissionContext,包括應(yīng)用的名稱、類型、隊(duì)列、標(biāo)簽等信息和應(yīng)用Master的container的環(huán)境變量、classpath、資源大小等),注冊(cè)處理部署失敗的shutdown hook(清理應(yīng)用對(duì)應(yīng)的HDFS目錄),然后通過YarnClient向YARN RM提交應(yīng)用;
6.循環(huán)等待直到應(yīng)用狀態(tài)為RUNNING,包含兩個(gè)階段:
循環(huán)等待應(yīng)用提交成功(SUBMITTED):默認(rèn)每隔200ms通過YarnClient獲取應(yīng)用報(bào)告,如果應(yīng)用狀態(tài)不是NEW和NEW_SAVING則認(rèn)為提交成功并退出循環(huán),每循環(huán)10次會(huì)將當(dāng)前的應(yīng)用狀態(tài)輸出至日志:"Application submission is not finished, submitted application is still in ",提交成功后輸出日志:"Submitted application "
循環(huán)等待應(yīng)用正常運(yùn)行(RUNNING):每隔250ms通過YarnClient獲取應(yīng)用報(bào)告,每輪循環(huán)也會(huì)將當(dāng)前的應(yīng)用狀態(tài)輸出至日志:"Deploying cluster, current state "。應(yīng)用狀態(tài)成功變?yōu)镽UNNING后將輸出日志"YARN application has been deployed successfully." 并退出循環(huán),如果等到的是非預(yù)期狀態(tài)如FAILED/FINISHED/KILLED,就會(huì)在輸出YARN返回的診斷信息("The YARN application unexpectedly switched to state during deployment. Diagnostics from YARN: ...")之后拋出異常并退出。
Flink Cluster啟動(dòng)流程
1.YARN RM中的ClientRMService(為普通用戶提供的RPC服務(wù)組件,處理來(lái)自客戶端的各種RPC請(qǐng)求,比如查詢YARN集群信息,提交、終止應(yīng)用等)接收到應(yīng)用提交請(qǐng)求,簡(jiǎn)單校驗(yàn)后將請(qǐng)求轉(zhuǎn)交給RMAppManager(YARN RM內(nèi)部管理應(yīng)用生命周期的組件);
2.RMAppManager根據(jù)應(yīng)用提交上下文內(nèi)容創(chuàng)建初始狀態(tài)為NEW的應(yīng)用,將應(yīng)用狀態(tài)持久化到RM狀態(tài)存儲(chǔ)服務(wù)(例如ZooKeeper集群,RM狀態(tài)存儲(chǔ)服務(wù)用來(lái)保證RM重啟、HA切換或發(fā)生故障后集群應(yīng)用能夠正常恢復(fù),后續(xù)流程中的涉及狀態(tài)存儲(chǔ)時(shí)不再贅述),應(yīng)用狀態(tài)變?yōu)镹EW_SAVING;
3.應(yīng)用狀態(tài)存儲(chǔ)完成后,應(yīng)用狀態(tài)變?yōu)镾UBMITTED;RMAppManager開始向ResourceScheduler(YARN RM可拔插資源調(diào)度器,YARN自帶三種調(diào)度器FifoScheduler/FairScheduler/CapacityScheduler,其中CapacityScheduler支持功能最多使用最廣泛,F(xiàn)ifoScheduler功能最簡(jiǎn)單基本不可用,今年社區(qū)已明確不再繼續(xù)支持FairScheduler,建議已有用戶遷至CapacityScheduler)提交應(yīng)用,如果無(wú)法正常提交(例如隊(duì)列不存在、不是葉子隊(duì)列、隊(duì)列已停用、超出隊(duì)列大應(yīng)用數(shù)限制等)則拋出拒絕該應(yīng)用,應(yīng)用狀態(tài)先變?yōu)镕INAL_SAVING觸發(fā)應(yīng)用狀態(tài)存儲(chǔ)流程并在完成后變?yōu)镕AILED;如果提交成功,應(yīng)用狀態(tài)變?yōu)锳CCEPTED;
4.開始創(chuàng)建應(yīng)用運(yùn)行實(shí)例(ApplicationAttempt,由于一次運(yùn)行實(shí)例中最重要的組件是ApplicationMaster,下文簡(jiǎn)稱AM,它的狀態(tài)代表了ApplicationAttempt的當(dāng)前狀態(tài),所以ApplicationAttempt實(shí)際也代表了AM),初始狀態(tài)為NEW;
5.初始化應(yīng)用運(yùn)行實(shí)例信息,并向ApplicationMasterService(AM&RM協(xié)議接口服務(wù),處理來(lái)自AM的請(qǐng)求,主要包括注冊(cè)和心跳)注冊(cè),應(yīng)用實(shí)例狀態(tài)變?yōu)镾UBMITTED;
6.RMAppManager維護(hù)的應(yīng)用實(shí)例開始初始化AM資源申請(qǐng)信息并重新校驗(yàn)隊(duì)列,然后向ResourceScheduler申請(qǐng)AM Container(Container是YARN中資源的抽象,包含了內(nèi)存、CPU等多維度資源),應(yīng)用實(shí)例狀態(tài)變?yōu)锳CCEPTED;
7.ResourceScheduler會(huì)根據(jù)優(yōu)先級(jí)(隊(duì)列/應(yīng)用/請(qǐng)求每個(gè)維度都有優(yōu)先級(jí)配置)從根隊(duì)列開始層層遞進(jìn),先后選擇當(dāng)前優(yōu)先級(jí)最高的子隊(duì)列、應(yīng)用直至具體某個(gè)請(qǐng)求,然后結(jié)合集群資源分布等情況作出分配決策,AM Container分配成功后,應(yīng)用實(shí)例狀態(tài)變?yōu)锳LLOCATED_SAVING,并觸發(fā)應(yīng)用實(shí)例狀態(tài)存儲(chǔ)流程,存儲(chǔ)成功后應(yīng)用實(shí)例狀態(tài)變?yōu)锳LLOCATED;
8.RMAppManager維護(hù)的應(yīng)用實(shí)例開始通知ApplicationMasterLauncher(AM生命周期管理服務(wù),負(fù)責(zé)啟動(dòng)或清理AM container)啟動(dòng)AM container,ApplicationMasterLauncher與YARN NodeManager(下文簡(jiǎn)稱YARN NM,與YARN RM保持通信,負(fù)責(zé)管理單個(gè)節(jié)點(diǎn)上的全部資源、Container生命周期、附屬服務(wù)等,監(jiān)控節(jié)點(diǎn)健康狀況和Container資源使用)建立通信并請(qǐng)求啟動(dòng)AM container;
9.ContainerManager(YARN NM核心組件,管理所有Container的生命周期)接收到AM container啟動(dòng)請(qǐng)求,YARN NM開始校驗(yàn)Container Token及資源文件,創(chuàng)建應(yīng)用實(shí)例和Container實(shí)例并存儲(chǔ)至本地,結(jié)果返回后應(yīng)用實(shí)例狀態(tài)變?yōu)長(zhǎng)AUNCHED;
10.ResourceLocalizationService(資源本地化服務(wù),負(fù)責(zé)Container所需資源的本地化。它能夠按照描述從HDFS上下載Container所需的文件資源,并盡量將它們分?jǐn)偟礁鱾€(gè)磁盤上以防止出現(xiàn)訪問熱點(diǎn))初始化各種服務(wù)組件、創(chuàng)建工作目錄、從HDFS下載運(yùn)行所需的各種資源至Container工作目錄(路徑為: ${yarn.nodemanager.local-dirs}/usercache/${user}/appcache//);
11.ContainersLauncher(負(fù)責(zé)container的具體操作,包括啟動(dòng)、重啟、恢復(fù)和清理等)將待運(yùn)行Container所需的環(huán)境變量和運(yùn)行命令寫到Container工作目錄下的launch_container.sh腳本中,然后運(yùn)行該腳本啟動(dòng)Container;
12.Container進(jìn)程加載并運(yùn)行ClusterEntrypoint(Flink JobManager入口類,每種集群部署模式和應(yīng)用運(yùn)行模式都有相應(yīng)的實(shí)現(xiàn),例如在YARN集群部署模式下,per-job應(yīng)用運(yùn)行模式實(shí)現(xiàn)類是YarnJobClusterEntrypoint,session應(yīng)用運(yùn)行模式實(shí)現(xiàn)類是YarnSessionClusterEntrypoint),首先初始化相關(guān)運(yùn)行環(huán)境:
輸出各軟件版本及運(yùn)行環(huán)境信息、命令行參數(shù)項(xiàng)、classpath等信息;
注冊(cè)處理各種SIGNAL的handler:記錄到日志
注冊(cè)JVM關(guān)閉保障的shutdown hook:避免JVM退出時(shí)被其他shutdown hook阻塞
打印YARN運(yùn)行環(huán)境信息:用戶名
從運(yùn)行目錄中加載flink conf
初始化文件系統(tǒng)
創(chuàng)建并啟動(dòng)各類內(nèi)部服務(wù)(包括RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore等)
將RPC address和port更新到flink conf配置
13.啟動(dòng)ResourceManager(Flink資源管理核心組件,包含YarnResourceManager和SlotManager兩個(gè)子組件,YarnResourceManager負(fù)責(zé)外部資源管理,與YARN RM建立通信并保持心跳,申請(qǐng)或釋放TaskManager資源,注銷應(yīng)用等;SlotManager則負(fù)責(zé)內(nèi)部資源管理,維護(hù)全部Slot信息和狀態(tài))及相關(guān)服務(wù),創(chuàng)建異步AMRMClient,開始注冊(cè)AM,注冊(cè)成功后每隔一段時(shí)間(心跳間隔配置項(xiàng):${yarn.heartbeat.interval},默認(rèn)5s)向YARN RM發(fā)送心跳來(lái)發(fā)送資源更新請(qǐng)求和接受資源變更結(jié)果。YARN RM內(nèi)部該應(yīng)用和應(yīng)用運(yùn)行實(shí)例的狀態(tài)都變?yōu)镽UNNING,并通知AMLivelinessMonitor服務(wù)監(jiān)控AM是否存活狀態(tài),當(dāng)心跳超過一定時(shí)間(默認(rèn)10分鐘)觸發(fā)AM failover流程;
14.啟動(dòng)Dispatcher(負(fù)責(zé)接收用戶提供的作業(yè),并且負(fù)責(zé)為這個(gè)新提交的作業(yè)拉起一個(gè)新的 JobManager)及相關(guān)服務(wù)(包括REST endpoint等),在per-job運(yùn)行模式下,Dispatcher將直接從Container工作目錄加載JobGrap文件;在session運(yùn)行模式下,Dispatcher將在接收客戶端提交的Job(_通過BlockServer接收job grap文件)后再進(jìn)行后續(xù)流程;
15.根據(jù)JobGraph啟動(dòng)JobManager(負(fù)責(zé)作業(yè)調(diào)度、管理Job和Task的生命周期),構(gòu)建ExecutionGraph(JobGraph的并行化版本,調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu));
16.JobManager開始執(zhí)行ExecutionGraph,向ResourceManager申請(qǐng)資源;
17.ResourceManager將資源請(qǐng)求加入等待請(qǐng)求隊(duì)列,并通過心跳向YARN RM申請(qǐng)新的Container資源來(lái)啟動(dòng)TaskManager進(jìn)程;后續(xù)流程如果有空閑Slot資源,SlotManager將其分配給等待請(qǐng)求隊(duì)列中匹配的請(qǐng)求,不用再通過18. YarnResourceManager申請(qǐng)新的TaskManager;
**18.YARN ApplicationMasterService接收到資源請(qǐng)求后,解析出新的資源請(qǐng)求并更新應(yīng)用請(qǐng)求信息; **
19.YARN ResourceScheduler成功為該應(yīng)用分配資源后更新應(yīng)用信息,ApplicationMasterService接收到Flink JobManager的下一次心跳時(shí)返回新分配資源信息;
20.Flink ResourceManager接收到新分配的Container資源后,準(zhǔn)備好TaskManager啟動(dòng)上下文(ContainerLauncherContext,生成TaskManager配置并上傳至分布式存儲(chǔ),配置其他依賴和環(huán)境變量等),然后向YARN NM申請(qǐng)啟動(dòng)TaskManager進(jìn)程,YARN NM啟動(dòng)Container的流程與AM Container啟動(dòng)流程基本類似,區(qū)別在于應(yīng)用實(shí)例在NM上已存在并未RUNNING狀態(tài)時(shí)則跳過應(yīng)用實(shí)例初始化流程,這里不再贅述;
21.TaskManager進(jìn)程加載并運(yùn)行YarnTaskExecutorRunner(Flink TaskManager入口類),初始化流程完成后啟動(dòng)TaskExecutor(負(fù)責(zé)執(zhí)行Task相關(guān)操作);
22.TaskExecutor啟動(dòng)后先向ResourceManager注冊(cè),成功后再向SlotManager匯報(bào)自己的Slot資源與狀態(tài); SlotManager接收到Slot空閑資源后主動(dòng)觸發(fā)Slot分配,從等待請(qǐng)求隊(duì)列中選出合適的資源請(qǐng)求后,向 TaskManager請(qǐng)求該Slot資源
23.TaskManager收到請(qǐng)求后檢查該Slot是否可分配(不存在則返回異常信息)、Job是否已注冊(cè)(沒有則先注冊(cè)再分配Slot),檢查通過后將Slot分配給JobManager;
24.JobManager檢查Slot分配是否重復(fù),通過后通知Execution執(zhí)行部署task流程,向TaskExecutor提交task; TaskExecutor啟動(dòng)新的線程運(yùn)行Task。
“怎么掌握Flink on YARN應(yīng)用啟動(dòng)流程”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!