真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯網站制作重慶分公司

Flink實現Kafka到Mysql的Exactly-Once

一、背景

10年積累的成都網站制作、網站設計經驗,可以快速應對客戶對網站的新想法和需求。提供各種問題對應的解決方案。讓選擇我們的客戶得到更好、更有力的網絡服務。我雖然不認識你,你也不認識我。但先網站制作后付款的網站建設流程,更有泰州免費網站建設讓你可以放心的選擇與我們合作。

? ? ? ?最近項目中使用Flink消費kafka消息,并將消費的消息存儲到MySQL中,看似一個很簡單的需求,在網上也有很多flink消費kafka的例子,但看了一圈也沒看到能解決重復消費的問題的文章,于是在flink官網中搜索此類場景的處理方式,發(fā)現官網也沒有實現flink到mysql的Exactly-Once例子,但是官網卻有類似的例子來解決端到端的僅一次消費問題。這個現成的例子就是FlinkKafkaProducer011這個類,它保證了通過FlinkKafkaProducer011發(fā)送到kafka的消息是Exactly-Once的,主要的實現方式就是繼承了TwoPhaseCommitSinkFunction這個類,關于TwoPhaseCommitSinkFunction這個類的作用可以先看上一篇文章https://blog.51cto.com/simplelife/2401411。

二、實現思想

? ? ? 這里簡單說下這個類的作用就是實現這個類的方法:beginTransaction、preCommit、commit、abort,達到事件(preCommit)預提交的邏輯(當事件進行自己的邏輯處理后進行預提交,如果預提交成功之后才進行真正的(commit)提交,如果預提交失敗則調用abort方法進行事件的回滾操作),結合flink的checkpoint機制,來保存topic中partition的offset。

達到的效果我舉個例子來說明下:比如checkpoint每10s進行一次,此時用FlinkKafkaConsumer011實時消費kafka中的消息,消費并處理完消息后,進行一次預提交數據庫的操作,如果預提交沒有問題,10s后進行真正的插入數據庫操作,如果插入成功,進行一次checkpoint,flink會自動記錄消費的offset,可以將checkpoint保存的數據放到hdfs中,如果預提交出錯,比如在5s的時候出錯了,此時Flink程序就會進入不斷的重啟中,重啟的策略可以在配置中設置,當然下一次的checkpoint也不會做了,checkpoint記錄的還是上一次成功消費的offset,本次消費的數據因為在checkpoint期間,消費成功,但是預提交過程中失敗了,注意此時數據并沒有真正的執(zhí)行插入操作,因為預提交(preCommit)失敗,提交(commit)過程也不會發(fā)生了。等你將異常數據處理完成之后,再重新啟動這個Flink程序,它會自動從上一次成功的checkpoint中繼續(xù)消費數據,以此來達到Kafka到Mysql的Exactly-Once。

三、具體實現代碼三個類

1、StreamDemoKafka2Mysql.java

package?com.fwmagic.flink.streaming;

import?com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink;
import?org.apache.flink.runtime.state.filesystem.FsStateBackend;
import?org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import?org.apache.flink.streaming.api.CheckpointingMode;
import?org.apache.flink.streaming.api.datastream.DataStreamSource;
import?org.apache.flink.streaming.api.environment.CheckpointConfig;
import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import?org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import?org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import?org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import?org.apache.kafka.clients.consumer.ConsumerConfig;

import?java.util.Properties;

