真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Flume怎么自定義EventSerializer序列化類

這篇文章主要介紹“Flume怎么自定義Event Serializer序列化類”,在日常操作中,相信很多人在Flume怎么自定義Event Serializer序列化類問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flume怎么自定義Event Serializer序列化類”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

向陽網(wǎng)站建設公司創(chuàng)新互聯(lián)建站,向陽網(wǎng)站設計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為向陽1000多家提供企業(yè)網(wǎng)站建設服務。企業(yè)網(wǎng)站搭建\成都外貿(mào)網(wǎng)站建設要多少錢,請找那個售后服務好的向陽做網(wǎng)站的公司定做!

把日志從flume打到hbase中,但是我們的日志由于前期是存到MongoDB中的,所以都是Json格式的日志,這時候使用flume自帶的SimpleHbaseEventSerializer和RegexHbaseEventSerializer這樣的就不行了,于是開始痛苦的看源碼,自己寫序列化的類(這里需要注意,如果是在flume的hbasesink包下編寫的代碼,License信息一定要加上。就是最上面那段英文,要不然在運行的時候會報錯),比較簡單,編寫好類之后,編譯打包,傳到flume的lib目錄下,然后在配置agent的時候指定Serializer的類為編寫的類即可。下面是代碼(類注釋沒貼出來,見諒哈):

public class PRTMSAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
	private byte[] table;//hbase表
	private byte[] cf;//列簇
	private byte[][] payload;//列集合
	private byte[][] payloadColumn;//列值
	private byte[] incrementColumn;
	private String rowSuffix;//roykey后綴
	private String rowPrefix;//rowkey前綴
	private byte[] incrementRow;
	private KeyType keyType;//rowkey后綴類型 
	private static final Logger logger = LoggerFactory.getLogger(PRTMSAsyncHbaseEventSerializer.class);

	@Override
	public void configure(Context context) {
		// TODO Auto-generated method stub
		//設置主鍵后綴類型,這里使用時間戳
		keyType = KeyType.TS;
		if (iCol != null && !iCol.isEmpty()) {
			incrementColumn = iCol.getBytes(Charsets.UTF_8);
		}
		incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
	}

	@Override
	public void configure(ComponentConfiguration conf) {
		// TODO Auto-generated method stub

	}

	@Override
	public void initialize(byte[] table, byte[] cf) {
		// TODO Auto-generated method stub
		this.table = table;
		this.cf = cf;
	}
	/**
	 * 
	 * @Title: setEvent 
	 * @Description: 獲取日志信息,并解析出HBase的列以及列的value值 
	 * @param event   
	 * @throws 
	 * @see org.apache.flume.sink.hbase.AsyncHbaseEventSerializer#setEvent(org.apache.flume.Event)
	 */
	@Override
	public void setEvent(Event event) {
		// TODO Auto-generated method stub
		//獲取日志信息
		String log = new String(event.getBody(), StandardCharsets.UTF_8);
		//headers包含日志中項目編號和host信息
		Map headers = event.getHeaders();
		JsonReader jsonReader = new JsonReader(new StringReader(log));
		String name = "";
		String value = "";
		String path = "";
		Map kv = new HashMap();
		try {
			//解析日志中的鍵值對緩存到map中
			jsonReader.beginObject();
			while (jsonReader.hasNext()) {
				name = jsonReader.nextName();
				value = jsonReader.nextString();
				if(name.equals("uri"))
					path = value.split(" ")[1];
				kv.put(name, value);
			}
			jsonReader.endObject();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		//解析headers中的項目id和服務host、路徑
		if(path.contains("?")){
			path = path.substring(0, path.indexOf("?"));
		}
		String pcode = headers.get("pcode");
		String host = headers.get("host");
		//將項目編號和服務器host添加到map中
		kv.put("pcode",pcode);
		kv.put("host", host);
		//初始化列和value數(shù)組
		this.payloadColumn = new byte[kv.keySet().size()][];
		this.payload = new byte[kv.keySet().size()][];
		int i = 0;
		//給hbase的列和value賦值
		for (String key : kv.keySet()) {
			this.payloadColumn[i] = key.getBytes();
			this.payload[i] = kv.get(key).getBytes();
			i++;
		}
		//設置rowkey的前綴 格式是項目編號+路徑
		
		this.rowSuffix = new StringBuilder(pcode).reverse().toString() + ":"+path+":"+kv.get("time");
	}
	
	@Override
	public List getActions() {
		// TODO Auto-generated method stub
		List actions = new ArrayList();
		if (payloadColumn != null) {
			byte[] rowKey;
			try {
				rowKey = rowSuffix.getBytes();
				// for 循環(huán),提交所有列和對于數(shù)據(jù)的put請求。
				for (int i = 0; i < this.payload.length; i++) {
					PutRequest putRequest = new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]);
					actions.add(putRequest);
				}

			} catch (Exception e) {
				throw new FlumeException("Could not get row key!", e);
			}
		}
		return actions;
	}

	@Override
	public List getIncrements() {
		// TODO Auto-generated method stub
		List actions = new ArrayList();
		if (incrementColumn != null) {
			AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn);
			actions.add(inc);
		}
		return actions;
	}
	@Override
	public void cleanUp() {
		// TODO Auto-generated method stub
	}

}

到此,關于“Flume怎么自定義Event Serializer序列化類”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
網(wǎng)站名稱:Flume怎么自定義EventSerializer序列化類
鏈接分享:http://weahome.cn/article/gijios.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部