本篇文章給大家分享的是有關(guān)HDFS中怎么實現(xiàn)本地文件上傳,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
創(chuàng)新互聯(lián)公司主營醴陵網(wǎng)站建設的網(wǎng)絡公司,主營網(wǎng)站建設方案,重慶APP開發(fā),醴陵h5微信小程序開發(fā)搭建,醴陵網(wǎng)站營銷推廣歡迎醴陵等地區(qū)企業(yè)咨詢
public synchronized void write(byte b[], int off, int len)
throws IOException {
if (closed) { //校驗是否關(guān)閉了,關(guān)閉了自然不應該再寫入數(shù)據(jù)了
throw new IOException("Stream closed");
}
while (len > 0) { //這里的len就是指源緩沖區(qū)剩下的未寫完的數(shù)據(jù)長度,單位byte
int remaining = BUFFER_SIZE - pos; //目的緩沖區(qū)里可以寫的字節(jié)數(shù)
int toWrite = Math.min(remaining, len); //跟需要寫的字節(jié)數(shù)比較,取較小值作為真正要寫入的字節(jié)數(shù)
System.arraycopy(b, off, outBuf, pos, toWrite); //開始復制來作為寫入到目的緩沖區(qū)操作
pos += toWrite; //更新目的緩沖區(qū)位置指針
off += toWrite; //更新源緩沖區(qū)位置指針
len -= toWrite; //更新源緩沖區(qū)剩下的內(nèi)容長度
filePos += toWrite; //計算整個文件的總的已經(jīng)寫入的長度(包括緩沖區(qū)里的內(nèi)容)
if ((bytesWrittenToBlock + pos >= BLOCK_SIZE) ||
(pos == BUFFER_SIZE)) {
flush(); //這里是2個條件引起flush,一個是總長度(已寫+緩存)超過一個塊大小,
//第2個就是目的緩沖區(qū)已經(jīng)滿了,都么空間寫入了,自然需要flush了。
}
}
}
//友情提醒,這里的前半段寫入是能寫多少寫多少,寫完了再判斷!
為啥有2個判斷條件?想必很多人對緩沖區(qū)滿了很好理解,因為都沒剩余空間了
而對bytesWrittenToBlock + pos >= BLOCK_SIZE可能不是很清楚
這是因為一個Block寫滿了就要另起爐灶,重新開一個Block.
flush()函數(shù)暫時不解釋,后面再解釋!
---
public synchronized void write(int b) throws IOException {
if (closed) {//仍然是校驗是否關(guān)閉
throw new IOException("Stream closed");
}
if ((bytesWrittenToBlock + pos == BLOCK_SIZE) ||
(pos >= BUFFER_SIZE)) {
flush();
}//仍然是2個條件的校驗
outBuf[pos++] = (byte) b;
filePos++;//這2句的意義在于真正的寫入到目的緩沖區(qū)里
不過為啥不把這2段調(diào)一下順序更好理解?果然思維獨特!
}
---
public synchronized void flush() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}//檢驗是否關(guān)閉,老規(guī)矩
if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {
flushData(BLOCK_SIZE - bytesWrittenToBlock);
}//如果需要新起1個Block的話,就把剩下的不足字節(jié)數(shù)先寫上
if (bytesWrittenToBlock == BLOCK_SIZE) {
endBlock();//然后關(guān)閉當前塊,新起一塊
}
flushData(pos);//對當前塊繼續(xù)寫剩下的
}
---
繼續(xù)看別的函數(shù)
在看別的函數(shù)之前,首先希望讀者先建立一個0.1.0中文件的存儲機制。
在讀取本地文件上傳到HDFS中,文件流是這樣的。
本地文件--->本地內(nèi)存緩沖區(qū)Buffer--->本地文件--->上傳到遠程HDFS系統(tǒng)。
而本地內(nèi)存緩沖區(qū)Buffer--->本地文件就是flushData做的事情,請再復習下flush函數(shù),然后再接下來分析flushData.
PS:看代碼比寫代碼累,看代碼是了解別人的思維,寫代碼是把自己的思維實現(xiàn)起來。。。
private synchronized void flushData(int maxPos) throws IOException {
int workingPos = Math.min(pos, maxPos);//計算要寫入的字節(jié)數(shù),真是多此一舉。
if (workingPos > 0) {//如果確實需要寫的話
//
// To the local block backup, write just the bytes
//
backupStream.write(outBuf, 0, workingPos);//寫入到本地文件
//注意,請認真閱讀backupStream的初始化過程,是一個本地文件。
//也就是說計劃把內(nèi)存緩沖區(qū)里的內(nèi)容寫到本地文件中,寫完一個block再發(fā)送給HDFS.
//聰明的讀者應該想到最后一個block的大小是<=blockSize的。
// Track position
//
bytesWrittenToBlock += workingPos;//更新寫入到block塊的字節(jié)數(shù),
//尤其要強調(diào),當一個塊結(jié)束后,這個變量就會重置為0,你懂的。
System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
//字節(jié)前挪移到偏移量為0的位置,方便后面IO操作,你懂得,不解釋。
pos -= workingPos;//相關(guān)變量都需要更新
}
}
---------------
接下來到了比較核心的函數(shù)endBlock(); 意思是關(guān)閉當前塊,新起一塊,下面來看看具體的代碼!
private synchronized void endBlock() throws IOException {
//
// Done with local copy
//
backupStream.close();//關(guān)閉本地文件系統(tǒng)的臨時文件
//
// Send it to datanode//準備發(fā)送給datanode了。
//
boolean mustRecover = true;//定義一個哨兵變量
while (mustRecover) {//需要讀取當前文件時
nextBlockOutputStream();
因為這個函數(shù)到后面才分析,所以提把背景知識補充好,這個函數(shù)
主要是初始化了一對IO流句柄,這個流是當前shell和遠程datanode
之間的TCP連接,這對IO流句柄就是 blockStream + blockReplyStream,
分別對應著輸出流和輸入流,輸出流用來輸出文件頭和文件內(nèi)容,輸入流是
用來讀取響應。
InputStream in = new FileInputStream(backupFile);//既然第一行關(guān)閉了寫,
現(xiàn)在就可以開始讀了
try {
byte buf[] = new byte[BUFFER_SIZE];//還是局部的IO緩沖區(qū)
int bytesRead = in.read(buf);//從本地文件中讀取內(nèi)容
while (bytesRead > 0) {//大于0?
blockStream.writeLong((long) bytesRead);//寫入字節(jié)數(shù)
blockStream.write(buf, 0, bytesRead);//寫入緩沖區(qū)的內(nèi)容
bytesRead = in.read(buf);//繼續(xù)從本地文件中讀取
}
internalClose();//跟NameNode和DataNode的交互,表示關(guān)閉
mustRecover = false;//表示任務結(jié)束
} catch (IOException ie) {
handleSocketException(ie);
} finally {
in.close();//關(guān)閉當前文件的輸入流
}
}
//
// Delete local backup, start new one
//下面4行是從新建立起本地文件系統(tǒng)的文件緩沖系統(tǒng),不解釋
backupFile.delete();
backupFile = newBackupFile();
backupStream = new FileOutputStream(backupFile);
bytesWrittenToBlock = 0;
}
在閱讀以上代碼之后,我個人認為如果用C語言來寫這段邏輯的話,我會直接調(diào)用sendfile來實現(xiàn)文件傳輸。
當然JAVA的API滯后性以及OS當時或許都不提供這種方式吧,反正現(xiàn)在的內(nèi)核都提供了。
---------------------------------------
那么接下來分析的是函數(shù):nextBlockOutputStream()
private synchronized void nextBlockOutputStream() throws IOException {
boolean retry = false;//不解釋
long start = System.currentTimeMillis();//當前開始時間
do {
retry = false;//重置為false
long localstart = System.currentTimeMillis();//當前開始時間
boolean blockComplete = false;//標注塊是否OK
LocatedBlock lb = null; //初始化為null
while (! blockComplete) {//如果未結(jié)束
if (firstTime) {//如果是第一次開啟一個文件
lb = namenode.create(src.toString(), clientName.toString(), localName, overwrite);//創(chuàng)建一個文件
} else {
lb = namenode.addBlock(src.toString(), localName);
}//增加一個block
if (lb == null) {//如果找不到
try {
Thread.sleep(400);//就沉睡400毫秒
if (System.currentTimeMillis() - localstart > 5000) {
LOG.info("Waiting to find new output block node for " + (System.currentTimeMillis() - start) + "ms");
}
} catch (InterruptedException ie) {
}
} else {
blockComplete = true;//設置blockComplete為true.解釋為找到了一個block
}
}
block = lb.getBlock();//從lb中獲取block的信息
DatanodeInfo nodes[] = lb.getLocations();//從lb中獲取block要存儲的DataNode數(shù)組
//
// Connect to first DataNode in the list. Abort if this fails.
//請注意上面這句的意思:連接第一個數(shù)據(jù)節(jié)點,
//為啥?數(shù)據(jù)傳輸采用計算機組成原理的菊花鏈模式
InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString());//解析
try {
s = new Socket();
s.connect(target, READ_TIMEOUT);//連接第一個DataNode
s.setSoTimeout(READ_TIMEOUT);//設置讀取時間
} catch (IOException ie) {//異常這里就不分析了
// Connection failed. Let's wait a little bit and retry
try {
if (System.currentTimeMillis() - start > 5000) {
LOG.info("Waiting to find target node: " + target);
}
Thread.sleep(6000);
} catch (InterruptedException iex) {
}
if (firstTime) {
namenode.abandonFileInProgress(src.toString());
} else {
namenode.abandonBlock(block, src.toString());
}
retry = true;
continue;
}
//此時已經(jīng)成功連接到了遠程DataNode節(jié)點,bingo!
// Xmit header info to datanode
//
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
//獲取輸出流句柄
out.write(OP_WRITE_BLOCK);//輸出行為標識
out.writeBoolean(false);//false?
block.write(out);//寫入block信息,注意:是把從namenode獲取到的block寫給DataNode
out.writeInt(nodes.length);//這一樣和下面這一行是為了寫入所有存儲及備份的DataNode
for (int i = 0; i < nodes.length; i++) {
nodes[i].write(out);//不解釋
}
out.write(CHUNKED_ENCODING);//寫CHUNKED_ENCODING
bytesWrittenToBlock = 0;//重置為0
blockStream = out;//把句柄賦值給類的局部變量供后續(xù)使用
blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));//同理,不解釋
} while (retry);
firstTime = false;//firstTime在至少有一個塊信息返回后就為false
===================================================
接下來要分析的函數(shù)是
private synchronized void internalClose() throws IOException {
blockStream.writeLong(0);//表明長度結(jié)束了
blockStream.flush();//把緩沖內(nèi)容全部輸出。
long complete = blockReplyStream.readLong();//讀取響應
if (complete != WRITE_COMPLETE) {//如果不是結(jié)束
LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
}
LocatedBlock lb = new LocatedBlock();//創(chuàng)建一個新對象
lb.readFields(blockReplyStream);//根據(jù)響應流來賦值
namenode.reportWrittenBlock(lb);//向namenode報告寫入成功
s.close();//關(guān)閉此流
s = null;
}
================
最后就是close函數(shù)
public synchronized void close() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}//校驗是否關(guān)閉了
flush();//盡可能的輸出內(nèi)容
if (filePos == 0 || bytesWrittenToBlock != 0) {
try {
endBlock();//結(jié)束一個塊
} catch (IOException e) {
namenode.abandonFileInProgress(src.toString());//拋棄此file
throw e;
}
}
backupStream.close();//關(guān)閉流
backupFile.delete();//刪除文件
if (s != null) {
s.close();//不解釋
s = null;
}
super.close();
long localstart = System.currentTimeMillis();
boolean fileComplete = false;
while (! fileComplete) {//循環(huán)報告文件寫完了
fileComplete = namenode.complete(src.toString(), clientName.toString());
if (!fileComplete) {
try {
Thread.sleep(400);
if (System.currentTimeMillis() - localstart > 5000) {
LOG.info("Could not complete file, retrying...");
}
} catch (InterruptedException ie) {
}
}
}
closed = true;
}
以上就是HDFS中怎么實現(xiàn)本地文件上傳,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學到更多知識。更多詳情敬請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。