真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何使用kafkaconnect將數(shù)據(jù)批量寫到hdfs

小編給大家分享一下如何使用kafka connect將數(shù)據(jù)批量寫到hdfs,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

成都創(chuàng)新互聯(lián)公司長(zhǎng)期為上千客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為高淳企業(yè)提供專業(yè)的網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè),高淳網(wǎng)站改版等技術(shù)服務(wù)。擁有十余年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。

 

kafka-connect是以單節(jié)點(diǎn)模式運(yùn)行,即standalone。

一. 首先,先對(duì)kafka和kafka connect做一個(gè)簡(jiǎn)單的介紹

kafka:Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者規(guī)模的網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。比較直觀的解釋就是其有一個(gè)生產(chǎn)者(producer)和一個(gè)消費(fèi)者(consumer)??梢詫afka想象成一個(gè)數(shù)據(jù)容器,生產(chǎn)者負(fù)責(zé)發(fā)送數(shù)據(jù)到這個(gè)容器中,而消費(fèi)者從容器中取出數(shù)據(jù),在將數(shù)據(jù)做處理,如存儲(chǔ)到hdfs。

kafka connect:Kafka Connect是一種用于在Kafka和其他系統(tǒng)之間可擴(kuò)展的、可靠的流式傳輸數(shù)據(jù)的工具。它使得能夠快速定義將大量數(shù)據(jù)集合移入和移出Kafka的連接器變得簡(jiǎn)單。即適合批量數(shù)據(jù)導(dǎo)入導(dǎo)出操作。

二. 下面將介紹如何用kafka connect將數(shù)據(jù)寫入到hdfs中。包括在這個(gè)過程中可能碰到的一些問題說明。

首先啟動(dòng)kafka-connect:

bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties
這個(gè)命令后面兩個(gè)參數(shù),
第一個(gè)是指定啟動(dòng)的模式,有分布式和單節(jié)點(diǎn)兩種,這里是單節(jié)點(diǎn)。kafka自帶,放于config目錄下。
第二個(gè)參數(shù)指向描述connector的屬性的文件,可以有多個(gè),這里只有一個(gè)connector用來寫入到hdfs。需要自己創(chuàng)建。

接下來看看connector1.properties的內(nèi)容,


name="test"    #該connector的名字
#將自己按connect接口規(guī)范編寫的代碼打包后放在kafka/libs目錄下,再根據(jù)項(xiàng)目結(jié)構(gòu)引用對(duì)應(yīng)connector
connector.class=hdfs.HdfsSinkConnector
#Task是導(dǎo)入導(dǎo)出的具體實(shí)現(xiàn),這里是指定多少個(gè)task來并行運(yùn)行導(dǎo)入導(dǎo)出作業(yè),由多線程實(shí)現(xiàn)。由于hdfs中一個(gè)文件每次只能又一個(gè)文件操作,所以這里只能是1
tasks.max=1
#指定從哪個(gè)topic讀取數(shù)據(jù),這些其實(shí)是用來在connector或者task的代碼中讀取的。
topics=test
#指定key以那種方式轉(zhuǎn)換,需和Producer發(fā)送方指定的序列化方式一致
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.json.JsonConverter     #同上
hdfs.url=hdfs://127.0.0.1:9000  #hdfs的url路徑,在Connector中會(huì)被讀取
hdfs.path=/test/file  #hdfs文件路徑,同樣Connector中被讀取

key.converter.schemas.enable=true  #稍后介紹,可以true也可以false,影響傳輸格式
value.converter.schemas.enable=true  #稍后介紹,可以true也可以false

三. 接下來看代碼,connect主要是導(dǎo)入導(dǎo)出兩個(gè)概念,導(dǎo)入是source,導(dǎo)出時(shí)Sink。這里只使用Sink,不過Source和Sink的實(shí)現(xiàn)其實(shí)基本相同。


