Thursday, January 7, 2016

Apache Flume



https://flume.apache.org/
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
Agent component diagram

Using Flume: Flexible, Scalable, and Reliable Data Streaming
Usually, these systems have components that are responsible for accepting the data from the producer, through an RPC call or HTTP (which may be exposed via a client API). They also have components that act as buffers where the data is stored until it is removed by the components that move the data to the next hop or destination.

HDFS requires that exactly one client writes to a file.

Each Flume agent has three components: the source, the channel, and the sink. The source is responsible for getting events into the Flume agent, while the sink is responsible for removing the events from the agent and forwarding them to the next agent in the topology, or to HDFS, HBase, Solr, etc. The channel is a buffer that stores data that the source has received, until a sink has successfully written the data out to the next hop or the eventual destination.
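A minimal sketch of a single-agent configuration wiring the three components together (the agent/component names a1, r1, c1, k1 and the netcat/logger types are just illustrative choices):

# one source, one channel, one sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source: read lines from a TCP port
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

# channel: in-memory buffer between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000

# sink: just log events (replace with hdfs, avro, etc. in a real topology)
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

The agent would be started with something like flume-ng agent --conf conf --conf-file example.conf --name a1 (Flume 1.x).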

Flume represents data as events. Events are very simple data structures, with a body and a set of headers. The body of the event is a byte array that usually is the payload that Flume is transporting. The headers are represented as a map with string keys and string values. Headers are not meant to transfer data, but for routing purposes and to keep track of priority, severity of events being sent, etc. The headers can be used to add event IDs or UUIDs to events as well.

Flume is really meant to push events in real time where the stream of data is continuous and its volume reasonably large.

A source can write to several channels, replicating the events to all or some of them.
Channels behave like queues, with sources writing to them and sinks reading from them. Multiple sources can write to the same channel safely, and multiple sinks can read from the same channel. Each sink, though, can read from only exactly one channel. If multiple sinks read from the same channel, it is guaranteed that exactly one sink will read a specific event from the channel.
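Extending the sketch above, a second sink can be pointed at the same channel; each event in c1 is then consumed by exactly one of the two sinks (names are illustrative):

a1.sinks = k1 k2
# both sinks drain c1; a given event is taken by exactly one of them
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1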

 Once the data is safely at the next hop or at its destination, the sinks inform the channels, via transaction commits, that those events can now be deleted from the channels.

 Each source has its own channel processor. Each time the source writes data to the channels, it does so by delegating this task to its channel processor. The channel processor then passes these events to one or more interceptors configured for the source.

An interceptor is a piece of code that can read the event and modify or drop the event based on some processing it does. Interceptors can be used to drop events based on some criteria, like a regex, add new headers to events or remove existing ones, etc. Each source can be configured to use multiple interceptors, which are called in the order defined by the configuration, with the result of one interceptor passed to the next in the chain.
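For example, a source could be configured with a timestamp interceptor followed by a regex filter that drops DEBUG lines (a sketch; the interceptor names i1/i2 and the regex are illustrative):

a1.sources.r1.interceptors = i1 i2
# i1: add a "timestamp" header to every event
a1.sources.r1.interceptors.i1.type = timestamp
# i2: drop events whose body matches the regex
a1.sources.r1.interceptors.i2.type = regex_filter
a1.sources.r1.interceptors.i2.regex = .*DEBUG.*
a1.sources.r1.interceptors.i2.excludeEvents = true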

Once the interceptors are done processing the events, the list of events returned by the interceptor chain is handed to the channel selector, which chooses the channel(s) that each event in the list will be written to.

A source can write to multiple channels via the processor-interceptor-selector route. Channel selectors are the components that decide which channels attached to this source each event must be written to. Interceptors can thus be used to insert or remove data from events so that channel selectors may apply some criteria on these events to decide which channels the events must be written to. Channel selectors can apply arbitrary filtering criteria to events to decide which channels each event must be written to, and which channels are required and optional.

A failure to write to a required channel causes the channel processor to throw a ChannelException to indicate that the source must retry the event (all events that are in that transaction, actually), while a failure to write to an optional channel is simply ignored.
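A sketch of the required/optional distinction with the replicating selector (channel names are illustrative): c1 is required, c2 is optional, so only a failed write to c1 causes a ChannelException:

a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating
# failed writes to c2 are ignored; failed writes to c1 force a retry
a1.sources.r1.selector.optional = c2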

Getting Flume Agents to Talk to Each Other
The preferred RPC sink–RPC source pair for agent-to-agent communication is the Avro Sink–Avro Source pair.
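A sketch of such a hop (agent names, hostname, and port are illustrative): the upstream agent's Avro sink points at the host and port where the downstream agent's Avro source is listening:

# on agent1: forward events to the next tier
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = collector01.example.com
agent1.sinks.k1.port = 4545
agent1.sinks.k1.channel = c1

# on agent2: receive events from upstream agents
agent2.sources.r1.type = avro
agent2.sources.r1.bind = 0.0.0.0
agent2.sources.r1.port = 4545
agent2.sources.r1.channels = c1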

Since each source can actually write to multiple channels, events can easily be replicated to make sure that each event goes to more than one destination.

The first and simplest option is to deploy a single tier of Flume agents that receive data from the application servers and write it directly to the storage system.
Flume will adjust the rate of writes to the storage system by backing off for increasing amounts of time every time a write fails (up to some maximum period), so as to not overwhelm the storage system if the capacity is lower than what is required to handle the current write rate.
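For instance, a sink in that single tier writing straight to HDFS might look like this (path and roll settings are illustrative):

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%Y/%m/%d
a1.sinks.k1.hdfs.fileType = DataStream
# use the agent's local time to resolve the %Y/%m/%d escapes
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# roll files by time and size rather than by event count
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0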

By writing to a Flume agent within the same data center, the application can avoid having to write data across a cross–data center WAN link, yet ensure that the data will eventually get persisted to the storage system. The communication between Flume agents can be configured to allow higher cross–data center latency to ensure that the agent-to-agent communication can complete successfully without timeouts.
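One way to do this is to raise the Avro sink's connection and request timeouts on the sending agent (hostname, port, and the 60-second values are illustrative):

a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = remote-collector.example.com
a1.sinks.k1.port = 4545
# allow more time for connection setup and per-batch requests over the WAN link
a1.sinks.k1.connect-timeout = 60000
a1.sinks.k1.request-timeout = 60000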

Flume has features allowing applications that use the Flume API to automatically load balance between multiple Flume agents (this would be a subset of the outermost tier of Flume agents), and also allows sinks to load balance between multiple agents in the next tier via a sink processor. These two combined ensure that data flow continues if there is capacity remaining in tiers following the failed agent.
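A sketch of the client-side half, assuming the application uses Flume's RPC client SDK: a properties file along these lines (hostnames and ports are illustrative) can be passed to RpcClientFactory.getInstance(...) to obtain a load-balancing client:

client.type = default_loadbalance
hosts = h1 h2
hosts.h1 = agent01.example.com:4545
hosts.h2 = agent02.example.com:4545
# spread appends across the hosts; back off from failed ones
host-selector = round_robin
backoff = true
maxBackoff = 10000

The sink-side half, load balancing between agents in the next tier, is done with a load_balance sink processor (an example appears later in these notes).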

Dynamic Routing - multiplexing channel selector
Avro Source
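A sketch tying these two notes together: an Avro source whose multiplexing channel selector routes events to a channel based on a header value (the severity header and channel names are illustrative):

a1.channels = c1 c2
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4545
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = multiplexing
# route on the value of the "severity" header
a1.sources.r1.selector.header = severity
a1.sources.r1.selector.mapping.ERROR = c1
# everything else goes to c2
a1.sources.r1.selector.default = c2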

The simplest way to gather data from these files is to tail the files by configuring Flume nodes to use Flume’s tail source:
  • tail("/var/log/httpd/access_log")
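Note that tail(...) is the legacy Flume OG (0.9.x) syntax; in Flume 1.x the rough equivalent is an exec source running tail -F (a sketch, with illustrative names):

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/httpd/access_log
a1.sources.r1.channels = c1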
Getting Log Entries from Piped Log Files
The Apache 2.x documentation describes using piped logging with the CustomLog directive. Its example uses the rotatelogs program to periodically write data to new files with a given prefix. Here are some example directives that could appear in the httpd.conf/apache2.conf file:
LogFormat "%h %l %u %t \"%r\" %>s %b" common
CustomLog "|/usr/sbin/rotatelogs /var/log/apache2/foo_access_log 3600" common
You can configure a Flume node to use Flume’s tailDir source to read all files without modifying the Apache settings:
  • tailDir("/var/log/apache2/", "foo_access_log.*")
The first argument is the directory to watch; the second is a regex matched against file names. tailDir watches the directory and tails every file whose name matches.
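tailDir(...) is likewise Flume OG syntax; in Flume 1.7+ the TAILDIR source covers the same use case (a sketch; the position-file path is illustrative):

a1.sources.r1.type = TAILDIR
# remembers how far each file has been read, across restarts
a1.sources.r1.positionFile = /var/lib/flume/taildir_position.json
a1.sources.r1.filegroups = f1
# tail every file in the directory whose name matches the pattern
a1.sources.r1.filegroups.f1 = /var/log/apache2/foo_access_log.*
a1.sources.r1.channels = c1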
Using Piped Logs
Instead of writing data to disk and then having Flume read it, you can have Flume ingest data directly from Apache.  To do so, modify the web server’s parameters and use its piped log feature by adding some directives to the Apache server’s configuration:
CustomLog "|flume node_nowatch -1 -n apache -c \'apache:console|agentBESink(\"collector\");\'" common
CustomLog "|flume node_nowatch -1 -n apache -c \'apache:console|agentDFOSink(\"collector\");\'" common
(agentBESink is Flume OG's best-effort sink; agentDFOSink buffers events to local disk and retries if the collector is unavailable.)
http://www.thecloudavenue.com/2013/11/using-log4jflume-to-log-application.html
# Define the flume appender
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = localhost
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = false
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
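The appender also has to be attached to a logger (for example log4j.rootLogger = INFO, flume), and it needs a Flume agent whose Avro source is listening on the configured host and port. A minimal sketch of that receiving side (names are illustrative):

a1.sources = r1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
# matches log4j.appender.flume.Port above
a1.sources.r1.port = 41414
a1.sources.r1.channels = c1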

http://jinnianshilongnian.iteye.com/blog/2261225
Suppose we need to collect Nginx access logs and index them; we could deploy Flume as follows:

1. The agent is deployed on the same machine as the web server.
2. The source is an ExecSource that collects the log with the tail command.
3. The channel is a MemoryChannel, because losing a little log data is not a big deal here.
4. The sink is an ElasticSearchSink writing to ElasticSearch; a list of multiple ElasticSearch server IP:PORT entries can be configured here to increase throughput (see the configuration sketch below).
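A configuration sketch of this deployment (host list, index name, and cluster name are illustrative; the ElasticSearch sink also needs the matching elasticsearch client jars on Flume's classpath):

agent.sources = r1
agent.channels = c1
agent.sinks = es

# tail the Nginx access log
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /var/log/nginx/access.log
agent.sources.r1.channels = c1

# memory channel: fast, but events are lost if the agent dies
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000

# write to ElasticSearch; multiple host:port entries spread the load
agent.sinks.es.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent.sinks.es.hostNames = es01.example.com:9300,es02.example.com:9300
agent.sinks.es.indexName = nginx_access
agent.sinks.es.indexType = logs
agent.sinks.es.clusterName = elasticsearch
agent.sinks.es.channel = c1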

The above describes how logs flow through an agent. For more complex log collection we may need to filter events at the source, write to multiple channels, and handle sink failures or load balancing; Flume supports all of this out of the box:
After the interceptors have filtered the events, they are handed to the channel selector. Two selectors are provided by default: replicating and multiplexing. The replicating selector copies each event to every configured channel, while the multiplexing selector routes events to the matching channels according to the configured criteria. Writing to multiple channels can fail, and there are two ways to handle a failure: retry later or ignore it. Retries generally use exponentially increasing wait times.

As mentioned earlier, the source produces events into the channel and the sink consumes events from the channel; the two are completely asynchronous, so a sink only needs to watch the channels it is attached to.

So far we can filter or modify events at the source and replicate or route an event to multiple channels. Sinks can also fail to write, and Flume provides the following strategies by default:


The default strategy is a single sink: if it fails, the transaction fails and is retried later.

Flume also provides a failover strategy:

The failover strategy assigns priorities to multiple sinks; if one fails, events are routed to the sink with the next-highest priority. A sink is considered failed as soon as it throws an exception; it is then removed from the pool of live sinks and retried after an exponentially increasing wait, starting at 1 s by default and capped at 30 s.
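A sketch of a failover sink group (sink names and priorities are illustrative; maxpenalty caps the backoff in milliseconds):

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# the higher-priority sink is used while it is healthy
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
# cap the exponential backoff for a failed sink at 30 s
a1.sinkgroups.g1.processor.maxpenalty = 30000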

Flume also provides a load-balancing strategy:

Two load-balancing algorithms are provided by default: round-robin and random. Selection goes through a SinkSelector abstraction similar to the ChannelSelector. The failure-backoff mechanism is similar to the one used by failover, but it is disabled by default; set the backoff parameter to true to enable it.
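A sketch of a load-balancing sink group with failure backoff enabled (sink names are illustrative):

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
# round_robin (default) or random
a1.sinkgroups.g1.processor.selector = round_robin
# backoff for failed sinks is off by default; enable it explicitly
a1.sinkgroups.g1.processor.backoff = true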

That covers Flume's core components. How sources and sinks interact asynchronously and the transaction mechanism provided by channels will be covered later, when the individual components are analyzed.

If we need to collect logs from a very large number of clients and buffer them or process them centrally, we can deploy an aggregation tier; the overall architecture then looks roughly like this (a collector-tier configuration is sketched after the list):

1. The log-collection tier: agents in this tier are deployed on the same machines as the applications and collect logs such as Nginx access logs, then forward them via RPC to the collection/aggregation tier. This tier should pick up logs quickly and push them downstream.
2. The collection/aggregation tier collects and aggregates the logs and can add fault handling such as failover or load balancing to improve reliability; a file channel can also be enabled in this tier as a data buffer.
3. The collection/aggregation tier filters or modifies the data and then stores or processes it, for example writing it to HDFS, or feeding it into Kafka and processing it in real time with Storm.
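A sketch of a collection/aggregation-tier agent for such a topology (hostnames, ports, paths, and sizes are illustrative): an Avro source receives events from the collection tier, a file channel buffers them on disk, and an HDFS sink writes them out:

collector.sources = r1
collector.channels = c1
collector.sinks = k1

# receive events sent by the log-collection tier
collector.sources.r1.type = avro
collector.sources.r1.bind = 0.0.0.0
collector.sources.r1.port = 4545
collector.sources.r1.channels = c1

# file channel: durable on-disk buffer
collector.channels.c1.type = file
collector.channels.c1.checkpointDir = /var/lib/flume/checkpoint
collector.channels.c1.dataDirs = /var/lib/flume/data

# write aggregated events to HDFS
collector.sinks.k1.type = hdfs
collector.sinks.k1.channel = c1
collector.sinks.k1.hdfs.path = /flume/nginx/%Y%m%d
collector.sinks.k1.hdfs.fileType = DataStream
collector.sinks.k1.hdfs.useLocalTimeStamp = true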
