真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么

今天就跟大家聊聊有關(guān)緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

創(chuàng)新互聯(lián)建站專注于宜春網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供宜春營(yíng)銷型網(wǎng)站建設(shè),宜春網(wǎng)站制作、宜春網(wǎng)頁(yè)設(shè)計(jì)、宜春網(wǎng)站官網(wǎng)定制、成都微信小程序服務(wù),打造宜春網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供宜春網(wǎng)站排名全網(wǎng)營(yíng)銷落地服務(wù)。

當(dāng)項(xiàng)目的請(qǐng)求量上去了之后,通常有兩種做法來(lái)應(yīng)對(duì)高并發(fā),第一是盡最大可能的使用cache來(lái)對(duì)抗,第二是盡最大可能的分庫(kù)分表對(duì)抗。。。說(shuō)起來(lái)容易,做起來(lái)并不那么樂(lè)觀,下面就來(lái)分析下。

一:如何保證緩存一致性

如我們的千人千面系統(tǒng)中,會(huì)針對(duì)商品,訂單等多維度為某一個(gè)商家店鋪?zhàn)詣?dòng)化建立大約400個(gè)數(shù)據(jù)模型,然后買家在淘寶下訂單之后,淘寶會(huì)將訂單推送過(guò)來(lái),訂單會(huì)在400個(gè)模型中兜一圈,從而推送更貼切符合該買家行為習(xí)慣的觸達(dá),為了應(yīng)對(duì)高并發(fā),這些模型自然都是緩存在Cache中,如果有新的模型進(jìn)來(lái)了,我如何通知redis進(jìn)行緩存更新呢?通常的做法就是在添加模型的時(shí)候,順便更新redis。。。對(duì)吧,如下圖:

緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么

說(shuō)的簡(jiǎn)單,web開發(fā)的程序員會(huì)說(shuō),麻蛋的,我管你什么業(yè)務(wù),憑啥要我做推送,把我代碼搞出問(wèn)題了,你負(fù)責(zé)呀???所以你必須得碰一鼻子灰。就算搞定了web程序員,你可能還會(huì)遇到更新database成功,更新redis的時(shí)候失敗,可人家不管,而且錯(cuò)誤日志還在別人的日志系統(tǒng)里面,所以你很難甚至無(wú)法保證這個(gè)db和cache的緩存一致性,那這個(gè)時(shí)候能不能換個(gè)思路,我直接寫個(gè)程序訂閱database的binlog,從binlog中分析出模型數(shù)據(jù)的CURD操作,根據(jù)這些CURD的實(shí)際情況更新Redis的緩存數(shù)據(jù),第一個(gè)可以實(shí)現(xiàn)和web的解耦,第二個(gè)實(shí)現(xiàn)了高度的緩存一致性,所以新的架構(gòu)是這樣的。

緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么

上面這張圖,相信大家都能看得懂,重點(diǎn)就是這個(gè)處理binlog程序,從binlog中分析出CURD從而更新Redis,其實(shí)這個(gè)binlog程序就是本篇所說(shuō)的canal。。。一個(gè)偽裝成MySQL的slave,不斷的通過(guò)dump命令從mysql中盜出binlog日志,從而完美的實(shí)現(xiàn)了這個(gè)需求。

二:如何實(shí)現(xiàn)跨服務(wù)器 join 查詢

本篇開頭也說(shuō)到了,數(shù)據(jù)量大了之后,必然會(huì)存在分庫(kù)分表,甚至database都要分散到多臺(tái)服務(wù)器上,現(xiàn)在的電商項(xiàng)目,都是業(yè)務(wù)趕著技術(shù)跑。。。誰(shuí)也不知道下一個(gè)業(yè)務(wù)會(huì)是一個(gè)怎樣的奇葩,所以必然會(huì)導(dǎo)致你要做一些跨服務(wù)器join查詢,你以為自己很聰明,其實(shí)DBA早就把跨服務(wù)器查詢的函數(shù)給你關(guān)掉了,求爹爹拜奶奶都不會(huì)給你開的,除非你殺一個(gè)DBA祭天,不過(guò)如果你的業(yè)務(wù)真的很重要,可能DBA會(huì)給你做數(shù)據(jù)異構(gòu),所謂的數(shù)據(jù)異構(gòu),那就是將需要join查詢的多表按照某一個(gè)維度又聚合在一個(gè)DB中。讓你去查詢。。。。。

緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么

那如果用canal來(lái)訂閱binlog,就可以改造成下面這種架構(gòu)。

緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么

三:搭建一覽

好了,canal的應(yīng)用場(chǎng)景給大家也介紹到了,最主要是理解這種思想,人家搞不定的東西,你的價(jià)值就出來(lái)了。

