This article explains how the HBase Scan flow works, following a scan request through the RegionServer code.
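The HBase read path is fairly complex at present, mainly for three reasons:
1. Table data is organized in several layers: HRegion -> HStore -> [HFile, HFile, ..., MemStore].
2. The RegionServer uses an LSM-like storage engine. It keeps flushing MemStores into new HFiles and creating a fresh MemStore for subsequent writes. To keep a Scan from slowing down because it has to read too many HFiles, background threads run compactions when appropriate. A compaction produces a new HFile and deletes the HFiles it has merged.
3. Implementation-level optimizations, such as lazy seek, make the code harder to follow.
The read path is full of Scanners of different kinds, as the diagram below shows: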
                      +---------------+
                      |               |
          +-----------+ RegionScanner +-----------+
          |           +-------+-------+           |
          |                   |                   |
          |                   |                   |
  +-------v------+    +-------v------+    +-------v------+
  |              |    |              |    |              |
  | StoreScanner |    | StoreScanner |    | StoreScanner |
  |              |    |              |    |              |
  +--------------+    +--+----+----+-+    +--------------+
                         |    |    |
     +-------------------+    |    +---------------+
     |                        |                    |
     |                        |                    |
+----v-------------+ +--------v---------+ +--------v--------+
|                  | |                  | |                 |
| StoreFileScanner | | StoreFileScanner | | MemStoreScanner |
|                  | |                  | |                 |
+--------+---------+ +--------+---------+ +--------+--------+
         |                    |                    |
         |                    |                    |
         |                    |                    |
+--------v---------+ +--------v---------+ +--------v--------+
|                  | |                  | |                 |
|   HFileScanner   | |   HFileScanner   | |  HFileScanner   |
|                  | |                  | |                 |
+------------------+ +------------------+ +-----------------+
In HBase a table can have multiple column families. During a Scan, the data of each column family (called a Store from here on) is read by one StoreScanner object. A Store's data consists of one in-memory MemStore and the HFiles on disk. Correspondingly, a StoreScanner employs one MemStoreScanner and N StoreFileScanners to do the actual reading.
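Logically, reading one row requires two steps:
1. Read each Store in order.
2. For each Store, merge the relevant HFiles under it with the in-memory MemStore.
In the implementation, both steps are done with heaps. A RegionScanner reads through a heap made up of its StoreScanners, held in the RegionScanner member variable KeyValueHeap storeHeap.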
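The heap-based merge described above can be illustrated with a minimal sketch. It is not HBase code: SortedSource and merge are hypothetical names, and plain strings stand in for KeyValues. It only shows the general technique of merging several sorted sources by always taking from the source whose head is smallest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class HeapMergeSketch {
    /** Hypothetical stand-in for a scanner: a sorted stream with peek() and next(). */
    static final class SortedSource {
        private final Iterator<String> it;
        private String head;

        SortedSource(List<String> sortedKeys) {
            this.it = sortedKeys.iterator();
            this.head = it.hasNext() ? it.next() : null;
        }

        /** Returns the current head without advancing; null when exhausted. */
        String peek() { return head; }

        /** Returns the current head and advances to the following key. */
        String next() {
            String out = head;
            head = it.hasNext() ? it.next() : null;
            return out;
        }
    }

    /** Merges sorted sources by repeatedly taking from the source with the smallest head. */
    static List<String> merge(List<SortedSource> sources) {
        PriorityQueue<SortedSource> heap =
            new PriorityQueue<>((a, b) -> a.peek().compareTo(b.peek()));
        for (SortedSource s : sources) {
            if (s.peek() != null) heap.add(s); // exhausted sources never enter the heap
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            SortedSource top = heap.poll();
            out.add(top.next());
            if (top.peek() != null) heap.add(top); // re-insert so the heap re-orders it
        }
        return out;
    }

    public static void main(String[] args) {
        List<SortedSource> sources = Arrays.asList(
            new SortedSource(Arrays.asList("aaa/a", "bbb/a")),
            new SortedSource(Arrays.asList("aaa/b")),
            new SortedSource(Arrays.asList("bbb/b", "ccc/a")));
        System.out.println(merge(sources)); // [aaa/a, aaa/b, bbb/a, bbb/b, ccc/a]
    }
}
```

The same shape applies at both levels: a RegionScanner merges StoreScanners, and each StoreScanner merges its StoreFileScanners and MemStoreScanner.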
The StoreScanners that make up this heap are obtained in the RegionScannerImpl constructor:
    for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        Store store = stores.get(entry.getKey());
        // The returned scanner is actually a StoreScanner
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
                || this.filter.isFamilyEssential(entry.getKey())) {
            scanners.add(scanner);
        } else {
            joinedScanners.add(scanner);
        }
    }
store.getScanner(scan, entry.getValue(), this.readPt) simply creates a new StoreScanner; the logic lives in the StoreScanner constructor.
The constructor finds the relevant HFiles and the MemStore and then builds a heap. Note that this heap is per StoreScanner: each StoreScanner has its own heap, whose elements are the StoreFileScanners and the MemStoreScanner for the HFiles and MemStore underneath it.
The relevant HFiles and the MemStore are selected in StoreScanner::getScannersNoCompaction(). It uses the TimeRange and key range of the request to filter out HFiles that are not needed, and also uses bloom filters to discard further HFiles. It then calls

    seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, isParallelSeekEnabled);

to seek each of these StoreFileScanners and the MemStoreScanner. The seek key is matcher.getStartKey(), which is constructed as follows:

    return new KeyValue(row, family, null, HConstants.LATEST_TIMESTAMP, Type.DeleteFamily);
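Seek operates on KeyValues: it positions the scanner at the given KeyValue or, if that KeyValue does not exist, at the next KeyValue after it. For example, suppose a column family named X has two columns a and b, and the file holds two rows with rowkeys aaa and bbb, as in the table below.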
         |    Column Family X
rowkey   | column a | column b
aaa      | 1        | abc
bbb      | 2        | def
The HBase client sets the start key of the scan request to aaa, so matcher.getStartKey() is initialized to (rowkey, family, qualifier, timestamp, type) = (aaa, X, null, LATEST_TIMESTAMP, Type.DeleteFamily). Under the KeyValue ordering this KeyValue sorts before column a, the first column of row aaa, because it has no qualifier. Seeking the StoreFileScanner with it therefore lands on column a of row aaa.
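The seek semantics and the ordering argument above can be reproduced with a small sketch. This is a simplified model under stated assumptions, not the HBase comparator: Key, ORDER and seek are hypothetical names, an empty string models the missing qualifier, Long.MAX_VALUE stands in for LATEST_TIMESTAMP, and the type byte is ignored. TreeSet.ceiling() gives the "exact match or next key" behaviour that seek is described as having.

```java
import java.util.Comparator;
import java.util.TreeSet;

class SeekOrderSketch {
    /** Simplified cell key; a real KeyValue also carries a type byte and a value. */
    static final class Key {
        final String row;
        final String family;
        final String qualifier; // "" models a missing (null) qualifier
        final long ts;

        Key(String row, String family, String qualifier, long ts) {
            this.row = row;
            this.family = family;
            this.qualifier = qualifier;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return row + "/" + family + ":" + qualifier + "@" + ts;
        }
    }

    /** Row, family and qualifier ascending; timestamp descending (newer sorts first). */
    static final Comparator<Key> ORDER = (a, b) -> {
        int c = a.row.compareTo(b.row);
        if (c != 0) return c;
        c = a.family.compareTo(b.family);
        if (c != 0) return c;
        c = a.qualifier.compareTo(b.qualifier);
        if (c != 0) return c;
        return Long.compare(b.ts, a.ts);
    };

    /** Seek: the key itself if present, otherwise the next key in sort order, or null. */
    static Key seek(TreeSet<Key> file, Key seekKey) {
        return file.ceiling(seekKey);
    }

    /** The two-row example table, with an arbitrary timestamp of 100 for every cell. */
    static TreeSet<Key> sampleFile() {
        TreeSet<Key> file = new TreeSet<>(ORDER);
        file.add(new Key("aaa", "X", "a", 100L));
        file.add(new Key("aaa", "X", "b", 100L));
        file.add(new Key("bbb", "X", "a", 100L));
        file.add(new Key("bbb", "X", "b", 100L));
        return file;
    }

    public static void main(String[] args) {
        TreeSet<Key> file = sampleFile();
        // Start key for row aaa: no qualifier, latest possible timestamp.
        System.out.println(seek(file, new Key("aaa", "X", "", Long.MAX_VALUE))); // aaa/X:a@100
        // A row that does not exist: seek lands on the first cell of the next row.
        System.out.println(seek(file, new Key("aab", "X", "", Long.MAX_VALUE))); // bbb/X:a@100
    }
}
```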
In practice, the call

    seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, isParallelSeekEnabled);

may not perform a real seek on the StoreFileScanners at all. It may do a lazy seek instead, deferring the work until it can no longer be avoided. Lazy seek is covered in its own section later.
The steps above yield the StoreScanners for all column families involved in the scan request. The following function is then called to build the heap:
    protected void initializeKVHeap(List<KeyValueScanner> scanners,
            List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
        this.storeHeap = new KeyValueHeap(scanners, region.comparator);
        if (!joinedScanners.isEmpty()) {
            this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
        }
    }
KeyValueScanner is an interface for a scanner that can iterate over KeyValues; StoreFileScanner, MemStoreScanner and StoreScanner all implement it. Inside KeyValueHeap the comparator is wrapped in a KVScannerComparator, which compares two KeyValueScanners; internally it uses a KVComparator, which compares two KeyValues. As the code that follows shows, the KeyValueScanner at the top of this heap is the one whose own top KeyValue (its peek()) is the smallest.
The heap is represented by the class KeyValueHeap. Here is what its constructor does:
    KeyValueHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
            throws IOException {
        this.comparator = comparator;
        if (!scanners.isEmpty()) {
            // Build a priority queue (a heap internally) from the given KeyValueScanners
            this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(), this.comparator);
            for (KeyValueScanner scanner : scanners) {
                if (scanner.peek() != null) {
                    this.heap.add(scanner);
                } else {
                    scanner.close();
                }
            }
            // All non-empty scanners are now in the heap.
            // Pop one KeyValueScanner off the top and store it in the member variable
            // current. The top of this KeyValueHeap is then the top of current, and
            // KeyValueHeap.peek() simply returns current.peek().
            this.current = pollRealKV();
        }
    }
Before looking at how pollRealKV() works, we need to look at lazy seek, which was introduced in HBase 0.94.
Before this optimization, reading a column family (Store) required seeking every HFile and the MemStore under it to the requested KeyValue. (Seek positions the scanner at the KeyValue if it exists, otherwise at the KeyValue right after it.) Suppose a Store has 3 HFiles and one MemStore, listed from oldest to newest as [HFile1, HFile2, HFile3, MemStore]. Without lazy seek, all of them must be sought, and seeking an HFile is slow: it usually means loading the relevant block into memory and then locating the position within it. With lazy seek, if the wanted KeyValue is found in HFile3, HFile1 and HFile2 need not be sought at all, which speeds things up considerably. Roughly, the idea is that a seek request for some KeyValue does not really seek the StoreFileScanner. Instead, the peek of each StoreFileScanner is set to (rowkey, family, qualifier, lastTimestampInStoreFile).
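KeyValueHeap has two important methods, peek() and next(). Both return the top of the heap. The difference is that next() removes the top and re-adjusts the heap, which to the caller looks like the iterator moving forward, while peek() leaves the top in place. In the implementation, peek() is very simple: it just calls peek() on the heap's member variable current. Taking the StoreScanner heap as an example, current is either a StoreFileScanner or a MemStoreScanner. So how is current chosen, and how is lazy seek implemented?
An example illustrates this. Assume the following: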
HBase has lazy seek enabled (it is enabled by default).
The Store has three HFiles and a MemStore, ordered by time as [HFile1, HFile2, HFile3, MemStore]. The seek KeyValue is (rowkey, family, qualifier, timestamp), denoted seekKV.
seekKV exists only in HFile3, not in the other HFiles or the MemStore.
In seekScanners(), when lazy seek is in effect, requestSeek(seekKV) is called on every scanner. The method first applies the ROWCOL bloom filter. If the filter determines that seekKV definitely does not exist in the StoreFile, the peek of the StoreFileScanner is set directly to kv.createLastOnRowCol() (a StoreFileScanner is not really a heap; the peek naming just keeps the code uniform), and realSeekDone is set to true to mark the real seek as done.
    public KeyValue createLastOnRowCol() {
        return new KeyValue(
            bytes, getRowOffset(), getRowLength(),
            bytes, getFamilyOffset(), getFamilyLength(),
            bytes, getQualifierOffset(), getQualifierLength(),
            HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
    }
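The timestamp is set to the minimum, so this KeyValue sorts after every KeyValue with the same rowkey, column family and qualifier. Consequently, when the StoreScanner above takes the top of its heap, any real KeyValue with the same rowkey, column family and qualifier held by another StoreFileScanner or the MemStoreScanner is popped first.
If the bloom filter indicates that seekKV may be in the StoreFile, the following logic runs: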
    realSeekDone = false;
    long maxTimestampInFile = reader.getMaxTimestamp();
    long seekTimestamp = kv.getTimestamp();
    if (seekTimestamp > maxTimestampInFile) {
        // Create a fake key that is not greater than the real next key.
        // (Lower timestamps correspond to higher KVs.)
        // To understand this better, consider that we are asked to seek to
        // a higher timestamp than the max timestamp in this file. We know that
        // the next point when we have to consider this file again is when we
        // pass the max timestamp of this file (with the same row/column).
        cur = kv.createFirstOnRowColTS(maxTimestampInFile);
    } else {
        enforceSeek();
    }
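When the timestamp of kv is greater than the largest timestamp in the HFile, seekKV itself clearly cannot be in this HFile, but KeyValues with the same rowkey, family and qualifier and a different timestamp may be. The fake top therefore has to be chosen carefully: it must not sort after any KeyValue with the same rowkey, family and qualifier that may really exist in this HFile. It is built as follows: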
    public KeyValue createFirstOnRowColTS(long ts) {
        return new KeyValue(
            bytes, getRowOffset(), getRowLength(),
            bytes, getFamilyOffset(), getFamilyLength(),
            bytes, getQualifierOffset(), getQualifierLength(),
            ts, Type.Maximum, bytes, getValueOffset(), getValueLength());
    }
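In the type comparison, Type.Maximum sorts first (it is the smallest), so the generated KeyValue is guaranteed not to be greater than any KeyValue with the same rowkey, family and qualifier that may exist in the current HFile. At the same time, seekKV is saved in the StoreFileScanner member variable delayedSeekKV so that it can be retrieved when the real seek finally happens.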
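The relative position of the two fake keys can be checked with a small model. This is a sketch under assumptions, not HBase code: it only orders keys that share the same rowkey, family and qualifier, the class and method names are hypothetical, and the numeric type codes (Minimum = 0, Put = 4, Maximum = 255) and OLDEST_TIMESTAMP = Long.MIN_VALUE are what I understand the HBase constants to be and should be verified against the HBase source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class FakeKeyOrderSketch {
    // Assumed values mirroring KeyValue.Type codes and HConstants.OLDEST_TIMESTAMP.
    static final int TYPE_MINIMUM = 0;
    static final int TYPE_PUT = 4;
    static final int TYPE_MAXIMUM = 255;
    static final long OLDEST_TIMESTAMP = Long.MIN_VALUE;

    /** A key within one fixed (rowkey, family, qualifier); only ts and type vary. */
    static final class Key {
        final String label;
        final long ts;
        final int type;

        Key(String label, long ts, int type) {
            this.label = label;
            this.ts = ts;
            this.type = type;
        }
    }

    /** Newer timestamps sort first; on equal timestamps, larger type codes sort first. */
    static final Comparator<Key> SAME_ROW_COL_ORDER = (a, b) -> {
        int c = Long.compare(b.ts, a.ts);
        return c != 0 ? c : Integer.compare(b.type, a.type);
    };

    /** Sorts the keys and returns their labels in scan order. */
    static List<String> sortedLabels(Key... keys) {
        List<Key> sorted = new ArrayList<>(Arrays.asList(keys));
        sorted.sort(SAME_ROW_COL_ORDER);
        List<String> labels = new ArrayList<>();
        for (Key k : sorted) {
            labels.add(k.label);
        }
        return labels;
    }

    /** Two real puts plus the two kinds of fake key, for a file whose max timestamp is 20. */
    static List<String> demo() {
        return sortedLabels(
            new Key("put@10", 10L, TYPE_PUT),
            new Key("lastOnRowCol", OLDEST_TIMESTAMP, TYPE_MINIMUM),
            new Key("put@20", 20L, TYPE_PUT),
            new Key("firstOnRowColTS(20)", 20L, TYPE_MAXIMUM));
    }

    public static void main(String[] args) {
        // [firstOnRowColTS(20), put@20, put@10, lastOnRowCol]
        System.out.println(demo());
    }
}
```

The fake key built from the file's max timestamp never sorts after a real cell of that file, and the "last on row/col" key sorts after all of them.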
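What if the timestamp of seekKV is not greater than maxTimestamp of the current HFile? Could we set a fake KeyValue whose timestamp is LATEST_TIMESTAMP? If we did, it would be popped before real KeyValues that exist in other HFiles, and the order would be wrong. In this case a real seek is the only option: enforceSeek() performs the real seek and then sets realSeekDone to true.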
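The three outcomes of a lazy seek request described above can be summarized as a small decision function. This is a sketch of the decision only, with hypothetical names (Action, decide, realSeekDoneAfter); it performs no I/O and is not the HBase implementation.

```java
class LazySeekDecisionSketch {
    /** What a lazy seek request does to one StoreFileScanner. */
    enum Action {
        FAKE_LAST_ON_ROW_COL,     // bloom filter rules the file out
        FAKE_FIRST_ON_ROW_COL_TS, // seek timestamp is newer than anything in the file
        REAL_SEEK                 // no safe fake key exists, so seek for real
    }

    /**
     * @param bloomMayContain false when the ROWCOL bloom filter says the row/column is absent
     * @param seekTs          timestamp of the seek key
     * @param maxTsInFile     largest timestamp stored in the file
     */
    static Action decide(boolean bloomMayContain, long seekTs, long maxTsInFile) {
        if (!bloomMayContain) {
            return Action.FAKE_LAST_ON_ROW_COL;
        }
        if (seekTs > maxTsInFile) {
            return Action.FAKE_FIRST_ON_ROW_COL_TS;
        }
        return Action.REAL_SEEK;
    }

    /** Only the fake key built from the file's max timestamp leaves the real seek pending. */
    static boolean realSeekDoneAfter(Action action) {
        return action != Action.FAKE_FIRST_ON_ROW_COL_TS;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, 35L, 30L)); // FAKE_LAST_ON_ROW_COL
        System.out.println(decide(true, 35L, 30L));  // FAKE_FIRST_ON_ROW_COL_TS
        System.out.println(decide(true, 25L, 30L));  // REAL_SEEK
    }
}
```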
Now take the top of the StoreScanner heap. Because HFile3 has the largest latestTimestampInStoreFile, the peek of the StoreFileScanner for HFile3 is taken first (in the KeyValue ordering, a larger timestamp means a smaller KeyValue). At this point the code checks whether this KeyValueScanner has done a real seek. A StoreFileScanner tracks this with the boolean realSeekDone; a MemStoreScanner always returns true. Here no real seek has been done, so one is performed now, landing on seekKV in HFile3. seekKV is then compared with the peek of HFile2. seekKV is clearly smaller, because its timestamp is greater than lastTimestampInStoreFile2, so the peek operation of the StoreScanner returns seekKV.
In the implementation, current (the scanner that KeyValueHeap.peek() and next() work on, either a StoreFileScanner or a MemStoreScanner in the case of the StoreScanner heap) is chosen by pollRealKV(). The function contains a while loop precisely because of the lazy seek optimization. Its logic is the same logic used in the example above to take the top of the StoreScanner heap. The return value of pollRealKV() is assigned to current.
    protected KeyValueScanner pollRealKV() throws IOException {
        KeyValueScanner kvScanner = heap.poll();
        if (kvScanner == null) {
            return null;
        }

        while (kvScanner != null && !kvScanner.realSeekDone()) {
            if (kvScanner.peek() != null) {
                kvScanner.enforceSeek();
                KeyValue curKV = kvScanner.peek();
                if (curKV != null) {
                    KeyValueScanner nextEarliestScanner = heap.peek();
                    if (nextEarliestScanner == null) {
                        // The heap is empty. Return the only possible scanner.
                        return kvScanner;
                    }

                    // Compare the current scanner to the next scanner. We try to avoid
                    // putting the current one back into the heap if possible.
                    KeyValue nextKV = nextEarliestScanner.peek();
                    if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
                        // We already have the scanner with the earliest KV, so return it.
                        return kvScanner;
                    }

                    // Otherwise, put the scanner back into the heap and let it compete
                    // against all other scanners (both those that have done a "real
                    // seek" and a "lazy seek").
                    heap.add(kvScanner);
                } else {
                    // Close the scanner because we did a real seek and found out there
                    // are no more KVs.
                    kvScanner.close();
                }
            } else {
                // Close the scanner because it has already run out of KVs even before
                // we had to do a real seek on it.
                kvScanner.close();
            }
            kvScanner = heap.poll();
        }

        return kvScanner;
    }
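The effect of this loop can be seen in a toy simulation of the three-HFile example. It is a sketch under simplifying assumptions, not HBase code: FileScanner is a hypothetical class, every key belongs to the same row and column so a key is just a timestamp, each file holds one cell (timestamps 10, 20 and 30), and the seek timestamp 35 is chosen to be newer than all of them so that every scanner starts with a fake key. A realSeeks counter shows which files were really sought.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.PriorityQueue;

class LazySeekHeapSketch {
    /** Toy scanner over one file; keys are timestamps of a single row/column. */
    static final class FileScanner {
        final String name;
        final long[] timestampsDesc; // cells in key order: newest first, never empty
        Long peek;                   // current head (real or fake); null when exhausted
        boolean realSeekDone;
        long delayedSeekTs;
        int realSeeks;               // counts the expensive seeks actually performed

        FileScanner(String name, long... timestampsDesc) {
            this.name = name;
            this.timestampsDesc = timestampsDesc;
        }

        long maxTimestamp() { return timestampsDesc[0]; }

        /** Lazy seek: use a fake head when the seek timestamp is newer than the file. */
        void requestSeek(long seekTs) {
            delayedSeekTs = seekTs;
            if (seekTs > maxTimestamp()) {
                peek = maxTimestamp(); // fake key, no I/O
                realSeekDone = false;
            } else {
                enforceSeek();
            }
        }

        /** Real seek: position on the first cell at or after the delayed seek key. */
        void enforceSeek() {
            realSeeks++;
            realSeekDone = true;
            peek = null;
            for (long ts : timestampsDesc) {
                if (ts <= delayedSeekTs) { peek = ts; break; }
            }
        }
    }

    /** Scanner with the newest head first, since newer timestamps sort earlier. */
    static final Comparator<FileScanner> BY_HEAD = (a, b) -> Long.compare(b.peek, a.peek);

    /** Same shape as the loop in pollRealKV(), minus close() handling. */
    static FileScanner pollRealKV(PriorityQueue<FileScanner> heap) {
        FileScanner s = heap.poll();
        while (s != null && !s.realSeekDone) {
            s.enforceSeek();
            if (s.peek != null) {
                FileScanner next = heap.peek();
                if (next == null || BY_HEAD.compare(s, next) < 0) {
                    return s; // still the smallest after the real seek
                }
                heap.add(s);  // let it compete again with its real head
            }
            s = heap.poll();
        }
        return s;
    }

    static String demo() {
        FileScanner f1 = new FileScanner("HFile1", 10L);
        FileScanner f2 = new FileScanner("HFile2", 20L);
        FileScanner f3 = new FileScanner("HFile3", 30L);
        PriorityQueue<FileScanner> heap = new PriorityQueue<>(BY_HEAD);
        for (FileScanner f : Arrays.asList(f1, f2, f3)) {
            f.requestSeek(35L);
            if (f.peek != null) heap.add(f);
        }
        FileScanner current = pollRealKV(heap);
        return current.name + "@" + current.peek
            + " realSeeks=" + f1.realSeeks + "," + f2.realSeeks + "," + f3.realSeeks;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // HFile3@30 realSeeks=0,0,1
    }
}
```

Only HFile3 is really sought; HFile1 and HFile2 stay on their fake keys until the scan actually reaches them.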
Flushing the in-memory MemStore to the file system and completing a compaction both change the set of HFiles in a Store.
After each batch of mutate operations, HRegion::isFlushSize(newSize) checks whether the memstores of the current HRegion need to be flushed.
The check simply tests whether the total size of all memstores in the HRegion exceeds hbase.hregion.memstore.flush.size (128MB by default). If a flush is needed, the request is put on the queue of the background flush thread (MemStoreFlusher), which handles it through the call path HRegion::flushcache() -> internalFlushcache(...) -> StoreFlushContext.flushCache(...) -> StoreFlushContext.commit(...) -> HStore::updateStorefiles(). That logic is covered in the article on HBase Snapshot principles and implementation and is not repeated here. Only the last step, updateStorefiles(), matters for this discussion. It takes the HStore-level write lock, inserts the newly produced HFile into the StoreEngine and clears the snapshot, releases the write lock, and finally calls notifyChangedReadersObservers(), as follows:
    this.lock.writeLock().lock();
    try {
        this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
        this.memstore.clearSnapshot(set);
    } finally {
        // We need the lock, as long as we are updating the storeFiles
        // or changing the memstore. Let us release it before calling
        // notifyChangeReadersObservers. See HBASE-4485 for a possible
        // deadlock scenario that could have happened if continue to hold
        // the lock.
        this.lock.writeLock().unlock();
    }

    // Tell listeners of the change in readers.
    notifyChangedReadersObservers();
The key part is notifyChangedReadersObservers(). Here is the code:
    private void notifyChangedReadersObservers() throws IOException {
        for (ChangedReadersObserver o : this.changedReaderObservers) {
            o.updateReaders();
        }
    }
In practice every observer is a StoreScanner. Each newly opened StoreScanner registers itself in this observer set inside the Store, so when the set of HFiles under the Store changes, it is enough to notify the registered StoreScanners.
The notification works as follows. It first takes the StoreScanner lock, saves the current heap top in the member variable lastTop, sets the StoreScanner's internal heap to null (this.heap = null), and then releases the lock. On the StoreScanner side, next, seek and reseek all start by calling checkReseek() to test whether this.heap is null. If it is, the set of HFiles under the Store has changed, so resetScannerStack(lastTop) is called: it seeks all StoreFileScanners and the MemStoreScanner under the Store to lastTop and rebuilds the StoreScanner heap. The code of checkReseek() is:
    protected boolean checkReseek() throws IOException {
        if (this.heap == null && this.lastTop != null) {
            resetScannerStack(this.lastTop);
            if (this.heap.peek() == null
                    || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
                LOG.debug("Storescanner.peek() is changed where before = "
                    + this.lastTop.toString() + ",and after = " + this.heap.peek());
                this.lastTop = null;
                return true;
            }
            this.lastTop = null; // gone!
        }
        // else dont need to reseek
        return false;
    }
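The invalidate-then-rebuild pattern behind updateReaders() and checkReseek() can be shown with a toy model. This is a sketch under assumptions, not HBase code: Store and ToyScanner are hypothetical classes, a sorted set of strings stands in for the store's files, a copied TreeSet stands in for the scanner heap, and synchronized methods stand in for the StoreScanner lock.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

class ReaderChangeSketch {
    /** Toy store: all keys currently visible, plus the scanners to notify on change. */
    static final class Store {
        final TreeSet<String> keys = new TreeSet<>();
        final List<ToyScanner> observers = new ArrayList<>();

        /** Models a flush or compaction that changes the file set. */
        void addFile(List<String> newKeys) {
            keys.addAll(newKeys);
            for (ToyScanner s : observers) {
                s.updateReaders();
            }
        }
    }

    static final class ToyScanner {
        private final Store store;
        private TreeSet<String> view; // stands in for the heap; null means "rebuild needed"
        private String top;
        private String lastTop;
        int rebuilds;

        ToyScanner(Store store, String startKey) {
            this.store = store;
            this.view = new TreeSet<>(store.keys);
            this.top = view.ceiling(startKey);
            store.observers.add(this);
        }

        /** Called by the store: remember the position and drop the stale view. */
        synchronized void updateReaders() {
            if (view == null) return; // already invalidated
            lastTop = top;
            view = null;
        }

        /** Returns the next key, rebuilding the view first if it was invalidated. */
        synchronized String next() {
            checkReseek();
            String out = top;
            if (out != null) top = view.higher(out);
            return out;
        }

        /** Rebuilds from the store's current keys and re-seeks to the saved position. */
        private void checkReseek() {
            if (view == null) {
                view = new TreeSet<>(store.keys);
                top = (lastTop == null) ? null : view.ceiling(lastTop);
                lastTop = null;
                rebuilds++;
            }
        }
    }

    static String demo() {
        Store store = new Store();
        store.keys.addAll(Arrays.asList("a", "c"));
        ToyScanner scanner = new ToyScanner(store, "a");
        String first = scanner.next();          // "a"; the position moves to "c"
        store.addFile(Arrays.asList("b", "d")); // invalidates the scanner's view
        String second = scanner.next();         // rebuilds, re-seeks to "c"
        String third = scanner.next();          // "d", visible only after the rebuild
        String fourth = scanner.next();         // null: exhausted
        return first + "," + second + "," + third + "," + fourth
            + " rebuilds=" + scanner.rebuilds;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // a,c,d,null rebuilds=1
    }
}
```

The notification itself is cheap: it only records the position and nulls the view. The expensive rebuild is deferred to the scanner's next call, which is the same division of work the text describes for StoreScanner.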