This article shares how the spark-submit execution flow works. The editor found it quite practical, so it is shared here for everyone to learn from; hopefully you will take something away after reading it. Without further ado, let's take a look together.
When we submit a Spark job, we use a command of the form "spark-submit --class ...". This command is a shell script under the Spark directory; its job is to locate SPARK_HOME and then invoke the spark-class script.
if [ -z "${SPARK_HOME}" ]; then source "$(dirname "$0")"/find-spark-home fi # disable randomized hash for string in Python 3.3+ export PYTHONHASHSEED=0 exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
The spark-class script is then executed with the SparkSubmit class as its argument, handing the job over to the Spark program. The spark-class shell script mainly performs the following steps:
(1) Load the Spark environment parameters, read from conf
if [ -z "${SPARK_HOME}" ]; then source "$(dirname "$0")"/find-spark-home fi . "${SPARK_HOME}"/bin/load-spark-env.sh # 尋找javahome if [ -n "${JAVA_HOME}" ]; then RUNNER="${JAVA_HOME}/bin/java" else if [ "$(command -v java)" ]; then RUNNER="java" else echo "JAVA_HOME is not set" >&2 exit 1 fi fi
(2) Locate Java and the Spark jars
# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
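In the full spark-class script, the jars directory found above is then used to build the classpath for running the launcher, which is what $LAUNCH_CLASSPATH refers to in the next step. Paraphrased from memory here, so treat it as a sketch rather than a verbatim excerpt:

# Use the Spark jars directory as the classpath for running the launcher.
LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"

# Optionally prepend locally built classes when SPARK_PREPEND_CLASSES is set.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi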
(3) Call Main in org.apache.spark.launcher to inject the parameters
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}
(4) The shell script monitors the job's execution state (finished or exited) and uses the returned exit code to decide whether to terminate
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
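The CMD array, LAST index and LAUNCHER_EXIT_CODE checked above are built from the launcher's output. In the full spark-class script (again paraphrased from memory, so take it as a sketch) they are read from the NUL-delimited output of build_command roughly like this:

# Turn off posix mode so process substitution can be used.
set +o posix
CMD=()
# build_command prints each command token followed by a NUL, and the exit code last.
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}   # last element is the launcher's exit code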
The launcher detects the execution mode (class or submit) and builds the cmd; in submit mode it validates the arguments (SparkSubmitOptionParser), constructs the command line and prints it back to spark-class, which finally calls exec on that command line to submit the job. The assembled cmd looks like this:
/usr/local/java/jdk1.8.0_91/bin/java -cp /data/spark-1.6.0-bin-hadoop2.6/conf/:/data/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/data/hadoop-2.6.5/etc/hadoop/ -Xms1g -Xmx1g -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=1234 org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell" --master spark://localhost:7077 --verbose /tool/jarDir/maven_scala-1.0-SNAPSHOT.jar
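If you want to see the command assembled on your own installation, the launcher recognizes the SPARK_PRINT_LAUNCH_COMMAND environment variable (at least in the Spark versions I have checked; treat this as an assumption for yours) and prints the full java command line before running it:

# Ask the launcher to print the java command it builds before executing it.
SPARK_PRINT_LAUNCH_COMMAND=1 ./bin/spark-shell --master spark://localhost:7077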
(1) Once a Spark job has been submitted, the main method of SparkSubmit is executed
def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit()
  submit.doSubmit(args)
}
(2) doSubmit() initializes logging, parses the Spark job arguments, and dispatches on the action type:
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
SUBMIT: submit the application using the provided parameters
KILL (standalone and Mesos cluster mode only): kill the job via the REST protocol (see the command-line example after this list)
REQUEST_STATUS (standalone and Mesos cluster mode only): request the status of an already submitted job via the REST protocol
PRINT_VERSION: log the version information
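On the command line, the KILL and REQUEST_STATUS actions correspond to spark-submit's --kill and --status options. A rough example follows; the submission ID, master URL and REST port are placeholders and depend on your deployment:

# Kill a previously submitted driver (standalone/Mesos cluster mode only).
./bin/spark-submit --master spark://localhost:6066 --kill driver-20230101000000-0000

# Query the status of a submission.
./bin/spark-submit --master spark://localhost:6066 --status driver-20230101000000-0000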
(3) Call the submit function; its inner doRunMain is shown below:
def doRunMain(): Unit = {
  if (args.proxyUser != null) {
    val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
      UserGroupInformation.getCurrentUser())
    try {
      proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = {
          runMain(args, uninitLog)
        }
      })
    } catch {
      case e: Exception =>
        // Hadoop's AuthorizationException suppresses the exception's stack trace, which
        // makes the message printed to the output by the JVM not very helpful. Instead,
        // detect exceptions with empty stack traces here, and treat them differently.
        if (e.getStackTrace().length == 0) {
          error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
        } else {
          throw e
        }
    }
  } else {
    runMain(args, uninitLog)
  }
}
doRunMain prepares the arguments for the child main class that will be invoked for the cluster, and then calls runMain() to execute the job, i.e. to invoke that main class.
Spark accepts many different parameters and deploy modes at submission time, and each combination selects a different execution branch; the final runMain call therefore receives all the parameters it needs and passes them on to the code that actually runs the job.
That is what the spark-submit execution flow looks like. Some of these points are likely to show up in, or be used in, our daily work. Hopefully this article has helped you learn something new. For more details, please follow the 创新互联 industry news channel.