KAFKA生产消费组件
1.前置步骤
需提前部署kafka服务器,并且在产品服务器的kafka数据源中新建同一套生产开发kafka数据源连接池。如图,两数据源除适用环境不一致,其余的配置都是一样的。
如果没有配置配套的kafka生产和开发环境,运行kafka任务时会提示如下内容。
2.使用步骤
Kafka是一种高吞吐量的分布式发布订阅消息系统,Kafka发布环节由生产者进行消息采集和发布。在数据源中配置Kafka服务器,然后通过生产者组件可以实现将消息写入Kafka集群中,实现消息的生产。 kafka服务器中数据存储格式通常由三个字段分别是key、value、offset。
2.1Kafka生产组件我们一般可以将我们的库表数据通过ETL实时的推送到kafka集群中。例如如下场景,我们通过实时表输入组件+实时清洗组件,将清洗过后的数据以json的格式推送到kafka服务器中。
其中kafka生产组件中需要配置的属性信息有kafka数据源、名称字段、主题(topic)、值字段
Kafka数据源的选择对应我们产品数据源中kafka连接池。
主题(topic)字段用于设置的是kafka存储的分区,可以输入不存在的主题,输入不存在的主题,在运行任务时会在kafka服务器中自动创建该主题。
名称字段对应kafka服务器中的key值,通常用于数据的分类。可不配置。
值字段即我们选择需要推送的数据,一般是前置组件的表数据,仅支持推送一个列的数据。它的值对应kafka服务器中的value字段的值。
注意:睿治272此组件该值支持多选,支持同时推送多个字段的值传输到kafka服务器中,推送的值以json的格式进行存储。睿治272之前的版本想要推送多个值,需先将数据合并处理成一个字段再进行推送。
配置完成后我们运行任务,就可以将我们的数据传输到kafka服务器了。我们可以通过第三方工具或者直接通过kafka命令行,查看对应的主题(topic)的数据。
2.2kafka消费组件Kafka消费组件顾名思义就是将我们kafka服务器中的数据消费(传输)到我们的第三方数据库中。我们一般与实时表输出组件配套使用,kafka服务器中的数据输出到数据库中以数据库表的形式进行存储。
首先我们需要配置kafka消费组件里的基本属性,分别是我们消费的kafka数据源,以及kafka数据源里对应的主题(topic),主题topic可支持多选,即支持同时消费不同topic的数据。分隔符和扩展属性根据实际的kafka数据格式进行配置,这里我们不做配置。
然后在字段列表处新建字段用来暂时存放我们的kafka消费的value值,由于我们一般是将多个字段值合并处理成一个字段存储在kafka服务器中,此处我们以存储的数据格式为json串为例。所以在这里我们新建一个json的字段用来存储kafka消费的数据。
然后将数据消费出来后我们需要通过json解析组件将这个json进行解析操作,此处我们将json串中的数据解析成city、casts 两个字段。(此处可根据实际存储值进行配置json解析组件)
配置完成后,运行任务即可将kafka服务器中指定的topic数据按照一定的数据格式传输到我们的第三方数据库表中。
3.JSON解析组件
实时json解析组件可以从前置实时组件中读取到json格式的数据,然后将其解析成结构化数据,并向后置组件输出。
客户的一个接口返回的用户信息,姓名性别身份证家庭住址等等,这些信息是以JSON的形式返回的,可以通过时JSON解析组件将用户的信息落地到数据库表。
3.2操作步骤首先我们准备一个json式的数据存储到数据库表中,此处我们以下图数据为例;
具体json 内容如下:
{
"status": "1",
"count": "1",
"info": "OK",
"infocode": "10000",
"forecasts": [{
"city": "武汉市",
"adcode": "420100",
"province": "湖北",
"reporttime": "2021-05-24 11:02:00",
"casts": [{
"date": "2021-05-24",
"week": "1",
"dayweather": "多云",
"nightweather": "多云",
"daytemp": "28",
"nighttemp": "18",
"daywind": "西北",
"nightwind": "西北",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-25",
"week": "2",
"dayweather": "多云",
"nightweather": "小雨",
"daytemp": "25",
"nighttemp": "20",
"daywind": "西南",
"nightwind": "西南",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-26",
"week": "3",
"dayweather": "大雨",
"nightweather": "多云",
"daytemp": "23",
"nighttemp": "17",
"daywind": "西北",
"nightwind": "西北",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-27",
"week": "4",
"dayweather": "阴",
"nightweather": "阴",
"daytemp": "27",
"nighttemp": "18",
"daywind": "西南",
"nightwind": "西南",
"、": "4",
"nightpower": "4"
}]
}]
}
该json串中包含了多个层级节点,下面我们以解析此json 为例。我们需要解析casts节点下的字段内容。
首先我们需要定位casts节点在该json 串中所处的位置,这里casts是forecasts下的子节点,我们按照指定的格式填写解析的节点路径,此处为forecasts/casts,然后我们需要判断该节点的内容是数组类型还是对象,此处是对象类型。所以我们在配置解析组件时选择解析节点类型为对象。
然后我们需要再字段列表中新建我们解析出来的字段内容,此处我们casts节点下有date、week、dayweather、nightweather、daytemp、nighttemp、daywind、nightwind、daypower、nightwind等8个字段,此处我们仅解析前四个字段,我们在字段配置中添加四个字段date、week、dayweather、nightweather 其字段名和属性名保持一致,另外引用前置ID_字段。
完成配置后输出到控制台。可以看到我们将上述json串中的casts 节点下的值解析出来了。
注意:如果json 串中是对象和数组的复合数据,需要一步一步进行解析,需要先解析数组或者对象中的一个再解析另外一个。
实时CDC日志单表同步组件
1.前置步骤
1.需要提前对原数据库的环境进行操作配置。目前支持的五种数据库。分别为ORACLR,MYSQL,SQLSERVER,POSTGRESQL以及达梦数据库。详情参考数据库表配置。
2.在进行CDC同步的过程中原表必须拥有主键,否则无法达到数据修改和删除同步的效果。
3.服务器中需部署大数据实时处理服务。
2.使用步骤
1.实时CDC日志增量组件,使用主要是通过对数据库日志的监控来进行数据的实时传输。
2.用户通过界面填写必要的配置信息,选择对应的操作便可达到实时数据同步处理的效果。
3.操作生成的事件。
快照:当第一次监控这个数据库表的时候,会对这个数据库表进行一次快照处理。事件都是一条一条数据处理的。
INSERT(插入):插入分为批量插入和单条插入。无论是批量插入还是单条插入,每个事件也都是按单条数据处理 。
UPDATE(更新):更新分为批量更新和单条更新。无论是批量更新还是单条更新,每个事件也都是按单条数据处理。
DELETE(删除):删除也是如此。
1)选择对应的cdc 组件
2)选择需要进行CDC的库表
双击实时CDC日志增量组件,进行属性编辑,选择支持的数据库和表。(选择的数据库和表都应该是经过前提配置开启CDC日志监控的。)
业务操作:即设置同步的操作。默认勾选的三个INSERT\UPDATE\DELETE。
抽取存量数据:1、如果勾选在第一次执行ETL过程时,会同步源表数据更新目标表,之后实时的同步日志增删改数据。先全量,再实时增量。2、不勾选,在第一次执行时开始同步日志得增删改数据.实时增量。
扩展属性设置:提供一个扩展属性设置供用户添加扩展功能。
配置完成后即可运行该任务。
3.运行场景
4.开启CDC日志方法
①达梦配置
本质是通过达梦归档日志来实现同步的,本文档中用达梦V8验证通过
②开启归档模式
1 检查是否开启,执行如下查询
select para_name, para_value from v$dm_ini where para_name in ('ARCH_INI','RLOG_APPEND_LOGIC');
如果查询结果都为1,则表示已经开启,为0表示未开启
2 如果未开启,则执行如下操作开启
修改dm.ini文件,增加如下设置
RLOG_APPEND_LOGIC=2 //配置成2,达梦才会在主键表上进行更新操作时带上不相关的字段
ARCH_INI=1
③ 配置归档参数
修改dmarch.ini文件
#DaMeng Database Archive Configuration file #this is comments ARCH_WAIT_APPLY = 0 [ARCHIVE_LOCAL1] ARCH_TYPE = LOCAL ARCH_DEST = /home/dmdba/dmarch ARCH_FILE_SIZE = 2048 ARCH_SPACE_LIMIT = 102400 ARCH_FLUSH_BUF_SIZE = 0 ARCH_HANG_FLAG = 1 |
归档相关参数,可通过dmarch.ini修改
参数名称 | 默认值 | 推荐值 | 说明 |
FILE_SIZE | 128 | 2048 | 本地单个归档文件最大值(单位:M)。 |
ARCH_DEST |
|
| 归档目标,本地归档为归档文件存放路径 |
ARCH_TYPE |
|
| 归档类型:LOCAL |
SPACE_LIMIT | 0 | 102400 | 归档大小上限,0 表示无限制(按数据量的 1/5 保留,例如 500G 数据,保留 100G 归档)。 |
ARCH_FLUSH_BUF_SIZE | 0 | 0 | 归档合并刷盘缓存大小,单位 MB,取值范围 0~128,缺省为 0,表示不使用归档合并刷盘 此参数建议设置为0,可保证cdc的效率 |
配置后重启数据库
④创建用户并分配权限
Debezium 达梦连接器要求使用特定权限设置用户账号,以便连接器捕获更多事件
//创建表空间,监控已经存在的表空间则不需要创建 CREATE TABLESPACE logminer_tbs DATAFILE '/home/dmdbms/data/DAMENG/logminer_tbs.dbf' SIZE 100 AUTOEXTEND ON NEXT 10 MAXSIZE UNLIMITED;
//创建user create user dbzuser identified by Sanlink123 default tablespace logminer_tbs;
//授权 GRANT CREATE TABLE, CREATE VIEW, create index to dbzuser; GRANT SELECT ON V$VERSION TO dbzuser; GRANT SELECT ON V$ARCH_FILE TO dbzuser; GRANT SELECT ON V$ARCHIVED_LOG TO dbzuser; GRANT SELECT ON V$LOGMNR_LOGS TO dbzuser; GRANT SELECT ON V$LOGMNR_CONTENTS TO dbzuser; |
⑤创建表
CREATE TABLE DBZ_TEST(ID VARCHAR(100), NAME VARCHAR(100)); //监控已经存在的表可忽略 |
5.达梦数据库支持CDC定制
当达梦数据库开启CDC配置后,监控LOB字段需要变更,若要监控BLOB或者CLOB字段的变更,则需要满足以下两个条件中的一个才能获取变更内容:
1) 数据库表有唯一主键
2) 如果没有物理的唯一主键,但是数据在逻辑上存在唯一字段,则需要在组件的扩展属性中增加配置:edi.key.t1=ID_ (说明:其中t1是表名,ID_则是逻辑上的主键)
注意事项:
1.达梦CDC 当原表中存在自增字段时,表输出勾选的关键字字段必须对应源库表的主键字段
2.监控blob、clob等字段时,设置的物理主键与表输出中的关键字字段保持一致
3.由于多表cdc 组件无法在输出端设置关键字,所欲需要在扩展属性中给每个表都设置逻辑主键即edi.key.t1=xxx;edi.key.t2=xxx
4. 复合的逻辑主键情况,在扩展属性中,多字段的以逗号分隔,多表之间以分号分隔。即:edi.key.t1=id1,id2;edi.key.t2=auid
6.CDC支持从最新偏移量启动
为解决历史数据同步的灵活性问题,CDC日志单表/批量同步组件支持 “从最新偏移量开始运行” 模式。
目前在实时任务没有启用时,对表进行操作,在启用后会把历史偏移量数据同步。新增从最新偏移量开始运行,实时任务没有启用时,对表进行操作,启用后会忽略历史偏移量数据,从启用后的最新偏移量开始同步。
(1).任务编辑
在ETL任务编辑界面,点击“运行”,在弹出运行设置对话框中勾选“从最新偏移量开始运行”
(2).实时调度
实时任务调度界面,如果调度里的任务含有CDC日志单表/批量同步组件,点启动的时候出现一个启动设置弹框,新增“从最新偏移量开始运行”选项。
7.CDC调度任务支持计数功能
CDC日志组件由实时任务建立的调度后,执行需要在实时任务列表以及调度运行日志列表增加成功条数和失败条数
在调度运行期间,日志会记录详细的同步过程,详情见具体日志信息
当调度停止时,系统将累计并记录本次运行周期内总的成功与失败条数。
PS:当某次运行的成功与失败条数均为零时,列表中的计数列将显示为空白,而不会显示数字“0”。实时调度计数功能只对CDC相关的调度任务生效
1.组件介绍
本章节主要介绍了如何使用边缘采集输入输出组件实时采集服务器上的日志数据。
前提条件
1、需要提前在采集服务器上安装边缘节点程序。
2、在使用边缘采集输入输出组件前,需要配置边缘节点的IP地址和端口号。具体配置方法可参照边缘节点管理。
目前边缘采集输入组件支持以下五种
(1)边缘采集Avro输入组件:可通过RPC接收Avro数据,将数据输入到内存中。
(2)边缘采集文件目录输入组件:可监听目录下的新文件,将数据输入到内存中,不支持断点续传。
(3)边缘采集Kafka输入组件:可从Kafka中读取数据,将数据输入到内存中。
(4)边缘采集文本输入组件:可监听目录或文件,将数据输入到内存中,支持断点续传。
(5)边缘采集Http输入组件:可接收外部HTTP客户端发送过来的数据,将数据输入到内存中。
边缘采集输出组件支持以下四种
(1)边缘采集HDFS输出组件:可从内存中接收FLUME编译器输入组件的数据,将数据通过RPC实现端到端的批量压缩数据传输。
(2)边缘采集Avro输出组件:可从内存中接收FLUME编译器输入组件的数据,将数据写入HDFS文件系统。
(3)边缘采集HBase输出组件:可从内存中接收FLUME编译器输入组件的数据,将数据写入Hbase时序数据库中。
(4)边缘采集Kafka输出组件:可从内存中接收FLUME编译器输入组件的数据,将数据写入Kafka指定主题中。
2.使用场景
3.注意事项
1.应用背景
实时json解析组件可以从前置实时组件中读取到json格式的数据,然后将其解析成结构化数据,并向后置组件输出。
客户的一个接口返回的用户信息,姓名性别身份证家庭住址等等,这些信息是以JSON的形式返回的,可以通过时JSON解析组件将用户的信息落地到数据库表。
2.操作步骤
首先我们准备一个json式的数据存储到数据库表中,此处我们以下图数据为例;

