亿信ABI
当前版本:5.6.1

KAFKA生产消费组件

1.前置步骤

需提前部署kafka服务器,并且在产品服务器的kafka数据源中新建同一套生产开发kafka数据源连接池。如图,两数据源除适用环境不一致,其余的配置都是一样的。

如果没有配置配套的kafka生产和开发环境,运行kafka任务时会提示如下内容。

2.使用步骤

Kafka是一种高吞吐量的分布式发布订阅消息系统,Kafka发布环节由生产者进行消息采集和发布。在数据源中配置Kafka服务器,然后通过生产者组件可以实现将消息写入Kafka集群中,实现消息的生产。 kafka服务器中数据存储格式通常由三个字段分别是key、value、offset。

2.1Kafka生产组件

我们一般可以将我们的库表数据通过ETL实时的推送到kafka集群中。例如如下场景,我们通过实时表输入组件+实时清洗组件,将清洗过后的数据以json的格式推送到kafka服务器中。

其中kafka生产组件中需要配置的属性信息有kafka数据源、名称字段、主题(topic)、值字段

Kafka数据源的选择对应我们产品数据源中kafka连接池。

主题(topic)字段用于设置的是kafka存储的分区,可以输入不存在的主题,输入不存在的主题,在运行任务时会在kafka服务器中自动创建该主题。

名称字段对应kafka服务器中的key值,通常用于数据的分类。可不配置。

值字段即我们选择需要推送的数据,一般是前置组件的表数据,仅支持推送一个列的数据。它的值对应kafka服务器中的value字段的值。

注意:睿码273此组件该值支持多选,支持同时推送多个字段的值传输到kafka服务器中,推送的值以json的格式进行存储。睿码273之前的版本想要推送多个值,需先将数据合并处理成一个字段再进行推送。

配置完成后我们运行任务,就可以将我们的数据传输到kafka服务器了。我们可以通过第三方工具或者直接通过kafka命令行,查看对应的主题(topic)的数据。

2.2kafka消费组件

Kafka消费组件顾名思义就是将我们kafka服务器中的数据消费(传输)到我们的第三方数据库中。我们一般与实时表输出组件配套使用,kafka服务器中的数据输出到数据库中以数据库表的形式进行存储。

首先我们需要配置kafka消费组件里的基本属性,分别是我们消费的kafka数据源,以及kafka数据源里对应的主题(topic),主题topic可支持多选,即支持同时消费不同topic的数据。分隔符和扩展属性根据实际的kafka数据格式进行配置,这里我们不做配置。

然后在字段列表处新建字段用来暂时存放我们的kafka消费的value值,由于我们一般是将多个字段值合并处理成一个字段存储在kafka服务器中,此处我们以存储的数据格式为json串为例。所以在这里我们新建一个json的字段用来存储kafka消费的数据。 

然后将数据消费出来后我们需要通过json解析组件将这个json进行解析操作,此处我们将json串中的数据解析成city、casts 两个字段。(此处可根据实际存储值进行配置json解析组件)

配置完成后,运行任务即可将kafka服务器中指定的topic数据按照一定的数据格式传输到我们的第三方数据库表中。

3.JSON解析组件

3.1应用背景

实时json解析组件可以从前置实时组件中读取到json格式的数据,然后将其解析成结构化数据,并向后置组件输出。

客户的一个接口返回的用户信息,姓名性别身份证家庭住址等等,这些信息是以JSON的形式返回的,可以通过时JSON解析组件将用户的信息落地到数据库表。 

3.2操作步骤

首先我们准备一个json式的数据存储到数据库表中,此处我们以下图数据为例;

具体json 内容如下:

{

"status": "1",

"count": "1",

"info": "OK",

"infocode": "10000",

"forecasts": [{

"city": "武汉市",

"adcode": "420100",

"province": "湖北",

"reporttime": "2021-05-24 11:02:00",

"casts": [{

"date": "2021-05-24",

"week": "1",

"dayweather": "多云",

"nightweather": "多云",

"daytemp": "28",

"nighttemp": "18",

"daywind": "西北",

"nightwind": "西北",

"daypower": "≤3",

"nightpower": "≤3"

}, {

"date": "2021-05-25",

"week": "2",

"dayweather": "多云",

"nightweather": "小雨",

"daytemp": "25",

"nighttemp": "20",

"daywind": "西南",

"nightwind": "西南",

"daypower": "≤3",

"nightpower": "≤3"

}, {

"date": "2021-05-26",

"week": "3",

"dayweather": "大雨",

"nightweather": "多云",

"daytemp": "23",

"nighttemp": "17",

"daywind": "西北",

"nightwind": "西北",

"daypower": "≤3",

"nightpower": "≤3"

}, {

"date": "2021-05-27",

"week": "4",

"dayweather": "阴",

"nightweather": "阴",

"daytemp": "27",

"nighttemp": "18",

"daywind": "西南",

"nightwind": "西南",

"、": "4",

"nightpower": "4"

}]

}]

}

该json串中包含了多个层级节点,下面我们以解析此json 为例。我们需要解析casts节点下的字段内容。

首先我们需要定位casts节点在该json 串中所处的位置,这里casts是forecasts下的子节点,我们按照指定的格式填写解析的节点路径,此处为forecasts/casts,然后我们需要判断该节点的内容是数组类型还是对象,此处是对象类型。所以我们在配置解析组件时选择解析节点类型为对象。

然后我们需要再字段列表中新建我们解析出来的字段内容,此处我们casts节点下有date、week、dayweather、nightweather、daytemp、nighttemp、daywind、nightwind、daypower、nightwind等8个字段,此处我们仅解析前四个字段,我们在字段配置中添加四个字段date、week、dayweather、nightweather 其字段名和属性名保持一致,另外引用前置ID_字段。

完成配置后输出到控制台。可以看到我们将上述json串中的casts 节点下的值解析出来了。

注意:如果json 串中是对象和数组的复合数据,需要一步一步进行解析,需要先解析数组或者对象中的一个再解析另外一个。

附件列表

0

文档内容仅供参考
如果您需要解决具体问题,还可以登录亿信社区
在提问求助板块提问,30分钟内帮您解决问题

如果您认为本词条还有待完善,请编辑

上一篇主数据变更增量全量导入

下一篇实时CDC日志单表同步组件

请先登录