博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
关于Flume断点续传(防止重复消费)的解决方案
阅读量:2225 次
发布时间:2019-05-09

本文共 11086 字,大约阅读时间需要 36 分钟。

背景:

前段时间写了个 ,其中我们是使用 exec source执行 tail命令来监控采集日志的,但这样做会存在一些问题:如果agent进程突然挂了,下次重启采集任务,会导致日志文件内容重复采集,虽然进程挂了这种事情不常发生,当我们还是要尽量避免因此带来的负面影响!


一、方案选择

和一些朋友交流过Flume断点续传问题,他们往往是自己修改source源码,写一个自定义的source,继承 AbstractSource 、实现 EventDrivenSource,Configurable接口;这种方案还不错,可以达到目的,但存在两个不便之处:一是自己造轮子需要开发、维护成本,二是如果团队水平不足可能导致后续各种bug。

那么Flume有没有提供现成的轮子可以解决此问题呢?答案是有的,那就是 Taildir Source

我们进入Flume官网,可以在Version 1.7.0Changes 里看到:

在这里插入图片描述
那么 Taildir Source有什么特点呢?

在这里插入图片描述

翻译如下:

注意:此source作为预览功能提供。它不适用于Windows。

观察指定的文件,并在检测到新行被添加到每个文件后能几乎实时地tail它们。如果正在写入新行,则此source将重试读取它们以等待写入完成。
此source是可靠的,即使tail的文件轮替也不会丢失数据。它定期以JSON格式写入给定位置文件上每个文件的最后读取位置。如果Flume由于某种原因stop或down,它可以从文件position处重新开始tail。
在其他用法中,此source也可以通过给定的position文件从每个文件的任意位置开始tail。当指定路径上没有position文件时,默认情况下它将从每个文件的第一行开始tail。
文件将按修改时间顺序使用。将首先使用具有最早修改时间的文件。

此source不会重命名或删除或对正在tail的文件执行任何修改。目前此source不支持tail二进制文件。它只能逐行读取文本文件。

我们可以发现,其功能的重点就在于有一个记录采集文件position记录,每次重新采集都可以从该记录中获取上一次的position,接着上次往后采集,也就是能解决断点续传的问题!


二、具体配置

在Flume的conf目录下创建配置文件:kafka-producer-moercredit.conf,内容如下:

pro.sources = s1pro.channels = c1pro.sinks = k1pro.sources.s1.type = TAILDIRpro.sources.s1.positionFile = /home/dev/flume/flume-1.8.0/log/taildir_position.jsonpro.sources.s1.filegroups = f1pro.sources.s1.filegroups.f1 = /home/dev/log/moercredit/logstash.logpro.sources.s1.headers.f1.headerKey1 = aaapro.sources.s1.fileHeader = truepro.channels.c1.type = memorypro.channels.c1.capacity = 1000pro.channels.c1.transactionCapacity = 100pro.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinkpro.sinks.k1.kafka.topic = moercredit_log_testpro.sinks.k1.kafka.bootstrap.servers = cdh1:9092,cdh2:9092,cdh3:9092pro.sinks.k1.kafka.flumeBatchSize = 20pro.sinks.k1.kafka.producer.acks = 1pro.sinks.k1.kafka.producer.linger.ms = 1pro.sinks.k1.kafka.producer.compression.type = snappypro.sources.s1.channels = c1pro.sinks.k1.channel = c1

对比上篇博客只修改了source部分,这个应该能一眼看懂意思。

要注意的是 filegroups是一组文件,可以以空格分隔,也支持正则表达式。

该source具体参数含义可以看官网:

在这里插入图片描述


三、使用测试及深入理解

建议看完,会理解更深刻一些。

bin目录下执行命令:

nohup ./flume-ng agent -n pro -c ../conf/ -f ../conf/kafka-producer-moercredit.conf >/dev/null 2>&1 &

执行后发现在当前目录下产生了一个logs目录,里面有一个flume.log文件,部分内容如下:

