[TOC]
創(chuàng)新互聯(lián)建站是一家專注于網(wǎng)站制作、做網(wǎng)站與策劃設計,潯陽網(wǎng)站建設哪家好?創(chuàng)新互聯(lián)建站做網(wǎng)站,專注于網(wǎng)站建設十多年,網(wǎng)設計領(lǐng)域的專業(yè)建站公司;建站業(yè)務涵蓋:潯陽等地區(qū)。潯陽做網(wǎng)站價格咨詢:028-86922220
1)客戶端通過本地通過RPC與namenode建立rpc通信,然后請求上傳文件
2)namenode收到請求后,會檢查是否能創(chuàng)建該文件(比如校驗用戶是否有該權(quán)限,文件是否已經(jīng)存在等)。如果檢查通過,namenode就會開始記錄該新文件的元信息(先寫入到edits文件,然后更新內(nèi)存中的metadata),并響應client可以開始上傳。
3)client 在本地將文件進行切塊(按照指定的block大?。?。然后請求namemode上傳第一個block。
4)namenode根據(jù)策略以及每個datanode的情況,返回3個datanode地址給client(這里默認3副本)。
5)client與請求namenode返回的3個datanode建立pipeline,即 client請求dn1,dn1請求dn2,dn2請求dn3,這樣一個串行通道。
6)3個datanode逐級響應,最終響應給client。表示可以傳輸數(shù)據(jù)
7)client會將每個block還會分割成一個個packet,然后放入 data queue中,等待上傳。每傳輸一個packet,就會將packet加入到另外一個 ack queue中,等到pipeline中的datanode響應傳輸完成后,就會講相應的packet從ack queue中移除。
8)后面就是重復上面的流程,直到client關(guān)閉通道,并將所有的queue中的packet刷寫到pipeline中之后,datanode就會標記文件已完成。
注意:client完成寫入之后,此時block 才是可見的,正在寫的block是不可見的。當調(diào)用sync方法時(將緩沖區(qū)數(shù)據(jù)刷寫到磁盤中),client才確認寫入已經(jīng)完成。client關(guān)閉流時調(diào)用 的close方法,底層就會調(diào)用sync。是否需要手動調(diào)用取決你根據(jù)程序需 要在數(shù)據(jù)健壯性和吞吐率之間的權(quán)衡。
問題:傳輸過程中,某個datanode發(fā)生錯誤,hdfs是怎么解決?
1)pipeline關(guān)閉掉
2)為了防止丟包,ack queue中的packet會同步到data queue中。重新進行下一次傳輸。
3)把產(chǎn)生錯誤的datanode上當前在寫,但未完成的block刪除掉
4)剩下的block寫到剩余兩個正常的datanode中。
5)namenode會自動尋找另外合適的一個datanode復制另外兩個datanode中刷寫的block,完成3副本的寫入。當然,這個操作namenode的內(nèi)部機制,對client來說是無感知的。
namenode使用兩種文件保存元數(shù)據(jù),fsimag和edits文件。
fsimage:元數(shù)據(jù)鏡像文件,存儲某一時間段內(nèi)的namenode的內(nèi)存元數(shù)據(jù)信息
edits:操作日志文件。
fstime:保存最近一次checkpoint的時間。
更詳細的 fsimage和edits文件講解,請看 “hdfs體系架構(gòu)”
? namenode所有的元數(shù)據(jù)信息從啟動時就已經(jīng)全部加載到內(nèi)存中(為了提高查詢性能),用于處理讀請求的查詢操作。到有寫操作時,namenode會先向edits文件中寫入操作日志,完成后才會修改內(nèi)存中的metadata,這個主要是保證元數(shù)據(jù)已經(jīng)存儲到磁盤中不丟失。
? hdfs內(nèi)部維護的fsimage文件其實就是內(nèi)存中的metadata的鏡像,但是兩者并不是實時一致的。fsimage的更新是通過合并edits來實現(xiàn)的。而這個合并操作是 secondaryNameNode完成的,主要流程如下:
1)首先是 SNN通知 NN切換edits文件,主要是保證合并過程有新的寫入操作時能夠正常寫入edits文件。
2)SNN通過http請求從NN獲取 fsimage和edits文件。
3)SNN將fsiamge載入內(nèi)存,開始合并edits到fsimage,生成新的fsimage
4)SNN將新的fsimage發(fā)送給NN
5)NN用新的fsimage,替換舊的fsimage。
? 寫入操作時,默認3副本,那么副本分布在哪些datanode節(jié)點上,會影響寫入速度。在hdfs的網(wǎng)絡拓撲中,有那么四種物理范圍,同一節(jié)點、同一機架上的不同節(jié)點、同一機房中不同節(jié)點、不同機房中的不同節(jié)點。這4中物理范圍表示節(jié)點間的距離逐漸增大。這種物理距離越遠會影響副本之間所在節(jié)點之間的傳輸效率,即傳輸效率越低。
上面說到副本的選擇的節(jié)點的位置會影響寫效率,那么hdfs是如何選擇節(jié)點位置的。
(1)舊版本的方式
路徑是 r1/n1 --> r2/n1 --> r2/n2
(2)新版本方式
路徑是 r1/n1 --> r1/n2 --> r2/n2(后面這個其實任意都行,主要處于不同機架就好)
這種方式比第一種要好,因為這種方式數(shù)據(jù)經(jīng)過的總路徑更短了,只要一個副本需要跨機架傳輸,而上面的則有兩個副本需要跨機架傳輸。
下面的分析過程基于 hadoop2.8.4 的源碼分析的。
一般來說,會先通過 FileSystem.get() 獲取到操作hdfs 的客戶端對象,后面所有的操作都通過調(diào)用該對象的方法完成的。
FileSystem client = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf);
接著我們看看 FileSystem.get() 的實現(xiàn)
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) {
return get(conf);
} else {
if (scheme != null && authority == null) {
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
return get(defaultUri, conf);
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
/*
這里是關(guān)鍵代碼,表示進入 CACHE.get() 方法
*/
return conf.getBoolean(disableCacheName, false) ? createFileSystem(uri, conf) : CACHE.get(uri, conf);
}
}
CACHE是FileSystem的一個靜態(tài)內(nèi)部類Cache 的對象。繼續(xù)看看 CACHE.get()方法
FileSystem get(URI uri, Configuration conf) throws IOException {
FileSystem.Cache.Key key = new FileSystem.Cache.Key(uri, conf);
//進入CACHE對象的 getInternal() 方法
return this.getInternal(uri, conf, key);
}
進入CACHE對象的 getInternal() 方法
private FileSystem getInternal(URI uri, Configuration conf, FileSystem.Cache.Key key) throws IOException {
FileSystem fs;
synchronized(this) {
/*
獲取map中的filesytem對象,表示之前已經(jīng)初始化了filesystem對象,并存儲到map集合中,現(xiàn)在直接從map中獲取就好。
*/
fs = (FileSystem)this.map.get(key);
}
if (fs != null) {
//如果fs存在,就直接返回存在的filesytem實例即可
return fs;
} else {
//如果是初次使用filesystem,就得創(chuàng)建并初始化
fs = FileSystem.createFileSystem(uri, conf);
synchronized(this) {
FileSystem oldfs = (FileSystem)this.map.get(key);
if (oldfs != null) {
fs.close();
return oldfs;
} else {
if (this.map.isEmpty() && !ShutdownHookManager.get().isShutdownInProgress()) {
ShutdownHookManager.get().addShutdownHook(this.clientFinalizer, 10);
}
fs.key = key;
this.map.put(key, fs);
if (conf.getBoolean("fs.automatic.close", true)) {
this.toAutoClose.add(key);
}
return fs;
}
}
}
}
我們看到了上面有兩種方式,一種是如果filesytem對象已存在,那么直接從map獲取并返回對象即可。如果不存在,就調(diào)用 FileSystem.createFileSystem() 方法創(chuàng)建,創(chuàng)建完成后返回fs。下面看看這個方法.
private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
Tracer tracer = FsTracer.get(conf);
TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
scope.addKVAnnotation("scheme", uri.getScheme());
FileSystem var6;
try {
Class> clazz = getFileSystemClass(uri.getScheme(), conf);
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
//這是關(guān)鍵性的代碼,看名字就知道,對filesytem 進行初始化
fs.initialize(uri, conf);
var6 = fs;
} finally {
scope.close();
}
return var6;
}
我們要注意,F(xiàn)ileSystem這個類是抽象類,它的實現(xiàn)子類是 DistributedFileSystem,所以雖然 fs是FileSystem類型的,但是對象本身是DistributedFileSystem類型的,也就是java 的多態(tài)特性。所以fs.initialize() 調(diào)用的實際上是 DistributedFileSystem中initialize()方法。下面看看這個方法
/*
DistributedFileSystem.class
*/
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
this.setConf(conf);
String host = uri.getHost();
if (host == null) {
throw new IOException("Incomplete HDFS URI, no host: " + uri);
} else {
this.homeDirPrefix = conf.get("dfs.user.home.dir.prefix", "/user");
//這是關(guān)鍵性代碼,創(chuàng)建了一個DFSClient對象,顧名思義就是RPC的客戶端
this.dfs = new DFSClient(uri, conf, this.statistics);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.workingDir = this.getHomeDirectory();
this.storageStatistics = (DFSOpsCountStatistics)GlobalStorageStatistics.INSTANCE.put("DFSOpsCountStatistics", new StorageStatisticsProvider() {
public StorageStatistics provide() {
return new DFSOpsCountStatistics();
}
});
}
}
看到上面創(chuàng)建了一個 DFSClient() 對象,賦值給了 this.dfs。下面看看這個類的構(gòu)造方法。
/*
DFSClient.class
*/
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, Statistics stats) throws IOException {
.............................
/*源碼比較長,所以截取重要的部分顯示*/
//這是一個關(guān)鍵性變量,其實就是namenode代理對象,只不過還沒有創(chuàng)建對象
ProxyAndInfo proxyInfo = null;
...............................
//下面開始創(chuàng)建namenode代理對象
if (proxyInfo != null) {
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = (ClientProtocol)proxyInfo.getProxy();
} else if (rpcNamenode != null) {
Preconditions.checkArgument(nameNodeUri == null);
this.namenode = rpcNamenode;
this.dtService = null;
} else {
Preconditions.checkArgument(nameNodeUri != null, "null URI");
//這里創(chuàng)建代理對象信息
proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf, nameNodeUri, nnFallbackToSimpleAuth);
this.dtService = proxyInfo.getDelegationTokenService();
//這里可以看到直接通過 proxyInfo.getProxy()獲取namenode代理對象,并將引用賦值給 this.namenode。并且類型是 ClientProtocol 類型的。
this.namenode = (ClientProtocol)proxyInfo.getProxy();
}
/*下面省略一堆代碼*/
}
可以看到上面已經(jīng)通過 this.namenode = (ClientProtocol)proxyInfo.getProxy(); 獲取到了 namenode 的代理對象,也就是rpc的客戶端對象。下面看看 ClientProtocol 這個是啥東西,因為代理對象是這個類型的。
/*
ClientProtocol.class
這是個接口
*/
public interface ClientProtocol {
long versionID = 69L;
/*
下面主要是定義很多個抽象方法,主要就是用于對hdfs進行操作的接口,比如,open,create等這些常用方法。
*/
}
下面看看 proxyInfo創(chuàng)建代理對象的方法
/*
NameNodeProxiesClient
*/
public static NameNodeProxiesClient.ProxyAndInfo createProxyWithClientProtocol(Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth) throws IOException {
AbstractNNFailoverProxyProvider failoverProxyProvider = createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, true, fallbackToSimpleAuth);
if (failoverProxyProvider == null) {
//創(chuàng)建無HA的代理對象
InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
Text dtService = SecurityUtil.buildTokenService(nnAddr);
//創(chuàng)建proxy對象
ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf, UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
//ProxyAndInfo是一個靜態(tài)內(nèi)部類,將前面的proxy通過該類封裝后返回,可通過該類的 getProxy 方法返回已創(chuàng)建的proxy對象
return new NameNodeProxiesClient.ProxyAndInfo(proxy, dtService, nnAddr);
} else {
//創(chuàng)建有HA的代理對象
return createHAProxy(conf, nameNodeUri, ClientProtocol.class, failoverProxyProvider);
}
}
可以看到上面是已經(jīng)創(chuàng)建了 proxy對象,并返回,而且我們也可以看到,創(chuàng)建的proxy對象就是clientProtocol類型的。下面看看創(chuàng)建proxy對象的方法 createNonHAProxyWithClientProtocol()
/*
NameNodeProxiesClient
*/
public static ClientProtocol createNonHAProxyWithClientProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf, "dfs.client.retry.policy.enabled", false, "dfs.client.retry.policy.spec", "10000,6,60000,10", SafeModeException.class.getName());
long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
//這里是核心代碼,可以明顯看到調(diào)用 RPC 模塊中的方法創(chuàng)建proxy對象
ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB)RPC.getProtocolProxy(ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf), defaultPolicy, fallbackToSimpleAuth).getProxy();
if (withRetries) {
Map methodNameToPolicyMap = new HashMap();
ClientProtocol translatorProxy = new ClientNamenodeProtocolTranslatorPB(proxy);
return (ClientProtocol)RetryProxy.create(ClientProtocol.class, new DefaultFailoverProxyProvider(ClientProtocol.class, translatorProxy), methodNameToPolicyMap, defaultPolicy);
} else {
return new ClientNamenodeProtocolTranslatorPB(proxy);
}
}
所以至此我們可以發(fā)現(xiàn),客戶端和namenode之間通信的方式就是通過RPC實現(xiàn)的。
總結(jié)來說,方法的調(diào)用時序圖如下:
一般來說,上傳操作,首先得
OutputStream os = fs.create(new Path("xxxx"));
即創(chuàng)建文件,然后再上傳文件數(shù)據(jù)。上傳數(shù)據(jù)的流程和普通的流操作沒什么區(qū)別。
下面看看這個 create方法。
/*
FileSystem.class
*/
public abstract FSDataOutputStream create(Path var1, FsPermission var2, boolean var3, int var4, short var5, long var6, Progressable var8) throws IOException;
可以看到這是個抽象方法,前面也說到,它的實現(xiàn)子類是 DistributedFileSystem,這里這里是調(diào)用子類的 create方法,繼續(xù)看
/*
DistributedFileSystem.class
*/
public FSDataOutputStream create(Path f, final FsPermission permission, final EnumSet cflags, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt) throws IOException {
this.statistics.incrementWriteOps(1);
this.storageStatistics.incrementOpCounter(OpType.CREATE);
Path absF = this.fixRelativePart(f);
return (FSDataOutputStream)(new FileSystemLinkResolver() {
public FSDataOutputStream doCall(Path p) throws IOException {
//這里是核心代碼,this.dfs前面說到了就是存儲了DFSClient對象的引用的。可以通過該客戶端調(diào)用很多操作hdfs的方法。這里調(diào)用create方法,創(chuàng)建了一個 DFSOutputStream 對象。輸出流對象
DFSOutputStream dfsos = DistributedFileSystem.this.dfs.create(DistributedFileSystem.this.getPathName(p), permission, cflags, replication, blockSize, progress, bufferSize, checksumOpt);
//這里將上面創(chuàng)建的dfsos進行包裝并返回
return DistributedFileSystem.this.dfs.createWrappedOutputStream(dfsos, DistributedFileSystem.this.statistics);
}
public FSDataOutputStream next(FileSystem fs, Path p) throws IOException {
return fs.create(p, permission, cflags, bufferSize, replication, blockSize, progress, checksumOpt);
}
}).resolve(this, absF);
}
可以看見上面創(chuàng)建返回了 DFSOutputStream 輸出流對象。下面看看DFSClient.create方法的實現(xiàn)代碼。
/*
DFSClient.class
*/
public DFSOutputStream create(String src, FsPermission permission, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes) throws IOException {
this.checkOpen();
FsPermission masked = this.applyUMask(permission);
LOG.debug("{}: masked={}", src, masked);
//創(chuàng)建輸出流對象
DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, this.dfsClientConf.createChecksum(checksumOpt), this.getFavoredNodesStr(favoredNodes));
this.beginFileLease(result.getFileId(), result);
return result;
}
繼續(xù)看 DFSOutputStream.newStreamForCreate 這個方法.
/*
DistributedFileSystem.class
*/
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException {
TraceScope ignored = dfsClient.newPathTraceScope("newStreamForCreate", src);
Throwable var12 = null;
try {
HdfsFileStatus stat = null;
boolean shouldRetry = true;
int retryCount = 10;
while(true) {
if (shouldRetry) {
shouldRetry = false;
try {
//這里是核心代碼,可以看見是調(diào)用 dfsclient.namenode這個代理對象中的create方法創(chuàng)建文件,并返回狀態(tài)
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, new EnumSetWritable(flag), createParent, replication, blockSize, SUPPORTED_CRYPTO_VERSIONS);
} catch (RemoteException var27) {
IOException e = var27.unwrapRemoteException(new Class[]{AccessControlException.class, DSQuotaExceededException.class, QuotaByStorageTypeExceededException.class, FileAlreadyExistsException.class, FileNotFoundException.class, ParentNotDirectoryException.class, NSQuotaExceededException.class, RetryStartFileException.class, SafeModeException.class, UnresolvedPathException.class, SnapshotAccessControlException.class, UnknownCryptoProtocolVersionException.class});
if (e instanceof RetryStartFileException) {
if (retryCount <= 0) {
throw new IOException("Too many retries because of encryption zone operations", e);
}
shouldRetry = true;
--retryCount;
continue;
}
throw e;
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
//這里將上面創(chuàng)建文件的狀態(tài)傳入輸出流作為參數(shù)
DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
//看見一個神奇的方法
out.start();
DFSOutputStream var30 = out;
//返回輸出流
return var30;
}
} catch (Throwable var28) {
var12 = var28;
throw var28;
} finally {
if (ignored != null) {
if (var12 != null) {
try {
ignored.close();
} catch (Throwable var26) {
var12.addSuppressed(var26);
}
} else {
ignored.close();
}
}
}
}
上面看到 DFSOutputStream 對象居然有一個 start方法,來看看先。
/*
DFSOutputStream.class
*/
protected synchronized void start() {
this.getStreamer().start();
}
// 繼續(xù)看 this.getStreamer() 這個方法,可以看到這個方法返回的是DataStreamer,繼續(xù)看這個類
protected DataStreamer getStreamer() {
return this.streamer;
}
/*
DataStreamer.class
*/
//可以看到這個類繼承了 Daemon類,而Daemon本身是繼承了 Thread類
class DataStreamer extends Daemon { }
由此可得知,DFSOutputStream 這個類本身并沒有繼承 Thread類,但是使用DataStreamer這個繼承了 Thread類的來新建線程傳輸數(shù)據(jù),不占用當前線程。而在 DataStreamer 這個類中,重寫了 Thread標志性的 run 方法。傳輸數(shù)據(jù)就是在這里完成的。前面說到的 hdfs的 pipeline 也是這個run方法中實現(xiàn)的,里面是一個while死循環(huán),知道傳輸完數(shù)據(jù)為止,或者客戶端關(guān)閉。代碼過長,就不看了。反正看到這里已經(jīng)成功獲取了 client的輸出流對象,后面就是傳統(tǒng)的輸入流和輸出流的對接了,這里不細講了。
方法時序圖如下:
1、FileSystem初始化,Client拿到NameNodeRpcServer代理對象,建立與NameNode的RPC通信
2、調(diào)用FileSystem的create()方法,由于實現(xiàn)類為DistributedFileSystem,所有是調(diào)用該類中的create()方法
3、DistributedFileSystem持有DFSClient的引用,繼續(xù)調(diào)用DFSClient中的create()方法
4、DFSOutputStream提供的靜態(tài)newStreamForCreate()方法中調(diào)用NameNodeRpcServer服務端的create()方法并創(chuàng)建DFSOutputStream輸出流對象返回
5、通過hadoop提供的IOUtil工具類將輸出流輸出到本地
1、基本流程
1)客戶端向namenode請求下載文件,namenode在內(nèi)存的metadata查找對應的文件的元數(shù)據(jù),如果無則返回無,有則返回對應文件的block位置信息。而且,namenode會根據(jù)客戶端所在的位置,根據(jù)datanode以及client之間的距離大小,將返回的 block 的副本的datanode節(jié)點從距離小到大排序,距離最近的datanode則排在第一位。
2)client通過機架感知策略,選擇最近的datanode進行block請求讀取
3)datanode開始傳輸數(shù)據(jù)給client,以packet為單位,并做校驗
4)客戶端接收packet之后,本地緩存,然后再往本地路徑寫入該block。
5)第二塊,第三塊block重復以上過程
注意:
如果在讀數(shù)據(jù)的時候, DFSInputStream和datanode的通訊發(fā)生異常,就會嘗試正在讀的block的排序第二近的datanode,并且會記錄哪個 datanode發(fā)生錯誤,剩余的blocks讀的時候就會直接跳過該datanode。 DFSInputStream也會檢查block數(shù)據(jù)校驗和,如果發(fā)現(xiàn)一個壞的block,就會先報告到namenode節(jié)點,然后 DFSInputStream在其他的datanode上讀該block的鏡像。
client的初始化代碼是一樣的,這里不重復分析了。直接看下載
首先通過 open方法獲取目標文件的輸入流對象。
FSDataInputStream fis = client.open(getPath);
下面看看這個open方法
/*
FileSystem.class
*/
public FSDataInputStream open(Path f) throws IOException {
return this.open(f, this.getConf().getInt("io.file.buffer.size", 4096));
}
public abstract FSDataInputStream open(Path var1, int var2) throws IOException;
可以看到,依舊是抽象方法,所以依舊是調(diào)用 DistributedFileSystem的open方法。
/*
DistributedFileSystem.class
*/
public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
this.statistics.incrementReadOps(1);
this.storageStatistics.incrementOpCounter(OpType.OPEN);
Path absF = this.fixRelativePart(f);
return (FSDataInputStream)(new FileSystemLinkResolver() {
public FSDataInputStream doCall(Path p) throws IOException {
//核心代碼,這里調(diào)用dfsclient的open方法穿件輸入流
DFSInputStream dfsis = DistributedFileSystem.this.dfs.open(DistributedFileSystem.this.getPathName(p), bufferSize, DistributedFileSystem.this.verifyChecksum);
return DistributedFileSystem.this.dfs.createWrappedInputStream(dfsis);
}
public FSDataInputStream next(FileSystem fs, Path p) throws IOException {
return fs.open(p, bufferSize);
}
}).resolve(this, absF);
}
熟悉的套路,依舊調(diào)用 dfsclient的open方法,創(chuàng)建輸入流,下面看看這個open方法
/*
DFSClient.class
*/
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) throws IOException {
this.checkOpen();
TraceScope ignored = this.newPathTraceScope("newDFSInputStream", src);
Throwable var5 = null;
DFSInputStream var6;
try {
//這里直接創(chuàng)建一個輸入流對象,如果按照上面上傳文件的套路來說,應該是 dfsclient.namenode.open(xxx)才對的,這里并沒有
var6 = new DFSInputStream(this, src, verifyChecksum, (LocatedBlocks)null);
} catch (Throwable var15) {
var5 = var15;
throw var15;
} finally {
if (ignored != null) {
if (var5 != null) {
try {
ignored.close();
} catch (Throwable var14) {
var5.addSuppressed(var14);
}
} else {
ignored.close();
}
}
}
return var6;
}
上面并沒有調(diào)用DFSClient.open,而是將DFSClient作為參數(shù)傳入DFSInputStream。下面看看 DFSInputStream 這個神奇的類。
/*
DFSInputStream.class
*/
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, LocatedBlocks locatedBlocks) throws IOException {
//將dfsclinet保存到當前類中
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.src = src;
synchronized(this.infoLock) {
this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
}
this.locatedBlocks = locatedBlocks;
//核心方法,開始獲取block信息,如有多少個block,以及每個block所在的datanode節(jié)點名
this.openInfo(false);
}
下面看看 openInfo() 方法
/*
DFSInputStream.class
*/
void openInfo(boolean refreshLocatedBlocks) throws IOException {
DfsClientConf conf = this.dfsClient.getConf();
synchronized(this.infoLock) {
//獲取block的位置信息以及最后一個block的長度(因為最后一個block肯定不是完整的128M的長度)
this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
int retriesForLastBlockLength;
for(retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength(); retriesForLastBlockLength > 0 && this.lastBlockBeingWrittenLength == -1L; --retriesForLastBlockLength) {
DFSClient.LOG.warn("Last block locations not available. Datanodes might not have reported blocks completely. Will retry for " + retriesForLastBlockLength + " times");
this.waitFor(conf.getRetryIntervalForGetLastBlockLength());
this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength(true);
}
if (this.lastBlockBeingWrittenLength == -1L && retriesForLastBlockLength == 0) {
throw new IOException("Could not obtain the last block locations.");
}
}
}
下面看看 fetchLocatedBlocksAndGetLastBlockLength 這個獲取block信息的方法
/*
DFSInputStream.class
*/
private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh) throws IOException {
LocatedBlocks newInfo = this.locatedBlocks;
if (this.locatedBlocks == null || refresh) {
//可以看到這里是調(diào)用 dfsclient中的方法倆獲取block信息
newInfo = this.dfsClient.getLocatedBlocks(this.src, 0L);
}
DFSClient.LOG.debug("newInfo = {}", newInfo);
if (newInfo == null) {
throw new IOException("Cannot open filename " + this.src);
} else {
if (this.locatedBlocks != null) {
Iterator oldIter = this.locatedBlocks.getLocatedBlocks().iterator();
Iterator newIter = newInfo.getLocatedBlocks().iterator();
while(oldIter.hasNext() && newIter.hasNext()) {
if (!((LocatedBlock)oldIter.next()).getBlock().equals(((LocatedBlock)newIter.next()).getBlock())) {
throw new IOException("Blocklist for " + this.src + " has changed!");
}
}
}
this.locatedBlocks = newInfo;
long lastBlockBeingWrittenLength = 0L;
if (!this.locatedBlocks.isLastBlockComplete()) {
LocatedBlock last = this.locatedBlocks.getLastLocatedBlock();
if (last != null) {
if (last.getLocations().length == 0) {
if (last.getBlockSize() == 0L) {
return 0L;
}
return -1L;
}
long len = this.readBlockLength(last);
last.getBlock().setNumBytes(len);
lastBlockBeingWrittenLength = len;
}
}
this.fileEncryptionInfo = this.locatedBlocks.getFileEncryptionInfo();
return lastBlockBeingWrittenLength;
}
}
看到上面又回到調(diào)用 dfsClient.getLocatedBlocks,看看這個方法
/*
DFSClient.class
*/
public LocatedBlocks getLocatedBlocks(String src, long start) throws IOException {
return this.getLocatedBlocks(src, start, this.dfsClientConf.getPrefetchSize());
}
//繼續(xù)調(diào)用下面這個方法
public LocatedBlocks getLocatedBlocks(String src, long start, long length) throws IOException {
TraceScope ignored = this.newPathTraceScope("getBlockLocations", src);
Throwable var7 = null;
LocatedBlocks var8;
try {
//調(diào)用這個靜態(tài)方法獲取 block位置信息
var8 = callGetBlockLocations(this.namenode, src, start, length);
} catch (Throwable var17) {
var7 = var17;
throw var17;
} finally {
if (ignored != null) {
if (var7 != null) {
try {
ignored.close();
} catch (Throwable var16) {
var7.addSuppressed(var16);
}
} else {
ignored.close();
}
}
}
return var8;
}
//繼續(xù)看
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, String src, long start, long length) throws IOException {
try {
//熟悉的味道,通過 namenode 的代理對象獲取block信息
return namenode.getBlockLocations(src, start, length);
} catch (RemoteException var7) {
throw var7.unwrapRemoteException(new Class[]{AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class});
}
}
上面可以看到,仍舊是通過 namenode代理對象發(fā)起操作,下面看看 namenode.getBlockLocations。因為代理對象的類型是 ClientProtocol類型的,是個接口,所以得到實現(xiàn)子類中查看 ,ClientNamenodeProtocolTranslatorPB這個類。
/*
ClientNamenodeProtocolTranslatorPB.class
*/
public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException {
GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto.newBuilder().setSrc(src).setOffset(offset).setLength(length).build();
try {
//熟悉的味道,調(diào)用 rcpProxy 向namenode server 發(fā)起操作。
GetBlockLocationsResponseProto resp = this.rpcProxy.getBlockLocations((RpcController)null, req);
return resp.hasLocations() ? PBHelperClient.convert(resp.getLocations()) : null;
} catch (ServiceException var8) {
throw ProtobufHelper.getRemoteException(var8);
}
}
看到這里,下面就是RPC底層的操作了。
方法時序圖如下:
1、FileSystem初始化,Client拿到NameNodeRpcServer代理對象,建立與NameNode的RPC通信(與前面一樣)
2、調(diào)用FileSystem的open()方法,由于實現(xiàn)類為DistributedFileSystem,所有是調(diào)用該類中的open()方法
3、DistributedFileSystem持有DFSClient的引用,繼續(xù)調(diào)用DFSClient中的open()方法
4、實例化DFSInputStream輸入流
5、調(diào)用openinfo()方法
6、調(diào)用fetchLocatedBlocksAndGetLastBlockLength()方法,抓取block信息并獲取最后block長度
7、調(diào)用DFSClient中的getLocatedBlocks()方法,獲取block信息
8、在callGetBlockLocations()方法中通過NameNode代理對象調(diào)用NameNodeRpcServer的getBlockLocations()方法
9、將block信息寫入輸出流,在8中會將 block 位置信息保存到DFSInputStream輸入流對象中的成員變量中
10、交給IOUtil,下載文件到本地