/**
?*?消費kafka消息,sink(自定義)到mysql中,保證kafka?to?mysql的Exactly-Once
?*/
@SuppressWarnings("all")
public?class?StreamDemoKafka2Mysql?{

????public?static?void?main(String[]?args)?throws?Exception?{
????????StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();

????????//設置并行度,為了方便測試,查看消息的順序,這里設置為1,可以更改為多并行度
????????env.setParallelism(1);
????????//checkpoint設置
????????//每隔10s進行啟動一個檢查點【設置checkpoint的周期】
????????env.enableCheckpointing(10000);
????????//設置模式為:exactly_one,僅一次語義
????????env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
????????//確保檢查點之間有1s的時間間隔【checkpoint最小間隔】
????????env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
????????//檢查點必須在10s之內完成,或者被丟棄【checkpoint超時時間】
????????env.getCheckpointConfig().setCheckpointTimeout(10000);
????????//同一時間只允許進行一次檢查點
????????env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
????????//表示一旦Flink程序被cancel后,會保留checkpoint數據,以便根據實際需要恢復到指定的checkpoint
????????//env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
????????//設置statebackend,將檢查點保存在hdfs上面,默認保存在內存中。這里先保存到本地
????????env.setStateBackend(new?FsStateBackend("file:///Users/temp/cp/"));
????????//設置kafka消費參數
????????Properties?props?=?new?Properties();
????????props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,?"hd1:9092,hd2:9092,hd3:9092");
????????props.put(ConsumerConfig.GROUP_ID_CONFIG,?"flink-consumer-group1");
????????//kafka分區(qū)自動發(fā)現周期
????????props.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,?"3000");

????????/*SimpleStringSchema可以獲取到kafka消息,JSONKeyValueDeserializationSchema可以獲取都消息的key,value,metadata:topic,partition,offset等信息*/
????????//?FlinkKafkaConsumer011?kafkaConsumer011?=?new?FlinkKafkaConsumer011<>(topic,?new?SimpleStringSchema(),?props);
????????FlinkKafkaConsumer011?kafkaConsumer011?=?new?FlinkKafkaConsumer011<>("demo123",?new?JSONKeyValueDeserializationSchema(true),?props);

????????//加入kafka數據源
????????DataStreamSource?streamSource?=?env.addSource(kafkaConsumer011);
????????//數據傳輸到下游
????????streamSource.addSink(new?MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");
????????//觸發(fā)執(zhí)行
????????env.execute(StreamDemoKafka2Mysql.class.getName());

????}
}

2、MySqlTwoPhaseCommitSink.java

package?com.fwmagic.flink.sink;

import?com.fwmagic.flink.util.DBConnectUtil;
import?org.apache.flink.api.common.ExecutionConfig;
import?org.apache.flink.api.common.typeutils.base.VoidSerializer;
import?org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import?org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import?org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import?java.sql.Connection;
import?java.sql.PreparedStatement;
import?java.sql.Timestamp;
import?java.text.SimpleDateFormat;
import?java.util.Date;