實(shí)現(xiàn)Sink其實(shí)不難,實(shí)現(xiàn)對(duì)應(yīng)的接口,即SinkConnector和SinkTask兩個(gè)接口,再打包放到kafka/libs目錄下即可。其中SinkConnector只有一個(gè),而Task可以有多
先是Connector
public class HdfsSinkConnector extends SinkConnector {
    //這兩項(xiàng)為配置hdfs的urlh和路徑的配置項(xiàng),需要在connector1.properties中指定
    public static final String HDFS_URL = "hdfs.url";
    public static final String HDFS_PATH = "hdfs.path";
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(HDFS_URL, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs url")
            .define(HDFS_PATH, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs path");
    private String hdfsUrl;
    private String hdfsPath;
    @Override
    public String version() {
        return AppInfoParser.getVersion();
    }
//start方法會(huì)再初始的時(shí)候執(zhí)行一次,這里主要用于配置    @Override    public void start(Map props) {        hdfsUrl = props.get(HDFS_URL);        hdfsPath = props.get(HDFS_PATH);    }   //這里指定了Task的類    @Override    public Class taskClass() {        return HdfsSinkTask.class;    }   //用于配置Task的config,這些都是會(huì)在Task中用到    @Override    public List> taskConfigs(int maxTasks) {        ArrayList> configs = new ArrayList<>();        for (int i = 0; i < maxTasks; i++) {            Map config = new HashMap<>();            if (hdfsUrl != null)                config.put(HDFS_URL, hdfsUrl);            if (hdfsPath != null)                config.put(HDFS_PATH, hdfsPath);            configs.add(config);        }        return configs;    }   //關(guān)閉時(shí)的操作,一般是關(guān)閉資源。    @Override    public void stop() {        // Nothing to do since FileStreamSinkConnector has no background monitoring.    }    @Override    public ConfigDef config() {        return CONFIG_DEF;    } }

接下來是Task

public class HdfsSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(HdfsSinkTask.class);

    private String filename;

    public static String hdfsUrl;
    public static String hdfsPath;
    private Configuration conf;
    private FSDataOutputStream os;
    private FileSystem hdfs;


    public HdfsSinkTask(){

    }

    @Override
    public String version() {
        return new HdfsSinkConnector().version();
    }
  //Task開始會(huì)執(zhí)行的代碼,可能有多個(gè)Task,所以每個(gè)Task都會(huì)執(zhí)行一次
    @Override
    public void start(Map props) {
        hdfsUrl = props.get(HdfsSinkConnector.HDFS_URL);
        hdfsPath = props.get(HdfsSinkConnector.HDFS_PATH);
        System.out.println("----------------------------------- start--------------------------------");

        conf = new Configuration();
        conf.set("fs.defaultFS", hdfsUrl);
        //這兩個(gè)是與hdfs append相關(guān)的設(shè)置
        conf.setBoolean("dfs.support.append", true);
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        try{
            hdfs = FileSystem.get(conf);
//            connector.hdfs = new Path(HDFSPATH).getFileSystem(conf);
            os = hdfs.append(new Path(hdfsPath));
        }catch (IOException e){
            System.out.println(e.toString());
        }

    }
  //核心操作,put就是將數(shù)據(jù)從kafka中取出,存放到其他地方去
    @Override
    public void put(Collection sinkRecords) {
        for (SinkRecord record : sinkRecords) {
            log.trace("Writing line to {}: {}", logFilename(), record.value());
            try{
                System.out.println("write info------------------------" + record.value().toString() + "-----------------");
                os.write((record.value().toString()).getBytes("UTF-8"));
                os.hsync();
            }catch(Exception e){
                System.out.print(e.toString());
            }
        }
    }

    @Override
    public void flush(Map offsets) {
        try{
            os.hsync();
        }catch (Exception e){
            System.out.print(e.toString());
        }

    }
//同樣是結(jié)束時(shí)候所執(zhí)行的代碼,這里用于關(guān)閉hdfs資源    @Override    public void stop() {        try {            os.close();        }catch(IOException e){            System.out.println(e.toString());        }    }    private String logFilename() {        return filename == null ? "stdout" : filename;    } }

這里重點(diǎn)提一下,因?yàn)樵赾onnector1.propertise中設(shè)置了key.converter=org.apache.kafka.connect.converters.ByteArrayConverter,所以不能用命令行形式的
producer發(fā)送數(shù)據(jù),而是要用程序的方式,并且在producer總也要設(shè)置key的序列化形式為org.apache.kafka.common.serialization.ByteArraySerializer。
編碼完成,先用idea以開發(fā)程序與依賴包分離的形式打包成jar包,然后將程序?qū)?yīng)的jar包(一般就是“項(xiàng)目名.jar”)放到kafka/libs目錄下面,這樣就能被找到。
四. 接下來對(duì)這個(gè)過程的問題做一個(gè)匯總。
1.在connector1.properties中的key.converter.schemas.enable=false和value.converter.schemas.enable=false的問題。
這個(gè)選項(xiàng)默認(rèn)在connect-standalone.properties中是true的,這個(gè)時(shí)候發(fā)送給topic的Json格式是需要使用avro格式,具體情況可以百度,這里給出一個(gè)樣例。
{
    "schema": {
        "type": "struct",
        "fields": [{
            "type": "int32",
            "optional": true,
            "field": "c1"
        }, {
            "type": "string",
            "optional": true,
            "field": "c2"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "create_ts"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "update_ts"
        }],
        "optional": false,
        "name": "foobar"
    },
    "payload": {
        "c1": 10000,
        "c2": "bar",
        "create_ts": 1501834166000,
        "update_ts": 1501834166000
    }
}

