diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java index 150fcf47570..8737c67241c 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java @@ -1294,24 +1294,32 @@ public Reader(CachableBlockFile.CachableBuilder b) throws IOException { this(new CachableBlockFile.Reader(b)); } - private void closeLocalityGroupReaders() { + private void closeLocalityGroupReaders(boolean ignoreIOExceptions) throws IOException { for (LocalityGroupReader lgr : currentReaders) { try { lgr.close(); } catch (IOException e) { - log.warn("Errored out attempting to close LocalityGroupReader.", e); + if (ignoreIOExceptions) { + log.warn("Errored out attempting to close LocalityGroupReader.", e); + } else { + throw e; + } } } } @Override - public void closeDeepCopies() { + public void closeDeepCopies() throws IOException { + closeDeepCopies(false); + } + + private void closeDeepCopies(boolean ignoreIOExceptions) throws IOException { if (deepCopy) { throw new IllegalStateException("Calling closeDeepCopies on a deep copy is not supported"); } for (Reader deepCopy : deepCopies) { - deepCopy.closeLocalityGroupReaders(); + deepCopy.closeLocalityGroupReaders(ignoreIOExceptions); } deepCopies.clear(); @@ -1323,8 +1331,9 @@ public void close() throws IOException { throw new IllegalStateException("Calling close on a deep copy is not supported"); } - closeDeepCopies(); - closeLocalityGroupReaders(); + // Closes as much as possible igoring and logging exceptions along the way + closeDeepCopies(true); + closeLocalityGroupReaders(true); if (sampleReaders != null) { for (LocalityGroupReader lgr : sampleReaders) { diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index cb09d740956..2fe6e2d2b7f 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -20,15 +20,16 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MINUTES; import java.net.InetAddress; import java.net.URL; import java.net.UnknownHostException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -71,6 +72,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Client; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo; @@ -140,7 +142,7 @@ public static void main(String[] args) throws Exception { private ManagerMonitorInfo mmi; private GCStatus gcStatus; private Optional coordinatorHost = Optional.empty(); - private long coordinatorCheckNanos = 0L; + private Timer coordinatorCheck = null; private CompactionCoordinatorService.Client coordinatorClient; private final String coordinatorMissingMsg = "Error getting the compaction coordinator. Check that it is running. It is not " @@ -295,11 +297,10 @@ public void fetchData() { } // check for compaction coordinator host and only notify its discovery - Optional previousHost; - if (System.nanoTime() - coordinatorCheckNanos > fetchTimeNanos) { - previousHost = coordinatorHost; + if (coordinatorCheck == null || coordinatorCheck.hasElapsed(expirationTimeMinutes, MINUTES)) { + Optional previousHost = coordinatorHost; coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(context); - coordinatorCheckNanos = System.nanoTime(); + coordinatorCheck = Timer.startNew(); if (previousHost.isEmpty() && coordinatorHost.isPresent()) { log.info("External Compaction Coordinator found at {}", coordinatorHost.orElseThrow()); } @@ -507,111 +508,76 @@ public static class CompactionStats { } } - private final Map tserverScans = new HashMap<>(); - private final Map sserverScans = new HashMap<>(); - private final Map allCompactions = new HashMap<>(); - private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); - - private long scansFetchedNanos = System.nanoTime(); - private long compactsFetchedNanos = System.nanoTime(); - private long ecInfoFetchedNanos = System.nanoTime(); - private final long fetchTimeNanos = TimeUnit.MINUTES.toNanos(1); - private final long ageOffEntriesMillis = TimeUnit.MINUTES.toMillis(15); - // When there are a large amount of external compactions running the list of external compactions - // could consume a lot of memory. The purpose of this memoizing supplier is to try to avoid - // creating the list of running external compactions in memory per web request. If multiple - // request come in around the same time they should use the same list. It is still possible to - // have multiple list in memory if one request obtains a copy and then another request comes in - // after the timeout and the supplier recomputes the list. The longer the timeout on the supplier - // is the less likely we are to have multiple list of external compactions in memory, however - // increasing the timeout will make the monitor less responsive. - private final Supplier extCompactionSnapshot = - Suppliers.memoizeWithExpiration(() -> computeExternalCompactionsSnapshot(), fetchTimeNanos, - TimeUnit.NANOSECONDS); + private final long expirationTimeMinutes = 1; + + // Use Suppliers.memoizeWithExpiration() to cache the results of expensive fetch operations. This + // avoids unnecessary repeated fetches within the expiration period and ensures that multiple + // requests around the same time use the same cached data. + private final Supplier> tserverScansSupplier = + Suppliers.memoizeWithExpiration(this::fetchTServerScans, expirationTimeMinutes, MINUTES); + + private final Supplier> sserverScansSupplier = + Suppliers.memoizeWithExpiration(this::fetchSServerScans, expirationTimeMinutes, MINUTES); + + private final Supplier> compactionsSupplier = + Suppliers.memoizeWithExpiration(this::fetchCompactions, expirationTimeMinutes, MINUTES); + + private final Supplier compactorInfoSupplier = + Suppliers.memoizeWithExpiration(this::fetchCompactorsInfo, expirationTimeMinutes, MINUTES); + + private final Supplier externalCompactionsSupplier = + Suppliers.memoizeWithExpiration(this::computeExternalCompactionsSnapshot, + expirationTimeMinutes, MINUTES); /** - * Fetch the active scans but only if fetchTimeNanos has elapsed. + * @return active tablet server scans. Values are cached and refresh after + * {@link #expirationTimeMinutes}. */ - public synchronized Map getScans() { - if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active TabletServer Scans"); - fetchScans(); - } - return Map.copyOf(tserverScans); + public Map getScans() { + return tserverScansSupplier.get(); } - public synchronized Map getScanServerScans() { - if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active ScanServer Scans"); - fetchScans(); - } - return Map.copyOf(sserverScans); + /** + * @return active scan server scans. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public Map getScanServerScans() { + return sserverScansSupplier.get(); } /** - * Fetch the active compactions but only if fetchTimeNanos has elapsed. + * @return active compactions. Values are cached and refresh after {@link #expirationTimeMinutes}. */ - public synchronized Map getCompactions() { - if (System.nanoTime() - compactsFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active Compactions"); - fetchCompactions(); - } - return Map.copyOf(allCompactions); + public Map getCompactions() { + return compactionsSupplier.get(); } - public synchronized ExternalCompactionInfo getCompactorsInfo() { + /** + * @return external compaction information. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public ExternalCompactionInfo getCompactorsInfo() { if (coordinatorHost.isEmpty()) { throw new IllegalStateException("Tried fetching from compaction coordinator that's missing"); } - if (System.nanoTime() - ecInfoFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of External Compaction info"); - Map> compactors = - ExternalCompactionUtil.getCompactorAddrs(getContext()); - log.debug("Found compactors: " + compactors); - ecInfo.setFetchedTimeMillis(System.currentTimeMillis()); - ecInfo.setCompactors(compactors); - ecInfo.setCoordinatorHost(coordinatorHost); - - ecInfoFetchedNanos = System.nanoTime(); - } - return ecInfo; - } - - private static class ExternalCompactionsSnapshot { - public final RunningCompactions runningCompactions; - public final Map ecRunningMap; - - private ExternalCompactionsSnapshot(Optional> ecRunningMapOpt) { - this.ecRunningMap = - ecRunningMapOpt.map(Collections::unmodifiableMap).orElse(Collections.emptyMap()); - this.runningCompactions = new RunningCompactions(this.ecRunningMap); - } - } - - private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() { - if (coordinatorHost.isEmpty()) { - throw new IllegalStateException(coordinatorMissingMsg); - } - var ccHost = coordinatorHost.orElseThrow(); - log.info("User initiated fetch of running External Compactions from " + ccHost); - var client = getCoordinator(ccHost); - TExternalCompactionList running; - try { - running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds()); - } catch (Exception e) { - throw new IllegalStateException("Unable to get running compactions from " + ccHost, e); - } - - return new ExternalCompactionsSnapshot(Optional.ofNullable(running.getCompactions())); + return compactorInfoSupplier.get(); } - public RunningCompactions getRunnningCompactions() { - return extCompactionSnapshot.get().runningCompactions; + /** + * @return running compactions. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public RunningCompactions getRunningCompactions() { + return externalCompactionsSupplier.get().runningCompactions; } + /** + * @return running compactor details. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ public RunningCompactorDetails getRunningCompactorDetails(ExternalCompactionId ecid) { TExternalCompaction extCompaction = - extCompactionSnapshot.get().ecRunningMap.get(ecid.canonical()); + externalCompactionsSupplier.get().ecRunningMap.get(ecid.canonical()); if (extCompaction == null) { return null; } @@ -631,61 +597,36 @@ private CompactionCoordinatorService.Client getCoordinator(HostAndPort address) return coordinatorClient; } - private void fetchScans() { + private Map fetchScans(Collection servers) { ServerContext context = getContext(); - for (String server : context.instanceOperations().getTabletServers()) { + Map scans = new HashMap<>(); + for (String server : servers) { final HostAndPort parsedServer = HostAndPort.fromString(server); - TabletScanClientService.Client tserver = null; + TabletScanClientService.Client client = null; try { - tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); - List scans = tserver.getActiveScans(null, context.rpcCreds()); - tserverScans.put(parsedServer, new ScanStats(scans)); - scansFetchedNanos = System.nanoTime(); + client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); + List activeScans = client.getActiveScans(null, context.rpcCreds()); + scans.put(parsedServer, new ScanStats(activeScans)); } catch (Exception ex) { log.error("Failed to get active scans from {}", server, ex); } finally { - ThriftUtil.returnClient(tserver, context); - } - } - // Age off old scan information - Iterator> tserverIter = tserverScans.entrySet().iterator(); - // clock time used for fetched for date friendly display - long now = System.currentTimeMillis(); - while (tserverIter.hasNext()) { - Entry entry = tserverIter.next(); - if (now - entry.getValue().fetched > ageOffEntriesMillis) { - tserverIter.remove(); - } - } - // Scan Servers - for (String server : context.instanceOperations().getScanServers()) { - final HostAndPort parsedServer = HostAndPort.fromString(server); - TabletScanClientService.Client sserver = null; - try { - sserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); - List scans = sserver.getActiveScans(null, context.rpcCreds()); - sserverScans.put(parsedServer, new ScanStats(scans)); - scansFetchedNanos = System.nanoTime(); - } catch (Exception ex) { - log.error("Failed to get active scans from {}", server, ex); - } finally { - ThriftUtil.returnClient(sserver, context); - } - } - // Age off old scan information - Iterator> sserverIter = sserverScans.entrySet().iterator(); - // clock time used for fetched for date friendly display - now = System.currentTimeMillis(); - while (sserverIter.hasNext()) { - Entry entry = sserverIter.next(); - if (now - entry.getValue().fetched > ageOffEntriesMillis) { - sserverIter.remove(); + ThriftUtil.returnClient(client, context); } } + return Collections.unmodifiableMap(scans); + } + + private Map fetchTServerScans() { + return fetchScans(getContext().instanceOperations().getTabletServers()); + } + + private Map fetchSServerScans() { + return fetchScans(getContext().instanceOperations().getScanServers()); } - private void fetchCompactions() { + private Map fetchCompactions() { ServerContext context = getContext(); + Map allCompactions = new HashMap<>(); for (String server : context.instanceOperations().getTabletServers()) { final HostAndPort parsedServer = HostAndPort.fromString(server); Client tserver = null; @@ -693,23 +634,52 @@ private void fetchCompactions() { tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context); var compacts = tserver.getActiveCompactions(null, context.rpcCreds()); allCompactions.put(parsedServer, new CompactionStats(compacts)); - compactsFetchedNanos = System.nanoTime(); } catch (Exception ex) { log.debug("Failed to get active compactions from {}", server, ex); } finally { ThriftUtil.returnClient(tserver, context); } } - // Age off old compaction information - var entryIter = allCompactions.entrySet().iterator(); - // clock time used for fetched for date friendly display - long now = System.currentTimeMillis(); - while (entryIter.hasNext()) { - var entry = entryIter.next(); - if (now - entry.getValue().fetched > ageOffEntriesMillis) { - entryIter.remove(); - } + return Collections.unmodifiableMap(allCompactions); + } + + private ExternalCompactionInfo fetchCompactorsInfo() { + ServerContext context = getContext(); + Map> compactors = ExternalCompactionUtil.getCompactorAddrs(context); + log.debug("Found compactors: {}", compactors); + ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); + ecInfo.setFetchedTimeMillis(System.currentTimeMillis()); + ecInfo.setCompactors(compactors); + ecInfo.setCoordinatorHost(coordinatorHost); + return ecInfo; + } + + private static class ExternalCompactionsSnapshot { + public final RunningCompactions runningCompactions; + public final Map ecRunningMap; + + private ExternalCompactionsSnapshot(Optional> ecRunningMapOpt) { + this.ecRunningMap = + ecRunningMapOpt.map(Collections::unmodifiableMap).orElse(Collections.emptyMap()); + this.runningCompactions = new RunningCompactions(this.ecRunningMap); + } + } + + private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() { + if (coordinatorHost.isEmpty()) { + throw new IllegalStateException(coordinatorMissingMsg); } + var ccHost = coordinatorHost.orElseThrow(); + log.info("User initiated fetch of running External Compactions from " + ccHost); + var client = getCoordinator(ccHost); + TExternalCompactionList running; + try { + running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds()); + } catch (Exception e) { + throw new IllegalStateException("Unable to get running compactions from " + ccHost, e); + } + + return new ExternalCompactionsSnapshot(Optional.ofNullable(running.getCompactions())); } /** diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java index 72d54d70a4e..5fcecef3493 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java @@ -60,7 +60,7 @@ public Compactors getCompactors() { @Path("running") @GET public RunningCompactions getRunning() { - return monitor.getRunnningCompactions(); + return monitor.getRunningCompactions(); } @Path("details") diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java index c7e516c6172..baf740a141c 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java @@ -39,7 +39,8 @@ public class HelpCommand extends Command { @Override public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws ShellCommandException, IOException { - int numColumns = shellState.getTerminal().getWidth(); + int numColumns = + (shellState.getTerminal().getWidth() == 0) ? 80 : shellState.getTerminal().getWidth(); if (cl.hasOption(noWrapOpt.getOpt())) { numColumns = Integer.MAX_VALUE; }