From cd07dc60360b3c14009380b0b0d96f2197757b26 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 4 Dec 2024 13:34:02 -0500 Subject: [PATCH 1/7] improves trace logging in zoocache (#5133) * improves trace logging in zoocache Noticed a lot of tsever lock checking was blocking on zoocache in jstacks. Was not sure why this was happening. Added more detailed logging to zoocache inorder to know what paths are missing in the cache and when a path is removed from the cache. * Update core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java Co-authored-by: Daniel Roberts --------- Co-authored-by: Daniel Roberts --- .../core/fate/zookeeper/ZooCache.java | 45 ++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java index e0f00994d5d..86b869fa152 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java @@ -53,6 +53,9 @@ public class ZooCache { private final ZCacheWatcher watcher = new ZCacheWatcher(); private final Watcher externalWatcher; + private static final AtomicLong nextCacheId = new AtomicLong(0); + private final String cacheId = "ZC" + nextCacheId.incrementAndGet(); + private static class ZcNode { final byte[] data; final ZcStat stat; @@ -150,7 +153,7 @@ private class ZCacheWatcher implements Watcher { @Override public void process(WatchedEvent event) { if (log.isTraceEnabled()) { - log.trace("{}", event); + log.trace("{}: {}", cacheId, event); } switch (event.getType()) { @@ -167,26 +170,26 @@ public void process(WatchedEvent event) { // These are ignored, because they are generated by SingletonManager closing // ZooKeepers for ZooSession, and SingletonManager is already responsible for clearing // caches via the registered ZooCacheFactory singleton - log.trace("ZooKeeper connection closed, ignoring; {}", event); + log.trace("{} ZooKeeper connection closed, ignoring; {}", cacheId, event); break; case Disconnected: - log.trace("ZooKeeper connection disconnected, clearing cache; {}", event); + log.trace("{} ZooKeeper connection disconnected, clearing cache; {}", cacheId, event); clear(); break; case SyncConnected: - log.trace("ZooKeeper connection established, ignoring; {}", event); + log.trace("{} ZooKeeper connection established, ignoring; {}", cacheId, event); break; case Expired: - log.trace("ZooKeeper connection expired, clearing cache; {}", event); + log.trace("{} ZooKeeper connection expired, clearing cache; {}", cacheId, event); clear(); break; default: - log.warn("Unhandled {}", event); + log.warn("{} Unhandled {}", cacheId, event); break; } break; default: - log.warn("Unhandled {}", event); + log.warn("{} Unhandled {}", cacheId, event); break; } @@ -206,6 +209,7 @@ public ZooCache(ZooReader reader, Watcher watcher) { this.zReader = reader; nodeCache = new ConcurrentHashMap<>(); this.externalWatcher = watcher; + log.trace("{} created new cache", cacheId, new Exception()); } private abstract static class ZooRunnable { @@ -316,6 +320,8 @@ public List run() throws KeeperException, InterruptedException { return zcNode.getChildren(); } + log.trace("{} {} was not in children cache, looking up in zookeeper", cacheId, zPath); + try { zcNode = nodeCache.compute(zPath, (zp, zcn) -> { // recheck the children now that lock is held on key @@ -387,6 +393,8 @@ public byte[] run() throws KeeperException, InterruptedException { return zcNode.getData(); } + log.trace("{} {} was not in data cache, looking up in zookeeper", cacheId, zPath); + zcNode = nodeCache.compute(zPath, (zp, zcn) -> { // recheck the now that lock is held on key, it may be present now. Could have been // computed while waiting for lock. @@ -408,7 +416,7 @@ public byte[] run() throws KeeperException, InterruptedException { ZcStat zstat = null; if (stat == null) { if (log.isTraceEnabled()) { - log.trace("zookeeper did not contain {}", zPath); + log.trace("{} zookeeper did not contain {}", cacheId, zPath); } } else { try { @@ -420,7 +428,7 @@ public byte[] run() throws KeeperException, InterruptedException { throw new ZcInterruptedException(e); } if (log.isTraceEnabled()) { - log.trace("zookeeper contained {} {}", zPath, + log.trace("{} zookeeper contained {} {}", cacheId, zPath, (data == null ? null : new String(data, UTF_8))); } } @@ -460,6 +468,7 @@ protected void copyStats(ZcStat userStat, ZcStat cachedStat) { private void remove(String zPath) { nodeCache.remove(zPath); + log.trace("{} removed {} from cache", cacheId, zPath); updateCount.incrementAndGet(); } @@ -470,6 +479,7 @@ public void clear() { Preconditions.checkState(!closed); nodeCache.clear(); updateCount.incrementAndGet(); + log.trace("{} cleared all from cache", cacheId); } public void close() { @@ -514,7 +524,20 @@ boolean childrenCached(String zPath) { */ public void clear(Predicate pathPredicate) { Preconditions.checkState(!closed); - nodeCache.keySet().removeIf(pathPredicate); + + Predicate pathPredicateToUse; + if (log.isTraceEnabled()) { + pathPredicateToUse = path -> { + boolean testResult = pathPredicate.test(path); + if (testResult) { + log.trace("{} removing {} from cache", cacheId, path); + } + return testResult; + }; + } else { + pathPredicateToUse = pathPredicate; + } + nodeCache.keySet().removeIf(pathPredicateToUse); updateCount.incrementAndGet(); } @@ -536,7 +559,7 @@ public Optional getLockData(ServiceLockPath path) { byte[] lockData = get(path + "/" + lockNode); if (log.isTraceEnabled()) { - log.trace("Data from lockNode {} is {}", lockNode, new String(lockData, UTF_8)); + log.trace("{} Data from lockNode {} is {}", cacheId, lockNode, new String(lockData, UTF_8)); } if (lockData == null) { lockData = new byte[0]; From 355b0d726e3bebc2407e5f29e5bd965004f68bef Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Thu, 5 Dec 2024 10:42:28 -0500 Subject: [PATCH 2/7] Fix ACCUMULO_POOL_PREFIX usage for Thread pool names (#5141) Closes #5136 --- .../apache/accumulo/core/util/threads/ThreadPools.java | 8 ++++++-- .../java/org/apache/accumulo/server/rpc/TServerUtils.java | 2 +- .../tserver/compactions/InternalCompactionExecutor.java | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index b2b0bc02db1..240a41459cb 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -399,7 +399,7 @@ public ThreadPoolExecutorBuilder getPoolBuilder(@NonNull final ThreadPoolNames p } /** - * Fet a fluent-style pool builder. + * Get a fluent-style pool builder. * * @param name the pool name - the name trimed and prepended with the ACCUMULO_POOL_PREFIX so that * pool names begin with a consistent prefix. @@ -409,7 +409,11 @@ public ThreadPoolExecutorBuilder getPoolBuilder(@NonNull final String name) { if (trimmed.startsWith(ACCUMULO_POOL_PREFIX.poolName)) { return new ThreadPoolExecutorBuilder(trimmed); } else { - return new ThreadPoolExecutorBuilder(ACCUMULO_POOL_PREFIX.poolName + trimmed); + if (trimmed.startsWith(".")) { + return new ThreadPoolExecutorBuilder(ACCUMULO_POOL_PREFIX.poolName + trimmed); + } else { + return new ThreadPoolExecutorBuilder(ACCUMULO_POOL_PREFIX.poolName + "." + trimmed); + } } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index d4220ff6ded..22a191a7a68 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -311,7 +311,7 @@ private static ServerAddress createNonBlockingServer(HostAndPort address, TProce private static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, final int executorThreads, long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks) { - String poolName = ACCUMULO_POOL_PREFIX.poolName + serverName.toLowerCase() + ".client"; + String poolName = ACCUMULO_POOL_PREFIX.poolName + "." + serverName.toLowerCase() + ".client"; final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().getPoolBuilder(poolName).numCoreThreads(executorThreads) .withTimeOut(threadTimeOut, MILLISECONDS).enableThreadPoolMetrics().build(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java index 66002556a3f..618f6463b32 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java @@ -208,7 +208,7 @@ public SubmittedJob submit(CompactionServiceId csid, CompactionJob job, Compacta public void setThreads(int numThreads) { ThreadPools.resizePool(threadPool, () -> numThreads, - ACCUMULO_POOL_PREFIX.poolName + "accumulo.pool.compaction." + ceid); + ACCUMULO_POOL_PREFIX.poolName + ".accumulo.pool.compaction." + ceid); } @Override From 93365b4f2330cc54268149e6ebc24a87af44e89f Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 6 Dec 2024 08:07:35 -0500 Subject: [PATCH 3/7] Fixed failures in RegexGroupBalanceIT after merging PR #5070 (#5140) After merging #5070 RegexGroupBalanceIT started failing. Both GroupBalancer and HostRegexTableLoadBalancer have logic that throttles the frequency that they can be called do not return any migrations in this scenario. The change in #5070 modified the frequency in which the balancer is called from once for all DataLevel's to once per DataLevel. This caused the GroupBalancer and HostRegexTableLoadBalancer to return migrations for the ROOT DataLevel, but not the METADATA and USER DataLevels. The solution in this commit is to push the DataLevel down to the Balancer in the BalancerParams so that the throttling can be done at the DataLevel level. --- .../manager/balancer/BalanceParamsImpl.java | 18 +++++++++++---- .../core/spi/balancer/GroupBalancer.java | 10 +++++--- .../balancer/HostRegexTableLoadBalancer.java | 14 +++++++---- .../core/spi/balancer/TableLoadBalancer.java | 7 ++++-- .../core/spi/balancer/TabletBalancer.java | 9 ++++++++ .../BaseHostRegexTableLoadBalancerTest.java | 8 +++++++ .../core/spi/balancer/GroupBalancerTest.java | 11 +++++---- ...xTableLoadBalancerReconfigurationTest.java | 12 ++++++---- .../HostRegexTableLoadBalancerTest.java | 23 +++++++++++-------- .../spi/balancer/SimpleLoadBalancerTest.java | 7 ++++-- .../spi/balancer/TableLoadBalancerTest.java | 5 ++-- .../org/apache/accumulo/manager/Manager.java | 2 +- .../test/ChaoticLoadBalancerTest.java | 4 +++- 13 files changed, 92 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java index a0c30d43f5d..97b9315c6e6 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.dataImpl.TabletIdImpl; import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.TabletBalancer; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; @@ -40,35 +41,39 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters { private final List migrationsOut; private final SortedMap thriftCurrentStatus; private final Set thriftCurrentMigrations; + private final DataLevel currentDataLevel; public static BalanceParamsImpl fromThrift(SortedMap currentStatus, SortedMap thriftCurrentStatus, - Set thriftCurrentMigrations) { + Set thriftCurrentMigrations, DataLevel currentLevel) { Set currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new) .collect(Collectors.toUnmodifiableSet()); return new BalanceParamsImpl(currentStatus, currentMigrations, new ArrayList<>(), - thriftCurrentStatus, thriftCurrentMigrations); + thriftCurrentStatus, thriftCurrentMigrations, currentLevel); } public BalanceParamsImpl(SortedMap currentStatus, - Set currentMigrations, List migrationsOut) { + Set currentMigrations, List migrationsOut, + DataLevel currentLevel) { this.currentStatus = currentStatus; this.currentMigrations = currentMigrations; this.migrationsOut = migrationsOut; this.thriftCurrentStatus = null; this.thriftCurrentMigrations = null; + this.currentDataLevel = currentLevel; } private BalanceParamsImpl(SortedMap currentStatus, Set currentMigrations, List migrationsOut, SortedMap thriftCurrentStatus, - Set thriftCurrentMigrations) { + Set thriftCurrentMigrations, DataLevel currentLevel) { this.currentStatus = currentStatus; this.currentMigrations = currentMigrations; this.migrationsOut = migrationsOut; this.thriftCurrentStatus = thriftCurrentStatus; this.thriftCurrentMigrations = thriftCurrentMigrations; + this.currentDataLevel = currentLevel; } @Override @@ -100,4 +105,9 @@ public void addMigration(KeyExtent extent, TServerInstance oldServer, TServerIns TabletServerId newTsid = new TabletServerIdImpl(newServer); migrationsOut.add(new TabletMigration(id, oldTsid, newTsid)); } + + @Override + public String currentLevel() { + return currentDataLevel.name(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java index 3527ba6f4c1..dc34e704440 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -68,7 +69,8 @@ public abstract class GroupBalancer implements TabletBalancer { protected BalancerEnvironment environment; private final TableId tableId; - private long lastRun = 0; + + protected final Map lastRunTimes = new HashMap<>(DataLevel.values().length); @Override public void init(BalancerEnvironment balancerEnvironment) { @@ -211,7 +213,9 @@ public long balance(BalanceParameters params) { return 5000; } - if (System.currentTimeMillis() - lastRun < getWaitTime()) { + final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel()); + + if (System.currentTimeMillis() - lastRunTimes.getOrDefault(currentLevel, 0L) < getWaitTime()) { return 5000; } @@ -275,7 +279,7 @@ public long balance(BalanceParameters params) { populateMigrations(tservers.keySet(), params.migrationsOut(), moves); - lastRun = System.currentTimeMillis(); + lastRunTimes.put(currentLevel, System.currentTimeMillis()); return 5000; } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java index 0b89e5d4ddf..cb88ce320c4 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java @@ -51,6 +51,7 @@ import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; import org.apache.accumulo.core.manager.balancer.TableStatisticsImpl; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TableStatistics; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; @@ -181,7 +182,7 @@ static class HrtlbConf { } private static final Set EMPTY_MIGRATIONS = Collections.emptySet(); - private volatile long lastOOBCheck = System.currentTimeMillis(); + protected final Map lastOOBCheckTimes = new HashMap<>(DataLevel.values().length); private Map> pools = new HashMap<>(); private final Map migrationsFromLastPass = new HashMap<>(); private final Map tableToTimeSinceNoMigrations = new HashMap<>(); @@ -394,7 +395,10 @@ public long balance(BalanceParameters params) { Map> currentGrouped = splitCurrentByRegex(params.currentStatus()); - if ((now - this.lastOOBCheck) > myConf.oobCheckMillis) { + final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel()); + + if ((now - this.lastOOBCheckTimes.getOrDefault(currentLevel, System.currentTimeMillis())) + > myConf.oobCheckMillis) { try { // Check to see if a tablet is assigned outside the bounds of the pool. If so, migrate it. for (String table : tableIdMap.keySet()) { @@ -454,7 +458,7 @@ public long balance(BalanceParameters params) { } } finally { // this could have taken a while...get a new time - this.lastOOBCheck = System.currentTimeMillis(); + this.lastOOBCheckTimes.put(currentLevel, System.currentTimeMillis()); } } @@ -507,8 +511,8 @@ public long balance(BalanceParameters params) { continue; } ArrayList newMigrations = new ArrayList<>(); - getBalancerForTable(tableId) - .balance(new BalanceParamsImpl(currentView, migrations, newMigrations)); + getBalancerForTable(tableId).balance( + new BalanceParamsImpl(currentView, migrations, newMigrations, DataLevel.of(tableId))); if (newMigrations.isEmpty()) { tableToTimeSinceNoMigrations.remove(tableId); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java index cb89e5b093a..55a24c30943 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.slf4j.Logger; @@ -124,10 +125,12 @@ public void getAssignments(AssignmentParameters params) { public long balance(BalanceParameters params) { long minBalanceTime = 5_000; // Iterate over the tables and balance each of them + final DataLevel currentDataLevel = DataLevel.valueOf(params.currentLevel()); for (TableId tableId : environment.getTableIdMap().values()) { ArrayList newMigrations = new ArrayList<>(); - long tableBalanceTime = getBalancerForTable(tableId).balance( - new BalanceParamsImpl(params.currentStatus(), params.currentMigrations(), newMigrations)); + long tableBalanceTime = + getBalancerForTable(tableId).balance(new BalanceParamsImpl(params.currentStatus(), + params.currentMigrations(), newMigrations, currentDataLevel)); if (tableBalanceTime < minBalanceTime) { minBalanceTime = tableBalanceTime; } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java index a7dfcbdc2bb..356bbc72236 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java @@ -25,6 +25,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TabletId; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -93,6 +94,14 @@ interface BalanceParameters { * migrations. */ List migrationsOut(); + + /** + * Return the DataLevel name for which the Manager is currently balancing. Balancers should + * return migrations for tables within the current DataLevel. + * + * @return name of current balancing iteration data level + */ + String currentLevel(); } /** diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/BaseHostRegexTableLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/BaseHostRegexTableLoadBalancerTest.java index c9c478a07fd..38d9297881f 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/BaseHostRegexTableLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/BaseHostRegexTableLoadBalancerTest.java @@ -268,4 +268,12 @@ protected SortedMap createCurrent(int numTservers) } return current; } + + @Override + public long balance(BalanceParameters params) { + long wait = super.balance(params); + super.lastOOBCheckTimes.clear(); + return wait; + } + } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java index 3f85ed3b792..e55eb379d23 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java @@ -40,6 +40,7 @@ import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -83,7 +84,8 @@ public void balance() { } public void balance(final int maxMigrations) { - GroupBalancer balancer = new GroupBalancer(TableId.of("1")) { + TableId tid = TableId.of("1"); + GroupBalancer balancer = new GroupBalancer(tid) { @Override protected Map getLocationProvider() { @@ -106,10 +108,10 @@ protected int getMaxMigrations() { } }; - balance(balancer, maxMigrations); + balance(balancer, maxMigrations, tid); } - public void balance(TabletBalancer balancer, int maxMigrations) { + public void balance(TabletBalancer balancer, int maxMigrations, TableId tid) { while (true) { Set migrations = new HashSet<>(); @@ -121,7 +123,8 @@ public void balance(TabletBalancer balancer, int maxMigrations) { new org.apache.accumulo.core.master.thrift.TabletServerStatus())); } - balancer.balance(new BalanceParamsImpl(current, migrations, migrationsOut)); + balancer + .balance(new BalanceParamsImpl(current, migrations, migrationsOut, DataLevel.of(tid))); assertTrue(migrationsOut.size() <= (maxMigrations + 5), "Max Migration exceeded " + maxMigrations + " " + migrationsOut.size()); diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java index f6b2123b6df..58a89ec6260 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java @@ -43,6 +43,7 @@ import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.spi.balancer.data.TabletStatistics; @@ -107,16 +108,19 @@ public void testConfigurationChanges() { // getOnlineTabletsForTable UtilWaitThread.sleep(3000); this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(0, migrationsOut.size()); // Change property, simulate call by TableConfWatcher config.set(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + BAR.getTableName(), "r01.*"); - // Wait to trigger the out of bounds check and the repool check - UtilWaitThread.sleep(10000); + // calls to balance will clear the lastOOBCheckTimes map + // in the HostRegexTableLoadBalancer. For this test we want + // to get into the out of bounds checking code, so we need to + // populate the map with an older time value + this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2); this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(5, migrationsOut.size()); for (TabletMigration migration : migrationsOut) { assertTrue(migration.getNewTabletServer().getHost().startsWith("192.168.0.1") diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java index 298bb8b995c..4d3162e02d2 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java @@ -48,13 +48,13 @@ import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.spi.balancer.data.TabletStatistics; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.util.ConfigurationImpl; -import org.apache.accumulo.core.util.UtilWaitThread; import org.junit.jupiter.api.Test; public class HostRegexTableLoadBalancerTest extends BaseHostRegexTableLoadBalancerTest { @@ -98,7 +98,7 @@ public void testBalance() { List migrationsOut = new ArrayList<>(); long wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(20000, wait); // should balance four tablets in one of the tables before reaching max assertEquals(4, migrationsOut.size()); @@ -109,7 +109,7 @@ public void testBalance() { } migrationsOut.clear(); wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(20000, wait); // should balance four tablets in one of the other tables before reaching max assertEquals(4, migrationsOut.size()); @@ -120,7 +120,7 @@ public void testBalance() { } migrationsOut.clear(); wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(20000, wait); // should balance four tablets in one of the other tables before reaching max assertEquals(4, migrationsOut.size()); @@ -131,7 +131,7 @@ public void testBalance() { } migrationsOut.clear(); wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(20000, wait); // no more balancing to do assertEquals(0, migrationsOut.size()); @@ -148,7 +148,7 @@ public void testBalanceWithTooManyOutstandingMigrations() { migrations.addAll(tableTablets.get(BAR.getTableName())); long wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut)); + migrations, migrationsOut, DataLevel.USER)); assertEquals(20000, wait); // no migrations should have occurred as 10 is the maxOutstandingMigrations assertEquals(0, migrationsOut.size()); @@ -487,13 +487,16 @@ public void testUnassignedWithNoDefaultPool() { @Test public void testOutOfBoundsTablets() { + // calls to balance will clear the lastOOBCheckTimes map + // in the HostRegexTableLoadBalancer. For this test we want + // to get into the out of bounds checking code, so we need to + // populate the map with an older time value + this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2); init(DEFAULT_TABLE_PROPERTIES); - // Wait to trigger the out of bounds check which will call our version of - // getOnlineTabletsForTable - UtilWaitThread.sleep(11000); Set migrations = new HashSet<>(); List migrationsOut = new ArrayList<>(); - this.balance(new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut)); + this.balance( + new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, DataLevel.USER)); assertEquals(2, migrationsOut.size()); } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java index 53889be484d..055898928b3 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java @@ -42,6 +42,7 @@ import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -202,7 +203,8 @@ public void testUnevenAssignment() { // balance until we can't balance no more! while (true) { List migrationsOut = new ArrayList<>(); - balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut)); + balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut, + DataLevel.USER)); if (migrationsOut.isEmpty()) { break; } @@ -244,7 +246,8 @@ public void testUnevenAssignment2() { // balance until we can't balance no more! while (true) { List migrationsOut = new ArrayList<>(); - balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut)); + balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut, + DataLevel.USER)); if (migrationsOut.isEmpty()) { break; } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java index 9d856e6052b..8e9aefd0283 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -141,13 +142,13 @@ public void test() { List migrationsOut = new ArrayList<>(); TableLoadBalancer tls = new TableLoadBalancer(); tls.init(environment); - tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut)); + tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER)); assertEquals(0, migrationsOut.size()); state.put(mkts("10.0.0.2", 2345, "0x02030405"), status()); tls = new TableLoadBalancer(); tls.init(environment); - tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut)); + tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER)); int count = 0; Map movedByTable = new HashMap<>(); movedByTable.put(TableId.of(t1Id), 0); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 44800d58337..55255751531 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1076,7 +1076,7 @@ private long balanceTablets() { } params = BalanceParamsImpl.fromThrift(statusForBalancerLevel, tserverStatusForLevel, - partitionedMigrations.get(dl)); + partitionedMigrations.get(dl), dl); wait = Math.max(tabletBalancer.balance(params), wait); migrationsOutForLevel = 0; for (TabletMigration m : checkMigrationSanity(statusForBalancerLevel.keySet(), diff --git a/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java b/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java index 90a26464173..57fbd33247f 100644 --- a/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java +++ b/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java @@ -40,6 +40,7 @@ import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -157,7 +158,8 @@ public void testUnevenAssignment() { // amount, or even expected amount List migrationsOut = new ArrayList<>(); while (!migrationsOut.isEmpty()) { - balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut)); + balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut, + DataLevel.USER)); } } From 3033a33e012fd66ef2da948d6944bf74bdd0da67 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 6 Dec 2024 14:01:29 +0000 Subject: [PATCH 4/7] Add missing since tag in TabletBalancer, remove unused import --- .../org/apache/accumulo/core/spi/balancer/TabletBalancer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java index 356bbc72236..06235a10a1f 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java @@ -25,7 +25,6 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TabletId; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -100,6 +99,7 @@ interface BalanceParameters { * return migrations for tables within the current DataLevel. * * @return name of current balancing iteration data level + * @since 2.1.4 */ String currentLevel(); } From a2ffa7b956a0ba2b920c0128df194ffd782745cc Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 6 Dec 2024 14:34:07 +0000 Subject: [PATCH 5/7] Add override annotation to BalanceParamsImpl that was lost in merge --- .../apache/accumulo/core/manager/balancer/BalanceParamsImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java index 8419452c711..ccccecd0f39 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java @@ -126,6 +126,7 @@ public Map> currentResourceGroups() { return tserverResourceGroups; } + @Override public String currentLevel() { return currentDataLevel.name(); } From 186294dd202b63904281b99f37e3a21d0b1285bc Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 6 Dec 2024 11:40:24 -0500 Subject: [PATCH 6/7] Refactor and simplification of accumulo-cluster (#5116) Modified argument parsing logic to use getopt which provides more flexibility to specify multiple options and allow options to have optional arguments. This allowed me to remove start-servers and stop-servers and use options to start and stop instead. See usage for details to api changes. Modified command execution such that ssh is used, even locally, for commands unless --local is specified. --- assemble/bin/accumulo-cluster | 782 +++++++++++++--------------------- 1 file changed, 290 insertions(+), 492 deletions(-) diff --git a/assemble/bin/accumulo-cluster b/assemble/bin/accumulo-cluster index cd80d7cb0d9..225ab2fd72e 100755 --- a/assemble/bin/accumulo-cluster +++ b/assemble/bin/accumulo-cluster @@ -18,6 +18,14 @@ # under the License. # +# +# Environment variables that can be set to influence the behavior +# of this script +# +# ACCUMULO_LOCALHOST_ADDRESSES - set to a space delimited string of localhost names +# and addresses to override the default lookups +# + function print_usage { cat < ( ...) [