主要就是schema和payload這兩個(gè),不按照這個(gè)格式會(huì)報(bào)錯(cuò)如下

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

   at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)

如果想發(fā)送普通的json格式而不是avro格式的話,很簡(jiǎn)單key.converter.schemas.enable和value.converter.schemas.enable設(shè)置為false就行。這樣就能發(fā)送普通的json格式數(shù)據(jù)。

2.在啟動(dòng)的過程中出現(xiàn)各種各樣的java.lang.ClassNotFoundException。

在啟動(dòng)connector的時(shí)候,一開始總是會(huì)報(bào)各個(gè)各樣的ClassNotFoundException,不是這個(gè)包就是那個(gè)包,查找問題一直說要么缺少包要么是包沖突。這個(gè)是什么原因呢?

其實(shí)歸根結(jié)底還是依賴沖突的問題,因?yàn)閗afka程序自定義的類加載器加載類的目錄是在kafka/libs中,而寫到hdfs需要hadoop的包。

我一開始的做法是將hadoop下的包路徑添加到CLASSPATH中,這樣子問題就來了,因?yàn)閗afka和hadoop的依賴包是有沖突的,比如hadoop是guava-11.0.2.jar,而kafka是guava-20.0.jar,兩個(gè)jar包版本不同,而我們是在kafka程序中調(diào)用hdfs,所以當(dāng)jar包沖突時(shí)應(yīng)該優(yōu)先調(diào)用kafka的。但是注意kafka用的是程序自定義的類加載器,其優(yōu)先級(jí)是低于CLASSPATH路徑下的類的,就是說加載類時(shí)會(huì)優(yōu)先加載CLASSPATH下的類。這樣子就有問題了。

我的解決方案時(shí)將kafka和hadoop加載的jar包路徑都添加到CLASSPATH中,并且kafka的路徑寫在hadoop前面,這樣就可以啟動(dòng)connector成功。

以上是“如何使用kafka connect將數(shù)據(jù)批量寫到hdfs”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!


新聞名稱:如何使用kafkaconnect將數(shù)據(jù)批量寫到hdfs
分享鏈接:http://weahome.cn/article/geeehs.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部