睿治
当前版本:2.7.2

实时组件使用参考

KAFKA生产消费组件

1.前置步骤

需提前部署kafka服务器,并且在产品服务器的kafka数据源中新建同一套生产开发kafka数据源连接池。如图,两数据源除适用环境不一致,其余的配置都是一样的。

如果没有配置配套的kafka生产和开发环境,运行kafka任务时会提示如下内容。

2.使用步骤

Kafka是一种高吞吐量的分布式发布订阅消息系统,Kafka发布环节由生产者进行消息采集和发布。在数据源中配置Kafka服务器,然后通过生产者组件可以实现将消息写入Kafka集群中,实现消息的生产。 kafka服务器中数据存储格式通常由三个字段分别是key、value、offset。

2.1Kafka生产组件

我们一般可以将我们的库表数据通过ETL实时的推送到kafka集群中。例如如下场景,我们通过实时表输入组件+实时清洗组件,将清洗过后的数据以json的格式推送到kafka服务器中。

其中kafka生产组件中需要配置的属性信息有kafka数据源、名称字段、主题(topic)、值字段

Kafka数据源的选择对应我们产品数据源中kafka连接池。

主题(topic)字段用于设置的是kafka存储的分区,可以输入不存在的主题,输入不存在的主题,在运行任务时会在kafka服务器中自动创建该主题。

名称字段对应kafka服务器中的key值,通常用于数据的分类。可不配置。

值字段即我们选择需要推送的数据,一般是前置组件的表数据,仅支持推送一个列的数据。它的值对应kafka服务器中的value字段的值。

注意:睿治272此组件该值支持多选,支持同时推送多个字段的值传输到kafka服务器中,推送的值以json的格式进行存储。睿治272之前的版本想要推送多个值,需先将数据合并处理成一个字段再进行推送。

配置完成后我们运行任务,就可以将我们的数据传输到kafka服务器了。我们可以通过第三方工具或者直接通过kafka命令行,查看对应的主题(topic)的数据。

2.2kafka消费组件

Kafka消费组件顾名思义就是将我们kafka服务器中的数据消费(传输)到我们的第三方数据库中。我们一般与实时表输出组件配套使用,kafka服务器中的数据输出到数据库中以数据库表的形式进行存储。

首先我们需要配置kafka消费组件里的基本属性,分别是我们消费的kafka数据源,以及kafka数据源里对应的主题(topic),主题topic可支持多选,即支持同时消费不同topic的数据。分隔符和扩展属性根据实际的kafka数据格式进行配置,这里我们不做配置。

然后在字段列表处新建字段用来暂时存放我们的kafka消费的value值,由于我们一般是将多个字段值合并处理成一个字段存储在kafka服务器中,此处我们以存储的数据格式为json串为例。所以在这里我们新建一个json的字段用来存储kafka消费的数据。 

然后将数据消费出来后我们需要通过json解析组件将这个json进行解析操作,此处我们将json串中的数据解析成city、casts 两个字段。(此处可根据实际存储值进行配置json解析组件)

配置完成后,运行任务即可将kafka服务器中指定的topic数据按照一定的数据格式传输到我们的第三方数据库表中。

3.JSON解析组件

3.1应用背景

实时json解析组件可以从前置实时组件中读取到json格式的数据,然后将其解析成结构化数据,并向后置组件输出。

客户的一个接口返回的用户信息,姓名性别身份证家庭住址等等,这些信息是以JSON的形式返回的,可以通过时JSON解析组件将用户的信息落地到数据库表。 

3.2操作步骤

首先我们准备一个json式的数据存储到数据库表中,此处我们以下图数据为例;

具体json 内容如下:

