指标管理

CDC日志增量组件

1.前置步骤

1.需要提前对原数据库的环境进行操作配置。目前支持的四种数据库。分别为ORACLR,MYSQL,SQLSERVER,POSTGRESQL. 详情参考数据库表配置。

2.在进行CDC同步的过程中原表必须拥有主键,否则无法达到数据修改和删除同步的效果。

3.服务器中需部署大数据实时处理服务。

2.使用步骤

1.实时CDC日志增量组件,使用主要是通过对数据库日志的监控来进行数据的实时传输。

2.用户通过界面填写必要的配置信息,选择对应的操作便可达到实时数据同步处理的效果。

3.操作生成的事件。

快照:当第一次监控这个数据库表的时候,会对这个数据库表进行一次快照处理。事件都是一条一条数据处理的。

INSERT(插入):插入分为批量插入和单条插入。无论是批量插入还是单条插入,每个事件也都是按单条数据处理  。

UPDATE(更新):更新分为批量更新和单条更新。无论是批量更新还是单条更新,每个事件也都是按单条数据处理。

DELETE(删除):删除也是如此。

1)选择对应的cdc 组件

2)选择需要进行CDC的库表

双击实时CDC日志增量组件,进行属性编辑,选择支持的数据库和表。(选择的数据库和表都应该是经过前提配置开启CDC日志监控的。)

业务操作:即设置同步的操作。默认勾选的三个INSERT\UPDATE\DELETE。

抽取存量数据:1、如果勾选在第一次执行ETL过程时,会同步源表数据更新目标表,之后实时的同步日志增删改数据。先全量,再实时增量。2、不勾选,在第一次执行时开始同步日志得增删改数据.实时增量。

扩展属性设置:提供一个扩展属性设置供用户添加扩展功能。

配置完成后即可运行该任务。

3.运行场景

实时CDC 日志增量组件作为输入组件可后接其他数据处理组件和输出组件。通常我们以CDC作为输入端,后接数据处理组件,然后将处理后的数据推送到kafka中。
此处我们以学生表为例。
表中有ID、NAME、SEX三个字段,其中ID字段设置为主键。
选择完成后,我们在该组件后接过滤组件,将性别(SEX)为空的数据过滤掉,然后将过滤后的数据推送到kafka服务器中。
配置完成后,点击运行任务,等待该任务成功启动运行后,我们可以通过SQL对xsb 进行数据的增删改的操作。被编辑后的数据将会被监控到且推送到kafka服务器中。
我们可通过检查运行过程中的生成的临时表(EDI_CDC_表名),通过查看临时表的数据来检查我们的操作是否被监控到。
其中EDI_CDC_OP代表数据的操作 ,C代表添加数据,D代表删除数据,U代表修改的数据。EDI_CDC_INDEX代表监控操作次数。

4.开启CDC日志方法

4.1oracle

官方文档说明只验证过oracle 12c以及19c。本文档中用oracle 11g验证通过
1. oracle配置
本质是通过oracle归档日志来实现同步的。
1.1 开启归档模式
用sqlplus执行如下命令
sqlplus / as sysdba
connect sys/admin AS SYSDBA
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = 'D:\oracle11gr2\oradata\recovery_area'
scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should now "Database log mode: Archive Mode"
archive log list
exit;
在数据库级别启用最小补充日志记录,并且可以按如下方式配置。
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
★创建表空间,监控已经存在的表空间则不需要创建
CREATE TABLESPACE logminer_tbs DATAFILE
'D:\oracle11gr2\oradata\orcl\logminer_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
★创建user
CREATE USER dbzuser IDENTIFIED BY dbz
DEFAULT TABLESPACE logminer_tbs
QUOTA UNLIMITED ON logminer_tbs
★给用户授权
GRANT CREATE SESSION TO dbzuser;
GRANT SET CONTAINER TO dbzuser;
GRANT SELECT ON V_$DATABASE to dbzuser;
GRANT FLASHBACK ANY TABLE TO dbzuser;
GRANT SELECT ANY TABLE TO dbzuser;
GRANT SELECT_CATALOG_ROLE TO dbzuser;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
GRANT SELECT ANY TRANSACTION TO dbzuser;
GRANT LOGMINING TO dbzuser;
GRANT CREATE TABLE TO dbzuser;
GRANT LOCK ANY TABLE TO dbzuser;
GRANT ALTER ANY TABLE TO dbzuser;
GRANT CREATE SEQUENCE TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;
GRANT SELECT ON V_$LOG TO dbzuser;
GRANT SELECT ON V_$LOG_HISTORY TO dbzuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO dbzuser;
GRANT SELECT ON V_$LOGFILE TO dbzuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;
exit
★创建表并授权
sqlplus / as sysdba
conn dbzuser/dbz;
CREATE TABLE STU ( "s_id" INT PRIMARY KEY, "s_name" VARCHAR ( 255 ) ); //
ALTER TABLE DBZUSER.STU ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

