Flume + Solr + log4j: Building a Web Log Collection System [Repost]
2. Using an Exec Source
An Exec source uses the output of the command line below as the event source. Note that if the agent dies or the machine restarts, events produced in the meantime may be lost.

agent.sources.execSrc.type = exec
agent.sources.execSrc.shell = /bin/bash -c
agent.sources.execSrc.command = tail -F /var/log/flume/flume.log | grep "error: "
The Flume Appender supports three modes of operation.
1. It can act as a remote Flume client which sends Flume events via Avro to a Flume Agent configured with an Avro Source. (synchronous, Avro protocol)
2. It can act as an embedded Flume Agent where Flume events pass directly into Flume for processing. (asynchronous; requires maintaining a client-side Flume agent)
3. It can persist events to a local BerkeleyDB data store and then asynchronously send them to Flume.
Usage as an embedded agent will cause the messages to be directly passed to the Flume Channel and then control will be immediately returned to the application. All interaction with remote agents will occur asynchronously. Setting the "type" attribute to "Embedded" will force the use of the embedded agent. In addition, configuring agent properties in the appender configuration will also cause the embedded agent to be used.
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn" name="MyApp" packages="">
  <Appenders>
    <Flume name="eventLogger" compress="true">
      <Agent host="192.168.10.101" port="8800"/>
      <Agent host="192.168.10.102" port="8800"/>
      <RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
    </Flume>
  </Appenders>
  <Loggers>
    <Root level="error">
      <AppenderRef ref="eventLogger"/>
    </Root>
  </Loggers>
</Configuration>
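With that log4j2.xml in place, the application only has to log through the normal Log4j 2 API for events to reach the eventLogger Flume appender. A minimal sketch (the log message is illustrative):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class MyApp {
    private static final Logger LOGGER = LogManager.getLogger(MyApp.class);

    public static void main(String[] args) {
        // The root logger level is "error", so only error (and fatal) events
        // are forwarded to the eventLogger Flume appender.
        LOGGER.error("error: failed to process request");
    }
}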
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# The client-side Log4jAppender (org.apache.flume.clients.log4jappender.Log4jAppender)
# sends Avro events, so the agent listens with an avro source.
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
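To check that the agent is receiving events, you can also push a test event straight into the Avro source with Flume's RpcClient API (flume-ng-sdk on the classpath; the class name and message below are just for illustration):

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.StandardCharsets;

public class FlumeTestClient {
    public static void main(String[] args) throws EventDeliveryException {
        // Connect to the avro source defined in example.conf
        RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 44444);
        try {
            Event event = EventBuilder.withBody("hello flume", StandardCharsets.UTF_8);
            client.append(event);   // delivered to channel c1 and printed by the logger sink
        } finally {
            client.close();
        }
    }
}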
$ cat /etc/solr/conf/solr-env.sh
export SOLR_ZK_ENSEMBLE=zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181/solr
$ cat /etc/default/solr
# nn01.example.com:8020 is the address of the HDFS NameNode
SOLR_HDFS_HOME=hdfs://nn01.example.com:8020/solr

Create the /solr directory in HDFS:
$ sudo -u hdfs hdfs dfs -mkdir /solr
$ sudo -u hdfs hdfs dfs -chown solr /solr
Initialize the ZooKeeper namespace, then restart the Solr server:
$ solrctl init
$ sudo service solr-server restart
# Generate an instancedir from the default template
$ solrctl instancedir --generate $HOME/weblogs_config

# Upload the instancedir (configuration) to ZooKeeper
$ solrctl instancedir --create weblogs_config $HOME/weblogs_config

# Verify the instancedir
$ solrctl instancedir --list

# Create the collection; -s sets the shard count and -c associates the collection with the config
$ solrctl collection --create weblogs_collection -s 2 -c weblogs_config
Configure Flume
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4444

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.morphlineFile = morphlines.conf
a1.sinks.k1.morphlineId = morphline_log4j2

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[INFO ] 2017-07-14 11:40:51.556 [main] RequestMappingHandlerAdapter - Detected ResponseBodyAdvice bean in apiResponseAdvice
level: INFO
create_time: 2017-07-14 11:40:51.556
thread: main
class: RequestMappingHandlerAdapter   (note the literal " - " separator after the class name)
message: Detected ResponseBodyAdvice bean in apiResponseAdvice
# grok: extract structured fields from the unstructured log line
{
  grok {
    dictionaryFiles : [grok-dictionary.conf]
    expressions : {
      message : """\[%{LOGLEVEL:level} \] %{SC_LOGDATETIME:create_time} \[%{DATA:thread}\] %{WORD:class} [-] %{GREEDYDATA:message}"""
    }
  }
}

# Consume the output record of the previous command and pipe another
# record downstream.
#
# Convert the create_time field to the native Solr timestamp format,
# e.g. 2017-07-14 11:40:52.512 becomes 2017-07-14T18:40:52.512Z
{
  convertTimestamp {
    field : create_time
    inputFormats : ["yyyy-MM-dd HH:mm:ss.SSS", "yyyy-MM-dd"]
    inputTimezone : America/Los_Angeles
    outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
    outputTimezone : UTC
  }
}
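The grok expression above is easier to reason about as a plain regular expression. The sketch below mirrors it in Java against the sample log line; SC_LOGDATETIME is a custom pattern defined in grok-dictionary.conf (not shown here), so the timestamp part is assumed to match "yyyy-MM-dd HH:mm:ss.SSS":

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GrokSketch {
    public static void main(String[] args) {
        String line = "[INFO ] 2017-07-14 11:40:51.556 [main] "
                + "RequestMappingHandlerAdapter - Detected ResponseBodyAdvice bean in apiResponseAdvice";

        // Rough regex equivalent of the grok expression; the create_time part
        // assumes SC_LOGDATETIME matches "yyyy-MM-dd HH:mm:ss.SSS".
        Pattern p = Pattern.compile(
                "\\[(?<level>[A-Z]+) \\] "
                + "(?<createtime>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}) "
                + "\\[(?<thread>.*?)\\] (?<cls>\\w+) - (?<message>.*)");

        Matcher m = p.matcher(line);
        if (m.matches()) {
            System.out.println("level=" + m.group("level"));            // INFO
            System.out.println("create_time=" + m.group("createtime")); // 2017-07-14 11:40:51.556
            System.out.println("thread=" + m.group("thread"));          // main
            System.out.println("class=" + m.group("cls"));              // RequestMappingHandlerAdapter
            System.out.println("message=" + m.group("message"));        // Detected ResponseBodyAdvice bean ...
        }
    }
}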
<field name="level" type="text_general" indexed="true" stored="true" multiValued="true"/>
<field name="create_time" type="date" indexed="true" stored="true"/>
<field name="thread" type="text_general" indexed="true" stored="true"/>
<field name="class" type="text_general" indexed="true" stored="true"/>
<field name="message" type="text_general" indexed="true" stored="true"/>
$ solrctl instancedir --update weblogs_config $HOME/weblogs_config
$ solrctl collection --reload weblogs_collection
At this point we have completed log collection, parsing, and indexing. You can search and query the logs through Hue, or build your own UI on top of Solr. This tutorial is fairly basic, but it covers the essential requirements and walks through the whole log-processing pipeline; the rest you can customize to your own needs.
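For a custom UI, a minimal SolrJ sketch of querying the collection might look like the following (the Solr node URL is an assumption, and a SolrJ 6+ style builder API is assumed):

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

public class WeblogSearch {
    public static void main(String[] args) throws Exception {
        // URL of one Solr node hosting weblogs_collection (hostname is illustrative)
        try (HttpSolrClient solr = new HttpSolrClient.Builder(
                "http://solr01.example.com:8983/solr/weblogs_collection").build()) {

            // Find ERROR-level log events, newest first
            SolrQuery query = new SolrQuery("level:ERROR");
            query.addSort("create_time", SolrQuery.ORDER.desc);
            query.setRows(10);

            QueryResponse response = solr.query(query);
            for (SolrDocument doc : response.getResults()) {
                System.out.println(doc.getFieldValue("create_time") + "  "
                        + doc.getFieldValue("message"));
            }
        }
    }
}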
Source: https://my.oschina.net/tigerlene/blog/1475239