Skip to content

Commit

Permalink
Fefactor Fate lock code to use concrete types
Browse files Browse the repository at this point in the history
Instead of passing around byte arrays for DistributedReadWriteLock and
related classes we can use LockType and FateId to make the code easier
to work with and to undertand as those values are serialized as part of
the lock. A new FateLockEntry type has been created which is passed
around instead and that is serialized/deserialized into the correct byte
array format when reading/writing to zookeeeper.

This closes apache#5264
  • Loading branch information
cshannon committed Jan 31, 2025
1 parent daed0fd commit 8f4aa40
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
*/
package org.apache.accumulo.core.fate.zookeeper;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockEntry;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,65 +45,16 @@ public enum LockType {
READ, WRITE,
}

// serializer for lock type and user data
static class ParsedLock {
public ParsedLock(LockType type, byte[] userData) {
this.type = type;
this.userData = Arrays.copyOf(userData, userData.length);
}

public ParsedLock(byte[] lockData) {
if (lockData == null || lockData.length < 1) {
throw new IllegalArgumentException();
}

int split = -1;
for (int i = 0; i < lockData.length; i++) {
if (lockData[i] == ':') {
split = i;
break;
}
}

if (split == -1) {
throw new IllegalArgumentException();
}

this.type = LockType.valueOf(new String(lockData, 0, split, UTF_8));
this.userData = Arrays.copyOfRange(lockData, split + 1, lockData.length);
}

public LockType getType() {
return type;
}

public byte[] getUserData() {
return userData;
}

public byte[] getLockData() {
byte[] typeBytes = type.name().getBytes(UTF_8);
byte[] result = new byte[userData.length + 1 + typeBytes.length];
System.arraycopy(typeBytes, 0, result, 0, typeBytes.length);
result[typeBytes.length] = ':';
System.arraycopy(userData, 0, result, typeBytes.length + 1, userData.length);
return result;
}

private LockType type;
private byte[] userData;
}

// This kind of lock can be easily implemented by ZooKeeper
// You make an entry at the bottom of the queue, readers run when there are no writers ahead of
// them,
// a writer only runs when they are at the top of the queue.
public interface QueueLock {
SortedMap<Long,byte[]> getEarlierEntries(long entry);
SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry);

void removeEntry(long entry);

long addEntry(byte[] data);
long addEntry(FateLockEntry entry);
}

