這篇文章主要講解了“如何使用Tbale SQL與Flink JDBC連接器讀取MySQL數(shù)據(jù)”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何使用Tbale SQL與Flink JDBC連接器讀取MYSQL數(shù)據(jù)”吧!
創(chuàng)新互聯(lián)始終堅持【策劃先行,效果至上】的經(jīng)營理念,通過多達10年累計超上千家客戶的網(wǎng)站建設(shè)總結(jié)了一套系統(tǒng)有效的網(wǎng)絡(luò)營銷推廣解決方案,現(xiàn)已廣泛運用于各行各業(yè)的客戶,其中包括:成都茶樓設(shè)計等企業(yè),備受客戶稱贊。
使用Tbale&SQL與Flink JDBC連接器讀取MYSQL數(shù)據(jù),并用GROUP BY語句根據(jù)一個或多個列對結(jié)果集進行分組。
示例環(huán)境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
GroupToMysql.java
package com.flink.examples.mysql; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import static org.apache.flink.table.api.Expressions.$; /** * @Description 使用Tbale&SQL與Flink JDBC連接器讀取MYSQL數(shù)據(jù),并用GROUP BY語句根據(jù)一個或多個列對結(jié)果集進行分組。 */ public class GroupToMysql { /** 官方參考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html 分區(qū)掃描 為了加速并行Source任務(wù)實例中的數(shù)據(jù)讀取,F(xiàn)link為JDBC表提供了分區(qū)掃描功能。 scan.partition.column:用于對輸入進行分區(qū)的列名。 scan.partition.num:分區(qū)數(shù)。 scan.partition.lower-bound:第一個分區(qū)的最小值。 scan.partition.upper-bound:最后一個分區(qū)的最大值。 */ //flink-jdbc-1.11.1寫法,所有屬性名在JdbcTableSourceSinkFactory工廠類中定義 static String table_sql = "CREATE TABLE my_users (\n" + " id BIGINT,\n" + " name STRING,\n" + " age INT,\n" + " status INT,\n" + " PRIMARY KEY (id) NOT ENFORCED\n" + ") WITH (\n" + " 'connector.type' = 'jdbc',\n" + " 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" + " 'connector.driver' = 'com.mysql.jdbc.Driver', \n" + " 'connector.table' = 'users', \n" + " 'connector.username' = 'root',\n" + " 'connector.password' = 'password' \n" + // " 'connector.read.fetch-size' = '10' \n" + ")"; public static void main(String[] args) throws Exception { //構(gòu)建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //設(shè)置setParallelism并行度 env.setParallelism(1); //構(gòu)建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //構(gòu)建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注冊mysql數(shù)據(jù)維表 tEnv.executeSql(table_sql); //Table table = avg(tEnv); //Table table = count(tEnv); //Table table = min(tEnv); Table table = max(tEnv); //打印字段結(jié)構(gòu) table.printSchema(); //普通查詢操作用toAppendStream //tEnv.toAppendStream(table, Row.class).print(); //group操作用toRetractStream //tEnv.toRetractStream(table, Row.class).print(); //table 轉(zhuǎn)成 dataStream 流,Tuple2第一個參數(shù)flag是true表示add添加新的記錄流,false表示retract表示舊的記錄流 DataStream> behaviorStream = tEnv.toRetractStream(table, Row.class); behaviorStream.flatMap(new FlatMapFunction , Object>() { @Override public void flatMap(Tuple2 value, Collector
建表SQL
CREATE TABLE `users` ( `id` bigint(8) NOT NULL AUTO_INCREMENT, `name` varchar(40) DEFAULT NULL, `age` int(8) DEFAULT NULL, `status` tinyint(2) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
打印結(jié)果
root |-- status: INT |-- age1: INT 0,16 0,18 1,21 1,28 2,31
感謝各位的閱讀,以上就是“如何使用Tbale SQL與Flink JDBC連接器讀取MYSQL數(shù)據(jù)”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對如何使用Tbale SQL與Flink JDBC連接器讀取MYSQL數(shù)據(jù)這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!