4.2Pg

★支持版本
postgresql支持9.4以上版本
★ 准备步骤
默认已经安装了pg数据库
源码目录:/opt/postgresql-12.8
★安装解码插件
wal2json :wal2json组织维护的插件,用于将日志转换成json。
★安装wal2json
1.从githhub:  下载最新的稳定版的源代码包 https://github.com/eulerto/wal2json/tree/wal2json_2_4
测试中使用的是2.4版本。
2 解压缩wal2json插件到指定目录
执行命令:unzip wal2json-wal2json_2_4.zip
进入wal2json的目录: cd wal2json-wal2json_2_4
3 编译打包,并安装,执行如下命令
make
make install
★ 配置postgresql.conf
打开postgresql安装目录data文件夹下的postgresql.conf文件,增加一行
shared_preload_libraries = 'wal2json'。
★配置replication slot
Replication slot和connector是一对一的关系,每启动一个CDC同步任务就需要一个复制槽。且复制槽还和快照有关,删除了复制槽则需要重新快照,如果复制槽不够,或者复制槽名称相同都会报错,通过postgresql.conf配置复制槽的数量,设置如下配置:
max_wal_senders = 10   #wal日志的sender
max_replication_slots = 10  #总的复制槽数量
保存后重启postgresql
★停止和启动postgresql
1 切换到pg的超级用户下: su - postgres
2停止:
/usr/local/postgresql/bin/pg_ctl -D /usr/local/postgresql/data -l

/usr/local/postgresql/data/logfile stop
3 启动:
/usr/local/postgresql/bin/pg_ctl -D /usr/local/postgresql/data -l
/usr/local/postgresql/data/logfile start
进入pg sql
su - postgres
psql -U postgres
★ 配置权限
1 创建一个拥有复制和登录权限的角色,角色名自行命名,本例中是dbz_role
tips:使用管理员账号也是可以的,管理员(postgres)默认拥有这些权限 dbz_role   LOGIN; CREATE ROLE REPLICATION
2 将dbz_role这个角色赋予给需要监听的数据库用户,超级管理员账号则忽略
grant dbz_role dbz_user
★ 配置pg_hba.conf
打开pg_hba.conf这个文件,在文件的最后增加一行,dbz_user是数据源里设置的用户名。也可以使用postgres超级账号。ip是数据工厂所在的服务器地址如果数据工厂是集群环境,则配置多条,每条对应不同的ip。配置好后重启postgresql.
host  replication   dbz_user    172.21.1.7/32      trust

4.3Sqlserver

SQL server数据库配置
基于sqlserver的cdc特性只支持SQL Server 2016 Service Pack 1 (SP1) 及其以后的标准版或者企业版
★sqlserver环境准备
详情请参考以下链接(linux环境下配置,window环境下配置过于繁琐且卸载不易)
https://blog.csdn.net/yutenys/article/details/122154593?ops_request_misc=%257B%2522request%255Fid%2
522%253A%2522164620290116780255244215%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall
.%2522%257D&request_id=164620290116780255244215&biz_id=0&utm_medium=distribute.pc_search_result.non
e-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-1-122154593.pc_search_result_control_group&utm_t
erm=centos7%E5%AE%89%E8%A3%85sqlserver2019&spm=1018.2226.3001.4187
步骤2——步骤4
★开启sqlserver的CDC模式
# sqlcmd -S localhost -U sa -PEsen1234             
(sa是默认账号,Esen1234是我设置的密码)
1>create database test
2>go
1>use test
2>go
★开启test库的CDC
1>EXEC sys.sp_cdc_enable_db
2>go
2.2创建文件组
1>use test
2>go
1>alter database test  add filegroup fg1
2>go
★创建文件组下的文件
1> alter database test
2> add file
3> (name='ffggg1',filename='/var/opt/mssql/data/ffggg1.mdf')
4> to filegroup fg1
5> go
★开启sqlserver代理
# /opt/mssql/bin/mssql-conf set sqlagent.enabled true
# systemctl restart mssql-server
# sqlcmd -S localhost -U sa -PEsen1234
1>EXEC sp_configure 'show advanced', 1;
2>RECONFIGURE;
3>EXEC sp_configure 'allow updates', 0;
4>RECONFIGURE;
5>EXEC sp_configure 'Agent XPs', 1;
6>RECONFIGURE;
7>go
★开启test库的t1表的CDC
1>use test
2>go
1>EXEC sys.sp_cdc_enable_table
2>@source_schema = N'dbo',
3>@source_name   = N't1',
4>@role_name     = N'NULL',
5>@filegroup_name = N'fg1',
6>@supports_net_changes = 0
7>go
★ 修改表结构后重新cdc(重点关注)
修改源表表结构,需要重新进行cdc配置才可以,分两步
第一步,先禁用源表之前的cdc配置,这样会删除源表对应的cdc数据库表以及一些配置信息
EXEC sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 't1',
@capture_instance = 'dbo_t1';
go
其中捕获实例capture_instance 这个参数的值默认是“模式 + 下划线 + 表名“
第二步 重新执行启用表cdc的存储过程
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N't1',
@role_name = N'NULL',
@filegroup_name = N'fg1',
@supports_net_changes = 0
go
★ 注意事项
1 创建文件组后一定要创建对应的文件
2 不支持sql server中的timestamp类型,因为这个类型在sql server中和时间毫无关系

