真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

flink1.11中的CDC是什么意思

本篇文章給大家分享的是有關flink 1.11中的CDC是什么意思,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

成都創(chuàng)新互聯(lián)公司專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務,包含不限于網(wǎng)站設計、成都網(wǎng)站建設、濟水街道網(wǎng)絡推廣、小程序定制開發(fā)、濟水街道網(wǎng)絡營銷、濟水街道企業(yè)策劃、濟水街道品牌公關、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運營等,從售前售中售后,我們都將竭誠為您服務,您的肯定,是我們最大的嘉獎;成都創(chuàng)新互聯(lián)公司為所有大學生創(chuàng)業(yè)者提供濟水街道建站搭建服務,24小時服務熱線:13518219792,官方網(wǎng)址:www.cdcxhl.com

CDC簡介

CDC,Change Data Capture,變更數(shù)據(jù)獲取的簡稱,使用CDC我們可以從數(shù)據(jù)庫中獲取已提交的更改并將這些更改發(fā)送到下游,供下游使用。這些變更可以包括INSERT,DELETE,UPDATE等,

用戶可以在以下的場景下使用CDC:

  • 使用flink sql進行數(shù)據(jù)同步,可以將數(shù)據(jù)從一個數(shù)據(jù)同步到其他的地方,比如MySQL、elasticsearch等。

  • 可以在源數(shù)據(jù)庫上實時的物化一個聚合視圖

  • 因為只是增量同步,所以可以實時的低延遲的同步數(shù)據(jù)

  • 使用EventTime join 一個temporal表以便可以獲取準確的結果

flink 1.11 將這些changelog提取并轉(zhuǎn)化為table apa和sql,目前支持兩種格式:Debezium和Canal,這就意味著源表不僅僅是append操作,而且還有upsert、delete操作。

flink 1.11中的CDC是什么意思

Canal

接下來我們使用canal為例簡單介紹下CDC的使用

canal 格式:

{
  "data": [
    {
      "id": "13",
      "username": "13",
      "password": "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9",
      "name": "Canal Manager V2"
    }
  ],
  "old": [
    {
      "id": "13",
      "username": "13",
      "password": "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9",
      "name": "Canal Manager"
    }
  ],
  "database": "canal_manager",
  "es": 1568972368000,
  "id": 11,
  "isDdl": false,
  "mysqlType": {...},
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {...},
  "table": "canal_user",
  "ts": 1568972369005,
  "type": "UPDATE"
}

簡單講下幾個核心的字段:

  • type : 描述操作的類型,包括‘UPDATE’, 'INSERT', 'DELETE'。

  • data : 代表操作的數(shù)據(jù)。如果為'INSERT',則表示行的內(nèi)容;如果為'UPDATE',則表示行的更新后的狀態(tài);如果為'DELETE',則表示刪除前的狀態(tài)。

  • old :可選字段,如果存在,則表示更新之前的內(nèi)容,如果不是update操作,則為 null。

完整的語義如下;

    private String                    destination;                            // 對應canal的實例或者MQ的topic
    private String                    groupId;                                // 對應mq的group id
    private String                    database;                               // 數(shù)據(jù)庫或schema
    private String                    table;                                  // 表名
    private List              pkNames;
    private Boolean                   isDdl;
    private String                    type;                                   // 類型: INSERT UPDATE DELETE
    // binlog executeTime
    private Long                      es;                                     // 執(zhí)行耗時
    // dml build timeStamp
    private Long                      ts;                                     // 同步時間
    private String                    sql;                                    // 執(zhí)行的sql, dml sql為空
    private List> data;                                   // 數(shù)據(jù)列表
    private List> old;                                    // 舊數(shù)據(jù)列表, 用于update, size和data的size一一對應
-- 定義的字段和data 里面的數(shù)據(jù)想匹配 
CREATE TABLE my_table (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'canal-json.ignore-parse-errors'='true' -- 忽略解析錯誤,缺省值false
);

CanalJson反序列化源碼解析

canal 格式也是作為一種flink的格式,而且是source,所以也就是涉及到讀取數(shù)據(jù)的時候進行反序列化,我們接下來就簡單看看CanalJson的反序列化的實現(xiàn)。具體的實現(xiàn)類是CanalJsonDeserializationSchema。

我們看下這個最核心的反序列化方法:

	@Override
	public void deserialize(byte[] message, Collector out) throws IOException {
		try {
		    //使用json反序列化器將message反序列化成RowData
			RowData row = jsonDeserializer.deserialize(message);
			
			//獲取type字段,用于下面的判斷
			String type = row.getString(2).toString();
			if (OP_INSERT.equals(type)) {
				// 如果操作類型是insert,則data數(shù)組表示的是要插入的數(shù)據(jù),則循環(huán)遍歷data,然后添加一個標識INSERT,構造RowData對象,發(fā)送下游。
				ArrayData data = row.getArray(0);
				for (int i = 0; i < data.size(); i++) {
					RowData insert = data.getRow(i, fieldCount);
					insert.setRowKind(RowKind.INSERT);
					out.collect(insert);
				}
			} else if (OP_UPDATE.equals(type)) {
				// 如果是update操作,從data字段里獲取更新后的數(shù)據(jù)、
				ArrayData data = row.getArray(0);
				// old字段獲取更新之前的數(shù)據(jù)
				ArrayData old = row.getArray(1);
				for (int i = 0; i < data.size(); i++) {
					// the underlying JSON deserialization schema always produce GenericRowData.
					GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
					GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
					for (int f = 0; f < fieldCount; f++) {
						if (before.isNullAt(f)) {
							//如果old字段非空,則說明進行了數(shù)據(jù)的更新,如果old字段是null,則說明更新前后數(shù)據(jù)一樣,這個時候把before的數(shù)據(jù)也設置成after的,也就是發(fā)送給下游的before和after數(shù)據(jù)一樣。
							before.setField(f, after.getField(f));
						}
					}
					before.setRowKind(RowKind.UPDATE_BEFORE);
					after.setRowKind(RowKind.UPDATE_AFTER);
					//把更新前后的數(shù)據(jù)都發(fā)送下游
					out.collect(before);
					out.collect(after);
				}
			} else if (OP_DELETE.equals(type)) {
				// 如果是刪除操作,data字段里包含將要被刪除的數(shù)據(jù),把這些數(shù)據(jù)組織起來發(fā)送給下游
				ArrayData data = row.getArray(0);
				for (int i = 0; i < data.size(); i++) {
					RowData insert = data.getRow(i, fieldCount);
					insert.setRowKind(RowKind.DELETE);
					out.collect(insert);
				}
			} else {
				if (!ignoreParseErrors) {
					throw new IOException(format(
						"Unknown \"type\" value \"%s\". The Canal JSON message is '%s'", type, new String(message)));
				}
			}
		} catch (Throwable t) {
			// a big try catch to protect the processing.
			if (!ignoreParseErrors) {
				throw new IOException(format(
					"Corrupt Canal JSON message '%s'.", new String(message)), t);
			}
		}
	}

以上就是flink 1.11中的CDC是什么意思,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學到更多知識。更多詳情敬請關注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


網(wǎng)頁題目:flink1.11中的CDC是什么意思
網(wǎng)站URL:http://weahome.cn/article/gpohhd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部