Flume Task Configuration
# 1. CONF
$ cd /data/apache-flume-1.11.0-bin/conf
$ mkdir flume_test
$ cd flume_test
$ touch config.properties log4j2.xml
## Pick the config.properties variant that fits your needs (see section 2)
$ vim config.properties
$ vim log4j2.xml
# 2. CONFIG FILES
# 2.1. config.properties (KafkaSource --> HDFS)
agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = hdfs_sink
# Source configuration
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
agent.sources.kafka_source.kafka.bootstrap.servers = kafka_server1_ip:9092,kafka_server2_ip:9092,kafka_server3_ip:9092
agent.sources.kafka_source.kafka.topics = KAFKA_TOPIC_TEST
# Sink configuration
agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.channel = mem_channel
agent.sinks.hdfs_sink.hdfs.path = /consumer/flume_test/%Y/%m/%d/%H/%M
agent.sinks.hdfs_sink.hdfs.filePrefix = flume_test
agent.sinks.hdfs_sink.hdfs.fileSuffix = .gz
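# Roll a new file at 1 GiB (rollSize, bytes) or every hour (rollInterval, seconds); rollCount = 0 disables count-based rolling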
agent.sinks.hdfs_sink.hdfs.rollSize = 1073741824
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.fileType = CompressedStream
agent.sinks.hdfs_sink.hdfs.codeC = gzip
agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
agent.sinks.hdfs_sink.hdfs.writeFormat = Text
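# Round the timestamp substituted into hdfs.path down to 10-minute buckets, so each %M directory covers 10 minutes of data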
agent.sinks.hdfs_sink.hdfs.round = true
agent.sinks.hdfs_sink.hdfs.roundValue = 10
agent.sinks.hdfs_sink.hdfs.roundUnit = minute
# Channel configuration (pick the channel type for your durability needs: the file channel survives agent restarts, the memory channel is faster but can lose buffered events on a crash)
#agent.channels.mem_channel.type = file
#agent.channels.mem_channel.checkpointDir = /data/apache-flume-1.11.0-bin/conf/flume_test/checkpoint
#agent.channels.mem_channel.dataDirs = /data/apache-flume-1.11.0-bin/conf/flume_test/data
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 500000
agent.channels.mem_channel.keep-alive = 60
agent.channels.mem_channel.transactionCapacity = 10000
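With the agent running (see section 3), you can spot-check that rolled files are landing in HDFS. A minimal check, assuming the HDFS client is on your PATH; the timestamp path below is only an example:
$ hdfs dfs -ls /consumer/flume_test/2024/01/01/12/00
## -text decompresses the gzip files for a quick look
$ hdfs dfs -text /consumer/flume_test/2024/01/01/12/00/flume_test.*.gz | head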
# 2.2. config.properties (KafkaSource --> KafkaSink)
agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = kafka_sink
# Source configuration
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
agent.sources.kafka_source.kafka.bootstrap.servers = kafka_server1_ip:9092,kafka_server2_ip:9092,kafka_server3_ip:9092
agent.sources.kafka_source.kafka.topics = KAFKA_TOPIC_TEST
# Consumer group id (defaults to flume if unset)
agent.sources.kafka_source.kafka.consumer.group.id = flume_test
agent.sources.kafka_source.kafka.consumer.auto.offset.reset = earliest
# Source interceptor configuration (FlumeTestInterceptor is a custom class; package it as a jar and put it on the Flume classpath, e.g. under lib/ or plugins.d/)
agent.sources.kafka_source.interceptors = MyInterceptors
agent.sources.kafka_source.interceptors.MyInterceptors.type = io.flume.FlumeTestInterceptor$Builder
# Sink configuration
agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka_sink.channel = mem_channel
agent.sinks.kafka_sink.kafka.topic = adx_request_log
agent.sinks.kafka_sink.kafka.bootstrap.servers = kafka_server1_ip:9092,kafka_server2_ip:9092,kafka_server3_ip:9092
agent.sinks.kafka_sink.kafka.flumeBatchSize = 5000
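# acks = 0: the producer does not wait for broker acknowledgement (highest throughput, but events can be lost)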
agent.sinks.kafka_sink.kafka.producer.acks = 0
# Channel configuration
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 500000
agent.channels.mem_channel.keep-alive = 60
agent.channels.mem_channel.transactionCapacity = 10000
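To confirm events are flowing through to the sink topic, the stock Kafka console consumer is enough. A minimal sketch, using the same placeholder broker addresses as above:
$ kafka-console-consumer.sh --bootstrap-server kafka_server1_ip:9092 --topic adx_request_log --from-beginning --max-messages 10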
# 2.3. log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="ERROR">
  <Properties>
    <Property name="LOG_DIR">.</Property>
  </Properties>
  <Appenders>
    <Console name="Console" target="SYSTEM_ERR">
      <PatternLayout pattern="%d (%t) [%p - %l] %m%n" />
    </Console>
    <RollingFile name="LogFile" fileName="${LOG_DIR}/flume_test.log" filePattern="${LOG_DIR}/archive/flume_test.log.%d{yyyyMMdd}-%i.gz">
      <PatternLayout pattern="%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %equals{%x}{[]}{} - %m%n" />
      <Policies>
        <!-- Roll every night at midnight or when the file reaches 100 MB -->
        <SizeBasedTriggeringPolicy size="100 MB"/>
        <CronTriggeringPolicy schedule="0 0 0 * * ?"/>
      </Policies>
      <DefaultRolloverStrategy min="1" max="20">
        <Delete basePath="${LOG_DIR}/archive">
          <!-- Nested conditions: the inner condition is only evaluated on files for which the outer conditions are true. -->
          <!-- The glob must match the filePattern above (flume_test.log.*), otherwise the size cap is never applied -->
          <IfFileName glob="flume_test.log.*">
            <!-- Only allow 1 GB of files to accumulate -->
            <IfAccumulatedFileSize exceeds="1 GB"/>
          </IfFileName>
        </Delete>
      </DefaultRolloverStrategy>
    </RollingFile>
  </Appenders>
  <Loggers>
    <Logger name="org.apache.flume.lifecycle" level="INFO"/>
    <Logger name="org.jboss" level="WARN"/>
    <Logger name="org.apache.avro.ipc.netty.NettyTransceiver" level="WARN"/>
    <Logger name="org.apache.hadoop" level="INFO"/>
    <Logger name="org.apache.hadoop.hive" level="ERROR"/>
    <Root level="INFO">
      <AppenderRef ref="LogFile" />
    </Root>
  </Loggers>
</Configuration>
# 3. START
$ mkdir -p /data/apache-flume-1.11.0-bin/script
$ cd /data/apache-flume-1.11.0-bin/script
$ mkdir flume_test
$ touch start_flume_test.sh
$ chmod +x start_flume_test.sh
$ vim start_flume_test.sh
#!/bin/bash
cd "$(dirname "$0")" && nohup ../bin/flume-ng agent --conf ../conf/ -f ../conf/flume_test/config.properties -n agent -Dlog4j.configurationFile=../conf/flume_test/log4j2.xml > flume_test.log 2>&1 &
## Start ##
$ /data/apache-flume-1.11.0-bin/script/start_flume_test.sh
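Two quick checks confirm the agent came up; the log path assumes the script above, which cds to its own directory and writes flume_test.log next to itself:
$ ps -ef | grep flume-ng | grep -v grep
$ tail -f /data/apache-flume-1.11.0-bin/script/flume_test.log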