How to Send Oracle Data to Kafka

This article explains how to send Oracle data to Kafka. Many people run into questions on this topic in day-to-day work, so the steps below collect a simple, practical procedure based on Oracle GoldenGate (OGG) and its Kafka adapter.

Configuration
OGG ADAPTER FOR KAFKA
Required Kafka jars:
Kafka 0.8.2.1
kafka-clients-0.8.2.1.jar
lz4-1.2.0.jar
slf4j-api-1.7.6.jar
snappy-java-1.1.1.6.jar

################################################################### Configure OGG
Source (primary) database

dblogin userid goldengate, password oggpassword
add extract EXTJMS,tranlog, threads 3,begin now
add exttrail /data/oggsrc/dirdat/jm, extract EXTJMS megabytes 200

add trandata testKAFKA.* --add schematrandata testKAFKA

Extract process:
edit param extjms
EXTRACT EXTJMS
SETENV (ORACLE_SID = "rac3")
SETENV (ORACLE_HOME=/data/app/oracle/product/10.2.0/db_1)
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
DBOPTIONS ALLOWUNUSEDCOLUMN, FETCHBATCHSIZE 1500
userid goldengate, password oggpassword
EXTTRAIL /data/oggsrc/dirdat/jm, FORMAT RELEASE 9.5
DISCARDFILE /data/oggsrc/dirtmp/EXTJMS.dsc, APPEND, MEGABYTES 500
tranlogoptions asmuser SYS@rac_asm, ASMPASSWORD oracle_123
THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000
WARNLONGTRANS 30MIN, CHECKINTERVAL 3MIN
CHECKPOINTSECS 5
FLUSHCSECS 80
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RecoveryOptions OverwriteMode
--DDL INCLUDE ALL
DDL INCLUDE MAPPED &
exclude objname testKAFKA.PK_CATEGORY_RANKLIST & 
exclude objtype 'PACKAGE' &
exclude objtype 'PACKAGE BODY' &
exclude INSTR 'REPLACE SYNONYM' &
exclude INSTR 'CREATE OR REPLACE PACKAGE' &
exclude objtype 'PROCEDURE' &
exclude objtype 'FUNCTION' &
exclude objtype 'TYPE' &
exclude objtype 'TRIGGER' &
exclude objtype 'GRANT' &
exclude instr 'GRANT' &
exclude objtype 'DATABASE LINK' &
exclude objtype 'CONSTRAINT' &
exclude objtype 'JOB' &
exclude instr 'ALTER SESSION' &
exclude instr 'MATERIALIZED VIEW'  &
exclude INSTR 'AS SELECT' &
exclude INSTR 'REPLACE SYNONYM' &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_CMP" &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_UNCMP" 
--GETUPDATEBEFORES
--ddloptions addtrandata,REPORT
FETCHOPTIONS, USESNAPSHOT, NOUSELATESTVERSION, MISSINGROW REPORT
dynamicresolution
EOFDELAYCSECS 5
TABLEEXCLUDE testKAFKA.RULE_ACTION_LOG;
TABLE testKAFKA.* ;
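
The TABLEEXCLUDE and TABLE lines at the end of the parameter file decide which tables are captured. As a rough illustration only, the sketch below approximates that selection with Python's fnmatch wildcards. The table names are hypothetical, and the case-insensitive matching is a simplifying assumption, not a description of how OGG resolves names.

```python
from fnmatch import fnmatchcase


def captured_tables(tables, includes, excludes):
    """Return tables that match any include pattern and no exclude pattern."""
    def matches(name, patterns):
        # Assumption: compare case-insensitively to keep the sketch simple.
        return any(fnmatchcase(name.upper(), p.upper()) for p in patterns)

    return [t for t in tables
            if matches(t, includes) and not matches(t, excludes)]


# Hypothetical table names, for illustration only.
tables = ["testKAFKA.ORDERS", "testKAFKA.RULE_ACTION_LOG", "OTHER.ORDERS"]

selected = captured_tables(
    tables,
    includes=["testKAFKA.*"],                # TABLE testKAFKA.* ;
    excludes=["testKAFKA.RULE_ACTION_LOG"],  # TABLEEXCLUDE testKAFKA.RULE_ACTION_LOG;
)
print(selected)
```

Listing a few real table names from the schema and running them through such a check can be a quick way to confirm that an exclusion pattern is not wider than intended.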

Enable supplemental logging on the source database:
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;

Database altered.

SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (all) COLUMNS;

Database altered.

SQL> select SUPPLEMENTAL_LOG_DATA_MIN,SUPPLEMENTAL_LOG_DATA_PK,SUPPLEMENTAL_LOG_DATA_UI ,FORCE_LOGGING from v$database;

SUPPLEME SUP SUP FOR
-------- --- --- ---
YES      YES YES YES

SQL>

Add a new pump process on the source side (tested on the testKAFKA source database):
add extract EDPKK,exttrailsource /data/oggsrc/dirdat/jm, begin now

edit param EDPKK
EXTRACT EDPKK
setenv (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)
PASSTHRU
GETUPDATEBEFORES
RecoveryOptions OverwriteMode
RMTHOST 192.168.0.3, MGRPORT 7839
RMTTRAIL /data/ogg_for_bigdata/dirdat/kk
RMTTRAIL /data/ogg_for_kafka/dirdat/kk
DISCARDFILE ./dirrpt/EDPKK.dsc,APPEND,MEGABYTES 5
TABLE testKAFKA.* ;

add rmttrail /data/ogg_for_bigdata/dirdat/kk, extract EDPKK megabytes 200
add rmttrail /data/ogg_for_kafka/dirdat/kk,extract EDPKK megabytes 200

Edit the definitions parameter file (dirprm/defgen.prm):
userid goldengate, password oggpassword
defsfile dirdef/testKAFKA.def
TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*;                
TABLE testKAFKA.*;

Generate and transfer the definitions file:
./defgen paramfile ./dirprm/defgen.prm
 cd dirdef/
[oracle@vm01 dirdef]$ scp  dirdef/testKAFKA.def oracle@192.168.0.3:/data/ogg_for_bigdata/dirdef
The authenticity of host '192.168.0.3 (192.168.0.3)' can't be established.
RSA key fingerprint is 46:8c:35:61:74:ca:43:e0:b0:74:d5:ff:0c:2f:67:8a.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '192.168.0.3' (RSA) to the list of known hosts.
reverse mapping checking getaddrinfo for bogon failed - POSSIBLE BREAK-IN ATTEMPT!
oracle@192.168.0.3's password: 
testKAFKA.def                                  100%  899KB 898.7KB/s   00:00
[oracle@vm01 dirdef]$

Target side
mgr:
PORT 7839
DYNAMICPORTLIST 7840-7850
--AUTOSTART replicat *
--AUTORESTART replicat *,RETRIES 5,WAITMINUTES 2
AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10
PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
PURGEOLDEXTRACTS /data/ogg_for_kafka/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45

Add the UE DATA PUMP:
Version used:
Version 12.1.2.1.4 20470586 OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209

On the target side, new port 7889:
ADD EXTRACT repkk, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kk

ADD EXTRACT repkk, EXTTRAILSOURCE /data/ogg_for_kafka/dirdat/kk

edit param repkk

GGSCI (localhost.localdomain) 18> view param repkk

EXTRACT repkk
SETENV (GGS_USEREXIT_CONF ="dirprm/repkk.props")
GetEnv (JAVA_HOME)
GetEnv (PATH)
GetEnv (LD_LIBRARY_PATH)
SourceDefs dirdef/testKAFKA.def
CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores
GetUpdateBefores
NoCompressDeletes
NoCompressUpdates
NoTcpSourceTimer
TABLEEXCLUDE testKAFKA.MV*;
TABLE testKAFKA.*;

[oracle@repvm dirdef]$ cp testKAFKA.def /data/ogg_for_kafka/dirdef
Properties file:
[oracle@repvm dirprm]$ cat repkk.props 
gg.handlerlist =kafkahandler
#gg.handler.kafkahandler.type=oracle.goldengate.handler.kafka.KafkaHandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=core_kafka_producer.properties
gg.handler.kafkahandler.TopicName =zqtest
gg.handler.kafkahandler.format =avro_op
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false

gg.handler.kafkahandler.mode =tx
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
#gg.log.level=INFO
gg.log.level=DEBUG

gg.report.time=30sec

#gg.classpath=dirprm/:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar:/data/ogg_for_kafka/dirprm/kafka_jar/*:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2/libs/*
gg.classpath=dirprm:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2/libs/*

javawriter.bootoptions=-Xmx4096m -Xms4096m -Djava.class.path=/data/ogg_for_kafka/ggjava/ggjava.jar:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties
[oracle@repvm dirprm]$

Note: javawriter.bootoptions must include the OGG for Kafka lib jars.
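
Because a missing classpath entry in javawriter.bootoptions is easy to overlook, a small check can help before starting the process. The following is a minimal sketch, not part of OGG: the helper name classpath_entries is made up for illustration, and the boot options string is copied from the properties file above.

```python
def classpath_entries(bootoptions):
    """Extract the entries of -Djava.class.path from a JVM boot options string."""
    for token in bootoptions.split():
        if token.startswith("-Djava.class.path="):
            return token.split("=", 1)[1].split(":")
    return []


# Value of javawriter.bootoptions as shown in repkk.props.
bootoptions = (
    "-Xmx4096m -Xms4096m "
    "-Djava.class.path=/data/ogg_for_kafka/ggjava/ggjava.jar:"
    "/data/ogg_for_kafka/ggjava/resources/lib/*:"
    "/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar "
    "-Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties"
)

entries = classpath_entries(bootoptions)
# Check that at least one entry points into the OGG for Kafka ggjava directory.
has_ogg_libs = any(e.startswith("/data/ogg_for_kafka/ggjava/") for e in entries)

for entry in entries:
    print(entry)
print("OGG for Kafka libs on boot classpath:", has_ogg_libs)
```

The check only inspects the string; it does not verify that the paths exist on disk.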

Kafka producer properties file (core_kafka_producer.properties):
bootstrap.servers=localhost:9092
acks = 1
compression.type = gzip
reconnect.backoff.ms = 1000
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size = 102400
linger.ms = 10000
max.request.size = 5024000
send.buffer.bytes = 5024000
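
To make the batching values above easier to read, the sketch below parses a few of them and converts the units. It is an illustrative helper only: parse_properties is a hypothetical function that handles plain key = value lines, not the full Java properties syntax. In the Kafka producer, linger.ms is the upper bound on how long a send may wait for a batch to fill, so 10000 here means records can be delayed by up to about ten seconds.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


# A subset of the producer settings shown above.
producer_text = """
bootstrap.servers=localhost:9092
acks = 1
compression.type = gzip
# 100KB per partition
batch.size = 102400
linger.ms = 10000
max.request.size = 5024000
"""

props = parse_properties(producer_text)
batch_kib = int(props["batch.size"]) / 1024
linger_s = int(props["linger.ms"]) / 1000
# Sanity check: a batch larger than the request limit could not be sent.
batch_fits = int(props["batch.size"]) <= int(props["max.request.size"])

print(f"batch.size = {batch_kib:.0f} KiB, linger.ms = {linger_s:.0f} s")
print("batch.size fits within max.request.size:", batch_fits)
```

A ten-second linger favors throughput over latency; a lower value may be preferable if downstream consumers need changes quickly.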

Default value of the compression.type parameter:

Configure Kafka:
[oracle@repvm kafka_2.10-0.8.2.2]$ pwd
/data/kafka_2.10-0.8.2.2
[oracle@repvm kafka_2.10-0.8.2.2]$ 
[oracle@repvm kafka_2.10-0.8.2.2]$ grep -v '^$\|^\s*\#' config/server.properties
broker.id=0
port=9092
num.network.threads=3
 
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
[oracle@repvm kafka_2.10-0.8.2.2]$ 

[oracle@repvm libs]$ ll
total 17452
-rw-r--r-- 1 oracle oinstall   53244 Aug 31  2014 jopt-simple-3.2.jar
-rw-r--r-- 1 oracle oinstall 3991269 Sep  3  2015 kafka_2.10-0.8.2.2.jar
-rw-r--r-- 1 oracle oinstall   37748 Sep  3  2015 kafka_2.10-0.8.2.2-javadoc.jar
-rw-r--r-- 1 oracle oinstall 2324165 Sep  3  2015 kafka_2.10-0.8.2.2-scaladoc.jar
-rw-r--r-- 1 oracle oinstall  521466 Sep  3  2015 kafka_2.10-0.8.2.2-sources.jar
-rw-r--r-- 1 oracle oinstall 1233391 Sep  3  2015 kafka_2.10-0.8.2.2-test.jar
-rw-r--r-- 1 oracle oinstall  324016 Sep  3  2015 kafka-clients-0.8.2.2.jar
-rw-r--r-- 1 oracle oinstall  481535 Aug 31  2014 log4j-1.2.16.jar
-rw-r--r-- 1 oracle oinstall  165505 Aug 31  2014 lz4-1.2.0.jar
-rw-r--r-- 1 oracle oinstall   82123 Aug 31  2014 metrics-core-2.2.0.jar
-rw-r--r-- 1 oracle oinstall 7126372 Nov 25  2014 scala-library-2.10.4.jar
-rw-r--r-- 1 oracle oinstall   28688 Aug 31  2014 slf4j-api-1.7.6.jar
-rw-r--r-- 1 oracle oinstall    9753 Aug 31  2014 slf4j-log4j12-1.6.1.jar
-rw-r--r-- 1 oracle oinstall  594033 May 29  2015 snappy-java-1.1.1.7.jar
-rw-r--r-- 1 oracle oinstall   64009 Aug 31  2014 zkclient-0.3.jar
-rw-r--r-- 1 oracle oinstall  792964 Aug 31  2014 zookeeper-3.4.6.jar
[oracle@repvm libs]$

kafka-clients-0.8.2.1.jar
lz4-1.2.0.jar
slf4j-api-1.7.6.jar
snappy-java-1.1.1.6.jar
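
The required jar list and the libs directory listing above do not carry identical versions. The sketch below compares the two lists by artifact name to make the differences visible. It is illustrative only: split_jar is a hypothetical helper, the installed list is a subset copied from the listing, and whether the newer versions are acceptable is not something this check decides.

```python
import re


def split_jar(name):
    """Split 'artifact-1.2.3.jar' into ('artifact', '1.2.3')."""
    m = re.match(r"^(.+?)-(\d[\w.]*)\.jar$", name)
    if not m:
        raise ValueError(f"unrecognized jar name: {name}")
    return m.group(1), m.group(2)


# Jars the article lists as required.
required = [
    "kafka-clients-0.8.2.1.jar",
    "lz4-1.2.0.jar",
    "slf4j-api-1.7.6.jar",
    "snappy-java-1.1.1.6.jar",
]
# Matching jars from the libs directory listing.
installed = [
    "kafka-clients-0.8.2.2.jar",
    "lz4-1.2.0.jar",
    "slf4j-api-1.7.6.jar",
    "snappy-java-1.1.1.7.jar",
]

installed_versions = dict(split_jar(j) for j in installed)
mismatches = []
for jar in required:
    artifact, wanted = split_jar(jar)
    found = installed_versions.get(artifact)
    if found != wanted:
        mismatches.append(artifact)
    print(f"{artifact}: required {wanted}, installed {found}")

print("version differences:", mismatches)
```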

[oracle@repvm bin]$ pwd
/data/kafka_2.10-0.8.2.2/bin
[oracle@repvm bin]$
nohup sh  kafka-server-start.sh ../config/server.properties > /tmp/server.log &

Start ZooKeeper first:
[oracle@repvm kafka_2.10-0.8.2.2]$ nohup  bin/zookeeper-server-start.sh config/zookeeper.properties &
[1] 18645
nohup: ignoring input and appending output to `nohup.out'
[oracle@repvm kafka_2.10-0.8.2.2]$ tail -f nohup.out 
[2016-06-02 12:22:42,981] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:os.version=2.6.32-358.el6.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.name=oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.home=/home/oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.dir=/data/kafka_2.10-0.8.2.2 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,035] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
Start Kafka:
[oracle@repvm kafka_2.10-0.8.2.2]$ nohup bin/kafka-server-start.sh config/server.properties &
[1] 18845
nohup: ignoring input and appending output to `nohup.out'
[oracle@repvm kafka_2.10-0.8.2.2]$

Create the topic:
[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zqtest
Created topic "zqtest".
[oracle@repvm kafka_2.10-0.8.2.2]$ 
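
The steps above depend on ordering: ZooKeeper has to be listening before the Kafka broker starts, and the broker before the topic is created. When scripting this, one option is to wait for each port to accept connections before moving on. The following is a minimal sketch under that assumption; wait_for_port is a hypothetical helper, and an open TCP port only suggests, rather than proves, that the service is ready.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet (or unreachable); retry after a short pause.
            time.sleep(interval)
    return False
```

With the ports from the configuration above, a script might call wait_for_port("localhost", 2181) after launching ZooKeeper and wait_for_port("localhost", 9092) after launching Kafka, and stop if either call returns False.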

List topics:
[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
zqtest
[oracle@repvm kafka_2.10-0.8.2.2]$

Test sending a message:
[oracle@repvm kafka_2.10-0.8.2.2]$  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zqtest
[2016-06-02 14:33:50,690] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
ds

Consumer side:
[oracle@repvm kafka_2.10-0.8.2.2]$  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zqtest --from-beginning
ds

The message was received successfully.

This concludes the walkthrough of sending Oracle data to Kafka. Theory works best alongside practice, so try the steps in your own environment.