Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Jan 31, 2025
1 parent 4777675 commit 77e5f63
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,9 +47,10 @@ public enum LockType {

// serializer for lock type and user data
static class ParsedLock {
public ParsedLock(LockType type, byte[] userData) {
public ParsedLock(LockType type, FateId fateId) {
this.type = type;
this.userData = Arrays.copyOf(userData, userData.length);
// this.fateId = Arrays.copyOf(userData, userData.length);
this.fateId = fateId;
}

public ParsedLock(byte[] lockData) {
Expand All @@ -69,28 +71,30 @@ public ParsedLock(byte[] lockData) {
}

this.type = LockType.valueOf(new String(lockData, 0, split, UTF_8));
this.userData = Arrays.copyOfRange(lockData, split + 1, lockData.length);
this.fateId =
FateId.from(new String(Arrays.copyOfRange(lockData, split + 1, lockData.length), UTF_8));
}

public LockType getType() {
return type;
}

public byte[] getUserData() {
return userData;
private FateId getFateId() {
return fateId;
}

public byte[] getLockData() {
byte[] typeBytes = type.name().getBytes(UTF_8);
byte[] result = new byte[userData.length + 1 + typeBytes.length];
byte[] fateIdBytes = fateId.canonical().getBytes(UTF_8);
byte[] result = new byte[fateIdBytes.length + 1 + typeBytes.length];
System.arraycopy(typeBytes, 0, result, 0, typeBytes.length);
result[typeBytes.length] = ':';
System.arraycopy(userData, 0, result, typeBytes.length + 1, userData.length);
System.arraycopy(fateIdBytes, 0, result, typeBytes.length + 1, fateIdBytes.length);
return result;
}

private LockType type;
private byte[] userData;
private FateId fateId;
}

// This kind of lock can be easily implemented by ZooKeeper
Expand All @@ -114,18 +118,18 @@ public static interface DistributedLock extends Lock {
static class ReadLock implements DistributedLock {

final QueueLock qlock;
final byte[] userData;
final FateId fateId;
long entry = -1;

ReadLock(QueueLock qlock, byte[] userData) {
ReadLock(QueueLock qlock, FateId fateId) {
this.qlock = qlock;
this.userData = userData;
this.fateId = fateId;
}

// for recovery
ReadLock(QueueLock qlock, byte[] userData, long entry) {
ReadLock(QueueLock qlock, FateId fateId, long entry) {
this.qlock = qlock;
this.userData = userData;
this.fateId = fateId;
this.entry = entry;
}

Expand Down Expand Up @@ -160,9 +164,8 @@ public void lockInterruptibly() throws InterruptedException {
@Override
public boolean tryLock() {
if (entry == -1) {
entry = qlock.addEntry(new ParsedLock(this.getType(), this.userData).getLockData());
log.info("Added lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), getType());
entry = qlock.addEntry(new ParsedLock(this.getType(), this.fateId).getLockData());
log.info("Added lock entry {} fateId {} lockType {}", entry, fateId, getType());
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
for (Entry<Long,byte[]> entry : entries.entrySet()) {
Expand All @@ -175,7 +178,7 @@ public boolean tryLock() {
}
}
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
+ " userData " + new String(this.userData, UTF_8) + " lockType " + getType());
+ " fateId " + this.fateId + " lockType " + getType());
}

@Override
Expand All @@ -198,8 +201,7 @@ public void unlock() {
if (entry == -1) {
return;
}
log.debug("Removing lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), getType());
log.debug("Removing lock entry {} fateId {} lockType {}", entry, this.fateId, getType());
qlock.removeEntry(entry);
entry = -1;
}
Expand All @@ -212,12 +214,12 @@ public Condition newCondition() {

static class WriteLock extends ReadLock {

WriteLock(QueueLock qlock, byte[] userData) {
super(qlock, userData);
WriteLock(QueueLock qlock, FateId fateId) {
super(qlock, fateId);
}

WriteLock(QueueLock qlock, byte[] userData, long entry) {
super(qlock, userData, entry);
WriteLock(QueueLock qlock, FateId fateId, long entry) {
super(qlock, fateId, entry);
}

@Override
Expand All @@ -228,38 +230,37 @@ public LockType getType() {
@Override
public boolean tryLock() {
if (entry == -1) {
entry = qlock.addEntry(new ParsedLock(this.getType(), this.userData).getLockData());
log.info("Added lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), getType());
entry = qlock.addEntry(new ParsedLock(this.getType(), this.fateId).getLockData());
log.info("Added lock entry {} fateId {} lockType {}", entry, this.fateId, getType());
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
+ " userData " + new String(this.userData, UTF_8) + " lockType " + getType());
+ " fateId " + this.fateId + " lockType " + getType());
}
return iterator.next().getKey().equals(entry);
}
}

private final QueueLock qlock;
private final byte[] data;
private final FateId fateId;

public DistributedReadWriteLock(QueueLock qlock, byte[] data) {
public DistributedReadWriteLock(QueueLock qlock, FateId fateId) {
this.qlock = qlock;
this.data = Arrays.copyOf(data, data.length);
this.fateId = fateId;
}

public static DistributedLock recoverLock(QueueLock qlock, byte[] data) {
public static DistributedLock recoverLock(QueueLock qlock, FateId fateId) {
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(Long.MAX_VALUE);
for (Entry<Long,byte[]> entry : entries.entrySet()) {
ParsedLock parsed = new ParsedLock(entry.getValue());
if (Arrays.equals(data, parsed.getUserData())) {
if (fateId.equals(parsed.getFateId())) {
switch (parsed.getType()) {
case READ:
return new ReadLock(qlock, parsed.getUserData(), entry.getKey());
return new ReadLock(qlock, parsed.getFateId(), entry.getKey());
case WRITE:
return new WriteLock(qlock, parsed.getUserData(), entry.getKey());
return new WriteLock(qlock, parsed.getFateId(), entry.getKey());
}
}
}
Expand All @@ -268,11 +269,11 @@ public static DistributedLock recoverLock(QueueLock qlock, byte[] data) {

@Override
public DistributedLock readLock() {
return new ReadLock(qlock, data);
return new ReadLock(qlock, fateId);
}

@Override
public DistributedLock writeLock() {
return new WriteLock(qlock, data);
return new WriteLock(qlock, fateId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
*/
package org.apache.accumulo.core.fate.zookeeper;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;

import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.QueueLock;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -91,7 +93,8 @@ public void testLock() throws Exception {
data.read();
QueueLock qlock = new MockQueueLock();

final ReadWriteLock locker = new DistributedReadWriteLock(qlock, "locker1".getBytes(UTF_8));
final ReadWriteLock locker =
new DistributedReadWriteLock(qlock, FateId.from(FateInstanceType.USER, UUID.randomUUID()));
final Lock readLock = locker.readLock();
final Lock writeLock = locker.writeLock();
readLock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,10 @@ public static void unreserveHdfsDirectory(Manager env, String directory, FateId

private static Lock getLock(ServerContext context, AbstractId<?> id, FateId fateId,
LockType lockType) {
byte[] lockData = fateId.canonical().getBytes(UTF_8);
var fLockPath =
FateLock.path(context.getZooKeeperRoot() + Constants.ZTABLE_LOCKS + "/" + id.canonical());
FateLock qlock = new FateLock(context.getZooSession().asReaderWriter(), fLockPath);
DistributedLock lock = DistributedReadWriteLock.recoverLock(qlock, lockData);
DistributedLock lock = DistributedReadWriteLock.recoverLock(qlock, fateId);
if (lock != null) {

// Validate the recovered lock type
Expand All @@ -235,7 +234,7 @@ private static Lock getLock(ServerContext context, AbstractId<?> id, FateId fate
+ " on object " + id + ". Expected " + lockType + " lock instead.");
}
} else {
DistributedReadWriteLock locker = new DistributedReadWriteLock(qlock, lockData);
DistributedReadWriteLock locker = new DistributedReadWriteLock(qlock, fateId);
switch (lockType) {
case WRITE:
lock = locker.writeLock();
Expand Down

0 comments on commit 77e5f63

Please sign in to comment.