admin 管理员组

文章数量: 888299

通过ogg逐步实现Oracle到kafka的数据同步流程

                                           盛年不重来,一日难再晨。及时宜自勉,岁月不待人。——陶渊明

 

    随着信息行业的快速发展,各个行业的数据量逐步增大,数据由存储在关系型数据库中逐步转换到存储在大数据平台中。数据转换中,因数据结构的改变及为确保数据的实时和一致性,对数据同步工具就提出相应的需求。

    本次搭建流程是实现oracle到kafka的数据实时同步复制,基本流程如下:

一、基础环境
    源端:数据库-oracle 11.2.0.4 ogg版本-OGGCORE_12.3.0.1.0_PLATFORMS_171208.0005_FBO  操作系统-CentOS Linux release 7.4.1708 (Core)
    目标端:ogg版本-OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_191028.1848 操作系统-CentOS Linux release 7.4.1708 (Core)
    
二、源端环境配置
    1,开启归档
   

alter database archivelog;
alter database open;


    2,开启force_logging和minimal supplemental logging
   

select supplemental_log_data_min,force_logging from v$database;
alter database force_logging;
alter database add supplemental log data(primary key) columns;
alter database add supplemental log data(unique) columns;


    3,安装ogg软件
    4,配置环境变量

三、目标端环境配置
    1,安装ogg软件
    mkdir -p /opt/oggkafka
    上传文件,解压安装
 

    ./ggscicreate subdirs 


    2,安装zookeeper 3.4.6
    3,安装kafka,本地环境配置了三个节点地址为:65.26.2.46:9092,65.26.2.123:9092,65.26.2.245:9092
    4,配置环境变量
   

vi ~/.bash_profile
export OGG_HOME=/opt/oggkafka
export JAVA_HOME=/opt/module/jdk1.8.0_212
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:$LD_LIBRARY_PATH:$JAVA_HOME/jre/libvm.so:$OGG_HOME:$OGG_HOME/ggjava
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/libPATH=$JAVA_HOME/bin:$PATH:$HOME/bin:$OGG_HOMEexport PATH


四、源端配置
    1,配置抽取进程
   

./ggsciedit param kfk_ext(配置信息如下)extract kfk_extsetenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")setenv (ORACLE_HOME="/u01/app/oracle/product/11.2.0")userid goldengate, password **********FETCHOPTIONS FETCHPKUPDATECOLSgetUpdateBeforesNOCOMPRESSDELETESNOCOMPRESSUPDATESEXTTRAIL ./dirdat/kb--INCLUDE MAPPED OBJTYPE 'TABLE' &--INCLUDE MAPPED OBJTYPE 'INDEX' &--INCLUDE MAPPED OBJTYPE 'SEQUENCE' &--INCLUDE MAPPED OBJTYPE 'VIEW' &--INCLUDE MAPPED OBJTYPE 'PROCEDURE' &--INCLUDE MAPPED OBJTYPE 'FUNCTION' &--INCLUDE MAPPED OBJTYPE 'PACKAGE' &--EXCLUDE OPTYPE COMMENTTABLE NETAPP.*;


    
    2,配置推送进程
   

    edit param kfk_dp(配置如下)Extract kfk_dpsetenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")setenv (ORACLE_HOME="/u01/app/oracle/product/11.2.0")--userid goldengate,password **********passthruRmthost 65.26.2.46, mgrport 7809--DISCARDFILE ./dirrpt/discard_02.DSC,append--DISCARDFILE  ./dirrpt/discard_02.DSC,append,megabytes 100RMTTRAIL ./dirdat/ke--RMTTRAIL ./dirdat/rb, megabytes 100table NETAPP.*;


    3,增加进程
   

    delete extract kfk_extdelete extract kfk_dpAdd extract kfk_ext, tranlog, begin now, threads 1Add exttrail ./dirdat/kb, extract kfk_extalter exttrail ./dirdat/kb ,extract kfk_ext,megabytes 100Add extract kfk_dp, exttrailsource ./dirdat/kb begin nowAdd rmttrail ./dirdat/ke, extract kfk_dp,megabytes 100


    
    4,启动进程
   

    ./ggscistart kfk_extstart kfk_dp


    