{

"status": "1",

"count": "1",

"info": "OK",

"infocode": "10000",

"forecasts": [{

"city": "武汉市",

"adcode": "420100",

"province": "湖北",

"reporttime": "2021-05-24 11:02:00",

"casts": [{

"date": "2021-05-24",

"week": "1",

"dayweather": "多云",

"nightweather": "多云",

"daytemp": "28",

"nighttemp": "18",

"daywind": "西北",

"nightwind": "西北",

"daypower": "≤3",

"nightpower": "≤3"

}, {

"date": "2021-05-25",

"week": "2",

"dayweather": "多云",

"nightweather": "小雨",

"daytemp": "25",

"nighttemp": "20",

"daywind": "西南",

"nightwind": "西南",

"daypower": "≤3",

"nightpower": "≤3"

}, {

"date": "2021-05-26",

"week": "3",

"dayweather": "大雨",

"nightweather": "多云",

"daytemp": "23",

"nighttemp": "17",

"daywind": "西北",

"nightwind": "西北",

"daypower": "≤3",

"nightpower": "≤3"

}, {

"date": "2021-05-27",

"week": "4",

"dayweather": "阴",

"nightweather": "阴",

"daytemp": "27",

"nighttemp": "18",

"daywind": "西南",

"nightwind": "西南",

"、": "4",

"nightpower": "4"

}]

}]

}

该json串中包含了多个层级节点,下面我们以解析此json 为例。我们需要解析casts节点下的字段内容。

首先我们需要定位casts节点在该json 串中所处的位置,这里casts是forecasts下的子节点,我们按照指定的格式填写解析的节点路径,此处为forecasts/casts,然后我们需要判断该节点的内容是数组类型还是对象,此处是对象类型。所以我们在配置解析组件时选择解析节点类型为对象。

然后我们需要再字段列表中新建我们解析出来的字段内容,此处我们casts节点下有date、week、dayweather、nightweather、daytemp、nighttemp、daywind、nightwind、daypower、nightwind等8个字段,此处我们仅解析前四个字段,我们在字段配置中添加四个字段date、week、dayweather、nightweather 其字段名和属性名保持一致,另外引用前置ID_字段。

完成配置后输出到控制台。可以看到我们将上述json串中的casts 节点下的值解析出来了。

注意:如果json 串中是对象和数组的复合数据,需要一步一步进行解析,需要先解析数组或者对象中的一个再解析另外一个。

实时CDC日志单表同步组件

1.前置步骤

1.需要提前对原数据库的环境进行操作配置。目前支持的四种数据库。分别为ORACLR,MYSQL,SQLSERVER,POSTGRESQL. 详情参考数据库表配置。

2.在进行CDC同步的过程中原表必须拥有主键,否则无法达到数据修改和删除同步的效果。

3.服务器中需部署大数据实时处理服务。

2.使用步骤

1.实时CDC日志增量组件,使用主要是通过对数据库日志的监控来进行数据的实时传输。

2.用户通过界面填写必要的配置信息,选择对应的操作便可达到实时数据同步处理的效果。

3.操作生成的事件。

快照:当第一次监控这个数据库表的时候,会对这个数据库表进行一次快照处理。事件都是一条一条数据处理的。

INSERT(插入):插入分为批量插入和单条插入。无论是批量插入还是单条插入,每个事件也都是按单条数据处理  。

UPDATE(更新):更新分为批量更新和单条更新。无论是批量更新还是单条更新,每个事件也都是按单条数据处理。

DELETE(删除):删除也是如此。

1)选择对应的cdc 组件

2)选择需要进行CDC的库表

双击实时CDC日志增量组件,进行属性编辑,选择支持的数据库和表。(选择的数据库和表都应该是经过前提配置开启CDC日志监控的。)

业务操作:即设置同步的操作。默认勾选的三个INSERT\UPDATE\DELETE。

抽取存量数据:1、如果勾选在第一次执行ETL过程时,会同步源表数据更新目标表,之后实时的同步日志增删改数据。先全量,再实时增量。2、不勾选,在第一次执行时开始同步日志得增删改数据.实时增量。

扩展属性设置:提供一个扩展属性设置供用户添加扩展功能。

配置完成后即可运行该任务。

3.运行场景

