真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何使用SQL讀取Kafka并寫入MySQL

今天就跟大家聊聊有關如何使用SQL讀取Kafka并寫入MySQL,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

創(chuàng)新互聯(lián)是一家以成都網(wǎng)站建設、網(wǎng)頁設計、品牌設計、軟件運維、營銷推廣、小程序App開發(fā)等移動開發(fā)為一體互聯(lián)網(wǎng)公司。已累計為成都木包裝箱等眾行業(yè)中小客戶提供優(yōu)質(zhì)的互聯(lián)網(wǎng)建站和軟件開發(fā)服務。

SqlSubmit 的實現(xiàn)

筆者一開始是想用 SQL Client 來貫穿整個演示環(huán)節(jié),但可惜 1.9 版本 SQL CLI 還不支持處理 CREATE TABLE 語句。所以筆者就只好自己寫了個簡單的提交腳本。后來想想,也挺好的,可以讓聽眾同時了解如何通過 SQL 的方式,和編程的方式使用 Flink SQL。

SqlSubmit 的主要任務是執(zhí)行和提交一個 SQL 文件,實現(xiàn)非常簡單,就是通過正則表達式匹配每個語句塊。如果是 CREATE TABLE 或 INSERT INTO 開頭,則會調(diào)用 tEnv.sqlUpdate(...)。如果是 SET 開頭,則會將配置設置到 TableConfig 上。其核心代碼主要如下所示:

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();// 創(chuàng)建一個使用 Blink Planner 的 TableEnvironment, 并工作在流模式TableEnvironment tEnv = TableEnvironment.create(settings);// 讀取 SQL 文件List sql = Files.readAllLines(path);// 通過正則表達式匹配前綴,來區(qū)分不同的 SQL 語句List calls = SqlCommandParser.parse(sql);// 根據(jù)不同的 SQL 語句,調(diào)用 TableEnvironment 執(zhí)行for (SqlCommandCall call : calls) {  switch (call.command) {    case SET:      String key = call.operands[0];      String value = call.operands[1];      // 設置參數(shù)
      tEnv.getConfig().getConfiguration().setString(key, value);      break;    case CREATE_TABLE:      String ddl = call.operands[0];
      tEnv.sqlUpdate(ddl);      break;    case INSERT_INTO:      String dml = call.operands[0];
      tEnv.sqlUpdate(dml);      break;    default:      throw new RuntimeException("Unsupported command: " + call.command);
  }
}// 提交作業(yè)tEnv.execute("SQL Job");

使用 DDL 連接 Kafka 源表

在 flink-sql-submit 項目中,我們準備了一份測試數(shù)據(jù)集(來自阿里云天池公開數(shù)據(jù)集,特別鳴謝),位于 src/main/resources/user_behavior.log。數(shù)據(jù)以 JSON 格式編碼,大概長這個樣子:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

為了模擬真實的 Kafka 數(shù)據(jù)源,筆者還特地寫了一個 source-generator.sh 腳本(感興趣的可以看下源碼),會自動讀取 user_behavior.log 的數(shù)據(jù)并以默認每毫秒1條的速率灌到 Kafka 的 user_behavior topic 中。

有了數(shù)據(jù)源后,我們就可以用 DDL 去創(chuàng)建并連接這個 Kafka 中的 topic(詳見 src/main/resources/q1.sql)。

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (    'connector.type' = 'kafka', -- 使用 kafka connector    'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本    'connector.topic' = 'user_behavior',  -- kafka topic    'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取    'connector.properties.0.key' = 'zookeeper.connect',  -- 連接信息    'connector.properties.0.value' = 'localhost:2181', 
    'connector.properties.1.key' = 'bootstrap.servers',    'connector.properties.1.value' = 'localhost:9092', 
    'update-mode' = 'append',    'format.type' = 'json',  -- 數(shù)據(jù)源格式為 json    'format.derive-schema' = 'true' -- 從 DDL schema 確定 json 解析規(guī)則
)

注:可能有用戶會覺得其中的 connector.properties.0.key 等參數(shù)比較奇怪,社區(qū)計劃將在下一個版本中改進并簡化 connector 的參數(shù)配置。

使用 DDL 連接 MySQL 結果表

連接 MySQL 可以使用 Flink 提供的 JDBC connector。例如

CREATE TABLE pvuv_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (    'connector.type' = 'jdbc', -- 使用 jdbc connector    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url    'connector.table' = 'pvuv_sink', -- 表名    'connector.username' = 'root', -- 用戶名    'connector.password' = '123456', -- 密碼    'connector.write.flush.max-rows' = '1' -- 默認5000條,為了演示改為1條
)

