From 20fbbe8f538ab06c63d90f985cb008c3c632b11f Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 14 Jan 2025 20:42:04 +0000 Subject: [PATCH 01/11] speeds up fate lock acquisition Stores the lock data for fate locks in the zookeeper node name instead of the zookeeper data for the node. Ran some local performance test with hundreds of fate operations and saw lock times go from 750ms to 15ms. fixes #5181 --- .../apache/accumulo/core/fate/AdminUtil.java | 13 +-- .../zookeeper/DistributedReadWriteLock.java | 33 ++++--- .../core/fate/zookeeper/FateLock.java | 93 +++++++++---------- .../DistributedReadWriteLockTest.java | 11 ++- .../ExternalCompactionTestUtils.java | 15 ++- .../compaction/ExternalCompaction_1_IT.java | 66 +++++++++++++ 6 files changed, 157 insertions(+), 74 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 54fb62e6a5a..8465bb149fb 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -25,6 +25,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.EnumSet; import java.util.Formatter; @@ -285,19 +286,19 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath, List lockedIds = zr.getChildren(lockPath.toString()); for (String id : lockedIds) { - try { - FateLockPath fLockPath = FateLock.path(lockPath + "/" + id); - List lockNodes = - FateLock.validateAndSort(fLockPath, zr.getChildren(fLockPath.toString())); + List lockNodes = + FateLock.validateAndWarn(fLockPath, zr.getChildren(fLockPath.toString())); + + lockNodes.sort(Comparator.comparingLong(ln -> ln.sequence)); int pos = 0; boolean sawWriteLock = false; - for (String node : lockNodes) { + for (FateLock.FateLockNode node : lockNodes) { try { - byte[] data = zr.getData(lockPath + "/" + id + "/" + node); + byte[] data = node.lockData.getBytes(UTF_8); // Example data: "READ:". FateId contains ':' hence the limit of 2 String[] lda = new String(data, UTF_8).split(":", 2); FateId fateId = FateId.from(lda[1]); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index 221494c9115..e0b11fc7400 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import java.util.function.BiPredicate; import org.apache.accumulo.core.util.UtilWaitThread; import org.slf4j.Logger; @@ -98,9 +99,9 @@ public byte[] getLockData() { // them, // a writer only runs when they are at the top of the queue. public interface QueueLock { - SortedMap getEarlierEntries(long entry); + SortedMap getEntries(BiPredicate predicate); - void removeEntry(long entry); + void removeEntry(byte[] data, long entry); long addEntry(byte[] data); } @@ -164,7 +165,8 @@ public boolean tryLock() { log.info("Added lock entry {} userData {} lockType {}", entry, new String(this.userData, UTF_8), getType()); } - SortedMap entries = qlock.getEarlierEntries(entry); + + SortedMap entries = qlock.getEntries((seq, lockData) -> seq <= entry); for (Entry entry : entries.entrySet()) { ParsedLock parsed = new ParsedLock(entry.getValue()); if (entry.getKey().equals(this.entry)) { @@ -200,7 +202,7 @@ public void unlock() { } log.debug("Removing lock entry {} userData {} lockType {}", entry, new String(this.userData, UTF_8), getType()); - qlock.removeEntry(entry); + qlock.removeEntry(new ParsedLock(this.getType(), this.userData).getLockData(), entry); entry = -1; } @@ -232,7 +234,7 @@ public boolean tryLock() { log.info("Added lock entry {} userData {} lockType {}", entry, new String(this.userData, UTF_8), getType()); } - SortedMap entries = qlock.getEarlierEntries(entry); + SortedMap entries = qlock.getEntries((seq, locData) -> seq <= entry); Iterator> iterator = entries.entrySet().iterator(); if (!iterator.hasNext()) { throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry @@ -251,19 +253,28 @@ public DistributedReadWriteLock(QueueLock qlock, byte[] data) { } public static DistributedLock recoverLock(QueueLock qlock, byte[] data) { - SortedMap entries = qlock.getEarlierEntries(Long.MAX_VALUE); - for (Entry entry : entries.entrySet()) { - ParsedLock parsed = new ParsedLock(entry.getValue()); - if (Arrays.equals(data, parsed.getUserData())) { + SortedMap entries = qlock.getEntries((seq, lockData) -> { + ParsedLock parsed = new ParsedLock(lockData); + return Arrays.equals(data, parsed.getUserData()); + }); + + switch (entries.size()) { + case 0: + return null; + case 1: + var entry = entries.entrySet().iterator().next(); + ParsedLock parsed = new ParsedLock(entry.getValue()); switch (parsed.getType()) { case READ: return new ReadLock(qlock, parsed.getUserData(), entry.getKey()); case WRITE: return new WriteLock(qlock, parsed.getUserData(), entry.getKey()); + default: + throw new IllegalStateException("Uknown lock type " + parsed.getType()); } - } + default: + throw new IllegalStateException("Found more than one lock node " + entries); } - return null; } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java index 6b42014c310..04a0d68d73e 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.core.fate.zookeeper; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import java.util.ArrayList; @@ -25,6 +26,7 @@ import java.util.List; import java.util.SortedMap; import java.util.TreeMap; +import java.util.function.BiPredicate; import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.QueueLock; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -35,6 +37,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** * A persistent lock mechanism in ZooKeeper used for locking tables during FaTE operations. */ @@ -68,16 +72,34 @@ public FateLock(ZooReaderWriter zrw, FateLockPath path) { this.path = requireNonNull(path); } + public static class FateLockNode { + public final long sequence; + public final String lockData; + + private FateLockNode(String nodeName) { + int len = nodeName.length(); + Preconditions.checkArgument(nodeName.startsWith(PREFIX) && nodeName.charAt(len - 11) == '#', + "Illegal node name %s", nodeName); + sequence = Long.parseLong(nodeName.substring(len - 10)); + lockData = nodeName.substring(PREFIX.length(), len - 11); + } + } + + // TODO change data arg from byte[] to String.. in the rest of the code its always a String. @Override public long addEntry(byte[] data) { + + String dataString = new String(data, UTF_8); + Preconditions.checkState(!dataString.contains("#")); + String newPath; try { while (true) { try { - newPath = zoo.putPersistentSequential(path + "/" + PREFIX, data); + newPath = zoo.putPersistentSequential(path + "/" + PREFIX + dataString + "#", data); String[] parts = newPath.split("/"); String last = parts[parts.length - 1]; - return Long.parseLong(last.substring(PREFIX.length())); + return new FateLockNode(last).sequence; } catch (NoNodeException nne) { // the parent does not exist so try to create it zoo.putPersistentData(path.toString(), new byte[] {}, NodeExistsPolicy.SKIP); @@ -89,7 +111,7 @@ public long addEntry(byte[] data) { } @Override - public SortedMap getEarlierEntries(long entry) { + public SortedMap getEntries(BiPredicate predicate) { SortedMap result = new TreeMap<>(); try { List children = Collections.emptyList(); @@ -101,15 +123,10 @@ public SortedMap getEarlierEntries(long entry) { } for (String name : children) { - // this try catch must be done inside the loop because some subset of the children may exist - try { - long order = Long.parseLong(name.substring(PREFIX.length())); - if (order <= entry) { - byte[] data = zoo.getData(path + "/" + name); - result.put(order, data); - } - } catch (KeeperException.NoNodeException ex) { - // ignored + var parsed = new FateLockNode(name); + byte[] data = parsed.lockData.getBytes(UTF_8); + if (predicate.test(parsed.sequence, data)) { + result.put(parsed.sequence, data); } } } catch (KeeperException | InterruptedException ex) { @@ -119,9 +136,12 @@ public SortedMap getEarlierEntries(long entry) { } @Override - public void removeEntry(long entry) { + public void removeEntry(byte[] data, long entry) { + String dataString = new String(data, UTF_8); + Preconditions.checkState(!dataString.contains("#")); try { - zoo.recursiveDelete(path + String.format("/%s%010d", PREFIX, entry), NodeMissingPolicy.SKIP); + zoo.recursiveDelete(path + String.format("/%s%s#%010d", PREFIX, dataString, entry), + NodeMissingPolicy.SKIP); try { // try to delete the parent if it has no children zoo.delete(path.toString()); @@ -136,50 +156,25 @@ public void removeEntry(long entry) { /** * Validate and sort child nodes at this lock path by the lock prefix */ - public static List validateAndSort(FateLockPath path, List children) { + public static List validateAndWarn(FateLockPath path, List children) { log.trace("validating and sorting children at path {}", path); - List validChildren = new ArrayList<>(); + + List validChildren = new ArrayList<>(); + if (children == null || children.isEmpty()) { return validChildren; } + children.forEach(c -> { log.trace("Validating {}", c); - if (c.startsWith(PREFIX)) { - int idx = c.indexOf('#'); - String sequenceNum = c.substring(idx + 1); - if (sequenceNum.length() == 10) { - try { - log.trace("Testing number format of {}", sequenceNum); - Integer.parseInt(sequenceNum); - validChildren.add(c); - } catch (NumberFormatException e) { - log.warn("Fate lock found with invalid sequence number format: {} (not a number)", c); - } - } else { - log.warn("Fate lock found with invalid sequence number format: {} (not 10 characters)", - c); - } - } else { - log.warn("Fate lock found with invalid lock format: {} (does not start with {})", c, - PREFIX); + try { + var fateLockNode = new FateLockNode(c); + validChildren.add(fateLockNode); + } catch (RuntimeException e) { + log.warn("Illegal fate lock node {}", c, e); } }); - if (validChildren.size() > 1) { - validChildren.sort((o1, o2) -> { - // Lock should be of the form: - // lock-sequenceNumber - // Example: - // flock#0000000000 - - // Lock length - sequenceNumber length - // 16 - 10 - int secondHashIdx = 6; - return Integer.valueOf(o1.substring(secondHashIdx)) - .compareTo(Integer.valueOf(o2.substring(secondHashIdx))); - }); - } - log.trace("Children nodes (size: {}): {}", validChildren.size(), validChildren); return validChildren; } } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java index da30442d933..4acd3558933 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.function.BiPredicate; import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.QueueLock; import org.junit.jupiter.api.Test; @@ -40,14 +41,18 @@ public static class MockQueueLock implements QueueLock { final SortedMap locks = new TreeMap<>(); @Override - public synchronized SortedMap getEarlierEntries(long entry) { + public synchronized SortedMap getEntries(BiPredicate predicate) { SortedMap result = new TreeMap<>(); - result.putAll(locks.headMap(entry + 1)); + locks.forEach((seq, lockData) -> { + if (predicate.test(seq, lockData)) { + result.put(seq, lockData); + } + }); return result; } @Override - public synchronized void removeEntry(long entry) { + public synchronized void removeEntry(byte[] data, long entry) { synchronized (locks) { locks.remove(entry); locks.notifyAll(); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index b3ed3c26fa2..46271c2cafb 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -99,15 +99,20 @@ public static String row(int r) { return String.format("r:%04d", r); } - public static void compact(final AccumuloClient client, String table1, int modulus, - String expectedQueue, boolean wait) - throws AccumuloSecurityException, TableNotFoundException, AccumuloException { + public static void addCompactionIterators(CompactionConfig config, int modulus, + String expectedQueue) { IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class); // make sure iterator options make it to compactor process iterSetting.addOption("expectedQ", expectedQueue); iterSetting.addOption("modulus", modulus + ""); - CompactionConfig config = - new CompactionConfig().setIterators(List.of(iterSetting)).setWait(wait); + config.setIterators(List.of(iterSetting)); + } + + public static void compact(final AccumuloClient client, String table1, int modulus, + String expectedQueue, boolean wait) + throws AccumuloSecurityException, TableNotFoundException, AccumuloException { + CompactionConfig config = new CompactionConfig().setWait(wait); + addCompactionIterators(config, modulus, expectedQueue); client.tableOperations().compact(table1, config); } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 82c0f9b8bce..c1d7ee00599 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -26,6 +26,7 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP6; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP8; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.addCompactionIterators; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.assertNoCompactionMetadata; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; @@ -59,12 +60,14 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.PluginConfig; +import org.apache.accumulo.core.client.admin.TabletInformation; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer; @@ -73,6 +76,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; @@ -500,6 +504,68 @@ public void testManytablets() throws Exception { compact(client, table1, 3, GROUP4, true); verify(client, table1, 3); + + List tabletIds; + // start a compaction on each tablet + try (var tablets = client.tableOperations().getTabletInformation(table1, new Range())) { + tabletIds = tablets.map(TabletInformation::getTabletId).collect(Collectors.toList()); + } + // compact the even tablet with a modulus filter of 2 + List evenRanges = new ArrayList<>(); + for (int i = 0; i < tabletIds.size(); i += 2) { + var tabletId = tabletIds.get(i); + CompactionConfig compactionConfig = new CompactionConfig() + .setStartRow(tabletId.getPrevEndRow()).setEndRow(tabletId.getEndRow()).setWait(false); + addCompactionIterators(compactionConfig, 2, GROUP4); + client.tableOperations().compact(table1, compactionConfig); + evenRanges.add(tabletId.toRange()); + } + + // compact the odd tablets with a modulus filter of 5 + List oddRanges = new ArrayList<>(); + for (int i = 1; i < tabletIds.size(); i += 2) { + var tabletId = tabletIds.get(i); + CompactionConfig compactionConfig = new CompactionConfig() + .setStartRow(tabletId.getPrevEndRow()).setEndRow(tabletId.getEndRow()).setWait(false); + addCompactionIterators(compactionConfig, 5, GROUP4); + client.tableOperations().compact(table1, compactionConfig); + oddRanges.add(tabletId.toRange()); + } + + Wait.waitFor(() -> { + try (BatchScanner scanner = client.createBatchScanner(table1)) { + scanner.setRanges(evenRanges); + // filtered out data that was divisible by 3 and then 2 by compactions, so should end up + // w/ only data divisible by 6 + int matching = 0; + int nonMatching = 0; + for (var entry : scanner) { + int val = Integer.parseInt(entry.getValue().toString()); + if (val % 6 == 0) { + matching++; + } else { + nonMatching++; + } + } + boolean evenDone = matching > 0 && nonMatching == 0; + // filtered out data that was divisible by 3 and then 5 by compactions, so should end up + // w/ only data divisible by 15 + scanner.setRanges(oddRanges); + matching = 0; + nonMatching = 0; + for (var entry : scanner) { + int val = Integer.parseInt(entry.getValue().toString()); + if (val % 15 == 0) { + matching++; + } else { + nonMatching++; + } + } + boolean oddDone = matching > 0 && nonMatching == 0; + return evenDone && oddDone; + } + }); + } } From 20097d5f4d490edf4e4dca25aa1a48a28f7e3e01 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 14 Jan 2025 20:26:29 -0500 Subject: [PATCH 02/11] Update core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java Co-authored-by: Christopher Tubbs --- .../java/org/apache/accumulo/core/fate/zookeeper/FateLock.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java index 04a0d68d73e..94b113f896f 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java @@ -96,7 +96,7 @@ public long addEntry(byte[] data) { try { while (true) { try { - newPath = zoo.putPersistentSequential(path + "/" + PREFIX + dataString + "#", data); + newPath = zoo.putPersistentSequential(path + "/" + PREFIX + dataString + "#", new byte[0]); String[] parts = newPath.split("/"); String last = parts[parts.length - 1]; return new FateLockNode(last).sequence; From a45b3ad2e4e41b0d2c69f7eba810f77a28cf39bf Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 14 Jan 2025 20:26:38 -0500 Subject: [PATCH 03/11] Update core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java Co-authored-by: Christopher Tubbs --- .../accumulo/core/fate/zookeeper/DistributedReadWriteLock.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index e0b11fc7400..3b7dcec0377 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -270,7 +270,7 @@ public static DistributedLock recoverLock(QueueLock qlock, byte[] data) { case WRITE: return new WriteLock(qlock, parsed.getUserData(), entry.getKey()); default: - throw new IllegalStateException("Uknown lock type " + parsed.getType()); + throw new IllegalStateException("Unknown lock type " + parsed.getType()); } default: throw new IllegalStateException("Found more than one lock node " + entries); From 3fbdd6eefc159483645e1ed441aee29c6f0af965 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 15 Jan 2025 17:11:25 +0000 Subject: [PATCH 04/11] format code --- .../java/org/apache/accumulo/core/fate/zookeeper/FateLock.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java index 94b113f896f..75dcfa38d22 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java @@ -96,7 +96,8 @@ public long addEntry(byte[] data) { try { while (true) { try { - newPath = zoo.putPersistentSequential(path + "/" + PREFIX + dataString + "#", new byte[0]); + newPath = + zoo.putPersistentSequential(path + "/" + PREFIX + dataString + "#", new byte[0]); String[] parts = newPath.split("/"); String last = parts[parts.length - 1]; return new FateLockNode(last).sequence; From 3f25ce04dc6cabb859e1b6c4a37a3963eab727f2 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 15 Jan 2025 17:46:48 +0000 Subject: [PATCH 05/11] code review update --- .../apache/accumulo/core/fate/AdminUtil.java | 6 ++-- .../core/fate/zookeeper/FateLock.java | 32 ++++++++++++++++--- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 8465bb149fb..4e7b51f0d27 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -25,7 +25,6 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.Date; import java.util.EnumSet; import java.util.Formatter; @@ -33,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedSet; import java.util.stream.Stream; import org.apache.accumulo.core.fate.FateStore.FateTxStore; @@ -288,11 +288,9 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath, for (String id : lockedIds) { try { FateLockPath fLockPath = FateLock.path(lockPath + "/" + id); - List lockNodes = + SortedSet lockNodes = FateLock.validateAndWarn(fLockPath, zr.getChildren(fLockPath.toString())); - lockNodes.sort(Comparator.comparingLong(ln -> ln.sequence)); - int pos = 0; boolean sawWriteLock = false; diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java index 75dcfa38d22..b62a8f70939 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java @@ -21,11 +21,13 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; -import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; import java.util.function.BiPredicate; import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.QueueLock; @@ -72,7 +74,7 @@ public FateLock(ZooReaderWriter zrw, FateLockPath path) { this.path = requireNonNull(path); } - public static class FateLockNode { + public static class FateLockNode implements Comparable { public final long sequence; public final String lockData; @@ -83,6 +85,28 @@ private FateLockNode(String nodeName) { sequence = Long.parseLong(nodeName.substring(len - 10)); lockData = nodeName.substring(PREFIX.length(), len - 11); } + + @Override + public int compareTo(FateLockNode o) { + int cmp = Long.compare(sequence, o.sequence); + if (cmp == 0) { + cmp = lockData.compareTo(o.lockData); + } + return cmp; + } + + @Override + public boolean equals(Object o) { + if (o instanceof FateLockNode) { + return this.compareTo((FateLockNode) o) == 0; + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(sequence, lockData); + } } // TODO change data arg from byte[] to String.. in the rest of the code its always a String. @@ -157,10 +181,10 @@ public void removeEntry(byte[] data, long entry) { /** * Validate and sort child nodes at this lock path by the lock prefix */ - public static List validateAndWarn(FateLockPath path, List children) { + public static SortedSet validateAndWarn(FateLockPath path, List children) { log.trace("validating and sorting children at path {}", path); - List validChildren = new ArrayList<>(); + SortedSet validChildren = new TreeSet<>(); if (children == null || children.isEmpty()) { return validChildren; From 95fb8dc112b638395fec60ff4f8de8a49f804160 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 15 Jan 2025 18:13:11 +0000 Subject: [PATCH 06/11] remove TODO --- .../java/org/apache/accumulo/core/fate/zookeeper/FateLock.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java index b62a8f70939..f8177b9ddde 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java @@ -109,7 +109,6 @@ public int hashCode() { } } - // TODO change data arg from byte[] to String.. in the rest of the code its always a String. @Override public long addEntry(byte[] data) { From 2309b9b76bc1095912376c9441ffef24c52948d4 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 17 Jan 2025 21:49:32 +0000 Subject: [PATCH 07/11] code review update --- .../core/fate/zookeeper/FateLock.java | 6 +- .../core/fate/zookeeper/FateLockTest.java | 65 +++++++++++++++++++ 2 files changed, 68 insertions(+), 3 deletions(-) create mode 100644 core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java index f8177b9ddde..a06db62eb23 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java @@ -47,7 +47,7 @@ public class FateLock implements QueueLock { private static final Logger log = LoggerFactory.getLogger(FateLock.class); - private static final String PREFIX = "flock#"; + static final String PREFIX = "flock#"; private final ZooReaderWriter zoo; private final FateLockPath path; @@ -78,11 +78,11 @@ public static class FateLockNode implements Comparable { public final long sequence; public final String lockData; - private FateLockNode(String nodeName) { + FateLockNode(String nodeName) { int len = nodeName.length(); Preconditions.checkArgument(nodeName.startsWith(PREFIX) && nodeName.charAt(len - 11) == '#', "Illegal node name %s", nodeName); - sequence = Long.parseLong(nodeName.substring(len - 10)); + sequence = Long.parseUnsignedLong(nodeName.substring(len - 10), 10); lockData = nodeName.substring(PREFIX.length(), len - 11); } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java new file mode 100644 index 00000000000..2eb57298b1d --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.zookeeper; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.UUID; + +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.junit.jupiter.api.Test; + +public class FateLockTest { + + @Test + public void testParsing() { + var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + // ZooKeeper docs state that sequence numbers are formatted using %010d + var lockNode = new FateLock.FateLockNode( + FateLock.PREFIX + fateId.canonical() + "#" + String.format("%010d", 40)); + assertEquals(40, lockNode.sequence); + assertEquals(fateId.canonical(), lockNode.lockData); + + assertThrows(IllegalArgumentException.class, + () -> new FateLock.FateLockNode(fateId.canonical() + "#" + String.format("%010d", 40))); + assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( + FateLock.PREFIX + fateId.canonical() + "#" + String.format("%d", 40))); + assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( + FateLock.PREFIX + fateId.canonical() + "#" + String.format("%09d", 40))); + assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( + FateLock.PREFIX + fateId.canonical() + "#" + String.format("%011d", 40))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.FateLockNode(FateLock.PREFIX + fateId.canonical() + "#abc")); + assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( + FateLock.PREFIX + fateId.canonical() + String.format("%010d", 40))); + + // ZooKeeper docs state that sequence numbers can roll and become negative. The FateLock code + // does not support this, so make sure it fails if this happens. + for (int i : new int[] {Integer.MIN_VALUE, Integer.MIN_VALUE / 2, Integer.MIN_VALUE / 10, + Integer.MIN_VALUE / 1000, -40}) { + assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( + FateLock.PREFIX + fateId.canonical() + "#" + String.format("%010d", i))); + } + + assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( + FateLock.PREFIX + fateId.canonical() + "#" + String.format("%d", -40))); + } +} From ac9a6b536ba52216d18b1a16945dd5ca59b70f94 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 24 Jan 2025 17:13:21 +0000 Subject: [PATCH 08/11] improve test --- .../accumulo/core/fate/zookeeper/FateLockTest.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java index 2eb57298b1d..4235f91b382 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; import java.util.UUID; @@ -55,10 +56,19 @@ public void testParsing() { // does not support this, so make sure it fails if this happens. for (int i : new int[] {Integer.MIN_VALUE, Integer.MIN_VALUE / 2, Integer.MIN_VALUE / 10, Integer.MIN_VALUE / 1000, -40}) { - assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( - FateLock.PREFIX + fateId.canonical() + "#" + String.format("%010d", i))); + var seq = String.format("%010d", i); + if (seq.length() == 10) { + assertThrows(NumberFormatException.class, () -> new FateLock.FateLockNode( + FateLock.PREFIX + fateId.canonical() + "#" + String.format("%010d", i))); + } else if (seq.length() == 11) { + assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( + FateLock.PREFIX + fateId.canonical() + "#" + String.format("%010d", i))); + } else { + fail("Unexpected length " + seq.length()); + } } + // Test a negative number that is not formatted w/ %010d assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( FateLock.PREFIX + fateId.canonical() + "#" + String.format("%d", -40))); } From 1d4d7f2e378202e71068b8180728588be9924fc9 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 24 Jan 2025 17:16:09 +0000 Subject: [PATCH 09/11] use variable --- .../apache/accumulo/core/fate/zookeeper/FateLockTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java index 4235f91b382..26285ced643 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java @@ -58,11 +58,11 @@ public void testParsing() { Integer.MIN_VALUE / 1000, -40}) { var seq = String.format("%010d", i); if (seq.length() == 10) { - assertThrows(NumberFormatException.class, () -> new FateLock.FateLockNode( - FateLock.PREFIX + fateId.canonical() + "#" + String.format("%010d", i))); + assertThrows(NumberFormatException.class, + () -> new FateLock.FateLockNode(FateLock.PREFIX + fateId.canonical() + "#" + seq)); } else if (seq.length() == 11) { - assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( - FateLock.PREFIX + fateId.canonical() + "#" + String.format("%010d", i))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.FateLockNode(FateLock.PREFIX + fateId.canonical() + "#" + seq)); } else { fail("Unexpected length " + seq.length()); } From 1c08a342967f3d510d7aea94489f8d606553b9d4 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Sat, 1 Feb 2025 18:05:12 +0000 Subject: [PATCH 10/11] follow on changes after merge --- .../apache/accumulo/core/fate/AdminUtil.java | 17 ++-- .../zookeeper/DistributedReadWriteLock.java | 8 +- .../core/fate/zookeeper/FateLock.java | 84 ++++++------------- .../DistributedReadWriteLockTest.java | 2 +- .../core/fate/zookeeper/FateLockTest.java | 35 ++++---- 5 files changed, 57 insertions(+), 89 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index d9b9dfaa300..9577ab7ff2b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -40,7 +40,7 @@ import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; -import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock; +import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType; import org.apache.accumulo.core.fate.zookeeper.FateLock; import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -279,17 +279,19 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath, for (String id : lockedIds) { try { FateLockPath fLockPath = FateLock.path(lockPath + "/" + id); - SortedSet lockNodes = + SortedSet lockNodes = FateLock.validateAndWarn(fLockPath, zr.getChildren(fLockPath.toString())); int pos = 0; boolean sawWriteLock = false; - for (FateLock.FateLockNode node : lockNodes) { + for (FateLock.NodeName node : lockNodes) { try { - FateId fateId = node.lockData.getFateId(); + FateLock.FateLockEntry fateLockEntry = node.fateLockEntry.get(); + var fateId = fateLockEntry.getFateId(); + var lockType = fateLockEntry.getLockType(); - if (node.lockData.getLockType() == DistributedReadWriteLock.LockType.WRITE) { + if (lockType == LockType.WRITE) { sawWriteLock = true; } @@ -297,15 +299,14 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath, if (pos == 0) { locks = heldLocks; - } else if (node.lockData.getLockType() == DistributedReadWriteLock.LockType.READ - && !sawWriteLock) { + } else if (lockType == LockType.READ && !sawWriteLock) { locks = heldLocks; } else { locks = waitingLocks; } locks.computeIfAbsent(fateId, k -> new ArrayList<>()) - .add(node.lockData.getLockType().name().charAt(0) + ":" + id); + .add(lockType.name().charAt(0) + ":" + id); } catch (Exception e) { log.error("{}", e.getMessage(), e); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index 8bda9097780..1b52a477d73 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -154,7 +154,7 @@ public void unlock() { return; } log.debug("Removing lock entry {} fateId {} lockType {}", entry, this.fateId, getType()); - qlock.removeEntry(new FateLockEntry(this.getType(), this.fateId), entry); + qlock.removeEntry(FateLockEntry.from(this.getType(), this.fateId), entry); entry = -1; } @@ -205,10 +205,8 @@ public DistributedReadWriteLock(QueueLock qlock, FateId fateId) { } public static DistributedLock recoverLock(QueueLock qlock, FateId fateId) { - SortedMap> entries = qlock.getEntries((seq, lockData) -> { - // TODO fix this lambda - return lockData.get().fateId.equals(fateId); - }); + SortedMap> entries = + qlock.getEntries((seq, lockData) -> lockData.get().fateId.equals(fateId)); switch (entries.size()) { case 0: diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java index b626d915102..f6a654fe6a1 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java @@ -18,10 +18,8 @@ */ package org.apache.accumulo.core.fate.zookeeper; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -74,36 +72,15 @@ public static class FateLockEntry implements Comparable { final LockType lockType; final FateId fateId; - FateLockEntry(LockType lockType, FateId fateId) { + private FateLockEntry(LockType lockType, FateId fateId) { this.lockType = Objects.requireNonNull(lockType); this.fateId = Objects.requireNonNull(fateId); } private FateLockEntry(String entry) { - // TODO can byte constructor be removed? - this(entry.getBytes(UTF_8)); - } - - private FateLockEntry(byte[] entry) { - if (entry == null || entry.length < 1) { - throw new IllegalArgumentException(); - } - - int split = -1; - for (int i = 0; i < entry.length; i++) { - if (entry[i] == ':') { - split = i; - break; - } - } - - if (split == -1) { - throw new IllegalArgumentException(); - } - - this.lockType = LockType.valueOf(new String(entry, 0, split, UTF_8)); - this.fateId = - FateId.from(new String(Arrays.copyOfRange(entry, split + 1, entry.length), UTF_8)); + var fields = entry.split(":", 2); + this.lockType = LockType.valueOf(fields[0]); + this.fateId = FateId.from(fields[1]); } public LockType getLockType() { @@ -114,15 +91,8 @@ public FateId getFateId() { return fateId; } - public byte[] serialize() { - // TODO redo for serialization into name - byte[] typeBytes = lockType.name().getBytes(UTF_8); - byte[] fateIdBytes = fateId.canonical().getBytes(UTF_8); - byte[] result = new byte[fateIdBytes.length + 1 + typeBytes.length]; - System.arraycopy(typeBytes, 0, result, 0, typeBytes.length); - result[typeBytes.length] = ':'; - System.arraycopy(fateIdBytes, 0, result, typeBytes.length + 1, fateIdBytes.length); - return result; + public String serialize() { + return lockType.name() + ":" + fateId.canonical(); } @Override @@ -146,13 +116,12 @@ public static FateLockEntry from(LockType lockType, FateId fateId) { return new FateLockEntry(lockType, fateId); } - public static FateLockEntry deserialize(byte[] serialized) { + public static FateLockEntry deserialize(String serialized) { return new FateLockEntry(serialized); } @Override public int compareTo(FateLockEntry o) { - // TODO determine how it sorted before this change int cmp = lockType.compareTo(o.lockType); if (cmp == 0) { cmp = fateId.compareTo(o.fateId); @@ -170,46 +139,46 @@ public FateLock(ZooReaderWriter zrw, FateLockPath path) { this.path = requireNonNull(path); } - // TODO rename to NodeName - public static class FateLockNode implements Comparable { + public static class NodeName implements Comparable { public final long sequence; - public final FateLockEntry lockData; + public final Supplier fateLockEntry; - FateLockNode(String nodeName) { + NodeName(String nodeName) { int len = nodeName.length(); Preconditions.checkArgument(nodeName.startsWith(PREFIX) && nodeName.charAt(len - 11) == '#', "Illegal node name %s", nodeName); sequence = Long.parseUnsignedLong(nodeName.substring(len - 10), 10); - lockData = new FateLockEntry(nodeName.substring(PREFIX.length(), len - 11)); + fateLockEntry = Suppliers + .memoize(() -> FateLockEntry.deserialize(nodeName.substring(PREFIX.length(), len - 11))); } @Override - public int compareTo(FateLockNode o) { + public int compareTo(NodeName o) { int cmp = Long.compare(sequence, o.sequence); if (cmp == 0) { - cmp = lockData.compareTo(o.lockData); + cmp = fateLockEntry.get().compareTo(o.fateLockEntry.get()); } return cmp; } @Override public boolean equals(Object o) { - if (o instanceof FateLockNode) { - return this.compareTo((FateLockNode) o) == 0; + if (o instanceof NodeName) { + return this.compareTo((NodeName) o) == 0; } return false; } @Override public int hashCode() { - return Objects.hash(sequence, lockData); + return Objects.hash(sequence, fateLockEntry.get()); } } @Override public long addEntry(FateLockEntry entry) { - String dataString = new String(entry.serialize(), UTF_8); + String dataString = entry.serialize(); Preconditions.checkState(!dataString.contains("#")); String newPath; @@ -220,7 +189,7 @@ public long addEntry(FateLockEntry entry) { zoo.putPersistentSequential(path + "/" + PREFIX + dataString + "#", new byte[0]); String[] parts = newPath.split("/"); String last = parts[parts.length - 1]; - return new FateLockNode(last).sequence; + return new NodeName(last).sequence; } catch (NoNodeException nne) { // the parent does not exist so try to create it zoo.putPersistentData(path.toString(), new byte[] {}, NodeExistsPolicy.SKIP); @@ -245,12 +214,11 @@ public long addEntry(FateLockEntry entry) { } for (String name : children) { - var parsed = new FateLockNode(name); - // TODO supplier probably not need becaue always parsing now - if (predicate.test(parsed.sequence, () -> parsed.lockData)) { + var parsed = new NodeName(name); + if (predicate.test(parsed.sequence, parsed.fateLockEntry)) { // Use a supplier so we don't need to deserialize unless the calling code cares about // the value for that entry. - result.put(parsed.sequence, Suppliers.memoize(() -> parsed.lockData)); + Preconditions.checkState(result.put(parsed.sequence, parsed.fateLockEntry) == null); } } } catch (KeeperException | InterruptedException ex) { @@ -261,7 +229,7 @@ public long addEntry(FateLockEntry entry) { @Override public void removeEntry(FateLockEntry data, long entry) { - String dataString = new String(data.serialize(), UTF_8); + String dataString = data.serialize(); Preconditions.checkState(!dataString.contains("#")); try { zoo.recursiveDelete(path + String.format("/%s%s#%010d", PREFIX, dataString, entry), @@ -280,10 +248,10 @@ public void removeEntry(FateLockEntry data, long entry) { /** * Validate and sort child nodes at this lock path by the lock prefix */ - public static SortedSet validateAndWarn(FateLockPath path, List children) { + public static SortedSet validateAndWarn(FateLockPath path, List children) { log.trace("validating and sorting children at path {}", path); - SortedSet validChildren = new TreeSet<>(); + SortedSet validChildren = new TreeSet<>(); if (children == null || children.isEmpty()) { return validChildren; @@ -292,7 +260,7 @@ public static SortedSet validateAndWarn(FateLockPath path, List { log.trace("Validating {}", c); try { - var fateLockNode = new FateLockNode(c); + var fateLockNode = new NodeName(c); validChildren.add(fateLockNode); } catch (RuntimeException e) { log.warn("Illegal fate lock node {}", c, e); diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java index 5d6127b0ae2..229e8ec07d2 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java @@ -153,7 +153,7 @@ public void testFateLockEntrySerDes() { assertEquals(LockType.READ, entry.getLockType()); assertEquals(FateId.from(FateInstanceType.USER, uuid), entry.getFateId()); - byte[] serialized = entry.serialize(); + String serialized = entry.serialize(); var deserialized = FateLockEntry.deserialize(serialized); assertEquals(entry, deserialized); } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java index 26285ced643..b4648c3f90c 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java @@ -34,23 +34,24 @@ public class FateLockTest { public void testParsing() { var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); // ZooKeeper docs state that sequence numbers are formatted using %010d - var lockNode = new FateLock.FateLockNode( - FateLock.PREFIX + fateId.canonical() + "#" + String.format("%010d", 40)); + String lockData = "WRITE:" + fateId.canonical(); + var lockNode = + new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%010d", 40)); assertEquals(40, lockNode.sequence); - assertEquals(fateId.canonical(), lockNode.lockData); + assertEquals(lockData, lockNode.fateLockEntry.get().serialize()); assertThrows(IllegalArgumentException.class, - () -> new FateLock.FateLockNode(fateId.canonical() + "#" + String.format("%010d", 40))); - assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( - FateLock.PREFIX + fateId.canonical() + "#" + String.format("%d", 40))); - assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( - FateLock.PREFIX + fateId.canonical() + "#" + String.format("%09d", 40))); - assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( - FateLock.PREFIX + fateId.canonical() + "#" + String.format("%011d", 40))); + () -> new FateLock.NodeName(lockData + "#" + String.format("%010d", 40))); assertThrows(IllegalArgumentException.class, - () -> new FateLock.FateLockNode(FateLock.PREFIX + fateId.canonical() + "#abc")); - assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( - FateLock.PREFIX + fateId.canonical() + String.format("%010d", 40))); + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%d", 40))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%09d", 40))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%011d", 40))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#abc")); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + String.format("%010d", 40))); // ZooKeeper docs state that sequence numbers can roll and become negative. The FateLock code // does not support this, so make sure it fails if this happens. @@ -59,17 +60,17 @@ public void testParsing() { var seq = String.format("%010d", i); if (seq.length() == 10) { assertThrows(NumberFormatException.class, - () -> new FateLock.FateLockNode(FateLock.PREFIX + fateId.canonical() + "#" + seq)); + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + seq)); } else if (seq.length() == 11) { assertThrows(IllegalArgumentException.class, - () -> new FateLock.FateLockNode(FateLock.PREFIX + fateId.canonical() + "#" + seq)); + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + seq)); } else { fail("Unexpected length " + seq.length()); } } // Test a negative number that is not formatted w/ %010d - assertThrows(IllegalArgumentException.class, () -> new FateLock.FateLockNode( - FateLock.PREFIX + fateId.canonical() + "#" + String.format("%d", -40))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%d", -40))); } } From 33e9bf610354627787fd9316ef9da27550319603 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Sat, 1 Feb 2025 21:19:09 +0000 Subject: [PATCH 11/11] move comment --- .../org/apache/accumulo/core/fate/zookeeper/FateLock.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java index f6a654fe6a1..66b8191f2ed 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java @@ -148,6 +148,8 @@ public static class NodeName implements Comparable { Preconditions.checkArgument(nodeName.startsWith(PREFIX) && nodeName.charAt(len - 11) == '#', "Illegal node name %s", nodeName); sequence = Long.parseUnsignedLong(nodeName.substring(len - 10), 10); + // Use a supplier so we don't need to deserialize unless the calling code cares about + // the value for that entry. fateLockEntry = Suppliers .memoize(() -> FateLockEntry.deserialize(nodeName.substring(PREFIX.length(), len - 11))); } @@ -216,8 +218,6 @@ public long addEntry(FateLockEntry entry) { for (String name : children) { var parsed = new NodeName(name); if (predicate.test(parsed.sequence, parsed.fateLockEntry)) { - // Use a supplier so we don't need to deserialize unless the calling code cares about - // the value for that entry. Preconditions.checkState(result.put(parsed.sequence, parsed.fateLockEntry) == null); } }