真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

HBase性能優(yōu)化方法分享

本篇內(nèi)容介紹了“HBase性能優(yōu)化方法分享”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

在依蘭等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都做網(wǎng)站、網(wǎng)站制作 網(wǎng)站設(shè)計制作定制網(wǎng)站制作,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),營銷型網(wǎng)站建設(shè),成都外貿(mào)網(wǎng)站制作,依蘭網(wǎng)站建設(shè)費用合理。

1. 表的設(shè)計

1.1 Pre-Creating Regions

默認(rèn)情況下,在創(chuàng)建HBase表的時候會自動創(chuàng)建一個region分區(qū),當(dāng)導(dǎo)入數(shù)據(jù)的時候,所有的HBase客戶端都向這一個region寫數(shù) 據(jù),直到這個region足夠大了才進行切分。一種可以加快批量寫入速度的方法是通過預(yù)先創(chuàng)建一些空的regions,這樣當(dāng)數(shù)據(jù)寫入HBase時,會按 照region分區(qū)情況,在集群內(nèi)做數(shù)據(jù)的負(fù)載均衡。

有關(guān)預(yù)分區(qū),詳情參見:Table Creation: Pre-Creating Regions,下面是一個例子:

public static boolean createTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits)
  throws IOException {
  try {
    admin.createTable(table, splits);
    return true;
  } catch (TableExistsException e) {
    logger.info("table " + table.getNameAsString() + " already exists");
    // the table already exists...
    return false;
  }
}
 
public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) {
  byte[][] splits = new byte[numRegions-1][];
  BigInteger lowestKey = new BigInteger(startKey, 16);
  BigInteger highestKey = new BigInteger(endKey, 16);
  BigInteger range = highestKey.subtract(lowestKey);
  BigInteger regionIncrement = range.divide(BigInteger.valueOf(numRegions));
  lowestKey = lowestKey.add(regionIncrement);
  for(int i=0; i < numRegions-1;i++) {
    BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
    byte[] b = String.format("%016x", key).getBytes();
    splits[i] = b;
  }
  return splits;
}

1.2 Row Key

HBase中row key用來檢索表中的記錄,支持以下三種方式:

  • 通過單個row key訪問:即按照某個row key鍵值進行g(shù)et操作;

  • 通過row key的range進行scan:即通過設(shè)置startRowKey和endRowKey,在這個范圍內(nèi)進行掃描;

  • 全表掃描:即直接掃描整張表中所有行記錄。

在HBase中,row key可以是任意字符串,最大長度64KB,實際應(yīng)用中一般為10~100bytes,存為byte[]字節(jié)數(shù)組,一般設(shè)計成定長的

row key是按照字典序存儲,因此,設(shè)計row key時,要充分利用這個排序特點,將經(jīng)常一起讀取的數(shù)據(jù)存儲到一塊,將最近可能會被訪問的數(shù)據(jù)放在一塊。

舉個例子:如果最近寫入HBase表中的數(shù)據(jù)是最可能被訪問的,可以考慮將時間戳作為row key的一部分,由于是字典序排序,所以可以使用Long.MAX_VALUE – timestamp作為row key,這樣能保證新寫入的數(shù)據(jù)在讀取時可以被快速命中。

1.3 Column Family

不要在一張表里定義太多的column family。目前Hbase并 不能很好的處理超過2~3個column family的表。因為某個column family在flush的時候,它鄰近的column family也會因關(guān)聯(lián)效應(yīng)被觸發(fā)flush,最終導(dǎo)致系統(tǒng)產(chǎn)生更多的I/O。感興趣的同學(xué)可以對自己的HBase集群進行實際測試,從得到的測試結(jié)果數(shù) 據(jù)驗證一下。

1.4 In Memory

創(chuàng)建表的時候,可以通過HColumnDescriptor.setInMemory(true)將表放到RegionServer的緩存中,保證在讀取的時候被cache命中。

1.5 Max Version

創(chuàng)建表的時候,可以通過HColumnDescriptor.setMaxVersions(int maxVersions)設(shè)置表中數(shù)據(jù)的最大版本,如果只需要保存最新版本的數(shù)據(jù),那么可以設(shè)置setMaxVersions(1)。

1.6 Time To Live

創(chuàng)建表的時候,可以通過HColumnDescriptor.setTimeToLive(int timeToLive)設(shè)置表中數(shù)據(jù)的存儲生命期,過期數(shù)據(jù)將自動被刪除,例如如果只需要存儲最近兩天的數(shù)據(jù),那么可以設(shè)置 setTimeToLive(2 * 24 * 60 * 60)。

1.7 Compact & Split

