Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Dec 3, 2023
1 parent 705b408 commit 463de08
Show file tree
Hide file tree
Showing 6 changed files with 642 additions and 216 deletions.
229 changes: 229 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/AbstractTStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package org.apache.accumulo.core.fate;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

public abstract class AbstractTStore<T> implements TStore<T> {

private static final Logger log = LoggerFactory.getLogger(AbstractTStore.class);

protected String lastReserved = "";
protected final Set<Long> reserved;
protected final Map<Long,Long> defered;
protected long statusChangeEvents = 0;
protected int reservationsWaiting = 0;

protected byte[] serialize(Object o) {

try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(o);
oos.close();

return baos.toByteArray();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@SuppressFBWarnings(value = "OBJECT_DESERIALIZATION",
justification = "unsafe to store arbitrary serialized objects like this, but needed for now"
+ " for backwards compatibility")
protected Object deserialize(byte[] ser) {
try {
ByteArrayInputStream bais = new ByteArrayInputStream(ser);
ObjectInputStream ois = new ObjectInputStream(bais);
return ois.readObject();
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (ReflectiveOperationException e) {
throw new IllegalStateException(e);
}
}

protected long parseTid(String txdir) {
return Long.parseLong(txdir.split("_")[1], 16);
}

protected abstract List<String> getTxDirs();

public AbstractTStore() {
this.reserved = new HashSet<>();
this.defered = new HashMap<>();
}

@Override
public long reserve() {
try {
while (true) {

long events;
synchronized (this) {
events = statusChangeEvents;
}

List<String> txdirs = getTxDirs();
Collections.sort(txdirs);

synchronized (this) {
if (!txdirs.isEmpty() && txdirs.get(txdirs.size() - 1).compareTo(lastReserved) <= 0) {
lastReserved = "";
}
}

for (String txdir : txdirs) {
long tid = parseTid(txdir);

synchronized (this) {
// this check makes reserve pick up where it left off, so that it cycles through all as
// it is repeatedly called.... failing to do so can lead to
// starvation where fate ops that sort higher and hold a lock are never reserved.
if (txdir.compareTo(lastReserved) <= 0) {
continue;
}

if (defered.containsKey(tid)) {
if (defered.get(tid) < System.currentTimeMillis()) {
defered.remove(tid);
} else {
continue;
}
}
if (reserved.contains(tid)) {
continue;
} else {
reserved.add(tid);
lastReserved = txdir;
}
}

// have reserved id, status should not change
Long reservedTid = verify(txdir, tid);
if (reservedTid != null) {
return reservedTid;
}
}

synchronized (this) {
// suppress lgtm alert - synchronized variable is not always true
if (events == statusChangeEvents) { // lgtm [java/constant-comparison]
if (defered.isEmpty()) {
this.wait(5000);
} else {
Long minTime = Collections.min(defered.values());
long waitTime = minTime - System.currentTimeMillis();
if (waitTime > 0) {
this.wait(Math.min(waitTime, 5000));
}
}
}
}
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}

@Override
public void reserve(long tid) {
synchronized (this) {
reservationsWaiting++;
try {
while (reserved.contains(tid)) {
try {
this.wait(1000);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}

reserved.add(tid);
} finally {
reservationsWaiting--;
}
}
}

/**
* Attempt to reserve transaction
*
* @param tid transaction id
* @return true if reserved by this call, false if already reserved
*/
@Override
public boolean tryReserve(long tid) {
synchronized (this) {
if (!reserved.contains(tid)) {
reserve(tid);
return true;
}
return false;
}
}

protected void unreserve(long tid) {
synchronized (this) {
if (!reserved.remove(tid)) {
throw new IllegalStateException(
"Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid));
}

// do not want this unreserve to unesc wake up threads in reserve()... this leads to infinite
// loop when tx is stuck in NEW...
// only do this when something external has called reserve(tid)...
if (reservationsWaiting > 0) {
this.notifyAll();
}
}
}

@Override
public void unreserve(long tid, long deferTime) {

if (deferTime < 0) {
throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
}

synchronized (this) {
if (!reserved.remove(tid)) {
throw new IllegalStateException(
"Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid));
}

if (deferTime > 0) {
defered.put(tid, System.currentTimeMillis() + deferTime);
}

this.notifyAll();
}

}

protected void verifyReserved(long tid) {
synchronized (this) {
if (!reserved.contains(tid)) {
throw new IllegalStateException(
"Tried to operate on unreserved transaction " + FateTxId.formatTid(tid));
}
}
}

protected abstract Long verify(String txdir, long tid);

}
Loading

0 comments on commit 463de08

Please sign in to comment.