Skip to content

Commit

Permalink
Merge remote-tracking branch 'dlmarion/4973-new-monitor-metrics' into…
Browse files Browse the repository at this point in the history
… newServerOldMonitor
  • Loading branch information
DomGarguilo committed Dec 6, 2024
2 parents 065aabd + 1691c9c commit 305101f
Show file tree
Hide file tree
Showing 37 changed files with 612 additions and 786 deletions.
782 changes: 290 additions & 492 deletions assemble/bin/accumulo-cluster

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ public enum CompactionReason {
public abstract List<IteratorSetting> getIterators();

/**
* Return the host where the compaction is running.
* Return the server where the compaction is running.
*
* @since 2.1.0
* @since 4.0.0
*/
public abstract ServerId getHost();
public abstract ServerId getServerId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;

import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.security.Authorizations;
Expand Down Expand Up @@ -96,4 +97,11 @@ public abstract class ActiveScan {
* @since 1.5.0
*/
public abstract long getIdleTime();

/**
* Return the server where the scan is running.
*
* @since 4.0.0
*/
public abstract ServerId getServerId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,22 +259,22 @@ Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceGroupPred
* @param tserver The tablet server address. This should be of the form
* {@code <ip address>:<port>}
* @return A list of active scans on tablet server.
* @deprecated see {@link #getActiveScans(ServerId)}
* @deprecated see {@link #getActiveScans(Collection)}
*/
@Deprecated(since = "4.0.0")
List<ActiveScan> getActiveScans(String tserver)
throws AccumuloException, AccumuloSecurityException;

/**
* List the active scans on a server.
* List the active scans on a collection of servers.
*
* @param server server type and address
* @return A stream of active scans on server.
* @param servers Collection of server types and addresses
* @return A list of active scans on the given servers.
* @throws IllegalArgumentException when the type of the server is not TABLET_SERVER or
* SCAN_SERVER
* @since 4.0.0
*/
List<ActiveScan> getActiveScans(ServerId server)
List<ActiveScan> getActiveScans(Collection<ServerId> servers)
throws AccumuloException, AccumuloSecurityException;

/**
Expand All @@ -286,32 +286,20 @@ List<ActiveScan> getActiveScans(ServerId server)
* @param tserver The server address. This should be of the form {@code <ip address>:<port>}
* @return the list of active compactions
* @since 1.5.0
* @deprecated see {@link #getActiveCompactions(ServerId server)}
* @deprecated see {@link #getActiveCompactions(Collection)}
*/
@Deprecated(since = "4.0.0")
List<ActiveCompaction> getActiveCompactions(String tserver)
throws AccumuloException, AccumuloSecurityException;

/**
* List the active compaction running on a TabletServer or Compactor. The server address can be
* retrieved using {@link #getCompactors()} or {@link #getTabletServers()}. Use
* {@link #getActiveCompactions()} to get a list of all compactions running on tservers and
* compactors.
*
* @param server The ServerId object
* @return the list of active compactions
* @throws IllegalArgumentException when the type of the server is not TABLET_SERVER or COMPACTOR
* @since 4.0.0
*/
List<ActiveCompaction> getActiveCompactions(ServerId server)
throws AccumuloException, AccumuloSecurityException;

/**
* List all internal and external compactions running in Accumulo.
*
* @return the list of active compactions
* @since 2.1.0
* @deprecated see {@link #getActiveCompactions(Collection)}
*/
@Deprecated(since = "4.0.0")
List<ActiveCompaction> getActiveCompactions() throws AccumuloException, AccumuloSecurityException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public List<IteratorSetting> getIterators() {
}

@Override
public ServerId getHost() {
public ServerId getServerId() {
return server;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.ActiveScan;
import org.apache.accumulo.core.client.admin.ScanState;
import org.apache.accumulo.core.client.admin.ScanType;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
Expand All @@ -44,20 +46,21 @@ public class ActiveScanImpl extends ActiveScan {

private final long scanId;
private final String client;
private String tableName;
private final String tableName;
private final long age;
private final long idle;
private ScanType type;
private ScanState state;
private KeyExtent extent;
private List<Column> columns;
private List<String> ssiList;
private Map<String,Map<String,String>> ssio;
private final ScanType type;
private final ScanState state;
private final KeyExtent extent;
private final List<Column> columns;
private final List<String> ssiList;
private final Map<String,Map<String,String>> ssio;
private final String user;
private Authorizations authorizations;
private final Authorizations authorizations;
private final ServerId server;

ActiveScanImpl(ClientContext context,
org.apache.accumulo.core.tabletscan.thrift.ActiveScan activeScan)
org.apache.accumulo.core.tabletscan.thrift.ActiveScan activeScan, ServerId server)
throws TableNotFoundException {
this.scanId = activeScan.scanId;
this.client = activeScan.client;
Expand All @@ -81,6 +84,7 @@ public class ActiveScanImpl extends ActiveScan {
this.ssiList.add(ii.iterName + "=" + ii.priority + "," + ii.className);
}
this.ssio = activeScan.ssio;
this.server = Objects.requireNonNull(server);
}

@Override
Expand Down Expand Up @@ -152,4 +156,9 @@ public Authorizations getAuthorizations() {
public long getIdleTime() {
return idle;
}

@Override
public ServerId getServerId() {
return server;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.accumulo.core.rpc.ThriftUtil.getClient;
import static org.apache.accumulo.core.rpc.ThriftUtil.returnClient;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS_COMPACTIONS_FINDER_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS_SCANS_FINDER_POOL;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -38,6 +39,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
Expand All @@ -51,6 +53,7 @@
import org.apache.accumulo.core.client.admin.ActiveScan;
import org.apache.accumulo.core.client.admin.InstanceOperations;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.clientImpl.thrift.ClientService;
import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType;
import org.apache.accumulo.core.clientImpl.thrift.TVersionedProperties;
Expand All @@ -72,12 +75,14 @@
import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.core.util.threads.ThreadPoolNames;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.MoreExecutors;

/**
* Provides a class for administering the accumulo instance
Expand Down Expand Up @@ -265,33 +270,18 @@ public List<String> getTabletServers() {
@Deprecated(since = "4.0.0")
public List<ActiveScan> getActiveScans(String tserver)
throws AccumuloException, AccumuloSecurityException {
final var parsedTserver = HostAndPort.fromString(tserver);
TabletScanClientService.Client client = null;
try {
client = getClient(ThriftClientTypes.TABLET_SCAN, parsedTserver, context);

List<ActiveScan> as = new ArrayList<>();
for (var activeScan : client.getActiveScans(TraceUtil.traceInfo(), context.rpcCreds())) {
try {
as.add(new ActiveScanImpl(context, activeScan));
} catch (TableNotFoundException e) {
throw new AccumuloException(e);
}
}
return as;
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e.user, e.code, e);
} catch (TException e) {
throw new AccumuloException(e);
} finally {
if (client != null) {
returnClient(client, context);
}
}
var si = getServerId(tserver, List.of(Type.TABLET_SERVER, Type.SCAN_SERVER));
// getActiveScans throws exceptions so we can't use Optional.map() here
return si.isPresent() ? getActiveScans(si.orElseThrow()) : List.of();
}

@Override
public List<ActiveScan> getActiveScans(ServerId server)
public List<ActiveScan> getActiveScans(Collection<ServerId> servers)
throws AccumuloException, AccumuloSecurityException {
return queryServers(servers, this::getActiveScans, INSTANCE_OPS_SCANS_FINDER_POOL);
}

private List<ActiveScan> getActiveScans(ServerId server)
throws AccumuloException, AccumuloSecurityException {

Objects.requireNonNull(server);
Expand All @@ -309,7 +299,7 @@ public List<ActiveScan> getActiveScans(ServerId server)
List<ActiveScan> as = new ArrayList<>();
for (var activeScan : rpcClient.getActiveScans(TraceUtil.traceInfo(), context.rpcCreds())) {
try {
as.add(new ActiveScanImpl(context, activeScan));
as.add(new ActiveScanImpl(context, activeScan, server));
} catch (TableNotFoundException e) {
throw new AccumuloException(e);
}
Expand Down Expand Up @@ -337,21 +327,12 @@ public boolean testClassLoad(final String className, final String asTypeName)
@Deprecated
public List<ActiveCompaction> getActiveCompactions(String server)
throws AccumuloException, AccumuloSecurityException {

HostAndPort hp = HostAndPort.fromString(server);

ServerId si = getServer(ServerId.Type.COMPACTOR, null, hp.getHost(), hp.getPort());
if (si == null) {
si = getServer(ServerId.Type.TABLET_SERVER, null, hp.getHost(), hp.getPort());
}
if (si == null) {
return List.of();
}
return getActiveCompactions(si);
var si = getServerId(server, List.of(Type.COMPACTOR, Type.TABLET_SERVER));
// getActiveCompactions throws exceptions so we can't use Optional.map() here
return si.isPresent() ? getActiveCompactions(si.orElseThrow()) : List.of();
}

@Override
public List<ActiveCompaction> getActiveCompactions(ServerId server)
private List<ActiveCompaction> getActiveCompactions(ServerId server)
throws AccumuloException, AccumuloSecurityException {

Objects.requireNonNull(server);
Expand Down Expand Up @@ -391,6 +372,7 @@ public List<ActiveCompaction> getActiveCompactions(ServerId server)
}

@Override
@Deprecated
public List<ActiveCompaction> getActiveCompactions()
throws AccumuloException, AccumuloSecurityException {

Expand All @@ -404,19 +386,34 @@ public List<ActiveCompaction> getActiveCompactions()
@Override
public List<ActiveCompaction> getActiveCompactions(Collection<ServerId> compactionServers)
throws AccumuloException, AccumuloSecurityException {
return queryServers(compactionServers, this::getActiveCompactions,
INSTANCE_OPS_COMPACTIONS_FINDER_POOL);
}

private <T> List<T> queryServers(Collection<ServerId> servers, ServerQuery<List<T>> serverQuery,
ThreadPoolNames pool) throws AccumuloException, AccumuloSecurityException {

final ExecutorService executorService;
// If size 0 or 1 there's no need to create a thread pool
if (servers.isEmpty()) {
return List.of();
} else if (servers.size() == 1) {
executorService = MoreExecutors.newDirectExecutorService();
} else {
int numThreads = Math.max(4, Math.min((servers.size()) / 10, 256));
executorService =
context.threadPools().getPoolBuilder(pool).numCoreThreads(numThreads).build();
}

int numThreads = Math.max(4, Math.min((compactionServers.size()) / 10, 256));
var executorService = context.threadPools().getPoolBuilder(INSTANCE_OPS_COMPACTIONS_FINDER_POOL)
.numCoreThreads(numThreads).build();
try {
List<Future<List<ActiveCompaction>>> futures = new ArrayList<>();
List<Future<List<T>>> futures = new ArrayList<>();

for (ServerId server : compactionServers) {
futures.add(executorService.submit(() -> getActiveCompactions(server)));
for (ServerId server : servers) {
futures.add(executorService.submit(() -> serverQuery.execute(server)));
}

List<ActiveCompaction> ret = new ArrayList<>();
for (Future<List<ActiveCompaction>> future : futures) {
List<T> ret = new ArrayList<>();
for (Future<List<T>> future : futures) {
try {
ret.addAll(future.get());
} catch (InterruptedException | ExecutionException e) {
Expand Down Expand Up @@ -635,4 +632,13 @@ private ServerId createServerId(ServerId.Type type, ServiceLockPath slp) {
return new ServerId(type, resourceGroup, host, port);
}

private Optional<ServerId> getServerId(String server, List<Type> types) {
HostAndPort hp = HostAndPort.fromString(server);
return types.stream().map(type -> getServer(type, null, hp.getHost(), hp.getPort()))
.findFirst();
}

interface ServerQuery<T> {
T execute(ServerId server) throws AccumuloException, AccumuloSecurityException;
}
}
Loading

0 comments on commit 305101f

Please sign in to comment.