真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

KafkaProducer攔截器

Kafka中的攔截器(Interceptor)是0.10.x.x版本引入的一個(gè)功能,一共有兩種:Kafka Producer端的攔截器和Kafka Consumer端的攔截器。本篇主要講述的是Kafka Producer端的攔截器,它主要用來(lái)對(duì)消息進(jìn)行攔截或者修改,也可以用于Producer的Callback回調(diào)之前進(jìn)行相應(yīng)的預(yù)處理。

網(wǎng)站的建設(shè)成都創(chuàng)新互聯(lián)專注網(wǎng)站定制,經(jīng)驗(yàn)豐富,不做模板,主營(yíng)網(wǎng)站定制開(kāi)發(fā).小程序定制開(kāi)發(fā),H5頁(yè)面制作!給你煥然一新的設(shè)計(jì)體驗(yàn)!已為混凝土攪拌罐等企業(yè)提供專業(yè)服務(wù)。

使用Kafka Producer端的攔截器非常簡(jiǎn)單,主要是實(shí)現(xiàn)ProducerInterceptor接口,此接口包含4個(gè)方法:

    1. ProducerRecord onSend(ProducerRecord record):Producer在將消息序列化和分配分區(qū)之前會(huì)調(diào)用攔截器的這個(gè)方法來(lái)對(duì)消息進(jìn)行相應(yīng)的操作。一般來(lái)說(shuō)最好不要修改消息ProducerRecord的topic、key以及partition等信息,如果要修改,也需確保對(duì)其有準(zhǔn)確的判斷,否則會(huì)與預(yù)想的效果出現(xiàn)偏差。比如修改key不僅會(huì)影響分區(qū)的計(jì)算,同樣也會(huì)影響B(tài)roker端日志壓縮(Log Compaction)的功能。
    1. void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被應(yīng)答(Acknowledgement)之前或者消息發(fā)送失敗時(shí)調(diào)用,優(yōu)先于用戶設(shè)定的Callback之前執(zhí)行。這個(gè)方法運(yùn)行在Producer的IO線程中,所以這個(gè)方法里實(shí)現(xiàn)的代碼邏輯越簡(jiǎn)單越好,否則會(huì)影響消息的發(fā)送速率。
    1. void close():關(guān)閉當(dāng)前的攔截器,此方法主要用于執(zhí)行一些資源的清理工作。
    1. configure(Map configs):用來(lái)初始化此類的方法,這個(gè)是ProducerInterceptor接口的父接口Configurable中的方法。

一般情況下只需要關(guān)注并實(shí)現(xiàn)onSend或者onAcknowledgement方法即可。下面我們來(lái)舉個(gè)案例,通過(guò)onSend方法來(lái)過(guò)濾消息體為空的消息以及通過(guò)onAcknowledgement方法來(lái)計(jì)算發(fā)送消息的成功率。

public class ProducerInterceptorDemo implements ProducerInterceptor {
    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        if(record.value().length()<=0)
            return null;
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            sendSuccess++;
        } else {
            sendFailure ++;
        }
    }

    @Override
    public void close() {
        double succe***atio = (double)sendSuccess / (sendFailure + sendSuccess);
        System.out.println("[INFO] 發(fā)送成功率="+String.format("%f", succe***atio * 100)+"%");
    }

    @Override
    public void configure(Map configs) {}
}

自定義的ProducerInterceptorDemo類實(shí)現(xiàn)之后就可以在Kafka Producer的主程序中指定,示例代碼如下:

public class ProducerMain {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo");

        Producer producer = new KafkaProducer(properties);

        for(int i=0;i<100;i++) {
            ProducerRecord producerRecord = new ProducerRecord(topic, "msg-" + i);
            producer.send(producerRecord).get();
        }
        producer.close();
    }
}

Kafka Producer不僅可以指定一個(gè)攔截器,還可以指定多個(gè)攔截器以形成攔截鏈,這個(gè)攔截鏈會(huì)按照其中的攔截器的加入順序一一執(zhí)行。比如上面的程序多添加一個(gè)攔截器,示例如下:

properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo,com.hidden.producer.ProducerInterceptorDemoPlus");1

這樣Kafka Producer會(huì)先執(zhí)行攔截器ProducerInterceptorDemo,之后再執(zhí)行ProducerInterceptorDemoPlus。

有關(guān)interceptor.classes參數(shù),在kafka 1.0.0版本中的定義如下:

NAMEDESCRIPTIONTYPEDEFAULTVALID VALUESIMPORTANCE
interceptor.calssses A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there no interceptors. list null low

本文的重點(diǎn)是你有沒(méi)有收獲與成長(zhǎng),其余的都不重要,希望讀者們能謹(jǐn)記這一點(diǎn)。同時(shí)我經(jīng)過(guò)多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,redis,ActiveMQ、、Mycat、Netty、Kafka、MySQL、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個(gè)知識(shí)點(diǎn)高級(jí)進(jìn)階干貨,希望對(duì)想成為架構(gòu)師的朋友有一定的參考和幫助

需要更詳細(xì)思維導(dǎo)圖和以下資料的可以加一下技術(shù)交流分享群:“708 701 457”免費(fèi)獲取

Kafka Producer 攔截器
Kafka Producer 攔截器
Kafka Producer 攔截器
Kafka Producer 攔截器


文章標(biāo)題:KafkaProducer攔截器
URL分享:http://weahome.cn/article/ihjeoo.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部