Sunday, August 30, 2015

Zookeeper Internal




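A znode in the ZooKeeper namespace has the characteristics of both a file and a directory. Like a file, it maintains data, metadata, an ACL, timestamps and similar structures; like a directory, it can form part of a path. Each znode consists of three parts:
  • stat: status information describing the znode's version, permissions, and so on
  • data: the data associated with the znode (configuration, status information, rendezvous locations), at most 1 MB in size
  • children: the child nodes under the znode
The data stored at each node is read and written atomically: a read returns all the data associated with the node, and a write replaces all of it. In addition, each node has its own ACL (access control list), which defines user permissions, that is, which operations a particular user may perform on the node.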
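
As a rough illustration of the three parts of a znode and of whole-value reads and writes, here is a minimal in-memory sketch. The `Znode` class and its method names are assumptions made for this example, not the ZooKeeper client API; the version check imitates the conditional update that the real setData operation supports.

```python
from dataclasses import dataclass, field


@dataclass
class Znode:
    """Toy model of a znode: stat (version only), data, and children."""
    data: bytes = b""
    version: int = 0                              # part of the stat information
    children: dict = field(default_factory=dict)  # child name -> Znode

    def get_data(self):
        # A read returns all of the data, together with its version.
        return self.data, self.version

    def set_data(self, new_data: bytes, expected_version: int = -1):
        # A write replaces the whole value; -1 means "skip the version check".
        if expected_version not in (-1, self.version):
            raise ValueError("version mismatch")
        self.data = new_data
        self.version += 1
        return self.version


root = Znode()
root.children["config"] = Znode(data=b"timeout=30")
cfg = root.children["config"]
cfg.set_data(b"timeout=60", expected_version=0)
```

A second writer that still holds version 0 would now be rejected, which is how a client can detect that the value changed underneath it.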

https://wiki.apache.org/hadoop/ZooKeeper/Observers
https://zookeeper.apache.org/doc/r3.5.0-alpha/zookeeperOver.html
The ZooKeeper ensemble was configured such that leaders do not allow connections from clients.
https://zookeeper.apache.org/doc/r3.4.13/zookeeperObservers.html

Observers: Scaling ZooKeeper Without Hurting Write Performance


Although ZooKeeper performs very well by having clients connect directly to voting members of the ensemble, this architecture makes it hard to scale out to huge numbers of clients. The problem is that as we add more voting members, the write performance drops. This is because a write operation requires the agreement of a majority (more than half) of the voting nodes in an ensemble, and therefore the cost of a vote can increase significantly as more voters are added.
We have introduced a new type of ZooKeeper node called an Observer which helps address this problem and further improves ZooKeeper's scalability. Observers are non-voting members of an ensemble which only hear the results of votes, not the agreement protocol that leads up to them. Other than this simple distinction, Observers function exactly the same as Followers - clients may connect to them and send read and write requests to them. Observers forward these requests to the Leader like Followers do, but they then simply wait to hear the result of the vote. Because of this, we can increase the number of Observers as much as we like without harming the performance of votes.
Observers have other advantages. Because they do not vote, they are not a critical part of the ZooKeeper ensemble. Therefore they can fail, or be disconnected from the cluster, without harming the availability of the ZooKeeper service. The benefit to the user is that Observers may connect over less reliable network links than Followers. In fact, Observers may be used to talk to a ZooKeeper server from another data center. Clients of the Observer will see fast reads, as all reads are served locally, and writes result in minimal network traffic as the number of messages required in the absence of the vote protocol is smaller.
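
The cost argument can be made concrete with a little arithmetic. The helpers below are a hypothetical sketch (they are not part of ZooKeeper): they compute the majority needed for a vote from the number of voting members, and show that Observers are simply left out of the count.

```python
def quorum_size(voters: int) -> int:
    """Smallest number of voters that forms a majority."""
    return voters // 2 + 1


def votes_needed(followers: int, observers: int) -> int:
    # Voting members are the leader plus its followers. Observers never vote,
    # so the `observers` argument deliberately has no effect on the result.
    return quorum_size(followers + 1)


for n in (3, 5, 7, 9):
    print(n, "voters ->", quorum_size(n), "votes per write")
```

Going from 3 to 9 voters raises the majority from 2 to 5, while adding any number of Observers to a 5-voter ensemble leaves it at 3.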

Two example use cases for Observers are listed below. In fact, wherever you wish to scale the number of clients of your ZooKeeper ensemble, or where you wish to insulate the critical part of an ensemble from the load of dealing with client requests, Observers are a good architectural choice.

As a datacenter bridge: Forming a ZK ensemble between two datacenters is a problematic endeavour as the high variance in latency between the datacenters could lead to false positive failure detection and partitioning. However if the ensemble runs entirely in one datacenter, and the second datacenter runs only Observers, partitions aren't problematic as the ensemble remains connected. Clients of the Observers may still see and issue proposals.

As a link to a message bus: Some companies have expressed an interest in using ZK as a component of a persistent reliable message bus. Observers would give a natural integration point for this work: a plug-in mechanism could be used to attach the stream of proposals an Observer sees to a publish-subscribe system, again without loading the core ensemble.

https://mp.weixin.qq.com/s/lGvbfbM87flWI3zXkGdk0w

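In addition, followers and observers are both learners; a learner's job is to stay synchronized with the leader's state.
Every replica in a ZooKeeper ensemble (follower, leader, observer) holds all of the cluster's data in memory, like an in-memory database. Updates are logged to disk for recoverability, and writes are serialized to disk before they are applied to the in-memory database.
  Every ZooKeeper server serves clients. A client connects to exactly one server to submit requests. Read requests are served from that server's local replica of the database. Write requests, which change the state of the service, are handled by the ZAB protocol.
  As part of the agreement protocol, all write requests from clients are forwarded to a single server, called the leader. The remaining ZooKeeper servers, called followers, receive message proposals from the leader and agree on message delivery. The messaging layer takes care of replacing a failed leader and of syncing followers with the leader.
  ZooKeeper uses a custom atomic messaging protocol, ZAB. Because the messaging layer is atomic, the local replicas never diverge. When the leader receives a write request, it calculates what the state of the system will be when the write is applied and transforms this into a transaction that captures the new state.
  To balance consistency against availability, the user can choose whether to call sync(). Calling sync() before a read makes the server the client is connected to catch up with the leader, so the read reflects the latest committed state, but the synchronization adds some latency. Where strong consistency is not required, sync() can be skipped: ZooKeeper propagates updates quickly, but without sync() there is no guarantee that every node holds the same data at a given moment. In practice developers have to decide, case by case, whether consistency or availability matters more.
  To balance consistency against scalability, the user can choose whether to add observers and how many. The observer role was introduced in version 3.3.0; it takes no part in elections or voting, and its purpose is to make the ensemble more scalable. The number of followers cannot grow too large, because followers take part in elections and votes: with too many of them, elections converge very slowly and the vote on each write also takes longer. Adding observers improves availability and scalability, since the cluster has more points that can accept client requests, but the consistency problem remains, which brings us back to the consistency/availability trade-off above.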

http://blog.xiaohansong.com/2016/08/25/zab/
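A ZooKeeper client connects to a random node of the ZooKeeper ensemble. A read request is served directly from that node. For a write request, the node submits the transaction to the leader, the leader broadcasts it, and once more than half of the nodes have written it successfully the write is committed (a 2PC-like protocol).
A node in ZAB is in one of three states:
  • following: the node is a follower and obeys the leader's commands
  • leading: the node is the leader and coordinates transactions
  • election/looking: the node is taking part in an election
The implementation adds one more state, observing, introduced together with Observers. An Observer does not take part in elections; it is a non-voting node that mainly serves reads, and it has nothing to do with the ZAB protocol itself.
Persistent state of a node:
  • history: the log of transaction proposals the node has received
  • acceptedEpoch: the epoch of the last NEWEPOCH proposal the follower has accepted
  • currentEpoch: the epoch the node is currently in
  • lastZxid: the zxid of the most recent (largest) proposal in history
In ZAB, the transaction ID (zxid) is a 64-bit number. The low 32 bits are a simple monotonically increasing counter that is incremented by 1 for every client transaction request. The high 32 bits hold the number of the leader's epoch: whenever a new leader is elected, it takes the largest zxid in its local log, reads the epoch from it, adds 1 to obtain the new epoch, and restarts the low 32 bits at 0.
epoch: think of it as the era or period the cluster is in. Each leader is like an emperor with its own reign title, so every change of dynasty, that is every leader change, adds 1 to the previous epoch. Even if an old leader recovers from a crash, nobody listens to it any more, because followers only obey the leader of the current epoch.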
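
The zxid layout described above can be sketched with plain bit arithmetic. The function names are made up for this example; only the layout (epoch in the high 32 bits, counter in the low 32 bits) comes from the text.

```python
def make_zxid(epoch: int, counter: int) -> int:
    """Pack epoch (high 32 bits) and counter (low 32 bits) into a 64-bit zxid."""
    return (epoch << 32) | (counter & 0xFFFFFFFF)


def split_zxid(zxid: int):
    """Return (epoch, counter) from a 64-bit zxid."""
    return zxid >> 32, zxid & 0xFFFFFFFF


def first_zxid_of_new_leader(last_zxid: int) -> int:
    # A new leader reads the epoch from the largest zxid in its log,
    # adds 1, and restarts the counter at 0.
    epoch, _ = split_zxid(last_zxid)
    return make_zxid(epoch + 1, 0)
```

Because the epoch occupies the high bits, any zxid from a newer epoch compares greater than every zxid from an older one, which is what lets followers ignore a stale leader.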

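Phase 0: Leader election

All nodes start in the election phase. As soon as one node receives votes from more than half of the nodes, it becomes the prospective leader. A prospective leader becomes the real leader only on reaching Phase 3. The purpose of this phase is to pick a prospective leader and then move on to the next phase.
The protocol does not prescribe a particular election algorithm; the implementation uses Fast Leader Election, described later.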

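Phase 1: Discovery

In this phase the followers communicate with the prospective leader and report the transaction proposals they most recently received. The main purpose is to discover the latest proposals accepted by a majority of nodes, and to have the prospective leader generate a new epoch that the followers accept, updating their acceptedEpoch.
[Figure: phase 1]
A follower connects to only one leader. If a node f believes that another follower p is the leader, p rejects f's connection attempt, and f then goes back to Phase 0.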

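Phase 2: Synchronization

The synchronization phase uses the latest proposal history that the leader obtained in the previous phase to bring all replicas in the cluster up to date. Only when a quorum has finished synchronizing does the prospective leader become the real leader. A follower only accepts proposals whose zxid is larger than its own lastZxid.
[Figure: phase 2]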

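Phase 3: Broadcast

Only in this phase can the ZooKeeper ensemble serve transactions to clients and can the leader broadcast messages. New nodes that join also have to be synchronized.
The Java implementation of the protocol differs somewhat from the definition above. The election phase uses Fast Leader Election (FLE), which also covers the discovery duty of Phase 1: because FLE elects the node with the latest proposal history as leader, the step of discovering the latest proposals is unnecessary. The implementation merges Phase 1 and Phase 2 into a Recovery Phase, so the implemented ZAB has only three phases: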
  • Fast Leader Election
  • Recovery Phase
  • Broadcast Phase

Fast Leader Election

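As noted above, FLE elects the node with the latest proposal history (the largest lastZxid) as leader, which removes the step of discovering the latest proposals. This relies on the premise that the node with the latest proposals also has the latest committed records.

Conditions for becoming leader

  1. The largest epoch wins
  2. If epochs are equal, the largest zxid wins
  3. If both epoch and zxid are equal, the largest server id wins (the myid configured for the server in zoo.cfg)
At the start of an election every node votes for itself. When it receives votes from other nodes, it updates its own vote according to the conditions above and resends it to the others. When one node has more than half of the votes, that node sets its state to leading and the other nodes set their state to following.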
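
The three rules amount to a lexicographic comparison of (epoch, zxid, server id). The sketch below is a simplified illustration of that ordering only; the `Vote` type and function names are assumptions, and the real Fast Leader Election also exchanges messages and counts votes, which is omitted here.

```python
from typing import NamedTuple


class Vote(NamedTuple):
    epoch: int
    zxid: int
    sid: int   # server id, i.e. the myid value


def better(a: Vote, b: Vote) -> Vote:
    """Return the preferred vote: largest epoch, then zxid, then server id."""
    # Tuples compare field by field, which matches the three rules in order.
    return a if a > b else b


def converge(votes):
    """Candidate every server ends up voting for after exchanging votes."""
    winner = votes[0]
    for vote in votes[1:]:
        winner = better(winner, vote)
    return winner
```

For example, among `Vote(1, 5, 1)`, `Vote(1, 5, 3)` and `Vote(1, 4, 9)` the second wins: the epochs tie, two zxids tie at 5, and server id 3 beats server id 1.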

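Election process

[Figure: FLE]

Recovery Phase

In this phase, followers send their lastZxid to the leader, and the leader decides from the lastZxid how to synchronize the data. The implementation differs from Phase 2 above: a follower that receives a TRUNC command discards the proposals after L.lastCommittedZxid, and one that receives a DIFF command accepts the new proposals.
history.lastCommittedZxid: the zxid of the most recently committed proposal
history.oldThreshold: the zxid of a committed proposal that is considered too old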
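
A rough sketch of how a leader might pick between these commands, written as a pure function. The function name and parameters are hypothetical. The TRUNC and DIFF branches follow the description above; the SNAP branch (sending a full snapshot to a follower that is older than oldThreshold) is not spelled out in these notes and is an assumption.

```python
def sync_command(follower_last_zxid: int,
                 last_committed_zxid: int,
                 old_threshold: int) -> str:
    """Choose how the leader brings one follower up to date (sketch)."""
    if follower_last_zxid > last_committed_zxid:
        # The follower holds proposals the leader never committed: drop them.
        return "TRUNC"
    if follower_last_zxid >= old_threshold:
        # The follower is only slightly behind: send the missing proposals.
        return "DIFF"
    # Assumption: a follower older than the threshold gets a full snapshot.
    return "SNAP"
```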
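  • In a leader/follower architecture, how is data consistency guaranteed when the leader crashes?
    After the leader crashes, the cluster elects a new leader and enters the recovery phase. The new leader has every committed proposal, so it makes the followers sync the committed proposals and discard the uncommitted ones (the leader's record is authoritative), which keeps the data consistent across the cluster.
  • The cluster cannot handle write requests during leader election, so how is the election made fast?
    This is what Fast Leader Election is for: electing a leader needs the votes of only more than half of the nodes, so there is no need to wait for every node's vote and a leader can be chosen as early as possible.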

http://rore.im/posts/zookeeper-pub-sub-messaging

Types of znodes
The persistent znode
The ephemeral znode
The sequential znode

an ephemeral znode is not allowed to have children

Persistent znodes are useful for storing data that needs to be highly available and accessible by all the components of a distributed application.

A pull model often suffers from scalability problems when implemented in large and complex distributed systems


Watches allow clients to get notifications when a znode changes in any way. A watch is a one-time operation, which means that it triggers only one notification. To continue receiving notifications over time, the client must reregister the watch upon receiving each event notification.


Watches are triggered by three kinds of change to a znode:
  • Any change to the data of a znode, such as when new data is written to the znode's data field using the setData operation.
  • Any change to the children of a znode, for instance when children of a znode are deleted with the delete operation.
  • A znode being created or deleted, which happens when a new znode is added to a path or an existing one is deleted.
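
The one-time nature of a watch is easy to get wrong, so here is a toy registry that behaves the same way. `WatchRegistry` is an illustrative stand-in, not the ZooKeeper client API; the event names are borrowed from ZooKeeper's event types purely as labels.

```python
from collections import defaultdict


class WatchRegistry:
    """Toy one-time watch table (illustrative only)."""

    def __init__(self):
        self._watches = defaultdict(list)

    def watch(self, path, callback):
        self._watches[path].append(callback)

    def fire(self, path, event):
        # Remove the watches before calling them: each one triggers once.
        callbacks = self._watches.pop(path, [])
        for callback in callbacks:
            callback(path, event)
        return len(callbacks)


seen = []
registry = WatchRegistry()
registry.watch("/app/config", lambda path, event: seen.append(event))
registry.fire("/app/config", "NodeDataChanged")   # delivered to the watcher
registry.fire("/app/config", "NodeDataChanged")   # no watch left, nothing delivered
```

The second change goes unnoticed because the client never registered a new watch, which is exactly the gap a real client has to close by re-registering in its event handler.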




Because a watch is a one-time trigger, there is an interval between receiving a notification and re-registering the watch, and it's possible that a client might miss changes made to a znode during this interval.

A watch can also be missed when a client has set a watch for the existence of a znode that has not yet been created: the watch event will be missed if the znode is created and deleted while the client is in the disconnected state.


Recipe

Out of the Box Applications: Name Service, Configuration, Group Membership

Name service and configuration are two of the primary applications of ZooKeeper. These two functions are provided directly by the ZooKeeper API.
Another function directly provided by ZooKeeper is group membership. The group is represented by a node. Members of the group create ephemeral nodes under the group node. Nodes of the members that fail abnormally will be removed automatically when ZooKeeper detects the failure.

Barriers

Distributed systems use barriers to block processing of a set of nodes until a condition is met at which time all the nodes are allowed to proceed. Barriers are implemented in ZooKeeper by designating a barrier node. The barrier is in place if the barrier node exists. Here's the pseudo code:
  1. Client calls the ZooKeeper API's exists() function on the barrier node, with watch set to true.
  2. If exists() returns false, the barrier is gone and the client proceeds
  3. Else, if exists() returns true, the clients wait for a watch event from ZooKeeper for the barrier node.
  4. When the watch event is triggered, the client reissues the exists( ) call, again waiting until the barrier node is removed.
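
The four steps can be exercised against a small in-memory stand-in. `FakeZk` below is hypothetical and only mimics the two calls the recipe needs; unlike the real exists() call, it registers the watcher only while the node is present, which is enough for this sketch.

```python
import threading


class FakeZk:
    """In-memory stand-in for a ZooKeeper client (illustration only)."""

    def __init__(self):
        self._nodes = set()
        self._watchers = {}
        self._lock = threading.Lock()

    def create(self, path):
        with self._lock:
            self._nodes.add(path)

    def exists(self, path, watcher=None):
        with self._lock:
            present = path in self._nodes
            if present and watcher is not None:
                self._watchers.setdefault(path, []).append(watcher)
            return present

    def delete(self, path):
        with self._lock:
            self._nodes.discard(path)
            watchers = self._watchers.pop(path, [])
        for notify in watchers:
            notify()          # one-time notification


def wait_for_barrier(zk, path):
    """Block until the barrier znode at `path` is gone."""
    while True:
        changed = threading.Event()
        # Steps 1-2: exists() with a watch; False means the barrier is gone.
        if not zk.exists(path, watcher=changed.set):
            return
        # Steps 3-4: wait for the watch event, then check again.
        changed.wait()


zk = FakeZk()
zk.create("/barrier")
worker = threading.Thread(target=wait_for_barrier, args=(zk, "/barrier"))
worker.start()
zk.delete("/barrier")      # removing the barrier node releases the worker
worker.join(timeout=5)
```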

Leader Election

A simple way of doing leader election with ZooKeeper is to use the SEQUENCE|EPHEMERAL flags when creating znodes that represent "proposals" of clients. The idea is to have a znode, say "/election", such that each client creates a child znode "/election/n_" with both flags SEQUENCE|EPHEMERAL. With the sequence flag, ZooKeeper automatically appends a sequence number that is greater than any one previously appended to a child of "/election". The process that created the znode with the smallest appended sequence number is the leader.
To detect the failure of the current leader, it is sufficient for each client to watch the next znode down in the sequence of znodes. If a client receives a notification that the znode it is watching is gone, then it becomes the new leader in the case that there is no smaller znode. Note that this avoids the herd effect by not having all clients watching the same znode.
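
The decision each client makes after listing the children of "/election" can be written as a pure function. This is a sketch under the assumption that child names look like "n_" followed by the sequence number; the function names are invented for the example.

```python
def sequence_of(name: str) -> int:
    """Extract the numeric suffix, e.g. 'n_0000000003' -> 3."""
    return int(name.rsplit("_", 1)[1])


def election_step(children, me):
    """Return ('leader', None) or ('follower', znode_to_watch) for client `me`."""
    ordered = sorted(children, key=sequence_of)
    index = ordered.index(me)
    if index == 0:
        return "leader", None
    # Watch only the znode immediately below ours to avoid the herd effect.
    return "follower", ordered[index - 1]
```

When the watched znode disappears, the client lists the children again and calls `election_step` once more; it either becomes the leader or starts watching the next smaller znode.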

https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#ch_zkSessions

An optional "chroot" suffix may also be appended to the connection string. This will run the client commands while interpreting all paths relative to this root (similar to the unix chroot command). If used the example would look like: "127.0.0.1:4545/app/a" or "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" where the client would be rooted at "/app/a" and all paths would be relative to this root - ie getting/setting/etc... "/foo/bar" would result in operations being run on "/app/a/foo/bar" (from the server perspective). This feature is particularly useful in multi-tenant environments where each user of a particular ZooKeeper service could be rooted differently. This makes re-use much simpler as each user can code his/her application as if it were rooted at "/", while actual location (say /app/a) could be determined at deployment time.
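
To make the path mapping concrete, here is a small sketch that splits such a connection string and computes the path the server sees. Both helpers are hypothetical and only illustrate the rule described above; the real client does this internally.

```python
def parse_connect_string(connect: str):
    """Split 'host:port,host:port/chroot' into (hosts, chroot)."""
    slash = connect.find("/")
    if slash == -1:
        return connect.split(","), ""
    return connect[:slash].split(","), connect[slash:].rstrip("/")


def server_path(chroot: str, client_path: str) -> str:
    """Path the server sees for a path given by a chrooted client."""
    if client_path == "/":
        return chroot or "/"
    return chroot + client_path
```

With the second example string from the text, `parse_connect_string` yields three hosts and the chroot "/app/a", and `server_path("/app/a", "/foo/bar")` gives "/app/a/foo/bar".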

Apache ZooKeeper Essentials

ZooKeeper maintains a strict ordering of its transactions, which enables the implementation of advanced distributed synchronization primitives that are simple and reliable.

The ZooKeeper data model
ZooKeeper allows distributed processes to coordinate with each other through a shared hierarchical namespace of data registers.
The namespace looks quite similar to a Unix filesystem. The data registers are known as znodes

ZooKeeper has two types of znodes: persistent and ephemeral.
Persistent znodes have a lifetime in ZooKeeper's namespace until they're explicitly deleted.

An ephemeral znode is deleted by the ZooKeeper service when the creating client's session ends.
An end to a client's session can happen because of disconnection due to a client crash or explicit termination of the connection.
Even though ephemeral nodes are tied to a client session, they are visible to all clients, depending on the configured Access Control List (ACL) policy.

Ephemerals cannot have children.
A sequential znode is assigned a sequence number by ZooKeeper as a part of its name during its creation. The value of a monotonically increasing counter (maintained by the parent znode) is appended to the name of the znode.
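
A toy version of the naming scheme, assuming one counter per parent. The class is illustrative only; the ten-digit zero padding matches the counter format documented for ZooKeeper sequence nodes.

```python
class SequentialParent:
    """Toy parent znode that hands out sequence numbers (illustrative)."""

    def __init__(self):
        self._counter = 0

    def create_sequential(self, prefix: str) -> str:
        # The counter is appended as a zero-padded 10-digit number.
        name = f"{prefix}{self._counter:010d}"
        self._counter += 1
        return name
```

The fixed-width padding means that sorting the child names as strings gives the same order as sorting them by sequence number.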

ZooKeeper Watches - Push Model
ZooKeeper watches are a one-time trigger. What this means is that if a client receives a watch event and wants to get notified of future changes, it must set another watch

Whenever a watch is triggered, a notification is dispatched to the client that had set the watch. Watches are maintained in the ZooKeeper server to which a client is connected, and this makes it a fast and lean method of event notification.

ZooKeeper ensures that watches are always ordered in a first in, first out (FIFO) manner and that notifications are always dispatched in order.

Watch notifications are delivered to a client before any other change is made to the same znode.
Watch events are ordered with respect to the updates seen by the ZooKeeper service.

Updates in ZooKeeper are non-blocking operations.
Read requests: These are processed locally in the ZooKeeper server to which the client is currently connected
Write requests: These are forwarded to the leader and go through majority consensus before a response is generated

Sequential consistency: This ensures that the updates from clients are always applied in a FIFO order.

Atomicity: This ensures that the updates either succeed or fail, so there is no partial commit.
Single system image: A client sees the same view of the ZooKeeper service, which doesn't depend on which ZooKeeper server in the ensemble it connects to.
Reliability: This ensures that the updates will persist once they are applied. This is until they are overwritten by some clients.
Timeliness: The clients' view of the system is guaranteed to be up-to-date within a certain time bound. This is known as eventual consistency.

ZooKeeper provides the following built-in authentication mechanisms based on ACLs:
World: This represents anyone who is connecting to the ZooKeeper service
Auth: This represents any authenticated user, but doesn't use any ID
Digest: This represents the username and password way of authentication
IP address: This represents authentication with the IP address of the client

Split-brain is a scenario when two subsets of servers in the ensemble function independently.

ZooKeeper uses a special atomic messaging protocol called ZooKeeper Atomic Broadcast (ZAB). This protocol ensures that the local replicas in the ensemble never diverge. Also, the ZAB protocol is atomic, so the protocol guarantees that updates either succeed or fail.

Phase 1 – leader election
The servers in an ensemble go through a process of electing a master server, called the leader. The other servers in the ensemble are called followers.

Each server that participates in the leader election algorithm has a state called LOOKING. If a leader already exists in the ensemble, the peer servers inform the new participant servers about the existing leader. After learning about the leader, the new servers sync their state with the leader.

When a leader doesn't exist in the ensemble, ZooKeeper runs a leader election algorithm in the ensemble of servers. In this case, to start with, all of the servers are in the LOOKING state. The algorithm dictates the servers to exchange messages to elect a leader. The algorithm stops when the participant servers converge on a common choice for a particular server, which becomes the leader. The server that wins this election enters the LEADING state, while the other servers in the ensemble enter the FOLLOWING state.

The message exchanged by the participant servers with their peers in the ensemble contains the server's identifier (sid) and the transaction ID (zxid) of the most recent transaction it executed.

Each participating server, upon receiving a peer server's message, compares its own sid and zxid with the ones it receives. If the received zxid is greater than the one held by the server, the server accepts the received zxid; otherwise, it sets and advertises its own zxid to the peers in the ensemble.

At the end of this algorithm, the server that has the most recent transaction ID (zxid) wins the leader election algorithm. After the algorithm is completed, the follower servers sync their state with the elected leader.

The next step to leader election is leader activation. 
The newly elected leader proposes a NEW_LEADER proposal, and only after the NEW_LEADER proposal is acknowledged by a majority of servers (quorum) in the ensemble, the leader gets activated. The new leader doesn't accept new proposals until the NEW_LEADER proposal is committed.

Apart from leaders and followers, there can be a third personality of a server in a ZooKeeper ensemble, known as observers.
Observers and followers are conceptually similar as they both commit proposals from the leader.

However, unlike followers, observers do not participate in the voting processes of the two-phase commit process. Observers aid the scalability of read requests in a ZooKeeper service and help propagate updates in ZooKeeper ensembles that span multiple data centers.

ZooKeeper recipes
Barrier
Queue
Lock
Leader election
Group membership
Two-phase commit
The two-phase commit (2PC) protocol is a distributed algorithm that coordinates all the processes that participate in a distributed atomic transaction on whether to commit or abort (roll back) the transaction. 2PC is a specialized type of consensus protocol and is widely used in transaction processing systems.

The 2PC protocol consists of two phases, which are as follows:
In the first phase, the coordinator node asks all the transaction's participating processes to prepare and vote to either commit or abort the transaction.
In the second phase, the coordinator decides whether to commit or abort the transaction, depending on the result of the voting in the first phase. If all participants voted for commit, it commits the transaction; otherwise, it aborts it. It finally notifies the result to all the participants.
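
The two phases reduce to "collect votes, then commit only on a unanimous yes". The sketch below models each participant as a callable that returns True to vote for commit; this shape is an assumption for illustration and leaves out timeouts, logging and recovery.

```python
def two_phase_commit(participants):
    """Run a toy 2PC round; returns (decision, votes)."""
    # Phase 1: ask every participant to prepare and vote.
    votes = [prepare() for prepare in participants]
    # Phase 2: commit only if all voted yes, otherwise abort.
    decision = "commit" if all(votes) else "abort"
    return decision, votes
```

A single "no" vote is enough to abort: `two_phase_commit([lambda: True, lambda: False])` returns `("abort", [True, False])`.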

Service discovery
A service discovery mechanism does the following:
It allows services to register their availability
It provides a mechanism to locate a live instance of a particular service
It propagates a service change notification when the instances of a service change

YARN is the next-generation compute and resource-management framework in Apache Hadoop.

Ensuring the generation of unique values in various sequence ID generators

https://zookeeper.apache.org/doc/trunk/recipes.html

Queues

Distributed queues are a common data structure. To implement a distributed queue in ZooKeeper, first designate a znode to hold the queue, the queue node. The distributed clients put something into the queue by calling create() with a pathname ending in "queue-", with the sequence and ephemeral flags in the create() call set to true. Because the sequence flag is set, the new pathnames will have the form _path-to-queue-node_/queue-X, where X is a monotonically increasing number. A client that wants to remove an item from the queue calls ZooKeeper's getChildren() function, with watch set to true on the queue node, and begins processing nodes with the lowest number. The client does not need to issue another getChildren() until it exhausts the list obtained from the first getChildren() call. If there are no children in the queue node, the reader waits for a watch notification to check the queue again.
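
The consumer side of the recipe comes down to ordering one getChildren() snapshot by sequence number. The helpers below are a sketch with invented names; they work on a plain list of child names of the form "queue-X" and do not talk to ZooKeeper.

```python
def next_item(children):
    """Return the child with the lowest sequence number, or None if empty."""
    if not children:
        return None   # a real reader would now wait for a watch notification
    return min(children, key=lambda name: int(name.rsplit("-", 1)[1]))


def drain(children):
    """Process one snapshot of children in queue order."""
    pending = list(children)
    order = []
    while pending:
        item = next_item(pending)
        pending.remove(item)
        order.append(item)
    return order
```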

Apache ZooKeeper Essentials
BookKeeper can be used to reliably log streams of records. It achieves high availability through replication. Applications that need to log operations or transactions in a reliable fashion so that crash recovery can be done in case of failure can use BookKeeper.

BookKeeper is a highly available and reliable distributed logging service. Hedwig is a topic-based distributed publish/subscribe system built on BookKeeper.

Ledger: Ledgers are streams of logs that consist of a sequence of bytes. Log streams are written sequentially to a ledger in an append-only semantics. It uses the write-ahead logging (WAL) protocol.
BookKeeper client: A BookKeeper client creates ledgers. It runs in the same machine as the application and enables the application to write to the ledgers.
Bookie: Bookies are BookKeeper storage servers that store and manage the ledgers.
Metadata storage service: The information related to ledgers and bookies is stored with this service.
BookKeeper uses ZooKeeper for its metadata storage service. Whenever the application creates a ledger with the BookKeeper client, it stores the metadata about the ledger in the metadata storage service backed by a ZooKeeper instance. Clients use ZooKeeper coordination to ascertain that only a single client is writing to a ledger. The writer has to close the ledger before any other client issues a read operation on that ledger. BookKeeper ensures that after the ledger has been closed, other clients can see the same content while reading from it. The closing of a ledger is done by creating a close znode for the ledger, and the use of ZooKeeper prevents any race conditions.

Hadoop YARN (Yet Another Resource Negotiator): YARN is a distributed framework that provides job scheduling and cluster resource management
It consists of a global ResourceManager (RM) and a per-application ApplicationMaster:

The RM mediates resource allocation among all the applications in the system. It has two components: Scheduler and ApplicationsManager. The Scheduler is responsible for allocating resources to the applications, and the ApplicationsManager manages job- and application-specific details. RM uses a per-node daemon called the NodeManager to monitor resource usage (CPU, memory, disk, and network) and report the same to the RM.
The ApplicationMaster is a library that is specific to every application. It runs along with the application in the Hadoop cluster, does resource negotiating with the RM, and assists the NodeManager(s) to execute and monitor the tasks.
The RM coordinates the running tasks in a YARN cluster. However, in a Hadoop cluster, only one instance of the RM runs and is a single point of failure. A ZooKeeper solution provides high availability (HA) to the RM, which allows a failover of the RM to another machine when the active one crashes.

The solution works by storing the current internal state of the RM in ZooKeeper. Since ZooKeeper itself is a highly available data store for small amount of data, it makes the RM state highly available too. Whenever an RM resumes after a restart or a crash, it loads the internal state from ZooKeeper.

An extension to the solution to provide failover capability is to have multiple RMs, of which one is in an active role and the others are mere standbys. When the active RM goes down, a leader election can be done with ZooKeeper to elect a new RM. Use of ZooKeeper prevents the potential problem of more than one node claiming the active role (fencing).

ZooKeeper is also used in Hadoop for the purpose of achieving high availability for HDFS. The metadata node or the NameNode (NN), which stores the metadata information of the whole filesystem is a single point of failure (SPOF) in HDFS. The NN being a SPOF was a problem till Hadoop 1.x. However, in Hadoop 2.x, an automatic failover mechanism is available in HDFS, for a fast failover of the NN role from the active node to another node in the event of a crash.

The problem is solved in a similar manner to the approach used for YARN RM. Multiple NNs are set up, of which only one NN assumes the active role, and the others remain in the standby mode. All client filesystem operations go to the active NN in the cluster, while the standby acts as a slave. The standby NN maintains enough state about the filesystem namespace to provide a fast failover. Each of the NNs (active as well as standbys) runs a ZKFailoverController (ZKFC) in it. ZKFC maintains a heartbeat with the ZooKeeper service. The ZKFC in the active NN holds a special "lock" znode through an ephemeral znode in the ZooKeeper tree. In the event of a failure of the current active NN, the session with the ZooKeeper service expires, triggering an election for the next active NN. One among the standby NNs wins the election and acquires the active NN role.

In HBase architecture, there is a master server called HMaster, and a number of slave servers called RegionServer. The HMaster monitors the RegionServers, which store and manage the regions. Regions are contiguous ranges of rows stored together. The data is stored in persistent storage files called HFiles.

HBase uses ZooKeeper for distributed coordination. Every RegionServer creates its own ephemeral znode in ZooKeeper, which the HMaster uses in order to discover available servers. HBase also uses ZooKeeper to ensure that there is only one HMaster running and to store the root of the regions for region discovery. ZooKeeper is an essential component in HBase, without which HBase can't operate.

Helix provides a generic way of automatically managing the resources in a cluster. Helix acts as a decision subsystem for the cluster, and is responsible for the following tasks and many more:

Automating the reassignment of resources to the nodes
Handling node failure detection and recovery
Dynamic cluster reconfiguration (node and resource addition/deletion)
Scheduling of maintenance tasks (backups, index rebuilds, and so on)
Maintaining load balancing and flow control in the cluster
In order to store the current cluster state, Helix needs a distributed and highly available configuration or cluster metadata store, for which it uses ZooKeeper.

ZooKeeper provides Helix with the following capabilities:

This framework represents the PERSISTENT state, which remains until it's removed
This framework also represents the TRANSIENT/EPHEMERAL state, which goes away when the process that created the state leaves the cluster
This framework notifies the subsystem when there is a change in the PERSISTENT and EPHEMERAL state of the cluster
Helix also allows simple lookups of task assignments through the configuration store built on top of ZooKeeper. Through this, clients can look up where the tasks are currently assigned. This way, Helix can also provide a service discovery registry.

http://curator.apache.org/curator-recipes/distributed-queue.html
An implementation of the Distributed Queue ZK recipe. Items put into the queue are guaranteed to be ordered (by means of ZK's PERSISTENT_SEQUENTIAL node). If a single consumer takes items out of the queue, they will be ordered FIFO. If ordering is important, use a LeaderSelector to nominate a single consumer.
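
To see where the ordering comes from: ZooKeeper appends a zero-padded 10-digit counter to the name of every sequential znode, so a consumer can recover insertion order by sorting the child names on that suffix. The sketch below shows only that sorting step. It is not Curator's implementation, and the `queue-` prefix and the helper names are assumptions made for illustration.

```python
def sequence_number(znode_name):
    """Return the 10-digit counter ZooKeeper appends to a sequential znode name."""
    return int(znode_name[-10:])


def fifo_order(children):
    """Sort child znode names by sequence number, oldest first."""
    return sorted(children, key=sequence_number)


# Child names as a listing call might return them, in no particular order.
# The "queue-" prefix is a hypothetical choice.
children = ["queue-0000000012", "queue-0000000003", "queue-0000000007"]

print(fifo_order(children))
```

With several consumers, each one would see this same order but could finish processing items at different times, which is why a single nominated consumer is needed for strict FIFO.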
https://cwiki.apache.org/confluence/display/CURATOR/TN4
ZooKeeper makes a very bad Queue source.
The ZooKeeper recipes page lists Queues as a possible use-case for ZooKeeper. Curator includes several Queue recipes. In our experience, however, it is a bad idea to use ZooKeeper as a Queue:
  • ZooKeeper has a 1MB transport limitation. In practice this means that ZNodes must be relatively small. Typically, queues can contain many thousands of messages.
  • ZooKeeper can slow down considerably on startup if there are many large ZNodes. This will be common if you are using ZooKeeper for queues. You will need to significantly increase initLimit and syncLimit.
  • If a ZNode gets too big it can be extremely difficult to clean. getChildren() will fail on the node. At Netflix we had to create a special-purpose program that had a huge value for jute.maxbuffer in order to get the nodes and delete them.
  • ZooKeeper can start to perform badly if there are many nodes with thousands of children.
  • The ZooKeeper database is kept entirely in memory. So, you can never have more messages than can fit in memory.
In terms of the CAP theorem, ZooKeeper is a CP system: it favors consistency. It is a leader-follower system. ZooKeeper requires a majority of its n nodes, that is at least ⌊n/2⌋+1, to be available. Because two disjoint groups of nodes can never both hold a majority, split-brain cannot happen.
How to write?
When a client submits a write request, the request is forwarded to the leader. The leader then forwards the write to all followers. The write succeeds only once a majority of the ensemble, at least ⌊n/2⌋+1 nodes counting the leader, has acknowledged it. Each node first persists the write to disk and only then makes it available in memory.
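
The majority rule is easy to check with a little arithmetic. The following is a minimal sketch with made-up helper names (`quorum_size`, `tolerated_failures`, `write_committed`); it is not taken from the ZooKeeper code base.

```python
def quorum_size(n):
    """Smallest number of nodes that forms a majority of an n-node ensemble."""
    return n // 2 + 1


def tolerated_failures(n):
    """How many nodes can be down while a majority is still reachable."""
    return n - quorum_size(n)


def write_committed(n, acks):
    """A write commits once a majority (leader included) has acknowledged it."""
    return acks >= quorum_size(n)


for n in (3, 4, 5):
    print(n, quorum_size(n), tolerated_failures(n))

# However the ensemble is partitioned into two sides, at most one side
# can reach a quorum, which is what rules out split-brain.
n = 5
both_sides_have_quorum = any(
    write_committed(n, side) and write_committed(n, n - side)
    for side in range(n + 1)
)
print(both_sides_have_quorum)
```

Note that a 4-node ensemble tolerates no more failures than a 3-node one, which is why ensembles are usually sized with an odd number of voting members.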
How to read?
The client contacts a node, and the node returns the value directly from its in-memory data. Because of network delay, the data on that node may not yet be up to date. For stricter consistency, call sync() before the read.
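
The stale-read problem and the effect of syncing first can be illustrated with a toy model. The `Leader` and `Follower` classes below are hypothetical and live entirely in memory. They do not use the ZooKeeper client API, and the real sync() is an asynchronous call whose completion the client waits for before reading.

```python
class Leader:
    """Holds the ordered list of committed writes."""

    def __init__(self):
        self.committed = []  # list of (path, value)

    def write(self, path, value):
        self.committed.append((path, value))


class Follower:
    """Serves reads from local memory and may lag behind the leader."""

    def __init__(self, leader):
        self.leader = leader
        self.applied = 0   # number of committed writes applied locally
        self.data = {}

    def sync(self):
        """Apply every write the leader has committed so far."""
        for path, value in self.leader.committed[self.applied:]:
            self.data[path] = value
        self.applied = len(self.leader.committed)

    def read(self, path):
        """Answer from local memory without contacting the leader."""
        return self.data.get(path)


leader = Leader()
follower = Follower(leader)

leader.write("/config", "v1")
follower.sync()                      # follower has seen v1
leader.write("/config", "v2")        # not yet propagated to the follower

stale = follower.read("/config")     # still the old value
follower.sync()
fresh = follower.read("/config")     # now the latest value

print(stale, fresh)
```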
How to recover?
ZooKeeper periodically saves a snapshot of its state, called a fuzzy snapshot. It is "fuzzy" because updates keep being applied while the snapshot is written, so the saved state may not correspond to any state that ever existed on the leader. When a crashed node recovers, it starts from its latest fuzzy snapshot, replays the transaction log on top of it, and then fetches any updates it still lacks from the other nodes.
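
Recovering from a snapshot that never matched a real state works because the logged transactions are idempotent: each one records the resulting value, so applying it a second time changes nothing. Below is a minimal sketch under that assumption. The log format, the paths, and the helper names are invented for illustration and do not reflect ZooKeeper's on-disk format.

```python
# Transaction log entries: (zxid, path, value). Each entry sets an absolute
# value, so applying it twice gives the same result as applying it once.
txn_log = [
    (1, "/a", "a1"),
    (2, "/b", "b1"),
    (3, "/a", "a2"),
    (4, "/c", "c1"),
]


def apply_txn(state, txn):
    """Apply one logged transaction to an in-memory state dict."""
    _, path, value = txn
    state[path] = value


def replay(state, log, from_zxid):
    """Apply every logged transaction whose zxid is at least from_zxid."""
    for txn in log:
        if txn[0] >= from_zxid:
            apply_txn(state, txn)
    return state


# The snapshot began after zxid 1 had been applied. While it was being
# written, /a was saved before txn 3 landed but /c was saved after txn 4,
# so this combination never existed as a real state.
snapshot_first_missing_zxid = 2
fuzzy_snapshot = {"/a": "a1", "/b": "b1", "/c": "c1"}

recovered = replay(dict(fuzzy_snapshot), txn_log, snapshot_first_missing_zxid)
expected = replay({}, txn_log, 0)

print(recovered == expected)
```

Transactions 2 and 4 are applied a second time during replay with no harm, and transaction 3 fills in the update the snapshot missed.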