1. 開啟mysql的binlog功能

開啟binlog,并且將binlog的格式改為Row,這樣就可以獲取到CURD的二進(jìn)制內(nèi)容,windows上的路徑為:C:\ProgramData\MySQL\MySQL  Server 5.7\my.ini。

log-bin=mysql-bin #添加這一行就ok binlog-format=ROW #選擇row模式 server_id=1

緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么

2. 驗(yàn)證binlog是否開啟

使用命令驗(yàn)證,并且開啟binlog的過(guò)期時(shí)間為30天,默認(rèn)情況下binlog是不過(guò)期的,這就導(dǎo)致你的磁盤可能會(huì)爆滿,直到掛掉。

show variables like 'log_%';  #設(shè)置binlog的過(guò)期時(shí)間為30天 show variables like '%expire_logs_days%'; set global expire_logs_days=30;

緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么

3. 給canal服務(wù)器分配一個(gè)mysql的賬號(hào)權(quán)限,方便canal去偷binlog日志。

CREATE USER canal IDENTIFIED BY 'canal';     GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';   FLUSH PRIVILEGES;    show grants for 'canal'

4. 下載canal

github的地址:https://github.com/alibaba/canal/releases

緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么

5. 然后就是各種tar解壓 canal.deployer-1.0.24.tar.gz => canal

[root@localhost myapp]# ls apache-maven-3.5.0-bin.tar.gz                        dubbo-monitor-simple-2.5.4-SNAPSHOT.jar     nginx                tengine-2.2.0.tar.gz canal                                                gearmand                                    nginx-1.13.4.tar.gz  tengine_st canal.deployer-1.0.24.tar.gz                         gearmand-1.1.17                             nginx_st             tomcat dubbo                                                gearmand-1.1.17.tar.gz                      redis                zookeeper dubbo-monitor-simple-2.5.4-SNAPSHOT                  maven                                       redis-4.0.1.tar.gz   zookeeper-3.4.9.tar.gz dubbo-monitor-simple-2.5.4-SNAPSHOT-assembly.tar.gz  mysql-5.7.19-linux-glibc2.12-x86_64.tar.gz  tengine [root@localhost myapp]# cd canal [root@localhost canal]# ls bin  conf  lib  logs [root@localhost canal]# cd conf [root@localhost conf]# ls canal.properties  example  logback.xml  spring [root@localhost conf]# cd example [root@localhost example]# ls instance.properties  meta.dat [root@localhost example]#

6. canal 和 instance 配置文件

canal的模式是這樣的,一個(gè)canal里面可能會(huì)有多個(gè)instance,也就說(shuō)一個(gè)instance可以監(jiān)控一個(gè)mysql實(shí)例,多個(gè)instance也就可以對(duì)應(yīng)多臺(tái)服務(wù)器的mysql實(shí)例。也就是一個(gè)canal就可以監(jiān)控分庫(kù)分表下的多機(jī)器mysql。

1) canal.properties

它是全局性的canal服務(wù)器配置,具體如下,這里面的參數(shù)涉及到方方面面。

################################################# #########               common argument         #############  ################################################# canal.id= 1 canal.ip= canal.port= 11111 canal.zkServers= # flush data to zk canal.zookeeper.flush.period = 1000 # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb canal.instance.memory.buffer.memunit = 1024  ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE      ## detecing config canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false  # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size =  1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60  # network config canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30  # binlog filter config canal.instance.filter.query.dcl = false canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false  # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED  canal.instance.binlog.image = FULL,MINIMAL,NOBLOB  # binlog ddl isolation canal.instance.get.ddl.isolation = false  ################################################# #########               destinations            #############  ################################################# canal.destinations= example # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5  canal.instance.global.mode = spring  canal.instance.global.lazy = false #canal.instance.global.manager.address = 127.0.0.1:1099 #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml canal.instance.global.spring.xml = classpath:spring/file-instance.xml #canal.instance.global.spring.xml = classpath:spring/default-instance.xml  #################################################   ## mysql serverId   canal.instance.mysql.slaveId = 1234    # position info,需要改成自己的數(shù)據(jù)庫(kù)信息   canal.instance.master.address = 127.0.0.1:3306    canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp =  #canal.instance.standby.address =    #canal.instance.standby.journal.name =   #canal.instance.standby.position =    #canal.instance.standby.timestamp =     # username/password,需要改成自己的數(shù)據(jù)庫(kù)信息   canal.instance.dbUsername = root canal.instance.dbPassword = 123456 canal.instance.defaultDatabaseName = datamip   canal.instance.connectionCharset = UTF-8    # table regex   canal.instance.filter.regex = .*\\..*    #################################################