PV UV 計算

假設我們的需求是計算每小時全網(wǎng)的用戶訪問量,和獨立用戶數(shù)。很多用戶可能會想到使用滾動窗口來計算。但這里我們介紹另一種方式。即 Group Aggregation 的方式。

INSERT INTO pvuv_sink
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

它使用 DATE_FORMAT 這個內(nèi)置函數(shù),將日志時間歸一化成“年月日小時”的字符串格式,并根據(jù)這個字符串進行分組,即根據(jù)每小時分組,然后通過 COUNT(*) 計算用戶訪問量(PV),通過 COUNT(DISTINCT user_id) 計算獨立用戶數(shù)(UV)。這種方式的執(zhí)行模式是每收到一條數(shù)據(jù),便會進行基于之前計算的值做增量計算(如+1),然后將最新結果輸出。所以實時性很高,但輸出量也大。

我們將這個查詢的結果,通過 INSERT INTO 語句,寫到了之前定義的 pvuv_sink MySQL 表中。

注:在深圳 Meetup 中,我們有對這種查詢的性能調(diào)優(yōu)做了深度的介紹。

實戰(zhàn)演示

環(huán)境準備

本實戰(zhàn)演示環(huán)節(jié)需要安裝一些必須的服務,包括:

  • Flink 本地集群:用來運行 Flink SQL 任務。

  • Kafka 本地集群:用來作為數(shù)據(jù)源。

  • MySQL 數(shù)據(jù)庫:用來作為結果表。

  • Flink 本地集群安裝

1.下載 Flink 1.9.0 安裝包并解壓: https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz 
2.下載以下依賴 jar 包,并拷貝到 flink-1.9.0/lib/ 目錄下。因為我們運行時需要依賴各個 connector 實現(xiàn)。

  • flink-sql-connector-kafka_2.11-1.9.0.jar 
    http://central.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.9.0/flink-sql-connector-kafka_2.11-1.9.0.jar

  • flink-json-1.9.0-sql-jar.jar 
    http://central.maven.org/maven2/org/apache/flink/flink-json/1.9.0/flink-json-1.9.0-sql-jar.jar

  • flink-jdbc_2.11-1.9.0.jar 
    http://central.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.9.0/flink-jdbc_2.11-1.9.0.jar

  • mysql-connector-java-5.1.48.jar 
    https://dev.mysql.com/downloads/connector/j/5.1.html

3.將 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因為我們的演示任務可能會消耗多于1個的 slot。 
4.在 flink-1.9.0 目錄下執(zhí)行 ./bin/start-cluster.sh,啟動集群。

運行成功的話,可以在  http://localhost:8081 訪問到 Flink Web UI。

如何使用SQL讀取Kafka并寫入MySQL

另外,還需要將 Flink 的安裝路徑填到 flink-sql-submit 項目的 env.sh 中,用于后面提交 SQL 任務,如我的路徑是

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0
Kafka 本地集群安裝

下載 Kafka 2.2.0 安裝包并解壓: https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz

將安裝路徑填到 flink-sql-submit 項目的 env.sh 中,如我的路徑是

KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0

在 flink-sql-submit 目錄下運行 ./start-kafka.sh 啟動 Kafka 集群。

在命令行執(zhí)行 jps,如果看到 Kafka 進程和 QuorumPeerMain 進程即表明啟動成功。

MySQL 安裝

可以在官方頁面下載 MySQL 并安裝: 
https://dev.mysql.com/downloads/mysql/ 
如果有 Docker 環(huán)境的話,也可以直接通過 Docker 安裝 
https://hub.docker.com/_/mysql

$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

然后在 MySQL 中創(chuàng)建一個  flink-test 的數(shù)據(jù)庫,并按照上文的 schema 創(chuàng)建  pvuv_sink 表。

提交 SQL 任務

  1. 在  flink-sql-submit 目錄下運行  ./source-generator.sh,會自動創(chuàng)建  user_behavior topic,并實時往里灌入數(shù)據(jù)

  2. 在  flink-sql-submit 目錄下運行  ./run.sh q1, 提交成功后,可以在 Web UI 中看到拓撲。

如何使用SQL讀取Kafka并寫入MySQL

在 MySQL 客戶端,我們也可以實時地看到每個小時的 pv uv 值在不斷地變化

看完上述內(nèi)容,你們對如何使用SQL讀取Kafka并寫入MySQL有進一步的了解嗎?如果還想了解更多知識或者相關內(nèi)容,請關注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。


當前標題:如何使用SQL讀取Kafka并寫入MySQL
網(wǎng)站鏈接:http://weahome.cn/article/jhdedc.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部