This article explains how Flink implements joins between two streams (dual-stream joins), with examples and a look at the underlying source code.
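When running OLAP analysis on static tables in a database, joining two tables is a very common operation. Likewise, a streaming job sometimes needs to join two streams to obtain richer information. The Flink DataStream API provides three operators for dual-stream joins: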
join()
coGroup()
intervalJoin()
The examples below read a click stream and an order stream from Kafka and convert each into POJOs.
// Raw JSON strings from the two Kafka topics
DataStream<String> clickSourceStream = env
    .addSource(new FlinkKafkaConsumer011<>(
        "ods_analytics_access_log",
        new SimpleStringSchema(),
        kafkaProps
    ).setStartFromLatest());
DataStream<String> orderSourceStream = env
    .addSource(new FlinkKafkaConsumer011<>(
        "ods_ms_order_done",
        new SimpleStringSchema(),
        kafkaProps
    ).setStartFromLatest());

// Parse each JSON message into its POJO
DataStream<AnalyticsAccessLogRecord> clickRecordStream = clickSourceStream
    .map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class));
DataStream<OrderDoneLogRecord> orderRecordStream = orderSourceStream
    .map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));
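The join() operator provides "window join" semantics: an inner join on a specified key within a (tumbling, sliding, or session) window. It supports both processing time and event time. The following example uses a 10-second tumbling window to join the two streams on merchandise ID and fetch the price-related fields from the order stream.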
clickRecordStream
    .join(orderRecordStream)
    .where(record -> record.getMerchandiseId())
    .equalTo(record -> record.getMerchandiseId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
        @Override
        public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {
            // Called once per matching (click, order) pair in the same window
            return StringUtils.join(Arrays.asList(
                accessRecord.getMerchandiseId(),
                orderRecord.getPrice(),
                orderRecord.getCouponMoney(),
                orderRecord.getRebateAmount()
            ), '\t');
        }
    })
    .print().setParallelism(1);
It is simple and easy to use.
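To make the window-join semantics concrete, here is a minimal plain-Java sketch that does not use Flink at all. It assumes each record carries a millisecond timestamp that decides its tumbling window; the `Event` type, `windowStart()`, and `join()` are hypothetical names introduced only for this illustration. The point it shows is that two records join only when they share both the key and the window, so records that are close in time but sit on opposite sides of a window boundary do not match.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowJoinSketch {
    /** A keyed, timestamped record (hypothetical type for this sketch). */
    static final class Event {
        final String key;
        final long timestampMs;
        final String payload;

        Event(String key, long timestampMs, String payload) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Inner join: a pair is emitted only if key and window both match. */
    static List<String> join(List<Event> left, List<Event> right, long windowSizeMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = windowStart(l.timestampMs, windowSizeMs)
                        == windowStart(r.timestampMs, windowSizeMs);
                if (sameKey && sameWindow) {
                    out.add(l.payload + "\t" + r.payload);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> clicks = List.of(
                new Event("m1", 1_000L, "click-a"),
                new Event("m1", 9_500L, "click-b"));
        List<Event> orders = List.of(
                new Event("m1", 3_000L, "order-x"),
                new Event("m1", 10_500L, "order-y"));
        // order-y is only 1 s after click-b, but it falls into the next 10 s window.
        System.out.println(join(clicks, orders, 10_000L));
    }
}
```

In this run `order-y` never joins with anything, which is the boundary effect that motivates the interval join discussed later.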
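An inner join alone is not enough. How can a left or right outer join be implemented? The answer is the coGroup() operator. It is invoked much like join() and also requires a window, but CoGroupFunction is more flexible than JoinFunction: it receives all left and right records of a key and window at once, so the user decides how to match and emit them.
The following example implements a left join of the click stream against the order stream, using a plain nested-loop join (two nested loops).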
clickRecordStream
    .coGroup(orderRecordStream)
    .where(record -> record.getMerchandiseId())
    .equalTo(record -> record.getMerchandiseId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    // Assumes getPrice() returns Long; adjust the Tuple2 type to the actual field type
    .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {
        @Override
        public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords,
                            Iterable<OrderDoneLogRecord> orderRecords,
                            Collector<Tuple2<String, Long>> collector) throws Exception {
            for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
                boolean isMatched = false;
                for (OrderDoneLogRecord orderRecord : orderRecords) {
                    // The right stream has a matching record
                    collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
                    isMatched = true;
                }
                if (!isMatched) {
                    // The right stream has no matching record
                    collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));
                }
            }
        }
    })
    .print().setParallelism(1);
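The same idea extends to a full outer join. The sketch below is plain Java rather than Flink code: it models the two groups that a coGroup-style function would receive for one key and one window as ordinary lists, and `fullOuterJoin()` is a hypothetical helper named only for this illustration. Because both groups already share the key, a record is "unmatched" exactly when the other group is empty.

```java
import java.util.ArrayList;
import java.util.List;

final class CoGroupOuterJoinSketch {
    /**
     * Full outer join over the two groups of one (key, window) pair.
     * Unmatched rows are padded with the string "null" on the missing side.
     */
    static List<String> fullOuterJoin(List<String> lefts, List<Long> rights) {
        List<String> out = new ArrayList<>();
        for (String left : lefts) {
            if (rights.isEmpty()) {
                out.add(left + "\tnull");      // left row without a partner
            }
            for (Long right : rights) {
                out.add(left + "\t" + right);  // matched pair
            }
        }
        if (lefts.isEmpty()) {
            for (Long right : rights) {
                out.add("null\t" + right);     // right row without a partner
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Both sides present: cross product of the two groups.
        System.out.println(fullOuterJoin(List.of("phone", "case"), List.of(100L)));
        // Only clicks arrived in this window.
        System.out.println(fullOuterJoin(List.of("phone"), List.of()));
        // Only orders arrived in this window.
        System.out.println(fullOuterJoin(List.of(), List.of(100L)));
    }
}
```

Dropping the final `if (lefts.isEmpty())` branch turns this back into the left outer join shown above; keeping only that branch plus the matched pairs gives a right outer join.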
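Both join() and coGroup() match records within a window. In some cases, however, the two streams do not advance in step. For example, an order record may be written long after the corresponding purchase action appears in the click stream; if the match is confined to a window, the two records can easily miss each other. Flink therefore also provides "interval join" semantics, which match records on a specified key and on a time interval of the right stream relative to the left stream, that is: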
right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]
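An interval join is also an inner join. It needs no window, but the user must specify the lower and upper bounds of the offset interval, and it supports event time only.
Sample code follows. Note that before running it, assignTimestampsAndWatermarks() must be applied to each of the two streams so that records carry event timestamps and watermarks are generated.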
clickRecordStream
    .keyBy(record -> record.getMerchandiseId())
    .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
    // Match orders from 30 s before to 30 s after each click
    .between(Time.seconds(-30), Time.seconds(30))
    .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
        @Override
        public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord,
                                   Context context, Collector<String> collector) throws Exception {
            collector.collect(StringUtils.join(Arrays.asList(
                accessRecord.getMerchandiseId(),
                orderRecord.getPrice(),
                orderRecord.getCouponMoney(),
                orderRecord.getRebateAmount()
            ), '\t'));
        }
    })
    .print().setParallelism(1);
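As shown above, an interval join differs from a window join in that it operates on two KeyedStreams, and between() must be called to specify the lower and upper bounds of the offset interval. Both bounds are inclusive by default; to make them exclusive, call upperBoundExclusive() and/or lowerBoundExclusive().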
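The interval condition, including the optional exclusive bounds, can be written down as a small predicate. This is a plain-Java sketch of the condition as described in the text, not Flink's own code; `matches()` and its parameter names are assumptions made for the illustration, and all values are milliseconds.

```java
final class IntervalBoundsSketch {
    /**
     * True if the right timestamp lies in the interval
     * [leftTs + lowerBound, leftTs + upperBound], where each end
     * can be switched from inclusive to exclusive.
     */
    static boolean matches(long leftTs, long rightTs,
                           long lowerBound, long upperBound,
                           boolean lowerInclusive, boolean upperInclusive) {
        long low = leftTs + lowerBound;
        long high = leftTs + upperBound;
        boolean aboveLower = lowerInclusive ? rightTs >= low : rightTs > low;
        boolean belowUpper = upperInclusive ? rightTs <= high : rightTs < high;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        long click = 100_000L;
        long orderAtUpperEdge = click + 30_000L;
        // Inclusive bounds accept an order exactly 30 s after the click.
        System.out.println(matches(click, orderAtUpperEdge, -30_000L, 30_000L, true, true));
        // An exclusive upper bound rejects the same order.
        System.out.println(matches(click, orderAtUpperEdge, -30_000L, 30_000L, true, false));
    }
}
```

With integer millisecond timestamps, an exclusive bound is equivalent to an inclusive bound shifted by 1 ms toward the inside of the interval.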
Below is the overloaded method that KeyedStream.IntervalJoined.process(ProcessJoinFunction) delegates to.
public <OUT> SingleOutputStreamOperator<OUT> process(
        ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
        TypeInformation<OUT> outputType) {
    Preconditions.checkNotNull(processJoinFunction);
    Preconditions.checkNotNull(outputType);

    final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf =
        left.getExecutionEnvironment().clean(processJoinFunction);

    final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
        new IntervalJoinOperator<>(
            lowerBound,
            upperBound,
            lowerBoundInclusive,
            upperBoundInclusive,
            left.getType().createSerializer(left.getExecutionConfig()),
            right.getType().createSerializer(right.getExecutionConfig()),
            cleanedUdf
        );

    return left
        .connect(right)
        .keyBy(keySelector1, keySelector2)
        .transform("Interval Join", outputType, operator);
}
The two streams are first combined with connect() and keyBy(), and then transformed by an IntervalJoinOperator. Inside IntervalJoinOperator, two MapState instances buffer the records of the left and right streams respectively.
private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);

    this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
        LEFT_BUFFER,
        LongSerializer.INSTANCE,
        new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
    ));

    this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
        RIGHT_BUFFER,
        LongSerializer.INSTANCE,
        new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
    ));
}
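Here the Long key is the event timestamp, and the List<BufferEntry<T>> value holds the records that arrived with that timestamp. When a record arrives on the left or right stream, processElement1() or processElement2() is called respectively; both delegate to processElement(), shown below.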
@Override
public void processElement1(StreamRecord<T1> record) throws Exception {
    processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}

@Override
public void processElement2(StreamRecord<T2> record) throws Exception {
    // Seen from the right stream, the interval is mirrored: [-upperBound, -lowerBound]
    processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}

@SuppressWarnings("unchecked")
private <THIS, OTHER> void processElement(
        final StreamRecord<THIS> record,
        final MapState<Long, List<BufferEntry<THIS>>> ourBuffer,
        final MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
        final long relativeLowerBound,
        final long relativeUpperBound,
        final boolean isLeft) throws Exception {

    final THIS ourValue = record.getValue();
    final long ourTimestamp = record.getTimestamp();

    if (ourTimestamp == Long.MIN_VALUE) {
        throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
            "interval stream joins need to have timestamps meaningful timestamps.");
    }

    if (isLate(ourTimestamp)) {
        return;
    }

    addToBuffer(ourBuffer, ourValue, ourTimestamp);

    for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
        final long timestamp = bucket.getKey();

        if (timestamp < ourTimestamp + relativeLowerBound ||
                timestamp > ourTimestamp + relativeUpperBound) {
            continue;
        }

        for (BufferEntry<OTHER> entry : bucket.getValue()) {
            if (isLeft) {
                collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
            } else {
                collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
            }
        }
    }

    long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
    if (isLeft) {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
    } else {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
    }
}
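The logic of this code is as follows:
Get the timestamp of the incoming StreamRecord and call isLate() to check whether the record is late (that is, its timestamp is less than the current watermark). If so, drop it.
Call addToBuffer() to insert the record, keyed by its timestamp, into the MapState of its own stream.
Iterate over the MapState of the other stream. For every buffered record that satisfies the interval condition described earlier, call collect() to hand the pair to the user-defined ProcessJoinFunction. The code of collect() is shown below; note that the timestamp of the result is the larger of the left and right timestamps.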
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
    final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

    collector.setAbsoluteTimestamp(resultTimestamp);
    context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

    userFunction.processElement(left, right, context, collector);
}
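Call internalTimerService.registerEventTimeTimer() to register a timer at ourTimestamp + relativeUpperBound (or at ourTimestamp itself when relativeUpperBound is not positive). The timer runs the state cleanup once the watermark passes the upper bound of the interval, so that buffered data does not pile up. Note that the timers of the left and right streams belong to different namespaces; the cleanup logic itself lives in onEventTime().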
@Override
public void onEventTime(InternalTimer<K, String> timer) throws Exception {
    long timerTimestamp = timer.getTimestamp();
    String namespace = timer.getNamespace();

    logger.trace("onEventTime @ {}", timerTimestamp);

    switch (namespace) {
        case CLEANUP_NAMESPACE_LEFT: {
            // Recover the timestamp of the left bucket this timer was registered for
            long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
            logger.trace("Removing from left buffer @ {}", timestamp);
            leftBuffer.remove(timestamp);
            break;
        }
        case CLEANUP_NAMESPACE_RIGHT: {
            // Recover the timestamp of the right bucket this timer was registered for
            long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
            logger.trace("Removing from right buffer @ {}", timestamp);
            rightBuffer.remove(timestamp);
            break;
        }
        default:
            throw new RuntimeException("Invalid namespace " + namespace);
    }
}
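The buffering, probing, and cleanup steps walked through above can be condensed into a small plain-Java model. This is a simplified sketch under stated assumptions, not Flink's operator: it handles a single key, uses `HashMap` in place of keyed `MapState`, replaces timers with an explicit `advanceWatermark()` call, and all class and method names are hypothetical. The cleanup rule mirrors the timer arithmetic shown above: a left bucket at `t` is dropped once the watermark reaches `t + max(upperBound, 0)`, and a right bucket once it reaches `t - min(lowerBound, 0)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class IntervalJoinSketch {
    final long lowerBound;
    final long upperBound;
    // timestamp -> records with that timestamp (stand-ins for the two MapState buffers)
    final Map<Long, List<String>> leftBuffer = new HashMap<>();
    final Map<Long, List<String>> rightBuffer = new HashMap<>();
    final List<String> output = new ArrayList<>();
    long watermark = Long.MIN_VALUE;

    IntervalJoinSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void onLeft(long ts, String value) {
        process(ts, value, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    void onRight(long ts, String value) {
        // Seen from the right side, the interval is mirrored.
        process(ts, value, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(long ts, String value,
                         Map<Long, List<String>> ours, Map<Long, List<String>> others,
                         long relLower, long relUpper, boolean isLeft) {
        if (watermark != Long.MIN_VALUE && ts < watermark) {
            return; // late record: dropped without buffering
        }
        ours.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        for (Map.Entry<Long, List<String>> bucket : others.entrySet()) {
            long otherTs = bucket.getKey();
            if (otherTs < ts + relLower || otherTs > ts + relUpper) {
                continue; // outside the interval
            }
            for (String other : bucket.getValue()) {
                // Always emit as left|right, whichever side arrived last.
                output.add(isLeft ? value + "|" + other : other + "|" + value);
            }
        }
    }

    /** Advances the watermark and drops buckets that can no longer find a partner. */
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        leftBuffer.keySet().removeIf(t -> t + Math.max(upperBound, 0L) <= watermark);
        rightBuffer.keySet().removeIf(t -> t - Math.min(lowerBound, 0L) <= watermark);
    }

    public static void main(String[] args) {
        IntervalJoinSketch join = new IntervalJoinSketch(-2L, 3L);
        join.onLeft(10L, "click@10");
        join.onRight(12L, "order@12");  // within [10 - 2, 10 + 3]
        join.onRight(14L, "order@14");  // too far after click@10
        join.onLeft(15L, "click@15");   // matches order@14 only
        join.advanceWatermark(13L);     // click@10 can no longer match anything
        join.onRight(9L, "order@9");    // late, dropped
        System.out.println(join.output);
        System.out.println(join.leftBuffer.keySet());
    }
}
```

The model scans every bucket of the other side on each arrival, as the operator code above does, so the cost of one record grows with the number of distinct timestamps still buffered for its key. That is why timely cleanup matters.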