真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Flinkkafka定制技巧

動(dòng)態(tài)路由:
方案1: 定制一個(gè)特殊的KafkaDynamicSink,內(nèi)嵌多個(gè)原生的FlinkKafkaProducer,每個(gè)對(duì)應(yīng)一個(gè)下游的KAFKA隊(duì)列
在OPEN方法中讀取所有KAFKA渠道配置并構(gòu)建FlinkKafkaProducer并構(gòu)建一個(gè)Map: kafka channelId -> FlinkKafkaProducer

重載INVOKE方法
根據(jù)路由規(guī)則找到當(dāng)前流數(shù)據(jù)對(duì)應(yīng)所有的ChannelId (允許多個(gè)),再?gòu)腗AP中獲取對(duì) FlinkKafkaProducer 并調(diào)用其INVOKE方法

核心代碼:
public class DynamicKafkaSink extends RichSinkFunction {
    @Override
    public void open(Configuration parameters) throws Exception {
        List allChannels = channelRepository.getAll();
        for(ChannelModel nextChannel: allChannels) {
            FlinkKafkaProducer010 nextProducer = (FlinkKafkaProducer010)channelFactory.createChannelProducer(nextChannel,
            FlinkKafkaProducer010.class, Collections.emptyMap());
            nextProducer.setRuntimeContext(this.getRuntimeContext());
            nextProducer.open(parameters);
            producers.put(nextChannel.getChannelId(), nextProducer);
        }
    }
    
    @Override
    public void invoke(IN value) throws Exception {
        List channelIds = channelRouteStrategy.route(value);
        for (String nextChannelId: channelIds) {
            FlinkKafkaProducer010 nextProducer = producers.get(nextChannelId);
            nextProducer.invoke(converted);
        }
    }

}



注意:
Map不能在構(gòu)造函數(shù)中初始化,而要在OPEN方法中初始化,F(xiàn)LINK分布式特性決定了構(gòu)造函數(shù)和OPEN不在同一個(gè)JVM里執(zhí)行
類級(jí)別的變量需要可序列化,否則需要聲明為TRANSIENT

每個(gè)新構(gòu)建的FlinkKafkaProducer需要先調(diào)用
setRuntimeContext(this.getRuntimeContext())
再調(diào)用open 方法才能被使用


優(yōu)點(diǎn):
可以路由到不同的BROKER上的TOPIC,在不同的BROKER上隔離性更好

缺陷:
所有的FlinkKafkaProducer只在OPEN的時(shí)候創(chuàng)建一次,后面如果添加了新的KAFKA隊(duì)列無(wú)法被動(dòng)態(tài)感知并路由
更改了FlinkKafkaProducer創(chuàng)建和初始化的過(guò)程,從MAIN函數(shù)中轉(zhuǎn)到了KafkaDynamicSink的OPEN方法里,未經(jīng)過(guò)全面測(cè)試,可能存在問題


方案2:方案1的升級(jí)版,利用FLINK SPLIT STREAM的特性,根據(jù)路由規(guī)則將原生數(shù)據(jù)流分成多個(gè),每個(gè)子數(shù)據(jù)流對(duì)應(yīng)一個(gè)下游KAFKA隊(duì)列
在FLINK Main 函數(shù)中讀取所有KAFKA渠道配置并構(gòu)建FlinkKafkaProducer并構(gòu)建一個(gè)Map: kafka channelId -> FlinkKafkaProducer
在輸入流上構(gòu)建一個(gè)SplitStream, OutputSelector 中根據(jù)路由邏輯返回一組ChannelId
遍歷Map,對(duì)于Map中的每個(gè)Key (ChannelID) 調(diào)用 SplitStream 的 select方法獲取對(duì)應(yīng)的分支流數(shù)據(jù),然后路由到對(duì)應(yīng)的 FlinkKafkaProducer

核心代碼:
public static void main(String[] args) {
    List allChannels = channelRepository.getAll();
    for(ChannelModel nextChannel: allChannels) {
        FlinkKafkaProducer010 nextProducer = (FlinkKafkaProducer010)channelFactory.createChannelProducer(nextChannel,
        FlinkKafkaProducer010.class, Collections.emptyMap());
        nextProducer.setRuntimeContext(this.getRuntimeContext());
        nextProducer.open(parameters);
        producers.put(nextChannel.getChannelId(), nextProducer);
    }
    
    DataStreamSource source = ....
    SplitStream splitStream = source.split(new OutputSelector() {

        @Override
        public Iterable select(String value) {
            List channelIds = channelRouteStrategy.route(value);
            return channeIds;
        }
    });
    
    for(String nextChannel: producers.keySet()) {
        FlinkKafkaProducer010 target = producers.get(nextChannel);
        splitStream.select(nextChannel).addSink(target);
    }
}


優(yōu)點(diǎn):
可以路由到不同的BROKER上的TOPIC,在不同的BROKER上隔離性更好
完全利用FLINK原生的特性,更加簡(jiǎn)潔優(yōu)雅,解決了方案1的第二點(diǎn)不足

缺陷:
所有的FlinkKafkaProducer只在MAIN函數(shù)中創(chuàng)建一次,后面如果添加了新的KAFKA隊(duì)列無(wú)法被動(dòng)態(tài)感知并路由


