花开不停
2025-12-11

Flume Task Configuration (original)

# 1. CONF

$ cd /data/apache-flume-1.11.0-bin/conf
$ mkdir flume_test
$ cd flume_test
$ touch config.properties log4j2.xml
## Choose the config.properties variant that fits your needs (see section 2)
$ vim config.properties
$ vim log4j2.xml

# 2. config.properties

## 2.1. config.properties (KafkaSource --> HDFS)

agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = hdfs_sink

# Source configuration
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
agent.sources.kafka_source.kafka.bootstrap.servers = kafka_server1_ip:9092,kafka_server2_ip:9092,kafka_server3_ip:9092
agent.sources.kafka_source.kafka.topics = KAFKA_TOPIC_TEST

# Sink configuration
agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.channel = mem_channel
agent.sinks.hdfs_sink.hdfs.path = /consumer/flume_test/%Y/%m/%d/%H/%M
agent.sinks.hdfs_sink.hdfs.filePrefix = flume_test
agent.sinks.hdfs_sink.hdfs.fileSuffix = .gz
agent.sinks.hdfs_sink.hdfs.rollSize = 1073741824
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.fileType = CompressedStream
agent.sinks.hdfs_sink.hdfs.codeC = gzip
agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
agent.sinks.hdfs_sink.hdfs.writeFormat = Text
agent.sinks.hdfs_sink.hdfs.round = true
agent.sinks.hdfs_sink.hdfs.roundValue = 10
agent.sinks.hdfs_sink.hdfs.roundUnit = minute

# Channel configuration (choose the channel type based on your data requirements)
#agent.channels.mem_channel.type = file
#agent.channels.mem_channel.checkpointDir = /data/apache-flume-1.11.0-bin/conf/flume_test/checkpoint
#agent.channels.mem_channel.dataDirs = /data/apache-flume-1.11.0-bin/conf/flume_test/data
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 500000
agent.channels.mem_channel.keep-alive = 60
agent.channels.mem_channel.transactionCapacity = 10000
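
With round = true, roundValue = 10 and roundUnit = minute, the HDFS sink rounds the timestamp down to a 10-minute boundary before filling in %Y/%m/%d/%H/%M, so each hour directory holds at most six minute buckets (00, 10, ..., 50). The sketch below approximates that path resolution for a given timestamp. flume_bucket_path is a hypothetical helper, not part of Flume, and it assumes GNU date for the `-d @EPOCH` syntax.

```shell
#!/usr/bin/env bash
# Sketch only: approximates how hdfs.path escapes resolve when
# hdfs.round = true and hdfs.roundUnit = minute.
# flume_bucket_path is a hypothetical helper, not a Flume command.
# Assumes GNU date (needed for "-d @EPOCH").
flume_bucket_path() {
  local epoch="$1"          # timestamp in seconds since the epoch
  local round_minutes="$2"  # corresponds to hdfs.roundValue
  local prefix minute
  prefix=$(date -d "@${epoch}" '+/consumer/flume_test/%Y/%m/%d/%H')
  minute=$(date -d "@${epoch}" '+%M')
  minute=${minute#0}        # drop one leading zero so "08" is not read as octal
  # Round the minute down to the nearest multiple of round_minutes.
  printf '%s/%02d\n' "$prefix" $(( minute / round_minutes * round_minutes ))
}

# 2025-12-11 08:47:30 UTC lands in the 08:40 bucket.
(export TZ=UTC; flume_bucket_path 1765442850 10)
# prints /consumer/flume_test/2025/12/11/08/40
```

Because hdfs.useLocalTimeStamp = true, the timestamp that gets rounded is the agent's local time at write, not a time carried in the event.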

## 2.2. config.properties (KafkaSource --> KafkaSink)

agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = kafka_sink

# Source configuration
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
agent.sources.kafka_source.kafka.bootstrap.servers = kafka_server1_ip:9092,kafka_server2_ip:9092,kafka_server3_ip:9092
agent.sources.kafka_source.kafka.topics = KAFKA_TOPIC_TEST
# Consumer group ID (defaults to flume if not set)
agent.sources.kafka_source.kafka.consumer.group.id = flume_test
agent.sources.kafka_source.kafka.consumer.auto.offset.reset = earliest
# Source interceptor configuration (FlumeTestInterceptor is a custom class; its JAR must be on the agent's classpath)
agent.sources.kafka_source.interceptors = MyInterceptors
agent.sources.kafka_source.interceptors.MyInterceptors.type = io.flume.FlumeTestInterceptor$Builder


# Sink configuration
agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka_sink.channel = mem_channel
agent.sinks.kafka_sink.kafka.topic = adx_request_log
agent.sinks.kafka_sink.kafka.bootstrap.servers = kafka_server1_ip:9092,kafka_server2_ip:9092,kafka_server3_ip:9092
agent.sinks.kafka_sink.kafka.flumeBatchSize = 5000
agent.sinks.kafka_sink.kafka.producer.acks = 0

# Channel configuration
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 500000
agent.channels.mem_channel.keep-alive = 60
agent.channels.mem_channel.transactionCapacity = 10000
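
With a memory channel, one source batch has to fit into a single channel transaction, and a transaction has to fit into the channel, so batchSize <= transactionCapacity <= capacity is a useful sanity check before starting the agent. The following is a minimal sketch of that check. get_prop and check_channel_sizes are hypothetical helpers, and the property keys are the ones used in this config.

```shell
#!/usr/bin/env bash
# Sketch only: get_prop and check_channel_sizes are hypothetical helpers.

# Print the value of an exact property key, skipping comment lines.
get_prop() {
  local file="$1" key="$2"
  awk -v k="$key" '
    /^[ \t]*#/ { next }
    {
      pos = index($0, "=")
      if (pos == 0) next
      name = substr($0, 1, pos - 1)
      value = substr($0, pos + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", name)
      gsub(/^[ \t]+|[ \t]+$/, "", value)
      if (name == k) { print value; exit }
    }' "$file"
}

# Check batchSize <= transactionCapacity <= capacity for this agent layout.
check_channel_sizes() {
  local file="$1"
  local batch tx cap
  batch=$(get_prop "$file" agent.sources.kafka_source.batchSize)
  tx=$(get_prop "$file" agent.channels.mem_channel.transactionCapacity)
  cap=$(get_prop "$file" agent.channels.mem_channel.capacity)
  if [ "$batch" -le "$tx" ] && [ "$tx" -le "$cap" ]; then
    echo "OK: batchSize=$batch <= transactionCapacity=$tx <= capacity=$cap"
  else
    echo "MISMATCH: batchSize=$batch transactionCapacity=$tx capacity=$cap"
    return 1
  fi
}

# Usage (path as used in this article):
#   check_channel_sizes /data/apache-flume-1.11.0-bin/conf/flume_test/config.properties
```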


## 2.3. log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="ERROR">
  <Properties>
    <Property name="LOG_DIR">.</Property>
  </Properties>
  <Appenders>
    <Console name="Console" target="SYSTEM_ERR">
      <PatternLayout pattern="%d (%t) [%p - %l] %m%n" />
    </Console>
    <RollingFile name="LogFile" fileName="${LOG_DIR}/flume_test.log" filePattern="${LOG_DIR}/archive/flume_test.log.%d{yyyyMMdd}-%i.gz">
      <PatternLayout pattern="%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %equals{%x}{[]}{} - %m%n" />
      <Policies>
        <!-- Roll every night at midnight or when the file reaches 100MB -->
        <SizeBasedTriggeringPolicy size="100 MB"/>
        <CronTriggeringPolicy schedule="0 0 0 * * ?"/>
      </Policies>
      <DefaultRolloverStrategy min="1" max="20">
        <Delete basePath="${LOG_DIR}/archive">
          <!-- Nested conditions: the inner condition is only evaluated on files for which the outer conditions are true. -->
          <IfFileName glob="flume_test.log.*">
            <!-- Only allow 1 GB of files to accumulate -->
            <IfAccumulatedFileSize exceeds="1 GB"/>
          </IfFileName>
        </Delete>
      </DefaultRolloverStrategy>
    </RollingFile>
  </Appenders>

  <Loggers>
    <Logger name="org.apache.flume.lifecycle" level="info"/>
    <Logger name="org.jboss" level="WARN"/>
    <Logger name="org.apache.avro.ipc.netty.NettyTransceiver" level="WARN"/>
    <Logger name="org.apache.hadoop" level="INFO"/>
    <Logger name="org.apache.hadoop.hive" level="ERROR"/>
    <Root level="INFO">
      <AppenderRef ref="LogFile" />
    </Root>
  </Loggers>
</Configuration>
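
The Delete action only removes archived files whose names match the IfFileName glob, so it is worth checking the glob against a name that filePattern would actually produce. With the filePattern above, archives look like flume_test.log.20251211-1.gz. A rough check in shell, where matches_glob is a hypothetical helper and shell pattern matching stands in for Log4j's glob (the two agree for simple `*` patterns on bare file names):

```shell
#!/usr/bin/env bash
# Sketch only: matches_glob is a hypothetical helper. Shell patterns are used
# as an approximation of Log4j's IfFileName glob for plain file names.
matches_glob() {
  local name="$1" glob="$2"
  # $glob is left unquoted on purpose so it is treated as a pattern.
  case "$name" in
    $glob) return 0 ;;
    *)     return 1 ;;
  esac
}

archive='flume_test.log.20251211-1.gz'   # example name derived from filePattern

for glob in 'flume_test.log.*' 'flume.log.*'; do
  if matches_glob "$archive" "$glob"; then
    echo "$glob matches $archive"
  else
    echo "$glob does NOT match $archive"
  fi
done
```

A glob that does not match leaves the archive directory growing without limit, because the 1 GB accumulated-size condition is never evaluated.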

# 3. START

$ cd /data/apache-flume-1.11.0-bin/script
$ mkdir flume_test
$ touch start_flume_test.sh
$ chmod +x start_flume_test.sh
$ vim start_flume_test.sh
## Contents of start_flume_test.sh (the relative paths assume it is run from the script directory)
nohup ../bin/flume-ng agent --conf ../conf/ -f ../conf/flume_test/config.properties -n agent -Dlog4j.configurationFile=../conf/flume_test/log4j2.xml > flume_test.log 2>&1 &

## Start ##
$ /data/apache-flume-1.11.0-bin/script/start_flume_test.sh
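
The start command uses paths relative to the script directory, so it only resolves correctly when launched from there. One way around this, sketched below, is to build the same flume-ng command from absolute paths. flume_agent_cmd is a hypothetical helper, and the directory layout (conf/<job>/config.properties and conf/<job>/log4j2.xml) is the one assumed throughout this article.

```shell
#!/usr/bin/env bash
# Sketch only: flume_agent_cmd is a hypothetical helper that prints the
# flume-ng command line for a job, using absolute paths.
flume_agent_cmd() {
  local flume_home="$1"  # e.g. /data/apache-flume-1.11.0-bin
  local job="$2"         # e.g. flume_test
  printf '%s agent --conf %s -f %s -n agent -Dlog4j.configurationFile=%s\n' \
    "${flume_home}/bin/flume-ng" \
    "${flume_home}/conf" \
    "${flume_home}/conf/${job}/config.properties" \
    "${flume_home}/conf/${job}/log4j2.xml"
}

# Print the command for the job configured above.
flume_agent_cmd /data/apache-flume-1.11.0-bin flume_test
```

Since none of these paths contain spaces, the printed command can be run in the background from any directory with `nohup $(flume_agent_cmd /data/apache-flume-1.11.0-bin flume_test) > flume_test.log 2>&1 &`.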