From 2abf5b1bbbbf72609dcd8142e49868c79b4ef79d Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Mon, 16 Dec 2024 11:06:11 -0500 Subject: [PATCH] Single node META FATE data (#5127) - Moved all the fate data for a single `META` transaction into a single ZK node - Pushed all the data into `NodeValue` (renamed to `FateData`) - Previously, `FateData` stored `TStatus`, `FateReservation`, and `FateKey`. Now additionally stores the `REPO` stack and `TxInfo`. - Status enforcement added to `MetaFateStore` (previously only possible for `UserFateStore`). - Moved `testFateInitialConfigCorrectness()` from `UserFateStoreIT` to `UserFateIT` - Renamed `UserFateStoreIT` to `UserFateStatusEnforcementIT` (now extends a new class `FateStatusEnforcementIT`) - Now only tests status enforcement (previously status enforcement + `testFateInitialConfigCorrectness()`) - Created `MetaFateStatusEnforcementIT` (extends `FateStatusEnforcementIT`) - Tests that the new status enforcement in `MetaFateStore` works - Created `FateStoreUtil`, moving the `createFateTable()` util here, created `MetaFateZKSetup` inner class here (the counterpart to `createFateTable()` for `UserFateStore` but sets up ZooKeeper for use in `MetaFateStore`) - Deleted `UserFateStoreIT`s (now `UserFateStatusEnforcementIT`) method `injectStatus` replacing with the existing `setStatus` which does the same thing - Changed `StackOverflowException` to instead be a `RuntimeException` (previously `Exception`) - Deleted unnecessary preexisting catch and immediate re-throw of a `StackOverflowException` in `MetaFateStore.FateTxStoreImpl.push(repo)` - Cleaned up and refactored `MetaFateStore` methods which mutate existing FateData; now reuse same pattern across these methods: all call new method `MetaFateStore.mutate()` --- .../accumulo/core/fate/AbstractFateStore.java | 15 +- .../core/fate/StackOverflowException.java | 2 +- .../core/fate/user/UserFateStore.java | 25 +- .../core/fate/zookeeper/MetaFateStore.java | 565 ++++++++++-------- .../test/fate/FateStatusEnforcementIT.java | 98 +++ .../accumulo/test/fate/FateStoreUtil.java | 111 ++++ .../accumulo/test/fate/meta/MetaFateIT.java | 35 +- .../meta/MetaFateStatusEnforcementIT.java | 54 ++ .../test/fate/meta/MetaFateStoreFateIT.java | 102 ++-- .../test/fate/meta/MetaMultipleStoresIT.java | 21 +- .../accumulo/test/fate/user/UserFateIT.java | 56 +- .../fate/user/UserFateInterleavingIT.java | 2 +- .../user/UserFateStatusEnforcementIT.java | 62 ++ .../test/fate/user/UserFateStoreFateIT.java | 2 +- .../test/fate/user/UserFateStoreIT.java | 232 ------- .../test/fate/user/UserMultipleStoresIT.java | 2 +- 16 files changed, 811 insertions(+), 573 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/fate/FateStatusEnforcementIT.java create mode 100644 test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java create mode 100644 test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java create mode 100644 test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java delete mode 100644 test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 3bc322c3c21..749a3260649 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -75,12 +76,22 @@ public FateId newRandomId(FateInstanceType instanceType) { return FateId.from(instanceType, UUID.randomUUID()); } }; + protected static final int MAX_REPOS = 100; // The ZooKeeper lock for the process that's running this store instance protected final ZooUtil.LockID lockID; protected final Predicate isLockHeld; protected final Map deferred; protected final FateIdGenerator fateIdGenerator; + // the statuses required to perform operations + public static final Set REQ_PUSH_STATUS = Set.of(TStatus.IN_PROGRESS, TStatus.NEW); + public static final Set REQ_POP_STATUS = + Set.of(TStatus.FAILED_IN_PROGRESS, TStatus.SUCCESSFUL); + public static final Set REQ_DELETE_STATUS = + Set.of(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED); + // all but UNKNOWN + public static final Set REQ_FORCE_DELETE_STATUS = Set.of(TStatus.NEW, TStatus.SUBMITTED, + TStatus.SUCCESSFUL, TStatus.FAILED, TStatus.FAILED_IN_PROGRESS, TStatus.IN_PROGRESS); private final int maxDeferred; private final AtomicBoolean deferredOverflow = new AtomicBoolean(); @@ -415,7 +426,7 @@ protected void seededTx() { unreservedRunnableCount.increment(); } - protected byte[] serializeTxInfo(Serializable so) { + protected static byte[] serializeTxInfo(Serializable so) { if (so instanceof String) { return ("S " + so).getBytes(UTF_8); } else { @@ -428,7 +439,7 @@ protected byte[] serializeTxInfo(Serializable so) { } } - protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) { + protected static Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) { if (data[0] == 'O') { byte[] sera = new byte[data.length - 2]; System.arraycopy(data, 2, sera, 0, sera.length); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/StackOverflowException.java b/core/src/main/java/org/apache/accumulo/core/fate/StackOverflowException.java index 80956b71c7d..e332024d606 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/StackOverflowException.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/StackOverflowException.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.core.fate; -public class StackOverflowException extends Exception { +public class StackOverflowException extends RuntimeException { public StackOverflowException(String msg) { super(msg); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index c134db18405..195848e276b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -71,9 +71,8 @@ public class UserFateStore extends AbstractFateStore { private final String tableName; private static final FateInstanceType fateInstanceType = FateInstanceType.USER; - private static final int maxRepos = 100; private static final com.google.common.collect.Range REPO_RANGE = - com.google.common.collect.Range.closed(1, maxRepos); + com.google.common.collect.Range.closed(1, MAX_REPOS); public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID, Predicate isLockHeld) { @@ -457,12 +456,12 @@ public void push(Repo repo) throws StackOverflowException { Optional top = findTop(); - if (top.filter(t -> t >= maxRepos).isPresent()) { + if (top.filter(t -> t >= MAX_REPOS).isPresent()) { throw new StackOverflowException("Repo stack size too large"); } FateMutator fateMutator = - newMutator(fateId).requireStatus(TStatus.IN_PROGRESS, TStatus.NEW); + newMutator(fateId).requireStatus(REQ_PUSH_STATUS.toArray(TStatus[]::new)); fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate(); } @@ -471,8 +470,8 @@ public void pop() { verifyReservedAndNotDeleted(true); Optional top = findTop(); - top.ifPresent(t -> newMutator(fateId) - .requireStatus(TStatus.FAILED_IN_PROGRESS, TStatus.SUCCESSFUL).deleteRepo(t).mutate()); + top.ifPresent(t -> newMutator(fateId).requireStatus(REQ_POP_STATUS.toArray(TStatus[]::new)) + .deleteRepo(t).mutate()); } @Override @@ -497,7 +496,7 @@ public void delete() { verifyReservedAndNotDeleted(true); var mutator = newMutator(fateId); - mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED); + mutator.requireStatus(REQ_DELETE_STATUS.toArray(TStatus[]::new)); mutator.delete().mutate(); this.deleted = true; } @@ -507,9 +506,7 @@ public void forceDelete() { verifyReservedAndNotDeleted(true); var mutator = newMutator(fateId); - // allow deletion of all txns other than UNKNOWN - mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED, - TStatus.FAILED_IN_PROGRESS, TStatus.IN_PROGRESS); + mutator.requireStatus(REQ_FORCE_DELETE_STATUS.toArray(TStatus[]::new)); mutator.delete().mutate(); this.deleted = true; } @@ -537,14 +534,14 @@ protected void unreserve() { static Text invertRepo(int position) { Preconditions.checkArgument(REPO_RANGE.contains(position), - "Position %s is not in the valid range of [0,%s]", position, maxRepos); - return new Text(String.format("%02d", maxRepos - position)); + "Position %s is not in the valid range of [0,%s]", position, MAX_REPOS); + return new Text(String.format("%02d", MAX_REPOS - position)); } static Integer restoreRepo(Text invertedPosition) { - int position = maxRepos - Integer.parseInt(invertedPosition.toString()); + int position = MAX_REPOS - Integer.parseInt(invertedPosition.toString()); Preconditions.checkArgument(REPO_RANGE.contains(position), - "Position %s is not in the valid range of [0,%s]", position, maxRepos); + "Position %s is not in the valid range of [0,%s]", position, MAX_REPOS); return position; } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java index 28c0904ffa1..4a691417c64 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java @@ -29,16 +29,20 @@ import java.io.Serializable; import java.io.UncheckedIOException; import java.time.Duration; +import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Collections; +import java.util.Deque; +import java.util.EnumMap; import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.stream.Stream; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; @@ -52,7 +56,6 @@ import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.StackOverflowException; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.hadoop.io.DataInputBuffer; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -104,7 +107,9 @@ public FateId create() { while (true) { try { FateId fateId = fateIdGenerator.newRandomId(fateInstanceType); - zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, null).serialize(), + zk.putPersistentData(getTXPath(fateId), + new FateData(TStatus.NEW, null, null, createEmptyRepoDeque(), createEmptyTxInfo()) + .serialize(), NodeExistsPolicy.FAIL); return fateId; } catch (NodeExistsException nee) { @@ -120,40 +125,43 @@ private Optional> createAndReserve(FateKey fateKey) { final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); try { - byte[] nodeVal = zk.mutateOrCreate(getTXPath(fateId), - new NodeValue(TStatus.NEW, reservation, fateKey).serialize(), currSerNodeVal -> { - // We are only returning a non-null value for the following cases: - // 1) The existing NodeValue for fateId is exactly the same as the value set for the - // node if it doesn't yet exist: - // TStatus = TStatus.NEW, FateReservation = reservation, FateKey = fateKey - // This might occur if there was a ZK server fault and the same write is running a 2nd - // time - // 2) The existing NodeValue for fateId has: - // TStatus = TStatus.NEW, no FateReservation present, FateKey = fateKey - // The fateId is NEW/unseeded and not reserved, so we can allow it to be reserved - // Note: returning null here will not change the value to null but will return null - NodeValue currNodeVal = new NodeValue(currSerNodeVal); - if (currNodeVal.status == TStatus.NEW) { - verifyFateKey(fateId, currNodeVal.fateKey, fateKey); - if (currNodeVal.isReservedBy(reservation)) { - return currSerNodeVal; - } else if (!currNodeVal.isReserved()) { - // NEW/unseeded transaction and not reserved, so we can allow it to be reserved - return new NodeValue(TStatus.NEW, reservation, fateKey).serialize(); - } else { - // NEW/unseeded transaction reserved under a different reservation - return null; - } - } else { - log.trace( - "fate id {} tstatus {} fate key {} is reserved {} " - + "has already been seeded with work (non-NEW status)", - fateId, currNodeVal.status, currNodeVal.fateKey.orElse(null), - currNodeVal.isReserved()); - return null; - } - }); - if (nodeVal != null) { + byte[] newSerFateData = + zk.mutateOrCreate(getTXPath(fateId), new FateData<>(TStatus.NEW, reservation, fateKey, + createEmptyRepoDeque(), createEmptyTxInfo()).serialize(), currSerFateData -> { + // We are only returning a non-null value for the following cases: + // 1) The existing node for fateId is exactly the same as the value set for the + // node if it doesn't yet exist: + // TStatus = TStatus.NEW, FateReservation = reservation, FateKey = fateKey + // This might occur if there was a ZK server fault and the same write is running a + // 2nd + // time + // 2) The existing node for fateId has: + // TStatus = TStatus.NEW, no FateReservation present, FateKey = fateKey + // The fateId is NEW/unseeded and not reserved, so we can allow it to be reserved + FateData currFateData = new FateData<>(currSerFateData); + if (currFateData.status == TStatus.NEW) { + verifyFateKey(fateId, currFateData.fateKey, fateKey); + if (currFateData.isReservedBy(reservation)) { + return currSerFateData; + } else if (!currFateData.isReserved()) { + // NEW/unseeded transaction and not reserved, so we can allow it to be reserved + return new FateData<>(TStatus.NEW, reservation, fateKey, createEmptyRepoDeque(), + createEmptyTxInfo()).serialize(); + } else { + // NEW/unseeded transaction reserved under a different reservation + // This will not change the value and will return null + return null; + } + } else { + log.trace( + "fate id {} tstatus {} fate key {} is reserved {} " + + "has already been seeded with work (non-NEW status)", + fateId, currFateData.status, currFateData.fateKey.orElse(null), + currFateData.isReserved()); + return null; + } + }); + if (newSerFateData != null) { return Optional.of(new FateTxStoreImpl(fateId, reservation)); } else { return Optional.empty(); @@ -214,32 +222,35 @@ public Optional> tryReserve(FateId fateId) { // uniquely identify this attempt to reserve the fate operation data FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); - try { - byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> { - NodeValue currNodeVal = new NodeValue(currSerNodeVal); - // The uuid handles the case where there was a ZK server fault and the write for this thread - // went through but that was not acknowledged, and we are reading our own write for 2nd - // time. - if (!currNodeVal.isReserved() || currNodeVal.isReservedBy(reservation)) { - FateKey currFateKey = currNodeVal.fateKey.orElse(null); - // Add the FateReservation to the node to reserve - return new NodeValue(currNodeVal.status, reservation, currFateKey).serialize(); - } else { - // This will not change the value to null but will return null - return null; - } - }); - if (newSerNodeVal != null) { - return Optional.of(new FateTxStoreImpl(fateId, reservation)); + UnaryOperator> fateDataOp = currFateData -> { + // The uuid handles the case where there was a ZK server fault and the write for this thread + // went through but that was not acknowledged, and we are reading our own write for 2nd + // time. + if (!currFateData.isReserved() || currFateData.isReservedBy(reservation)) { + // Add the FateReservation to the node to reserve + return new FateData<>(currFateData.status, reservation, currFateData.fateKey.orElse(null), + currFateData.repoDeque, currFateData.txInfo); } else { - return Optional.empty(); + // This will not change the value and will return null + return null; } - } catch (KeeperException.NoNodeException e) { + }; + + byte[] newSerFateData; + try { + newSerFateData = mutate(fateId, fateDataOp); + } catch (KeeperException.NoNodeException nne) { log.trace("Tried to reserve a transaction {} that does not exist", fateId); return Optional.empty(); - } catch (InterruptedException | KeeperException | AcceptableThriftTableOperationException e) { + } catch (KeeperException e) { throw new IllegalStateException(e); } + + if (newSerFateData != null) { + return Optional.of(new FateTxStoreImpl(fateId, reservation)); + } else { + return Optional.empty(); + } } @Override @@ -250,25 +261,27 @@ public void deleteDeadReservations() { if (isLockHeld.test(reservation.getLockID())) { continue; } + + UnaryOperator> fateDataOp = currFateData -> { + // Make sure the current node is still reserved and reserved with the expected reservation + // and it is dead + if (currFateData.isReservedBy(reservation) + && !isLockHeld.test(currFateData.reservation.orElseThrow().getLockID())) { + // Delete the reservation + log.trace("Deleted the dead reservation {} for fate id {}", reservation, fateId); + return new FateData<>(currFateData.status, null, currFateData.fateKey.orElse(null), + currFateData.repoDeque, currFateData.txInfo); + } else { + // This will not change the value and will return null + return null; + } + }; + try { - zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> { - NodeValue currNodeVal = new NodeValue(currSerNodeVal); - // Make sure the current node is still reserved and reserved with the expected reservation - // and it is dead - if (currNodeVal.isReservedBy(reservation) - && !isLockHeld.test(currNodeVal.reservation.orElseThrow().getLockID())) { - // Delete the reservation - log.trace("Deleted the dead reservation {} for fate id {}", reservation, fateId); - return new NodeValue(currNodeVal.status, null, currNodeVal.fateKey.orElse(null)) - .serialize(); - } else { - // No change - return null; - } - }); - } catch (KeeperException.NoNodeException e) { + mutate(fateId, fateDataOp); + } catch (KeeperException.NoNodeException nne) { // the node has since been deleted. Can safely ignore - } catch (KeeperException | InterruptedException | AcceptableThriftTableOperationException e) { + } catch (KeeperException e) { throw new RuntimeException(e); } } @@ -294,70 +307,46 @@ private FateTxStoreImpl(FateId fateId, FateReservation reservation) { @Override public Repo top() { verifyReservedAndNotDeleted(false); + String txpath = getTXPath(fateId); for (int i = 0; i < RETRIES; i++) { - String txpath = getTXPath(fateId); - try { - String top = findTop(txpath); - if (top == null) { - return null; - } + FateData fateData = getFateData(fateId); - byte[] ser = zk.getData(txpath + "/" + top); - @SuppressWarnings("unchecked") - var deserialized = (Repo) deserialize(ser); - return deserialized; - } catch (KeeperException.NoNodeException ex) { - log.debug("zookeeper error reading " + txpath + ": " + ex, ex); + if (fateData.status == TStatus.UNKNOWN) { + log.debug("zookeeper error reading fate data for {} at {}", fateId, txpath); sleepUninterruptibly(100, MILLISECONDS); continue; - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); } - } - return null; - } - private String findTop(String txpath) throws KeeperException, InterruptedException { - List ops; - try { - ops = zk.getChildren(txpath); - } catch (NoNodeException e) { - return null; - } - - ops = new ArrayList<>(ops); - - String max = ""; - - for (String child : ops) { - if (child.startsWith("repo_") && child.compareTo(max) > 0) { - max = child; + var repoDeque = fateData.repoDeque; + if (repoDeque.isEmpty()) { + return null; + } else { + return repoDeque.peek(); } } - - if (max.isEmpty()) { - return null; - } - - return max; + return null; } @Override public void push(Repo repo) throws StackOverflowException { verifyReservedAndNotDeleted(true); - String txpath = getTXPath(fateId); - try { - String top = findTop(txpath); - if (top != null && Long.parseLong(top.split("_")[1]) > 100) { + UnaryOperator> fateDataOp = currFateData -> { + Preconditions.checkState(REQ_PUSH_STATUS.contains(currFateData.status), + "Tried to push to the repo stack for %s when the transaction status is %s", fateId, + currFateData.status); + var repoDeque = currFateData.repoDeque; + if (repoDeque.size() >= MAX_REPOS) { throw new StackOverflowException("Repo stack size too large"); } + repoDeque.push(repo); + return currFateData; + }; - zk.putPersistentSequential(txpath + "/repo_", serialize(repo)); - } catch (StackOverflowException soe) { - throw soe; - } catch (KeeperException | InterruptedException e) { + try { + mutate(fateId, fateDataOp); + } catch (KeeperException e) { throw new IllegalStateException(e); } } @@ -366,14 +355,24 @@ public void push(Repo repo) throws StackOverflowException { public void pop() { verifyReservedAndNotDeleted(true); - try { - String txpath = getTXPath(fateId); - String top = findTop(txpath); - if (top == null) { + UnaryOperator> fateDataOp = currFateData -> { + Preconditions.checkState(REQ_POP_STATUS.contains(currFateData.status), + "Tried to pop from the repo stack for %s when the transaction status is %s", fateId, + currFateData.status); + var repoDeque = currFateData.repoDeque; + + if (repoDeque.isEmpty()) { throw new IllegalStateException("Tried to pop when empty " + fateId); + } else { + repoDeque.pop(); } - zk.recursiveDelete(txpath + "/" + top, NodeMissingPolicy.SKIP); - } catch (KeeperException | InterruptedException e) { + + return currFateData; + }; + + try { + mutate(fateId, fateDataOp); + } catch (KeeperException e) { throw new IllegalStateException(e); } } @@ -382,23 +381,22 @@ public void pop() { public void setStatus(TStatus status) { verifyReservedAndNotDeleted(true); + UnaryOperator> fateDataOp = currFateData -> { + // Ensure the FateId is reserved in ZK, and it is reserved with the expected reservation + if (currFateData.isReservedBy(this.reservation)) { + return new FateData<>(status, currFateData.reservation.orElseThrow(), + currFateData.fateKey.orElse(null), currFateData.repoDeque, currFateData.txInfo); + } else { + throw new IllegalStateException("Either the FateId " + fateId + + " is not reserved in ZK, or it is but the reservation in ZK: " + + currFateData.reservation.orElse(null) + " differs from that in the store: " + + this.reservation); + } + }; + try { - zk.mutateExisting(getTXPath(fateId), currSerializedData -> { - NodeValue currNodeVal = new NodeValue(currSerializedData); - // Ensure the FateId is reserved in ZK, and it is reserved with the expected reservation - if (currNodeVal.isReservedBy(this.reservation)) { - FateReservation currFateReservation = currNodeVal.reservation.orElseThrow(); - FateKey currFateKey = currNodeVal.fateKey.orElse(null); - NodeValue newNodeValue = new NodeValue(status, currFateReservation, currFateKey); - return newNodeValue.serialize(); - } else { - throw new IllegalStateException("Either the FateId " + fateId - + " is not reserved in ZK, or it is but the reservation in ZK: " - + currNodeVal.reservation.orElse(null) + " differs from that in the store: " - + this.reservation); - } - }); - } catch (KeeperException | InterruptedException | AcceptableThriftTableOperationException e) { + mutate(fateId, fateDataOp); + } catch (KeeperException e) { throw new IllegalStateException(e); } @@ -407,30 +405,51 @@ public void setStatus(TStatus status) { @Override public void delete() { - verifyReservedAndNotDeleted(true); - - try { - zk.recursiveDelete(getTXPath(fateId), NodeMissingPolicy.SKIP); - this.deleted = true; - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } + _delete(REQ_DELETE_STATUS); } @Override public void forceDelete() { - delete(); + _delete(REQ_FORCE_DELETE_STATUS); + } + + private void _delete(Set requiredStatus) { + verifyReservedAndNotDeleted(true); + + // atomically check the txn status and delete the node + // retry until we either atomically delete the node or the txn status is disallowed + while (!this.deleted) { + Stat stat = new Stat(); + FateData fateData = getFateData(fateId, stat); + Preconditions.checkState(requiredStatus.contains(fateData.status), + "Tried to delete fate data for %s when the transaction status is %s", fateId, + fateData.status); + try { + zk.deleteStrict(getTXPath(fateId), stat.getVersion()); + this.deleted = true; + } catch (KeeperException.BadVersionException e) { + log.trace( + "Deletion of ZK node fate data for {} was not able to be completed atomically... Retrying", + fateId); + } catch (InterruptedException | KeeperException e) { + throw new IllegalStateException(e); + } + } } @Override public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) { verifyReservedAndNotDeleted(true); + UnaryOperator> fateDataOp = currFateData -> { + currFateData.txInfo.put(txInfo, so); + return currFateData; + }; + try { - zk.putPersistentData(getTXPath(fateId) + "/" + txInfo, serializeTxInfo(so), - NodeExistsPolicy.OVERWRITE); - } catch (KeeperException | InterruptedException e2) { - throw new IllegalStateException(e2); + mutate(fateId, fateDataOp); + } catch (KeeperException e) { + throw new IllegalStateException(e); } } @@ -456,100 +475,74 @@ public long timeCreated() { @Override public List> getStack() { verifyReservedAndNotDeleted(false); - String txpath = getTXPath(fateId); - - outer: while (true) { - List ops; - try { - ops = zk.getChildren(txpath); - } catch (KeeperException.NoNodeException e) { - return Collections.emptyList(); - } catch (KeeperException | InterruptedException e1) { - throw new IllegalStateException(e1); - } - - ops = new ArrayList<>(ops); - ops.sort(Collections.reverseOrder()); - - ArrayList> dops = new ArrayList<>(); - - for (String child : ops) { - if (child.startsWith("repo_")) { - byte[] ser; - try { - ser = zk.getData(txpath + "/" + child); - @SuppressWarnings("unchecked") - var repo = (ReadOnlyRepo) deserialize(ser); - dops.add(repo); - } catch (KeeperException.NoNodeException e) { - // children changed so start over - continue outer; - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } - } - } - return dops; - } + FateData fateData = getFateData(fateId); + return new ArrayList<>(fateData.repoDeque); } @Override protected void unreserve() { - try { - if (!this.deleted) { - zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> { - NodeValue currNodeVal = new NodeValue(currSerNodeVal); - FateKey currFateKey = currNodeVal.fateKey.orElse(null); - if (currNodeVal.isReservedBy(this.reservation)) { - // Remove the FateReservation from the NodeValue to unreserve - return new NodeValue(currNodeVal.status, null, currFateKey).serialize(); - } else { - // possible this is running a 2nd time in zk server fault conditions and its first - // write went through - if (!currNodeVal.isReserved()) { - log.trace("The FATE reservation for fate id {} does not exist in ZK", fateId); - } else if (!currNodeVal.reservation.orElseThrow().equals(this.reservation)) { - log.debug( - "The FATE reservation for fate id {} in ZK differs from that in the store", - fateId); - } - return null; - } - }); + UnaryOperator> fateDataOp = currFateData -> { + if (currFateData.isReservedBy(this.reservation)) { + // Remove the FateReservation from the node to unreserve + return new FateData<>(currFateData.status, null, currFateData.fateKey.orElse(null), + currFateData.repoDeque, currFateData.txInfo); + } else { + // possible this is running a 2nd time in zk server fault conditions and its first + // write went through + if (!currFateData.isReserved()) { + log.trace("The FATE reservation for fate id {} does not exist in ZK", fateId); + } else if (!currFateData.reservation.orElseThrow().equals(this.reservation)) { + log.debug("The FATE reservation for fate id {} in ZK differs from that in the store", + fateId); + } + // This will not change the value and will return null + return null; + } + }; + + if (!this.deleted) { + try { + mutate(fateId, fateDataOp); + } catch (KeeperException e) { + throw new IllegalStateException(e); } - this.reservation = null; - } catch (InterruptedException | KeeperException | AcceptableThriftTableOperationException e) { - throw new IllegalStateException(e); } + this.reservation = null; } } private Serializable getTransactionInfo(TxInfo txInfo, FateId fateId) { - try { - return deserializeTxInfo(txInfo, zk.getData(getTXPath(fateId) + "/" + txInfo)); - } catch (NoNodeException nne) { - return null; - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } + return getFateData(fateId).txInfo.get(txInfo); } @Override protected TStatus _getStatus(FateId fateId) { - return getNode(fateId).status; + return getFateData(fateId).status; } @Override protected Optional getKey(FateId fateId) { - return getNode(fateId).fateKey; + return getFateData(fateId).fateKey; } - private NodeValue getNode(FateId fateId) { + private FateData getFateData(FateId fateId) { try { - return new NodeValue(zk.getData(getTXPath(fateId))); + return new FateData<>(zk.getData(getTXPath(fateId))); } catch (NoNodeException nne) { - return new NodeValue(TStatus.UNKNOWN, null); + return new FateData<>(TStatus.UNKNOWN, null, null, createEmptyRepoDeque(), + createEmptyTxInfo()); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + + private FateData getFateData(FateId fateId, Stat stat) { + try { + return new FateData<>(zk.getData(getTXPath(fateId), stat)); + } catch (NoNodeException nne) { + return new FateData<>(TStatus.UNKNOWN, null, null, createEmptyRepoDeque(), + createEmptyTxInfo()); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } @@ -569,7 +562,7 @@ protected Stream getTransactions(EnumSet statuses) { // Memoizing for two reasons. First the status or reservation may never be requested, so // in that case avoid the lookup. Second, if it's requested multiple times the result will // always be consistent. - Supplier nodeSupplier = Suppliers.memoize(() -> getNode(fateId)); + Supplier> nodeSupplier = Suppliers.memoize(() -> getFateData(fateId)); return new FateIdStatusBase(fateId) { @Override public TStatus getStatus() { @@ -599,30 +592,72 @@ public Stream list(FateKey.FateKeyType type) { .filter(fateKey -> fateKey.getType() == type); } - protected static class NodeValue { + private Deque> createEmptyRepoDeque() { + return new ArrayDeque<>(); + } + + private Map createEmptyTxInfo() { + return new EnumMap<>(TxInfo.class); + } + + /** + * Mutate the existing FateData for the given fateId using the given operator. + * + * @param fateId the fateId for the FateData to change + * @param fateDataOp the operation to apply to the existing FateData. Op should return null if no + * change is desired. Otherwise, should return the new FateData with the desired changes + * @return the resulting serialized FateData or null if the op resulted in no change + */ + private byte[] mutate(FateId fateId, UnaryOperator> fateDataOp) + throws KeeperException { + try { + return zk.mutateExisting(getTXPath(fateId), currSerFateData -> { + FateData currFateData = new FateData<>(currSerFateData); + FateData newFateData = fateDataOp.apply(currFateData); + if (newFateData == null) { + // This will not change the value and will return null + return null; + } else { + return newFateData.serialize(); + } + }); + } catch (InterruptedException | AcceptableThriftTableOperationException e) { + throw new IllegalStateException(e); + } + } + + protected static class FateData { final TStatus status; final Optional fateKey; final Optional reservation; - - private NodeValue(byte[] serializedData) { + final Deque> repoDeque; + final Map txInfo; + + /** + * Construct a FateData from a previously {@link #serialize()}ed FateData + * + * @param serializedData the serialized data + */ + private FateData(byte[] serializedData) { try (DataInputBuffer buffer = new DataInputBuffer()) { buffer.reset(serializedData, serializedData.length); this.status = TStatus.valueOf(buffer.readUTF()); this.reservation = deserializeFateReservation(buffer); this.fateKey = deserializeFateKey(buffer); + this.repoDeque = deserializeRepoDeque(buffer); + this.txInfo = deserializeTxInfo(buffer); } catch (IOException e) { throw new UncheckedIOException(e); } } - private NodeValue(TStatus status, FateReservation reservation) { - this(status, reservation, null); - } - - private NodeValue(TStatus status, FateReservation reservation, FateKey fateKey) { + private FateData(TStatus status, FateReservation reservation, FateKey fateKey, + Deque> repoDeque, Map txInfo) { this.status = Objects.requireNonNull(status); this.reservation = Optional.ofNullable(reservation); this.fateKey = Optional.ofNullable(fateKey); + this.repoDeque = Objects.requireNonNull(repoDeque); + this.txInfo = Objects.requireNonNull(txInfo); } private Optional deserializeFateKey(DataInputBuffer buffer) throws IOException { @@ -644,10 +679,46 @@ private Optional deserializeFateReservation(DataInputBuffer buf return Optional.empty(); } - byte[] serialize() { + private Deque> deserializeRepoDeque(DataInputBuffer buffer) throws IOException { + Deque> deque = new ArrayDeque<>(); + int numRepos = buffer.readInt(); + + for (int i = 0; i < numRepos; i++) { + int length = buffer.readInt(); + Preconditions.checkArgument(length > 0); + @SuppressWarnings("unchecked") + var repo = (Repo) deserialize(buffer.readNBytes(length)); + deque.add(repo); + } + + return deque; + } + + private Map deserializeTxInfo(DataInputBuffer buffer) throws IOException { + Map txInfo = new EnumMap<>(TxInfo.class); + int length = buffer.readInt(); + + while (length != 0) { + Preconditions.checkArgument(length >= 0); + TxInfo type = TxInfo.values()[buffer.readInt()]; + txInfo.put(type, AbstractFateStore.deserializeTxInfo(type, buffer.readNBytes(length - 1))); + + // if we have reached the end of the buffer (= reached the end of the tx info data) + if (buffer.getPosition() == buffer.getLength()) { + break; + } + length = buffer.readInt(); + } + + return txInfo; + } + + private byte[] serialize() { try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos)) { + // status dos.writeUTF(status.name()); + // reservation if (isReserved()) { byte[] serializedFateReservation = reservation.orElseThrow().getSerialized(); dos.writeInt(serializedFateReservation.length); @@ -655,6 +726,7 @@ byte[] serialize() { } else { dos.writeInt(0); } + // fate key if (fateKey.isPresent()) { byte[] serializedFateKey = fateKey.orElseThrow().getSerialized(); dos.writeInt(serializedFateKey.length); @@ -662,6 +734,27 @@ byte[] serialize() { } else { dos.writeInt(0); } + // repo deque + byte[] serializedRepo; + dos.writeInt(repoDeque.size()); + // iterates from top/first/head to bottom/last/tail + for (Repo repo : repoDeque) { + serializedRepo = AbstractFateStore.serialize(repo); + dos.writeInt(serializedRepo.length); + dos.write(serializedRepo); + } + // tx info + if (!txInfo.isEmpty()) { + for (var elt : txInfo.entrySet()) { + byte[] serTxInfo = serializeTxInfo(elt.getValue()); + dos.writeInt(1 + serTxInfo.length); + dos.writeInt(elt.getKey().ordinal()); + dos.write(serTxInfo); + } + } else { + dos.writeInt(0); + } + // done dos.close(); return baos.toByteArray(); } catch (IOException e) { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStatusEnforcementIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStatusEnforcementIT.java new file mode 100644 index 00000000000..5ca1a08b8dd --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStatusEnforcementIT.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Set; + +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; + +public abstract class FateStatusEnforcementIT extends SharedMiniClusterBase { + + protected FateId fateId; + protected FateStore store; + protected FateStore.FateTxStore txStore; + + @Test + public void push() throws Exception { + testOperationWithStatuses(() -> {}, // No special setup needed for push + () -> txStore.push(new FateIT.TestRepo("testOp")), AbstractFateStore.REQ_PUSH_STATUS); + } + + @Test + public void pop() throws Exception { + testOperationWithStatuses(() -> { + // Setup for pop: Ensure there something to pop by first pushing + try { + txStore.setStatus(ReadOnlyFateStore.TStatus.NEW); + txStore.push(new FateIT.TestRepo("testOp")); + } catch (Exception e) { + throw new RuntimeException("Failed to setup for pop", e); + } + }, txStore::pop, AbstractFateStore.REQ_POP_STATUS); + } + + @Test + public void delete() throws Exception { + testOperationWithStatuses(() -> { + // Setup for delete: Create a new txStore before each delete since delete cannot be called + // on the same txStore more than once + fateId = store.create(); + txStore = store.reserve(fateId); + }, () -> txStore.delete(), AbstractFateStore.REQ_DELETE_STATUS); + } + + @Test + public void forceDelete() throws Exception { + testOperationWithStatuses(() -> { + // Setup for forceDelete: same as delete + fateId = store.create(); + txStore = store.reserve(fateId); + }, () -> txStore.forceDelete(), AbstractFateStore.REQ_FORCE_DELETE_STATUS); + } + + protected void testOperationWithStatuses(Runnable beforeOperation, Executable operation, + Set acceptableStatuses) throws Exception { + for (ReadOnlyFateStore.TStatus status : ReadOnlyFateStore.TStatus.values()) { + // Run any needed setup for the operation before each iteration + beforeOperation.run(); + + txStore.setStatus(status); + var fateIdStatus = store.list().filter(statusEntry -> statusEntry.getFateId().equals(fateId)) + .findFirst().orElseThrow(); + assertEquals(status, fateIdStatus.getStatus()); + if (!acceptableStatuses.contains(status)) { + assertThrows(IllegalStateException.class, operation, + "Expected operation to fail with status " + status + " but it did not"); + } else { + assertDoesNotThrow(operation, + "Expected operation to succeed with status " + status + " but it did not"); + } + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java new file mode 100644 index 00000000000..856fd11f26a --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.File; +import java.util.UUID; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.TabletInformation; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.metadata.AccumuloTable; +import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; +import org.apache.zookeeper.ZooKeeper; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.io.TempDir; + +import com.google.common.collect.MoreCollectors; + +/** + * A class with utility methods for testing UserFateStore and MetaFateStore + */ +public class FateStoreUtil { + /** + * Create the fate table with the exact configuration as the real Fate user instance table + * including table properties and TabletAvailability. For use in testing UserFateStore + */ + public static void createFateTable(ClientContext client, String table) throws Exception { + final var fateTableProps = + client.tableOperations().getTableProperties(AccumuloTable.FATE.tableName()); + + TabletAvailability availability; + try (var tabletStream = client.tableOperations() + .getTabletInformation(AccumuloTable.FATE.tableName(), new Range())) { + availability = tabletStream.map(TabletInformation::getTabletAvailability).distinct() + .collect(MoreCollectors.onlyElement()); + } + + var newTableConf = new NewTableConfiguration().withInitialTabletAvailability(availability) + .withoutDefaultIterators().setProperties(fateTableProps); + client.tableOperations().create(table, newTableConf); + var testFateTableProps = client.tableOperations().getTableProperties(table); + + // ensure that create did not set any other props + assertEquals(fateTableProps, testFateTableProps); + } + + /** + * Contains the necessary utilities for setting up (and shutting down) a ZooKeeper instance for + * use in testing MetaFateStore + */ + @Tag(ZOOKEEPER_TESTING_SERVER) + public static class MetaFateZKSetup { + private static ZooKeeperTestingServer szk; + private static ZooReaderWriter zk; + private static final String ZK_ROOT = "/accumulo/" + UUID.randomUUID(); + private static String ZK_FATE_PATH; + + /** + * Sets up the ZooKeeper instance and creates the paths needed for testing MetaFateStore + */ + public static void setup(@TempDir File tempDir) throws Exception { + szk = new ZooKeeperTestingServer(tempDir); + zk = szk.getZooReaderWriter(); + ZK_FATE_PATH = ZK_ROOT + Constants.ZFATE; + zk.mkdirs(ZK_FATE_PATH); + zk.mkdirs(ZK_ROOT + Constants.ZTABLE_LOCKS); + } + + /** + * Tears down the ZooKeeper instance + */ + public static void teardown() throws Exception { + szk.close(); + } + + public static String getZkRoot() { + return ZK_ROOT; + } + + public static ZooReaderWriter getZooReaderWriter() { + return zk; + } + + public static String getZkFatePath() { + return ZK_FATE_PATH; + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java index a96dc11e71b..ded495b9581 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java @@ -19,7 +19,6 @@ package org.apache.accumulo.test.fate.meta; import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; -import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; @@ -27,59 +26,48 @@ import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.UUID; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateIT; -import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; +import org.apache.accumulo.test.fate.FateStoreUtil; import org.apache.hadoop.io.DataInputBuffer; import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.io.TempDir; -@Tag(ZOOKEEPER_TESTING_SERVER) public class MetaFateIT extends FateIT { - - private static ZooKeeperTestingServer szk = null; - private static ZooReaderWriter zk = null; - private static final InstanceId IID = InstanceId.of(UUID.randomUUID()); - private static final String ZK_ROOT = ZooUtil.getRoot(IID); - @TempDir private static File tempDir; @BeforeAll public static void setup() throws Exception { - szk = new ZooKeeperTestingServer(tempDir); - zk = szk.getZooReaderWriter(); - zk.mkdirs(ZK_ROOT + Constants.ZFATE); - zk.mkdirs(ZK_ROOT + Constants.ZTABLE_LOCKS); + FateStoreUtil.MetaFateZKSetup.setup(tempDir); } @AfterAll public static void teardown() throws Exception { - szk.close(); + FateStoreUtil.MetaFateZKSetup.teardown(); } @Override public void executeTest(FateTestExecutor testMethod, int maxDeferred, FateIdGenerator fateIdGenerator) throws Exception { + String zkRoot = FateStoreUtil.MetaFateZKSetup.getZkRoot(); + var zooReaderWriter = FateStoreUtil.MetaFateZKSetup.getZooReaderWriter(); + String fatePath = FateStoreUtil.MetaFateZKSetup.getZkFatePath(); ServerContext sctx = createMock(ServerContext.class); - expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); - expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); + expect(sctx.getZooKeeperRoot()).andReturn(zkRoot).anyTimes(); + expect(sctx.getZooReaderWriter()).andReturn(zooReaderWriter).anyTimes(); replay(sctx); - testMethod.execute(new MetaFateStore<>(ZK_ROOT + Constants.ZFATE, zk, createDummyLockID(), null, + testMethod.execute(new MetaFateStore<>(fatePath, zooReaderWriter, createDummyLockID(), null, maxDeferred, fateIdGenerator), sctx); } @@ -98,8 +86,9 @@ protected TStatus getTxStatus(ServerContext sctx, FateId fateId) { */ private static TStatus getTxStatus(ZooReaderWriter zrw, FateId fateId) throws KeeperException, InterruptedException { - zrw.sync(ZK_ROOT); - String txdir = String.format("%s%s/tx_%s", ZK_ROOT, Constants.ZFATE, fateId.getTxUUIDStr()); + String zkRoot = FateStoreUtil.MetaFateZKSetup.getZkRoot(); + zrw.sync(zkRoot); + String txdir = String.format("%s%s/tx_%s", zkRoot, Constants.ZFATE, fateId.getTxUUIDStr()); try (DataInputBuffer buffer = new DataInputBuffer()) { var serialized = zrw.getData(txdir); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java new file mode 100644 index 00000000000..4800e6816e6 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.meta; + +import java.io.File; + +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; +import org.apache.accumulo.test.fate.FateStatusEnforcementIT; +import org.apache.accumulo.test.fate.FateStoreUtil; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; + +public class MetaFateStatusEnforcementIT extends FateStatusEnforcementIT { + @TempDir + private static File tempDir; + + @BeforeAll + public static void beforeAllSetup() throws Exception { + FateStoreUtil.MetaFateZKSetup.setup(tempDir); + } + + @AfterAll + public static void afterAllTeardown() throws Exception { + FateStoreUtil.MetaFateZKSetup.teardown(); + } + + @BeforeEach + public void beforeEachSetup() throws Exception { + store = new MetaFateStore<>(FateStoreUtil.MetaFateZKSetup.getZkFatePath(), + FateStoreUtil.MetaFateZKSetup.getZooReaderWriter(), AbstractFateStore.createDummyLockID(), + null); + fateId = store.create(); + txStore = store.reserve(fateId); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java index fb9e9c7d759..66d01b1489a 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java @@ -26,25 +26,26 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.io.File; +import java.io.Serializable; import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.util.Deque; +import java.util.Map; import java.util.Optional; -import java.util.UUID; -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; +import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; -import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateStoreIT; -import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; +import org.apache.accumulo.test.fate.FateStoreUtil; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; @@ -52,37 +53,31 @@ @Tag(ZOOKEEPER_TESTING_SERVER) public class MetaFateStoreFateIT extends FateStoreIT { - - private static ZooKeeperTestingServer szk = null; - private static ZooReaderWriter zk = null; - private static final InstanceId IID = InstanceId.of(UUID.randomUUID()); - private static final String ZK_ROOT = ZooUtil.getRoot(IID); - @TempDir private static File tempDir; @BeforeAll public static void setup() throws Exception { - szk = new ZooKeeperTestingServer(tempDir); - zk = szk.getZooReaderWriter(); - zk.mkdirs(ZK_ROOT + Constants.ZFATE); - zk.mkdirs(ZK_ROOT + Constants.ZTABLE_LOCKS); + FateStoreUtil.MetaFateZKSetup.setup(tempDir); } @AfterAll public static void teardown() throws Exception { - szk.close(); + FateStoreUtil.MetaFateZKSetup.teardown(); } @Override public void executeTest(FateTestExecutor testMethod, int maxDeferred, FateIdGenerator fateIdGenerator) throws Exception { ServerContext sctx = createMock(ServerContext.class); - expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); - expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); + expect(sctx.getZooKeeperRoot()).andReturn(FateStoreUtil.MetaFateZKSetup.getZkRoot()).anyTimes(); + expect(sctx.getZooReaderWriter()).andReturn(FateStoreUtil.MetaFateZKSetup.getZooReaderWriter()) + .anyTimes(); replay(sctx); - MetaFateStore store = new MetaFateStore<>(ZK_ROOT + Constants.ZFATE, zk, - createDummyLockID(), null, maxDeferred, fateIdGenerator); + MetaFateStore store = + new MetaFateStore<>(FateStoreUtil.MetaFateZKSetup.getZkFatePath(), + FateStoreUtil.MetaFateZKSetup.getZooReaderWriter(), createDummyLockID(), null, + maxDeferred, fateIdGenerator); // Check that the store has no transactions before and after each test assertEquals(0, store.list().count()); @@ -93,38 +88,49 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred, @Override protected void deleteKey(FateId fateId, ServerContext sctx) { try { - // We have to use reflection since the NodeValue is internal to the store - - // Grab both the constructors that use the serialized bytes and status, reservation - Class nodeClass = Class.forName(MetaFateStore.class.getName() + "$NodeValue"); - Constructor statusReservationCons = - nodeClass.getDeclaredConstructor(TStatus.class, FateStore.FateReservation.class); - Constructor serializedCons = nodeClass.getDeclaredConstructor(byte[].class); - statusReservationCons.setAccessible(true); + // We have to use reflection since the FateData is internal to the store + + Class fateDataClass = Class.forName(MetaFateStore.class.getName() + "$FateData"); + // Constructor for constructing FateData + Constructor fateDataCons = fateDataClass.getDeclaredConstructor(TStatus.class, + FateStore.FateReservation.class, FateKey.class, Deque.class, Map.class); + // Constructor for constructing FateData from a byte array (the serialized form of FateData) + Constructor serializedCons = fateDataClass.getDeclaredConstructor(byte[].class); + fateDataCons.setAccessible(true); serializedCons.setAccessible(true); - // Get the status and reservation fields so they can be read and get the serialize method - Field nodeStatus = nodeClass.getDeclaredField("status"); - Field nodeReservation = nodeClass.getDeclaredField("reservation"); - Method nodeSerialize = nodeClass.getDeclaredMethod("serialize"); - nodeStatus.setAccessible(true); - nodeReservation.setAccessible(true); + // Get the status, reservation, repoDeque, txInfo fields so that they can be read and get the + // serialize method + Field status = fateDataClass.getDeclaredField("status"); + Field reservation = fateDataClass.getDeclaredField("reservation"); + Field repoDeque = fateDataClass.getDeclaredField("repoDeque"); + Field txInfo = fateDataClass.getDeclaredField("txInfo"); + Method nodeSerialize = fateDataClass.getDeclaredMethod("serialize"); + status.setAccessible(true); + reservation.setAccessible(true); + repoDeque.setAccessible(true); + txInfo.setAccessible(true); nodeSerialize.setAccessible(true); - // Get the existing status and reservation for the node and build a new node with an empty key - // but uses the existing tid - String txPath = ZK_ROOT + Constants.ZFATE + "/tx_" + fateId.getTxUUIDStr(); - Object currentNode = serializedCons.newInstance(new Object[] {zk.getData(txPath)}); - TStatus currentStatus = (TStatus) nodeStatus.get(currentNode); + // Gather the existing fields, create a new FateData object with those existing fields + // (excluding the FateKey in the new object), and replace the zk node with this new FateData + String txPath = + FateStoreUtil.MetaFateZKSetup.getZkFatePath() + "/tx_" + fateId.getTxUUIDStr(); + Object currentNode = serializedCons.newInstance( + new Object[] {FateStoreUtil.MetaFateZKSetup.getZooReaderWriter().getData(txPath)}); + TStatus currentStatus = (TStatus) status.get(currentNode); Optional currentReservation = - getCurrentReservation(nodeReservation, currentNode); - // replace the node with no key and just a tid and existing status and reservation - Object newNode = - statusReservationCons.newInstance(currentStatus, currentReservation.orElse(null)); - - // Replace the transaction with the same status and reservation but no key - zk.putPersistentData(txPath, (byte[]) nodeSerialize.invoke(newNode), - NodeExistsPolicy.OVERWRITE); + getCurrentReservation(reservation, currentNode); + @SuppressWarnings("unchecked") + Deque> currentRepoDeque = (Deque>) repoDeque.get(currentNode); + @SuppressWarnings("unchecked") + Map currentTxInfo = + (Map) txInfo.get(currentNode); + Object newNode = fateDataCons.newInstance(currentStatus, currentReservation.orElse(null), + null, currentRepoDeque, currentTxInfo); + + FateStoreUtil.MetaFateZKSetup.getZooReaderWriter().putPersistentData(txPath, + (byte[]) nodeSerialize.invoke(newNode), NodeExistsPolicy.OVERWRITE); } catch (Exception e) { throw new IllegalStateException(e); } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaMultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaMultipleStoresIT.java index a2866cb900f..d5d1903493b 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaMultipleStoresIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaMultipleStoresIT.java @@ -21,13 +21,11 @@ import java.io.File; import java.util.function.Predicate; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; -import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.test.fate.FateStoreUtil; import org.apache.accumulo.test.fate.MultipleStoresIT; -import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -35,21 +33,16 @@ public class MetaMultipleStoresIT extends MultipleStoresIT { @TempDir - private static File TEMP_DIR; - private static ZooKeeperTestingServer SZK; - private static ZooReaderWriter ZK; - private static String FATE_DIR; + private static File tempDir; @BeforeAll public static void setup() throws Exception { - SZK = new ZooKeeperTestingServer(TEMP_DIR); - ZK = SZK.getZooReaderWriter(); - FATE_DIR = Constants.ZFATE; + FateStoreUtil.MetaFateZKSetup.setup(tempDir); } @AfterAll public static void teardown() throws Exception { - SZK.close(); + FateStoreUtil.MetaFateZKSetup.teardown(); } @Override @@ -68,7 +61,8 @@ static class SleepingEnvMetaStoreFactory implements TestStoreFactory create(ZooUtil.LockID lockID, Predicate isLockHeld) throws InterruptedException, KeeperException { - return new MetaFateStore<>(FATE_DIR, ZK, lockID, isLockHeld); + return new MetaFateStore<>(FateStoreUtil.MetaFateZKSetup.getZkFatePath(), + FateStoreUtil.MetaFateZKSetup.getZooReaderWriter(), lockID, isLockHeld); } } @@ -76,7 +70,8 @@ static class LatchEnvMetaStoreFactory implements TestStoreFactory @Override public FateStore create(ZooUtil.LockID lockID, Predicate isLockHeld) throws InterruptedException, KeeperException { - return new MetaFateStore<>(FATE_DIR, ZK, lockID, isLockHeld); + return new MetaFateStore<>(FateStoreUtil.MetaFateZKSetup.getZkFatePath(), + FateStoreUtil.MetaFateZKSetup.getZooReaderWriter(), lockID, isLockHeld); } } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java index 7f0383e6f4c..014b6c97bcb 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java @@ -19,26 +19,33 @@ package org.apache.accumulo.test.fate.user; import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; -import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; +import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.stream.StreamSupport; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; +import org.apache.accumulo.core.iterators.user.VersioningIterator; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateIT; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; public class UserFateIT extends FateIT { @@ -66,6 +73,53 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred, } } + // UserFateStore only test: + // Test that configs related to the correctness of the FATE instance user table + // are initialized correctly + @Test + public void testFateInitialConfigCorrectness() throws Exception { + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + + // It is important here to use getTableProperties() and not getConfiguration() + // because we want only the table properties and not a merged view + var fateTableProps = + client.tableOperations().getTableProperties(AccumuloTable.FATE.tableName()); + + // Verify properties all have a table. prefix + assertTrue(fateTableProps.keySet().stream().allMatch(key -> key.startsWith("table."))); + + // Verify properties are correctly set + assertEquals("5", fateTableProps.get(Property.TABLE_FILE_REPLICATION.getKey())); + assertEquals("sync", fateTableProps.get(Property.TABLE_DURABILITY.getKey())); + assertEquals("false", fateTableProps.get(Property.TABLE_FAILURES_IGNORE.getKey())); + assertEquals("", fateTableProps.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())); + + // Verify VersioningIterator related properties are correct + var iterClass = "10," + VersioningIterator.class.getName(); + var maxVersions = "1"; + assertEquals(iterClass, + fateTableProps.get(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers")); + assertEquals(maxVersions, fateTableProps + .get(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers.opt.maxVersions")); + assertEquals(iterClass, + fateTableProps.get(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers")); + assertEquals(maxVersions, fateTableProps + .get(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions")); + assertEquals(iterClass, + fateTableProps.get(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers")); + assertEquals(maxVersions, fateTableProps + .get(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions")); + + // Verify all tablets are HOSTED + try (var tablets = + client.getAmple().readTablets().forTable(AccumuloTable.FATE.tableId()).build()) { + assertTrue(tablets.stream() + .allMatch(tm -> tm.getTabletAvailability() == TabletAvailability.HOSTED)); + } + } + } + @Override protected TStatus getTxStatus(ServerContext context, FateId fateId) { try (Scanner scanner = context.createScanner(table, Authorizations.EMPTY)) { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java index 83a87db975d..de30125e5df 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java @@ -19,7 +19,7 @@ package org.apache.accumulo.test.fate.user; import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; -import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; +import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.clientImpl.ClientContext; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java new file mode 100644 index 00000000000..22ecb9fe6e2 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.user; + +import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.test.fate.FateStatusEnforcementIT; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; + +public class UserFateStatusEnforcementIT extends FateStatusEnforcementIT { + private ClientContext client; + private String table; + + @BeforeAll + public static void beforeAllSetup() throws Exception { + SharedMiniClusterBase.startMiniCluster(); + } + + @AfterAll + public static void afterAllTeardown() { + SharedMiniClusterBase.stopMiniCluster(); + } + + @BeforeEach + public void beforeEachSetup() throws Exception { + client = (ClientContext) Accumulo.newClient().from(getClientProps()).build(); + table = getUniqueNames(1)[0]; + createFateTable(client, table); + store = new UserFateStore<>(client, table, AbstractFateStore.createDummyLockID(), null); + fateId = store.create(); + txStore = store.reserve(fateId); + } + + @AfterEach + public void afterEachTeardown() { + client.close(); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java index c967fbea5ab..17cc06a5ac2 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java @@ -20,7 +20,7 @@ import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId; -import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; +import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.BatchWriter; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java deleted file mode 100644 index ead901b7ed4..00000000000 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.test.fate.user; - -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.EnumSet; - -import org.apache.accumulo.core.client.Accumulo; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.NewTableConfiguration; -import org.apache.accumulo.core.client.admin.TabletAvailability; -import org.apache.accumulo.core.client.admin.TabletInformation; -import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.fate.AbstractFateStore; -import org.apache.accumulo.core.fate.FateId; -import org.apache.accumulo.core.fate.FateStore; -import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; -import org.apache.accumulo.core.fate.user.UserFateStore; -import org.apache.accumulo.core.fate.user.schema.FateSchema; -import org.apache.accumulo.core.iterators.user.VersioningIterator; -import org.apache.accumulo.core.metadata.AccumuloTable; -import org.apache.accumulo.harness.SharedMiniClusterBase; -import org.apache.accumulo.test.fate.FateIT; -import org.apache.accumulo.test.fate.FateTestRunner.TestEnv; -import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.function.Executable; - -import com.google.common.collect.MoreCollectors; - -public class UserFateStoreIT extends SharedMiniClusterBase { - - @BeforeAll - public static void setup() throws Exception { - SharedMiniClusterBase.startMiniCluster(); - } - - @AfterAll - public static void teardown() { - SharedMiniClusterBase.stopMiniCluster(); - } - - // Test that configs related to the correctness of the FATE instance user table - // are initialized correctly - @Test - public void testFateInitialConfigCorrectness() throws Exception { - try (ClientContext client = - (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { - - // It is important here to use getTableProperties() and not getConfiguration() - // because we want only the table properties and not a merged view - var fateTableProps = - client.tableOperations().getTableProperties(AccumuloTable.FATE.tableName()); - - // Verify properties all have a table. prefix - assertTrue(fateTableProps.keySet().stream().allMatch(key -> key.startsWith("table."))); - - // Verify properties are correctly set - assertEquals("5", fateTableProps.get(Property.TABLE_FILE_REPLICATION.getKey())); - assertEquals("sync", fateTableProps.get(Property.TABLE_DURABILITY.getKey())); - assertEquals("false", fateTableProps.get(Property.TABLE_FAILURES_IGNORE.getKey())); - assertEquals("", fateTableProps.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())); - - // Verify VersioningIterator related properties are correct - var iterClass = "10," + VersioningIterator.class.getName(); - var maxVersions = "1"; - assertEquals(iterClass, - fateTableProps.get(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers")); - assertEquals(maxVersions, fateTableProps - .get(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers.opt.maxVersions")); - assertEquals(iterClass, - fateTableProps.get(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers")); - assertEquals(maxVersions, fateTableProps - .get(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions")); - assertEquals(iterClass, - fateTableProps.get(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers")); - assertEquals(maxVersions, fateTableProps - .get(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions")); - - // Verify all tablets are HOSTED - try (var tablets = - client.getAmple().readTablets().forTable(AccumuloTable.FATE.tableId()).build()) { - assertTrue(tablets.stream() - .allMatch(tm -> tm.getTabletAvailability() == TabletAvailability.HOSTED)); - } - } - } - - @Nested - class TestStatusEnforcement { - - String tableName; - ClientContext client; - FateId fateId; - UserFateStore store; - FateStore.FateTxStore txStore; - - @BeforeEach - public void setup() throws Exception { - client = (ClientContext) Accumulo.newClient().from(getClientProps()).build(); - tableName = getUniqueNames(1)[0]; - createFateTable(client, tableName); - store = new UserFateStore<>(client, tableName, AbstractFateStore.createDummyLockID(), null); - fateId = store.create(); - txStore = store.reserve(fateId); - } - - @AfterEach - public void teardown() throws Exception { - client.close(); - } - - private void testOperationWithStatuses(Runnable beforeOperation, Executable operation, - EnumSet acceptableStatuses) throws Exception { - for (TStatus status : TStatus.values()) { - // Run any needed setup for the operation before each iteration - beforeOperation.run(); - - injectStatus(client, tableName, fateId, status); - var fateIdStatus = - store.list().filter(statusEntry -> statusEntry.getFateId().equals(fateId)).findFirst() - .orElseThrow(); - assertEquals(status, fateIdStatus.getStatus()); - if (!acceptableStatuses.contains(status)) { - assertThrows(IllegalStateException.class, operation, - "Expected operation to fail with status " + status + " but it did not"); - } else { - assertDoesNotThrow(operation, - "Expected operation to succeed with status " + status + " but it did not"); - } - } - } - - @Test - public void push() throws Exception { - testOperationWithStatuses(() -> {}, // No special setup needed for push - () -> txStore.push(new FateIT.TestRepo("testOp")), - EnumSet.of(TStatus.IN_PROGRESS, TStatus.NEW)); - } - - @Test - public void pop() throws Exception { - testOperationWithStatuses(() -> { - // Setup for pop: Ensure there something to pop by first pushing - try { - injectStatus(client, tableName, fateId, TStatus.NEW); - txStore.push(new FateIT.TestRepo("testOp")); - } catch (Exception e) { - throw new RuntimeException("Failed to setup for pop", e); - } - }, txStore::pop, EnumSet.of(TStatus.FAILED_IN_PROGRESS, TStatus.SUCCESSFUL)); - } - - @Test - public void delete() throws Exception { - testOperationWithStatuses(() -> { - // Setup for delete: Create a new txStore before each delete since delete cannot be called - // on the same txStore more than once - fateId = store.create(); - txStore = store.reserve(fateId); - }, () -> txStore.delete(), - EnumSet.of(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED)); - } - } - - /** - * Inject a status into the status col of the fate store table for a given transaction id. - */ - private void injectStatus(ClientContext client, String table, FateId fateId, TStatus status) - throws TableNotFoundException { - try (BatchWriter writer = client.createBatchWriter(table)) { - Mutation mutation = new Mutation(new Text(fateId.getTxUUIDStr())); - FateSchema.TxColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name())); - writer.addMutation(mutation); - } catch (MutationsRejectedException e) { - throw new RuntimeException(e); - } - } - - // Create the fate table with the exact configuration as the real Fate user instance table - // including table properties and TabletAvailability - public static void createFateTable(ClientContext client, String table) throws Exception { - final var fateTableProps = - client.tableOperations().getTableProperties(AccumuloTable.FATE.tableName()); - - TabletAvailability availability; - try (var tabletStream = client.tableOperations() - .getTabletInformation(AccumuloTable.FATE.tableName(), new Range())) { - availability = tabletStream.map(TabletInformation::getTabletAvailability).distinct() - .collect(MoreCollectors.onlyElement()); - } - - var newTableConf = new NewTableConfiguration().withInitialTabletAvailability(availability) - .withoutDefaultIterators().setProperties(fateTableProps); - client.tableOperations().create(table, newTableConf); - var testFateTableProps = client.tableOperations().getTableProperties(table); - - // ensure that create did not set any other props - assertEquals(fateTableProps, testFateTableProps); - } -} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserMultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserMultipleStoresIT.java index f3569f07aab..507a4b1b862 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserMultipleStoresIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserMultipleStoresIT.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.test.fate.user; -import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; +import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable; import java.util.function.Predicate;