Flume 进阶使用【事务机制与多Agent级联实战】

Flume 进阶使用【事务机制与多Agent级联实战】 一、前言在上一篇Flume入门文章中我们已经掌握了Flume的基础架构和Taildir Source HDFS Sink的实战配置。但在实际的大数据生产环境中单Agent往往无法满足复杂的数据采集需求。本文将深入Flume的事务机制、Agent内部处理流程以及企业级拓扑架构并通过一个多Agent级联实战案例带你从入门走向进阶。二、Flume事务机制深度解析Flume的数据可靠性核心在于其事务机制。Source向Channel发送数据、Sink从Channel拉取数据时都会开启独立的事务确保数据不丢失、不重复。2.1 事务整体流程2.2 Put事务Source → ChannelSource在往Channel发送数据之前会开启一个Put事务确保数据可靠写入阶段操作说明doPut将批量数据写入临时缓冲区putList当Source中的数据达到batchSize或超过特定时间阈值时触发发送doCommit检查Channel内存队列空间是否足够空间充足则将putList中的数据合并到Channel队列doRollbackChannel内存队列空间不足时回滚数据将putList中的数据归还等待下次重试关键理解putList是Source与Channel之间的临时缓冲区事务提交前数据不会真正进入Channel。如果提交失败数据回滚到Source端保证不丢失。2.3 Take事务Channel → SinkSink在从Channel主动拉取数据时也会开启一个Take事务阶段操作说明doTake将数据读取到临时缓冲区takeList同时尝试将数据发送到HDFS等目标存储doCommit数据全部发送成功清除takeList中的临时数据事务完成doRollback数据发送过程中出现异常将takeList中的数据归还给Channel内存队列等待重试关键理解takeList是Channel与Sink之间的临时缓冲区。如果Sink发送数据到HDFS失败事务回滚数据重新回到Channel确保不丢失。2.4 事务机制总结三、Flume Agent内部原理3.1 完整处理流程一个Event从Source进入Agent到最终从Sink输出的完整流程如下3.2 各组件详解3.2.1 Channel ProcessorChannel处理器Channel Processor是Source与Channel之间的核心协调器负责接收Source产生的Event将Event传递给拦截器链进行处理将处理后的Event交给Channel Selector进行路由根据选择器结果将Event写入对应的Channel此时才真正开启Put事务⚠️重要只有Source和Channel之间可以存在拦截器Channel和Sink之间不可以3.2.2 Interceptor拦截器拦截器可以对Event进行加工处理常见用途给Event添加Header信息如时间戳、主机名过滤不符合条件的Event修改Event的内容拦截器可以配置多个形成拦截器链按顺序处理。3.2.3 Channel SelectorChannel选择器Channel Selector决定Event应该发送到哪些Channel有两种类型类型说明使用场景Replicating默认将Source发送来的Events复制到所有Channel数据需要多副本备份Multiplexing根据Event的Header属性选择性发送到指定Channel数据需要按条件分流3.2.4 SinkProcessorSink处理器SinkProcessor负责协调多个Sink拉取Channel中的数据有三种类型类型说明特点DefaultSinkProcessor默认处理器一个Channel只能绑定一个Sink无Sink组概念LoadBalancingSinkProcessor负载均衡处理器多个Sink轮询读取Channel数据均衡负载FailoverSinkProcessor故障转移处理器每个Sink有优先级高优先级Sink挂掉后自动切换⚠️重要约束一个Sink只能绑定一个Channel但一个Channel可以绑定多个Sink四、Flume企业级拓扑结构在生产环境中单Agent往往无法满足高可用、高吞吐的需求。Flume支持多种拓扑结构适应不同的业务场景。4.1 简单串联Multi-Agent Flow适用场景数据需要经过多层处理如过滤、转换后再存储。4.2 复制和多路复用Fan Out适用场景同一份数据需要同时写入HDFS归档和Kafka实时消费。4.3 负载均衡和故障转移Sink Group适用场景高可用数据采集避免单点故障。4.4 聚合Consolidation适用场景大规模分布式日志收集是生产环境最常用的拓扑结构。五、多Agent级联——复制与多路复用5.1 案例需求使用Flume实现以下数据流Flume-1监控Hive日志文件变动Flume-1将变动内容同时传递给Flume-2和Flume-3Flume-2负责将数据存储到HDFSFlume-3负责将数据输出到本地文件系统5.2 组件选型分析AgentSource类型Sink类型说明Flume-1exec监控文件avro数据发送者需要两个Avro Sink分别发送给Flume-2和Flume-3Flume-2avro数据接收服务hdfs接收数据后写入HDFSFlume-3avro数据接收服务file_roll接收数据后写入本地目录注意file_rollSink的目标目录必须提前创建它不会自动创建目录与HDFS Sink不同5.3 核心组件参数速查Avro Sink数据发送端参数必填说明channel✓绑定的Channeltype✓必须为avrohostname✓目标Avro Source的主机名或IPport✓目标Avro Source的端口batch-size批量发送的Event数量默认100connect-timeout首次握手超时时间默认20000msrequest-timeout请求超时时间默认20000msAvro Source数据接收端参数必填说明channels✓绑定的Channeltype✓必须为avrobind✓监听的主机名或IPport✓监听的端口threads最大工作线程数File Roll Sink本地文件滚动参数必填说明channel✓绑定的Channeltype✓必须为file_rollsink.directory✓文件存储目录sink.rollInterval文件滚动间隔默认30秒0为禁用sink.batchSize批量写入数量默认100Channel Selector默认Replicating如果不指定selector.type默认就是replicating会将Event复制到所有Channel。5.4 完整配置文件5.4.1 Flume-1配置flume-file-flume.conf# # Flume-1 配置监控文件 → 复制到两个Avro Sink # 功能将Hive日志同时发送给Flume-2和Flume-3 # # 1. 定义组件 a1.sources r1 a1.sinks k1 k2 a1.channels c1 c2 # 2. Channel Selector配置复制模式可不配置默认就是replicating a1.sources.r1.selector.type replicating # 3. Source配置Exec Source监控Hive日志文件 a1.sources.r1.type exec a1.sources.r1.command tail -F /opt/module/hive-3.1.2/logs/hive.log a1.sources.r1.shell /bin/bash -c # 4. Sink配置两个Avro Sink分别发送给Flume-2和Flume-3 # Sink k1发送给Flume-2HDFS a1.sinks.k1.type avro a1.sinks.k1.hostname hadoop102 a1.sinks.k1.port 4141 # Sink k2发送给Flume-3本地文件 a1.sinks.k2.type avro a1.sinks.k2.hostname hadoop102 a1.sinks.k2.port 4142 # 5. Channel配置两个Memory Channel a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.channels.c2.type memory a1.channels.c2.capacity 1000 a1.channels.c2.transactionCapacity 100 # 6. 绑定关系 # 一个Source可以绑定多个Channel a1.sources.r1.channels c1 c2 # 一个Sink只能绑定一个Channel a1.sinks.k1.channel c1 a1.sinks.k2.channel c25.4.2 Flume-2配置flume-hdfs.conf# # Flume-2 配置Avro Source → HDFS Sink # 功能接收Flume-1的数据写入HDFS # # 1. 定义组件 a2.sources r1 a2.sinks k1 a2.channels c1 # 2. Source配置Avro Source接收数据 a2.sources.r1.type avro a2.sources.r1.bind hadoop102 a2.sources.r1.port 4141 # 3. Sink配置HDFS Sink a2.sinks.k1.type hdfs a2.sinks.k1.hdfs.path hdfs://hadoop102:9820/flume2/%Y%m%d/%H # 上传文件前缀 a2.sinks.k1.hdfs.filePrefix flume2- # 启用时间滚动文件夹 a2.sinks.k1.hdfs.round true # 每小时创建新文件夹 a2.sinks.k1.hdfs.roundValue 1 a2.sinks.k1.hdfs.roundUnit hour # 使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp true # 积攒100个Event才flush到HDFS a2.sinks.k1.hdfs.batchSize 100 # 文件类型为DataStream普通文本 a2.sinks.k1.hdfs.fileType DataStream # 每30秒生成新文件 a2.sinks.k1.hdfs.rollInterval 30 # 文件大小达到约128MB时滚动 a2.sinks.k1.hdfs.rollSize 134217700 # 文件滚动与Event数量无关 a2.sinks.k1.hdfs.rollCount 0 # 4. Channel配置 a2.channels.c1.type memory a2.channels.c1.capacity 1000 a2.channels.c1.transactionCapacity 100 # 5. 绑定关系 a2.sources.r1.channels c1 a2.sinks.k1.channel c15.4.3 Flume-3配置flume-dir.conf# # Flume-3 配置Avro Source → File Roll Sink # 功能接收Flume-1的数据写入本地文件系统 # # 1. 定义组件 a3.sources r1 a3.sinks k1 a3.channels c2 # 2. Source配置Avro Source接收数据 a3.sources.r1.type avro a3.sources.r1.bind hadoop102 a3.sources.r1.port 4142 # 3. Sink配置File Roll Sink本地文件滚动 a3.sinks.k1.type file_roll # ⚠️ 目标目录必须提前创建 a3.sinks.k1.sink.directory /opt/module/data/flume3 # 4. Channel配置 a3.channels.c2.type memory a3.channels.c2.capacity 1000 a3.channels.c2.transactionCapacity 100 # 5. 绑定关系 a3.sources.r1.channels c2 a3.sinks.k1.channel c25.5 启动与测试5.5.1 准备工作# 1. 创建Flume-3的目标目录必须提前创建mkdir-p/opt/module/data/flume3# 2. 确保Hive日志文件存在ls/opt/module/hive-3.1.2/logs/hive.log5.5.2 启动顺序重要⚠️必须先启动下游AgentFlume-2和Flume-3再启动上游AgentFlume-1否则Flume-1的Avro Sink会连接失败。# 步骤1启动Flume-3本地文件接收端cd/opt/module/flume-1.9.0/ bin/flume-ng agent-cconf/-na3-fjob/group1/flume-dir.conf# 步骤2启动Flume-2HDFS接收端bin/flume-ng agent-cconf/-na2-fjob/group1/flume-hdfs.conf# 步骤3启动Flume-1数据发送端bin/flume-ng agent-na1-cconf/-fjob/group1/flume-file-flume.conf5.5.3 验证结果验证HDFSFlume-2hdfs dfs-ls/flume2/ hdfs dfs-cat/flume2/20240129/10/flume2-*.txt验证本地文件Flume-3ls/opt/module/data/flume3/cat/opt/module/data/flume3/*六、Multiplexing多路复用配置6.1 需求场景假设日志数据中有不同类型如INFO、ERROR希望INFO类型日志 → Channel1 → HDFSERROR类型日志 → Channel2 → 本地文件单独归档6.2 配置示例# # Multiplexing多路复用配置示例 # 功能根据Event Header中的logType路由到不同Channel # a1.sources r1 a1.sinks k1 k2 a1.channels c1 c2 # 1. Source配置 a1.sources.r1.type exec a1.sources.r1.command tail -F /opt/module/logs/app.log # 2. 配置拦截器给Event添加Header a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type static a1.sources.r1.interceptors.i1.key logType a1.sources.r1.interceptors.i1.value INFO # 3. Channel SelectorMultiplexing模式 a1.sources.r1.selector.type multiplexing # 根据Header中的logType字段进行路由 a1.sources.r1.selector.header logType # logTypeINFO → c1 a1.sources.r1.selector.mapping.INFO c1 # logTypeERROR → c2 a1.sources.r1.selector.mapping.ERROR c2 # 默认路由到c1 a1.sources.r1.selector.default c1 # 4. Sink配置 a1.sinks.k1.type hdfs a1.sinks.k1.hdfs.path hdfs://hadoop102:9820/logs/info a1.sinks.k1.channel c1 a1.sinks.k2.type file_roll a1.sinks.k2.sink.directory /opt/module/logs/error a1.sinks.k2.channel c2 # 5. Channel配置 a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.channels.c2.type memory a1.channels.c2.capacity 1000 a1.channels.c2.transactionCapacity 100关键点Multiplexing必须配合拦截器使用因为需要在Event Header中添加用于路由判断的信息总结本文从Flume的事务机制出发深入讲解了Agent内部的处理流程并介绍了四种企业级拓扑结构拓扑结构特点适用场景简单串联多Agent接力传输数据需要多层处理复制/多路复用一份数据多个目的地数据备份实时消费负载均衡/故障转移高可用Sink组避免单点故障聚合多源汇聚到单目的地大规模分布式日志收集通过多Agent级联实战案例我们掌握了✅ Avro Source/Sink的跨Agent数据传输✅ Replicating Channel Selector的数据复制✅ HDFS Sink与File Roll Sink的差异化配置✅ 多Agent启动顺序的关键注意事项