实时CDC 日志增量组件作为输入组件可后接其他数据处理组件和输出组件。通常我们以CDC作为输入端,后接数据处理组件,然后将处理后的数据推送到kafka中。
此处我们以学生表为例。
表中有ID、NAME、SEX三个字段,其中ID字段设置为主键。
选择完成后,我们在该组件后接过滤组件,将性别(SEX)为空的数据过滤掉,然后将过滤后的数据推送到kafka服务器中。
配置完成后,点击运行任务,等待该任务成功启动运行后,我们可以通过SQL对xsb 进行数据的增删改的操作。被编辑后的数据将会被监控到且推送到kafka服务器中。
我们可通过检查运行过程中的生成的临时表(EDI_CDC_表名),通过查看临时表的数据来检查我们的操作是否被监控到。
其中EDI_CDC_OP代表数据的操作 ,C代表添加数据,D代表删除数据,U代表修改的数据。EDI_CDC_INDEX代表监控操作次数。

4.开启CDC日志方法

4.1oracle
官方文档说明只验证过oracle 12c以及19c。本文档中用oracle 11g验证通过
1 oracle配置
本质是通过oracle归档日志来实现同步的。
1.1 开启归档模式
用sqlplus执行如下命令
sqlplus / as sysdba
connect sys/admin AS SYSDBA
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = 'D:\oracle11gr2\oradata\recovery_area'
scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should now "Database log mode: Archive Mode"
archive log list
exit;
在数据库级别启用最小补充日志记录,并且可以按如下方式配置。
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
★创建表空间,监控已经存在的表空间则不需要创建
CREATE TABLESPACE logminer_tbs DATAFILE
'D:\oracle11gr2\oradata\orcl\logminer_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
★创建user
CREATE USER dbzuser IDENTIFIED BY dbz
DEFAULT TABLESPACE logminer_tbs
QUOTA UNLIMITED ON logminer_tbs
★给用户授权
GRANT CREATE SESSION TO dbzuser;
GRANT SET CONTAINER TO dbzuser;
GRANT SELECT ON V_$DATABASE to dbzuser;
GRANT FLASHBACK ANY TABLE TO dbzuser;
GRANT SELECT ANY TABLE TO dbzuser;
GRANT SELECT_CATALOG_ROLE TO dbzuser;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
GRANT SELECT ANY TRANSACTION TO dbzuser;
GRANT LOGMINING TO dbzuser;
GRANT CREATE TABLE TO dbzuser;
GRANT LOCK ANY TABLE TO dbzuser;
GRANT ALTER ANY TABLE TO dbzuser;
GRANT CREATE SEQUENCE TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;
GRANT SELECT ON V_$LOG TO dbzuser;
GRANT SELECT ON V_$LOG_HISTORY TO dbzuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO dbzuser;
GRANT SELECT ON V_$LOGFILE TO dbzuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;
exit
★创建表并授权
sqlplus / as sysdba
conn dbzuser/dbz;
CREATE TABLE STU ( "s_id" INT PRIMARY KEY, "s_name" VARCHAR ( 255 ) ); //
ALTER TABLE DBZUSER.STU ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
4.2Pg
★支持版本
postgresql支持9.4以上版本
★ 准备步骤
默认已经安装了pg数据库
源码目录:/opt/postgresql-12.8
★安装解码插件
wal2json :wal2json组织维护的插件,用于将日志转换成json。
★安装wal2json
1.从githhub:  下载最新的稳定版的源代码包 https://github.com/eulerto/wal2json/tree/wal2json_2_4
测试中使用的是2.4版本。
2 解压缩wal2json插件到指定目录
执行命令:unzip wal2json-wal2json_2_4.zip
进入wal2json的目录: cd wal2json-wal2json_2_4
3 编译打包,并安装,执行如下命令
make
make install
★ 配置postgresql.conf
打开postgresql安装目录data文件夹下的postgresql.conf文件,增加一行
shared_preload_libraries = 'wal2json'。
★配置replication slot
Replication slot和connector是一对一的关系,每启动一个CDC同步任务就需要一个复制槽。且复制槽还和快照有关,删除了复制槽则需要重新快照,如果复制槽不够,或者复制槽名称相同都会报错,通过postgresql.conf配置复制槽的数量,设置如下配置:
max_wal_senders = 10   #wal日志的sender
max_replication_slots = 10  #总的复制槽数量
保存后重启postgresql
★停止和启动postgresql
1 切换到pg的超级用户下: su - postgres
2停止:
/usr/local/postgresql/bin/pg_ctl -D /usr/local/postgresql/data -l