在HBase中,數(shù)據(jù)在更新時首先寫入WAL 日志(HLog)和內(nèi)存(MemStore)中,MemStore中的數(shù)據(jù)是排序的,當(dāng)MemStore累計到一定閾值時,就會創(chuàng)建一個新的 MemStore,并且將老的MemStore添加到flush隊列,由單獨的線程flush到磁盤上,成為一個StoreFile。于此同時, 系統(tǒng)會在zookeeper中記錄一個redo point,表示這個時刻之前的變更已經(jīng)持久化了(minor compact)

StoreFile是只讀的,一旦創(chuàng)建后就不可以再修改。因此Hbase的更新其實是不斷追加的操作。當(dāng)一個Store中的StoreFile達到一定的閾值后,就會進行一次合并(major compact),將對同一個key的修改合并到一起,形成一個大的StoreFile,當(dāng)StoreFile的大小達到一定閾值后,又會對 StoreFile進行分割(split),等分為兩個StoreFile。

由于對表的更新是不斷追加的,處理讀請求時,需要訪問Store中全部的StoreFile和MemStore,將它們按照row key進行合并,由于StoreFile和MemStore都是經(jīng)過排序的,并且StoreFile帶有內(nèi)存中索引,通常合并過程還是比較快的。

實際應(yīng)用中,可以考慮必要時手動進行major compact,將同一個row key的修改進行合并形成一個大的StoreFile。同時,可以將StoreFile設(shè)置大些,減少split的發(fā)生。

2. 寫表操作

2.1 多HTable并發(fā)寫

創(chuàng)建多個HTable客戶端用于寫操作,提高寫數(shù)據(jù)的吞吐量,一個例子:

static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = “user_log”;
wTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
    wTableLog[i] = new HTable(conf, table_log_name);
    wTableLog[i].setWriteBufferSize(5 * 1024 * 1024); //5MB
    wTableLog[i].setAutoFlush(false);
}

2.2 HTable參數(shù)設(shè)置

2.2.1 Auto Flush

通過調(diào)用HTable.setAutoFlush(false)方法可以將HTable寫客戶端的自動flush關(guān)閉,這樣可以批量寫入數(shù)據(jù)到 HBase,而不是有一條put就執(zhí)行一次更新,只有當(dāng)put填滿客戶端寫緩存時,才實際向HBase服務(wù)端發(fā)起寫請求。默認(rèn)情況下auto flush是開啟的。

2.2.2 Write Buffer

通過調(diào)用HTable.setWriteBufferSize(writeBufferSize)方法可以設(shè)置HTable客戶端的寫 buffer大小,如果新設(shè)置的buffer小于當(dāng)前寫buffer中的數(shù)據(jù)時,buffer將會被flush到服務(wù)端。其 中,writeBufferSize的單位是byte字節(jié)數(shù),可以根據(jù)實際寫入數(shù)據(jù)量的多少來設(shè)置該值。

2.2.3 WAL Flag

在HBae中,客戶端向集群中的RegionServer提交數(shù)據(jù)時(Put/Delete操作),首先會先寫WAL(Write Ahead Log)日志(即HLog,一個RegionServer上的所有Region共享一個HLog),只有當(dāng)WAL日志寫成功后,再接著寫 MemStore,然后客戶端被通知提交數(shù)據(jù)成功;如果寫WAL日志失敗,客戶端則被通知提交失敗。這樣做的好處是可以做到RegionServer宕機 后的數(shù)據(jù)恢復(fù)。

因此,對于相對不太重要的數(shù)據(jù),可以在Put/Delete操作時,通過調(diào)用Put.setWriteToWAL(false)或Delete.setWriteToWAL(false)函數(shù),放棄寫WAL日志,從而提高數(shù)據(jù)寫入的性能。

值得注意的是:謹(jǐn)慎選擇關(guān)閉WAL日志,因為這樣的話,一旦RegionServer宕機,Put/Delete的數(shù)據(jù)將會無法根據(jù)WAL日志進行恢復(fù)。

2.3 批量寫

通過調(diào)用HTable.put(Put)方法可以將一個指定的row key記錄寫入HBase,同樣HBase提供了另一個方法:通過調(diào)用HTable.put(List)方法可以將指定的row key列表,批量寫入多行記錄,這樣做的好處是批量執(zhí)行,只需要一次網(wǎng)絡(luò)I/O開銷,這對于對數(shù)據(jù)實時性要求高,網(wǎng)絡(luò)傳輸RTT高的情景下可能帶來明顯的 性能提升。

2.4 多線程并發(fā)寫