19 Apr 2019 14:41:03,800 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration.validateConfiguration:140)  - Post-validation flume configuration contains configuration for agents: [pro]19 Apr 2019 14:41:03,800 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:147)  - Creating channels19 Apr 2019 14:41:03,807 INFO  [conf-file-poller-0] (org.apache.flume.channel.DefaultChannelFactory.create:42)  - Creating instance of channel c1 type memory19 Apr 2019 14:41:03,816 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:201)  - Created channel c119 Apr 2019 14:41:03,817 INFO  [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:41)  - Creating instance of source s1, type TAILDIR19 Apr 2019 14:41:03,908 INFO  [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42)  - Creating instance of sink: k1, type: org.apache.flume.sink.kafka.KafkaSink19 Apr 2019 14:41:03,916 INFO  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:314)  - Using the static topic moercredit_log_test. This may be overridden by event headers19 Apr 2019 14:41:03,929 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.getConfiguration:116)  - Channel c1 connected to [s1, k1]19 Apr 2019 14:41:03,937 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:137)  - Starting new configuration:{ sourceRunners:{s1=PollableSourceRunner: { source:Taildir source: { positionFile: /home/dev/flume/flume-1.8.0/log/taildir_position.json, skipToEnd: false, byteOffsetHeader: false, idleTimeout: 120000, writePosInterval: 3000 } counterGroup:{ name:null counters:{} } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@30010525 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }19 Apr 2019 14:41:03,938 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:144)  - Starting Channel c119 Apr 2019 14:41:04,011 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:119)  - Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.19 Apr 2019 14:41:04,011 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:95)  - Component type: CHANNEL, name: c1 started19 Apr 2019 14:41:04,011 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:171)  - Starting Sink k119 Apr 2019 14:41:04,012 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:182)  - Starting Source s119 Apr 2019 14:41:04,014 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.TaildirSource.start:92)  - s1 TaildirSource source starting with directory: {f1=/home/dev/log/moercredit/logstash.log}19 Apr 2019 14:41:04,018 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.
:83) - taildirCache: [{filegroup='f1', filePattern='/home/dev/log/moercredit/logstash.log', cached=true}]19 Apr 2019 14:41:04,024 INFO [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.
:84) - headerTable: {f1={headerKey1=aaa}}19 Apr 2019 14:41:04,029 INFO [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile:283) - Opening file: /home/dev/log/moercredit/logstash.log, inode: 807943550, pos: 019 Apr 2019 14:41:04,031 INFO [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.
:94) - Updating position from position file: /home/dev/flume/flume-1.8.0/log/taildir_position.json19 Apr 2019 14:41:04,031 INFO [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.loadPositionFile:144) - File not found: /home/dev/flume/flume-1.8.0/log/taildir_position.json, not updating position19 Apr 2019 14:41:04,033 INFO [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:119) - Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.19 Apr 2019 14:41:04,033 INFO [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:95) - Component type: SOURCE, name: s1 started

通过该日志我们可以详细看到flume运行过程,我们重点关注这几行:

Opening file: /home/dev/log/moercredit/logstash.log, inode: 807943550, pos: 0Updating position from position file: /home/dev/flume/flume-1.8.0/log/taildir_position.jsonFile not found: /home/dev/flume/flume-1.8.0/log/taildir_position.json, not updating position

第一次执行该flume agent进程,先找到待采集的日志文件inode为807943550,然后会创建taildir_position.json文件将pos更新其中,进程运行后会马上采集完该日志文件,并更新position,我们此时查看下taildir_position.json文件内容:

[{
"inode":807943550,"pos":579077,"file":"/home/dev/log/moercredit/logstash.log"}]

其实就是个json array,每个采集文件对应一个数组元素,每个元素包含三个属性:inode(文件唯一标识号码)、pos(被采集文件的最后采集位置,也就是文件的byte字节数)、file(被采集文件的绝对路径)

扩展知识:

除了文件名以外的所有文件元信息,都存在inode之中,每个inode都有一个号码,操作系统用inode号码来识别不同的文件。