/usr/local/postgresql/data/logfile stop
3 启动:
/usr/local/postgresql/bin/pg_ctl -D /usr/local/postgresql/data -l
/usr/local/postgresql/data/logfile start
进入pg sql
su - postgres
psql -U postgres
★ 配置权限
1 创建一个拥有复制和登录权限的角色,角色名自行命名,本例中是dbz_role
tips:使用管理员账号也是可以的,管理员(postgres)默认拥有这些权限 dbz_role   LOGIN; CREATE ROLE REPLICATION
2 将dbz_role这个角色赋予给需要监听的数据库用户,超级管理员账号则忽略
grant dbz_role dbz_user
★ 配置pg_hba.conf
打开pg_hba.conf这个文件,在文件的最后增加一行,dbz_user是数据源里设置的用户名。也可以使用postgres超级账号。ip是数据工厂所在的服务器地址如果数据工厂是集群环境,则配置多条,每条对应不同的ip。配置好后重启postgresql.
host  replication   dbz_user    172.21.1.7/32      trust
4.3Sqlserver
SQL server数据库配置
基于sqlserver的cdc特性只支持SQL Server 2016 Service Pack 1 (SP1) 及其以后的标准版或者企业版
★sqlserver环境准备
详情请参考以下链接(linux环境下配置,window环境下配置过于繁琐且卸载不易)
https://blog.csdn.net/yutenys/article/details/122154593?ops_request_misc=%257B%2522request%255Fid%2
522%253A%2522164620290116780255244215%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall
.%2522%257D&request_id=164620290116780255244215&biz_id=0&utm_medium=distribute.pc_search_result.non
e-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-1-122154593.pc_search_result_control_group&utm_t
erm=centos7%E5%AE%89%E8%A3%85sqlserver2019&spm=1018.2226.3001.4187
步骤2——步骤4
★开启sqlserver的CDC模式
# sqlcmd -S localhost -U sa -PEsen1234             
(sa是默认账号,Esen1234是我设置的密码)
1>create database test
2>go
1>use test
2>go
★开启test库的CDC
1>EXEC sys.sp_cdc_enable_db
2>go
2.2创建文件组
1>use test
2>go
1>alter database test  add filegroup fg1
2>go
★创建文件组下的文件
1> alter database test
2> add file
3> (name='ffggg1',filename='/var/opt/mssql/data/ffggg1.mdf')
4> to filegroup fg1
5> go
★开启sqlserver代理
# /opt/mssql/bin/mssql-conf set sqlagent.enabled true
# systemctl restart mssql-server
# sqlcmd -S localhost -U sa -PEsen1234
1>EXEC sp_configure 'show advanced', 1;
2>RECONFIGURE;
3>EXEC sp_configure 'allow updates', 0;
4>RECONFIGURE;
5>EXEC sp_configure 'Agent XPs', 1;
6>RECONFIGURE;
7>go
★开启test库的t1表的CDC
1>use test
2>go
1>EXEC sys.sp_cdc_enable_table
2>@source_schema = N'dbo',
3>@source_name   = N't1',
4>@role_name     = N'NULL',
5>@filegroup_name = N'fg1',
6>@supports_net_changes = 0
7>go
★ 修改表结构后重新cdc(重点关注)
修改源表表结构,需要重新进行cdc配置才可以,分两步
第一步,先禁用源表之前的cdc配置,这样会删除源表对应的cdc数据库表以及一些配置信息
EXEC sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 't1',
@capture_instance = 'dbo_t1';
go
其中捕获实例capture_instance 这个参数的值默认是“模式 + 下划线 + 表名“
第二步 重新执行启用表cdc的存储过程
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N't1',
@role_name = N'NULL',
@filegroup_name = N'fg1',
@supports_net_changes = 0
go
★ 注意事项
1 创建文件组后一定要创建对应的文件
2 不支持sql server中的timestamp类型,因为这个类型在sql server中和时间毫无关系
4.4Mysql
本文档中用mysql5.7和mysql8.0验证通过
★ mysql配置
本质是通过mysql binlog日志来实现同步的。
5.3.1 开启binlog日志
(1)检查是否开启binlog日志
mysql> show global variables like 'log_bin';
返回结果
ON表示已开启binlog日志,如果是OFF则没有开启。
(2)如果没开启,需在/etc/my.cnf中加入以下内容
server_id=2
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30
(3)重启mysql
systemctl restart mysqld
5.3.2
//创建数据库
create database test_db default character set utf8 default collate
utf8_general_ci;
//创建user
create user 'test_user'@'%' identified by '123456';
//授权
grant all privileges on test_db.* to 'test_user'@'%';
//此类权限不是针对某个数据库的,因此需使用on *.*来进行授权
grant SELECT,RELOAD,SHOW DATABASES,REPLICATION SLAVE,REPLICATION CLIENT on *.* to
'test_user'@'%';
//刷新权限
flush privileges;
//退出
exit;  

