1.前置步骤
需提前部署kafka服务器,并且在产品服务器的kafka数据源中新建同一套生产开发kafka数据源连接池。如图,两数据源除适用环境不一致,其余的配置都是一样的。
如果没有配置配套的kafka生产和开发环境,运行kafka任务时会提示如下内容。
2.使用步骤
Kafka是一种高吞吐量的分布式发布订阅消息系统,Kafka发布环节由生产者进行消息采集和发布。在数据源中配置Kafka服务器,然后通过生产者组件可以实现将消息写入Kafka集群中,实现消息的生产。 kafka服务器中数据存储格式通常由三个字段分别是key、value、offset。
2.1Kafka生产组件
我们一般可以将我们的库表数据通过ETL实时的推送到kafka集群中。例如如下场景,我们通过实时表输入组件+实时清洗组件,将清洗过后的数据以json的格式推送到kafka服务器中。
其中kafka生产组件中需要配置的属性信息有kafka数据源、名称字段、主题(topic)、值字段
Kafka数据源的选择对应我们产品数据源中kafka连接池。
主题(topic)字段用于设置的是kafka存储的分区,可以输入不存在的主题,输入不存在的主题,在运行任务时会在kafka服务器中自动创建该主题。
名称字段对应kafka服务器中的key值,通常用于数据的分类。可不配置。
值字段即我们选择需要推送的数据,一般是前置组件的表数据,仅支持推送一个列的数据。它的值对应kafka服务器中的value字段的值。
注意:睿码273此组件该值支持多选,支持同时推送多个字段的值传输到kafka服务器中,推送的值以json的格式进行存储。睿码273之前的版本想要推送多个值,需先将数据合并处理成一个字段再进行推送。
配置完成后我们运行任务,就可以将我们的数据传输到kafka服务器了。我们可以通过第三方工具或者直接通过kafka命令行,查看对应的主题(topic)的数据。
2.2kafka消费组件
Kafka消费组件顾名思义就是将我们kafka服务器中的数据消费(传输)到我们的第三方数据库中。我们一般与实时表输出组件配套使用,kafka服务器中的数据输出到数据库中以数据库表的形式进行存储。
首先我们需要配置kafka消费组件里的基本属性,分别是我们消费的kafka数据源,以及kafka数据源里对应的主题(topic),主题topic可支持多选,即支持同时消费不同topic的数据。分隔符和扩展属性根据实际的kafka数据格式进行配置,这里我们不做配置。
然后在字段列表处新建字段用来暂时存放我们的kafka消费的value值,由于我们一般是将多个字段值合并处理成一个字段存储在kafka服务器中,此处我们以存储的数据格式为json串为例。所以在这里我们新建一个json的字段用来存储kafka消费的数据。
然后将数据消费出来后我们需要通过json解析组件将这个json进行解析操作,此处我们将json串中的数据解析成city、casts 两个字段。(此处可根据实际存储值进行配置json解析组件)
配置完成后,运行任务即可将kafka服务器中指定的topic数据按照一定的数据格式传输到我们的第三方数据库表中。
3.JSON解析组件
3.1应用背景
实时json解析组件可以从前置实时组件中读取到json格式的数据,然后将其解析成结构化数据,并向后置组件输出。
客户的一个接口返回的用户信息,姓名性别身份证家庭住址等等,这些信息是以JSON的形式返回的,可以通过时JSON解析组件将用户的信息落地到数据库表。
3.2操作步骤
首先我们准备一个json式的数据存储到数据库表中,此处我们以下图数据为例;
具体json 内容如下:
{
"status": "1",
"count": "1",
"info": "OK",
"infocode": "10000",
"forecasts": [{
"city": "武汉市",
"adcode": "420100",
"province": "湖北",
"reporttime": "2021-05-24 11:02:00",
"casts": [{
"date": "2021-05-24",
"week": "1",
"dayweather": "多云",
"nightweather": "多云",
"daytemp": "28",
"nighttemp": "18",
"daywind": "西北",
"nightwind": "西北",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-25",
"week": "2",
"dayweather": "多云",
"nightweather": "小雨",
"daytemp": "25",
"nighttemp": "20",
"daywind": "西南",
"nightwind": "西南",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-26",
"week": "3",
"dayweather": "大雨",
"nightweather": "多云",
"daytemp": "23",
"nighttemp": "17",
"daywind": "西北",
"nightwind": "西北",
"daypower": "≤3",
"nightpower": "≤3"
}, {
"date": "2021-05-27",
"week": "4",
"dayweather": "阴",
"nightweather": "阴",
"daytemp": "27",
"nighttemp": "18",
"daywind": "西南",
"nightwind": "西南",
"、": "4",
"nightpower": "4"
}]
}]
}
该json串中包含了多个层级节点,下面我们以解析此json 为例。我们需要解析casts节点下的字段内容。
首先我们需要定位casts节点在该json 串中所处的位置,这里casts是forecasts下的子节点,我们按照指定的格式填写解析的节点路径,此处为forecasts/casts,然后我们需要判断该节点的内容是数组类型还是对象,此处是对象类型。所以我们在配置解析组件时选择解析节点类型为对象。
然后我们需要再字段列表中新建我们解析出来的字段内容,此处我们casts节点下有date、week、dayweather、nightweather、daytemp、nighttemp、daywind、nightwind、daypower、nightwind等8个字段,此处我们仅解析前四个字段,我们在字段配置中添加四个字段date、week、dayweather、nightweather 其字段名和属性名保持一致,另外引用前置ID_字段。
完成配置后输出到控制台。可以看到我们将上述json串中的casts 节点下的值解析出来了。
注意:如果json 串中是对象和数组的复合数据,需要一步一步进行解析,需要先解析数组或者对象中的一个再解析另外一个。
请先登录