1. Test Environment
Operating system: CentOS 5.5
JDK version: 1.7.0_21
Flume version: 1.3.1
Hadoop version: 0.20.2
Configuration: 1 agent, 2 collectors, 1 storage node
2. Installation Steps: JDK + Flume
# Download and install JDK 1.7
tar zxvf jdk-7u21-linux-x64.gz -C /usr/local/
# Add the environment variables to /etc/profile
pathmunge /usr/local/jdk1.7.0_21/bin
export JAVA_HOME=/usr/local/jdk1.7.0_21/
export JRE_HOME=/usr/local/jdk1.7.0_21/jre
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
# Reload the profile and verify Java
source /etc/profile
java -version
# Download and install Flume 1.3.1 (from the Flume download page)
tar zxvf apache-flume-1.3.1-bin.tar.gz -C /usr/local/
# Add the environment variables to /etc/profile
export FLUME_HOME=/usr/local/apache-flume-1.3.1-bin
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=.:$PATH:$FLUME_HOME/bin
# Reload the profile and verify Flume
source /etc/profile
flume-ng version
Flume 1.3.1
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 77b5d2885fecb3560a873bd89f49cbac8a010347
Compiled by hshreedharan on Fri Dec 21 22:14:21 PST 2012
From source with checksum 2565bdfd8b6af459dbf85c6960f189a5
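To confirm that the variables added to /etc/profile actually took effect, a small check script can help. This is a minimal sketch: the helper names `path_has_dir` and `check_flume_env` are hypothetical (not part of the JDK or Flume), and it only assumes the layout used above, namely `$JAVA_HOME/bin/java` and `$FLUME_HOME/bin/flume-ng`.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for checking the JDK/Flume environment set up above.

# Succeeds if directory $2 is an entry of the colon-separated list $1
# (a trailing slash on either side is ignored).
path_has_dir() {
  local list=$1 want=${2%/} entry
  local IFS=:
  for entry in $list; do
    [ "${entry%/}" = "$want" ] && return 0
  done
  return 1
}

# Returns 0 if JAVA_HOME, FLUME_HOME and PATH look correct; reports problems on stderr.
check_flume_env() {
  local status=0
  if [ ! -x "${JAVA_HOME%/}/bin/java" ]; then
    echo "JAVA_HOME does not contain bin/java: ${JAVA_HOME:-<unset>}" >&2
    status=1
  fi
  if [ ! -x "${FLUME_HOME%/}/bin/flume-ng" ]; then
    echo "FLUME_HOME does not contain bin/flume-ng: ${FLUME_HOME:-<unset>}" >&2
    status=1
  fi
  if ! path_has_dir "$PATH" "${FLUME_HOME%/}/bin"; then
    echo "PATH does not include \$FLUME_HOME/bin" >&2
    status=1
  fi
  return $status
}
```

After `source /etc/profile`, load the functions into the same shell (for example from a file you name check_env.sh) and run `check_flume_env && echo OK`. The script only inspects variables and file permissions, so it is safe to run at any time.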
3. A Simple Example
Set up the configuration file:
[root@cc-staging-loginmgr2 conf]# cat example.conf
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# Command-line options
-c conf                            use conf as the configuration directory
-f conf/example.conf               use conf/example.conf as the configuration file
-n a1                              set the agent name to a1; it must match the name used in example.conf
-Dflume.root.logger=INFO,console   log at INFO level and print the log to the console

# Start the agent
cd /usr/local/apache-flume-1.3.1-bin
flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console
2013-05-24 00:00:09,288 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:150)] Source starting
2013-05-24 00:00:09,303 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

# Test from another terminal
[root@cc-staging-loginmgr2 conf]# telnet 127.0.0.1 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
hello world!
OK

# Output
2013-05-24 00:00:24,306 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D hello world!. }

# The test succeeded: Flume is working.
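The logger sink prints each event body as hex bytes followed by its printable text. In the output of the netcat test, the last byte `0D` is a carriage return: telnet ends each line with CR LF, the netcat source splits events on LF, so the CR stays in the body and appears as the trailing `.` in the text column. The hypothetical helper below turns such a hex dump back into raw bytes for inspection; it assumes the bytes are copied exactly as printed after `body:` (space-separated pairs).

```shell
#!/usr/bin/env bash
# Hypothetical helper: convert "68 65 6C ..." (as printed by the logger sink) to raw bytes.
hex_to_bytes() {
  local byte
  for byte in $1; do
    # Convert the hex pair to a 3-digit octal escape (\NNN), which POSIX printf
    # interprets in its format string as a single byte.
    printf "\\$(printf '%03o' "0x$byte")"
  done
}
```

For example, `hex_to_bytes "68 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D" | od -c` shows `hello world!` followed by `\r`, confirming where the extra byte comes from. Octal escapes are used instead of `\xHH` because they also work with shells whose `printf` does not understand hex escapes.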
4. Flume Source Tests
Test 1: Avro source
With the Avro source, a given file can be sent to Flume (using the avro-client tool); the Avro source receives events over Avro RPC.
# Avro configuration file
[root@cc-staging-loginmgr2 conf]# cat avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# Start flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f avro.conf -n a1 -Dflume.root.logger=INFO,console

# Create the file to send
echo "hello world" > /usr/logs/log.10

# Send the file with avro-client (-F takes the path of the file to send)
flume-ng avro-client -c . -H localhost -p 4141 -F

# Output
2013-05-27 01:11:45,852 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }

Test 2: Exec source
The Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard output.
# Modified configuration file (only the changed source section is shown)
[root@cc-staging-loginmgr2 conf]# cat exec.conf
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /usr/logs/log.10
a1.sources.r1.channels = c1

# Start flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f exec.conf -n a1 -Dflume.root.logger=INFO,console

# Append content to the file
echo "exec test" >> /usr/logs/log.10

# Console output in the terminal where the agent was started
2013-05-27 01:50:12,825 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
2013-05-27 01:50:12,826 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 exec test }

# To use tail instead, the file must receive enough lines before any output appears. This is most likely because the Exec source hands lines to the channel in batches (batchSize, 20 by default), and a command such as tail -F never exits to flush a partial batch.
a1.sources.r1.command = tail -F /usr/logs/log.10
# Write enough lines to the file; the events then appear on the console
2013-05-27 19:17:18,157 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:155)] Exec source starting with command:tail -n 5 -F /usr/logs/log.10
2013-05-27 19:19:50,334 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 37 exec test7 }

Test 3: Spooling directory source
This source lets you ingest data by dropping files into a spooling directory on disk. Unlike other asynchronous sources, it avoids data loss even if Flume is restarted or fails.
The spooling directory source watches the configured directory for new files and reads the data out of them. Two points to note:
1) A file must not be opened and edited after it has been copied into the spool directory.
2) The spool directory must not contain subdirectories.
# Modified configuration file
[root@cc-staging-loginmgr2 conf]# cat spool.conf
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/logs/flumeSpool
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1

# Start flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f spool.conf -n a1 -Dflume.root.logger=INFO,console

# Add a file to the spool directory
[root@cc-staging-loginmgr2 ~]# echo "spool test1" > /usr/logs/flumeSpool/spool1.log

# Console output in the terminal where the agent was started; once the file has been read, it is renamed with a .COMPLETED suffix
2013-05-27 22:49:06,098 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:229)] Preparing to move file /usr/logs/flumeSpool/spool1.log to /usr/logs/flumeSpool/spool1.log.COMPLETED
2013-05-27 22:49:06,101 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{file=/usr/logs/flumeSpool/spool1.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31 spool test1 }

Test 4: Netcat source
See Section 3, "A Simple Example".

Test 5: Syslog TCP source
# Modified configuration file
[root@cc-staging-loginmgr2 conf]# cat syslog.conf
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Start flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f syslog.conf -n a1 -Dflume.root.logger=INFO,console

# Test messages must start with a PRI prefix such as <37>: the source expects syslog wire-format data and otherwise reports the error "Failed to extract syslog wire entry".
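The `<37>` prefix is the syslog PRI field, which encodes facility * 8 + severity (RFC 3164; RFC 5424 uses the same formula). 37 = 4 * 8 + 5, i.e. facility 4 (security/authorization) and severity 5 (notice), which matches the `Facility=4` and `Severity=5` headers in the console output that follows. The helpers below are a hypothetical sketch of that arithmetic, not part of Flume.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for the syslog PRI field: PRI = facility * 8 + severity.

# usage: syslog_pri FACILITY SEVERITY   -> prints "<PRI>"
syslog_pri() {
  local facility=$1 severity=$2
  # RFC 3164 defines facilities 0-23 and severities 0-7.
  if [ "$facility" -lt 0 ] || [ "$facility" -gt 23 ] ||
     [ "$severity" -lt 0 ] || [ "$severity" -gt 7 ]; then
    echo "facility must be 0-23 and severity 0-7" >&2
    return 1
  fi
  echo "<$(( facility * 8 + severity ))>"
}

# usage: syslog_decode_pri PRI   -> prints the facility and severity it encodes
syslog_decode_pri() {
  echo "Facility=$(( $1 / 8 )) Severity=$(( $1 % 8 ))"
}
```

As a usage sketch, assuming `nc` is installed (without `-u` it connects over TCP, matching the syslogtcp source on port 5140): `echo "$(syslog_pri 4 5)hello via syslog" | nc localhost 5140`.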
# Console output in the terminal where the agent was started
2013-05-27 23:39:10,755 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 73 79 73 6C 6F 67 hello via syslog }

# For UDP, change the configuration file
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Generate a test syslog message
echo "<37>hello via syslog" | nc -u localhost 5140

Test 6: HTTP source with JSONHandler
# Modified configuration file
[root@cc-staging-loginmgr2 conf]# cat post.conf
# Describe/configure the source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Start flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f post.conf -n a1 -Dflume.root.logger=INFO,console

# Send a POST request whose body is in JSON format

# Console output in the terminal where the agent was started
2013-05-28 01:17:47,186 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{namenode=namenode.example.com, datanode=random_datanode.example.com} body: 72 65 61 6C 6C 79 5F 72 61 6E 64 6F 6D 5F 62 6F really_random_bo }
# The logger sink prints at most the first 16 bytes of the body, which is why the body text stops after 16 characters.
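The HTTP source's default JSONHandler expects the POST body to be a JSON array of events, each an object with a `headers` map (string keys and values) and a `body` string. The hypothetical helper below builds a one-event payload in that shape; the header names and body value are taken from the console output above. It performs no JSON escaping, so it assumes keys, values, and body contain no double quotes, backslashes, or control characters.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build '[{"headers":{...},"body":"..."}]' for the HTTP source.
# usage: flume_json_event BODY [KEY=VALUE ...]
# NOTE: no JSON escaping is done; inputs must not contain ", \ or control characters.
flume_json_event() {
  local body=$1 headers="" kv sep=""
  shift
  for kv in "$@"; do
    # Split each KEY=VALUE argument at the first "=".
    headers="$headers$sep\"${kv%%=*}\":\"${kv#*=}\""
    sep=","
  done
  printf '[{"headers":{%s},"body":"%s"}]\n' "$headers" "$body"
}
```

As a usage sketch, assuming `curl` is installed and the agent from post.conf is listening on localhost:5140:
`curl -X POST -H 'Content-Type: application/json' -d "$(flume_json_event really_random_body namenode=namenode.example.com datanode=random_datanode.example.com)" http://localhost:5140`
For anything beyond simple test values, a real JSON encoder is the safer choice than string concatenation.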