真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Disruptor、Kafka、Netty如何整合

這篇文章主要介紹了Disruptor、Kafka、Netty如何整合,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

創(chuàng)新互聯(lián)主營(yíng)鳳泉網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營(yíng)網(wǎng)站建設(shè)方案,成都app開發(fā),鳳泉h5小程序設(shè)計(jì)搭建,鳳泉網(wǎng)站營(yíng)銷推廣歡迎鳳泉等地區(qū)企業(yè)咨詢

NETTY應(yīng)用網(wǎng)關(guān)

整個(gè)網(wǎng)關(guān)的核心是一個(gè)netty server,各個(gè)應(yīng)用程序(包括web server,手機(jī)app等)連到這個(gè)netty server上請(qǐng)求數(shù)據(jù);關(guān)于數(shù)據(jù)來源,需要監(jiān)聽多個(gè)kafka topic(而且這里的topic是可變的,也就是說需要kafka consumer的動(dòng)態(tài)開始和停止),之后需要把所有這些topic的數(shù)據(jù)整合在一起,通過channel發(fā)送給客戶端應(yīng)用程序。

數(shù)據(jù)流圖

Disruptor、Kafka、Netty如何整合

源碼

下面把大部分的代碼貼出來,有需要的同學(xué)可以參考。會(huì)對(duì)關(guān)鍵的技術(shù)點(diǎn)進(jìn)行說明,偏業(yè)務(wù)部分大家自行忽略吧。

main函數(shù)

啟動(dòng)disruptor;監(jiān)聽一個(gè)固定的topic,把獲取到的msg,交給ConsumerProcessorGroup來完成kafka consumer的創(chuàng)建和停止。

public static void main(String[] args) {
        DisruptorHelper.getInstance().start();
        Properties props = ConsumerProps.getConsumerProps();
        KafkaConsumer consumer = new KafkaConsumer(props);
        consumer.subscribe(Arrays.asList("uavlst"));
        while (true) {
            ConsumerRecords records = consumer.poll(100);
            ConsumerRecord lastRecord = null;
            for (ConsumerRecord record : records)
                lastRecord = record;

            if (lastRecord != null){
                ConsumerProcessorGroup.getInstance().recieveNewUavLst(lastRecord.value());
            }
        }
    }

DisruptorHelper

DisruptorHelper是一個(gè)單例,主要是包含了一個(gè)disruptor 對(duì)象,在new這個(gè)對(duì)象的時(shí)候,用到了ProducerType.MULTI和new BlockingWaitStrategy(),其中前者意味著我們需要多個(gè)producer共同來工作,后者其實(shí)是默認(rèn)的producer的等待策略,后續(xù)根據(jù)實(shí)際情況進(jìn)行調(diào)整。

public class DisruptorHelper {
    private static DisruptorHelper instance = null;

    public static DisruptorHelper getInstance() {
        if (instance == null) {
            instance = new DisruptorHelper();
        }
        return instance;
    }

    private final int BUFFER_SIZE = 1024;
    private Disruptor disruptor = null;

    private DisruptorHelper() {
        MsgEventHandler eventHandler = new MsgEventHandler();
        disruptor = new Disruptor(new MsgEventFactory(), BUFFER_SIZE, new ConsumerThreadFactory(), ProducerType.MULTI, new BlockingWaitStrategy());
        disruptor.handleEventsWith(eventHandler);
    }

    public void start() {
        disruptor.start();
    }

    public void shutdown() {
        disruptor.shutdown();
    }

    public void produce(ConsumerRecord record) {
        RingBuffer ringBuffer = disruptor.getRingBuffer();
        long sequence = ringBuffer.next();
        try {
            ringBuffer.get(sequence).setRecord(record);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

ConsumerProcessorGroup

ConsumerProcessorGroup是一個(gè)單例,當(dāng)中包含一個(gè)fixedThreadPool,動(dòng)態(tài)的啟動(dòng)線程來進(jìn)行kafka topic的消費(fèi)。

public class ConsumerProcessorGroup {
    private static ConsumerProcessorGroup instance = null;

    public static ConsumerProcessorGroup getInstance(){
        if (instance == null){
            instance = new ConsumerProcessorGroup();
        }
        return instance;
    }

    private ConsumerProcessorGroup() {

    }

    private ExecutorService fixedThreadPool = Executors.newFixedThreadPool(20);

    public List uavIDLst = new Vector();

    public void recieveNewUavLst(String uavIDs){
        List newUavIDs = Arrays.asList(uavIDs.split(","));
        for (String uavID : newUavIDs){
            if (!uavIDLst.contains(uavID)){
                fixedThreadPool.execute(new ConsumerThread(uavID));
                uavIDLst.add(uavID);
            }
        }
        List tmpLstForDel = new ArrayList();
        for (String uavID : uavIDLst){
            if (!newUavIDs.contains(uavID)){
                tmpLstForDel.add(uavID);
            }
        }
        uavIDLst.removeAll(tmpLstForDel);
    }
}

ConsumerThread

對(duì)kafka topic進(jìn)行消費(fèi),通過DisruptorHelper將獲取的record寫入disruptor的ring buffer當(dāng)中。

public class ConsumerThread implements Runnable {
    private String uavID;

    public ConsumerThread(String uavID) {
        this.uavID = uavID;
    }

    public void run() {
        Properties props = ConsumerProps.getConsumerProps();
        KafkaConsumer consumer = new KafkaConsumer(props);
        consumer.subscribe(Arrays.asList(uavID));
        System.out.println(uavID + " consumer started! Current thread id is " + Thread.currentThread().getId());
        while (ConsumerProcessorGroup.getInstance().uavIDLst.contains(uavID)) {
            ConsumerRecords records = consumer.poll(100);
            for (ConsumerRecord record : records){
                DisruptorHelper.getInstance().produce(record);
            }
        }
        System.out.println(uavID + " consumer finished! Current thread id is " + Thread.currentThread().getId());
    }
}

MsgEventHandler

Disruptor的消費(fèi)者,依次從Ring Buffer當(dāng)中讀取數(shù)據(jù)并執(zhí)行相應(yīng)的處理。

public class MsgEventHandler implements EventHandler {
    private Map converterMap;

    public void onEvent(MsgEvent event, long sequence, boolean endOfBatch) throws Exception {
        ConsumerRecord record = event.getRecord();
        System.out.printf("topic = %s, part = %d, offset = %d, key = %s, value = %s \n\r", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“Disruptor、Kafka、Netty如何整合”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來學(xué)習(xí)!


網(wǎng)站欄目:Disruptor、Kafka、Netty如何整合
當(dāng)前路徑:http://weahome.cn/article/geosec.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部