/**
?*?自定義kafka?to?mysql,繼承TwoPhaseCommitSinkFunction,實現兩階段提交。
?*?功能:保證kafak?to?mysql?的Exactly-Once
?*/
public?class?MySqlTwoPhaseCommitSink?extends?TwoPhaseCommitSinkFunction?{

????public?MySqlTwoPhaseCommitSink()?{
????????super(new?KryoSerializer<>(Connection.class,?new?ExecutionConfig()),?VoidSerializer.INSTANCE);
????}

????/**
?????*?執(zhí)行數據入庫操作
?????*?@param?connection
?????*?@param?objectNode
?????*?@param?context
?????*?@throws?Exception
?????*/
????@Override
????protected?void?invoke(Connection?connection,?ObjectNode?objectNode,?Context?context)?throws?Exception?{
????????System.err.println("start?invoke.......");
????????String?date?=?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss").format(new?Date());
????????System.err.println("===>date:"?+?date?+?"?"?+?objectNode);
????????String?value?=?objectNode.get("value").toString();
????????String?sql?=?"insert?into?`t_test`?(`value`,`insert_time`)?values?(?,?)";
????????PreparedStatement?ps?=?connection.prepareStatement(sql);
????????ps.setString(1,?value);
????????ps.setTimestamp(2,?new?Timestamp(System.currentTimeMillis()));
????????//執(zhí)行insert語句
????????ps.execute();
????????//手動制造異常
????????if(Integer.parseInt(value)?==?15)?System.out.println(1/0);
????}

????/**
?????*?獲取連接,開啟手動提交事物(getConnection方法中)
?????*?@return
?????*?@throws?Exception
?????*/
????@Override
????protected?Connection?beginTransaction()?throws?Exception?{
????????String?url?=?"jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
????????Connection?connection?=?DBConnectUtil.getConnection(url,?"root",?"123456");
????????System.err.println("start?beginTransaction......."+connection);
????????return?connection;
????}

????/**
?????*?預提交,這里預提交的邏輯在invoke方法中
?????*?@param?connection
?????*?@throws?Exception
?????*/
????@Override
????protected?void?preCommit(Connection?connection)?throws?Exception?{
????????System.err.println("start?preCommit......."+connection);

????}

????/**
?????*?如果invoke執(zhí)行正常則提交事物
?????*?@param?connection
?????*/
????@Override
????protected?void?commit(Connection?connection)?{
????????System.err.println("start?commit......."+connection);
????????DBConnectUtil.commit(connection);

????}
????
????@Override
????protected?void?recoverAndCommit(Connection?connection)?{
????????System.err.println("start?recoverAndCommit......."+connection);

????}


????@Override
????protected?void?recoverAndAbort(Connection?connection)?{
????????System.err.println("start?abort?recoverAndAbort......."+connection);
????}

????/**
?????*?如果invoke執(zhí)行異常則回滾事物,下一次的checkpoint操作也不會執(zhí)行
?????*?@param?connection
?????*/
????@Override
????protected?void?abort(Connection?connection)?{
????????System.err.println("start?abort?rollback......."+connection);
????????DBConnectUtil.rollback(connection);
????}

}

3、DBConnectUtil.java

package?com.fwmagic.flink.util;

import?java.sql.Connection;
import?java.sql.DriverManager;
import?java.sql.SQLException;

