This article walks through an example of integrating SQL, the HiveCatalog, and event time in Flink 1.10. It is shared here as a practical reference; hopefully you will get something out of it.
Adding Dependencies
Maven download:
https://maven.aliyun.com/mvn/search
<properties>
    <scala.bin.version>2.11</scala.bin.version>
    <flink.version>1.10.0</flink.version>
    <hive.version>1.1.0</hive.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka-0.11_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.bin.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
    </dependency>
</dependencies>
Finally, locate Hive's configuration file hive-site.xml, and the preparation work is done.
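For reference, the HiveCatalog mainly needs the metastore connection information from that file. A minimal hive-site.xml sketch, with a hypothetical metastore address that you would replace with your own:

<?xml version="1.0"?>
<configuration>
    <!-- Hypothetical address; point this at your own Hive metastore service. -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://metastore-host:9083</value>
    </property>
</configuration>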
Registering the HiveCatalog and Creating a Database
// Imports needed for this and the following snippets (Flink 1.10 Scala API)
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.types.Row

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(5)
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
val catalog = new HiveCatalog(
"rtdw", // catalog name
"default", // default database
"/Users/lmagic/develop", // Hive config (hive-site.xml) directory
"1.1.0" // Hive version
)
tableEnv.registerCatalog("rtdw", catalog)
tableEnv.useCatalog("rtdw")
val createDbSql = "CREATE DATABASE IF NOT EXISTS rtdw.ods"
tableEnv.sqlUpdate(createDbSql)
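Before moving on, it can be worth verifying that the catalog registration and the DDL actually took effect. A quick sanity check using the TableEnvironment's standard listing methods:

// Should print "rtdw" among the registered catalogs,
// and "ods" among the databases after the DDL above has run.
tableEnv.listCatalogs().foreach(println)
tableEnv.listDatabases().foreach(println)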
Creating a Kafka Stream Table and Specifying Event Time
"eventType": "clickBuyNow", "userId": "97470180", "shareUserId": "", "platform": "xyz", "columnType": "merchDetail", "merchandiseId": "12727495", "fromType": "wxapp", "siteId": "20392", "categoryId": "", "ts": 1585136092541
CREATE TABLE rtdw.ods.streaming_user_active_log (
  eventType STRING COMMENT '...',
  userId STRING,
  shareUserId STRING,
  platform STRING,
  columnType STRING,
  merchandiseId STRING,
  fromType STRING,
  siteId STRING,
  categoryId STRING,
  ts BIGINT,
  procTime AS PROCTIME(),  -- processing time
  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),  -- event time
  WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND  -- watermark
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'ng_log_par_extracted',
  'connector.startup-mode' = 'latest-offset',  -- specify the starting offset
  'connector.properties.zookeeper.connect' = 'zk109:2181,zk110:2181,zk111:2181',
  'connector.properties.bootstrap.servers' = 'kafka112:9092,kafka113:9092,kafka114:9092',
  'connector.properties.group.id' = 'rtdw_group_test_1',
  'format.type' = 'json',
  'format.derive-schema' = 'true',  -- derive the JSON parsing schema from the table schema
  'update-mode' = 'append'
)
The WATERMARK clause covers the two common generation strategies. For records whose timestamps are strictly ascending, a 1-millisecond delay is sufficient:

WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

For out-of-order streams, subtract a fixed bound on the expected lateness (as the 10-second interval above does):

WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'n' TIME_UNIT
For a deeper dive into watermarks, see: https://www.jianshu.com/p/c612e95a5028
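For comparison, the 10-second watermark declared in the DDL above corresponds to the bounded-out-of-orderness strategy of the DataStream API. A minimal sketch, assuming a hypothetical UserActiveLog case class with a millisecond ts field:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event type for illustration only; the SQL table above
// derives eventTime from the JSON ts field instead.
case class UserActiveLog(eventType: String, userId: String, ts: Long)

// Equivalent of: WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND
class LogWatermarkExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[UserActiveLog](Time.seconds(10)) {
  override def extractTimestamp(log: UserActiveLog): Long = log.ts
}

// Usage on a DataStream[UserActiveLog]:
// stream.assignTimestampsAndWatermarks(new LogWatermarkExtractor)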
val createTableSql = """ |上文的SQL語(yǔ)句 |...... """.stripMargin tableEnv.sqlUpdate(createTableSql)
Computing PV and UV with Windows
SELECT
  eventType,
  TUMBLE_START(eventTime, INTERVAL '30' SECOND) AS windowStart,
  TUMBLE_END(eventTime, INTERVAL '30' SECOND) AS windowEnd,
  COUNT(userId) AS pv,
  COUNT(DISTINCT userId) AS uv
FROM rtdw.ods.streaming_user_active_log
WHERE platform = 'xyz'
GROUP BY eventType, TUMBLE(eventTime, INTERVAL '30' SECOND)
SQL documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows
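As a variation (not part of the original job), the same metrics can be computed over a sliding window using the HOP group window; here a 1-minute window sliding every 30 seconds. Note that HOP takes the slide interval before the window size:

SELECT
  eventType,
  HOP_START(eventTime, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS windowStart,
  HOP_END(eventTime, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS windowEnd,
  COUNT(userId) AS pv,
  COUNT(DISTINCT userId) AS uv
FROM rtdw.ods.streaming_user_active_log
WHERE platform = 'xyz'
GROUP BY eventType, HOP(eventTime, INTERVAL '30' SECOND, INTERVAL '1' MINUTE)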
val queryActiveSql =
"""
|......
|......
""".stripMargin
val result = tableEnv.sqlQuery(queryActiveSql)
result
  .toAppendStream[Row]
  .print()
  .setParallelism(1)

// Don't forget to actually launch the job; the job name here is arbitrary
streamEnv.execute("sql-hivecatalog-eventtime-demo")
That's all for this look at integrating SQL, the HiveCatalog, and event time in Flink 1.10. Hopefully the content above is of some help and teaches you something new; if you found the article worthwhile, please share it so more people can see it.