这里值得重复一遍,Unix/linux系统内部不使用文件名,而使用inode号码来识别文件。对于系统来说,文件名只是inode号码便于识别的别称或者绰号。
表面上,用户通过文件名,打开文件。实际上,系统内部这个过程分成三步:首先,系统找到这个文件名对应的inode号码;其次,通过inode号码,获取inode信息;最后,根据inode信息,找到文件数据所在的block,读出数据。
使用ls -i命令,可以看到文件名对应的inode号码:
[dev@localhost log]$ ls -i
taildir_position.json 542922380 taildir_position.json
或者通过stat命令查看inode元信息:
[dev@localhost log]$ stat taildir_position.json
File: ‘taildir_position.json’ Size: 81 Blocks: 8 IO
Block: 4096 regular file Device: fd02h/64770d Inode: 542922380
Links: 1 Access: (0664/-rw-rw-r–) Uid: ( 1000/ dev) Gid: (
1000/ dev) Context: unconfined_u:object_r:user_home_t:s0 Access:
2019-04-19 15:18:19.034511139 +0800 Modify: 2019-04-19
15:19:24.806511139 +0800 Change: 2019-04-19 15:19:24.806511139 +0800
Birth: -
由于inode号码与文件名分离,这种机制导致了一些Unix/Linux系统特有的现象。
  1. 有时,文件名包含特殊字符,无法正常删除。这时,直接删除inode节点,就能起到删除文件的作用。
  2. 移动文件或重命名文件,只是改变文件名,不影响inode号码。
  3. 打开一个文件以后,系统就以inode号码来识别这个文件,不再考虑文件名。
因此,通常来说,系统无法从inode号码得知文件名。 第3点使得软件更新变得简单,可以在不关闭软件的情况下进行更新,不需要重启。因为系统通过inode号码,识别运行中的文件,不通过文件名。更新的时候,新版文件以同样的文件名,生成一个新的inode,不会影响到运行中的文件。等到下一次运行这个软件的时候,文件名就自动指向新版文件,旧版文件的inode则被回收。

我们测试下,如果flume进程down了,重启是否会重复消费:

目前topic数据为1661条。
在这里插入图片描述

