真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

使用BulkLoad從HDFS批量導(dǎo)入數(shù)據(jù)到HBase

在向Hbase中寫(xiě)入數(shù)據(jù)時(shí),常見(jiàn)的寫(xiě)入方法有使用HBase API,Mapreduce批量導(dǎo)入數(shù)據(jù),使用這些方式帶入數(shù)據(jù)時(shí),一條數(shù)據(jù)寫(xiě)入到HBase數(shù)據(jù)庫(kù)中的大致流程如圖。
使用BulkLoad從HDFS批量導(dǎo)入數(shù)據(jù)到HBase

10年積累的成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶(hù)對(duì)網(wǎng)站的新想法和需求。提供各種問(wèn)題對(duì)應(yīng)的解決方案。讓選擇我們的客戶(hù)得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先網(wǎng)站設(shè)計(jì)后付款的網(wǎng)站建設(shè)流程,更有南城免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。

數(shù)據(jù)發(fā)出后首先寫(xiě)入到雨鞋日志W(wǎng)Al中,寫(xiě)入到預(yù)寫(xiě)日志中之后,隨后寫(xiě)入到內(nèi)存MemStore中,最后在Flush到Hfile中。這樣寫(xiě)數(shù)據(jù)的方式不會(huì)導(dǎo)致數(shù)據(jù)的丟失,并且道正數(shù)據(jù)的有序性,但是當(dāng)遇到大量的數(shù)據(jù)寫(xiě)入時(shí),寫(xiě)入的速度就難以保證。所以,介紹一種性能更高的寫(xiě)入方式BulkLoad。

使用BulkLoad批量寫(xiě)入數(shù)據(jù)主要分為兩部分:
一、使用HFileOutputFormat2通過(guò)自己編寫(xiě)的MapReduce作業(yè)將HFile寫(xiě)入到HDFS目錄,由于寫(xiě)入到HBase中的數(shù)據(jù)是按照順序排序的,HFileOutputFormat2中的configureIncrementalLoad()可以完成所需的配置。
二、將Hfile從HDFS移動(dòng)到HBase表中,大致過(guò)程如圖
使用BulkLoad從HDFS批量導(dǎo)入數(shù)據(jù)到HBase

實(shí)例代碼pom依賴(lài):


            org.apache.hbase
            hbase-server
            1.4.0
        

        
            org.apache.hadoop
            hadoop-client
            2.6.4
        

        
            org.apache.hbase
            hbase-client
            0.99.2
        
package com.yangshou;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class BulkLoadMapper extends Mapper {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //讀取文件中的每一條數(shù)據(jù),以序號(hào)作為行鍵
        String line = value.toString();
        //將數(shù)據(jù)進(jìn)行切分
        //切分后數(shù)組中的元素分別為:序號(hào),用戶(hù)id,商品id,用戶(hù)行為,商品分類(lèi),時(shí)間,地址
        String[] str = line.split(" ");
        String id = str[0];
        String user_id = str[1];
        String item_id = str[2];
        String behavior = str[3];
        String item_type = str[4];
        String time = str[5];
        String address = "156";
        //拼接rowkey和put
        ImmutableBytesWritable rowkry = new ImmutableBytesWritable(id.getBytes());
        Put put = new Put(id.getBytes());
        put.add("info".getBytes(),"user_id".getBytes(),user_id.getBytes());
        put.add("info".getBytes(),"item_id".getBytes(),item_id.getBytes());
        put.add("info".getBytes(),"behavior".getBytes(),behavior.getBytes());
        put.add("info".getBytes(),"item_type".getBytes(),item_type.getBytes());
        put.add("info".getBytes(),"time".getBytes(),time.getBytes());
        put.add("info".getBytes(),"address".getBytes(),address.getBytes());
        //將數(shù)據(jù)寫(xiě)出
        context.write(rowkry,put);
    }
}
package com.yangshou;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver  {
    public static void main(String[] args) throws Exception {
        //獲取Hbase配置
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("BulkLoadDemo"));
        Admin admin = conn.getAdmin();

        //設(shè)置job
        Job job = Job.getInstance(conf,"BulkLoad");
        job.setJarByClass(BulkLoadDriver.class);
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        //設(shè)置文件的輸入輸出路徑
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        FileInputFormat.setInputPaths(job,new Path("hdfs://hadoopalone:9000/tmp/000000_0"));
        FileOutputFormat.setOutputPath(job,new Path("hdfs://hadoopalone:9000/demo1"));

        //將數(shù)據(jù)加載到Hbase表中
        HFileOutputFormat2.configureIncrementalLoad(job,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo")));
        if(job.waitForCompletion(true)){
            LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
            load.doBulkLoad(new Path("hdfs://hadoopalone:9000/demo1"),admin,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo")));

        }

    }
}

實(shí)例數(shù)據(jù)

44979   100640791   134060896   1   5271    2014-12-09  天津市
44980   100640791   96243605    1   13729   2014-12-02  新疆

在Hbase shell 中創(chuàng)建表

create 'BulkLoadDemo','info'

打包后執(zhí)行
```hadoop jar BulkLoadDemo-1.0-SNAPSHOT.jar com.yangshou.BulkLoadDriver

注意:在執(zhí)行hadoop jar之前應(yīng)該先將Hbase中的相關(guān)包加載過(guò)來(lái)

export HADOOP_CLASSPATH=$HBASE_HOME/lib/*


文章題目:使用BulkLoad從HDFS批量導(dǎo)入數(shù)據(jù)到HBase
文章分享:http://weahome.cn/article/geoojg.html

其他資訊

在線咨詢(xún)

微信咨詢(xún)

電話咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部