private static final Logger log = LoggerFactory.getLogger(DistributedReadWriteLock.class);
Expand All @@ -114,18 +66,18 @@ public static interface DistributedLock extends Lock {
static class ReadLock implements DistributedLock {

final QueueLock qlock;
final byte[] userData;
final FateId fateId;
long entry = -1;

ReadLock(QueueLock qlock, byte[] userData) {
ReadLock(QueueLock qlock, FateId fateId) {
this.qlock = qlock;
this.userData = userData;
this.fateId = fateId;
}

// for recovery
ReadLock(QueueLock qlock, byte[] userData, long entry) {
ReadLock(QueueLock qlock, FateId fateId, long entry) {
this.qlock = qlock;
this.userData = userData;
this.fateId = fateId;
this.entry = entry;
}

Expand Down Expand Up @@ -160,22 +112,21 @@ public void lockInterruptibly() throws InterruptedException {
@Override
public boolean tryLock() {
if (entry == -1) {
entry = qlock.addEntry(new ParsedLock(this.getType(), this.userData).getLockData());
log.info("Added lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), getType());
entry = qlock.addEntry(FateLockEntry.from(this.getType(), this.fateId));
log.info("Added lock entry {} fateId {} lockType {}", entry, fateId, getType());
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
for (Entry<Long,byte[]> entry : entries.entrySet()) {
ParsedLock parsed = new ParsedLock(entry.getValue());
SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(entry);
for (Entry<Long,Supplier<FateLockEntry>> entry : entries.entrySet()) {
if (entry.getKey().equals(this.entry)) {
return true;
}
if (parsed.type == LockType.WRITE) {
FateLockEntry lockEntry = entry.getValue().get();
if (lockEntry.getLockType() == LockType.WRITE) {
return false;
}
}
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
+ " userData " + new String(this.userData, UTF_8) + " lockType " + getType());
+ " fateId " + this.fateId + " lockType " + getType());
}

@Override
Expand All @@ -198,8 +149,7 @@ public void unlock() {
if (entry == -1) {
return;
}
log.debug("Removing lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), getType());
log.debug("Removing lock entry {} fateId {} lockType {}", entry, this.fateId, getType());
qlock.removeEntry(entry);
entry = -1;
}
Expand All @@ -212,12 +162,12 @@ public Condition newCondition() {

static class WriteLock extends ReadLock {

WriteLock(QueueLock qlock, byte[] userData) {
super(qlock, userData);
WriteLock(QueueLock qlock, FateId fateId) {
super(qlock, fateId);
}

WriteLock(QueueLock qlock, byte[] userData, long entry) {
super(qlock, userData, entry);
WriteLock(QueueLock qlock, FateId fateId, long entry) {
super(qlock, fateId, entry);
}

@Override
Expand All @@ -228,38 +178,37 @@ public LockType getType() {
@Override
public boolean tryLock() {
if (entry == -1) {
entry = qlock.addEntry(new ParsedLock(this.getType(), this.userData).getLockData());
log.info("Added lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), getType());
entry = qlock.addEntry(FateLockEntry.from(this.getType(), this.fateId));
log.info("Added lock entry {} fateId {} lockType {}", entry, this.fateId, getType());
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(entry);
Iterator<Entry<Long,Supplier<FateLockEntry>>> iterator = entries.entrySet().iterator();
if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
+ " userData " + new String(this.userData, UTF_8) + " lockType " + getType());
+ " fateId " + this.fateId + " lockType " + getType());
}
return iterator.next().getKey().equals(entry);
}
}

private final QueueLock qlock;
private final byte[] data;
private final FateId fateId;

public DistributedReadWriteLock(QueueLock qlock, byte[] data) {
public DistributedReadWriteLock(QueueLock qlock, FateId fateId) {
this.qlock = qlock;
this.data = Arrays.copyOf(data, data.length);
this.fateId = fateId;
}

public static DistributedLock recoverLock(QueueLock qlock, byte[] data) {
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(Long.MAX_VALUE);
for (Entry<Long,byte[]> entry : entries.entrySet()) {
ParsedLock parsed = new ParsedLock(entry.getValue());
if (Arrays.equals(data, parsed.getUserData())) {
switch (parsed.getType()) {
public static DistributedLock recoverLock(QueueLock qlock, FateId fateId) {
SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(Long.MAX_VALUE);
for (Entry<Long,Supplier<FateLockEntry>> entry : entries.entrySet()) {
FateLockEntry lockEntry = entry.getValue().get();
if (fateId.equals(lockEntry.getFateId())) {
switch (lockEntry.getLockType()) {
case READ:
return new ReadLock(qlock, parsed.getUserData(), entry.getKey());
return new ReadLock(qlock, lockEntry.getFateId(), entry.getKey());
case WRITE:
return new WriteLock(qlock, parsed.getUserData(), entry.getKey());
return new WriteLock(qlock, lockEntry.getFateId(), entry.getKey());
}
}
}
Expand All @@ -268,11 +217,11 @@ public static DistributedLock recoverLock(QueueLock qlock, byte[] data) {

@Override
public DistributedLock readLock() {
return new ReadLock(qlock, data);
return new ReadLock(qlock, fateId);
}

@Override
public DistributedLock writeLock() {
return new WriteLock(qlock, data);
return new WriteLock(qlock, fateId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@
*/
package org.apache.accumulo.core.fate.zookeeper;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Supplier;

import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.QueueLock;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
Expand All @@ -35,6 +41,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Suppliers;

/**
* A persistent lock mechanism in ZooKeeper used for locking tables during FaTE operations.
*/
Expand All @@ -59,6 +67,64 @@ public String toString() {
}
}

public static class FateLockEntry {
final LockType lockType;
final FateId fateId;

private FateLockEntry(LockType lockType, FateId fateId) {
this.lockType = Objects.requireNonNull(lockType);
this.fateId = Objects.requireNonNull(fateId);
}

private FateLockEntry(byte[] entry) {
if (entry == null || entry.length < 1) {
throw new IllegalArgumentException();
}

int split = -1;
for (int i = 0; i < entry.length; i++) {
if (entry[i] == ':') {
split = i;
break;
}
}

if (split == -1) {
throw new IllegalArgumentException();
}

this.lockType = LockType.valueOf(new String(entry, 0, split, UTF_8));
this.fateId =
FateId.from(new String(Arrays.copyOfRange(entry, split + 1, entry.length), UTF_8));
}

public LockType getLockType() {
return lockType;
}

public FateId getFateId() {
return fateId;
}

public byte[] serialize() {
byte[] typeBytes = lockType.name().getBytes(UTF_8);
byte[] fateIdBytes = fateId.canonical().getBytes(UTF_8);
byte[] result = new byte[fateIdBytes.length + 1 + typeBytes.length];
System.arraycopy(typeBytes, 0, result, 0, typeBytes.length);
result[typeBytes.length] = ':';
System.arraycopy(fateIdBytes, 0, result, typeBytes.length + 1, fateIdBytes.length);
return result;
}

public static FateLockEntry from(LockType lockType, FateId fateId) {
return new FateLockEntry(lockType, fateId);
}

public static FateLockEntry deserialize(byte[] serialized) {
return new FateLockEntry(serialized);
}
}

public static FateLockPath path(String path) {
return new FateLockPath(path);
}
Expand All @@ -69,12 +135,12 @@ public FateLock(ZooReaderWriter zrw, FateLockPath path) {
}

@Override
public long addEntry(byte[] data) {
public long addEntry(FateLockEntry entry) {
String newPath;
try {
while (true) {
try {
newPath = zoo.putPersistentSequential(path + "/" + PREFIX, data);
newPath = zoo.putPersistentSequential(path + "/" + PREFIX, entry.serialize());
String[] parts = newPath.split("/");
String last = parts[parts.length - 1];
return Long.parseLong(last.substring(PREFIX.length()));
Expand All @@ -89,8 +155,8 @@ public long addEntry(byte[] data) {
}

@Override
public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
SortedMap<Long,byte[]> result = new TreeMap<>();
public SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry) {
SortedMap<Long,Supplier<FateLockEntry>> result = new TreeMap<>();
try {
List<String> children = Collections.emptyList();
try {
Expand All @@ -106,7 +172,9 @@ public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
long order = Long.parseLong(name.substring(PREFIX.length()));
if (order <= entry) {
byte[] data = zoo.getData(path + "/" + name);
result.put(order, data);
// Use a supplier so we don't need to deserialize unless the calling code cares about
// the value for that entry.
result.put(order, Suppliers.memoize(() -> FateLockEntry.deserialize(data)));
}
} catch (KeeperException.NoNodeException ex) {
// ignored
Expand Down
Loading

0 comments on commit 8f4aa40

Please sign in to comment.