public?class?DBConnectUtil?{

????/**
?????*?獲取連接
?????*
?????*?@param?url
?????*?@param?user
?????*?@param?password
?????*?@return
?????*?@throws?SQLException
?????*/
????public?static?Connection?getConnection(String?url,?String?user,?String?password)?throws?SQLException?{
????????Connection?conn?=?null;
????????try?{
????????????Class.forName("com.mysql.jdbc.Driver");
????????}?catch?(ClassNotFoundException?e)?{
????????????e.printStackTrace();
????????}
????????conn?=?DriverManager.getConnection(url,?user,?password);
????????//設置手動提交
????????conn.setAutoCommit(false);
????????return?conn;
????}

????/**
?????*?提交事物
?????*/
????public?static?void?commit(Connection?conn)?{
????????if?(conn?!=?null)?{
????????????try?{
????????????????conn.commit();
????????????}?catch?(SQLException?e)?{
????????????????e.printStackTrace();
????????????}?finally?{
????????????????close(conn);
????????????}
????????}
????}

????/**
?????*?事物回滾
?????*
?????*?@param?conn
?????*/
????public?static?void?rollback(Connection?conn)?{
????????if?(conn?!=?null)?{
????????????try?{
????????????????conn.rollback();
????????????}?catch?(SQLException?e)?{
????????????????e.printStackTrace();
????????????}?finally?{
????????????????close(conn);
????????????}
????????}
????}

????/**
?????*?關閉連接
?????*
?????*?@param?conn
?????*/
????public?static?void?close(Connection?conn)?{
????????if?(conn?!=?null)?{
????????????try?{
????????????????conn.close();
????????????}?catch?(SQLException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
}

四、代碼測試

為了方便發(fā)送消息,我用一個定時任務每秒發(fā)送一個數字,1~16,在發(fā)送到數字15之前,應該是做過一次checkpoint了,并且快要到第二次checkpoint的時間,第一次checkpoint的消費數據成功將插入數據庫中,在消費到數字15的時候,手動造一個異常,此時數據庫中應該只有第一次checkpoint后commit的數據,第二次checkpoint的數據并不會插入到數據庫中(因為預提交已經失敗,不會進行真正的提交),我實驗的日志信息:

start?invoke.......
===>date:2019-05-28?18:36:50?{"value":1,"metadata":{"offset":892,"topic":"gaga","partition":0}}
start?invoke.......
===>date:2019-05-28?18:36:51?{"value":2,"metadata":{"offset":887,"topic":"gaga","partition":2}}
start?invoke.......
===>date:2019-05-28?18:36:52?{"value":3,"metadata":{"offset":889,"topic":"gaga","partition":1}}
start?invoke.......
===>date:2019-05-28?18:36:53?{"value":4,"metadata":{"offset":893,"topic":"gaga","partition":0}}
start?invoke.......
===>date:2019-05-28?18:36:54?{"value":5,"metadata":{"offset":888,"topic":"gaga","partition":2}}
start?invoke.......
===>date:2019-05-28?18:36:55?{"value":6,"metadata":{"offset":890,"topic":"gaga","partition":1}}
start?invoke.......
===>date:2019-05-28?18:36:56?{"value":7,"metadata":{"offset":894,"topic":"gaga","partition":0}}
start?invoke.......
===>date:2019-05-28?18:36:57?{"value":8,"metadata":{"offset":889,"topic":"gaga","partition":2}}
start?preCommit.......
start?beginTransaction.......
start?commit.......com.mysql.jdbc.JDBC4Connection@3c5ad420
start?invoke.......
===>date:2019-05-28?18:36:58?{"value":9,"metadata":{"offset":891,"topic":"gaga","partition":1}}
start?invoke.......
===>date:2019-05-28?18:36:59?{"value":10,"metadata":{"offset":895,"topic":"gaga","partition":0}}
start?invoke.......
===>date:2019-05-28?18:37:00?{"value":11,"metadata":{"offset":890,"topic":"gaga","partition":2}}
start?invoke.......
===>date:2019-05-28?18:37:01?{"value":12,"metadata":{"offset":892,"topic":"gaga","partition":1}}
start?invoke.......
===>date:2019-05-28?18:37:02?{"value":13,"metadata":{"offset":896,"topic":"gaga","partition":0}}
start?invoke.......
===>date:2019-05-28?18:37:03?{"value":14,"metadata":{"offset":891,"topic":"gaga","partition":2}}
start?invoke.......
===>date:2019-05-28?18:37:04?{"value":15,"metadata":{"offset":893,"topic":"gaga","partition":1}}
start?abort?rollback.......com.mysql.jdbc.JDBC4Connection@5f2afc1b
start?commit.......com.mysql.jdbc.JDBC4Connection@71ed09a
java.lang.ArithmeticException:?/?by?zero
	at?com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:36)
	at?com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:16)
	at?org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
	at?org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at?org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at?org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at?org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at?org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at?org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at?org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at?org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at?org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
	at?org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
	at?org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
	at?org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
	at?org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
	at?org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
	at?org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
	at?org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at?org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at?java.lang.Thread.run(Thread.java:748)

從日志中可以看到第一次commit時間在2019-05-28 18:36:57,成功入庫到數據為1-8,第二次消費到數字15的時候,提交失敗,日志最后一行發(fā)生了回滾,關閉了連接,然后進行conmit的時候也失敗了,消費的數據9-15不會插入到數據庫中,此時checkpoint也不會做了,checkpoint保存的還是上一次成功消費后的offset數據。

數據庫表:t_test

CREATE?TABLE?`t_test`?(
??`id`?bigint(20)?NOT?NULL?AUTO_INCREMENT,
??`value`?varchar(255)?DEFAULT?NULL,
??`insert_time`?datetime?DEFAULT?NULL,
??PRIMARY?KEY?(`id`)
)?ENGINE=InnoDB?DEFAULT?CHARSET=utf8mb4

表中的數據:

Flink實現Kafka到Mysql的Exactly-Once

五、完整代碼地址:https://gitee.com/fang_wei/fwmagic-flink


文章標題:Flink實現Kafka到Mysql的Exactly-Once
文章來源:http://weahome.cn/article/ihooid.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部