边缘采集组件

1.组件介绍

本章节主要介绍了如何使用边缘采集输入输出组件实时采集服务器上的日志数据。

前提条件

1、需要提前在采集服务器上安装边缘节点程序。

2、在使用边缘采集输入输出组件前,需要配置边缘节点的IP地址和端口号。具体配置方法可参照边缘节点管理

目前边缘采集输入组件支持以下五种

(1)边缘采集Avro输入组件:可通过RPC接收Avro数据,将数据输入到内存中。

(2)边缘采集文件目录输入组件:可监听目录下的新文件,将数据输入到内存中,不支持断点续传。

(3)边缘采集Kafka输入组件:可从Kafka中读取数据,将数据输入到内存中。

(4)边缘采集文本输入组件:可监听目录或文件,将数据输入到内存中,支持断点续传。

(5)边缘采集Http输入组件:可接收外部HTTP客户端发送过来的数据,将数据输入到内存中。

边缘采集输出组件支持以下四种

(1)边缘采集HDFS输出组件:可从内存中接收FLUME编译器输入组件的数据,将数据通过RPC实现端到端的批量压缩数据传输。

(2)边缘采集Avro输出组件:可从内存中接收FLUME编译器输入组件的数据,将数据写入HDFS文件系统。

(3)边缘采集HBase输出组件:可从内存中接收FLUME编译器输入组件的数据,将数据写入Hbase时序数据库中。

(4)边缘采集Kafka输出组件:可从内存中接收FLUME编译器输入组件的数据,将数据写入Kafka指定主题中。

2.使用场景

Story1:客户需要采集tomcat服务器实时产生的日志数据,并实时解析入库;为实时平台操作分析提供数据支撑。
操作步骤
1、边缘采集输入输出组件主要是实现了基于服务器应用日志进行实时更新,实时监听服务器的日志数据,当日志数据中有新增后,将新增的数据输出至内存区进行缓存,然后输出到目标中。系统操作流程如下:
操作入口:任务管理-新建实时任务-实时任务设计器 
① 在ETL任务设计器分组“边缘采集输入组件”下拖出“边缘采集Avro输入组件”。双击该组件,配置采集的边缘节点、输入源的具体信息。
a.边缘节点(默认使用内存方式缓存数据):选择系统设置-数据整合配置-边缘节点,系统根据配置的边缘节点程序,采集服务器上实时产生的日志数据。
b.b.主机名:监听主机名/IP
c.端口:绑定监听端口,该端口需未被占用
d:高级属性:扩展属性用来配置非必填项的其他属性,格式为:key=value。
②在ETL任务设计器分组“边缘采集输出组件”下拖出“边缘采集Avro输出组件”。,将之前配置好的“边缘采集Avro输入组件”连接此组件,然后双击“边缘采集Avro输出组件”,配置数据的输出目标信息。
a.主机名:绑定的主机名/IP
b.端口:监听端口
c:高级属性:扩展属性用来配置非必填项的其他属性,格式为:key=value。
③点击【运行】,系统跟据配置的边缘节点程序,即可实现实时采集服务器上产生的日志数据。

