真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

flume中如何自定義Interceptor

這期內(nèi)容當中小編將會給大家?guī)碛嘘P(guān)flume中如何自定義Interceptor,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

竹溪網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)建站,竹溪網(wǎng)站設(shè)計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為竹溪成百上千提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)營銷網(wǎng)站建設(shè)要多少錢,請找那個售后服務(wù)好的竹溪做網(wǎng)站的公司定做!

flume現(xiàn)狀:

這種比較個性化的轉(zhuǎn)換flume沒有相關(guān)插件

分析:

flume event 針對source為文本文件時,會一行一個event(默認小于2048長度)

而攔截器就是針對event來做處理的

代碼:

package com.wy.flume.interceptor;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;  

public class AdRefererLogFormatInterceptor implements Interceptor {
    //匹配user-agent
    private static final Pattern pattern = Pattern.compile("^\"(.*)\"\\s\"(.*)\"(.*)$");
    private static final HashMap platform = initPlatforms();
    private static final HashMap browser = initBrowsers();

    private static HashMap initPlatforms() {
        HashMap platforms = new HashMap<>();
        platforms.put("windows nt 6.2", "Win8");
        platforms.put("windows nt 6.2", "Win8");
        platforms.put("windows nt 6.1", "Win7");
        platforms.put("windows nt 6.0", "Win Longhorn");
        platforms.put("windows nt 5.2", "Win2003");
        platforms.put("windows nt 5.0", "Win2000");
        platforms.put("windows nt 5.1", "WinXP");
        platforms.put("windows nt 4.0", "Windows NT 4.0");
        platforms.put("winnt4.0", "Windows NT 4.0");
        platforms.put("winnt 4.0", "Windows NT");
        platforms.put("winnt", "Windows NT");
        platforms.put("windows 98", "Win98");
        platforms.put("win98", "Win98");
        platforms.put("windows 95", "Win95");
        platforms.put("win95", "Win95");
        platforms.put("windows", "Unknown Windows OS");
        platforms.put("os x", "MacOS X");
        platforms.put("ppc mac", "Power PC Mac");
        platforms.put("freebsd", "FreeBSD");
        platforms.put("ppc", "Macintosh");
        platforms.put("linux", "Linux");
        platforms.put("debian", "Debian");
        platforms.put("sunos", "Sun Solaris");
        platforms.put("beos", "BeOS");
        platforms.put("apachebench", "ApacheBench");
        platforms.put("aix", "AIX");
        platforms.put("irix", "Irix");
        platforms.put("osf", "DEC OSF");
        platforms.put("hp-ux", "HP-UX");
        platforms.put("netbsd", "NetBSD");
        platforms.put("bsdi", "BSDi");
        platforms.put("openbsd", "OpenBSD");
        platforms.put("gnu", "GNU/Linux");
        platforms.put("unix", "Unknown Unix OS");
        return platforms;
    }

    private static HashMap initBrowsers() {
        HashMap browsers = new HashMap<>();
        browsers.put("Flock", "Flock");
        browsers.put("Chrome", "Chrome");
        browsers.put("Opera", "Opera");
        browsers.put("MSIE", "IE");
        browsers.put("Internet Explorer", "IE");
        browsers.put("Shiira", "Shiira");
        browsers.put("Firefox", "Firefox");
        browsers.put("Chimera", "Chimera");
        browsers.put("Phoenix", "Phoenix");
        browsers.put("Firebird", "Firebird");
        browsers.put("Camino", "Camino");
        browsers.put("Netscape", "Netscape");
        browsers.put("OmniWeb", "OmniWeb");
        browsers.put("Safari", "Safari");
        browsers.put("Mozilla", "Mozilla");
        browsers.put("Konqueror", "Konqueror");
        browsers.put("icab", "iCab");
        browsers.put("Lynx", "Lynx");
        browsers.put("Links", "Links");
        browsers.put("hotjava", "HotJava");
        browsers.put("amaya", "Amaya");
        browsers.put("IBrowse", "IBrowse");
        return browsers;
    }

    private AdRefererLogFormatInterceptor() {
    }

    @Override
    public void initialize() {
        // NO-OP...
    }

    @Override
    public void close() {
        // NO-OP...
    }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), Charsets.UTF_8);
        String[] fields = body.split(",", 8);
        StringBuilder sb = new StringBuilder();
        sb.append(fields[0]);
        sb.append('\t');
        sb.append(fields[1]);
        sb.append('\t');
        sb.append(fields[2]);
        sb.append('\t');
        sb.append(fields[3]);
        sb.append('\t');
        sb.append(fields[4]);
        sb.append('\t');
        sb.append(fields[5]);
        sb.append('\t');
        sb.append(fields[6]);
        sb.append('\t');

        Matcher submatcher = pattern.matcher(fields[7].trim());
        String url = "";
        String os = "others";
        String br = "others";
        String ver = "";
        if (submatcher.matches()) {
            url = submatcher.group(1);
            String agent = submatcher.group(2);
            //匹配操作系統(tǒng)
            Set platformKeys = platform.keySet();
            for (String platformKey : platformKeys) {
                Pattern pattern = Pattern.compile( Pattern.quote(platformKey) , Pattern.CASE_INSENSITIVE);
                Matcher matcher = pattern.matcher(agent);
                if (matcher.find()) {
                    os = platform.get(platformKey);
                    break;
                }
            }
            //匹配瀏覽器 和版本
            Set browserKeys = browser.keySet();
            for (String browserKey : browserKeys) {
                Pattern pattern = Pattern.compile( Pattern.quote(browserKey) + ".*?([0-9\\.]+)", Pattern.CASE_INSENSITIVE);
                Matcher matcher = pattern.matcher(agent);
                if (matcher.find()) {
                    ver = matcher.group(1);
                    br = browser.get(browserKey);
                    break;
                }
            }
        }
        sb.append(url);
        sb.append('\t');
        sb.append(os);
        sb.append('\t');
        sb.append(br);
        sb.append('\t');
        sb.append(ver);
        //修改event body
        event.setBody(sb.toString().getBytes());
        return event;
    }

    @Override
    public List intercept(List events) {
        List intercepted = Lists.newArrayListWithCapacity(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }
    
    public static class Builder implements Interceptor.Builder {
        //使用Builder初始化Interceptor
        @Override
        public Interceptor build() {
            return new AdRefererLogFormatInterceptor();
        }

        @Override
        public void configure(Context context) {
            
        }
    }
}

部署:

1、將程序打包成AdRerfererLogInterceptor.jar

2、將jar包上傳到FLUME_HOME的lib目錄下(flume1.5采用bin安裝)

3、在配置文件中使用Interceptor

hdp2.sources.s1.interceptors = i1
hdp2.sources.s1.interceptors.i1.type = com.wy.flume.interceptor.AdRefererLogFormatInterceptor$Builder

優(yōu)勢:

在數(shù)據(jù)傳輸?shù)耐瑫r進行數(shù)據(jù)的處理,節(jié)省步驟,而且有flume幫組管理文件進度,程序中斷時不用手動做恢復(fù)(file channel)

總結(jié):

在Interceptor中可以對event的header 和 body 進行處理,進而達到定制化的目的。

上述就是小編為大家分享的flume中如何自定義Interceptor了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


分享標題:flume中如何自定義Interceptor
網(wǎng)站地址:http://weahome.cn/article/pgdige.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部