debezium,簡(jiǎn)稱dbz,偽裝為MySQL從庫(kù),當(dāng)主庫(kù)發(fā)生變化后,主庫(kù)會(huì)主動(dòng)將變化的信息同步到dbz內(nèi),dbz將收到的信息轉(zhuǎn)為JSON推送到Kafka內(nèi)。
公司主營(yíng)業(yè)務(wù):成都做網(wǎng)站、成都網(wǎng)站建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)、移動(dòng)網(wǎng)站開(kāi)發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競(jìng)爭(zhēng)能力。成都創(chuàng)新互聯(lián)是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開(kāi)放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來(lái)的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來(lái)驚喜。成都創(chuàng)新互聯(lián)推出柳城免費(fèi)做網(wǎng)站回饋大家。安裝JDK11yum -y install java-11-openjdk-devel
解壓部署tar xfz debezium-server-dist-2.0.0.Final.tar.gz
修改配置文件application.properties
[yinyx@localhost conf]$ cat application.properties
quarkus.http.port=8999
rkus.log.level=INFO
quarkus.log.console.json=false
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=6306
debezium.source.database.user=test
debezium.source.database.password=test
debezium.source.database.server.id=2
debezium.source.database.include.list=test
debezium.source.topic.prefix=yyx
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter.schemas.enable=false
debezium.source.schema.history.internal.kafka.bootstrap.servers=127.0.0.1:9092
debezium.source.schema.history.internal.kafka.topic=schemahistory
debezium.source.decimal.handling.mode=string
debezium.source.lob.enabled=true
debezium.source.database.history.skip.unparseable.ddl=true
debezium.source.tombstones.on.delete=false
debezium.sink.type=kafka
debezium.sink.kafka.producer.bootstrap.servers=127.0.0.1:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.format.key.schemas.enable=false
debezium.format.value.schemas.enable=false
[yinyx@localhost conf]$
啟動(dòng)./run.sh
注意先啟動(dòng)kafka
測(cè)試 檢查topic是否已經(jīng)創(chuàng)建[yinyx@localhost bin]$ ./kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092
__consumer_offsets
schemahistory
yinyx
yyx
yyx.test.t1
啟動(dòng)kafka的消費(fèi)./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic yyx.test.t1
到MySQL更新t1表的數(shù)據(jù)insert update delete 隨便整
查看kafka消費(fèi),應(yīng)出現(xiàn)類似如下信息{“before”:{“f1”:3,“f2”:“cc|33”,“f3”:1670056305000},“after”:{“f1”:3,“f2”:“cc|333”,“f3”:1670056305000},“source”:{“version”:“2.0.0.Final”,“connector”:“mysql”,“name”:“yyx”,“ts_ms”:1670030109000,“snapshot”:“false”,“db”:“test”,“sequence”:null,“table”:“t1”,“server_id”:1,“gtid”:“7bdc8394-71cf-11ed-b2d5-000c293c9462:25”,“file”:“mysql-bin.000002”,“pos”:7907,“row”:0,“thread”:39,“query”:null},“op”:“u”,“ts_ms”:1670031525419,“transaction”:null}
{“before”:{“f1”:2,“f2”:“bb|222”,“f3”:1670002422000},“after”:{“f1”:2,“f2”:“bb|222haha”,“f3”:1670002422000},“source”:{“version”:“2.0.0.Final”,“connector”:“mysql”,“name”:“yyx”,“ts_ms”:1670031487000,“snapshot”:“false”,“db”:“test”,“sequence”:null,“table”:“t1”,“server_id”:1,“gtid”:“7bdc8394-71cf-11ed-b2d5-000c293c9462:27”,“file”:“mysql-bin.000002”,“pos”:8561,“row”:0,“thread”:39,“query”:null},“op”:“u”,“ts_ms”:1670031525422,“transaction”:null}
總結(jié)至此,MySQL的變化,會(huì)實(shí)時(shí)反應(yīng)到Kafka的JSON數(shù)據(jù)里面,后續(xù)自己開(kāi)發(fā)程序從Kafka接收處理即可。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