真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Disruptor分析-創(chuàng)新互聯(lián)

什么是 Disruptor?

Disruptor是一個(gè)高性能的異步處理框架,或者可以認(rèn)為是最快的消息框架(輕量的JMS),也可以認(rèn)為是一個(gè)觀察者模式的實(shí)現(xiàn),或者事件監(jiān)聽(tīng)模式的實(shí)現(xiàn)

目前創(chuàng)新互聯(lián)建站已為上千的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)站空間、網(wǎng)站托管、服務(wù)器托管、企業(yè)網(wǎng)站設(shè)計(jì)、閔行網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。

性能遠(yuǎn)遠(yuǎn)高于傳統(tǒng)的BlockingQueue容器

Disruptor使用觀察者模式,主動(dòng)將消息發(fā)送給消費(fèi)者,而不是等消費(fèi)者從隊(duì)列中取,在無(wú)鎖的情況下, 實(shí)現(xiàn)queue(環(huán)形, RingBuffer)的并發(fā)操作, 性能遠(yuǎn)高于BlockingQueue

Disruptor 的設(shè)計(jì)思想

環(huán)形數(shù)組結(jié)構(gòu)

為了避免垃圾回收,使用數(shù)組,數(shù)組對(duì)處理器的緩存機(jī)制更加友好

數(shù)組長(zhǎng)度為 2^n,通過(guò)位運(yùn)算,加快定位速度,下標(biāo)采用遞增的方式,不用擔(dān)心索引溢出

無(wú)鎖設(shè)計(jì)

每個(gè)生產(chǎn)者或者消費(fèi)者線程,會(huì)先申請(qǐng)可以操作的元素在數(shù)組中的位置,申請(qǐng)到之后,直接在該位置寫(xiě)入或者讀取數(shù)據(jù)

Disruptor 實(shí)現(xiàn)生產(chǎn)消費(fèi)模型

pom


    com.lmax
    disruptor
    3.2.1

LongEvent

// 聲明一個(gè)Event來(lái)包含需要傳遞的數(shù)據(jù)
public class LongEvent {
    private Long value;

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }
}

LongEventFactory

// Event工廠
public class LongEventFactory implements EventFactory {

    public LongEvent newInstance() {
        return new LongEvent();
    }
}

LongEventHandler

// 事件消費(fèi)者
public class LongEventHandler implements EventHandler {

    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("消費(fèi)者:"+event.getValue());
    }
}

LongEventProducer

public class LongEventProducer {
    private RingBuffer ringBuffer;

    public LongEventProducer(RingBuffer ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // 獲取事件隊(duì)列下標(biāo)位置
        long sequence = ringBuffer.next();
        try {
            // 取出空隊(duì)列
            LongEvent longEvent = ringBuffer.get(sequence);
            // 賦值
            longEvent.setValue(byteBuffer.getLong(0));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("生產(chǎn)者發(fā)送數(shù)據(jù)。。。");
            // 發(fā)送數(shù)據(jù)
            ringBuffer.publish(sequence);
        }
    }
}

Main

public class Main {
    public static void main(String[] args) {
        // 創(chuàng)建可緩存線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 創(chuàng)建工廠
        EventFactory eventFactory = new LongEventFactory();
        // 創(chuàng)建ringBufferSize
        int ringBufferSize = 1024 * 1024;
        // 創(chuàng)建disruptor
        // MULTI表示可以多個(gè)生產(chǎn)者
        Disruptor longEventDisruptor = new Disruptor(eventFactory, ringBufferSize, executorService, ProducerType.MULTI, new YieldingWaitStrategy());
        // 注冊(cè)消費(fèi)者
        longEventDisruptor.handleEventsWith(new LongEventHandler());
        // 啟動(dòng)
        longEventDisruptor.start();
        // 創(chuàng)建RingBuffer容器
        RingBuffer ringBuffer = longEventDisruptor.getRingBuffer();
        // 創(chuàng)建生產(chǎn)者
        LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);
        // 指定緩沖區(qū)大小
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 0; i < 100; i++) {
            byteBuffer.putLong(0, i);
            longEventProducer.onData(byteBuffer);
        }
        executorService.shutdown();
        longEventDisruptor.shutdown();
    }
}

什么是 RingBuffer