在客戶端開啟多個HTable寫線程,每個寫線程負(fù)責(zé)一個HTable對象的flush操作,這樣結(jié)合定時flush和寫 buffer(writeBufferSize),可以既保證在數(shù)據(jù)量小的時候,數(shù)據(jù)可以在較短時間內(nèi)被flush(如1秒內(nèi)),同時又保證在數(shù)據(jù)量大的 時候,寫buffer一滿就及時進行flush。下面給個具體的例子:

for (int i = 0; i < threadN; i++) {
    Thread th = new Thread() {
        public void run() {
            while (true) {
                try {
                    sleep(1000); //1 second
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                                synchronized (wTableLog[i]) {
                    try {
                        wTableLog[i].flushCommits();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
                }
    };
    th.setDaemon(true);
    th.start();
}

3. 讀表操作

3.1 多HTable并發(fā)讀

創(chuàng)建多個HTable客戶端用于讀操作,提高讀數(shù)據(jù)的吞吐量,一個例子:

static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = “user_log”;
rTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
    rTableLog[i] = new HTable(conf, table_log_name);
    rTableLog[i].setScannerCaching(50);
}

3.2 HTable參數(shù)設(shè)置

3.2.1 Scanner Caching

通過調(diào)用HTable.setScannerCaching(int scannerCaching)可以設(shè)置HBase scanner一次從服務(wù)端抓取的數(shù)據(jù)條數(shù),默認(rèn)情況下一次一條。通過將此值設(shè)置成一個合理的值,可以減少scan過程中next()的時間開銷,代價是 scanner需要通過客戶端的內(nèi)存來維持這些被cache的行記錄。

3.2.2 Scan Attribute Selection

scan時指定需要的Column Family,可以減少網(wǎng)絡(luò)傳輸數(shù)據(jù)量,否則默認(rèn)scan操作會返回整行所有Column Family的數(shù)據(jù)。

3.2.3 Close ResultScanner

通過scan取完數(shù)據(jù)后,記得要關(guān)閉ResultScanner,否則RegionServer可能會出現(xiàn)問題(對應(yīng)的Server資源無法釋放)。

3.3 批量讀

通過調(diào)用HTable.get(Get)方法可以根據(jù)一個指定的row key獲取一行記錄,同樣HBase提供了另一個方法:通過調(diào)用HTable.get(List)方法可以根據(jù)一個指定的row key列表,批量獲取多行記錄,這樣做的好處是批量執(zhí)行,只需要一次網(wǎng)絡(luò)I/O開銷,這對于對數(shù)據(jù)實時性要求高而且網(wǎng)絡(luò)傳輸RTT高的情景下可能帶來明顯 的性能提升。

3.4 多線程并發(fā)讀

在客戶端開啟多個HTable讀線程,每個讀線程負(fù)責(zé)通過HTable對象進行g(shù)et操作。下面是一個多線程并發(fā)讀取HBase,獲取店鋪一天內(nèi)各分鐘PV值的例子:

public class DataReaderServer {
     //獲取店鋪一天內(nèi)各分鐘PV值的入口函數(shù)
     public static ConcurrentHashMap getUnitMinutePV(long uid, long startStamp, long endStamp){
         long min = startStamp;
         int count = (int)((endStamp - startStamp) / (60*1000));
         List lst = new ArrayList();
         for (int i = 0; i <= count; i++) {
            min = startStamp + i * 60 * 1000;
            lst.add(uid + "_" + min);
         }
         return parallelBatchMinutePV(lst);
     }
      //多線程并發(fā)查詢,獲取分鐘PV值
private static ConcurrentHashMap parallelBatchMinutePV(List lstKeys){
        ConcurrentHashMap hashRet = new ConcurrentHashMap();
        int parallel = 3;
        List> lstBatchKeys  = null;
        if (lstKeys.size() < parallel ){
            lstBatchKeys  = new ArrayList>(1);
            lstBatchKeys.add(lstKeys);
        }
        else{
            lstBatchKeys  = new ArrayList>(parallel);
            for(int i = 0; i < parallel; i++  ){
                List lst = new ArrayList();
                lstBatchKeys.add(lst);
            }
 
            for(int i = 0 ; i < lstKeys.size() ; i ++ ){
                lstBatchKeys.get(i%parallel).add(lstKeys.get(i));
            }
        }
 
        List >> futures = new ArrayList >>(5);
 
        ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
        builder.setNameFormat("ParallelBatchQuery");
        ThreadFactory factory = builder.build();
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(), factory);
 
        for(List keys : lstBatchKeys){
            Callable< ConcurrentHashMap > callable = new BatchMinutePVCallable(keys);
            FutureTask< ConcurrentHashMap > future = (FutureTask< ConcurrentHashMap >) executor.submit(callable);
            futures.add(future);
        }
        executor.shutdown();
 
        // Wait for all the tasks to finish
        try {
          boolean stillRunning = !executor.awaitTermination(
              5000000, TimeUnit.MILLISECONDS);
          if (stillRunning) {
            try {
                executor.shutdownNow();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
          }
        } catch (InterruptedException e) {
          try {
              Thread.currentThread().interrupt();
          } catch (Exception e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
          }
        }
 
        // Look for any exception
        for (Future f : futures) {
          try {
              if(f.get() != null)
              {
                  hashRet.putAll((ConcurrentHashMap)f.get());
              }
          } catch (InterruptedException e) {
            try {
                 Thread.currentThread().interrupt();
            } catch (Exception e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
          } catch (ExecutionException e) {
            e.printStackTrace();
          }
        }
 
        return hashRet;
    }
     //一個線程批量查詢,獲取分鐘PV值
    protected static ConcurrentHashMap getBatchMinutePV(List lstKeys){
        ConcurrentHashMap hashRet = null;
        List lstGet = new ArrayList();
        String[] splitValue = null;
        for (String s : lstKeys) {
            splitValue = s.split("_");
            long uid = Long.parseLong(splitValue[0]);
            long min = Long.parseLong(splitValue[1]);
            byte[] key = new byte[16];
            Bytes.putLong(key, 0, uid);
            Bytes.putLong(key, 8, min);
            Get g = new Get(key);
            g.addFamily(fp);
            lstGet.add(g);
        }
        Result[] res = null;
        try {
            res = tableMinutePV[rand.nextInt(tableN)].get(lstGet);
        } catch (IOException e1) {
            logger.error("tableMinutePV exception, e=" + e1.getStackTrace());
        }
 
        if (res != null && res.length > 0) {
            hashRet = new ConcurrentHashMap(res.length);
            for (Result re : res) {
                if (re != null && !re.isEmpty()) {
                    try {
                        byte[] key = re.getRow();
                        byte[] value = re.getValue(fp, cp);
                        if (key != null && value != null) {
                            hashRet.put(String.valueOf(Bytes.toLong(key,
                                    Bytes.SIZEOF_LONG)), String.valueOf(Bytes
                                    .toLong(value)));
                        }
                    } catch (Exception e2) {
                        logger.error(e2.getStackTrace());
                    }
                }
            }
        }
 
        return hashRet;
    }
}
//調(diào)用接口類,實現(xiàn)Callable接口
class BatchMinutePVCallable implements Callable>{
     private List keys;
 
     public BatchMinutePVCallable(List lstKeys ) {
         this.keys = lstKeys;
     }
 
     public ConcurrentHashMap call() throws Exception {
         return DataReadServer.getBatchMinutePV(keys);
     }
}

3.5 緩存查詢結(jié)果

對于頻繁查詢HBase的應(yīng)用場景,可以考慮在應(yīng)用程序中做緩存,當(dāng)有新的查詢請求時,首先在緩存中查找,如果存在則直接返回,不再查詢HBase;否則對HBase發(fā)起讀請求查詢,然后在應(yīng)用程序中將查詢結(jié)果緩存起來。至于緩存的替換策略,可以考慮LRU等常用的策略。

3.6 Blockcache

HBase上Regionserver的內(nèi)存分為兩個部分,一部分作為Memstore,主要用來寫;另外一部分作為BlockCache,主要用于讀。

寫請求會先寫入Memstore,Regionserver會給每個region提供一個Memstore,當(dāng)Memstore滿64MB以后, 會啟動 flush刷新到磁盤。當(dāng)Memstore的總大小超過限制時(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),會強行啟動flush進程,從最大的Memstore開始flush直到低于限制。

讀請求先到Memstore中查數(shù)據(jù),查不到就到BlockCache中查,再查不到就會到磁盤上讀,并把讀的結(jié)果放入BlockCache。由 于BlockCache采用的是LRU策略,因此BlockCache達到上限(heapsize * hfile.block.cache.size * 0.85)后,會啟動淘汰機制,淘汰掉最老的一批數(shù)據(jù)。

一個Regionserver上有一個BlockCache和N個Memstore,它們的大小之和不能大于等于heapsize * 0.8,否則HBase不能啟動。默認(rèn)BlockCache為0.2,而Memstore為0.4。對于注重讀響應(yīng)時間的系統(tǒng),可以將 BlockCache設(shè)大些,比如設(shè)置BlockCache=0.4,Memstore=0.39,以加大緩存的命中率。

有關(guān)BlockCache機制,請參考這里:HBase的Block cache,HBase的blockcache機制,hbase中的緩存的計算與使用。

4.數(shù)據(jù)計算

4.1 服務(wù)端計算

Coprocessor運行于HBase RegionServer服務(wù)端,各個Regions保持對與其相關(guān)的coprocessor實現(xiàn)類的引用,coprocessor類可以通過 RegionServer上classpath中的本地jar或HDFS的classloader進行加載。

目前,已提供有幾種coprocessor:

Coprocessor:提供對于region管理的鉤子,例如region的open/close/split/flush/compact等;
RegionObserver:提供用于從客戶端監(jiān)控表相關(guān)操作的鉤子,例如表的get/put/scan/delete等;
Endpoint:提供可以在region上執(zhí)行任意函數(shù)的命令觸發(fā)器。一個使用例子是RegionServer端的列聚合,這里有代碼示例。
以上只是有關(guān)coprocessor的一些基本介紹,本人沒有對其實際使用的經(jīng)驗,對它的可用性和性能數(shù)據(jù)不得而知。感興趣的同學(xué)可以嘗試一下,歡迎討論。

4.2 寫端計算

4.2.1 計數(shù)

HBase本身可以看作是一個可以水平擴展的Key-Value存儲系統(tǒng),但是其本身的計算能力有限(Coprocessor可以提供一定的服務(wù)端計算),因此,使用HBase時,往往需要從寫端或者讀端進行計算,然后將最終的計算結(jié)果返回給調(diào)用者。舉兩個簡單的例子:

PV計算:通過在HBase寫端內(nèi)存中,累加計數(shù),維護PV值的更新,同時為了做到持久化,定期(如1秒)將PV計算結(jié)果同步到HBase中,這樣查詢端最多會有1秒鐘的延遲,能看到秒級延遲的PV結(jié)果。
分 鐘PV計算:與上面提到的PV計算方法相結(jié)合,每分鐘將當(dāng)前的累計PV值,按照rowkey + minute作為新的rowkey寫入HBase中,然后在查詢端通過scan得到當(dāng)天各個分鐘以前的累計PV值,然后順次將前后兩分鐘的累計PV值相 減,就得到了當(dāng)前一分鐘內(nèi)的PV值,從而最終也就得到當(dāng)天各個分鐘內(nèi)的PV值。

4.2.2 去重

對于UV的計算,就是個去重計算的例子。分兩種情況:

如果內(nèi)存可以容納,那么可以在Hash表中維護所有已經(jīng)存在的UV標(biāo)識,每當(dāng)新來一個標(biāo)識時,通過快速查找Hash確定是否是一個新的UV,若是 則UV值加1,否則UV值不變。另外,為了做到持久化或提供給查詢接口使用,可以定期(如1秒)將UV計算結(jié)果同步到HBase中。
如果內(nèi)存不能容納,可以考慮采用Bloom Filter來實現(xiàn),從而盡可能的減少內(nèi)存的占用情況。除了UV的計算外,判斷URL是否存在也是個典型的應(yīng)用場景。

4.3 讀端計算

如果對于響應(yīng)時間要求比較苛刻的情況(如單次http請求要在毫秒級時間內(nèi)返回),個人覺得讀端不宜做過多復(fù)雜的計算邏輯,盡量做到讀端功能單一 化:即從HBase RegionServer讀到數(shù)據(jù)(scan或get方式)后,按照數(shù)據(jù)格式進行簡單的拼接,直接返回給前端使用。當(dāng)然,如果對于響應(yīng)時間要求一般,或者 業(yè)務(wù)特點需要,也可以在讀端進行一些計算邏輯。

5.總結(jié)

作為一個Key-Value存儲系統(tǒng),HBase并不是萬能的,它有自己獨特的地方。因此,基于它來做應(yīng)用時,我們往往需要從多方面進行優(yōu)化改進 (表設(shè)計、讀表操作、寫表操作、數(shù)據(jù)計算等),有時甚至還需要從系統(tǒng)級對HBase進行配置調(diào)優(yōu),更甚至可以對HBase本身進行優(yōu)化。這屬于不同的層次 范疇。

總之,概括來講,對系統(tǒng)進行優(yōu)化時,首先定位到影響你的程序運行性能的瓶頸之處,然后有的放矢進行針對行的優(yōu)化。如果優(yōu)化后滿足你的期望,那么就可以停止優(yōu)化;否則繼續(xù)尋找新的瓶頸之處,開始新的優(yōu)化,直到滿足性能要求。 

“HBase性能優(yōu)化方法分享”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!


本文名稱:HBase性能優(yōu)化方法分享
瀏覽地址:http://weahome.cn/article/jiejpi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部