This article shows how to write a custom Interceptor for Flume, using a log-reformatting interceptor as the worked example.
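Current state of Flume:
Flume ships no plugin for this kind of application-specific transformation (here, reformatting ad referer log lines and parsing the user agent).
Analysis:
When the source is a text file, Flume produces one event per line (by default a line is limited to 2048 characters).
An interceptor operates on exactly these events.
Code: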
package com.wy.flume.interceptor;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class AdRefererLogFormatInterceptor implements Interceptor {

    // Matches the eighth field: "url" "user-agent" plus any trailing text
    private static final Pattern pattern = Pattern.compile("^\"(.*)\"\\s\"(.*)\"(.*)$");

    // LinkedHashMap preserves insertion order, so specific keys
    // (e.g. "windows nt 6.1") are tested before generic ones (e.g. "windows").
    private static final Map<String, String> platform = initPlatforms();
    private static final Map<String, String> browser = initBrowsers();

    private static Map<String, String> initPlatforms() {
        Map<String, String> platforms = new LinkedHashMap<>();
        platforms.put("windows nt 6.2", "Win8");
        platforms.put("windows nt 6.1", "Win7");
        platforms.put("windows nt 6.0", "Win Longhorn");
        platforms.put("windows nt 5.2", "Win2003");
        platforms.put("windows nt 5.0", "Win2000");
        platforms.put("windows nt 5.1", "WinXP");
        platforms.put("windows nt 4.0", "Windows NT 4.0");
        platforms.put("winnt4.0", "Windows NT 4.0");
        platforms.put("winnt 4.0", "Windows NT");
        platforms.put("winnt", "Windows NT");
        platforms.put("windows 98", "Win98");
        platforms.put("win98", "Win98");
        platforms.put("windows 95", "Win95");
        platforms.put("win95", "Win95");
        platforms.put("windows", "Unknown Windows OS");
        platforms.put("os x", "MacOS X");
        platforms.put("ppc mac", "Power PC Mac");
        platforms.put("freebsd", "FreeBSD");
        platforms.put("ppc", "Macintosh");
        platforms.put("linux", "Linux");
        platforms.put("debian", "Debian");
        platforms.put("sunos", "Sun Solaris");
        platforms.put("beos", "BeOS");
        platforms.put("apachebench", "ApacheBench");
        platforms.put("aix", "AIX");
        platforms.put("irix", "Irix");
        platforms.put("osf", "DEC OSF");
        platforms.put("hp-ux", "HP-UX");
        platforms.put("netbsd", "NetBSD");
        platforms.put("bsdi", "BSDi");
        platforms.put("openbsd", "OpenBSD");
        platforms.put("gnu", "GNU/Linux");
        platforms.put("unix", "Unknown Unix OS");
        return platforms;
    }

    private static Map<String, String> initBrowsers() {
        Map<String, String> browsers = new LinkedHashMap<>();
        browsers.put("Flock", "Flock");
        browsers.put("Chrome", "Chrome");
        browsers.put("Opera", "Opera");
        browsers.put("MSIE", "IE");
        browsers.put("Internet Explorer", "IE");
        browsers.put("Shiira", "Shiira");
        browsers.put("Firefox", "Firefox");
        browsers.put("Chimera", "Chimera");
        browsers.put("Phoenix", "Phoenix");
        browsers.put("Firebird", "Firebird");
        browsers.put("Camino", "Camino");
        browsers.put("Netscape", "Netscape");
        browsers.put("OmniWeb", "OmniWeb");
        browsers.put("Safari", "Safari");
        browsers.put("Mozilla", "Mozilla");
        browsers.put("Konqueror", "Konqueror");
        browsers.put("icab", "iCab");
        browsers.put("Lynx", "Lynx");
        browsers.put("Links", "Links");
        browsers.put("hotjava", "HotJava");
        browsers.put("amaya", "Amaya");
        browsers.put("IBrowse", "IBrowse");
        return browsers;
    }

    // Instances are created only through the Builder below
    private AdRefererLogFormatInterceptor() {
    }

    @Override
    public void initialize() {
        // NO-OP
    }

    @Override
    public void close() {
        // NO-OP
    }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), Charsets.UTF_8);
        String[] fields = body.split(",", 8);
        if (fields.length < 8) {
            // Malformed line: pass it through unchanged (return null here to drop it instead)
            return event;
        }

        // Copy the first seven fields, tab-separated
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 7; i++) {
            sb.append(fields[i]).append('\t');
        }

        Matcher submatcher = pattern.matcher(fields[7].trim());
        String url = "";
        String os = "others";
        String br = "others";
        String ver = "";
        if (submatcher.matches()) {
            url = submatcher.group(1);
            String agent = submatcher.group(2);

            // Match the operating system
            for (Map.Entry<String, String> entry : platform.entrySet()) {
                Pattern osPattern = Pattern.compile(
                        Pattern.quote(entry.getKey()), Pattern.CASE_INSENSITIVE);
                if (osPattern.matcher(agent).find()) {
                    os = entry.getValue();
                    break;
                }
            }

            // Match the browser and its version
            for (Map.Entry<String, String> entry : browser.entrySet()) {
                Pattern brPattern = Pattern.compile(
                        Pattern.quote(entry.getKey()) + ".*?([0-9\\.]+)",
                        Pattern.CASE_INSENSITIVE);
                Matcher matcher = brPattern.matcher(agent);
                if (matcher.find()) {
                    ver = matcher.group(1);
                    br = entry.getValue();
                    break;
                }
            }
        }
        sb.append(url).append('\t');
        sb.append(os).append('\t');
        sb.append(br).append('\t');
        sb.append(ver);

        // Replace the event body, using the same charset it was decoded with
        event.setBody(sb.toString().getBytes(Charsets.UTF_8));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }

    // Flume instantiates the Interceptor through this Builder
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new AdRefererLogFormatInterceptor();
        }

        @Override
        public void configure(Context context) {
            // No configuration parameters
        }
    }
}
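To see what the interceptor does to a single line without starting Flume, the core of intercept can be exercised as plain Java. The sketch below is a trimmed-down illustration rather than the class above: the class name RefererLineDemo, the shortened lookup tables, and the sample log line are all assumptions made for the example. Only the regular expression and the split-into-eight-fields logic are taken from the article's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class: reproduces the body transformation without any Flume dependency.
class RefererLineDemo {
    // Same pattern as the interceptor: "url" "user-agent" plus trailing text
    private static final Pattern TAIL = Pattern.compile("^\"(.*)\"\\s\"(.*)\"(.*)$");

    // Shortened tables (assumption); insertion order decides which key wins.
    private static final Map<String, String> PLATFORMS = new LinkedHashMap<>();
    private static final Map<String, String> BROWSERS = new LinkedHashMap<>();
    static {
        PLATFORMS.put("windows nt 6.1", "Win7");
        PLATFORMS.put("windows", "Unknown Windows OS");
        PLATFORMS.put("linux", "Linux");
        BROWSERS.put("Chrome", "Chrome");
        BROWSERS.put("Firefox", "Firefox");
        BROWSERS.put("Safari", "Safari");
    }

    static String transform(String line) {
        String[] fields = line.split(",", 8);
        if (fields.length < 8) {
            return line; // too few fields: leave the line untouched
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 7; i++) {
            sb.append(fields[i]).append('\t');
        }
        String url = "", os = "others", br = "others", ver = "";
        Matcher tail = TAIL.matcher(fields[7].trim());
        if (tail.matches()) {
            url = tail.group(1);
            String agent = tail.group(2);
            for (Map.Entry<String, String> e : PLATFORMS.entrySet()) {
                Pattern p = Pattern.compile(Pattern.quote(e.getKey()), Pattern.CASE_INSENSITIVE);
                if (p.matcher(agent).find()) {
                    os = e.getValue();
                    break;
                }
            }
            for (Map.Entry<String, String> e : BROWSERS.entrySet()) {
                Pattern p = Pattern.compile(
                        Pattern.quote(e.getKey()) + ".*?([0-9\\.]+)", Pattern.CASE_INSENSITIVE);
                Matcher m = p.matcher(agent);
                if (m.find()) {
                    ver = m.group(1);
                    br = e.getValue();
                    break;
                }
            }
        }
        return sb.append(url).append('\t').append(os).append('\t')
                 .append(br).append('\t').append(ver).toString();
    }

    public static void main(String[] args) {
        // Made-up sample line: seven placeholder fields, then "url" "user-agent"
        String line = "1,2,3,4,5,6,7,\"http://example.com/a\" "
                + "\"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 "
                + "(KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36\"";
        System.out.println(transform(line));
    }
}
```

Because split is called with a limit of 8, the comma inside the user agent ("KHTML, like Gecko") stays in the eighth field instead of creating extra fields. Listing "Chrome" before "Safari" matters for the same sample, since a Chrome user agent also contains the word Safari.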
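Deployment:
1. Package the program as AdRerfererLogInterceptor.jar.
2. Upload the jar to the lib directory under FLUME_HOME (Flume 1.5, installed from the binary distribution).
3. Reference the Interceptor in the agent configuration file: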
hdp2.sources.s1.interceptors = i1
hdp2.sources.s1.interceptors.i1.type = com.wy.flume.interceptor.AdRefererLogFormatInterceptor$Builder
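Advantages:
The data is processed while it is being transferred, which saves a separate processing step. Flume also keeps track of file progress, so with a file channel no manual recovery is needed after the program is interrupted.
Summary:
An Interceptor can process both the header and the body of an event, which is what makes this kind of customization possible.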
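The interceptor in this article only rewrites the body. As a rough sketch of touching the header as well, the example below records the number of comma-separated fields in a header and trims the body. SketchEvent is a hypothetical stand-in that mirrors the getHeaders/getBody/setBody shape of org.apache.flume.Event so that the example compiles without Flume on the classpath; the class names and the header name fieldCount are likewise made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.flume.Event (headers map plus byte[] body).
class SketchEvent {
    private final Map<String, String> headers = new HashMap<>();
    private byte[] body;

    SketchEvent(byte[] body) {
        this.body = body;
    }

    Map<String, String> getHeaders() {
        return headers;
    }

    byte[] getBody() {
        return body;
    }

    void setBody(byte[] body) {
        this.body = body;
    }
}

// Hypothetical sketch of an intercept step that changes both header and body.
class HeaderTaggingSketch {
    static SketchEvent intercept(SketchEvent event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // Header: number of comma-separated fields ("fieldCount" is an invented name)
        event.getHeaders().put("fieldCount", String.valueOf(body.split(",", -1).length));
        // Body: strip surrounding whitespace and re-encode as UTF-8
        event.setBody(body.trim().getBytes(StandardCharsets.UTF_8));
        return event;
    }

    public static void main(String[] args) {
        SketchEvent e = intercept(new SketchEvent(" a,b,c ".getBytes(StandardCharsets.UTF_8)));
        System.out.println(e.getHeaders() + " " + new String(e.getBody(), StandardCharsets.UTF_8));
    }
}
```

In a real interceptor the same two calls would be made on the Event passed to intercept, and a header set this way could then be read by downstream components of the agent.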