A blog post from Martin Kleppmann triggered this note. That post discusses an issue with locks in Redis and argues that one way to avoid depending on timing is to combine distributed locks in ZooKeeper with fencing. This argument caused some confusion and I wanted to address it here. The following text covers some background on ZooKeeper and distributed locks. It discusses the issue with using ZooKeeper locks alone and how to address it with fencing.
Apache ZooKeeper is a replicated coordination service. It has a set of clients and a replica set (ensemble) that serves requests from the clients. A simple way to implement a lock with ZooKeeper is to create a znode, say /lock. If a /lock znode exists, then any other client that attempts to create it will fail. We also make this znode ephemeral. Being ephemeral means that if the client that created the znode crashes, then the znode will be automatically deleted. Those are the basics of ephemerals, but it is a bit more subtle than that. Any ZooKeeper client needs to establish a session to the ZooKeeper ensemble before submitting any request. The session has a timeout associated with it: if the ensemble leader doesn’t hear from the client within the timeout period, then it expires the session. Expiring the session causes all ephemeral nodes of the session to be deleted.
Let’s now consider the following sequence of events:
1. Client C1 creates /lock
2. Client C1 goes into a long GC pause
3. The session of C1 expires
4. Client C2 creates /lock and becomes the new lock holder
5. Client C1 comes back from the GC pause and it still thinks it holds the lock
At step 5, we have both clients C1 and C2 thinking they hold the lock… Trouble ahead!
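The sequence above can be replayed with a toy model. The sketch below is a minimal in-memory stand-in for a lock service with session timeouts. `LockService`, its methods, the hand-advanced logical clock, and the timeout of 10 ticks are all hypothetical and chosen for illustration; none of this is the ZooKeeper client API.

```python
class LockService:
    """Toy in-memory stand-in for a ZooKeeper ensemble (hypothetical, not the real API)."""

    def __init__(self, session_timeout):
        self.session_timeout = session_timeout
        self.now = 0            # logical clock, advanced by hand
        self.last_heard = {}    # session id -> time of last heartbeat
        self.ephemerals = {}    # znode path -> owning session id

    def connect(self, session_id):
        self.last_heard[session_id] = self.now

    def heartbeat(self, session_id):
        if session_id in self.last_heard:
            self.last_heard[session_id] = self.now

    def advance(self, ticks):
        """Move time forward and expire sessions that stayed silent too long."""
        self.now += ticks
        expired = [s for s, t in self.last_heard.items()
                   if self.now - t > self.session_timeout]
        for s in expired:
            del self.last_heard[s]
            # Expiring a session deletes all of its ephemeral znodes.
            self.ephemerals = {p: o for p, o in self.ephemerals.items() if o != s}

    def create_ephemeral(self, session_id, path):
        """Return True if created, False if the znode exists or the session expired."""
        if session_id not in self.last_heard or path in self.ephemerals:
            return False
        self.ephemerals[path] = session_id
        return True


def run_scenario():
    zk = LockService(session_timeout=10)
    zk.connect("C1")
    zk.connect("C2")
    believes = {"C1": False, "C2": False}

    believes["C1"] = zk.create_ephemeral("C1", "/lock")   # step 1
    # Steps 2 and 3: C1 is paused and sends no heartbeats; C2 stays alive.
    for _ in range(3):
        zk.advance(5)
        zk.heartbeat("C2")
    believes["C2"] = zk.create_ephemeral("C2", "/lock")   # step 4
    # Step 5: C1 resumes, and nothing has updated its local belief.
    return believes, zk.ephemerals.get("/lock")
```

Calling `run_scenario()` leaves both clients believing they hold the lock, while the service records C2 as the only owner of /lock. The gap between local belief and service state is exactly what the rest of this note addresses.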
Getting shared resources involved
The situation above illustrates that just acquiring a lock via ZooKeeper is not sufficient if you don’t want to trust the session timeout. As the post by Martin Kleppmann very nicely points out, there are many things that can cause your client to stop for arbitrarily long and it is possible to end up with multiple processes believing they hold the lock. However, the picture changes if acquiring the lock also involves the shared resource that the lock protects. Say that every time a client acquires a lock to exclusively access a resource, it goes to the resource and before anything else it marks the resource in such a way that clients that acquired the lock previously cannot access the resource. In the scenario above, client C1 thinks that it still holds the lock, but when it tries to access the shared resource, it fails because the resource already carries a more recent mark from C2.
Let’s see a couple of examples of how “marking” a resource works. An obvious candidate is ZooKeeper itself. Say that the lock is used to elect a master and the master needs to update other znodes in ZooKeeper. By step 5 of the scenario, the session of client C1 has expired and it cannot update the ZooKeeper state. The ZooKeeper ensemble fails any operation for an expired session. Note that this argument assumes that a new session is not being created under the hood, otherwise C1 would be able to update the ZooKeeper state using a new session, which violates safety.
A more interesting example involves a different shared resource, say a data store, like in Martin’s post. Before performing any writes, a writer needs to acquire the lock via ZooKeeper and use an epoch number to fence off previous writers. If the epoch number is unique and strictly increasing, then the data store can fail requests from previous writers. The writer can optimistically perform a write to the data store with a new epoch number and the write will succeed only if the epoch number in the request is greater than or equal to the one the data store has.
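To make the epoch check concrete, here is a minimal sketch of a data store that fences off old writers. `FencedStore`, `StaleEpochError`, and the epoch values are hypothetical names for illustration; a real data store would need to persist the epoch and apply the check atomically with the write.

```python
class StaleEpochError(Exception):
    """Raised when a request carries an epoch older than the store's."""


class FencedStore:
    def __init__(self):
        self.epoch = 0
        self.data = {}

    def write(self, epoch, key, value):
        # Accept the write only if the request epoch is >= the stored epoch.
        if epoch < self.epoch:
            raise StaleEpochError(f"epoch {epoch} < current epoch {self.epoch}")
        self.epoch = epoch   # a newer epoch fences off all older writers
        self.data[key] = value


def demo_fencing():
    store = FencedStore()
    store.write(1, "k", "from C1")   # C1 holds the lock with epoch 1
    store.write(2, "k", "from C2")   # C2 took over with epoch 2
    try:
        store.write(1, "k", "late write from C1")
        rejected = False
    except StaleEpochError:
        rejected = True
    return rejected, store.data["k"]
```

In this sketch the late write from C1 is rejected and the value written by C2 survives, even though C1 still believes it holds the lock.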
But wait, what if the data store of the previous example is distributed? What kind of guarantees can we have? The simplest way of doing this is to make sure the writer updates the epoch across all nodes before performing any operation. Upon obtaining a new epoch number, a writer sends a request to update the epoch to all nodes in the data store it needs to access. To simplify the discussion, let’s say that it is a small replicated data store and a writer writes to all nodes. If the writer does it, then no writer with an older epoch is able to perform requests successfully.
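The small replicated case can be sketched in the same way. This is an illustration under simplifying assumptions (no node failures, a writer that talks to every node); `Replica`, `Writer`, and their methods are hypothetical names, not taken from any real system.

```python
class Replica:
    def __init__(self):
        self.epoch = 0
        self.log = []

    def fence(self, epoch):
        """Adopt a newer epoch; refuse to go backwards."""
        if epoch < self.epoch:
            return False
        self.epoch = epoch
        return True

    def append(self, epoch, record):
        """Reject records from writers with an older epoch."""
        if epoch < self.epoch:
            return False
        self.log.append(record)
        return True


class Writer:
    def __init__(self, epoch, replicas):
        self.epoch = epoch
        self.replicas = replicas
        self.fenced = False

    def start(self):
        # Update the epoch on every node before performing any operation.
        results = [r.fence(self.epoch) for r in self.replicas]
        self.fenced = all(results)
        return self.fenced

    def write(self, record):
        if not self.fenced:
            raise RuntimeError("writer must update the epoch on all nodes first")
        # The writer writes to all nodes; the write counts only if all accept.
        results = [r.append(self.epoch, record) for r in self.replicas]
        return all(results)


def demo_replicated():
    replicas = [Replica() for _ in range(3)]
    w1 = Writer(1, replicas)
    w1.start()
    first = w1.write("a")
    w2 = Writer(2, replicas)       # new lock holder with a newer epoch
    w2.start()
    stale = w1.write("b")          # old writer comes back
    second = w2.write("c")
    return first, stale, second, [r.log for r in replicas]
```

Once the second writer has updated the epoch on every node, no node accepts the record from the first writer, so all logs stay identical.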
The main issue with this approach is that a static system like that is not fault tolerant: any crash of a data store node prevents progress, which is bad. What systems like Apache Kafka and Apache BookKeeper have done to make groups fault tolerant is to rely on ZooKeeper to coordinate the reconfiguration of replica groups. If a node has crashed or is simply partitioned away, then it is removed from the current replica group and the new group is agreed upon through ZooKeeper. ZooKeeper here serves as a sequencer for the group reconfigurations by totally ordering these changes, and it also detects crashes of nodes with ephemeral znodes. This scheme of having a master implementing a Paxos-like protocol to perform reconfigurations of replica groups is the essence of the Vertical Paxos work by Lamport, Malkhi, and Zhou. The concept of ballot in Vertical Paxos is pretty much the epoch we discuss here. It also relies on process groups, reconfigurations of such groups, and message broadcasts for replication, like in Virtual Synchrony as described by Birman and Joseph.
Note that Kafka uses such distributed locks internally. A writer is an elected broker, not a Kafka client. The leader of a replica group is the designated broker that receives and processes all requests to read and write to a topic-partition. Each leader has an associated subset of in-sync replicas (ISR), and each message produced into Kafka is only declared committed once every broker in the ISR, including the leader, acknowledges the message. If there is any change to the ISR, including a leader change, then the new ISR needs to be persisted in ZooKeeper. ZooKeeper guarantees that the changes to the ISR are totally ordered, and consecutive ISRs must overlap in at least one broker.
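The rules in this paragraph can be illustrated with a small model. This is not Kafka code: `IsrRegistry`, `is_committed`, and the broker names are hypothetical, and the version-conditioned update is only an assumption about how a totally ordered sequence of ISR changes could be obtained from a coordination service.

```python
class IsrRegistry:
    """Toy stand-in for the per-partition state kept in ZooKeeper (hypothetical)."""

    def __init__(self, leader, isr):
        self.version = 0
        self.leader = leader
        self.isr = frozenset(isr)

    def update(self, expected_version, leader, isr):
        """Apply an ISR change only if it is based on the latest version."""
        isr = frozenset(isr)
        if expected_version != self.version:
            return False     # lost the race: another change was applied first
        if leader not in isr:
            return False     # the ISR includes the leader
        if not (isr & self.isr):
            return False     # consecutive ISRs must share at least one broker
        self.leader, self.isr = leader, isr
        self.version += 1
        return True


def is_committed(acks, isr):
    """A message is committed once every ISR member has acknowledged it."""
    return frozenset(isr) <= frozenset(acks)


def demo_isr():
    reg = IsrRegistry("b1", {"b1", "b2", "b3"})
    results = [
        reg.update(0, "b1", {"b1", "b2"}),   # b3 falls out of sync
        reg.update(0, "b2", {"b2"}),         # stale version, rejected
        reg.update(1, "b3", {"b3"}),         # no overlap with {b1, b2}, rejected
        reg.update(1, "b2", {"b2"}),         # leader change to b2
    ]
    return results, reg.leader, reg.version
```

The overlap check matters because a broker that belongs to both the old and the new ISR has every committed message, so a leader change cannot lose committed data in this model.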
While in Kafka a replica group can have multiple leaders over time and make use of epochs, the single ledger writer in BookKeeper performs the role of leader. The fencing in BookKeeper is much simpler because there can be at most one writer adding new records to a ledger. The only time a new writer is needed for a ledger is when the client writer crashes or is suspected to have crashed.
How to obtain an epoch number?
A simple way to obtain an epoch number to use with the scheme described above is through cversion in ZooKeeper. For example, if the lock znode is /lock, then the cversion of its parent / increases every time a child znode is created under /. Consequently, every time the /lock znode is created, the cversion is incremented. Incrementing a value and conditionally updating a znode with that value is also a valid option.
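The second option, incrementing a value with a conditional update, follows the usual read-then-compare-and-set loop. The sketch below uses an in-memory stand-in: `VersionedZnode`, `set_if_version`, and `next_epoch` are hypothetical names, loosely modeled on the fact that ZooKeeper's `setData` accepts an expected version and fails if it does not match.

```python
class VersionedZnode:
    """Toy stand-in for a znode with a data version (not the ZooKeeper client API)."""

    def __init__(self, value=0):
        self.value = value
        self.version = 0

    def get(self):
        return self.value, self.version

    def set_if_version(self, value, expected_version):
        """Update only if nobody else has written since we read."""
        if expected_version != self.version:
            return False
        self.value = value
        self.version += 1
        return True


def next_epoch(znode, max_attempts=10):
    """Read the counter, try to bump it conditionally, retry on conflict."""
    for _ in range(max_attempts):
        value, version = znode.get()
        if znode.set_if_version(value + 1, version):
            return value + 1
    raise RuntimeError("could not obtain an epoch: too much contention")
```

Because only one conditional update can succeed for a given version, two clients can never obtain the same epoch, and each successful update yields a strictly larger number than the previous one.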
Using locks for mutual exclusion in distributed systems is tricky. When using a lock service, it is often not sufficient to rely on the leases and session schemes these services offer because they depend on timing, leaving shared resources unprotected without any additional mechanism. Here we discussed the use of epochs for fencing resources. The general idea is to make sure the shared resource is consistent by preventing old writers from coming back and messing with the state. It might not always be possible to introduce such epochs with legacy systems, but we do have examples of systems that make use of this scheme.
Thanks to Martin Kleppmann and Eno Thereska for the feedback.
Leslie Lamport, Dahlia Malkhi, and Lidong Zhou. 2009. Vertical Paxos and primary-backup replication. In Proceedings of the 28th ACM Symposium on Principles of Distributed Computing (PODC ’09). ACM, New York, NY, USA, 312-313.
K. Birman and T. Joseph. 1987. Exploiting virtual synchrony in distributed systems. SIGOPS Oper. Syst. Rev. 21, 5 (November 1987), 123-138.