五、目标端配置
    1,拷贝配置文件
   

    cd $OGG_HOME/AdapterExamples/big-data/kafka/cp custom_kafka_producer.properties $OGG_HOME/dirprm/cp kafka.props $OGG_HOME/dirprm


    其中kafka.props内容为:
 

    gg.handlerlist = kafkahandlergg.handler.kafkahandler.type=kafkagg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties#The following resolves the topic name using the short table namegg.handler.kafkahandler.topicMappingTemplate=${tableName}#The following selects the message key using the concatenated primary keysgg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}gg.handler.kafkahandler.format=avro_opgg.handler.kafkahandler.SchemaTopicName=mySchemaTopicgg.handler.kafkahandler.BlockingSend =falsegg.handler.kafkahandler.includeTokens=falsegg.handler.kafkahandler.mode=opgoldengate.userexit.writers=javawriterjavawriter.stats.display=TRUEjavawriter.stats.full=TRUEgg.log=log4jgg.log.level=INFOgg.report.time=30sec#Sample gg.classpath for Apache Kafkagg.classpath=dirprm/:/var/lib/kafka/libs/*#Sample gg.classpath for HDP#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar 


    
    custom_kafka_producer.properties内容为:
   

    bootstrap.servers=host:portacks=1reconnect.backoff.ms=1000value.serializer=org.apache.kafkamon.serialization.ByteArraySerializerkey.serializer=org.apache.kafkamon.serialization.ByteArraySerializer# 100KB per partitionbatch.size=16384linger.ms=0


    2,修改配置文件kafka.props
   

    cd /opt/oggkafka/dirprmvi kafka.propsgg.handlerlist=kafkahandler                                                                                                  gg.handler.kafkahandler.type=kafka                                                                                           gg.handler.kafkahandler.format=json                                                                                          gg.handler.kafkahandler.BlockingSend=true                                                                                    gg.handler.kafkahandler.includeTokens=false                                                                                  gg.handler.kafkahandler.Mode=op                                                                                              gg.handler.kafkahandler.format.insertOpKey=I                                                                                 gg.handler.kafkahandler.format.updateOpKey=U                                                                                 gg.handler.kafkahandler.format.deleteOpKey=D                                                                                 gg.handler.kafkahandler.format.truncateOpKey=T                                                                               gg.handler.kafkahandler.format.iso8601Format = true                                                                          goldengate.userexit.writers = javawriter                                                                                     javawriter.stats.display = TRUE                                                                                              javawriter.stats.full = TRUE                                                                                                 gg.log = log4j                                                                                                               gg.log.level = DEBUG                                                                                                         gg.report.time = 30sec                                                                                                       javawriter.bootoptions=-Xmx2048m -Xms256m -Djava.class.path=ggjava/ggjava.jar                                                gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties                                             gg.handler.kafkahandler.format.includePrimaryKeys=true                                                                       gg.handler.kafkahandler.TopicMappingTemplate=ogg_schema                                                                      gg.classpath=dirprm/:/opt/oggkafka:/opt/oggkafka/lib/*:/opt/module/kafka_2.11-2.4.1/libs/*:/opt/module/jdk1.8.0_212/jre/lib/*javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar  


    
    3,修改配置文件custom_kafka_producer.properties        
                     
   

    bootstrap.servers=65.26.2.46:9092,65.26.2.123:9092,65.26.2.245:9092       acks=1                                                                    compression.type=gzip                                                     reconnect.backoff.ms=1000                                                 value.serializer=org.apache.kafkamon.serialization.ByteArraySerializerkey.serializer=org.apache.kafkamon.serialization.ByteArraySerializer  # 100KB per partition   

     
    
    4,目标端添加日志应用进程rep01,并配置
    add replicat rep01, exttrail ./dirdat/ke     
    
   

    edit param rep01replicat rep01                                               TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props                                        REPORTCOUNT EVERY 1 MINUTES, RATE                            GROUPTRANSOPS 10000                                          --NOFILTERDUPTRANSACTIONS     map NETAPP.*,target NETAPP.*;


  
六、启动进程
    start rep01 NOFILTERDUPTRANSACTIONS                              

本文标签: 通过ogg逐步实现Oracle到kafka的数据同步流程