ZooKeeper as a distributed consensus service
The ability to reach consensus across a network of unreliable services is at the heart of any fault-tolerant
distributed system. Apache ZooKeeper is a high-performance, open-source server
that provides highly reliable distributed coordination and meets the
production demands of web-scale, mission-critical applications. ZooKeeper
is popular among multi-host, multi-process applications (with C and Java bindings)
running in data centers. This paper discusses how ZooKeeper is used to
implement distributed consensus among the available services.
Every complex distributed application needs a service for maintaining configuration information,
naming, distributed synchronization, and group services. Because these facilities are
hard to implement correctly in a distributed setting, the natural preference is to use a
tried, tested, and reliable solution that is already available.
ZooKeeper provides a simple interface to a centralized coordination service. The service itself is
distributed and highly reliable, which prevents it from becoming a single point of failure in
large systems. Some of the more common services that ZooKeeper provides are
consensus, group management, name service, and configuration management.
Applications using ZooKeeper can build more powerful abstractions on top, such as event
notification, leader election, locking, and priority queues. Key
advantages of using ZooKeeper are low latency, high throughput, high
availability, and strictly ordered access to znodes.
Apache ZooKeeper
The ZooKeeper service uses a “quorum” model and is available only while a majority of its servers are alive.
The servers in the ZooKeeper service must know about each other, and
clients must know the list of servers in the service. A client
maintains a TCP connection to a single ZooKeeper server, through which it sends
requests, gets responses, receives watch events, and sends heartbeats. If the
connection to that server breaks, the client connects to a different
server. When a client connects to the ZooKeeper service, the first eligible
server creates a session for it; in the event of a connection failure, this
session is re-established on the next server the client connects to.
All updates to ZooKeeper are ordered, and watches and reads are ordered with
respect to updates. Each update is stamped with a unique ZooKeeper transaction
id (zxid). All updates pass through the leader (which assigns the zxid)
and are considered complete when a quorum confirms the update. Published
performance experiments show that around 50K updates
per second can be pushed through a leader and its quorum; beyond that rate,
serializing all updates through a single leader becomes a
performance bottleneck.
Read and watch requests on a znode are processed locally by the server the client is
connected to, but write requests are propagated to the other servers and go through
consensus before a response is generated. Sync requests are also forwarded
but do not go through consensus. Thus read throughput
increases with the number of servers, while write throughput decreases
with the number of servers. Because a read request can be served by any member of the
ensemble, a client may see stale data for a small window of
time if that server is slow to receive and apply an update.
For the service to be reliable and scalable, it is replicated over a set of machines. ZooKeeper uses
its own atomic broadcast protocol, ZAB (described later, and often compared to the famous Paxos
algorithm), to keep the replicas consistent. Each replica holds an in-memory image of the
data tree, along with transaction logs and snapshots on disk. Since the data is in memory,
ZooKeeper can achieve low latency and high
throughput. However, complete replication limits the total size of data that
can be managed using ZooKeeper.
For event notification, ZooKeeper resembles a state-based system rather than an event-based system. Watch events
can be used to refresh and fetch new values but cannot serve as a log of changes;
if a change history is needed, it can be designed into the data itself.
Data model
ZooKeeper employs a hierarchical namespace of data registers called znodes, similar to a file system.
A znode is identified by its path: a sequence of path elements separated by
slashes, expressed as a canonical, absolute path. Except for the root node, every znode has
a parent. A znode cannot be deleted while it has children, but one can be added at
any point in the hierarchy. In the ZooKeeper world:
- znodes: the data nodes
- servers: the machines that make up the ZooKeeper service
- quorum peers: the servers that make up an ensemble
- client: any host or process that uses the service
Znode
Each znode can store a small amount (measured in kilobytes) of coordination data, such as status,
configuration, or location information, that is versioned and time-stamped; this
metadata is kept in the znode's stat structure. The data storage model
is completely non-relational.
Depending on its lifecycle relative to the client session, a znode can be created as one of the following (a short sketch follows the list):
- persistent (permanent): used to store managed data
- ephemeral: exists for the lifetime of the client session that created it; cannot have children
- sequential: used for unique naming; ZooKeeper appends a monotonically increasing counter to the end of the path
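As a concrete illustration, here is a minimal Java sketch using the standard org.apache.zookeeper client API; the connect string and the /config paths are placeholders of my choosing, not part of the original text:

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ZnodeModes {
    public static void main(String[] args) throws Exception {
        // Hypothetical connect string; 3-second session timeout, no-op default watcher.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, event -> {});

        // Persistent (permanent) znode: survives the client session.
        zk.create("/config", "v1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Ephemeral znode: deleted automatically when this session ends; cannot have children.
        zk.create("/config/owner", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // Sequential znode: ZooKeeper appends a monotonically increasing counter to the path.
        String path = zk.create("/config/item-", new byte[0], Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println("created " + path);   // e.g. /config/item-0000000000

        zk.close();
    }
}
```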
Ephemeral and sequential znodes can be used to implement synchronization tasks such as locks,
queues, barriers, transactions, and elections in a distributed environment.
Some of the primitive operations inherently supported are (illustrated in the sketch after the list):
1. Fetch the value of a znode.
2. Fetch the list of children of a parent znode.
3. Receive a notification when a znode's value is modified.
4. Receive a notification when znodes are added to a parent or when existing children of a parent are removed.
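A hedged sketch of these four primitives through the Java client; the /config path is illustrative and assumed to exist:

```java
import java.util.List;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZnodePrimitives {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, event -> {});

        // 1. Fetch the value of a znode; the Stat is filled with its version and timestamps.
        Stat stat = new Stat();
        byte[] data = zk.getData("/config", false, stat);

        // 2. Fetch the list of children of a parent znode.
        List<String> children = zk.getChildren("/config", false);

        // 3. One-time notification when the znode's value is modified.
        zk.getData("/config", event -> System.out.println("data event: " + event.getType()), stat);

        // 4. One-time notification when children are added to or removed from the parent.
        zk.getChildren("/config", event -> System.out.println("child event: " + event.getType()));

        System.out.println(new String(data) + " " + children + " (version " + stat.getVersion() + ")");
        zk.close();
    }
}
```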
ZooKeeper keeps its data in memory (backed by a log for reliability), which makes it very fast and able to
handle high loads. The downside is that it cannot be used as
a general data store, since the data it can hold in memory is limited. ZooKeeper has
a built-in sanity check of 1 MB to prevent it from becoming a large data store.
Typically, each ZooKeeper application creates a “namespace” node (e.g., /app1) and then
creates its individual nodes under that namespace node.
Connectivity, State and Sessions
A ZooKeeper client creates a handle to establish a session with the ZooKeeper service. The client library connects
to an arbitrary server from its list. ZooKeeper keeps a session for each active client and
uses a heartbeat mechanism to keep the connection alive. When
a client is disconnected from ZooKeeper for longer than the session timeout, its
session expires. If the connection fails or is lost, the client
retries against another server.
Figure: state transitions of a ZooKeeper client
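A minimal connection sketch, assuming a hypothetical three-server ensemble (zk1, zk2, zk3); the handle is returned immediately while the session is established asynchronously:

```java
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

public class SessionExample {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);

        // The client library connects to an arbitrary server from the connect string.
        ZooKeeper zk = new ZooKeeper("zk1:2181,zk2:2181,zk3:2181", 5000, event -> {
            if (event.getState() == KeeperState.SyncConnected) {
                connected.countDown();            // session established (or re-established)
            } else if (event.getState() == KeeperState.Expired) {
                // Fatal for this handle: the application must create a new handle
                // and rebuild its ephemeral nodes and watches.
                System.out.println("session expired");
            }
        });

        connected.await();                        // block until the session is live
        System.out.println("session id: 0x" + Long.toHexString(zk.getSessionId()));
        zk.close();
    }
}
```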
Watches
Clients holding sessions with ZooKeeper nodes can watch for events in the distributed environment they
run in. A watch (a function/context pair) is a one-time, lightweight
trigger sent asynchronously to its watchers when the watched data or children change,
and it follows the same order of updates as seen by the ZooKeeper service. A watch
is managed locally at the server to which the client is connected. ZooKeeper
guarantees that a client sees the watch event before it sees the
change for which it set the watch. When an event happens, ZooKeeper notifies
all the clients listening for it. The client is responsible
for handling the event in its own instance and setting a new watch if it is
interested in future events. The client is also responsible for handling session
expiration and re-creating its ephemeral nodes after expiration. Upon
reconnection to a new server, any previously registered watches are
re-registered and triggered if needed.
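Because watches are one-time triggers, a client that wants continuous notifications must re-register inside the callback. A sketch under that assumption (the class name and path are illustrative):

```java
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;

public class RepeatingWatch {
    private final ZooKeeper zk;

    public RepeatingWatch(ZooKeeper zk) { this.zk = zk; }

    public void watchForever(String path) throws Exception {
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == EventType.NodeDataChanged) {
                    try {
                        // Read the new value and, crucially, set the watch again.
                        byte[] data = zk.getData(path, this, null);
                        System.out.println(path + " -> " + new String(data));
                    } catch (Exception e) {
                        e.printStackTrace(); // a real client would classify recoverable vs. fatal errors
                    }
                }
            }
        };
        zk.getData(path, watcher, null);          // read once and set the initial watch
    }
}
```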
Consistency Guarantees
ZooKeeper provides the following consistency guarantees:
- Sequential consistency: updates are applied in the order in which clients sent them.
- Atomicity: updates are all-or-nothing; no partial results are ever seen.
- Single system image: a client sees the same view of the service regardless of which server it connects to.
- Reliability: once an update is applied, it persists until it is overwritten.
  - On a successful return code, the update will have been applied.
  - Updates seen by a client are never rolled back when recovering from server failures.
- Timeliness: a client's view of the system is up to date within a certain time bound.
ZooKeeper does not guarantee consistent simultaneous cross-client views.
ZAB Protocol
The ZooKeeper atomic broadcast protocol (ZAB) is a totally ordered broadcast protocol used to implement
ZooKeeper's client guarantees and to maintain consistent replicas of ZooKeeper
state at each server (replicated state machines). ZAB propagates the
state changes produced by the ZooKeeper leader and commits them once a quorum agrees.
ZAB has two modes of operation: broadcast mode and recovery mode.
Broadcast mode: resembles a simple two-phase commit.
The leader assigns a unique id (zxid) to the message, wraps it in a proposal,
and places it on the outgoing FIFO (TCP) channel of each follower. When a follower
receives the proposal, it writes it to disk and sends an acknowledgement to the leader. When the
leader receives acknowledgements from a quorum, it broadcasts a COMMIT and
delivers the message locally. Followers deliver the message to their listeners
once they receive the COMMIT from the leader.
Figure: ZAB broadcast mode
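ZooKeeper's real implementation is asynchronous and pipelined; the following single-process Java simulation only illustrates the propose/ACK/COMMIT flow described above. All names are illustrative, and the leader's own log write stands in as its implicit ACK:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ZabBroadcastSketch {
    static class Follower {
        final List<Long> log = new ArrayList<>();       // proposals persisted to "disk"
        final List<Long> delivered = new ArrayList<>(); // messages delivered to listeners
        boolean ack(long zxid) { log.add(zxid); return true; } // write to disk, then ACK
        void commit(long zxid) { delivered.add(zxid); }        // deliver on COMMIT
    }

    public static void main(String[] args) {
        List<Follower> followers =
                Arrays.asList(new Follower(), new Follower(), new Follower(), new Follower());
        int ensembleSize = followers.size() + 1;        // the followers plus the leader
        int quorum = ensembleSize / 2 + 1;              // a majority of the ensemble
        long zxid = 0;
        for (String update : new String[]{"u1", "u2"}) {
            long id = ++zxid;                           // leader stamps a unique, increasing zxid
            int acks = 1;                               // the leader's own log write counts
            for (Follower f : followers) {
                if (f.ack(id)) acks++;                  // PROPOSE over a per-follower FIFO channel
            }
            if (acks >= quorum) {                       // quorum reached: COMMIT everywhere
                for (Follower f : followers) f.commit(id);
                System.out.println("committed " + update + " at zxid " + id);
            }
        }
    }
}
```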
Recovery mode: this mode is used during service startup or upon leader failure, and it ends when the
ZooKeeper ensemble elects a new leader. Recovery mode provides two guarantees
that maintain a consistent global ZooKeeper state:
- Delivered messages must not be missed. Consistency could break if the leader crashes after it
commits locally but before the followers commit: the clients connected to the old leader
would have seen the effects of the commit, so the transaction
has to be committed on the followers to preserve consistency. To ensure this, the
follower with the highest proposal zxid is selected as the new leader in the
event of a leader crash, which guarantees that the new leader is up to date with
all the proposals. The two-phase nature of the broadcast protocol implies that a
partially committed proposal is already present in the followers' transaction logs,
so the newly elected leader only has to replay any missed proposals from
its transaction log to bring the followers in sync. For example, suppose the old
leader locally committed proposal 2; since the new leader has that proposal in its
transaction log, it is replayed to the followers to make the ensemble consistent.
- A skipped message must remain skipped. If a leader generates a proposal and fails before anyone
sees it, the proposal has to be skipped. This is again easy to guarantee: the new leader
will not have the proposal in its transaction log, and since only the proposals
in the local transaction log are replayed, the unsent proposal is skipped.
For example, suppose the old leader generated proposal 3 but crashed before sending it
to anyone; when the new leader is elected, the proposal is absent from its transaction
log and is therefore skipped.
Use Cases
Data centers running complex back-end systems with hundreds of nodes need a backplane over which all
these nodes and their subsystems can coordinate. In the event of a host shutdown, the
remaining servers should self-organize and elect a new primary. Building
complicated retry logic into application code to manage failures and elect the
new candidate is tedious. ZooKeeper supports this type of coordination
behavior and can act as the service locator for such systems: ZooKeeper
sends an event to all listening nodes so that they can react by electing the new primary
service, and each process then queries ZooKeeper to find the primary. The
advantage of this design is that it runs equally easily on one local machine
or across many machines in a data center.
Consider the case where the configuration settings of state machines running in many processes on
different hosts need to be changed. There are several approaches to the
problem. A new code release with the changed configuration is possible but not
desirable. Shipping configuration changes in a distribution package pushed to
all nodes, with each process checking whether its configuration settings have
changed, adds complexity when separate configurations exist for
each subsystem: tracking which packages run on which nodes, handling rollback on
package-workflow failures, and testing and pushing the changes to
production all make this approach very unattractive. A better approach is to
embed a web server in each process to expose metrics and change the
configuration dynamically; although this is more powerful for a single
process, it becomes tedious for the large number of processes in a data center.
A simpler and more straightforward approach is to use ZooKeeper to store the state machine definition in a znode.
The nodes are created from the configuration properties collected from the
product's distribution packages. Every process that depends on the node
registers as a watcher when it initially reads the state machine; when the state
machine is updated, all listening entities receive an event, reload the state
machine into their process, and thus eventually pick up the change (a sketch follows).
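A hedged sketch of this pattern: each process reads a hypothetical state-machine znode, sets a watch in the same call, and reloads (and re-watches) on every change. The class name and path are assumptions for illustration:

```java
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ConfigLoader {
    private final ZooKeeper zk;
    private volatile byte[] stateMachine;        // the currently loaded definition

    public ConfigLoader(ZooKeeper zk) { this.zk = zk; }

    // Read the definition and register a watch in the same call; on change,
    // reload, which also re-registers the (one-time) watch.
    public void load(String path) throws Exception {
        Stat stat = new Stat();
        stateMachine = zk.getData(path, event -> {
            if (event.getType() == EventType.NodeDataChanged) {
                try {
                    load(path);
                    System.out.println("state machine reloaded");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, stat);
    }
}
```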
However, this approach has limitations. The application's state machine becomes dependent on ZooKeeper's state machine: the application must handle each event as it occurs, must re-establish all its watches on a new server when a ZooKeeper server dies, and must build higher-order operations like locks and queues itself. Also, handling state stored in the application may not be thread-safe, since callbacks arriving on the ZooKeeper thread can access shared data structures. For thread-safe behavior, the Actor model can be employed, feeding ZooKeeper events into the application's Actor queues to synthesize the different states.
ZooKeeper recipes
ZooKeeper provides client guarantees such as ordered delivery and high availability, and
functions such as name service, group membership, and configuration management, from
which higher-order synchronization functions can be built. This section
provides algorithms for building functions such as leader election, barriers, and
locks.
Leader Election
Leader election is the problem of selecting a leader among a group of nodes
belonging to one logical cluster. This is a hard problem in the face of node crashes.
With ZooKeeper, it can be solved using the following algorithm (a Java sketch follows the list):
- Create a permanent znode (/election/) in ZooKeeper.
- Each client in the group creates an ephemeral node with the “sequence” flag set (/election/node_). The sequence flag ensures that ZooKeeper appends a monotonically increasing number to the node name; for instance, the first client to create the ephemeral node gets /election/node_1 and the second gets /election/node_2.
- After creating its ephemeral node, each client fetches all the children of /election. The client that created the node with the smallest sequence number is the leader; this is an unambiguous way of electing the leader.
- Each client watches the node whose sequence number is one less than its own. If that “watched” node gets deleted, the client repeats step 3 to check whether it has become the leader.
- If all clients need to know the current leader, they can subscribe to group notifications on the “/election” node and determine the current leader on their own.
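A minimal sketch of this recipe in Java, assuming /election already exists and the client handle is connected (class and method names are illustrative). Watching only the immediate predecessor avoids a herd of wake-ups when the leader dies:

```java
import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class LeaderElection {
    private final ZooKeeper zk;
    private String myNode;                        // e.g. node_0000000003

    public LeaderElection(ZooKeeper zk) { this.zk = zk; }

    public void volunteer() throws Exception {
        // Step 2: ephemeral + sequential node under the permanent /election node.
        String path = zk.create("/election/node_", new byte[0], Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        myNode = path.substring("/election/".length());
        check();
    }

    private void check() throws Exception {
        List<String> children = zk.getChildren("/election", false);
        Collections.sort(children);              // zero-padded sequence sorts lexicographically
        if (myNode.equals(children.get(0))) {
            System.out.println("I am the leader");
            return;
        }
        // Step 4: watch only the node immediately ahead of ours.
        String predecessor = children.get(children.indexOf(myNode) - 1);
        Stat s = zk.exists("/election/" + predecessor, event -> {
            try { check(); } catch (Exception e) { e.printStackTrace(); }
        });
        if (s == null) check();                   // predecessor vanished before the watch was set
    }
}
```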
Barriers
Barriers are used in distributed systems to block a set of nodes from processing until a
condition is satisfied. An algorithm for implementing a barrier with
ZooKeeper is given below (a Java sketch follows the list):
- Create a permanent node designated as the barrier node (“/barrier”).
- Each client checks for the presence of the barrier node, setting a watch on it. If the node is present, the barrier exists and the client simply waits.
- Once the barrier exit condition is met, the client that controls the barrier deletes the barrier node.
- All clients are notified of the deletion and can proceed.
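A minimal sketch, assuming the /barrier path from the recipe and a connected handle (class and method names are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;

public class Barrier {
    private final ZooKeeper zk;

    public Barrier(ZooKeeper zk) { this.zk = zk; }

    // Blocks until the /barrier node is deleted by the controlling client.
    public void await() throws Exception {
        CountDownLatch gone = new CountDownLatch(1);
        // exists() both checks presence and registers a deletion watch.
        if (zk.exists("/barrier", event -> {
                if (event.getType() == EventType.NodeDeleted) gone.countDown();
            }) != null) {
            gone.await();                         // barrier is up: wait for the delete event
        }
    }

    // The controlling client lifts the barrier by deleting the node (-1 = any version).
    public void lift() throws Exception {
        zk.delete("/barrier", -1);
    }
}
```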
Locks
ZooKeeper can also be used to construct global locks. Here is the algorithm (a Java sketch follows the steps):
Acquiring a Lock:
- Create a permanent node “/lock” in ZooKeeper.
- Each client in the group creates an ephemeral node with the “sequence” flag set (/lock/node_).
- After creating its ephemeral node, each client fetches all the children of /lock. The client that created the node with the smallest sequence number holds the lock.
- Each client watches the node whose sequence number is one less than its own. If that “watched” node gets deleted, the client repeats step 3 to check whether it now holds the lock.
Releasing a Lock:
1. The client that holds the lock deletes the ephemeral node it created.
2. The client that created the node with the next sequence number (the new smallest) is notified and now holds the lock.
3. If all clients need to know about the change of lock ownership, they can listen to group notifications on the “/lock” node and determine the current owner.
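A minimal sketch of both halves of the recipe, assuming /lock exists and a connected handle (names are illustrative). The loop re-checks after any event on the predecessor, which also handles spurious wake-ups:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class DistributedLock {
    private final ZooKeeper zk;
    private String myNode;

    public DistributedLock(ZooKeeper zk) { this.zk = zk; }

    public void lock() throws Exception {
        String path = zk.create("/lock/node_", new byte[0], Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        myNode = path.substring("/lock/".length());
        while (true) {
            List<String> children = zk.getChildren("/lock", false);
            Collections.sort(children);
            if (myNode.equals(children.get(0))) return;   // smallest sequence holds the lock
            // Watch only the node just ahead of ours; re-check when it disappears.
            String predecessor = children.get(children.indexOf(myNode) - 1);
            CountDownLatch deleted = new CountDownLatch(1);
            if (zk.exists("/lock/" + predecessor, event -> deleted.countDown()) != null) {
                deleted.await();
            }
        }
    }

    public void unlock() throws Exception {
        zk.delete("/lock/" + myNode, -1);         // releasing = deleting our ephemeral node
    }
}
```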
Group membership
In addition to the higher-order functions above, group membership is a popular ready-made
feature of ZooKeeper, achieved by exploiting the child-notification mechanism.
Here are the details (a Java sketch follows the list):
- A permanent node (e.g., “/mygroup”) represents the logical group.
- Clients create ephemeral nodes under the group node to indicate membership.
- All members of the group subscribe to “/mygroup”, thereby staying aware of the other members.
- If a client shuts down (normally or abnormally), ZooKeeper guarantees that its ephemeral nodes are automatically removed and the group members are notified.
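A minimal sketch, assuming /mygroup exists and a connected handle (class and method names are illustrative):

```java
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class GroupMembership {
    private final ZooKeeper zk;

    public GroupMembership(ZooKeeper zk) { this.zk = zk; }

    // Join the group: the ephemeral node disappears automatically if this client dies.
    public void join(String memberName) throws Exception {
        zk.create("/mygroup/" + memberName, new byte[0], Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);
    }

    // List the current members and re-register a child watch on every change.
    public void watchMembers() throws Exception {
        List<String> members = zk.getChildren("/mygroup", event -> {
            if (event.getType() == EventType.NodeChildrenChanged) {
                try { watchMembers(); } catch (Exception e) { e.printStackTrace(); }
            }
        });
        System.out.println("members: " + members);
    }
}
```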
Performance
Read requests in ZooKeeper can be handled by any instance in the quorum, but write requests have to be
forwarded to the leader and broadcast using ZAB, so writes are clearly
the bottleneck. Consider the results of a published performance study of ZooKeeper.
Its throughput graph shows that, for a given number of clients, throughput
increases with the percentage of read requests in the request mix.
The table below shows the elapsed time to complete 110K operations for
varying numbers of clients and cores.
# of Clients | 1 Core | 2 Cores | 4 Cores
------------ | ------ | ------- | -------
1            | 22 sec | 20 sec  | 20 sec
10           | 52 sec | 35 sec  | 30 sec
20           | 93 sec | 65 sec  | 53 sec
From the table, we can observe that increasing the number of cores provides more benefit as the number of clients grows.
Access control list
ZooKeeper maintains ACLs to control access to its znodes. ZooKeeper's ACLs apply only to the specific znode and
are not recursive to its children. An ACL is a list of (scheme:expression,
permissions) pairs, and the supported permissions are CREATE, READ, WRITE, DELETE, and ADMIN.
The built-in ACL schemes are (a Java sketch follows the list):
- world: represents anyone
- auth: represents any authenticated user
- digest: a username:password string is hashed with MD5 to form the ACL identity
- ip: the client host IP is used as the ACL identity
Error Handling
ZooKeeper errors and exceptions can be classified as follows (a sketch of the corresponding Java handling follows the list):
- Normal state exceptions: the application needs to handle these when they occur. E.g., trying to create a znode that already exists.
- Recoverable errors: indicate a problem that occurred while the ZooKeeper handle is still valid; future operations will succeed once the connection is re-established. They are passed back to the application because ZooKeeper cannot recover from them by itself. E.g., a disconnected event, a connection timeout, or a connection-loss exception.
- Fatal errors: occur when the ZooKeeper handle has become invalid. An application that receives such an error needs to shut down all entities that relied on the previous handle and restart the entire process; a library needs to mark its state as invalid, clean up its internal data structures, and shut down gracefully. E.g., session expiration, explicit close, or authentication errors.
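These three classes map onto exception types in the Java client. A minimal sketch (the method and retry policy are illustrative):

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ErrorHandling {
    static void createSafely(ZooKeeper zk, String path) throws InterruptedException {
        try {
            zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException e) {
            // Normal state exception: the node is already there; often fine to ignore.
        } catch (KeeperException.ConnectionLossException e) {
            // Recoverable: the handle is still valid; retry once reconnected.
            // (A real client would use backoff and make the operation idempotent.)
        } catch (KeeperException.SessionExpiredException e) {
            // Fatal: the handle is dead; tear down dependent state and build a new handle.
            throw new IllegalStateException("session expired, restart required", e);
        } catch (KeeperException e) {
            e.printStackTrace();                  // other ZooKeeper errors
        }
    }
}
```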
Troubleshooting
Frequent timeouts, session expirations, poor performance, and high operation latencies are among the
issues commonly reported in a ZooKeeper cluster. JMX, the log4j logs, and the
statistics exposed through the four-letter-word commands are useful sources of
information for troubleshooting, and common OS-level commands include ifconfig
(to verify the network interface) and hdparm (with the -t and -T options, to verify
the performance of persistent storage). Because this is a difficult process, it is
important to monitor the operating environment in which ZooKeeper runs, at the host
level and/or the process level; the ZooKeeper server's JMX interface can be used to
collect information on latencies and the JVM's behavior. The following points should
be taken care of when configuring ZooKeeper:
- The list of ZooKeeper servers used by the clients must be a subset of the servers that each ZooKeeper server lists for the same cluster, and the server list in the configuration file of each ZooKeeper server should be consistent.
- If the storage device is limited, trace files need to be placed on NFS to maintain performance.
- The Java max heap size should be set below the usage limit that would cause the system to swap.
When a ZooKeeper client gets disconnected, it receives no notifications until it reconnects.
The client is also responsible for recovering its state and re-issuing any
requests that failed during the disconnection. Common causes of failure are:
- Client disconnects and session timeouts caused by I/O issues, JVM swapping, or virtual machine/system/network latencies
- Client-side swapping resulting in client disconnection
- Network interface card (NIC) misconfiguration
- A low-grade network switch
- Resource issues causing latency in virtualized environments
- Large latencies in cloud environments due to resource contention
- Error-correcting code (ECC) memory slowing the server down
- Starvation of the heartbeat thread in the virtual machine caused by Java garbage collection
Popular Applications and Companies using ZooKeeper
Some of the better-known applications and organizations using ZooKeeper:
- Apache HBase, the Hadoop database, uses ZooKeeper for leader election, bootstrapping, server lease management, and coordination between servers.
- Apache Solr (cloud edition, v1.5) uses ZooKeeper for leader election, configuration, and other services.
- Yahoo! uses ZooKeeper for leader election, configuration management, sharding, locking, group membership, etc.
- Katta, a scalable, failure-tolerant, distributed data store for real-time access, uses ZooKeeper for node, master, and index management in the grid.
- The Eclipse Communication Framework, a framework for building distributed servers, applications, and tools, uses ZooKeeper for its abstract discovery services.
- Deepdyve, an online article rental service, uses ZooKeeper to manage server state, control index deployment, and other tasks.
- AdroitLogic's UltraESB, an enterprise service bus, uses ZooKeeper to implement its clustering support and the automated round-robin restart of the complete cluster.
“Read-Only Mode” Optimization
A ZooKeeper server stops responding to client requests if it loses the quorum (contact with a
majority of the other servers). One optimization being implemented is a “read-only”
mode, in which a ZooKeeper server keeps responding to read requests even after losing the
quorum. This improves the availability of ZooKeeper nodes for read
requests.
This paper discussed how ZooKeeper provides distributed consensus and facilitates the implementation of various
higher-order synchronization functions through its client guarantees. ZooKeeper
is widely used by many companies for these functions; its simple
yet powerful interface and easy-to-understand client guarantees have been the
driving force behind its success.
References
- Hadoop Wiki: ZooKeeper general information
- ZooKeeper Programmer's Guide
- ZooKeeper service latencies under various loads & configurations
- Todd Hoff, “ZooKeeper - A Reliable, Scalable Distributed Coordination System,” 07/15/2008