它是一個(gè)環(huán)(首尾相接的環(huán)),作用是存儲(chǔ)數(shù)據(jù),實(shí)現(xiàn)不同線程之間的數(shù)據(jù)傳輸

Disruptor 分析

RingBuffer 每塊區(qū)是擁有一個(gè)序號(hào)的,這個(gè)序號(hào)指向環(huán)形數(shù)組結(jié)構(gòu)的下一個(gè)可用元素

Disruptor 分析

隨著不斷地寫(xiě)進(jìn)了填充這個(gè)圓環(huán),這個(gè)指針序號(hào)會(huì)不斷地遞增,直到繞過(guò)這個(gè)環(huán)

Disruptor 分析

如果圓環(huán)滿了,它會(huì)將金數(shù)據(jù)覆蓋,如上圖:現(xiàn)在12的區(qū)域的下個(gè)區(qū)域目前是3,如果有新的數(shù)據(jù)到來(lái),那么指針往下移的時(shí)候就會(huì)把區(qū)域3的數(shù)據(jù)給覆蓋變成13,框架提供了一系列幫助我們平行消費(fèi)的監(jiān)控,會(huì)很好的控制生產(chǎn)者和消費(fèi)者之間的速度,從而達(dá)到生產(chǎn)和消費(fèi)之間的平衡

RingBuffer 為什么效率高?

采用數(shù)組,數(shù)組支持索引訪問(wèn)

數(shù)組的內(nèi)存分配是預(yù)先加載的,一但指定大小創(chuàng)建后,就一直存在,這也意味著不需要花大量的時(shí)間做垃圾回收,而阻塞隊(duì)列采用鏈表實(shí)現(xiàn),需要不斷的刪除、創(chuàng)建節(jié)點(diǎn)

Disruptor的核心概念

  • RingBuffer:Disruptor 底層數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn),核心類,是線程間交換數(shù)據(jù)的中轉(zhuǎn)地
  • Sequence:序號(hào),聲明一個(gè)序號(hào),用于跟蹤 ringbuffer 中任務(wù)的變化和消費(fèi)者的消費(fèi)情況

  • Sequencer:生產(chǎn)者與緩存 RingBuffer 之間的橋梁,單生產(chǎn)者與多生產(chǎn)者分別對(duì)應(yīng)于兩個(gè)實(shí)現(xiàn)SingleProducerSequencer 與 MultiProducerSequencer,Sequencer 用于向 RingBuffer 申請(qǐng)空間,使用 publish 方法通過(guò) waitStrategy 通知所有在等待可消費(fèi)事件的 SequenceBarrie
  • SequenceBarrier:序號(hào)柵欄,管理和協(xié)調(diào)生產(chǎn)者的游標(biāo)序號(hào)和各個(gè)消費(fèi)者的序號(hào),確保生產(chǎn)者不會(huì)覆蓋消費(fèi)者未來(lái)得及處理的消息,確保存在依賴的消費(fèi)者之間能夠按照正確的順序處理
  • WaitStrategy:有多種實(shí)現(xiàn),用以表示當(dāng)無(wú)可消費(fèi)事件時(shí),消費(fèi)者的等待策略

  • Event:消費(fèi)事件

  • EventProcessor:事件處理器,監(jiān)聽(tīng) RingBuffer 的事件,并消費(fèi)可用事件,從 RingBuffer 讀取的事件會(huì)交由實(shí)際的生產(chǎn)者實(shí)現(xiàn)類來(lái)消費(fèi),它會(huì)一直偵聽(tīng)下一個(gè)可用的序號(hào),直到該序號(hào)對(duì)應(yīng)的事件已經(jīng)準(zhǔn)備好

  • EventHandler:業(yè)務(wù)處理器,是實(shí)際消費(fèi)者的接口,完成具體的業(yè)務(wù)邏輯實(shí)現(xiàn),第三方實(shí)現(xiàn)該接口,代表著消費(fèi)者

  • Producer:生產(chǎn)者接口,第三方線程充當(dāng)該角色,producer 向 RingBuffer 寫(xiě)入事件

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。


文章標(biāo)題:Disruptor分析-創(chuàng)新互聯(lián)
網(wǎng)頁(yè)URL:http://weahome.cn/article/csseih.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部