具体json 内容如下:
{
"status": "1",
"count": "1",
"info": "OK",
"infocode": "10000",
"forecasts": [{
"city": "武汉市",
"adcode": "420100",
"province": "湖北",
"reporttime": "2021-05-24 11:02:00",
"casts": [{
"date": "2021-05-24",
"week": "1",
"dayweather": "多云",
"nightweather": "多云",
"daytemp": "28",
"nighttemp": "18",
"daywind": "西北",
"nightwind": "西北",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-25",
"week": "2",
"dayweather": "多云",
"nightweather": "小雨",
"daytemp": "25",
"nighttemp": "20",
"daywind": "西南",
"nightwind": "西南",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-26",
"week": "3",
"dayweather": "大雨",
"nightweather": "多云",
"daytemp": "23",
"nighttemp": "17",
"daywind": "西北",
"nightwind": "西北",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-27",
"week": "4",
"dayweather": "阴",
"nightweather": "阴",
"daytemp": "27",
"nighttemp": "18",
"daywind": "西南",
"nightwind": "西南",
"、": "4",
"nightpower": "4"
}]
}]
}

该json串中包含了多个层级节点,下面我们以解析此json 为例。我们需要解析casts节点下的字段内容。
首先我们需要定位casts节点在该json 串中所处的位置,这里casts是forecasts下的子节点,我们按照指定的格式填写解析的节点路径,此处为forecasts/casts,然后我们需要判断该节点的内容是数组类型还是对象,此处是对象类型。所以我们在配置解析组件时选择解析节点类型为对象。

然后我们需要再字段列表中新建我们解析出来的字段内容,此处我们casts节点下有date、week、dayweather、nightweather、daytemp、nighttemp、daywind、nightwind、daypower、nightwind等8个字段,此处我们仅解析前四个字段,我们在字段配置中添加四个字段date、week、dayweather、nightweather 其字段名和属性名保持一致,另外引用前置ID_字段。
完成配置后输出到控制台。可以看到我们将上述json串中的casts 节点下的值解析出来了。

注意:如果json 串中是对象和数组的复合数据,需要一步一步进行解析,需要先解析数组或者对象中的一个再解析另外一个。
请先登录