Skip to content

Commit

Permalink
Small changes:
Browse files Browse the repository at this point in the history
- renamed
  FateInterleavingIT/UserFateInterleavingIT/MetaFateInterleavingIT to
FateExecutionOrderIT to better indicate what is being tested.
- Simplified how an interleave is counted/checked for
  • Loading branch information
kevinrr888 committed Jan 17, 2025
1 parent ddaaca8 commit 8319506
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@

import com.google.common.collect.Iterators;

public abstract class FateInterleavingIT extends SharedMiniClusterBase
implements FateTestRunner<FateInterleavingIT.FilTestEnv> {
public abstract class FateExecutionOrderIT extends SharedMiniClusterBase
implements FateTestRunner<FateExecutionOrderIT.FeoTestEnv> {

public static class FilTestEnv extends TestEnv {
public static class FeoTestEnv extends TestEnv {
private final AccumuloClient client;

public FilTestEnv(AccumuloClient client) {
public FeoTestEnv(AccumuloClient client) {
this.client = client;
}

Expand All @@ -80,19 +80,19 @@ AccumuloClient getClient() {
}
}

public static class FirstOp implements Repo<FateInterleavingIT.FilTestEnv> {
public static class FirstOp implements Repo<FateExecutionOrderIT.FeoTestEnv> {

private static final long serialVersionUID = 1L;

protected boolean isTrackingDataSet(FateId tid, FilTestEnv env, String step) throws Exception {
protected boolean isTrackingDataSet(FateId tid, FeoTestEnv env, String step) throws Exception {
try (Scanner scanner = env.getClient().createScanner(FATE_TRACKING_TABLE)) {
return scanner.stream()
.anyMatch(e -> e.getKey().getColumnFamily().toString().equals(tid.canonical())
&& e.getValue().toString().equals(step));
}
}

protected static void insertTrackingData(FateId tid, FilTestEnv env, String step)
protected static void insertTrackingData(FateId tid, FeoTestEnv env, String step)
throws TableNotFoundException, MutationsRejectedException {
try (BatchWriter bw = env.getClient().createBatchWriter(FATE_TRACKING_TABLE)) {
Mutation mut = new Mutation(Long.toString(System.currentTimeMillis()));
Expand All @@ -102,7 +102,7 @@ protected static void insertTrackingData(FateId tid, FilTestEnv env, String step
}

@Override
public long isReady(FateId tid, FilTestEnv env) throws Exception {
public long isReady(FateId tid, FeoTestEnv env) throws Exception {
// First call to isReady will return that it's not ready (defer time of 100ms), inserting
// the data 'isReady1' so we know isReady was called once. The second attempt (after the
// deferral time) will pass as ready (return 0) and insert the data 'isReady2' so we know
Expand All @@ -124,14 +124,14 @@ public String getName() {
}

@Override
public Repo<FilTestEnv> call(FateId tid, FilTestEnv env) throws Exception {
public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv env) throws Exception {
Thread.sleep(50);
insertTrackingData(tid, env, this.getName() + "::call");
return new SecondOp();
}

@Override
public void undo(FateId fateId, FilTestEnv environment) throws Exception {
public void undo(FateId fateId, FeoTestEnv environment) throws Exception {
throw new UnsupportedOperationException();
}

Expand All @@ -145,7 +145,7 @@ public static class SecondOp extends FirstOp {
private static final long serialVersionUID = 1L;

@Override
public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception {
public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception {
super.call(tid, environment);
return new LastOp();
}
Expand All @@ -156,7 +156,7 @@ public static class LastOp extends FirstOp {
private static final long serialVersionUID = 1L;

@Override
public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception {
public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception {
super.call(tid, environment);
return null;
}
Expand Down Expand Up @@ -186,17 +186,17 @@ public void before() throws Exception {
}
}

private void waitFor(FateStore<FilTestEnv> store, FateId txid) throws Exception {
private void waitFor(FateStore<FeoTestEnv> store, FateId txid) throws Exception {
while (store.read(txid).getStatus() != SUCCESSFUL) {
Thread.sleep(50);
}
}

protected Fate<FilTestEnv> initializeFate(AccumuloClient client, FateStore<FilTestEnv> store) {
protected Fate<FeoTestEnv> initializeFate(AccumuloClient client, FateStore<FeoTestEnv> store) {
ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
return new Fate<>(new FilTestEnv(client), store, false, r -> r + "", config);
return new Fate<>(new FeoTestEnv(client), store, false, r -> r + "", config);
}

private static Entry<FateId,String> toIdStep(Entry<Key,Value> e) {
Expand All @@ -209,11 +209,14 @@ public void testInterleaving() throws Exception {
executeTest(this::testInterleaving);
}

protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext sctx)
protected void testInterleaving(FateStore<FeoTestEnv> store, ServerContext sctx)
throws Exception {

// This test verifies that fates will interleave in time when their isReady() returns >0 and
// then 0.
// This test verifies that FATE will interleave at least once between fate operations when
// their isReady() returns > 0. Interleaving is not guaranteed, so we just check for one
// occurrence which is highly unlikely to fail unless something is broken with FATE.
// This test also ensures that the expected order of operations occurs per fate op.
// Interleaving should have no effect on this.

final int numFateIds = 3;
FateId[] fateIds = new FateId[numFateIds];
Expand All @@ -230,7 +233,7 @@ protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext sctx)
}
}

Fate<FilTestEnv> fate = null;
Fate<FeoTestEnv> fate = null;

// The execution order of the transactions is not according to their insertion
// order. However, we do know that the first step of each transaction will be
Expand All @@ -244,7 +247,7 @@ protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext sctx)
}

Scanner scanner = client.createScanner(FATE_TRACKING_TABLE);
var iter = scanner.stream().map(FateInterleavingIT::toIdStep).iterator();
var iter = scanner.stream().map(FateExecutionOrderIT::toIdStep).iterator();

// we should see the following execution order for all fate ids:
// FirstOp::isReady1, FirstOp::isReady2, FirstOp::call,
Expand All @@ -269,11 +272,9 @@ protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext sctx)
String currStep = currOp.getValue();
var expRunOrderFateId = fateIdsToExpRunOrder.get(fateId);

// An interleave occurred if we do not see <FateIdX, OpN::isReady2> immediately after
// <FateIdX, OpN::isReady1>
if (prevOp != null && prevOp.getValue().contains("isReady1")
&& !currOp.equals(new AbstractMap.SimpleImmutableEntry<>(prevOp.getKey(),
prevOp.getValue().replace('1', '2')))) {
boolean passedFirstStep = !currStep.equals(expRunOrder.get(0));
boolean prevFateIdDiffered = prevOp != null && !prevOp.getKey().equals(fateId);
if (passedFirstStep && prevFateIdDiffered) {
interleaves++;
}
assertEquals(currStep, expRunOrderFateId.remove(0));
Expand All @@ -300,14 +301,14 @@ public static class FirstNonInterleavingOp extends FirstOp {
private static final long serialVersionUID = 1L;

@Override
public long isReady(FateId tid, FilTestEnv env) throws Exception {
public long isReady(FateId tid, FeoTestEnv env) throws Exception {
Thread.sleep(50);
insertTrackingData(tid, env, this.getName() + "::isReady");
return 0;
}

@Override
public Repo<FilTestEnv> call(FateId tid, FilTestEnv manager) throws Exception {
public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv manager) throws Exception {
Thread.sleep(50);
insertTrackingData(tid, manager, this.getName() + "::call");
return new SecondNonInterleavingOp();
Expand All @@ -319,7 +320,7 @@ public static class SecondNonInterleavingOp extends FirstNonInterleavingOp {
private static final long serialVersionUID = 1L;

@Override
public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception {
public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception {
super.call(tid, environment);
return new LastNonInterleavingOp();
}
Expand All @@ -331,7 +332,7 @@ public static class LastNonInterleavingOp extends FirstNonInterleavingOp {
private static final long serialVersionUID = 1L;

@Override
public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception {
public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception {
super.call(tid, environment);
return null;
}
Expand All @@ -343,7 +344,7 @@ public void testNonInterleaving() throws Exception {
executeTest(this::testNonInterleaving);
}

protected void testNonInterleaving(FateStore<FilTestEnv> store, ServerContext sctx)
protected void testNonInterleaving(FateStore<FeoTestEnv> store, ServerContext sctx)
throws Exception {

// This test ensures that when isReady() always returns zero that all the fate steps will
Expand All @@ -364,7 +365,7 @@ protected void testNonInterleaving(FateStore<FilTestEnv> store, ServerContext sc
}
}

Fate<FilTestEnv> fate = null;
Fate<FeoTestEnv> fate = null;

// The execution order of the transactions is not according to their insertion
// order. In this case, without interleaving, a transaction will run start to finish
Expand Down Expand Up @@ -407,6 +408,7 @@ private FateId verifySameIds(Iterator<Entry<Key,Value>> iter, SortedMap<Key,Valu
Text fateId = subset.keySet().iterator().next().getColumnFamily();
assertTrue(subset.keySet().stream().allMatch(k -> k.getColumnFamily().equals(fateId)));

// list is used to ensure correct operations and correct order of operations
var expectedVals = List.of("FirstNonInterleavingOp::isReady", "FirstNonInterleavingOp::call",
"SecondNonInterleavingOp::isReady", "SecondNonInterleavingOp::call",
"LastNonInterleavingOp::isReady", "LastNonInterleavingOp::call");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.fate.FateInterleavingIT;
import org.apache.accumulo.test.fate.FateExecutionOrderIT;

public class MetaFateInterleavingIT extends FateInterleavingIT {
public class MetaFateExecutionOrderIT extends FateExecutionOrderIT {

// put the fate data for the test in a different location than what accumulo is using
private static final InstanceId IID = InstanceId.of(UUID.randomUUID());
private static final String ZK_ROOT = ZooUtil.getRoot(IID);

@Override
public void executeTest(FateTestExecutor<FilTestEnv> testMethod, int maxDeferred,
public void executeTest(FateTestExecutor<FeoTestEnv> testMethod, int maxDeferred,
AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
ServerContext sctx = getCluster().getServerContext();
String path = ZK_ROOT + Constants.ZFATE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.fate.AbstractFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.test.fate.FateInterleavingIT;
import org.apache.accumulo.test.fate.FateExecutionOrderIT;

public class UserFateInterleavingIT extends FateInterleavingIT {
public class UserFateExecutionOrderIT extends FateExecutionOrderIT {
@Override
public void executeTest(FateTestExecutor<FilTestEnv> testMethod, int maxDeferred,
public void executeTest(FateTestExecutor<FeoTestEnv> testMethod, int maxDeferred,
AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
var table = getUniqueNames(1)[0];
try (ClientContext client =
Expand Down

0 comments on commit 8319506

Please sign in to comment.