KAFKA生产消费组件
1.前置步骤
需提前部署kafka服务器,并且在产品服务器的kafka数据源中新建同一套生产开发kafka数据源连接池。如图,两数据源除适用环境不一致,其余的配置都是一样的。
如果没有配置配套的kafka生产和开发环境,运行kafka任务时会提示如下内容。
2.使用步骤
Kafka是一种高吞吐量的分布式发布订阅消息系统,Kafka发布环节由生产者进行消息采集和发布。在数据源中配置Kafka服务器,然后通过生产者组件可以实现将消息写入Kafka集群中,实现消息的生产。 kafka服务器中数据存储格式通常由三个字段分别是key、value、offset。
2.1Kafka生产组件我们一般可以将我们的库表数据通过ETL实时的推送到kafka集群中。例如如下场景,我们通过实时表输入组件+实时清洗组件,将清洗过后的数据以json的格式推送到kafka服务器中。
其中kafka生产组件中需要配置的属性信息有kafka数据源、名称字段、主题(topic)、值字段
Kafka数据源的选择对应我们产品数据源中kafka连接池。
主题(topic)字段用于设置的是kafka存储的分区,可以输入不存在的主题,输入不存在的主题,在运行任务时会在kafka服务器中自动创建该主题。
名称字段对应kafka服务器中的key值,通常用于数据的分类。可不配置。
值字段即我们选择需要推送的数据,一般是前置组件的表数据,仅支持推送一个列的数据。它的值对应kafka服务器中的value字段的值。
注意:睿治272此组件该值支持多选,支持同时推送多个字段的值传输到kafka服务器中,推送的值以json的格式进行存储。睿治272之前的版本想要推送多个值,需先将数据合并处理成一个字段再进行推送。
配置完成后我们运行任务,就可以将我们的数据传输到kafka服务器了。我们可以通过第三方工具或者直接通过kafka命令行,查看对应的主题(topic)的数据。
2.2kafka消费组件Kafka消费组件顾名思义就是将我们kafka服务器中的数据消费(传输)到我们的第三方数据库中。我们一般与实时表输出组件配套使用,kafka服务器中的数据输出到数据库中以数据库表的形式进行存储。
首先我们需要配置kafka消费组件里的基本属性,分别是我们消费的kafka数据源,以及kafka数据源里对应的主题(topic),主题topic可支持多选,即支持同时消费不同topic的数据。分隔符和扩展属性根据实际的kafka数据格式进行配置,这里我们不做配置。
然后在字段列表处新建字段用来暂时存放我们的kafka消费的value值,由于我们一般是将多个字段值合并处理成一个字段存储在kafka服务器中,此处我们以存储的数据格式为json串为例。所以在这里我们新建一个json的字段用来存储kafka消费的数据。
然后将数据消费出来后我们需要通过json解析组件将这个json进行解析操作,此处我们将json串中的数据解析成city、casts 两个字段。(此处可根据实际存储值进行配置json解析组件)
配置完成后,运行任务即可将kafka服务器中指定的topic数据按照一定的数据格式传输到我们的第三方数据库表中。
3.JSON解析组件
实时json解析组件可以从前置实时组件中读取到json格式的数据,然后将其解析成结构化数据,并向后置组件输出。
客户的一个接口返回的用户信息,姓名性别身份证家庭住址等等,这些信息是以JSON的形式返回的,可以通过时JSON解析组件将用户的信息落地到数据库表。
3.2操作步骤首先我们准备一个json式的数据存储到数据库表中,此处我们以下图数据为例;
具体json 内容如下:
{
"status": "1",
"count": "1",
"info": "OK",
"infocode": "10000",
"forecasts": [{
"city": "武汉市",
"adcode": "420100",
"province": "湖北",
"reporttime": "2021-05-24 11:02:00",
"casts": [{
"date": "2021-05-24",
"week": "1",
"dayweather": "多云",
"nightweather": "多云",
"daytemp": "28",
"nighttemp": "18",
"daywind": "西北",
"nightwind": "西北",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-25",
"week": "2",
"dayweather": "多云",
"nightweather": "小雨",
"daytemp": "25",
"nighttemp": "20",
"daywind": "西南",
"nightwind": "西南",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-26",
"week": "3",
"dayweather": "大雨",
"nightweather": "多云",
"daytemp": "23",
"nighttemp": "17",
"daywind": "西北",
"nightwind": "西北",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-27",
"week": "4",
"dayweather": "阴",
"nightweather": "阴",
"daytemp": "27",
"nighttemp": "18",
"daywind": "西南",
"nightwind": "西南",
"、": "4",
"nightpower": "4"
}]
}]
}
该json串中包含了多个层级节点,下面我们以解析此json 为例。我们需要解析casts节点下的字段内容。
首先我们需要定位casts节点在该json 串中所处的位置,这里casts是forecasts下的子节点,我们按照指定的格式填写解析的节点路径,此处为forecasts/casts,然后我们需要判断该节点的内容是数组类型还是对象,此处是对象类型。所以我们在配置解析组件时选择解析节点类型为对象。
然后我们需要再字段列表中新建我们解析出来的字段内容,此处我们casts节点下有date、week、dayweather、nightweather、daytemp、nighttemp、daywind、nightwind、daypower、nightwind等8个字段,此处我们仅解析前四个字段,我们在字段配置中添加四个字段date、week、dayweather、nightweather 其字段名和属性名保持一致,另外引用前置ID_字段。
完成配置后输出到控制台。可以看到我们将上述json串中的casts 节点下的值解析出来了。
注意:如果json 串中是对象和数组的复合数据,需要一步一步进行解析,需要先解析数组或者对象中的一个再解析另外一个。
实时CDC日志单表同步组件
1.前置步骤
1.需要提前对原数据库的环境进行操作配置。目前支持的四种数据库。分别为ORACLR,MYSQL,SQLSERVER,POSTGRESQL. 详情参考数据库表配置。
2.在进行CDC同步的过程中原表必须拥有主键,否则无法达到数据修改和删除同步的效果。
3.服务器中需部署大数据实时处理服务。
2.使用步骤
1.实时CDC日志增量组件,使用主要是通过对数据库日志的监控来进行数据的实时传输。
2.用户通过界面填写必要的配置信息,选择对应的操作便可达到实时数据同步处理的效果。
3.操作生成的事件。
快照:当第一次监控这个数据库表的时候,会对这个数据库表进行一次快照处理。事件都是一条一条数据处理的。
INSERT(插入):插入分为批量插入和单条插入。无论是批量插入还是单条插入,每个事件也都是按单条数据处理 。
UPDATE(更新):更新分为批量更新和单条更新。无论是批量更新还是单条更新,每个事件也都是按单条数据处理。
DELETE(删除):删除也是如此。
1)选择对应的cdc 组件
2)选择需要进行CDC的库表
双击实时CDC日志增量组件,进行属性编辑,选择支持的数据库和表。(选择的数据库和表都应该是经过前提配置开启CDC日志监控的。)
业务操作:即设置同步的操作。默认勾选的三个INSERT\UPDATE\DELETE。
抽取存量数据:1、如果勾选在第一次执行ETL过程时,会同步源表数据更新目标表,之后实时的同步日志增删改数据。先全量,再实时增量。2、不勾选,在第一次执行时开始同步日志得增删改数据.实时增量。
扩展属性设置:提供一个扩展属性设置供用户添加扩展功能。
配置完成后即可运行该任务。
3.运行场景
4.开启CDC日志方法
1.组件介绍
本章节主要介绍了如何使用边缘采集输入输出组件实时采集服务器上的日志数据。
前提条件
1、需要提前在采集服务器上安装边缘节点程序。
2、在使用边缘采集输入输出组件前,需要配置边缘节点的IP地址和端口号。具体配置方法可参照边缘节点管理。
目前边缘采集输入组件支持以下五种
(1)边缘采集Avro输入组件:可通过RPC接收Avro数据,将数据输入到内存中。
(2)边缘采集文件目录输入组件:可监听目录下的新文件,将数据输入到内存中,不支持断点续传。
(3)边缘采集Kafka输入组件:可从Kafka中读取数据,将数据输入到内存中。
(4)边缘采集文本输入组件:可监听目录或文件,将数据输入到内存中,支持断点续传。
(5)边缘采集Http输入组件:可接收外部HTTP客户端发送过来的数据,将数据输入到内存中。
边缘采集输出组件支持以下四种
(1)边缘采集HDFS输出组件:可从内存中接收FLUME编译器输入组件的数据,将数据通过RPC实现端到端的批量压缩数据传输。
(2)边缘采集Avro输出组件:可从内存中接收FLUME编译器输入组件的数据,将数据写入HDFS文件系统。
(3)边缘采集HBase输出组件:可从内存中接收FLUME编译器输入组件的数据,将数据写入Hbase时序数据库中。
(4)边缘采集Kafka输出组件:可从内存中接收FLUME编译器输入组件的数据,将数据写入Kafka指定主题中。
2.使用场景
3.注意事项
1.应用背景
实时json解析组件可以从前置实时组件中读取到json格式的数据,然后将其解析成结构化数据,并向后置组件输出。
客户的一个接口返回的用户信息,姓名性别身份证家庭住址等等,这些信息是以JSON的形式返回的,可以通过时JSON解析组件将用户的信息落地到数据库表。
2.操作步骤
首先我们准备一个json式的数据存储到数据库表中,此处我们以下图数据为例;