4.4Mysql

本文档中用mysql5.7和mysql8.0验证通过
★ mysql配置
本质是通过mysql binlog日志来实现同步的。

开启binlog日志
(1)检查是否开启binlog日志
mysql> show global variables like 'log_bin';
返回结果
ON表示已开启binlog日志,如果是OFF则没有开启。
(2)如果没开启,需在/etc/my.cnf中加入以下内容
server_id=2
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30
(3)重启mysql
systemctl restart mysqld

//创建数据库
create database test_db default character set utf8 default collate
utf8_general_ci;
//创建user
create user 'test_user'@'%' identified by '123456';
//授权
grant all privileges on test_db.* to 'test_user'@'%';
//此类权限不是针对某个数据库的,因此需使用on *.*来进行授权
grant SELECT,RELOAD,SHOW DATABASES,REPLICATION SLAVE,REPLICATION CLIENT on *.* to
'test_user'@'%';
//刷新权限
flush privileges;
//退出
exit;  

4.5DM

本文档中用达梦V8验证通过.

本质是通过达梦归档日志来实现同步的。

①开启归档模式

1 检查是否开启,执行如下查询

select para_name, para_value from v$dm_ini where para_name in ('ARCH_INI','RLOG_APPEND_LOGIC');

如果查询结果都为1,则表示已经开启,为0表示未开启

2 如果未开启,则执行如下操作开启

修改dm.ini文件,增加如下设置

RLOG_APPEND_LOGIC=2  //配置成2,达梦才会在主键表上进行更新操作时带上不相关的字段

ARCH_INI=1

② 配置归档参数

修改dmarch.ini文件

#DaMeng Database Archive Configuration file

#this is comments

    ARCH_WAIT_APPLY      = 0       

[ARCHIVE_LOCAL1]

    ARCH_TYPE            = LOCAL       

    ARCH_DEST            = /home/dmdba/dmarch       

    ARCH_FILE_SIZE       = 2048       

    ARCH_SPACE_LIMIT     = 102400       

    ARCH_FLUSH_BUF_SIZE  = 0       

    ARCH_HANG_FLAG       = 1

归档相关参数,可通过dmarch.ini修改

参数名称

默认值

推荐值

说明

FILE_SIZE

128

2048

本地单个归档文件最大值(单位:M)。

ARCH_DEST

 

 

归档目标,本地归档为归档文件存放路径

ARCH_TYPE            

 

 

归档类型:LOCAL

SPACE_LIMIT

0

102400

归档大小上限,0 表示无限制(按数据量的 1/5 保留,例如 500G 数据,保留 100G 归档)。

ARCH_FLUSH_BUF_SIZE

0

0

归档合并刷盘缓存大小,单位 MB,取值范围 0~128,缺省为 0,表示不使用归档合并刷盘

此参数建议设置为0,可保证cdc的效率

配置后重启数据库

③ 创建用户并分配权限

Debezium 达梦连接器要求使用特定权限设置用户账号,以便连接器捕获更多事件

//创建表空间,监控已经存在的表空间则不需要创建 

CREATE TABLESPACE logminer_tbs DATAFILE '/home/dmdbms/data/DAMENG/logminer_tbs.dbf' SIZE 100 AUTOEXTEND ON NEXT 10 MAXSIZE UNLIMITED;

 

//创建user

create user dbzuser identified by Sanlink123 default tablespace logminer_tbs;

 

//授权

GRANT CREATE TABLE, CREATE VIEW, create index to dbzuser;

GRANT SELECT ON V$VERSION TO dbzuser;

GRANT SELECT ON V$ARCH_FILE TO dbzuser;

GRANT SELECT ON V$ARCHIVED_LOG TO dbzuser;

GRANT SELECT ON V$LOGMNR_LOGS TO dbzuser;

GRANT SELECT ON V$LOGMNR_CONTENTS TO dbzuser;

④ 创建表

CREATE TABLE DBZ_TEST(ID VARCHAR(100), NAME VARCHAR(100));  //监控已经存在的表可忽略

附件列表

0

文档内容仅供参考
如果您需要解决具体问题,还可以登录亿信社区
在提问求助板块提问,30分钟内帮您解决问题

如果您认为本词条还有待完善,请编辑

上一篇KAFKA生产消费组件

下一篇边缘采集组件

请先登录