真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

使用Reactor怎么實現(xiàn)一個Flink操作功能-創(chuàng)新互聯(lián)

這篇文章給大家介紹使用Reactor怎么實現(xiàn)一個Flink操作功能,內(nèi)容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

成都創(chuàng)新互聯(lián)公司提供做網(wǎng)站、成都做網(wǎng)站、網(wǎng)頁設(shè)計,高端網(wǎng)站設(shè)計,一元廣告等致力于企業(yè)網(wǎng)站建設(shè)與公司網(wǎng)站制作,十載的網(wǎng)站開發(fā)和建站經(jīng)驗,助力企業(yè)信息化建設(shè),成功案例突破上千余家,是您實現(xiàn)網(wǎng)站建設(shè)的好選擇.

實現(xiàn)過程

Flink對流式處理做的很好的封裝,使用Flink的時候幾乎不用關(guān)心線程池、積壓、數(shù)據(jù)丟失等問題,但是使用Reactor實現(xiàn)類似的功能就必須對Reactor運行原理比較了解,并且經(jīng)過不同場景下測試,否則很容易出問題。

下面列舉出實現(xiàn)過程中的核心點:

1、創(chuàng)建Flux和發(fā)送數(shù)據(jù)分離

入門Reactor的時候給的示例都是創(chuàng)建Flux的時候同時就把數(shù)據(jù)賦值了,比如:Flux.just、Flux.range等,從3.4.0版本后先創(chuàng)建Flux,再發(fā)送數(shù)據(jù)可使用Sinks完成。有兩個比較容易混淆的方法:

  • Sinks.many().multicast() 如果沒有訂閱者,那么接收的消息直接丟棄

  • Sinks.many().unicast() 如果沒有訂閱者,那么保存接收的消息直到第一個訂閱者訂閱

  • Sinks.many().replay() 不管有多少訂閱者,都保存所有消息


在此示例場景中,選擇的是Sinks.many().unicast()

官方文檔:/tupian/20230522/>

  • emitNext 指定提交失敗策略同步提交

  • tryEmitNext 異步提交,返回提交成功、失敗狀態(tài)


  • 在此場景我們不希望丟數(shù)據(jù),可自定義失敗策略,提交失敗無限重試,當(dāng)然也可以調(diào)用異步方法自己重試。

     Sinks.EmitFailureHandler ALWAYS_RETRY_HANDLER = (signalType, emitResult) -> emitResult.isFailure();

    在此之后就就可以調(diào)用Sinks.asFlux開心的使用各種操作符了。

    在此之后就就可以調(diào)用Sinks.asFlux開心的使用各種操作符了。

    3、窗口函數(shù)

    Reactor支持兩類窗口聚合函數(shù):

    • window類:返回Mono(Flux)

    • buffer類:返回List


    在此場景中,使用buffer即可滿足需求,bufferTimeout(int maxSize, Duration maxTime)支持較大個數(shù),較大等待時間操作,F(xiàn)link中的keys操作可以用groupBy、collectMap來實現(xiàn)。

    4、消費者處理

    Reactor經(jīng)過buffer后是一個一個的發(fā)送數(shù)據(jù),如果使用publishOn或subscribeOn處理的話,只等待下游的subscribe處理完成才會重新request新的數(shù)據(jù),buffer操作符才會重新發(fā)送數(shù)據(jù)。如果此時subscribe消費者耗時較長,數(shù)據(jù)流會在buffer流程阻塞,顯然并不是我們想要的。

    理想的操作是消費者在一個線程池里操作,可多線程并行處理,如果線程池滿,再阻塞buffer操作符。解決方案是自定義一個線程池,并且當(dāng)然線程池如果任務(wù)滿submit支持阻塞,可以用自定義RejectedExecutionHandler來實現(xiàn):

     RejectedExecutionHandler executionHandler = (r, executor) -> {
       try {
         executor.getQueue().put(r);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RejectedExecutionException("Producer thread interrupted", e);
       }
     };
     
     new ThreadPoolExecutor(poolSize, poolSize,
         0L, TimeUnit.MILLISECONDS,
         new SynchronousQueue<>(),
         executionHandler);

    三、總結(jié)

    1、總結(jié)一下整體的執(zhí)行流程

    提交任務(wù):提交數(shù)據(jù)支持同步異步兩種方式,支持多線程提交,正常情況下響應(yīng)很快,同步的方法如果隊列滿則阻塞。
    豐富的操作符處理流式數(shù)據(jù)。
    buffer操作符產(chǎn)生的數(shù)據(jù)多線程處理:同步提交到單獨的消費者線程池,線程池任務(wù)滿則阻塞。
    消費者線程池:支持阻塞提交,保證不丟消息,同時隊列長度設(shè)置成0,因為前面已經(jīng)有隊列了。
    背壓:消費者線程池阻塞后,會背壓到buffer操作符,并背壓到緩沖隊列,緩存隊列滿背壓到數(shù)據(jù)提交者。

    2、和Flink的對比

    實現(xiàn)的Flink的功能:

    • 不輸Flink的豐富操作符

    • 支持背壓,不丟數(shù)據(jù)


    優(yōu)勢:

    • 輕量級,可直接在業(yè)務(wù)代碼中使用

    劣勢:

    • 內(nèi)部執(zhí)行流程復(fù)雜,容易踩坑,不如Flink傻瓜化

    • 沒有watermark功能,也就意味著只支持無序數(shù)據(jù)處理

    • 沒有savepoint功能,雖然我們用背壓解決了部分問題,但是宕機后開始會丟失緩存隊列和消費者線程池里的數(shù)據(jù),補救措施是添加Java Hook功能

    • 只支持單機,意味著你的緩存隊列不能設(shè)置無限大,要考慮線程池的大小,且沒有flink globalWindow等功能

    • 需考慮對上游數(shù)據(jù)源的影響,F(xiàn)link的上游一般是mq,數(shù)據(jù)量大時可自動堆積,如果本文的方案上游是http、rpc調(diào)用,產(chǎn)生的阻塞影響就不能忽略。補償方案是每次提交數(shù)據(jù)都使用異步方法,如果失敗則提交到mq中緩沖并消費該mq無限重試。

    關(guān)于使用Reactor怎么實現(xiàn)一個Flink操作功能就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。


    文章題目:使用Reactor怎么實現(xiàn)一個Flink操作功能-創(chuàng)新互聯(lián)
    新聞來源:http://weahome.cn/article/djddoe.html

    其他資訊

    在線咨詢

    微信咨詢

    電話咨詢

    028-86922220(工作日)

    18980820575(7×24)

    提交需求

    返回頂部