diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index a5b1c5645af..9577ab7ff2b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -33,12 +33,14 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.SortedSet; import java.util.stream.Stream; import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType; import org.apache.accumulo.core.fate.zookeeper.FateLock; import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -275,24 +277,21 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath, List lockedIds = zr.getChildren(lockPath.toString()); for (String id : lockedIds) { - try { - FateLockPath fLockPath = FateLock.path(lockPath + "/" + id); - List lockNodes = - FateLock.validateAndSort(fLockPath, zr.getChildren(fLockPath.toString())); + SortedSet lockNodes = + FateLock.validateAndWarn(fLockPath, zr.getChildren(fLockPath.toString())); int pos = 0; boolean sawWriteLock = false; - for (String node : lockNodes) { + for (FateLock.NodeName node : lockNodes) { try { - byte[] data = zr.getData(lockPath + "/" + id + "/" + node); - // Example data: "READ:". FateId contains ':' hence the limit of 2 - String[] lda = new String(data, UTF_8).split(":", 2); - FateId fateId = FateId.from(lda[1]); + FateLock.FateLockEntry fateLockEntry = node.fateLockEntry.get(); + var fateId = fateLockEntry.getFateId(); + var lockType = fateLockEntry.getLockType(); - if (lda[0].charAt(0) == 'W') { + if (lockType == LockType.WRITE) { sawWriteLock = true; } @@ -300,13 +299,14 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath, if (pos == 0) { locks = heldLocks; - } else if (lda[0].charAt(0) == 'R' && !sawWriteLock) { + } else if (lockType == LockType.READ && !sawWriteLock) { locks = heldLocks; } else { locks = waitingLocks; } - locks.computeIfAbsent(fateId, k -> new ArrayList<>()).add(lda[0].charAt(0) + ":" + id); + locks.computeIfAbsent(fateId, k -> new ArrayList<>()) + .add(lockType.name().charAt(0) + ":" + id); } catch (Exception e) { log.error("{}", e.getMessage(), e); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index 1f36ee7a92b..1b52a477d73 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import java.util.function.BiPredicate; import java.util.function.Supplier; import org.apache.accumulo.core.fate.FateId; @@ -50,9 +51,10 @@ public enum LockType { // them, // a writer only runs when they are at the top of the queue. public interface QueueLock { - SortedMap> getEarlierEntries(long entry); + SortedMap> + getEntries(BiPredicate> predicate); - void removeEntry(long entry); + void removeEntry(FateLockEntry data, long seq); long addEntry(FateLockEntry entry); } @@ -115,7 +117,9 @@ public boolean tryLock() { entry = qlock.addEntry(FateLockEntry.from(this.getType(), this.fateId)); log.info("Added lock entry {} fateId {} lockType {}", entry, fateId, getType()); } - SortedMap> entries = qlock.getEarlierEntries(entry); + + SortedMap> entries = + qlock.getEntries((seq, lockData) -> seq <= entry); for (Entry> entry : entries.entrySet()) { if (entry.getKey().equals(this.entry)) { return true; @@ -150,7 +154,7 @@ public void unlock() { return; } log.debug("Removing lock entry {} fateId {} lockType {}", entry, this.fateId, getType()); - qlock.removeEntry(entry); + qlock.removeEntry(FateLockEntry.from(this.getType(), this.fateId), entry); entry = -1; } @@ -181,7 +185,8 @@ public boolean tryLock() { entry = qlock.addEntry(FateLockEntry.from(this.getType(), this.fateId)); log.info("Added lock entry {} fateId {} lockType {}", entry, this.fateId, getType()); } - SortedMap> entries = qlock.getEarlierEntries(entry); + SortedMap> entries = + qlock.getEntries((seq, locData) -> seq <= entry); Iterator>> iterator = entries.entrySet().iterator(); if (!iterator.hasNext()) { throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry @@ -200,19 +205,26 @@ public DistributedReadWriteLock(QueueLock qlock, FateId fateId) { } public static DistributedLock recoverLock(QueueLock qlock, FateId fateId) { - SortedMap> entries = qlock.getEarlierEntries(Long.MAX_VALUE); - for (Entry> entry : entries.entrySet()) { - FateLockEntry lockEntry = entry.getValue().get(); - if (fateId.equals(lockEntry.getFateId())) { + SortedMap> entries = + qlock.getEntries((seq, lockData) -> lockData.get().fateId.equals(fateId)); + + switch (entries.size()) { + case 0: + return null; + case 1: + var entry = entries.entrySet().iterator().next(); + FateLockEntry lockEntry = entry.getValue().get(); switch (lockEntry.getLockType()) { case READ: return new ReadLock(qlock, lockEntry.getFateId(), entry.getKey()); case WRITE: return new WriteLock(qlock, lockEntry.getFateId(), entry.getKey()); + default: + throw new IllegalStateException("Unknown lock type " + lockEntry.getLockType()); } - } + default: + throw new IllegalStateException("Found more than one lock node " + entries); } - return null; } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java index 74c3065c702..66b8191f2ed 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java @@ -18,16 +18,16 @@ */ package org.apache.accumulo.core.fate.zookeeper; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; +import java.util.function.BiPredicate; import java.util.function.Supplier; import org.apache.accumulo.core.fate.FateId; @@ -41,6 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.base.Suppliers; /** @@ -49,7 +50,7 @@ public class FateLock implements QueueLock { private static final Logger log = LoggerFactory.getLogger(FateLock.class); - private static final String PREFIX = "flock#"; + static final String PREFIX = "flock#"; private final ZooReaderWriter zoo; private final FateLockPath path; @@ -67,7 +68,7 @@ public String toString() { } } - public static class FateLockEntry { + public static class FateLockEntry implements Comparable { final LockType lockType; final FateId fateId; @@ -76,26 +77,10 @@ private FateLockEntry(LockType lockType, FateId fateId) { this.fateId = Objects.requireNonNull(fateId); } - private FateLockEntry(byte[] entry) { - if (entry == null || entry.length < 1) { - throw new IllegalArgumentException(); - } - - int split = -1; - for (int i = 0; i < entry.length; i++) { - if (entry[i] == ':') { - split = i; - break; - } - } - - if (split == -1) { - throw new IllegalArgumentException(); - } - - this.lockType = LockType.valueOf(new String(entry, 0, split, UTF_8)); - this.fateId = - FateId.from(new String(Arrays.copyOfRange(entry, split + 1, entry.length), UTF_8)); + private FateLockEntry(String entry) { + var fields = entry.split(":", 2); + this.lockType = LockType.valueOf(fields[0]); + this.fateId = FateId.from(fields[1]); } public LockType getLockType() { @@ -106,14 +91,8 @@ public FateId getFateId() { return fateId; } - public byte[] serialize() { - byte[] typeBytes = lockType.name().getBytes(UTF_8); - byte[] fateIdBytes = fateId.canonical().getBytes(UTF_8); - byte[] result = new byte[fateIdBytes.length + 1 + typeBytes.length]; - System.arraycopy(typeBytes, 0, result, 0, typeBytes.length); - result[typeBytes.length] = ':'; - System.arraycopy(fateIdBytes, 0, result, typeBytes.length + 1, fateIdBytes.length); - return result; + public String serialize() { + return lockType.name() + ":" + fateId.canonical(); } @Override @@ -137,9 +116,18 @@ public static FateLockEntry from(LockType lockType, FateId fateId) { return new FateLockEntry(lockType, fateId); } - public static FateLockEntry deserialize(byte[] serialized) { + public static FateLockEntry deserialize(String serialized) { return new FateLockEntry(serialized); } + + @Override + public int compareTo(FateLockEntry o) { + int cmp = lockType.compareTo(o.lockType); + if (cmp == 0) { + cmp = fateId.compareTo(o.fateId); + } + return cmp; + } } public static FateLockPath path(String path) { @@ -151,16 +139,59 @@ public FateLock(ZooReaderWriter zrw, FateLockPath path) { this.path = requireNonNull(path); } + public static class NodeName implements Comparable { + public final long sequence; + public final Supplier fateLockEntry; + + NodeName(String nodeName) { + int len = nodeName.length(); + Preconditions.checkArgument(nodeName.startsWith(PREFIX) && nodeName.charAt(len - 11) == '#', + "Illegal node name %s", nodeName); + sequence = Long.parseUnsignedLong(nodeName.substring(len - 10), 10); + // Use a supplier so we don't need to deserialize unless the calling code cares about + // the value for that entry. + fateLockEntry = Suppliers + .memoize(() -> FateLockEntry.deserialize(nodeName.substring(PREFIX.length(), len - 11))); + } + + @Override + public int compareTo(NodeName o) { + int cmp = Long.compare(sequence, o.sequence); + if (cmp == 0) { + cmp = fateLockEntry.get().compareTo(o.fateLockEntry.get()); + } + return cmp; + } + + @Override + public boolean equals(Object o) { + if (o instanceof NodeName) { + return this.compareTo((NodeName) o) == 0; + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(sequence, fateLockEntry.get()); + } + } + @Override public long addEntry(FateLockEntry entry) { + + String dataString = entry.serialize(); + Preconditions.checkState(!dataString.contains("#")); + String newPath; try { while (true) { try { - newPath = zoo.putPersistentSequential(path + "/" + PREFIX, entry.serialize()); + newPath = + zoo.putPersistentSequential(path + "/" + PREFIX + dataString + "#", new byte[0]); String[] parts = newPath.split("/"); String last = parts[parts.length - 1]; - return Long.parseLong(last.substring(PREFIX.length())); + return new NodeName(last).sequence; } catch (NoNodeException nne) { // the parent does not exist so try to create it zoo.putPersistentData(path.toString(), new byte[] {}, NodeExistsPolicy.SKIP); @@ -172,7 +203,8 @@ public long addEntry(FateLockEntry entry) { } @Override - public SortedMap> getEarlierEntries(long entry) { + public SortedMap> + getEntries(BiPredicate> predicate) { SortedMap> result = new TreeMap<>(); try { List children = Collections.emptyList(); @@ -184,17 +216,9 @@ public SortedMap> getEarlierEntries(long entry) { } for (String name : children) { - // this try catch must be done inside the loop because some subset of the children may exist - try { - long order = Long.parseLong(name.substring(PREFIX.length())); - if (order <= entry) { - byte[] data = zoo.getData(path + "/" + name); - // Use a supplier so we don't need to deserialize unless the calling code cares about - // the value for that entry. - result.put(order, Suppliers.memoize(() -> FateLockEntry.deserialize(data))); - } - } catch (KeeperException.NoNodeException ex) { - // ignored + var parsed = new NodeName(name); + if (predicate.test(parsed.sequence, parsed.fateLockEntry)) { + Preconditions.checkState(result.put(parsed.sequence, parsed.fateLockEntry) == null); } } } catch (KeeperException | InterruptedException ex) { @@ -204,9 +228,12 @@ public SortedMap> getEarlierEntries(long entry) { } @Override - public void removeEntry(long entry) { + public void removeEntry(FateLockEntry data, long entry) { + String dataString = data.serialize(); + Preconditions.checkState(!dataString.contains("#")); try { - zoo.recursiveDelete(path + String.format("/%s%010d", PREFIX, entry), NodeMissingPolicy.SKIP); + zoo.recursiveDelete(path + String.format("/%s%s#%010d", PREFIX, dataString, entry), + NodeMissingPolicy.SKIP); try { // try to delete the parent if it has no children zoo.delete(path.toString()); @@ -221,50 +248,25 @@ public void removeEntry(long entry) { /** * Validate and sort child nodes at this lock path by the lock prefix */ - public static List validateAndSort(FateLockPath path, List children) { + public static SortedSet validateAndWarn(FateLockPath path, List children) { log.trace("validating and sorting children at path {}", path); - List validChildren = new ArrayList<>(); + + SortedSet validChildren = new TreeSet<>(); + if (children == null || children.isEmpty()) { return validChildren; } + children.forEach(c -> { log.trace("Validating {}", c); - if (c.startsWith(PREFIX)) { - int idx = c.indexOf('#'); - String sequenceNum = c.substring(idx + 1); - if (sequenceNum.length() == 10) { - try { - log.trace("Testing number format of {}", sequenceNum); - Integer.parseInt(sequenceNum); - validChildren.add(c); - } catch (NumberFormatException e) { - log.warn("Fate lock found with invalid sequence number format: {} (not a number)", c); - } - } else { - log.warn("Fate lock found with invalid sequence number format: {} (not 10 characters)", - c); - } - } else { - log.warn("Fate lock found with invalid lock format: {} (does not start with {})", c, - PREFIX); + try { + var fateLockNode = new NodeName(c); + validChildren.add(fateLockNode); + } catch (RuntimeException e) { + log.warn("Illegal fate lock node {}", c, e); } }); - if (validChildren.size() > 1) { - validChildren.sort((o1, o2) -> { - // Lock should be of the form: - // lock-sequenceNumber - // Example: - // flock#0000000000 - - // Lock length - sequenceNumber length - // 16 - 10 - int secondHashIdx = 6; - return Integer.valueOf(o1.substring(secondHashIdx)) - .compareTo(Integer.valueOf(o2.substring(secondHashIdx))); - }); - } - log.trace("Children nodes (size: {}): {}", validChildren.size(), validChildren); return validChildren; } } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java index bf55b79d4a1..229e8ec07d2 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.function.BiPredicate; import java.util.function.Supplier; import org.apache.accumulo.core.fate.FateId; @@ -45,14 +46,19 @@ public static class MockQueueLock implements QueueLock { final SortedMap locks = new TreeMap<>(); @Override - public synchronized SortedMap> getEarlierEntries(long entry) { + public synchronized SortedMap> + getEntries(BiPredicate> predicate) { SortedMap> result = new TreeMap<>(); - locks.headMap(entry + 1).forEach((k, v) -> result.put(k, () -> v)); + locks.forEach((seq, lockData) -> { + if (predicate.test(seq, () -> lockData)) { + result.put(seq, () -> lockData); + } + }); return result; } @Override - public synchronized void removeEntry(long entry) { + public synchronized void removeEntry(FateLockEntry data, long entry) { synchronized (locks) { locks.remove(entry); locks.notifyAll(); @@ -147,7 +153,7 @@ public void testFateLockEntrySerDes() { assertEquals(LockType.READ, entry.getLockType()); assertEquals(FateId.from(FateInstanceType.USER, uuid), entry.getFateId()); - byte[] serialized = entry.serialize(); + String serialized = entry.serialize(); var deserialized = FateLockEntry.deserialize(serialized); assertEquals(entry, deserialized); } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java new file mode 100644 index 00000000000..b4648c3f90c --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.zookeeper; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.UUID; + +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.junit.jupiter.api.Test; + +public class FateLockTest { + + @Test + public void testParsing() { + var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + // ZooKeeper docs state that sequence numbers are formatted using %010d + String lockData = "WRITE:" + fateId.canonical(); + var lockNode = + new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%010d", 40)); + assertEquals(40, lockNode.sequence); + assertEquals(lockData, lockNode.fateLockEntry.get().serialize()); + + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(lockData + "#" + String.format("%010d", 40))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%d", 40))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%09d", 40))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%011d", 40))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#abc")); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + String.format("%010d", 40))); + + // ZooKeeper docs state that sequence numbers can roll and become negative. The FateLock code + // does not support this, so make sure it fails if this happens. + for (int i : new int[] {Integer.MIN_VALUE, Integer.MIN_VALUE / 2, Integer.MIN_VALUE / 10, + Integer.MIN_VALUE / 1000, -40}) { + var seq = String.format("%010d", i); + if (seq.length() == 10) { + assertThrows(NumberFormatException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + seq)); + } else if (seq.length() == 11) { + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + seq)); + } else { + fail("Unexpected length " + seq.length()); + } + } + + // Test a negative number that is not formatted w/ %010d + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%d", -40))); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index b3ed3c26fa2..46271c2cafb 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -99,15 +99,20 @@ public static String row(int r) { return String.format("r:%04d", r); } - public static void compact(final AccumuloClient client, String table1, int modulus, - String expectedQueue, boolean wait) - throws AccumuloSecurityException, TableNotFoundException, AccumuloException { + public static void addCompactionIterators(CompactionConfig config, int modulus, + String expectedQueue) { IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class); // make sure iterator options make it to compactor process iterSetting.addOption("expectedQ", expectedQueue); iterSetting.addOption("modulus", modulus + ""); - CompactionConfig config = - new CompactionConfig().setIterators(List.of(iterSetting)).setWait(wait); + config.setIterators(List.of(iterSetting)); + } + + public static void compact(final AccumuloClient client, String table1, int modulus, + String expectedQueue, boolean wait) + throws AccumuloSecurityException, TableNotFoundException, AccumuloException { + CompactionConfig config = new CompactionConfig().setWait(wait); + addCompactionIterators(config, modulus, expectedQueue); client.tableOperations().compact(table1, config); } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 99002079d4a..fc1020bcfad 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -25,6 +25,7 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP6; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP8; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.addCompactionIterators; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.assertNoCompactionMetadata; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; @@ -58,12 +59,14 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.PluginConfig; +import org.apache.accumulo.core.client.admin.TabletInformation; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer; @@ -72,6 +75,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; @@ -514,6 +518,68 @@ public void testManytablets() throws Exception { compact(client, table1, 3, GROUP4, true); verify(client, table1, 3); + + List tabletIds; + // start a compaction on each tablet + try (var tablets = client.tableOperations().getTabletInformation(table1, new Range())) { + tabletIds = tablets.map(TabletInformation::getTabletId).collect(Collectors.toList()); + } + // compact the even tablet with a modulus filter of 2 + List evenRanges = new ArrayList<>(); + for (int i = 0; i < tabletIds.size(); i += 2) { + var tabletId = tabletIds.get(i); + CompactionConfig compactionConfig = new CompactionConfig() + .setStartRow(tabletId.getPrevEndRow()).setEndRow(tabletId.getEndRow()).setWait(false); + addCompactionIterators(compactionConfig, 2, GROUP4); + client.tableOperations().compact(table1, compactionConfig); + evenRanges.add(tabletId.toRange()); + } + + // compact the odd tablets with a modulus filter of 5 + List oddRanges = new ArrayList<>(); + for (int i = 1; i < tabletIds.size(); i += 2) { + var tabletId = tabletIds.get(i); + CompactionConfig compactionConfig = new CompactionConfig() + .setStartRow(tabletId.getPrevEndRow()).setEndRow(tabletId.getEndRow()).setWait(false); + addCompactionIterators(compactionConfig, 5, GROUP4); + client.tableOperations().compact(table1, compactionConfig); + oddRanges.add(tabletId.toRange()); + } + + Wait.waitFor(() -> { + try (BatchScanner scanner = client.createBatchScanner(table1)) { + scanner.setRanges(evenRanges); + // filtered out data that was divisible by 3 and then 2 by compactions, so should end up + // w/ only data divisible by 6 + int matching = 0; + int nonMatching = 0; + for (var entry : scanner) { + int val = Integer.parseInt(entry.getValue().toString()); + if (val % 6 == 0) { + matching++; + } else { + nonMatching++; + } + } + boolean evenDone = matching > 0 && nonMatching == 0; + // filtered out data that was divisible by 3 and then 5 by compactions, so should end up + // w/ only data divisible by 15 + scanner.setRanges(oddRanges); + matching = 0; + nonMatching = 0; + for (var entry : scanner) { + int val = Integer.parseInt(entry.getValue().toString()); + if (val % 15 == 0) { + matching++; + } else { + nonMatching++; + } + } + boolean oddDone = matching > 0 && nonMatching == 0; + return evenDone && oddDone; + } + }); + } }