Implementing Oracle-to-Kafka Data Synchronization Step by Step with OGG
The prime of life never returns; a day never dawns twice. Spur yourself on while you can, for time waits for no one. — Tao Yuanming
As the information industry has grown, data volumes across every sector have kept increasing, and data that once lived in relational databases is steadily moving onto big-data platforms. Because the data structures change during this migration, and the data must remain real-time and consistent, the migration places real demands on synchronization tooling.
This walkthrough builds real-time replication from Oracle to Kafka. The basic flow is as follows:
I. Base environment
Source: Oracle Database 11.2.0.4; OGG build OGGCORE_12.3.0.1.0_PLATFORMS_171208.0005_FBO; OS CentOS Linux release 7.4.1708 (Core)
Target: OGG for Big Data build OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_191028.1848; OS CentOS Linux release 7.4.1708 (Core)
II. Source environment configuration
1. Enable archive log mode
-- the database must be in MOUNT state (not open) to switch log modes
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
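To confirm the switch took effect, a quick check from SQL*Plus (the "Database log mode" line should read "Archive Mode"):
archive log list;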
2. Enable force logging and minimal supplemental logging
select supplemental_log_data_min, force_logging from v$database;
alter database force logging;
alter database add supplemental log data;                         -- minimal supplemental logging
alter database add supplemental log data (primary key) columns;
alter database add supplemental log data (unique) columns;
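OGG can also enable supplemental logging per table from GGSCI; a minimal sketch, assuming the goldengate user and the NETAPP schema used later in the extract:
./ggsci
dblogin userid goldengate, password **********
add trandata NETAPP.*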
3. Install the OGG software
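The FBO build installs through the Oracle Universal Installer; a minimal sketch, with the archive name, staging path, and response file all assumptions for illustration:
unzip fbo_ggs_Linux_x64_shiphome.zip                    # hypothetical archive name
cd fbo_ggs_Linux_x64_shiphome/Disk1
./runInstaller -silent -responseFile /tmp/oggcore.rsp   # hypothetical response file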
4. Configure environment variables (a sketch follows)
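A minimal sketch of the source-side profile, assuming the ORACLE_HOME shown later in the extract parameters; the SID and OGG install path are assumptions:
vi ~/.bash_profile
export ORACLE_HOME=/u01/app/oracle/product/11.2.0
export ORACLE_SID=orcl                                  # SID is an assumption
export OGG_HOME=/u01/app/ogg                            # install path is an assumption
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$OGG_HOME:$LD_LIBRARY_PATH
export PATH=$ORACLE_HOME/bin:$OGG_HOME:$PATH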
III. Target environment configuration
1. Install the OGG for Big Data software
mkdir -p /opt/oggkafka
Upload the distribution and unzip it into this directory, then create the OGG working directories:
./ggsci
create subdirs
2. Install ZooKeeper 3.4.6
3. Install Kafka. The local environment has three broker nodes: 65.26.2.46:9092, 65.26.2.123:9092, 65.26.2.245:9092 (see the sketch below)
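Since the replicat below maps everything to a single topic named ogg_schema, the cluster can be brought up and the topic created ahead of time. A sketch assuming Kafka 2.4's tooling; the ZooKeeper path, partition count, and replication factor are assumptions:
/opt/module/zookeeper-3.4.6/bin/zkServer.sh start        # on each ZooKeeper node; path is an assumption
/opt/module/kafka_2.11-2.4.1/bin/kafka-server-start.sh -daemon config/server.properties
/opt/module/kafka_2.11-2.4.1/bin/kafka-topics.sh --create --bootstrap-server 65.26.2.46:9092 \
  --topic ogg_schema --partitions 3 --replication-factor 3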
4. Configure environment variables
vi ~/.bash_profile
export OGG_HOME=/opt/oggkafka
export JAVA_HOME=/opt/module/jdk1.8.0_212
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:$OGG_HOME:$OGG_HOME/ggjava:$LD_LIBRARY_PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
PATH=$JAVA_HOME/bin:$PATH:$HOME/bin:$OGG_HOME
export PATH
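Reload the profile, then confirm the Java adapter library can resolve libjvm (the grep should show a path under $JAVA_HOME; if it shows "not found", LD_LIBRARY_PATH is wrong):
source ~/.bash_profile
ldd $OGG_HOME/libggjava.so | grep jvm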
IV. Source-side configuration
1. Configure the extract process
./ggsci
edit param kfk_ext
The parameter file:
extract kfk_ext
setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
setenv (ORACLE_HOME="/u01/app/oracle/product/11.2.0")
userid goldengate, password **********
FETCHOPTIONS FETCHPKUPDATECOLS
getUpdateBefores
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
EXTTRAIL ./dirdat/kb
--INCLUDE MAPPED OBJTYPE 'TABLE' &
--INCLUDE MAPPED OBJTYPE 'INDEX' &
--INCLUDE MAPPED OBJTYPE 'SEQUENCE' &
--INCLUDE MAPPED OBJTYPE 'VIEW' &
--INCLUDE MAPPED OBJTYPE 'PROCEDURE' &
--INCLUDE MAPPED OBJTYPE 'FUNCTION' &
--INCLUDE MAPPED OBJTYPE 'PACKAGE' &
--EXCLUDE OPTYPE COMMENT
TABLE NETAPP.*;
2. Configure the data pump process
edit param kfk_dp
The parameter file:
Extract kfk_dp
setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
setenv (ORACLE_HOME="/u01/app/oracle/product/11.2.0")
--userid goldengate, password **********
passthru
Rmthost 65.26.2.46, mgrport 7809
--DISCARDFILE ./dirrpt/discard_02.DSC, append
--DISCARDFILE ./dirrpt/discard_02.DSC, append, megabytes 100
RMTTRAIL ./dirdat/ke
--RMTTRAIL ./dirdat/rb, megabytes 100
table NETAPP.*;
3. Register the processes
delete extract kfk_ext
delete extract kfk_dp
Add extract kfk_ext, tranlog, begin now, threads 1
Add exttrail ./dirdat/kb, extract kfk_ext
alter exttrail ./dirdat/kb, extract kfk_ext, megabytes 100
Add extract kfk_dp, exttrailsource ./dirdat/kb begin now
Add rmttrail ./dirdat/ke, extract kfk_dp, megabytes 100
4. Start the processes
./ggsci
start kfk_ext
start kfk_dp
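Both processes should report RUNNING; if one ABENDs, the report file usually names the cause:
info all
view report kfk_ext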
V. Target-side configuration
1. Copy the sample configuration files
cd $OGG_HOME/AdapterExamples/big-data/kafka/
cp custom_kafka_producer.properties $OGG_HOME/dirprm/
cp kafka.props $OGG_HOME/dirprm
The shipped kafka.props contains:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/var/lib/kafka/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
The shipped custom_kafka_producer.properties contains:
bootstrap.servers=host:port
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=16384
linger.ms=0
2. Edit kafka.props
cd /opt/oggkafka/dirprm
vi kafka.props
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.BlockingSend=true
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.Mode=op
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.iso8601Format=true
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=DEBUG
gg.report.time=30sec
javawriter.bootoptions=-Xmx2048m -Xms256m -Djava.class.path=ggjava/ggjava.jar
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.TopicMappingTemplate=ogg_schema
gg.classpath=dirprm/:/opt/oggkafka:/opt/oggkafka/lib/*:/opt/module/kafka_2.11-2.4.1/libs/*:/opt/module/jdk1.8.0_212/jre/lib/*
3. Edit custom_kafka_producer.properties
bootstrap.servers=65.26.2.46:9092,65.26.2.123:9092,65.26.2.245:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=16384
linger.ms=0
4. Add and configure the replicat rep01 on the target
add replicat rep01, exttrail ./dirdat/ke
edit param rep01
The parameter file:
replicat rep01
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
--NOFILTERDUPTRANSACTIONS
map NETAPP.*, target NETAPP.*;
VI. Start the process
start rep01 NOFILTERDUPTRANSACTIONS
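To verify end to end, change a row in a NETAPP table on the source (the table name here is hypothetical) and watch the topic with a console consumer; each operation should arrive as a JSON message tagged I/U/D:
insert into NETAPP.TEST_TAB values (1, 'hello');   -- hypothetical table
commit;
/opt/module/kafka_2.11-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server 65.26.2.46:9092 --topic ogg_schema --from-beginning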