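The business department asked us to synchronize a table from an Oracle database into a MySQL database. Because the source and target are heterogeneous, we implemented the sync with Kafka. The specific configuration follows.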
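Due to business needs, we request the architecture team's data synchronization service to sync the following data into the 管家 (Guanjia) MySQL database:
Agent user data:
a. Source: SSP database, AAA.system_user
b. Target: MySQL DLS database, DLS_SYSTEM_USER
c. Sync logic: none
d. Data to sync and column mapping: see attachment
e. Involves sensitive information: no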
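Preparation: the table already exists in the target MySQL database, so we first back it up and capture its CREATE TABLE statement.
-- Get the CREATE TABLE statement
mysql> show create table dls_system_user;
-- Export the structure and data of this single table
mysqldump -uroot -p dls DLS_SYSTEM_USER > DLS_SYSTEM_USER_180622.sql
-- Rename the table
ALTER TABLE DLS_SYSTEM_USER RENAME TO DLS_SYSTEM_USER_BAK0622;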
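The backup artifacts above follow a date-stamped naming pattern: the dump file carries a yyMMdd suffix (180622) and the renamed table an MMdd suffix (BAK0622). Reading 180622 as 22 June 2018 is an inference from the file name. A minimal sketch that regenerates both commands for another date; backup_commands is a hypothetical helper, not part of any tool:

```python
from __future__ import annotations

from datetime import date


def backup_commands(db: str, table: str, day: date) -> list[str]:
    """Build the dump and rename commands for one table (hypothetical helper).

    Assumes the naming used in this write-up: <table>_<yyMMdd>.sql for the dump
    and <table>_BAK<MMdd> for the renamed copy.
    """
    dump_file = f"{table}_{day:%y%m%d}.sql"
    backup_table = f"{table}_BAK{day:%m%d}"
    return [
        f"mysqldump -uroot -p {db} {table} > {dump_file}",  # run in the shell
        f"ALTER TABLE {table} RENAME TO {backup_table};",    # run in the mysql client
    ]


if __name__ == "__main__":
    for cmd in backup_commands("dls", "DLS_SYSTEM_USER", date(2018, 6, 22)):
        print(cmd)
```

Generating both names from one date keeps the dump file and the backup table traceable to the same cut-over day.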
-- Create a new, empty table
CREATE TABLE dls_system_user (
  ID varchar(100) NOT NULL,
  ACCOUNT_EXPIRED int(1) NOT NULL DEFAULT '0',
  ACCOUNT_LOCKED int(1) NOT NULL DEFAULT '0',
  ENABLED int(1) NOT NULL DEFAULT '0',
  ORG_NO varchar(255) NOT NULL DEFAULT '',
  USER_CODE varchar(100) NOT NULL DEFAULT '',
  REMARK_NAME varchar(255) NOT NULL DEFAULT '',
  IS_CREATE_PERSON varchar(255) NOT NULL DEFAULT '',
  STATUS int(10) NOT NULL DEFAULT '0',
  PRIMARY KEY (ID),
  KEY IDX_DLS_SYSTEM_USER_USER_CODE (USER_CODE)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
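Every column except ID is declared NOT NULL with a default, while Oracle stores an empty VARCHAR2 as NULL. MySQL rejects an explicit NULL for a NOT NULL column in a single-row INSERT (error 1048), so whichever component finally writes these rows into MySQL (not shown in this write-up) may need to substitute the defaults. A minimal sketch, assuming rows arrive as Python dicts keyed by column name and that the Oracle columns can hold NULLs, which this write-up does not show; the defaults are copied from the DDL above and fill_defaults is a hypothetical helper:

```python
from __future__ import annotations

# Column defaults copied from the dls_system_user DDL (ID is the PK and has none).
# The int columns declare DEFAULT '0'; MySQL stores that as the integer 0.
DLS_SYSTEM_USER_DEFAULTS = {
    "ACCOUNT_EXPIRED": 0,
    "ACCOUNT_LOCKED": 0,
    "ENABLED": 0,
    "ORG_NO": "",
    "USER_CODE": "",
    "REMARK_NAME": "",
    "IS_CREATE_PERSON": "",
    "STATUS": 0,
}


def fill_defaults(row: dict, defaults: dict = DLS_SYSTEM_USER_DEFAULTS) -> dict:
    """Return a copy of row with None replaced by the column default (hypothetical helper).

    Columns without a default (such as ID) are left untouched, so a NULL key
    still fails loudly instead of being silently invented.
    """
    return {
        col: defaults[col] if val is None and col in defaults else val
        for col, val in row.items()
    }
```

Leaving ID alone is deliberate: a missing primary key is a data problem that should surface as an error rather than be papered over.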
GoldenGate configuration on the Oracle source:
1. Add supplemental logging for the table to be synchronized
dblogin USERID ggs@ntestdb, password ggs
add trandata AAA.system_user
2. Add the extract process
add extract ext_kafb, tranlog, begin now
add EXTTRAIL ./dirdat/a2, extract ext_kafb,MEGABYTES 200
edit params EXT_KAFB
extract EXT_KAFB
USERID ggs@ntestdb, password ggs
LOGALLSUPCOLS
exttrail ./dirdat/a2,FORMAT RELEASE 11.2
table AAA.system_user;
3. Add the pump process:
add extract pmp_kafb, exttrailsource ./dirdat/a2
add rmttrail ./dirdat/b2,EXTRACT pmp_kafb,MEGABYTES 200
edit params pmp_kafb
EXTRACT pmp_kafb
USERID ggs@ntestdb, password ggs
PASSTHRU
RMTHOST 172.16.xxx.5, MGRPORT 9178 -- Kafka server address
RMTTRAIL ./dirdat/b2,format release 11.2
table AAA.system_user;
---- The initial-load files are stored in /ggs/ggs12/dirprm/
4. Add the initial-load process
ADD EXTRACT ek_20, sourceistable --- add on the source side
edit params ek_20
EXTRACT ek_20
USERID ggs@ntestdb, password ggs
RMTHOST 172.16.154.5, MGRPORT 9178
RMTFILE ./dirdat/lb,maxfiles 999, megabytes 500
table AAA.system_user;
5. Generate the source definitions (def) file:
GGSCI> edit param defgen_n9
USERID ggs@ntestdb, password ggs
defsfile /goldengate/ggskafka/dirdef/defgen_n9.def,format release 11.2
table AAA.system_user;
Run the following command under $OGG_HOME to generate the def file:
defgen paramfile /goldengate/ggskafka/dirprm/defgen_n9.prm
Copy the generated def file to $OGG_HOME/dirdef on the Kafka server.
--- The target MySQL database is at 172.16.xxx.148; create a new kafka user on it:
grant select,insert,update,delete,create,drop on DLS.* to 'kafka'@'%' identified by 'jiubugaosuni';
-- GoldenGate operations on the Kafka server
1. Add the initial-load replicat (its parameter file goes in dirprm):
GGSCI> ADD replicat rn_3,specialrun
EDIT PARAMS rn_3
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafkat_n3.props
SOURCEDEFS ./dirdef/defgen_n9.def
EXTFILE ./dirdat/lb
reportcount every 1 minutes, rate
grouptransops 10000
MAP AAA.system_user, TARGET DLS.DLS_SYSTEM_USER;
2. Add the replicat process:
GGSCI>add replicat RN_KF3,exttrail ./dirdat/b2
GGSCI>edit params RN_KF3
REPLICAT RN_KF3
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafkat_n3.props
SOURCEDEFS ./dirdef/defgen_n9.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP AAA.system_user, TARGET DLS.DLS_SYSTEM_USER;
3. Parameter configuration:
cd /home/app/ogg/ggs12/dirprm
The contents of custom_kafka_producer.properties are as follows:
[app@test-datamanager dirprm]$ more custom_kafka_producer.properties
bootstrap.servers=172.16.xxx.5:9092,172.16.xxx.7:9092
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=16384
linger.ms=0
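The producer file is a plain Java properties file that the Kafka client reads at startup. A minimal sketch of how such a file is parsed, assuming only simple key=value lines and #/! comment lines (continuation lines, backslash escapes and the ':' separator from the full java.util.Properties format are not handled); parse_properties and broker_list are hypothetical helpers:

```python
from __future__ import annotations


def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines the way java.util.Properties treats them.

    Simplifications: only '=' is accepted as a separator, and line
    continuations / backslash escapes are not handled.
    """
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.lstrip()
        if not line or line[0] in "#!":   # blank line or full-line comment
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue                      # ':' or whitespace separators not supported here
        # Whitespace around the key and before the value is dropped;
        # everything after that, including trailing text, is kept.
        props[key.strip()] = value.lstrip()
    return props


def broker_list(props: dict[str, str]) -> list[str]:
    """Split bootstrap.servers into individual host:port entries."""
    return [s.strip() for s in props["bootstrap.servers"].split(",") if s.strip()]


if __name__ == "__main__":
    sample = """\
bootstrap.servers=172.16.xxx.5:9092,172.16.xxx.7:9092
acks=1
linger.ms=0
"""
    props = parse_properties(sample)
    print(broker_list(props), props["acks"])
```

One consequence worth checking in any props file: the format has no inline comment syntax, so parse_properties("gg.handler.kafkahandler.format =json --note") yields the value "json --note", not "json". Comments must sit on their own # line.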
--- Use vi to create the referenced props file, kafkat_n3.props
Its contents (the kafka.props handler configuration) are as follows:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following maps every table to a single, fixed topic name
gg.handler.kafkahandler.topicMappingTemplate= DLS.DLS_MERCHANT_STATUS
#gg.handler.kafkahandler.format=avro_op
#Output format
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.includePrimaryKeys=true
#Schema topic name
gg.handler.kafkahandler.SchemaTopicName=DLS.DLS_MERCHANT_STATUS
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
#Path to the Kafka client JARs (a trailing * is needed to add the JAR files in a directory)
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
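With format=json, mode=op and includePrimaryKeys=true, each change arrives on the topic as one JSON document per operation. Applying those messages to MySQL is not covered by this write-up; the sketch below only illustrates how the fields map to SQL. It assumes the field names of the GoldenGate for Big Data JSON formatter (table, op_type, primary_keys, before, after), which should be checked against a real message from the topic. op_to_sql is a hypothetical helper that returns a statement with %s placeholders in the style of Python MySQL drivers such as PyMySQL:

```python
from __future__ import annotations

import json


def _where(keys: list[str]) -> str:
    """Build 'K1 = %s AND K2 = %s' for the key columns."""
    return " AND ".join(f"{k} = %s" for k in keys)


def op_to_sql(message: str) -> tuple[str, list]:
    """Turn one JSON op message into (sql, params) for MySQL (hypothetical helper)."""
    op = json.loads(message)
    table = op["table"]                      # MAP ... TARGET name, e.g. DLS.DLS_SYSTEM_USER
    op_type = op["op_type"]                  # I/U/D/T, as set by the *OpKey properties
    keys = op.get("primary_keys") or ["ID"]  # assumption: fall back to the table's PK

    if op_type == "I":
        after = op["after"]
        cols = list(after)
        placeholders = ", ".join(["%s"] * len(cols))
        return (f"INSERT INTO {table} ({', '.join(cols)}) VALUES ({placeholders})",
                [after[c] for c in cols])

    if op_type == "U":
        after = op["after"]
        # Locate the row by the before image when present (the key may have changed).
        image = op.get("before") or after
        assignments = ", ".join(f"{c} = %s" for c in after)
        return (f"UPDATE {table} SET {assignments} WHERE {_where(keys)}",
                [after[c] for c in after] + [image[k] for k in keys])

    if op_type == "D":
        before = op["before"]
        return f"DELETE FROM {table} WHERE {_where(keys)}", [before[k] for k in keys]

    if op_type == "T":
        return f"TRUNCATE TABLE {table}", []

    raise ValueError(f"unexpected op_type: {op_type!r}")
```

Because the extract uses LOGALLSUPCOLS, update records should carry a before image, which is why the sketch takes key values from before when it is present; the WHERE clause then still matches if the primary key itself was changed.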
The configuration is now essentially complete. Next, we start the processes and load the initial data:
1. Start the source extract (capture) process
GGSCI> start EXT_KAFB
2. Start the source pump process
GGSCI> start pmp_kafb
3. Start the source initial-load process
GGSCI> start ek_20
4. Start the target initial-load process
GGSCI> start rn_3
Run the following command under $OGG_HOME:
./replicat paramfile ./dirprm/rn_3.prm reportfile ./dirrpt/rn_3.rpt -p INITIALDATALOAD
5. Start the target replicat process
GGSCI> start RN_KF3
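The start order matters because the change-capture extract (added with BEGIN NOW) has to be running before the initial load reads the table, so that changes made during the load are captured in the trail; RN_KF3 is started last, after the initial load has been delivered. This is our reading of the sequence rather than something stated in the steps themselves. A minimal sketch that encodes those dependencies and derives a valid start order with the standard-library graphlib module (Python 3.9+); the DEPENDS_ON table is an assumption drawn from the five steps above:

```python
from graphlib import TopologicalSorter

# Each process maps to the processes that must already be running (or finished)
# before it starts. Assumption: derived from the five start-up steps above.
DEPENDS_ON = {
    "EXT_KAFB": set(),               # change capture, BEGIN NOW
    "pmp_kafb": {"EXT_KAFB"},        # pump reads the local trail ./dirdat/a2
    "ek_20": {"EXT_KAFB"},           # initial-load extract: capture must already run
    "rn_3": {"ek_20"},               # initial-load replicat reads ./dirdat/lb
    "RN_KF3": {"rn_3", "pmp_kafb"},  # ongoing apply after the initial load
}


def start_order(deps: dict[str, set[str]]) -> list[str]:
    """Return one start order that satisfies every dependency."""
    return list(TopologicalSorter(deps).static_order())


if __name__ == "__main__":
    for name in start_order(DEPENDS_ON):
        print(f"GGSCI> start {name}")
```

A topological sort can return more than one valid order (pmp_kafb and ek_20 are interchangeable here), which matches the fact that only the dependencies, not the exact step numbering, are essential.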