這篇文章主要介紹基于Docker結(jié)合Canal如何實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸功能,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!
創(chuàng)新互聯(lián)是一家專(zhuān)注于成都網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)與策劃設(shè)計(jì),自流井網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專(zhuān)注于網(wǎng)站建設(shè)十年,網(wǎng)設(shè)計(jì)領(lǐng)域的專(zhuān)業(yè)建站公司;建站業(yè)務(wù)涵蓋:自流井等地區(qū)。自流井做網(wǎng)站價(jià)格咨詢:18982081108
Canal的介紹
Canal的歷史由來(lái)
在早期的時(shí)候,阿里巴巴公司因?yàn)楹贾莺兔绹?guó)兩個(gè)地方的機(jī)房都部署了數(shù)據(jù)庫(kù)實(shí)例,但因?yàn)榭鐧C(jī)房同步數(shù)據(jù)的業(yè)務(wù)需求 ,便孕育而生出了Canal,主要是基于trigger(觸發(fā)器)的方式獲取增量變更。從2010年開(kāi)始,阿里巴巴公司開(kāi)始逐步嘗試數(shù)據(jù)庫(kù)日志解析,獲取增量變更的數(shù)據(jù)進(jìn)行同步,由此衍生出了增量訂閱和消費(fèi)業(yè)務(wù)。
當(dāng)前的Canal支持的數(shù)據(jù)源端MySQL版本包括:5.1.x 、5.5.x 、5.6.x、5.7.x、8.0.x。
Canal的應(yīng)用場(chǎng)景
目前普遍基于日志增量訂閱和消費(fèi)的業(yè)務(wù),主要包括:
基于數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)
數(shù)據(jù)庫(kù)鏡像 數(shù)據(jù)庫(kù)實(shí)時(shí)備份
索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等)
業(yè)務(wù)Cache刷新
帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理
Canal的工作原理
在介紹Canal的原理之前,我們先來(lái)了解下MySQL主從復(fù)制的原理。
MySQL主從復(fù)制原理
MySQL Master將數(shù)據(jù)變更的操作寫(xiě)入二進(jìn)制日志binary log中, 其中記錄的內(nèi)容叫做二進(jìn)制日志事件binary log events,可以通過(guò)show binlog events命令進(jìn)行查看
MySQL Slave會(huì)將Master的binary log中的binary log events拷貝到它的中繼日志relay log
MySQL Slave重讀并執(zhí)行relay log中的事件,將數(shù)據(jù)變更映射到它自己的數(shù)據(jù)庫(kù)表中
了解了MySQL的工作原理,我們可以大致猜想到Canal應(yīng)該也是采用類(lèi)似的邏輯去實(shí)現(xiàn)增量數(shù)據(jù)訂閱的功能,那么接下來(lái)我們看看實(shí)際上Canal的工作原理是怎樣的?
Canal工作原理
Canal模擬MySQL Slave的交互協(xié)議,偽裝自己為MySQL Slave,向MySQL Master發(fā)送dump協(xié)議
MySQL Master收到dump請(qǐng)求,開(kāi)始推送binary log給Slave(也就是Canal)
Canal解析binary log對(duì)象(數(shù)據(jù)為byte流)
基于這樣的原理與方式,便可以完成數(shù)據(jù)庫(kù)增量日志的獲取解析,提供增量數(shù)據(jù)訂閱和消費(fèi),實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸?shù)墓δ堋?/p>
既然Canal是這樣的一個(gè)框架,又是純Java語(yǔ)言編寫(xiě)而成,那么我們接下來(lái)就開(kāi)始學(xué)習(xí)怎么使用它并把它用到我們的實(shí)際工作中。
Canal的Docker環(huán)境準(zhǔn)備
因?yàn)槟壳叭萜骰夹g(shù)的火熱,本文通過(guò)使用Docker來(lái)快速搭建開(kāi)發(fā)環(huán)境,而傳統(tǒng)方式的環(huán)境搭建,在我們學(xué)會(huì)了Docker容器環(huán)境搭建后,也能自行依葫蘆畫(huà)瓢搭建成功。由于本篇主要講解Canal,所以關(guān)于Docker的內(nèi)容不會(huì)涉及太多,主要會(huì)介紹Docker的基本概念和命令使用。 如果你想和更多容器技術(shù)專(zhuān)家交流,可以加我微信liyingjiese,備注『加群』。群里每周都有全球各大公司的最佳實(shí)踐以及行業(yè)最新動(dòng)態(tài) 。
什么是Docker
相信絕大多數(shù)人都使用過(guò)虛擬機(jī)VMware,在使用VMware進(jìn)行環(huán)境搭建的時(shí)候,只需提供了一個(gè)普通的系統(tǒng)鏡像并成功安裝,剩下的軟件環(huán)境與應(yīng)用配置還是如我們?cè)诒緳C(jī)操作一樣在虛擬機(jī)里也操作一遍,而且VMware占用宿主機(jī)的資源較多,容易造成宿主機(jī)卡頓,而且系統(tǒng)鏡像本身也占用過(guò)多空間。
為了便于大家快速理解Docker,便與VMware做對(duì)比來(lái)做介紹,Docker提供了一個(gè)開(kāi)始,打包,運(yùn)行APP的平臺(tái),把APP(應(yīng)用)和底層infrastructure(基礎(chǔ)設(shè)施)隔離開(kāi)來(lái)。Docker中最主要的兩個(gè)概念就是鏡像(類(lèi)似VMware的系統(tǒng)鏡像)與容器(類(lèi)似VMware里安裝的系統(tǒng))。
什么是Image(鏡像)
文件和meta data的集合(root filesystem)
分層的,并且每一層都可以添加改變刪除文件,成為一個(gè)新的image
不同的image可以共享相同的layer
Image本身是read-only的
什么是Container(容器)
通過(guò)Image創(chuàng)建(copy)
在Image layer之上建立一個(gè)container layer(可讀寫(xiě))
類(lèi)比面向?qū)ο螅侯?lèi)和實(shí)例
Image負(fù)責(zé)APP的存儲(chǔ)和分發(fā),Container負(fù)責(zé)運(yùn)行APP
Docker的網(wǎng)絡(luò)介紹
Docker的網(wǎng)絡(luò)類(lèi)型有三種:
Bridge:橋接網(wǎng)絡(luò)。默認(rèn)情況下啟動(dòng)的Docker容器,都是使用Bridge,Docker安裝時(shí)創(chuàng)建的橋接網(wǎng)絡(luò),每次Docker容器重啟時(shí),會(huì)按照順序獲取對(duì)應(yīng)的IP地址,這個(gè)就導(dǎo)致重啟下,Docker的IP地址就變了。
None:無(wú)指定網(wǎng)絡(luò)。使用 --network=none,Docker容器就不會(huì)分配局域網(wǎng)的IP。
Host:主機(jī)網(wǎng)絡(luò)。使用--network=host,此時(shí),Docker容器的網(wǎng)絡(luò)會(huì)附屬在主機(jī)上,兩者是互通的。例如,在容器中運(yùn)行一個(gè)Web服務(wù),監(jiān)聽(tīng)8080端口,則主機(jī)的8080端口就會(huì)自動(dòng)映射到容器中。
創(chuàng)建自定義網(wǎng)絡(luò):(設(shè)置固定IP)
docker network create --subnet=172.18.0.0/16 mynetwork
查看存在的網(wǎng)絡(luò)類(lèi)型docker network ls:
搭建Canal環(huán)境
附上Docker的下載安裝地址==> Docker Download 。
下載Canal鏡像docker pull canal/canal-server
:
下載MySQL鏡像docker pull mysql
,下載過(guò)的則如下圖:
查看已經(jīng)下載好的鏡像docker images:
接下來(lái)通過(guò)鏡像生成MySQL容器與canal-server容器:
##生成mysql容器 docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql ##生成canal-server容器 docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server ## 命令介紹 --net mynetwork #使用自定義網(wǎng)絡(luò) --ip #指定分配ip
查看Docker中運(yùn)行的容器docker ps:
MySQL的配置修改
以上只是初步準(zhǔn)備好了基礎(chǔ)的環(huán)境,但是怎么讓Canal偽裝成Salve并正確獲取MySQL中的binary log呢?
對(duì)于自建MySQL,需要先開(kāi)啟Binlog寫(xiě)入功能,配置binlog-format
為ROW模式,通過(guò)修改MySQL配置文件來(lái)開(kāi)啟bin_log,使用find / -name my.cnf
查找my.cnf,修改文件內(nèi)容如下:
[mysqld] log-bin=mysql-bin # 開(kāi)啟binlog binlog-format=ROW # 選擇ROW模式 server_id=1 # 配置MySQL replaction需要定義,不要和Canal的slaveId重復(fù)
進(jìn)入MySQL容器docker exec -it mysql bash。
創(chuàng)建鏈接MySQL的賬號(hào)Canal并授予作為MySQL slave的權(quán)限,如果已有賬戶可直接GRANT:
mysql -uroot -proot # 創(chuàng)建賬號(hào) CREATE USER canal IDENTIFIED BY 'canal'; # 授予權(quán)限 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; # 刷新并應(yīng)用 FLUSH PRIVILEGES;
數(shù)據(jù)庫(kù)重啟后,簡(jiǎn)單測(cè)試 my.cnf 配置是否生效:
show variables like 'log_bin'; show variables like 'log_bin'; show master status;
canal-server的配置修改
進(jìn)入canal-server容器docker exec -it canal-server bash
。
編輯canal-server的配置vi canal-server/conf/example/instance.properties
:
更多配置請(qǐng)參考==>Canal配置說(shuō)明 。
重啟canal-server容器docker restart canal-server
進(jìn)入容器查看啟動(dòng)日志:
docker exec -it canal-server bash tail -100f canal-server/logs/example/example.log
至此,我們的環(huán)境工作準(zhǔn)備完成!
拉取數(shù)據(jù)并同步保存到ElasticSearch
本文的ElasticSearch也是基于Docker環(huán)境搭建,所以讀者可執(zhí)行如下命令:
# 下載對(duì)鏡像 docker pull elasticsearch:7.1.1 docker pull mobz/elasticsearch-head:5-alpine # 創(chuàng)建容器并運(yùn)行 docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1 docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine
環(huán)境已經(jīng)準(zhǔn)備好了,現(xiàn)在就要開(kāi)始我們的編碼實(shí)戰(zhàn)部分了,怎么通過(guò)應(yīng)用程序去獲取Canal解析后的binlog數(shù)據(jù)。首先我們基于Spring Boot搭建一個(gè)canal demo應(yīng)用。結(jié)構(gòu)如下圖所示:
Student.java
package com.example.canal.study.pojo; import lombok.Data; import java.io.Serializable; // @Data 用戶生產(chǎn)getter、setter方法 @Data public class Student implements Serializable { private String id; private String name; private int age; private String sex; private String city; }
CanalConfig.java
package com.example.canal.study.common; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.net.InetSocketAddress; /** * @author haha */ @Configuration public class CanalConfig { // @Value 獲取 application.properties配置中端內(nèi)容 @Value("${canal.server.ip}") private String canalIp; @Value("${canal.server.port}") private Integer canalPort; @Value("${canal.destination}") private String destination; @Value("${elasticSearch.server.ip}") private String elasticSearchIp; @Value("${elasticSearch.server.port}") private Integer elasticSearchPort; @Value("${zookeeper.server.ip}") private String zkServerIp; // 獲取簡(jiǎn)單canal-server連接 @Bean public CanalConnector canalSimpleConnector() { CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp, canalPort), destination, "", ""); return canalConnector; } // 通過(guò)連接zookeeper獲取canal-server連接 @Bean public CanalConnector canalHaConnector() { CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", ""); return canalConnector; } // elasticsearch 7.x客戶端 @Bean public RestHighLevelClient restHighLevelClient() { RestHighLevelClient client = new RestHighLevelClient( RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort)) ); return client; } }
CanalDataParser.java
由于這個(gè)類(lèi)的代碼較多,文中則摘出其中比較重要的部分,其它部分代碼可從GitHub上獲?。?/p>
public static class TwoTuple { public final A eventType; public final B columnMap; public TwoTuple(A a, B b) { eventType = a; columnMap = b; } } public static List> printEntry(List entrys) { List > rows = new ArrayList<>(); for (Entry entry : entrys) { // binlog event的事件事件 long executeTime = entry.getHeader().getExecuteTime(); // 當(dāng)前應(yīng)用獲取到該binlog鎖延遲的時(shí)間 long delayTime = System.currentTimeMillis() - executeTime; Date date = new Date(entry.getHeader().getExecuteTime()); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 當(dāng)前的entry(binary log event)的條目類(lèi)型屬于事務(wù) if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) { TransactionBegin begin = null; try { begin = TransactionBegin.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { throw new RuntimeException("parse event has an error , data:" + entry.toString(), e); } // 打印事務(wù)頭信息,執(zhí)行的線程id,事務(wù)耗時(shí) logger.info(transaction_format, new Object[]{entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date), entry.getHeader().getGtid(), String.valueOf(delayTime)}); logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId()); printXAInfo(begin.getPropsList()); } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) { TransactionEnd end = null; try { end = TransactionEnd.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { throw new RuntimeException("parse event has an error , data:" + entry.toString(), e); } // 打印事務(wù)提交信息,事務(wù)id logger.info("----------------\n"); logger.info(" END ----> transaction id: {}", end.getTransactionId()); printXAInfo(end.getPropsList()); logger.info(transaction_format, new Object[]{entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date), entry.getHeader().getGtid(), String.valueOf(delayTime)}); } continue; } // 當(dāng)前entry(binary log event)的條目類(lèi)型屬于原始數(shù)據(jù) if (entry.getEntryType() == EntryType.ROWDATA) { RowChange rowChage = null; try { // 獲取儲(chǔ)存的內(nèi)容 rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("parse event has an error , data:" + entry.toString(), e); } // 獲取當(dāng)前內(nèi)容的事件類(lèi)型 EventType eventType = rowChage.getEventType(); logger.info(row_format, new Object[]{entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date), entry.getHeader().getGtid(), String.valueOf(delayTime)}); // 事件類(lèi)型是query或數(shù)據(jù)定義語(yǔ)言DDL直接打印sql語(yǔ)句,跳出繼續(xù)下一次循環(huán) if (eventType == EventType.QUERY || rowChage.getIsDdl()) { logger.info(" sql ----> " + rowChage.getSql() + SEP); continue; } printXAInfo(rowChage.getPropsList()); // 循環(huán)當(dāng)前內(nèi)容條目的具體數(shù)據(jù) for (RowData rowData : rowChage.getRowDatasList()) { List columns; // 事件類(lèi)型是delete返回刪除前的列內(nèi)容,否則返回改變后列的內(nèi)容 if (eventType == CanalEntry.EventType.DELETE) { columns = rowData.getBeforeColumnsList(); } else { columns = rowData.getAfterColumnsList(); } HashMap map = new HashMap<>(16); // 循環(huán)把列的name與value放入map中 for (Column column: columns){ map.put(column.getName(), column.getValue()); } rows.add(new TwoTuple<>(eventType, map)); } } } return rows; }
ElasticUtils.java
package com.example.canal.study.common; import com.alibaba.fastjson.JSON; import com.example.canal.study.pojo.Student; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; import java.util.Map; /** * @author haha */ @Slf4j @Component public class ElasticUtils { @Autowired private RestHighLevelClient restHighLevelClient; /** * 新增 * @param student * @param index 索引 */ public void saveEs(Student student, String index) { IndexRequest indexRequest = new IndexRequest(index) .id(student.getId()) .source(JSON.toJSONString(student), XContentType.JSON) .opType(DocWriteRequest.OpType.CREATE); try { IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT); log.info("保存數(shù)據(jù)至ElasticSearch成功:{}", response.getId()); } catch (IOException e) { log.error("保存數(shù)據(jù)至elasticSearch失敗: {}", e); } } /** * 查看 * @param index 索引 * @param id _id * @throws IOException */ public void getEs(String index, String id) throws IOException { GetRequest getRequest = new GetRequest(index, id); GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT); Mapfields = response.getSource(); for (Map.Entry entry : fields.entrySet()) { System.out.println(entry.getKey() + ":" + entry.getValue()); } } /** * 更新 * @param student * @param index 索引 * @throws IOException */ public void updateEs(Student student, String index) throws IOException { UpdateRequest updateRequest = new UpdateRequest(index, student.getId()); updateRequest.upsert(JSON.toJSONString(student), XContentType.JSON); UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT); log.info("更新數(shù)據(jù)至ElasticSearch成功:{}", response.getId()); } /** * 根據(jù)id刪除數(shù)據(jù) * @param index 索引 * @param id _id * @throws IOException */ public void DeleteEs(String index, String id) throws IOException { DeleteRequest deleteRequest = new DeleteRequest(index, id); DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT); log.info("刪除數(shù)據(jù)至ElasticSearch成功:{}", response.getId()); } }
BinLogElasticSearch.java
package com.example.canal.study.action; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.example.canal.study.common.CanalDataParser; import com.example.canal.study.common.ElasticUtils; import com.example.canal.study.pojo.Student; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.List; import java.util.Map; /** * @author haha */ @Slf4j @Component public class BinLogElasticSearch { @Autowired private CanalConnector canalSimpleConnector; @Autowired private ElasticUtils elasticUtils; //@Qualifier("canalHaConnector")使用名為canalHaConnector的bean @Autowired @Qualifier("canalHaConnector") private CanalConnector canalHaConnector; public void binLogToElasticSearch() throws IOException { openCanalConnector(canalHaConnector); // 輪詢拉取數(shù)據(jù) Integer batchSize = 5 * 1024; while (true) { Message message = canalHaConnector.getWithoutAck(batchSize); // Message message = canalSimpleConnector.getWithoutAck(batchSize); long id = message.getId(); int size = message.getEntries().size(); log.info("當(dāng)前監(jiān)控到binLog消息數(shù)量{}", size); if (id == -1 || size == 0) { try { // 等待2秒 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } else { //1. 解析message對(duì)象 Listentries = message.getEntries(); List > rows = CanalDataParser.printEntry(entries); for (CanalDataParser.TwoTuple tuple : rows) { if(tuple.eventType == CanalEntry.EventType.INSERT) { Student student = createStudent(tuple); // 2。將解析出的對(duì)象同步到elasticSearch中 elasticUtils.saveEs(student, "student_index"); // 3.消息確認(rèn)已處理 // canalSimpleConnector.ack(id); canalHaConnector.ack(id); } if(tuple.eventType == CanalEntry.EventType.UPDATE){ Student student = createStudent(tuple); elasticUtils.updateEs(student, "student_index"); // 3.消息確認(rèn)已處理 // canalSimpleConnector.ack(id); canalHaConnector.ack(id); } if(tuple.eventType == CanalEntry.EventType.DELETE){ elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString()); canalHaConnector.ack(id); } } } } } /** * 封裝數(shù)據(jù)至Student * @param tuple * @return */ private Student createStudent(CanalDataParser.TwoTuple tuple){ Student student = new Student(); student.setId(tuple.columnMap.get("id").toString()); student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString())); student.setName(tuple.columnMap.get("name").toString()); student.setSex(tuple.columnMap.get("sex").toString()); student.setCity(tuple.columnMap.get("city").toString()); return student; } /** * 打開(kāi)canal連接 * * @param canalConnector */ private void openCanalConnector(CanalConnector canalConnector) { //連接CanalServer canalConnector.connect(); // 訂閱destination canalConnector.subscribe(); } /** * 關(guān)閉canal連接 * * @param canalConnector */ private void closeCanalConnector(CanalConnector canalConnector) { //關(guān)閉連接CanalServer canalConnector.disconnect(); // 注銷(xiāo)訂閱destination canalConnector.unsubscribe(); } }
CanalDemoApplication.java(Spring Boot啟動(dòng)類(lèi))
package com.example.canal.study; import com.example.canal.study.action.BinLogElasticSearch; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author haha */ @SpringBootApplication public class CanalDemoApplication implements ApplicationRunner { @Autowired private BinLogElasticSearch binLogElasticSearch; public static void main(String[] args) { SpringApplication.run(CanalDemoApplication.class, args); } // 程序啟動(dòng)則執(zhí)行run方法 @Override public void run(ApplicationArguments args) throws Exception { binLogElasticSearch.binLogToElasticSearch(); } }
application.properties
server.port=8081 spring.application.name = canal-demo canal.server.ip = 192.168.124.5 canal.server.port = 11111 canal.destination = example zookeeper.server.ip = 192.168.124.5:2181 zookeeper.sasl.client = false elasticSearch.server.ip = 192.168.124.5 elasticSearch.server.port = 9200
Canal集群高可用的搭建
通過(guò)上面的學(xué)習(xí),我們知道了單機(jī)直連方式的Canala應(yīng)用。在當(dāng)今互聯(lián)網(wǎng)時(shí)代,單實(shí)例模式逐漸被集群高可用模式取代,那么Canala的多實(shí)例集群方式如何搭建呢!
基于ZooKeeper獲取Canal實(shí)例
準(zhǔn)備ZooKeeper的Docker鏡像與容器:
docker pull zookeeper docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server
1、機(jī)器準(zhǔn)備:
運(yùn)行Canal的容器IP: 172.18.0.4 , 172.18.0.8
ZooKeeper容器IP:172.18.0.3:2181
MySQL容器IP:172.18.0.6:3306
2、按照部署和配置,在單臺(tái)機(jī)器上各自完成配置,演示時(shí)instance name為example。
3、修改canal.properties,加上ZooKeeper配置并修改Canal端口:
canal.port=11113 canal.zkServers=172.18.0.3:2181 canal.instance.global.spring.xml = classpath:spring/default-instance.xml
4、創(chuàng)建example目錄,并修改instance.properties:
canal.instance.mysql.slaveId = 1235 #之前的canal slaveId是1234,保證slaveId不重復(fù)即可 canal.instance.master.address = 172.18.0.6:3306
注意: 兩臺(tái)機(jī)器上的instance目錄的名字需要保證完全一致,HA模式是依賴(lài)于instance name進(jìn)行管理,同時(shí)必須都選擇default-instance.xml
配置。
啟動(dòng)兩個(gè)不同容器的Canal,啟動(dòng)后,可以通過(guò)tail -100f logs/example/example.log
查看啟動(dòng)日志,只會(huì)看到一臺(tái)機(jī)器上出現(xiàn)了啟動(dòng)成功的日志。
比如我這里啟動(dòng)成功的是 172.18.0.4:
查看一下ZooKeeper中的節(jié)點(diǎn)信息,也可以知道當(dāng)前工作的節(jié)點(diǎn)為172.18.0.4:11111:
[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running {"active":true,"address":"172.18.0.4:11111","cid":1}
客戶端鏈接, 消費(fèi)數(shù)據(jù)
可以通過(guò)指定ZooKeeper地址和Canal的instance name,canal client會(huì)自動(dòng)從ZooKeeper中的running節(jié)點(diǎn)獲取當(dāng)前服務(wù)的工作節(jié)點(diǎn),然后與其建立鏈接:
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running {"active":true,"address":"172.18.0.4:11111","cid":1}
對(duì)應(yīng)的客戶端編碼可以使用如下形式,上文中的CanalConfig.java中的canalHaConnector就是一個(gè)HA連接:
CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");
鏈接成功后,canal server會(huì)記錄當(dāng)前正在工作的canal client信息,比如客戶端IP,鏈接的端口信息等(聰明的你,應(yīng)該也可以發(fā)現(xiàn),canal client也可以支持HA功能):
[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running {"active":true,"address":"192.168.124.5:59887","clientId":1001}
數(shù)據(jù)消費(fèi)成功后,canal server會(huì)在ZooKeeper中記錄下當(dāng)前最后一次消費(fèi)成功的binlog位點(diǎn)(下次你重啟client時(shí),會(huì)從這最后一個(gè)位點(diǎn)繼續(xù)進(jìn)行消費(fèi)):
[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor {"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}}
停止正在工作的172.18.0.4的canal server:
docker exec -it canal-server bash cd canal-server/bin sh stop.sh
這時(shí)172.18.0.8會(huì)立馬啟動(dòng)example instance,提供新的數(shù)據(jù)服務(wù):
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running {"active":true,"address":"172.18.0.8:11111","cid":1}
與此同時(shí),客戶端也會(huì)隨著canal server的切換,通過(guò)獲取ZooKeeper中的最新地址,與新的canal server建立鏈接,繼續(xù)消費(fèi)數(shù)據(jù),整個(gè)過(guò)程自動(dòng)完成。
異常與總結(jié)
elasticsearch-head
無(wú)法訪問(wèn)Elasticsearch
es與es-head是兩個(gè)獨(dú)立的進(jìn)程,當(dāng)es-head訪問(wèn)es服務(wù)時(shí),會(huì)存在一個(gè)跨域問(wèn)題。所以我們需要修改es的配置文件,增加一些配置項(xiàng)來(lái)解決這個(gè)問(wèn)題,如下:
[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/ [root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml # 文件末尾加上如下配置 http.cors.enabled: true http.cors.allow-origin: "*"
修改完配置文件后需重啟es服務(wù)。
elasticsearch-head查詢報(bào)406 Not Acceptable
解決方法:
1、進(jìn)入head安裝目錄;
2、cd _site/
3、編輯vendor.js 共有兩處
#6886行 contentType: "application/x-www-form-urlencoded 改成 contentType: "application/json;charset=UTF-8" #7574行 var inspectData = s.contentType === "application/x-www-form-urlencoded" && 改成 var inspectData = s.contentType === "application/json;charset=UTF-8" &&
使用elasticsearch-rest-high-level-client
報(bào)org.elasticsearch.action.index.IndexRequest.ifSeqNo
#pom中除了加入依賴(lài)#還需加入 org.elasticsearch.client elasticsearch-rest-high-level-client 7.1.1 org.elasticsearch elasticsearch 7.1.1
相關(guān)參考: git hub issues 。
為什么ElasticSearch要在7.X版本不能使用type?
參考: 為什么ElasticSearch要在7.X版本去掉type?
使用spring-data-elasticsearch.jar報(bào)org.elasticsearch.client.transport.NoNodeAvailableException
由于本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底層采用es官方TransportClient,而es官方計(jì)劃放棄TransportClient,工具以es官方推薦的RestHighLevelClient進(jìn)行調(diào)用請(qǐng)求。 可參考 RestHighLevelClient API 。
設(shè)置Docker容器開(kāi)啟啟動(dòng)
如果創(chuàng)建時(shí)未指定 --restart=always ,可通過(guò)update 命令 docker update --restart=always [containerID]
Docker for Mac network host模式不生效
Host模式是為了性能,但是這卻對(duì)Docker的隔離性造成了破壞,導(dǎo)致安全性降低。 在性能場(chǎng)景下,可以用--netwokr host開(kāi)啟Host模式,但需要注意的是,如果你用Windows或Mac本地啟動(dòng)容器的話,會(huì)遇到Host模式失效的問(wèn)題。原因是Host模式只支持Linux宿主機(jī)。
參見(jiàn)官方文檔: https://docs.docker.com/network/host/ 。
客戶端連接ZooKeeper報(bào)authenticate using SASL(unknow error)
zookeeper.jar與Dokcer中的ZooKeeper版本不一致
zookeeper.jar使用了3.4.6之前的版本
出現(xiàn)這個(gè)錯(cuò)的意思是ZooKeeper作為外部應(yīng)用需要向系統(tǒng)申請(qǐng)資源,申請(qǐng)資源的時(shí)候需要通過(guò)認(rèn)證,而sasl是一種認(rèn)證方式,我們想辦法來(lái)繞過(guò)sasl認(rèn)證。避免等待,來(lái)提高效率。
在項(xiàng)目代碼中加入System.setProperty("zookeeper.sasl.client", "false");,
如果是Spring Boot項(xiàng)目可以在application.properties
中加入zookeeper.sasl.client=false
。
參考: Increased CPU usage by unnecessary SASL checks 。
如果更換canal.client.jar中依賴(lài)的zookeeper.jar的版本
把Canal的官方源碼下載到本機(jī)git clone https://github.com/alibaba/canal.git ,然后修改client模塊下pom.xml文件中關(guān)于ZooKeeper的內(nèi)容,然后重新mvn install:
把自己項(xiàng)目依賴(lài)的包替換為剛剛mvn install
生產(chǎn)的包:
關(guān)于選型的取舍
以上是“基于Docker結(jié)合Canal如何實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸功能”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!