前兩天測試環(huán)境的需求將上線生產(chǎn)環(huán)境,需求還是
a. 數(shù)據(jù)源:SSP庫 ssp.m_system_user,Oracle DB 12.1.0.2.0,Ogg Version 12.2.0.1.1 OGGCORE_12.2.0.1.0_PLATFORMS_151211.1401_FBO
b. 數(shù)據(jù)目標(biāo):MySQL DLS庫 DLS_SYSTEM_USER
c.kafka集群:10.1.1.247 ,Ogg Version 12.3.0.1.0 OGGCORE_OGGADP.12.3.0.1.0GA_PLATFORMS_170828.1608
創(chuàng)新互聯(lián)是一家專注于成都做網(wǎng)站、網(wǎng)站制作與策劃設(shè)計(jì),米脂網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設(shè)十多年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:米脂等地區(qū)。米脂做網(wǎng)站價格咨詢:13518219792
由于oracle 12c已經(jīng)是多租戶架構(gòu),在使用OGG同步的時候,需要考慮下面一些情況
一個 CDB包含多個PDB
抽取模式只能是integrated(集成)模式,不支持claasic capture傳統(tǒng)方式捕獲;
因?yàn)橐褂胕ntegrated extract,因此,需要能訪問log mining server,而這個只能從cdb$root中訪問;
源端要使用common user,即c##ogg這種用戶來訪問源端DB,這樣能訪問DB的redo log & all pdbs。
在GGSCI或參數(shù)文件中,可以使用pdb.schema.table來訪問具體的表或序列;
可以在參數(shù)文件 中使用sourceCatalog參數(shù),指定一個PDB,后面的參數(shù)中只需要schema.table即可;
目標(biāo)端每個pdb要有一個replicat進(jìn)程,即一個replicat進(jìn)程只能投遞到一個PDB,不能投遞到多個。
源端OGG用戶需要賦權(quán):dbms_goldengate_auth.grant_admin_privilege(‘C##GGADMIN’,container=>‘a(chǎn)ll’),同時建議將ogg的用戶設(shè)置賦權(quán)為:grant dba to c##ogg container=all;
源端DB除了以前要打開歸檔, force logging, 最小附加日志,可能還需要打開一個開關(guān):alter system set enable_goldengate_replication=true;
具體實(shí)施步驟;
1、為要同步的表添加附加日志
dblogin userid ogg@SALESPDB,password OGG_PROD
add trandata ssp.m_system_user
2、 添加抽取進(jìn)程
add extract EXT_KAF4,integrated tranlog, begin now --12c區(qū)別
add EXTTRAIL ./dirdat/k4, extract EXT_KAF4,MEGABYTES 200
GGSCI (salesdb as ogg@salesdb/SALESPDB) 17> dblogin useridalias ggroot
Successfully logged into database CDB$ROOT.
--得在cdb里面注冊ext進(jìn)程
register extract EXT_KAF4 database container(SALESPDB)
edit params EXT_KAF4
extract EXT_KAF4
userid c##ggadmin,PASSWORD ggadmin
LOGALLSUPCOLS
UPDATERECORDFORMAT COMPACT
exttrail ./dirdat/k4,FORMAT RELEASE 12.1
SOURCECATALOG SALESPDB
table ssp.m_system_user;
3、添加投遞進(jìn)程:
add extract PMP_KAF4, exttrailsource ./dirdat/k4
add rmttrail ./dirdat/b4,EXTRACT PMP_KAF4,MEGABYTES 200
eidt params PMP_KAF4
EXTRACT PMP_KAF4
USERID c##ggadmin,PASSWORD ggadmin
PASSTHRU
RMTHOST 10.1.1.247, MGRPORT 9178
RMTTRAIL ./dirdat/b4,format release 12.1
SOURCECATALOG SALESPDB
table ssp.m_system_user;
4.添加初始化進(jìn)程
ADD EXTRACT ek_04, sourceistable ---源端添加
EXTRACT ek_04
USERID c##ggadmin,PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/b5,maxfiles 999, megabytes 500,format release 12.1
SOURCECATALOG SALESPDB
table ssp.m_system_user;
5.生成def文件:
edit param salesdb4
USERID c##ggadmin,PASSWORD ggadmin
defsfile /home/oracle/ogg/ggs12/dirdef/salesdb4.def,format release 12.1
SOURCECATALOG SALESPDB
table ssp.m_system_user;
在OGG_HOME下執(zhí)行如下命令生成def文件
defgen paramfile ./dirprm/salesdb4.prm
將生成的def文件傳到目標(biāo)端kafka--$OGG_HOME/dirdef下
---mysql 數(shù)據(jù)庫地址:10.1.11.24 mysql地址
---kafka地址 10.1.1.246:0000,10.1.1.247:0000 主題:DLS_MERCHANT
1、添加初始化進(jìn)程:---dirprm
ADD replicat rp_06,specialrun
EDIT PARAMS rp_06
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafka_k05.props
SOURCEDEFS ./dirdef/salesdb4.def
EXTFILE ./dirdat/b5
reportcount every 1 minutes, rate
grouptransops 10000
map SALESPDB.SSP.M_SYSTEM_USER,TARGET DLS.DLS_SYSTEM_USER;
2、添加復(fù)制進(jìn)程:
add replicat rep_04,exttrail ./dirdat/b4
edit params rep_04
REPLICAT rep_04
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafka_k05.props
SOURCEDEFS ./dirdef/salesdb4.def
reportcount every 1 minutes, rate
grouptransops 10000
map SALESPDB.SSP.M_SYSTEM_USER,TARGET DLS.DLS_SYSTEM_USER;
3、參數(shù)配置:
cd /home/appgroup/ogg/ggs12/dirprm
custom_kafka_producer.properties 文件
vi kafka_k05.props
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate= DLS_MERCHANT
#The following selects the message key using the concatenated primary keys
#gg.handler.kafkahandler.keyMappingTemplate=
#gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.format =json
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.SchemaTopicName= DLS_MERCHANT
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
----啟動各個進(jìn)程
1,源端抽取,投遞,初始化進(jìn)程啟動
2,目標(biāo)端啟動初始化進(jìn)程、執(zhí)行初始化腳本,啟動復(fù)制進(jìn)程
start rp_06
./replicat paramfile ./dirprm/rp_06.prm reportfile ./dirrpt/rp_06.rpt -p INITIALDATALOAD
start rep_04