具体json 内容如下:
{
"status": "1",
"count": "1",
"info": "OK",
"infocode": "10000",
"forecasts": [{
"city": "武汉市",
"adcode": "420100",
"province": "湖北",
"reporttime": "2021-05-24 11:02:00",
"casts": [{
"date": "2021-05-24",
"week": "1",
"dayweather": "多云",
"nightweather": "多云",
"daytemp": "28",
"nighttemp": "18",
"daywind": "西北",
"nightwind": "西北",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-25",
"week": "2",
"dayweather": "多云",
"nightweather": "小雨",
"daytemp": "25",
"nighttemp": "20",
"daywind": "西南",
"nightwind": "西南",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-26",
"week": "3",
"dayweather": "大雨",
"nightweather": "多云",
"daytemp": "23",
"nighttemp": "17",
"daywind": "西北",
"nightwind": "西北",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-27",
"week": "4",
"dayweather": "阴",
"nightweather": "阴",
"daytemp": "27",
"nighttemp": "18",
"daywind": "西南",
"nightwind": "西南",
"、": "4",
"nightpower": "4"
}]
}]
}

该json串中包含了多个层级节点,下面我们以解析此json 为例。我们需要解析casts节点下的字段内容。
首先我们需要定位casts节点在该json 串中所处的位置,这里casts是forecasts下的子节点,我们按照指定的格式填写解析的节点路径,此处为forecasts/casts,然后我们需要判断该节点的内容是数组类型还是对象,此处是对象类型。所以我们在配置解析组件时选择解析节点类型为对象。

然后我们需要再字段列表中新建我们解析出来的字段内容,此处我们casts节点下有date、week、dayweather、nightweather、daytemp、nighttemp、daywind、nightwind、daypower、nightwind等8个字段,此处我们仅解析前四个字段,我们在字段配置中添加四个字段date、week、dayweather、nightweather 其字段名和属性名保持一致,另外引用前置ID_字段。
完成配置后输出到控制台。可以看到我们将上述json串中的casts 节点下的值解析出来了。

注意:如果json 串中是对象和数组的复合数据,需要一步一步进行解析,需要先解析数组或者对象中的一个再解析另外一个。
请先登录