3.注意事项

1、实时任务点击【运行】后,默认在后台一直运行,直到用户点击【取消】后才会终止。

2、采集组件和推送组件支持多对多的连线。

3、边缘采集组件仅支持相互之间进行使用。

实时JSON解析组件

1.应用背景

实时json解析组件可以从前置实时组件中读取到json格式的数据,然后将其解析成结构化数据,并向后置组件输出。

客户的一个接口返回的用户信息,姓名性别身份证家庭住址等等,这些信息是以JSON的形式返回的,可以通过时JSON解析组件将用户的信息落地到数据库表。 

2.操作步骤

首先我们准备一个json式的数据存储到数据库表中,此处我们以下图数据为例;

 实时组件使用参考@

具体json 内容如下:

{

"status": "1",

"count": "1",

"info": "OK",

"infocode": "10000",

"forecasts": [{

"city": "武汉市",

"adcode": "420100",

"province": "湖北",

"reporttime": "2021-05-24 11:02:00",

"casts": [{

"date": "2021-05-24",

"week": "1",

"dayweather": "多云",

"nightweather": "多云",

"daytemp": "28",

"nighttemp": "18",

"daywind": "西北",

"nightwind": "西北",

"daypower": "≤3",

"nightpower": "≤3"

}, {

"date": "2021-05-25",

"week": "2",

"dayweather": "多云",

"nightweather": "小雨",

"daytemp": "25",

"nighttemp": "20",

"daywind": "西南",

"nightwind": "西南",

"daypower": "≤3",

"nightpower": "≤3"

}, {

"date": "2021-05-26",

"week": "3",

"dayweather": "大雨",

"nightweather": "多云",

"daytemp": "23",

"nighttemp": "17",

"daywind": "西北",

"nightwind": "西北",

"daypower": "≤3",

"nightpower": "≤3"

}, {

"date": "2021-05-27",

"week": "4",

"dayweather": "阴",

"nightweather": "阴",

"daytemp": "27",

"nighttemp": "18",

"daywind": "西南",

"nightwind": "西南",

"、": "4",

"nightpower": "4"

}]

}]

}

 实时组件使用参考@

该json串中包含了多个层级节点,下面我们以解析此json 为例。我们需要解析casts节点下的字段内容。

首先我们需要定位casts节点在该json 串中所处的位置,这里casts是forecasts下的子节点,我们按照指定的格式填写解析的节点路径,此处为forecasts/casts,然后我们需要判断该节点的内容是数组类型还是对象,此处是对象类型。所以我们在配置解析组件时选择解析节点类型为对象。

 实时组件使用参考@

然后我们需要再字段列表中新建我们解析出来的字段内容,此处我们casts节点下有date、week、dayweather、nightweather、daytemp、nighttemp、daywind、nightwind、daypower、nightwind等8个字段,此处我们仅解析前四个字段,我们在字段配置中添加四个字段date、week、dayweather、nightweather 其字段名和属性名保持一致,另外引用前置ID_字段。

完成配置后输出到控制台。可以看到我们将上述json串中的casts 节点下的值解析出来了。

 实时组件使用参考@

注意:如果json 串中是对象和数组的复合数据,需要一步一步进行解析,需要先解析数组或者对象中的一个再解析另外一个。

附件列表

0

文档内容仅供参考
如果您需要解决具体问题,还可以登录亿信社区
在提问求助板块提问,30分钟内帮您解决问题

如果您认为本词条还有待完善,请编辑

上一篇主题表输出组件

下一篇元数据变更

请先登录