每日热文:大数据NiFi(二十一):监控日志文件生产到Kafka

监控日志文件生产到Kafka

案例:监控某个目录下的文件内容,将消息生产到Kafka中。

此案例使用到“TailFile”和“PublishKafka_1_0”处理器。

一、​​​​​​​配置“TailFile”处理器

创建“TailFile”处理器并配置:


【资料图】

注意:以上需要在NiFi集群中的每个节点上创建“/root/test/logdata”文件,“logdata”是文件,而非目录。

二、配置“PublishKafka_1_0”处理器

“PublishKafka_1_0”处理器作用是使用Kafka 1.0生产者API将FlowFile的内容作为消息发送给Apache Kafka。发送的内容可以是单独的FlowFile,也可以通过用户指定分隔符分割的FlowFile内容。

关于“PublishKafka_1_0”处理器的“Properties”主要配置的说明如下:

配置项

默认值

允许值

描述

Kafka Brokers(Kafka节点)

localhost:9092

逗号分割的Kafka集群Broker列表。格式:host:port

Topic Name(topic 名称)

将消息生产到的Topic 名称。

Delivery Guarantee(数据传递保证)

0

指定保证消息被发送到Kafka的要求。对应Kafka的"acks"属性。可以配置的项如下:Best Effort (尽力交付,相当于ack=0):在向Kafka节点写出消息后,FlowFile将被路由到成功,而不需要等待响应。这提供了最好的性能,但可能会导致数据丢失。例如:消息写出到Kafka节点,但是对应节点挂掉,这时将消息路由到成功。Guarantee Single Node Delivery(保证单节点交付,相当于ack=1,Kafka中的默认配置):KafkaProducer把消息发送出去,至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。该情况下,如果follower没有成功备份数据,而此时leader刚好又挂掉了,就会导致消息丢失。该选项就是如果消息被单个Kafka节点接收到,FlowFile将被路由到成功,无论它是否被复制,但如果Kafka节点崩溃,可能会导致数据丢失。Guarantee Replicated Delivery(保证复制交付,相当于ack=-1):FlowFile数据写出后,Kafka topic ISR列表离跟leader保持同步的那些follower都要把消息同步过去,该消息才会被认为成功,否则路由到失败。

Use Transactions(使用事务)

true

▪true▪false

指定NiFi是否应该在与Kafka通信时提供事务性保证。如果发送数据到Kafka有问题,并且这个属性设置为false,那么已经发送到Kafka的消息将继续发送,并被传递给消费者。如果这个设置为true,那么Kafka事务将被回滚,这样这些消息对消费者是不可用的。将此设置为true需要将属性设置为"Guarantee Replicated Delivery"。

Best Effort (尽力交付,相当于ack=0):

在向Kafka节点写出消息后,FlowFile将被路由到成功,而不需要等待响应。这提供了最好的性能,但可能会导致数据丢失。例如:消息写出到Kafka节点,但是对应节点挂掉,这时将消息路由到成功。

Guarantee Single Node Delivery(保证单节点交付,相当于ack=1,Kafka中的默认配置):

KafkaProducer把消息发送出去,至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。该情况下,如果follower没有成功备份数据,而此时leader刚好又挂掉了,就会导致消息丢失。该选项就是如果消息被单个Kafka节点接收到,FlowFile将被路由到成功,无论它是否被复制,但如果Kafka节点崩溃,可能会导致数据丢失。 Guarantee Replicated Delivery(保证复制交付,相当于ack=-1): FlowFile数据写出后,Kafka topic ISR列表离跟leader保持同步的那些follower都要把消息同步过去,该消息才会被认为成功,否则路由到失败。 Use Transactions(使用事务)true true false 指定NiFi是否应该在与Kafka通信时提供事务性保证。如果发送数据到Kafka有问题,并且这个属性设置为false,那么已经发送到Kafka的消息将继续发送,并被传递给消费者。如果这个设置为true,那么Kafka事务将被回滚,这样这些消息对消费者是不可用的。将此设置为true需要将属性设置为"Guarantee Replicated Delivery"。

“PublishKafka_1_0”处理器配置如下:

1、创建“PublishKafka_1_0”处理器

2、配置“PROPERTIES”

注意:以上topic 可以在Kafka中创建好,也可以执行时自动创建。

3、连接“TailFile”处理器和“PublishKafka_1_0”处理器

连接“TailFile”处理器和“PublishKafka_1_0”处理器,并设置“PublishKafka_1_0”处理器“failure”和“success”路由关系为自动终止。

三、运行测试

1、启动Kafka集群,启动NiFi处理流程

2、向/root/test/logdata文件中写入数据并保存

向NiFi集群中的其中一台节点的“logdata”中写入以下数据即可

[root@node1 test]# echo "hello world1" > /root/test/logdata[root@node1 test]# echo "hello world2" >> /root/test/logdata[root@node1 test]# echo "hello world3" >> /root/test/logdata

3、查看Kafka中自动创建的“nifi_topic”中的数据

以上数据每写入一行,有个空行,这是由于“TailFile”处理器监控数据导致的,实际就是写入了3条数据,可以通过后期业务处理时,对数据进行trim处理即可。

关键词: Kafka

为您推荐

每日热文:大数据NiFi(二十一):监控日志文件生产到Kafka

注意:以上需要在NiFi集群中的每个节点上创建“ root test logdata”文件,“logdata”是文件,而非目录。

来源:腾讯云2023-03-05

阿甘正传简介300字_阿甘正传简介 环球今亮点

1、这部电影主要讲述了一个叫阿甘的孩子生活在一个美国小镇里,可阿甘从小就带着纠正骨骼的支架,就连走路都很吃力。2、阿甘的

来源:互联网2023-03-05

焦点速看:奥迪最新的信息娱乐系统将让您无需智能手机即可下载应用程序

如果你在接下来的几个月里想买一辆新奥迪,我们有一些好消息:今年晚些时候,这家德国汽车制造商将推出一个本地应用程序商店,允

来源:互联网2023-03-05

【新要闻】表格里名字怎么按照26个英文字母排序_英文字母排序

1、1、26个英文字母排序大写是:A、B、C、D、E、F、G、H、I、J、K、L、M、N、O、P、Q、R、S、T、U、V

来源:互联网2023-03-05

全球今头条!半场-比林开场9秒破门特罗萨德伤退 阿森纳0-1伯恩茅斯

半场-比林开场9秒破门特罗萨德伤退阿森纳0-1伯恩茅斯,比林,萨卡,阿森纳,史密斯,足球竞赛,英国足球,伯恩茅斯队,英格兰足球,国际足球赛事,莱昂德

来源:直播吧2023-03-05

时讯:核显也能视频超分辨率,英特尔 VSR 类似技术曝光

IT之家3月4日消息,英伟达日前发布了531 18WHQL驱动程序,带来了用户期待已久的RTXVide

来源:IT之家2023-03-04

藏獒和狼谁厉害

1、藏獒和狼谁厉害,一直都是人们好奇的,其实要看不同的品种,不同品种下的藏獒和狼的战斗力也不一样。从科学数据上来说,无论体重还是咬合力

来源:万年历2023-03-04

天天热资讯!抱死制动装置_ABS是什么意思

汽车现在已经越来越普及,基本上都快实现每家每户都有汽车了,那么汽车这么多的情况之下,我们在用车的过程当中肯定也就会遇到各

来源:互联网2023-03-04