Rahul Kumar (@rahul)
We live in an era where a single machine or process is not enough to fulfil all requirements, so most of the time we need multiple machines to do the job. When a job is processed by multiple distributed processes, those processes often need to coordinate and synchronise at some point. In this article we will look at the concept of a distributed lock and implement one with ZooKeeper.
I am assuming that you know Java and ZooKeeper; if you don't, please refer to the official documentation. I'll cover the parts of ZooKeeper that are required for this article.
ZooKeeper has a hierarchical namespace, much like a distributed file system. Each node in ZooKeeper can have children, just as directories can contain files. Each node in ZooKeeper is referred to as a znode.
ZooKeeper has a notion of ephemeral nodes: these nodes exist only as long as the session that created them is alive.
ZooKeeper has a notion of sequence nodes. When creating sequence nodes, ZooKeeper appends a monotonically increasing counter at the end of the path. The counter is unique to the parent node.
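ZooKeeper formats the appended counter as a fixed-width, 10-digit, zero-padded number, which is why sorting child names as plain strings gives the same order as sorting them numerically. Here is a small in-memory imitation of that naming scheme to illustrate the point. It is not ZooKeeper code: the class SequentialParent and its methods are hypothetical, and while the real counter is maintained by the ZooKeeper server, here it is just an int.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory stand-in for a parent znode that hands out
// sequential child names: a counter per parent, appended as a
// 10-digit, zero-padded decimal number (not the ZooKeeper API).
class SequentialParent {
    private int counter = 0;                      // unique to this parent
    private final List<String> children = new ArrayList<>();

    // Imitates creating "<parent>/<prefix>" with a sequential mode; returns the child name.
    String createSequential(String prefix) {
        String name = prefix + String.format("%010d", counter++);
        children.add(name);
        return name;
    }

    // Children sorted as strings; the zero padding makes this numeric order too.
    List<String> sortedChildren() {
        List<String> copy = new ArrayList<>(children);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        SequentialParent parent = new SequentialParent();
        for (int i = 0; i < 12; i++) {
            parent.createSequential("");
        }
        // "0000000009" sorts before "0000000010", exactly as numbers would.
        System.out.println(parent.sortedChildren());
    }
}
```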
ZooKeeper watches are one-time triggers sent to the client that set the watch. A watch set on /znode-xx will be triggered when /znode-xx changes.
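A watch fires at most once: after it triggers, the client has to set it again to hear about later changes. The toy model below shows only that one-shot behaviour; the class OneShotWatches is hypothetical and does not use the ZooKeeper API. In the real client, a watch is set by passing a Watcher (or true, for the default watcher) to read calls such as exists or getData.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory model of one-time watches (not the ZooKeeper API).
// A watch registered on a path fires at most once, on the next change,
// and is then discarded; the client must register again to keep watching.
class OneShotWatches {
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    void watch(String path, Consumer<String> callback) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(callback);
    }

    // Called when the node at `path` changes: remove and fire every pending watch.
    void nodeChanged(String path) {
        List<Consumer<String>> pending = watches.remove(path);
        if (pending != null) {
            pending.forEach(cb -> cb.accept(path));
        }
    }

    public static void main(String[] args) {
        OneShotWatches model = new OneShotWatches();
        int[] fired = {0};
        model.watch("/znode-xx", p -> fired[0]++);
        model.nodeChanged("/znode-xx"); // triggers the watch
        model.nodeChanged("/znode-xx"); // no watch left, nothing happens
        System.out.println("fired " + fired[0] + " time(s)"); // prints "fired 1 time(s)"
    }
}
```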
Imagine a cloud storage service that allows clients to upload/download/edit files. Multiple clients may need to access and modify the same file concurrently. However, to ensure data consistency and avoid conflicts, we need to coordinate access to the files and prevent multiple clients from modifying the same file simultaneously.
By using distributed locks, the cloud storage service can maintain data integrity and ensure that multiple clients can safely access and modify files without causing conflicts.
Suppose a client wants to modify a file named abc.json. It has to follow the steps below.
In ZooKeeper, we can use znodes to provide a distributed lock on any resource.
Any client that wants to acquire a lock on abc.json first has to create a znode with the path /abc.json, if it doesn't exist already. To acquire the lock, it then creates an ephemeral child node of /abc.json with the sequence flag set.
With the sequence flag set, ZooKeeper automatically appends a sequence number that is greater than any number previously appended to a child of /abc.json. The process that created the znode with the smallest sequence number acquires the lock.
To release the lock, a process deletes the znode it created.
The process with the next smallest sequence number then acquires the lock, and so on.
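The rule above can be written as a small pure function over the child names, which is essentially what the executor later computes from getChildren. This is a sketch with a hypothetical LockRule class; it assumes the plain 10-digit child names that result from creating the sequential node under the lock node with an empty prefix, as this article does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of the lock rule: the client whose sequential child of the lock
// node has the smallest sequence number holds the lock. Child names are
// plain names as returned by a getChildren-style listing (no parent path).
class LockRule {

    // Returns true if myNode is the smallest child, i.e. it owns the lock.
    static boolean holdsLock(List<String> children, String myNode) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted); // zero-padded names: string order == numeric order
        return !sorted.isEmpty() && sorted.get(0).equals(myNode);
    }

    public static void main(String[] args) {
        List<String> children = List.of("0000000002", "0000000000", "0000000001");
        System.out.println(holdsLock(children, "0000000000")); // true
        System.out.println(holdsLock(children, "0000000002")); // false
    }
}
```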
Suppose we have four clients. Below are the ephemeral nodes created by each client:
/abc.json
- /0000000000 // created by client 3
- /0000000001 // created by client 2
- /0000000002 // created by client 1
- /0000000003 // created by client 4
Note: we use ephemeral nodes because ZooKeeper deletes them automatically when the session of the client that created them ends, so a client that crashes while holding the lock cannot block the others forever.
Client 3 acquires the lock on /abc.json, as its sequence number is the smallest among all the nodes.
Once it has finished, client 3 deletes the znode with sequence number 0000000000. Now client 2 has the smallest sequence number, so it acquires the lock.
Similarly, clients 1 and 4 then acquire the lock on the shared resource, in that order.
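To double-check the walkthrough, the snippet below replays it in memory: it maps each child of /abc.json to the client that created it, then repeatedly grants the lock to the smallest child and deletes that child on release. FourClientExample is a hypothetical name, and no ZooKeeper server is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Replays the four-client example: each sequential child of /abc.json is
// mapped to the client that created it. Granting the lock to the smallest
// child and deleting it on release yields the order 3, 2, 1, 4.
class FourClientExample {

    static List<Integer> grantOrder(Map<String, Integer> owners) {
        TreeMap<String, Integer> queue = new TreeMap<>(owners); // sorted by child name
        List<Integer> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            // The smallest child holds the lock; releasing deletes that child.
            order.add(queue.pollFirstEntry().getValue());
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, Integer> owners = Map.of(
                "0000000000", 3,
                "0000000001", 2,
                "0000000002", 1,
                "0000000003", 4);
        System.out.println(grantOrder(owners)); // [3, 2, 1, 4]
    }
}
```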
Let's create a Watcher class first, because the ZooKeeper client constructor expects a Watcher instance.
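package org.example.lock;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
// Default watcher handed to the ZooKeeper constructor. It receives
// connection-state events; this demo does not need them, so it ignores them.
public class RootWatcher implements Watcher {
    @Override
    public void process(WatchedEvent watchedEvent) {
        // noop
    }
}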
We'll create an Executor class, which acts as a ZooKeeper client. Multiple instances of Executor will run concurrently on separate threads, and all of them will try to acquire a lock on the same resource before processing it.
package org.example.lock;
import org.apache.zookeeper.ZooKeeper;
public class Executor implements Runnable {
    private final String connectionString;
    private final int connectionTimeout;
    private final String LOCK_PATH;   // parent lock node, e.g. /distributed-lock.json
    private String lockPath;          // this executor's own sequential child node
    ZooKeeper zk;
    Executor(String connectionString, int connectionTimeout, String lockPath) {
        this.connectionString = connectionString;
        this.connectionTimeout = connectionTimeout;
        this.LOCK_PATH = lockPath;
    }
    @Override
    public void run() {
        // noop
    }
}
There will be a main class responsible for creating instances of executors and starting them.
package org.example.lock;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.slf4j.LoggerFactory;
public class LockDemo {
    static Logger logger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
    public static void main(String[] args) {
        // silence the ZooKeeper client's logging so only our messages are printed
        logger.setLevel(Level.ERROR);
        String connectionString = "localhost:2181";
        int connectionTimeout = 3000; // passed to ZooKeeper as the session timeout (ms)
        String lockPath = "/distributed-lock.json";
        int executorsCount = 10;
        for (int i = 0; i < executorsCount; i++) {
            Executor executor = new Executor(connectionString, connectionTimeout, lockPath);
            new Thread(executor).start();
        }
    }
}
The LockDemo class creates 10 executors and starts each one on its own thread.
We also have a Zk class, which creates ZooKeeper connections.
package org.example.lock;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class Zk {
    private final String connectionString;
    private final int connectionTimeout;
    Zk(String connectionString, int connectionTimeout) {
        this.connectionString = connectionString;
        this.connectionTimeout = connectionTimeout;
    }
    ZooKeeper createZkClient() throws IOException {
        RootWatcher watcher = new RootWatcher();
        // The second argument is the session timeout in milliseconds.
        // The session is established asynchronously after this returns.
        return new ZooKeeper(connectionString, connectionTimeout, watcher);
    }
}
Now, we have all the classes required for this article. We can start implementing them.
The executor will first create a ZooKeeper connection.
In ZooKeeper, you cannot create a child without its parent. For example, you cannot create a node named /locks/L0 if the /locks node does not exist.
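The rule applies at every level of the path, so every ancestor has to exist before a node can be created. The hypothetical helper requiredAncestors below simply lists those ancestors for a given path. For the lock node /distributed-lock.json used in this article the list is empty, because its only parent is the root; for its sequential children it is just the lock node itself, which is why the executor creates that node first.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the ancestor znodes that must already exist
// before `path` can be created, from the root downwards.
// For "/locks/L0" that is just "/locks".
class ZnodeParents {

    static List<String> requiredAncestors(String path) {
        List<String> ancestors = new ArrayList<>();
        // Every '/' after the leading one marks the end of an ancestor path.
        for (int i = path.indexOf('/', 1); i > 0; i = path.indexOf('/', i + 1)) {
            ancestors.add(path.substring(0, i));
        }
        return ancestors;
    }

    public static void main(String[] args) {
        System.out.println(requiredAncestors("/locks/L0")); // [/locks]
        System.out.println(requiredAncestors("/a/b/c"));    // [/a, /a/b]
    }
}
```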
So the very first thing the executor does is create the lock node, i.e. the parent.
package org.example.lock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class Executor implements Runnable {
    private final String connectionString;
    private final int connectionTimeout;
    private final String LOCK_PATH;
    private String lockPath;
    ZooKeeper zk;
    Executor(String connectionString, int connectionTimeout, String lockPath) {
        this.connectionString = connectionString;
        this.connectionTimeout = connectionTimeout;
        this.LOCK_PATH = lockPath;
    }
    public void createLockNode() throws InterruptedException, KeeperException {
        if (this.zk.exists(LOCK_PATH, false) != null) {
            return;
        }
        try {
            // persistent: the lock node must outlive any single client session
            this.zk.create(LOCK_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException ex) {
            // another executor created the node between our exists() and create(); that is fine
            // any other KeeperException propagates to the caller
        }
    }
    @Override
    public void run() {
        try {
            // create zookeeper client
            this.zk = new Zk(connectionString, connectionTimeout).createZkClient();
            // create a lock node if it does not exist already
            this.createLockNode();
        } catch (IOException | InterruptedException | KeeperException e) {
            throw new RuntimeException(e);
        }
    }
}
After creating the parent node, each executor starts processing its tasks. Each executor has 10 tasks.
public void doProcessing() throws InterruptedException, KeeperException {
    for (var task = 0; task < 10; task++) {
        doTask(task);
    }
}
The doTask method first acquires the lock, then performs the task, and finally releases the lock.
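One detail worth making explicit is that the release should happen even if the task throws; otherwise the ephemeral node stays in place until the session ends and every other executor keeps waiting. The sketch below shows that shape independently of ZooKeeper. PollingLock, runLocked and localLock are hypothetical names, and the AtomicBoolean-based lock only works inside a single JVM, standing in for acquireLock and releaseLock.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of doTask's shape, independent of ZooKeeper: poll until the lock
// is acquired, run the task, and release in a finally block so the lock is
// freed even if the task throws. PollingLock is a hypothetical interface.
class LockedTask {

    interface PollingLock {
        boolean tryAcquire() throws Exception;
        void release() throws Exception;
    }

    static void runLocked(PollingLock lock, Runnable task, long pollMillis) throws Exception {
        while (!lock.tryAcquire()) {
            Thread.sleep(pollMillis); // same polling approach as doTask
        }
        try {
            task.run();
        } finally {
            lock.release();
        }
    }

    // Toy single-JVM lock used only to exercise runLocked.
    static PollingLock localLock(AtomicBoolean held) {
        return new PollingLock() {
            public boolean tryAcquire() { return held.compareAndSet(false, true); }
            public void release() { held.set(false); }
        };
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean held = new AtomicBoolean(false);
        try {
            runLocked(localLock(held), () -> { throw new IllegalStateException("task failed"); }, 10);
        } catch (IllegalStateException e) {
            System.out.println("task failed, lock still held? " + held.get()); // false
        }
    }
}
```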
boolean acquireLock() throws InterruptedException, KeeperException {
    if (this.lockPath == null) {
        // join the queue once; ZooKeeper returns the full path, e.g. /distributed-lock.json/0000000003
        this.lockPath = this.zk.create(LOCK_PATH + "/", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }
    // getChildren returns child names only (no parent path); zero padding makes string order numeric order
    List<String> children = zk.getChildren(LOCK_PATH, false);
    Collections.sort(children);
    String myNode = this.lockPath.substring(this.lockPath.lastIndexOf('/') + 1);
    return children.get(0).equals(myNode);
}
void releaseLock() throws InterruptedException, KeeperException {
    // version -1 matches any version of the node
    zk.delete(this.lockPath, -1);
    this.lockPath = null;
}
public void doTask(int task) throws InterruptedException, KeeperException {
    long oneSecond = Duration.ofSeconds(1).toMillis();
    // poll until our node is the smallest child
    while (!this.acquireLock()) {
        Thread.sleep(oneSecond);
    }
    System.out.printf("Lock acquired for task %d by thread %s%n", task, Thread.currentThread().getName());
    try {
        Thread.sleep(oneSecond); // busy processing simulation
    } finally {
        // release even if the task fails, so waiting executors are not blocked
        this.releaseLock();
    }
    System.out.printf("Lock released for task %d by thread %s%n", task, Thread.currentThread().getName());
}
Complete Executor class implementation:
package org.example.lock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
public class Executor implements Runnable {
    private final String connectionString;
    private final int connectionTimeout;
    private final String LOCK_PATH;   // parent lock node, e.g. /distributed-lock.json
    private String lockPath;          // full path of this executor's sequential child, or null
    ZooKeeper zk;
    Executor(String connectionString, int connectionTimeout, String lockPath) {
        this.connectionString = connectionString;
        this.connectionTimeout = connectionTimeout;
        this.LOCK_PATH = lockPath;
    }
    public void createLockNode() throws InterruptedException, KeeperException {
        if (zk.exists(LOCK_PATH, false) != null) {
            return;
        }
        try {
            zk.create(LOCK_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException ex) {
            // noop - another executor created the node first
        }
    }
    boolean acquireLock() throws InterruptedException, KeeperException {
        if (this.lockPath == null) {
            this.lockPath = this.zk.create(LOCK_PATH + "/", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        }
        List<String> children = zk.getChildren(LOCK_PATH, false);
        Collections.sort(children);
        String myNode = this.lockPath.substring(this.lockPath.lastIndexOf('/') + 1);
        return children.get(0).equals(myNode);
    }
    void releaseLock() throws InterruptedException, KeeperException {
        zk.delete(this.lockPath, -1);
        this.lockPath = null;
    }
    public void doTask(int task) throws InterruptedException, KeeperException {
        long oneSecond = Duration.ofSeconds(1).toMillis();
        while (!this.acquireLock()) {
            Thread.sleep(oneSecond);
        }
        System.out.printf("Lock acquired for task %d by thread %s%n", task, Thread.currentThread().getName());
        try {
            Thread.sleep(oneSecond); // busy processing simulation
        } finally {
            this.releaseLock();
        }
        System.out.printf("Lock released for task %d by thread %s%n", task, Thread.currentThread().getName());
    }
    public void doProcessing() throws InterruptedException, KeeperException {
        for (var task = 0; task < 10; task++) {
            doTask(task);
        }
    }
    @Override
    public void run() {
        try {
            // create zookeeper client
            zk = new Zk(connectionString, connectionTimeout).createZkClient();
            // create a lock node if it does not exist already
            this.createLockNode();
            doProcessing();
        } catch (IOException | KeeperException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            closeClient();
        }
    }
    private void closeClient() {
        if (zk == null) {
            return;
        }
        try {
            zk.close(); // ends the session; any ephemeral node it still owns is removed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Our distributed lock is ready. Here is the output; the order in which threads get the lock may differ between runs.
Lock acquired for task 0 by thread Thread-9
Lock released for task 0 by thread Thread-9
Lock acquired for task 0 by thread Thread-7
Lock released for task 0 by thread Thread-7
Lock acquired for task 0 by thread Thread-1
...
The above design has one issue: every waiting executor calls zk.getChildren(...) inside acquireLock once per second, in a loop, even when nothing has changed. When a large number of clients are waiting for the same resource, these repeated calls can overwhelm ZooKeeper.
To avoid these repeated calls to zk.getChildren(...), we can use ZooKeeper watches. The idea is that, if the current node is not the smallest, the client sets a watch on the node whose sequence number is immediately smaller than its own, and checks again only when that watch fires.
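Choosing which node to watch is again a pure computation over the children list. The sketch below (PredecessorWatch and nodeToWatch are hypothetical names) returns the child immediately before the current one, or nothing if the current node is already the smallest. In the real client, the returned name would then be passed to something like zk.exists(LOCK_PATH + "/" + name, watcher): if exists returns null, the predecessor is already gone and the client should re-check the children; otherwise it waits for the watch to fire. Because each node is watched by only one successor, a release wakes a single waiting client instead of all of them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Sketch of the watch-based improvement: instead of polling getChildren,
// a waiting client watches only the child immediately before its own.
// Returns empty if myNode is the smallest child (it already holds the lock).
class PredecessorWatch {

    static Optional<String> nodeToWatch(List<String> children, String myNode) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        int index = sorted.indexOf(myNode);
        if (index < 0) {
            throw new IllegalArgumentException(myNode + " is not a child of the lock node");
        }
        return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
    }

    public static void main(String[] args) {
        List<String> children = List.of("0000000003", "0000000000", "0000000002", "0000000001");
        System.out.println(nodeToWatch(children, "0000000000")); // Optional.empty
        System.out.println(nodeToWatch(children, "0000000002")); // Optional[0000000001]
    }
}
```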
You can check this commit to get an idea of how this can be implemented.