1. create Idea project for AsyncHbaseEventSerializer
創(chuàng)新互聯(lián)-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價(jià)比大城網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫(kù),直接使用。一站式大城網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋大城地區(qū)。費(fèi)用合理售后完善,十余年實(shí)體公司更值得信賴。
添加dependency 到pom.xml
Implements AsyncHbaseEventSerializer according to the business.
importorg.apache.flume.Context; importorg.apache.flume.Event; importorg.apache.flume.conf.ComponentConfiguration; importorg.apache.flume.sink.hbase.AsyncHbaseEventSerializer; importorg.hbase.async.AtomicIncrementRequest; importorg.hbase.async.PutRequest; importjava.util.ArrayList; importjava.util.List; /** * Created by root on 12/5/17. */ public classSplittingSerializerimplementsAsyncHbaseEventSerializer { private byte[]table; private byte[]colFam; privateEventcurrentEvent; private byte[][]rentRowKey; private final byte[]eventCountCol="eventCount".getBytes(); columnNames; private finalList private finalList private byte[] cur public voidinitialize(byte[] table, byte[] cf) { this.table= table; this.colFam= cf; //Can not get the columns from context in configure method. Had to hard coded here. columnNames =new byte[3][]; columnNames[0] ="name".getBytes(); columnNames[1] ="id".getBytes(); columnNames[2] ="phone".getBytes(); } public voidsetEvent(Event event) { // Set the event and verify that the rowKey is not present this.currentEvent= event; /* //Don't know how to set the key of event header. String rowKeyStr = currentEvent.getHeaders().get("rowKey"); if (rowKeyStr == null) { throw new FlumeException("No row key found in headers!"); } currentRowKey = rowKeyStr.getBytes();*/ } publicList // Split the event body and get the values for the columns String eventStr =newString(currentEvent.getBody()); String[] cols = eventStr.split(","); Long currTime = System.currentTimeMillis(); longrevTs = Long.MAX_VALUE- currTime; currentRowKey = (Long.toString(revTs) + cols[0]).getBytes(); puts.clear(); for(inti =0;i < cols.length;i++) { //Generate a PutRequest for each column. PutRequest req =newPutRequest(table,currentRowKey,colFam, columnNames[i],cols[i].getBytes()); puts.add(req); } returnputs; } publicList incs.clear(); //Increment the number of events received incs.add(newAtomicIncrementRequest(table,"totalEvents".getBytes(),colFam,eventCountCol)); returnincs; } public voidcleanUp() { table=null; colFam=null; currentEvent=null; columnNames =null; currentRowKey =null; } public voidconfigure(Context context) { //Get the column names from the configuration //Did not work. Don't know how to use it. String cols =newString(context.getString("columns")); String[] names = cols.split(","); byte[][] columnNames =new byte[names.length][]; inti =0; System.out.println("getting columnNames"); for(String name : names) { columnNames[i++] = name.getBytes(); } } public voidconfigure(ComponentConfiguration componentConfiguration) { } } |
build and deploy the jar file
build --> build artifacts
copy to the lib directory of flume. Here I use scp to upload to the flume of another host.
2. configure flume
a1.sources = r1 a1.channels = c1 c2 a1.sinks = k1 sink2 a1.source.s1.selector.type = replicating #NetCat TCP source a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 6666 a1.sources.r1.channels = c1 c2 #channel a1.channels.c2.type = memory a1.channels.c2.capacity = 10000 a1.channels.c2.transactionCapacity = 1000 #HBase sink a1.sinks.sink2.type = org.apache.flume.sink.hbase.AsyncHBaseSink a1.sinks.sink2.channel = c2 a1.sinks.sink2.table = law a1.sinks.sink2.columnFamily = lawfile a1.sinks.sink2.batchSize = 5000 #The serializer to use a1.sinks.sink2.serializer = ifre.flume.hbase.SplittingSerializer #List of columns each event writes to. a1.sinks.sink2.serializer.columns = name,id,phone |
3. create hbase table
# hbase shell create "law" "lawfile" |
4. run flume agent
[root@ifrebigsearch2 apache-flume-1.6.0-bin]# bin/flume-ng agent --conf conf --conf-file conf/crawler-hdfs-conf.properties --name a1 -Dflume.root.logger=INFO,console |
5. run nc
[root@ifrebigsearch0 dkh]# nc ifrebigsearch2 6666 zhangsan,10110198806054561,13812345678 OK |
6.result
hbase(main):002:0> scan 'law' ROW COLUMN+CELL 9223370524386395508z column=lawfile:id, timestamp=1512468380362, value=10110198 hangsan 806054561 9223370524386395508z column=lawfile:name, timestamp=1512468380361, value=zhangs hangsan an 9223370524386395508z column=lawfile:phone, timestamp=1512468380363, value=13812 hangsan 345678 |