Spark submit依賴包管理!
創(chuàng)新互聯(lián)公司是一家從事企業(yè)網(wǎng)站建設(shè)、做網(wǎng)站、成都做網(wǎng)站、行業(yè)門戶網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)制作的專業(yè)網(wǎng)站設(shè)計(jì)公司,擁有經(jīng)驗(yàn)豐富的網(wǎng)站建設(shè)工程師和網(wǎng)頁(yè)設(shè)計(jì)人員,具備各種規(guī)模與類型網(wǎng)站建設(shè)的實(shí)力,在網(wǎng)站建設(shè)領(lǐng)域樹立了自己獨(dú)特的設(shè)計(jì)風(fēng)格。自公司成立以來曾獨(dú)立設(shè)計(jì)制作的站點(diǎn)上1000+。
使用spark-submit時(shí),應(yīng)用程序的jar包以及通過—jars選項(xiàng)包含的任意jar文件都會(huì)被自動(dòng)傳到集群中。
spark-submit --class --master --jars
Spark使用了下面的URL格式允許不同的jar包分發(fā)策略。
1、文件file方式:
絕對(duì)路徑且file:/URIs是作為driver的HTTP文件服務(wù)器,且每個(gè)executor會(huì)從driver的HTTP服務(wù)器拉取文件;
2、hdfs方式:
http:,https:,ftp:,從這些給定的URI中拉取文件和JAR包;
3、本地local方式:
以local:/開始的URI應(yīng)該是每個(gè)worker節(jié)點(diǎn)的本地文件,這意味著沒有網(wǎng)絡(luò)IO開銷,并且推送或通過NFS/GlusterFS等共享到每個(gè)worker大文件/JAR文件或能很好的工作。
注意:每個(gè)SparkContext的JAR包和文件都會(huì)被復(fù)制到executor節(jié)點(diǎn)的工作目錄下,這將用掉大量的空間,然后還需要清理干凈。
在YARN下,清理是自動(dòng)進(jìn)行的。在Spark Standalone下,自動(dòng)清理可以通過配置spark.worker.cleanup.appDataTtl屬性做到,此配置屬性的默認(rèn)值是7*24*3600。
用戶可以用--packages選項(xiàng)提供一個(gè)以逗號(hào)分隔的maven清單來包含任意其他依賴。
其它的庫(kù)(或SBT中的resolvers)可以用--repositories選項(xiàng)添加(同樣用逗號(hào)分隔),這些命令都可以用在pyspark,spark-shell和spark-submit中來包含一些Spark包。
對(duì)Python而言,--py-files選項(xiàng)可以用來向executors分發(fā).egg,.zip和.py庫(kù)。
源碼走讀:
1、
object SparkSubmit
2、
appArgs.{ SparkSubmitAction.=> (appArgs) SparkSubmitAction.=> (appArgs) SparkSubmitAction.=> (appArgs) }
3、
(args: SparkSubmitArguments): = { (childArgschildClasspathsysPropschildMainClass) = (args) (): = { (args.!= ) { proxyUser = UserGroupInformation.createProxyUser(args.UserGroupInformation.getCurrentUser()) { proxyUser.doAs(PrivilegedExceptionAction[]() { (): = { (childArgschildClasspathsysPropschildMainClassargs.) } })
4、
(jar <- childClasspath) { (jarloader) }
5、
(localJar: loader: MutableURLClassLoader) { uri = Utils.(localJar) uri.getScheme { | => file = File(uri.getPath) (file.exists()) { loader.addURL(file.toURI.toURL) } { (file) } _ => (uri) } }
之后線索就斷了,回歸到j(luò)ava的class類調(diào)用jar包。
6、誰調(diào)用,executor。
(newFiles: HashMap[]newJars: HashMap[]) { hadoopConf = SparkHadoopUtil..newConfiguration() synchronized { ((nametimestamp) <- newFiles .getOrElse(name-) < timestamp) { logInfo(+ name + + timestamp) Utils.(nameFile(SparkFiles.())env.securityManagerhadoopConftimestampuseCache = !isLocal) (name) = timestamp } ((nametimestamp) <- newJars) { localName = name.split().last currentTimeStamp = .get(name) .orElse(.get(localName)) .getOrElse(-) (currentTimeStamp < timestamp) { logInfo(+ name + + timestamp) Utils.(nameFile(SparkFiles.())env.securityManagerhadoopConftimestampuseCache = !isLocal) (name) = timestamp url = File(SparkFiles.()localName).toURI.toURL (!.getURLs().contains(url)) { logInfo(+ url + ) .addURL(url) } } } } }
Utils.fetchFile方法,進(jìn)入
/**
* Download a file or directory to target directory. Supports fetching the file in a variety of
* ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
* on the URL parameter. Fetching directories is only supported from Hadoop-compatible
* filesystems.
*
* If`useCache`is true, first attempts to fetch the file to a local cache that's shared
* across executors running the same application.`useCache`is used mainly for
* the executors, and not in local mode.
*
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
(!cachedFile.exists()) { (urllocalDircachedFileNameconfsecurityMgrhadoopConf) }
可見,支持本地files,Hadoop的hdfs,還有http格式的文件。
其中目錄目前支持hdfs!
完畢!