[{
"inode":807943550,"pos":585006,"file":"/home/dev/log/moercredit/logstash.log"}

重启后:

topic数据为1663条,并未重复消费(这两条是操作时新增的数据),达到了断点续传的目的!
在这里插入图片描述
查看flume.log:

Updating position from position file: /home/dev/flume/flume-1.8.0/log/taildir_position.jsonUpdated position, file: /home/dev/log/moercredit/logstash.log, inode: 807943550, pos: 585006

这时问题来了:

1.该source是根据文件名还是inode采集对应文件呢?

2.读取taildir_position.json文件中既有inode也有filepath,到底以谁为主

先看第一个问题,因为conf中的source配置的是文件路径:

pro.sources.s1.filegroups.f1 = /home/dev/log/moercredit/logstash.log
猜测是根据文件名来采集文件的,即文件名改了,会导致采集中断,再新建一个文件名和原来一样的文件,会采集新的文件

测试:

[dev@localhost moercredit]$ mv logstash.log logstash.log.bak

此时观察到flume.log发现改名后的文件被关闭,而且taildir_position.json中记录消失:

Closed file: /home/dev/log/moercredit/logstash.log, inode: 807943550, pos: 593777

然后我创建一个新文件logstash.log:

touch logstash.log

发现flume.log新增两条日志:

Opening file: /home/dev/log/moercredit/logstash.log, inode: 809248997, pos: 0Closed file: /home/dev/log/moercredit/logstash.log, inode: 809248997, pos: 0

open了新的文件(新的inode),但是马上又close了。

我再在新文件里写入几条数据,发现又open了,kafka也新增了几条消息,现在采集新的文件了,说明确实是通过文件名而不是inode来采集文件

Opening file: /home/dev/log/moercredit/logstash.log, inode: 809248999, pos: 0

观察到,2分钟没写入数据,文件又被close了:

Closed file: /home/dev/log/moercredit/logstash.log, inode: 809248999, pos: 21

这个时间是由source的idleTimeout属性控制的,默认120s,在此期间文件没新增行,则自动关闭文件,这也解决了防止文件资源一直占用的问题

在这里插入图片描述
这是我们查看taildir_position.json,发现原来的inode被新文件的inode覆盖了!这也解决了第二个问题,断点续传也是根据文件名来记录position的

[{
"inode":809248999,"pos":21,"file":"/home/dev/log/moercredit/logstash.log"}

注意的是,虽然老文件(inode=807943550)此时不在taildir_position.json文件中记录,但不意味这此时把新logstash.log删除、logstash.log.bak改回logstash.log,flume又会从头消费!

事实是依旧能断点续传,从上次position接着消费,为啥呢?taildir_position.json不是记录被覆盖了么?

看源码可以知道,内存中存在所有tairdir的详细信息:

private Map
tailFiles = Maps.newHashMap();
public class TailFile {
...... private RandomAccessFile raf; private final String path; private final long inode; private long pos; private long lastUpdated; private boolean needTail; private final Map
headers; private byte[] buffer; private byte[] oldBuffer; private int bufferPos; private long lineReadPos;}

ReliableTaildirEventReaderloadPositionFile方法中会首先读取taildir_position.json文件内容,将其inode、pos、file更新到tailFiles这个Map之中,如果taildir_position.json文件没有内容,自然使用的是内存中的tairdir pos了:

TailFile tf = tailFiles.get(inode);// 此处 path、inode、pos是taildir_position.json文件内容if (tf != null && tf.updatePos(path, inode, pos)) {
// 更新到tailFiles中 tailFiles.put(inode, tf); } else {
logger.info("Missing file: " + path + ", inode: " + inode + ", pos: " + pos);}

故其实是内存中一直存在老文件的消费pos,故只要操作中间进程不挂,依旧没问题。那进程挂了再重启还能续传么?答案是不能,因为重启后只能从磁盘文件taildir_position.json中读取pos!


我们的采集需求恰好是文件名不变(按时间滚动),所以无需改动源码扩展需求!直接就能用了。


转载地址:http://wglfb.baihongyu.com/

你可能感兴趣的文章
Intellij IDEA使用(十三)—— 在Intellij IDEA中配置Maven
查看>>
面试题 —— 关于main方法的十个面试题
查看>>
集成测试(一)—— 使用PHP页面请求Spring项目的Java接口数据
查看>>
使用Maven构建的简单的单模块SSM项目
查看>>
Intellij IDEA使用(十四)—— 在IDEA中创建包(package)的问题
查看>>
FastDFS集群架构配置搭建(转载)
查看>>
HTM+CSS实现立方体图片旋转展示效果
查看>>
FFmpeg 命令操作音视频
查看>>
问题:Opencv(3.1.0/3.4)找不到 /opencv2/gpu/gpu.hpp 问题
查看>>
目的:使用CUDA环境变量CUDA_VISIBLE_DEVICES来限定CUDA程序所能使用的GPU设备
查看>>
问题:Mysql中字段类型为text的值, java使用selectByExample查询为null
查看>>
程序员--学习之路--技巧
查看>>
解决问题之 MySQL慢查询日志设置
查看>>
contOS6 部署 lnmp、FTP、composer、ThinkPHP5、docker详细步骤
查看>>
TP5.1模板布局中遇到的坑,配置完不生效解决办法
查看>>
PHPstudy中遇到的坑No input file specified,以及传到linux环境下遇到的坑,模板文件不存在
查看>>
TP5.1事务操作和TP5事务回滚操作多表
查看>>
composer install或composer update 或 composer require phpoffice/phpexcel 失败解决办法
查看>>
TP5.1项目从windows的Apache服务迁移到linux的Nginx服务需要注意几点。
查看>>
win10安装软件 打开时报错 找不到 msvcp120.dll
查看>>