由于是全局性的配置,所以上面三處標(biāo)紅的地方要注意一下:

  • canal.port= 11111 當(dāng)前canal的服務(wù)器端口號(hào)

  • canal.destinations= example  當(dāng)前默認(rèn)開啟了一個(gè)名為example的instance實(shí)例,如果想開多個(gè)instance,用","逗號(hào)隔開就可以了。。。

  • canal.instance.filter.regex = .\.. mysql實(shí)例下的所有db的所有表都在監(jiān)控范圍內(nèi)。

2) instance.properties

這個(gè)就是具體的某個(gè)instances實(shí)例的配置,未涉及到的配置都會(huì)從canal.properties上繼承。

################################################# ## mysql serverId canal.instance.mysql.slaveId = 1234  # position info canal.instance.master.address = 192.168.23.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp =  #canal.instance.standby.address =  #canal.instance.standby.journal.name = #canal.instance.standby.position =  #canal.instance.standby.timestamp =   # username/password canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName =datamip canal.instance.connectionCharset = UTF-8  # table regex canal.instance.filter.regex = .*\\..* # table black regex canal.instance.filter.black.regex =  #################################################

上面標(biāo)紅的地方注意下就好了,去偷binlog的時(shí)候,需要知道的mysql地址和用戶名,密碼。

7. 開啟canal

大家要記得把 /canal/bin 目錄配置到 /etc/profile 的  Path中,方便快速開啟,通過(guò)下圖你會(huì)看到11111端口已經(jīng)在centos上開啟了。

[root@localhost bin]# ls canal.pid  startup.bat  startup.sh  stop.sh [root@localhost bin]# pwd /usr/myapp/canal/bin [root@localhost example]# startup.sh cd to /usr/myapp/canal/bin for workaround relative path LOG CONFIGURATION : /usr/myapp/canal/bin/../conf/logback.xml canal conf : /usr/myapp/canal/bin/../conf/canal.properties CLASSPATH :/usr/myapp/canal/bin/../conf:/usr/myapp/canal/bin/../lib/zookeeper-3.4.5.jar:/usr/myapp/canal/bin/../lib/zkclient-0.1.jar:/usr/myapp/canal/bin/../lib/spring-2.5.6.jar:/usr/myapp/canal/bin/../lib/slf4j-api-1.7.12.jar:/usr/myapp/canal/bin/../lib/protobuf-java-2.6.1.jar:/usr/myapp/canal/bin/../lib/oro-2.0.8.jar:/usr/myapp/canal/bin/../lib/netty-all-4.1.6.Final.jar:/usr/myapp/canal/bin/../lib/netty-3.2.5.Final.jar:/usr/myapp/canal/bin/../lib/logback-core-1.1.3.jar:/usr/myapp/canal/bin/../lib/logback-classic-1.1.3.jar:/usr/myapp/canal/bin/../lib/log4j-1.2.14.jar:/usr/myapp/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/usr/myapp/canal/bin/../lib/guava-18.0.jar:/usr/myapp/canal/bin/../lib/fastjson-1.2.28.jar:/usr/myapp/canal/bin/../lib/commons-logging-1.1.1.jar:/usr/myapp/canal/bin/../lib/commons-lang-2.6.jar:/usr/myapp/canal/bin/../lib/commons-io-2.4.jar:/usr/myapp/canal/bin/../lib/commons-beanutils-1.8.2.jar:/usr/myapp/canal/bin/../lib/canal.store-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.sink-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.server-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.protocol-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.parse.driver-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.parse.dbsync-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.parse-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.meta-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.instance.spring-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.instance.manager-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.instance.core-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.filter-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.deployer-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.common-1.0.24.jar:/usr/myapp/canal/bin/../lib/aviator-2.2.1.jar: cd to /usr/myapp/canal/conf/example for continue [root@localhost example]# netstat -tln Active Internet connections (only servers) Proto Recv-Q Send-Q Local Address           Foreign Address         State       tcp        0      0 0.0.0.0:11111           0.0.0.0:*               LISTEN      tcp        0      0 0.0.0.0:111             0.0.0.0:*               LISTEN      tcp        0      0 192.168.122.1:53        0.0.0.0:*               LISTEN      tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      tcp        0      0 127.0.0.1:631           0.0.0.0:*               LISTEN      tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      tcp6       0      0 :::111                  :::*                    LISTEN      tcp6       0      0 :::22                   :::*                    LISTEN      tcp6       0      0 ::1:631                 :::*                    LISTEN      tcp6       0      0 ::1:25                  :::*                    LISTEN      [root@localhost example]#

