引言
今天针对线上生产环境下单机 flume 拉取kafka数据并存储数据入Hdfs 出现大批量数据延迟. 在网上官网各种搜索数据,并结合官网数据,现进行以下总结
1. 线上单机存在问题简述
当前flume拉取kafa数据量并不大 ,根据flume客户端日志 ,每半分钟hdfs文件写入一次数据生成文件 发现问题: 拉取kafka数据过慢
2. 解决思路
- 加大kafka拉取数据量
- 加大flume中channel,source,sink 各通道的单条数据量
- 将flume拉取数据单机版本改成多数据拉取,通过flume-avore-sink-> flume-avore-source 进行数据多数据采取并合并
3 加大kafka拉取数据量
3.1 kafka-source简述
- flume 输入单线程拉取数据并将数据发送内置channel并通过sink组件进行数据转发和处理,故对于kafka集群多副本方式拉取数据的时候,应适当考虑多个flume节点拉取kafka多副本数据,以避免flume节点在多个kafka集群副本中轮询。加大flume拉取kafka数据的速率。
- flume-kafka-source 是flume内置的kafka source数据组件,是为了拉取kafka数据,配置如下:
agent.sources = r1
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest
- flume-kafka-source 的offset是交由zk集群去维护offset
3.2 kafka-source配置详解
Kafka Source是一个Apache Kafka消费者,它从Kafka主题中读取消息。 如果您正在运行多个Kafka源,则可以使用相同的使用者组配置它们,以便每个源都读取一组唯一的主题分区。
Property Name |
Default |
Description |
channels |
– |
配置的channels 可配置多个channels 后续文章会说到 |
type |
– |
org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers |
– |
配置kafka集群地址 |
kafka.consumer.group.id |
flume |
唯一确定的消费者群体。 在多个源或代理中设置相同的ID表示它们是同一个使用者组的一部分 |
kafka.topics |
– |
你需要消费的topic |
kafka.topics.regex |
– |
正则表达式,用于定义源订阅的主题集。 此属性的优先级高于kafka.topics ,如果存在则覆盖kafka.topics 。 |
batchSize |
1000 |
一批中写入Channel的最大消息数 (优化项) |
batchDurationMillis |
1000 |
将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。(优化项) |
backoffSleepIncrement |
1000 |
Kafka主题显示为空时触发的初始和增量等待时间。 等待时间将减少对空kafka 主题的激进ping操作。 一秒钟是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。 |
maxBackoffSleep |
5000 |
Kafka主题显示为空时触发的最长等待时间。 5秒是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。 |
useFlumeEventFormat |
false |
默认情况下,事件从Kafka主题直接作为字节直接进入事件主体。 设置为true以将事件读取为Flume Avro二进制格式。 与KafkaSink上的相同属性或Kafka Channel上的parseAsFlumeEvent属性一起使用时,这将保留在生成端发送的任何Flume标头。 |
setTopicHeader |
true |
设置为true时,将检索到的消息的主题存储到标题中,该标题由topicHeader 属性定义。 |
topicHeader |
topic |
如果setTopicHeader 属性设置为true ,则定义用于存储接收消息主题名称的标题的名称。 如果与Kafka SinktopicHeader 属性结合使用,应该小心,以避免在循环中将消息发送回同一主题。 |
migrateZookeeperOffsets |
true |
如果找不到Kafka存储的偏移量,请在Zookeeper中查找偏移量并将它们提交给Kafka。 这应该是支持从旧版本的Flume无缝Kafka客户端迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果未找到Zookeeper偏移量,则Kafka配置kafka.consumer.auto.offset.reset定义如何处理偏移量。 查看[Kafka文档](http://kafka.apache.org/documentation.html#newconsumerconfigs)了解详细信息 |
kafka.consumer.security.protocol |
PLAINTEXT |
如果使用某种级别的安全性写入Kafka,则设置为SASL_PLAINTEXT,SASL_SSL或SSL。 |
Other Kafka Consumer Properties |
– |
这些属性用于配置Kafka Consumer。 可以使用Kafka支持的任何消费者财产。 唯一的要求是在前缀为“kafka.consumer”的前缀中添加属性名称。 例如:kafka.consumer.auto.offset.reset |
注意: Kafka Source会覆盖两个Kafka使用者参数:source.com将auto.commit.enable设置为“false”,并提交每个批处理。 Kafka源至少保证一次消息检索策略。 源启动时可以存在重复项。 Kafka Source还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值。 不建议修改这些参数。 官方配置示例:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
Example for topic subscription by regex
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
本案例kafka-source配置
agent.sources = r1
agent.sources.r1.channels=c1
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest
官网配置文件地址 : kafka-source
3.3 配置优化
主要是在放入flume-channels 的批量数据加大 更改参数: agent.sources.r1.batchSize = 50000 agent.sources.r1.batchDurationMillis = 2000 更改解释: 即每2秒钟拉取 kafka 一批数据 批数据大小为50000 放入到flume-channels 中 。即flume该节点 flume-channels 输入端数据已放大
更改依据:
- 需要配置kafka单条数据 broker.conf 中配置
message.max.bytes
- 当前flume channel sink 组件最大消费能力如何?
4. 加大flume中channel,source,sink 各通道的单条数据量
4.1 source 发送至channels 数据量大小已配置 见 3.3
4.2 channel 配置
Property Name |
Default |
Description |
type |
– |
The component type name, needs to be memory |
capacity |
100 |
通道中存储的最大事件数 (优化项) |
transactionCapacity |
100 |
每个事务通道从源或提供给接收器的最大事件数 (优化项) |
keep-alive |
3 |
添加或删除事件的超时(以秒为单位) |
byteCapacityBufferPercentage |
20 |
定义byteCapacity与通道中所有事件的估计总大小之间的缓冲区百分比,以计算标头中的数据。 见下文。 |
byteCapacity |
see description |
允许的最大总字节作为此通道中所有事件的总和。 实现只计算Eventbody ,这也是提供byteCapacityBufferPercentage 配置参数的原因。 默认为计算值,等于JVM可用的最大内存的80%(即命令行传递的-Xmx值的80%)。 请注意,如果在单个JVM上有多个内存通道,并且它们碰巧保持相同的物理事件(即,如果您使用来自单个源的复制通道选择器),那么这些事件大小可能会因为通道byteCapacity目的而被重复计算。 将此值设置为“0”将导致此值回退到大约200 GB的内部硬限制。 |
配置 capacity 和 transactionCapacity 值 。默认配置规则为: $$ channels.capacity >= channels.transactionCapacity >= source.batchSize $$ 官方channels配置示例
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
本案例修改之后的channels 配置
agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000
5. 将flume拉取数据单机版本改成多数据拉取,通过flume-avore-sink-> flume-avore-source 进行数据多数据采取并合并
5.1 存在问题
通过上续修改会发现单机版本的flume会在多副本kafka轮询造成效率浪费 单机版本flume处理数据时会存在单机瓶颈,单机channels可能最多只能处理最大数据无法扩充 单机flume配置多个数据源不方便,不能适合后续多需求开发
5.2 修改架构
5.3采集节点配置文件
收集节点配置(3台):
agent.sources = r1
agent.channels = c1
agent.sinks = k1
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = qcloud-test-hadoop03:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest
agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000
agent.sinks.k1.type = avro
agent.sinks.k1.hostname = test-hadoop03
agent.sinks.k1.port=4545
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
汇总节点配置(1台):
agent.sources = r1
agent.channels = memoryChannel
agent.sinks = hdfsSink
agent.sources.r1.type = avro
agent.sources.r1.bind = ip
agent.sources.r1.port = 4545
agent.sources.r1.batchSize = 100000
agent.sources.r1.batchDurationMillis = 1000
agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.keep-alive=30
agent.channels.memoryChannel.capacity=120000
agent.channels.memoryChannel.transactionCapacity=100000
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.hdfs.path=hdfs://nameser/data/hm2/%Y-%m-%d-%H
agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.rollCount = 0
agent.sinks.hdfsSink.hdfs.rollSize = 134217728
agent.sinks.hdfsSink.hdfs.rollInterval = 60
agent.sinks.hdfsSink.hdfs.fileType=DataStream
agent.sinks.hdfsSink.hdfs.idleTimeout=65
agent.sinks.hdfsSink.hdfs.callTimeout=65000
agent.sinks.hdfsSink.hdfs.threadsPoolSize=300
agent.sinks.hdfsSink.channel = memoryChannel
agent.sources.r1.channels = memoryChannel
5.4 架构注意点
- 当前架构需要保证聚合节点机器的性能
- 当前架构新的瓶颈可能会存在存储Hdfs数据时过慢 ,导致聚合节点Channels 占用率居高不下,导致堵塞 。
- 需要关注avro 自定义source sink 的发送效率
6.flume 监控工具(http)
flume 监控工具总共有三种方式 ,我们这里为方便简单,使用内置http接口监控方式进行操作
6.1 配置
在启动脚本处设置 参数 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 即可
6.2 访问 地址 :
http://flumeIp:34545
6.3 返回结果示例 和字段解释 :
{
"CHANNEL.memoryChannel": {
"ChannelCapacity": "550000",
"ChannelFillPercentage": "0.18181818181818182",
"Type": "CHANNEL",
"ChannelSize": "1000",
"EventTakeSuccessCount": "33541400",
"EventTakeAttemptCount": "33541527",
"StartTime": "1536572886273",
"EventPutAttemptCount": "33542500",
"EventPutSuccessCount": "33542500",
"StopTime": "0"
},
"SINK.hdfsSink": {
"ConnectionCreatedCount": "649",
"ConnectionClosedCount": "648",
"Type": "SINK",
"BatchCompleteCount": "335414",
"BatchEmptyCount": "27",
"EventDrainAttemptCount": "33541500",
"StartTime": "1536572886275",
"EventDrainSuccessCount": "33541400",
"BatchUnderflowCount": "0",
"StopTime": "0",
"ConnectionFailedCount": "0"
},
"SOURCE.avroSource": {
"EventReceivedCount": "33542500",
"AppendBatchAcceptedCount": "335425",
"Type": "SOURCE",
"EventAcceptedCount": "33542500",
"AppendReceivedCount": "0",
"StartTime": "1536572886465",
"AppendAcceptedCount": "0",
"OpenConnectionCount": "3",
"AppendBatchReceivedCount": "335425",
"StopTime": "0"
}
}
参数定义:
字段名称 |
含义 |
备注 |
SOURCE.OpenConnectionCount |
打开的连接数 |
|
SOURCE.TYPE |
组件类型 |
|
SOURCE.AppendBatchAcceptedCount |
追加到channel中的批数量 |
|
SOURCE.AppendBatchReceivedCount |
source端刚刚追加的批数量 |
|
SOURCE.EventAcceptedCount |
成功放入channel的event数量 |
|
SOURCE.AppendReceivedCount |
source追加目前收到的数量 |
|
SOURCE.StartTime(StopTIme) |
组件开始时间、结束时间 |
|
SOURCE.EventReceivedCount |
source端成功收到的event数量 |
|
SOURCE.AppendAcceptedCount |
source追加目前放入channel的数量 |
|
|
|
|
CHANNEL.EventPutSuccessCount |
成功放入channel的event数量 |
|
CHANNEL.ChannelFillPercentage |
通道使用比例 |
|
CHANNEL.EventPutAttemptCount |
尝试放入将event放入channel的次数 |
|
CHANNEL.ChannelSize |
目前在channel中的event数量 |
|
CHANNEL.EventTakeSuccessCount |
从channel中成功取走的event数量 |
|
CHANNEL.ChannelCapacity |
通道容量 |
|
CHANNEL.EventTakeAttemptCount |
尝试从channel中取走event的次数 |
|
|
|
|
SINK.BatchCompleteCount |
完成的批数量 |
|
SINK.ConnectionFailedCount |
连接失败数 |
|
SINK.EventDrainAttemptCount |
尝试提交的event数量 |
|
SINK.ConnectionCreatedCount |
创建连接数 |
|
SINK.Type |
组件类型 |
|
SINK.BatchEmptyCount |
批量取空的数量 |
|
SINK.ConnectionClosedCount |
关闭连接数量 |
|
SINK.EventDrainSuccessCount |
成功发送event的数量 |
|
SINK.BatchUnderflowCount |
正处于批量处理的batch数 |
|
参考地址 flume-document : http://flume.apache.org/FlumeUserGuide.html |