小編給大家分享一下Hadoop如何實(shí)現(xiàn)job提交,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
七里河網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)建站!從網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、APP開(kāi)發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項(xiàng)目制作,到程序開(kāi)發(fā),運(yùn)營(yíng)維護(hù)。創(chuàng)新互聯(lián)建站成立于2013年到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來(lái)保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)建站。
從如下地方開(kāi)始,就要進(jìn)行job的提交了
boolean isSuccess = job.waitForCompletion(true);
之后,進(jìn)入Job類的waitForCompletion方法。
public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); } //---- return isSuccessful(); }> > 這里輸入引用文本
之后調(diào)用Job類的submit方法,
public void submit() throws IOException, InterruptedException, ClassNotFoundException { connect(); final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status = ugi.doAs(new PrivilegedExceptionAction() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { return submitter.submitJobInternal(Job.this, cluster); } }); }
connect方法負(fù)責(zé)初始化集群信息:
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { if (cluster == null) { cluster = ugi.doAs(new PrivilegedExceptionAction() { public Cluster run() throws IOException, InterruptedException, ClassNotFoundException { return new Cluster(getConfiguration()); } }); } }
集群信息cluster,包括什么,應(yīng)該很清晰:
private ClientProtocolProvider clientProtocolProvider; private ClientProtocol client;
private UserGroupInformation ugi; private Configuration conf; private FileSystem fs = null;
private Path sysDir = null; private Path stagingAreaDir = null; private Path jobHistoryDir = null;
略微分析下, ClientProtocolProvider是客戶端協(xié)議的生產(chǎn)者,對(duì)應(yīng)的客戶端是ClientProtocol。
ClientProtocolProvider規(guī)定了2個(gè)方法:
create
close 分別也用來(lái)創(chuàng)建和關(guān)閉客戶端ClientProtocol。
而,ClientProtocolProvider的具體實(shí)現(xiàn)類有2個(gè)。
可以看到,有兩個(gè)協(xié)議生產(chǎn)者,分別是yarn和local的。
那么,對(duì)應(yīng)的客戶端ClientProtocol,也會(huì)有兩個(gè)。
ClientProtocol是個(gè)接口,里面規(guī)定了如下幾個(gè)方法:
那么,不同的客戶端yarn或者local,實(shí)現(xiàn)其中的方法即可。 因?yàn)?,我們是本地Eclipse運(yùn)行,直接看local即可,yarn的原理差不多,
OK,經(jīng)過(guò)connect方法之后,cluster中這幾個(gè)就有啦,即使沒(méi)有的話,get的時(shí)候,也會(huì)初始化的。
之后, 使用集群的,F(xiàn)ileSystem和client創(chuàng)建一個(gè)submiter。
final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
然后,調(diào)用submitter 的submitJobInternal方法提交作業(yè),OK,進(jìn)入submitJobInternal方法。
JobSubmiter類的submitJobInternal方法大致過(guò)程如下:
checkSpecs(job); 檢查作業(yè)輸出路徑。
//獲得staging路徑,注意:集群cluster中有這個(gè)路徑的名稱,只不過(guò)這里需要?jiǎng)?chuàng)建路徑。 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//在staging路徑下創(chuàng)建一個(gè)以jobid為標(biāo)示的文件夾 JobID jobId = submitClient.getNewJobID(); Path submitJobDir = new Path(jobStagingArea, jobId.toString());
//job需要的一些文件和jar包之類的,都放到剛才的那個(gè)submitJobDir路徑下 copyAndConfigureFiles(job, submitJobDir);
具體的東西包括:
String files = conf.get("tmpfiles"); String libjars = conf.get("tmpjars"); String archives = conf.get("tmparchives");
//寫入job輸入的分片信息 int maps = writeSplits(job, submitJobDir);
split信息包括兩個(gè)部分。 首先調(diào)用Inputformat獲得分片的個(gè)數(shù),具體如何獲得,后續(xù)講。 將返回的分片數(shù)組逐個(gè)遍歷并持久化到一個(gè)文件。
SplitMetaInfo[] info = writeNewSplits(conf, splits, out); 而writeNewSplits代碼主要就是寫分片信息到文件中。
之后,將split的分片信息持久化一個(gè)元數(shù)據(jù)文件。 writeJobSplitMetaInfo方法。
//將job的描述信息,寫到一個(gè)job.xml放到相應(yīng)的staging目錄下的jobid目錄。 Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); writeConf(conf, submitJobFile);
FSDataOutputStream out = FileSystem.create(jtFs, jobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION)); try { conf.writeXml(out); } finally { out.close(); }
//提交作業(yè) status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials());
OK,提交作業(yè)部分的代碼就到這,后續(xù)寫寫,app master運(yùn)行的過(guò)程。
總結(jié),提交作業(yè)的主要功能。
創(chuàng)建staging路徑
在staging路徑下面創(chuàng)建作業(yè)id的路徑
把job相關(guān)的文件拷貝到路徑下
將job的split信息序列化到文件中
將job的xml寫到路徑下
這些東西都放到hdfs,作為所有節(jié)點(diǎn)共享訪問(wèn)的地方。之后,app master會(huì)訪問(wèn)這個(gè)目錄,copy job的配置文件到本地并創(chuàng)建job對(duì)象,并根據(jù)split的信息,創(chuàng)建對(duì)應(yīng)的maptaskrunable。運(yùn)行。
但是,總的job信息依然在hdfs上。
以上是“Hadoop如何實(shí)現(xiàn)job提交”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!