KAFKA生产消费组件
1.前置步骤
需提前部署kafka服务器,并且在产品服务器的kafka数据源中新建同一套生产开发kafka数据源连接池。如图,两数据源除适用环境不一致,其余的配置都是一样的。
如果没有配置配套的kafka生产和开发环境,运行kafka任务时会提示如下内容。
2.使用步骤
Kafka是一种高吞吐量的分布式发布订阅消息系统,Kafka发布环节由生产者进行消息采集和发布。在数据源中配置Kafka服务器,然后通过生产者组件可以实现将消息写入Kafka集群中,实现消息的生产。 kafka服务器中数据存储格式通常由三个字段分别是key、value、offset。
2.1Kafka生产组件我们一般可以将我们的库表数据通过ETL实时的推送到kafka集群中。例如如下场景,我们通过实时表输入组件+实时清洗组件,将清洗过后的数据以json的格式推送到kafka服务器中。
其中kafka生产组件中需要配置的属性信息有kafka数据源、名称字段、主题(topic)、值字段
Kafka数据源的选择对应我们产品数据源中kafka连接池。
主题(topic)字段用于设置的是kafka存储的分区,可以输入不存在的主题,输入不存在的主题,在运行任务时会在kafka服务器中自动创建该主题。
名称字段对应kafka服务器中的key值,通常用于数据的分类。可不配置。
值字段即我们选择需要推送的数据,一般是前置组件的表数据,仅支持推送一个列的数据。它的值对应kafka服务器中的value字段的值。
注意:睿治272此组件该值支持多选,支持同时推送多个字段的值传输到kafka服务器中,推送的值以json的格式进行存储。睿治272之前的版本想要推送多个值,需先将数据合并处理成一个字段再进行推送。
配置完成后我们运行任务,就可以将我们的数据传输到kafka服务器了。我们可以通过第三方工具或者直接通过kafka命令行,查看对应的主题(topic)的数据。
2.2kafka消费组件Kafka消费组件顾名思义就是将我们kafka服务器中的数据消费(传输)到我们的第三方数据库中。我们一般与实时表输出组件配套使用,kafka服务器中的数据输出到数据库中以数据库表的形式进行存储。
首先我们需要配置kafka消费组件里的基本属性,分别是我们消费的kafka数据源,以及kafka数据源里对应的主题(topic),主题topic可支持多选,即支持同时消费不同topic的数据。分隔符和扩展属性根据实际的kafka数据格式进行配置,这里我们不做配置。
然后在字段列表处新建字段用来暂时存放我们的kafka消费的value值,由于我们一般是将多个字段值合并处理成一个字段存储在kafka服务器中,此处我们以存储的数据格式为json串为例。所以在这里我们新建一个json的字段用来存储kafka消费的数据。
然后将数据消费出来后我们需要通过json解析组件将这个json进行解析操作,此处我们将json串中的数据解析成city、casts 两个字段。(此处可根据实际存储值进行配置json解析组件)
配置完成后,运行任务即可将kafka服务器中指定的topic数据按照一定的数据格式传输到我们的第三方数据库表中。
请先登录