8. Java Client 代碼

canal driver  需要在maven倉(cāng)庫(kù)中獲取一下:http://www.mvnrepository.com/artifact/com.alibaba.otter/canal.client/1.0.24,不過(guò)依賴還是蠻多的。

          com.alibaba.otter       canal.client       1.0.24   

緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么

9. 啟動(dòng)java代碼進(jìn)行驗(yàn)證

下面的代碼對(duì)table的CURD都做了一個(gè)基本的判斷,看看是不是能夠智能感知,然后可以根據(jù)實(shí)際情況進(jìn)行redis的更新操作。。。

package com.datamip.canal;  import java.awt.Event; import java.net.InetSocketAddress; import java.util.List;  import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.Header; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException;  public class App {      public static void main(String[] args) throws InterruptedException {          // 第一步:與canal進(jìn)行連接         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.23.170", 11111),                 "example", "", "");         connector.connect();          // 第二步:開啟訂閱         connector.subscribe();          // 第三步:循環(huán)訂閱         while (true) {             try {                 // 每次讀取 1000 條                 Message message = connector.getWithoutAck(1000);                  long batchID = message.getId();                  int size = message.getEntries().size();                  if (batchID == -1 || size == 0) {                     System.out.println("當(dāng)前暫時(shí)沒(méi)有數(shù)據(jù)");                     Thread.sleep(1000); // 沒(méi)有數(shù)據(jù)                 } else {                     System.out.println("-------------------------- 有數(shù)據(jù)啦 -----------------------");                     PrintEntry(message.getEntries());                 }                  // position id ack (方便處理下一條)                 connector.ack(batchID);              } catch (Exception e) {                 // TODO: handle exception              } finally {                 Thread.sleep(1000);             }         }     }      // 獲取每條打印的記錄     @SuppressWarnings("static-access")     public static void PrintEntry(List entrys) {          for (Entry entry : entrys) {              // 第一步:拆解entry 實(shí)體             Header header = entry.getHeader();             EntryType entryType = entry.getEntryType();              // 第二步: 如果當(dāng)前是RowData,那就是我需要的數(shù)據(jù)             if (entryType == EntryType.ROWDATA) {                  String tableName = header.getTableName();                 String schemaName = header.getSchemaName();                  RowChange rowChange = null;                  try {                     rowChange = RowChange.parseFrom(entry.getStoreValue());                 } catch (InvalidProtocolBufferException e) {                     e.printStackTrace();                 }                  EventType eventType = rowChange.getEventType();                  System.out.println(String.format("當(dāng)前正在操作 %s.%s, Action= %s", schemaName, tableName, eventType));                  // 如果是‘查詢’ 或者 是 ‘DDL’ 操作,那么sql直接打出來(lái)                 if (eventType == EventType.QUERY || rowChange.getIsDdl()) {                     System.out.println("rowchange sql ----->" + rowChange.getSql());                     return;                 }                  // 第三步:追蹤到 columns 級(jí)別                 rowChange.getRowDatasList().forEach((rowData) -> {                      // 獲取更新之前的column情況                     List beforeColumns = rowData.getBeforeColumnsList();                      // 獲取更新之后的 column 情況                     List afterColumns = rowData.getAfterColumnsList();                      // 當(dāng)前執(zhí)行的是 刪除操作                     if (eventType == EventType.DELETE) {                         PrintColumn(beforeColumns);                     }                      // 當(dāng)前執(zhí)行的是 插入操作                     if (eventType == eventType.INSERT) {                         PrintColumn(afterColumns);                     }                      // 當(dāng)前執(zhí)行的是 更新操作                     if (eventType == eventType.UPDATE) {                         PrintColumn(afterColumns);                     }                 });             }         }     }      // 每個(gè)row上面的每一個(gè)column 的更改情況     public static void PrintColumn(List columns) {          columns.forEach((column) -> {              String columnName = column.getName();             String columnValue = column.getValue();             String columnType = column.getMysqlType();             boolean isUpdated = column.getUpdated(); // 判斷 該字段是否更新              System.out.println(String.format("columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName,                     columnValue, columnType, isUpdated));          });      } }

Update操作

緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么

Insert操作

緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么

Delete 操作

緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么

從結(jié)果中看,沒(méi)毛病,有圖有真相,好了,本篇就說(shuō)到這里,對(duì)于開發(fā)的你,肯定是有幫助的~~~

看完上述內(nèi)容,你們對(duì)緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。


文章名稱:緩存一致性和跨服務(wù)器查詢的數(shù)據(jù)異構(gòu)解決方案是什么
標(biāo)題路徑:http://weahome.cn/article/igsseo.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部