方案3: 利用FLINK的 KeyedSerializationSchema中的getTargetTopic函數(shù),KeyedSerializationSchema 除了將對(duì)象轉(zhuǎn)化Kafka ProducerRecord
的鍵值對(duì)之外還可以動(dòng)態(tài)指定Topic
在FLINK Main 函數(shù)中將輸入流通過(guò)flatMap 轉(zhuǎn)化為 Tuple2, 其中key 是目標(biāo)所屬的Topic, value 是原生數(shù)據(jù)
實(shí)現(xiàn)一個(gè)KeyedSerializationSchema作為構(gòu)造函數(shù)傳給FlinkKafkaProducer,重載getTargetTopic方法: 返回 tuple2.f0

核心代碼:
class DynaRouteSerializationSchema implements KeyedSerializationSchema {
    
    String getTargetTopic(T element) {
        Tuple2 tuple = (Tuple2)element;
        return tuple.f0;
    }
}

public static void main(String[] args) {
    DataStreamSource source = ....
    DataStream> converted = source
    .flatMap(new RichFlatMapFunction>() {
        @Override
        public void flatMap(T value, Collector> out)
        throws Exception {
            List channelIds = channelRouteStrategy.route(value);
            for(String nextChannel: channelIds) {
                out.collect(Tuple2.valueOf(nextChannel, value));
            }
        }
    });
    
    

}


優(yōu)點(diǎn):
完全利用FLINK原生的特性,代碼量非常少
新增加的TOPIC也可以被路由到,不需要啟停流處理

缺陷:
無(wú)法像前兩個(gè)方案實(shí)現(xiàn)Broker級(jí)別的路由,只能做到Topic級(jí)別的路由


斷流功能:

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來(lái)自于我們對(duì)這個(gè)行業(yè)的熱愛。我們立志把好的技術(shù)通過(guò)有效、簡(jiǎn)單的方式提供給客戶,將通過(guò)不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長(zhǎng)期合作伙伴,公司提供的服務(wù)項(xiàng)目有:主機(jī)域名虛擬主機(jī)、營(yíng)銷軟件、網(wǎng)站建設(shè)、曹妃甸網(wǎng)站維護(hù)、網(wǎng)站推廣。

有時(shí)系統(tǒng)升級(jí)或者其他組件不可用,需要暫時(shí)停止KAFKA PRODUCER
FLINK 原生機(jī)制:
被動(dòng)反壓:
Kafka09Fetcher 包含了一根獨(dú)立的 KafkaConsumerThread,從KAFKA中讀取數(shù)據(jù),再交給HANDOVER
HANDOVER可以理解為一個(gè)大小為1的隊(duì)列, Kafka09Fetcher 再?gòu)年?duì)列中獲取并處理數(shù)據(jù),一旦當(dāng)處理速度變慢,KafkaConsumerThread
無(wú)法將數(shù)據(jù)寫入HANDOVER, 線程就會(huì)被阻塞

另外KeyedDeserializationSchema定義了一個(gè)isEndOfStream方法,如果返回true, Kafka09Fetcher就會(huì)停止循環(huán)并退出,導(dǎo)致整個(gè)流處理結(jié)束

設(shè)計(jì)思路:

SignalService:  注冊(cè)SignalListener, 利用Curator TreeCache 監(jiān)聽一個(gè)Zookeeper 路徑獲取起動(dòng)/停止流處理的信號(hào)量

SignalListener: 接收Z(yǔ)OOKEEPER變更信息的回調(diào)接口

PausableKafkaFetcher: 繼承Flink原生的KafkaFetcher, 監(jiān)聽到信號(hào)變化阻塞ConsumerThread的處理

PausableKafkaConsumer: 繼承Flink原生的KafkaConsumer, 創(chuàng)建PausableKafkaFetcher

核心代碼:

public class PausableKafkaFetcher extends Kafka010Fetcher implements SignalListener {

    private final ReentrantLock pauseLock = new ReentrantLock(true);

    private final Condition pauseCond = pauseLock.newCondition();

    private volatile boolean paused = false;

   

   public void onSignal(String path, String value) {

       try {

            pauseLock.lockInterruptibly();

       } catch(InterruptedException e) {

       }

       try {

           if (SIGNAL_PAUSE.equals(value)) {

               paused = true;

           } else if (SIGNAL_START.equals(value)) {

               paused = false;

           }

           pauseCond.signal(); 

       }

       finally {

           pauseLock.unlock();

       } 

   }

   protected void emitRecord(T record, KafkaTopicPartitionState partition, long offset, ConsumerRecord consumerRecord) throws Exception {

      super.emitRecord(record, partition, offset, consumerRecord);

      pauseLock.lockInterruptibly();

      try {

         while (paused) {

            pauseCond.await();

         }

      } finally {

         pauseLock.unlock();

      }

  }

}

public class PausableKafkaConsumer extends FlinkKafkaConsumer010 {

     public void open(Configuration configuration) {

        signalService = ZKSignalService.getInstance();

        signalService.initialize(zkConfig);

     }

 

     public void cancel() {

         super.cancel();

         unregisterSignal();

     }   

 

     public void close() {

        super.close();

        unregisterSignal();

     }

     private void unregisterSignal() {

         if (signalService != null) {

            String fullPath = WATCH_PREFIX + "/" + watchPath;

            signalService.unregisterSignalListener(fullPath);

         }

     }    

     protected AbstractFetcher createFetcher(...) throws  Exception {

        PausableKafkaFetcher fetcher = new PausableKafkaFetcher<> (...);

        if (signalService != null) {

            String fullPath = WATCH_PREFIX + "/" + watchPath;

            signalService.registerSignalListener(fullPath, fetcher);

        }

        return fetcher

     }

}


當(dāng)前名稱:Flinkkafka定制技巧
URL分享:http://weahome.cn/article/pessei.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部