Skip to content

Commit

Permalink
Suggestions for graceful shutdown branch
Browse files Browse the repository at this point in the history
  • Loading branch information
DomGarguilo committed Dec 23, 2024
1 parent 0d5f014 commit 8158304
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -532,12 +531,7 @@ public void refreshProcesses(ServerType type) {
}
break;
case COMPACTOR:
Iterator<Process> iterC = compactorProcesses.iterator();
while (iterC.hasNext()) {
if (!iterC.next().isAlive()) {
iterC.remove();
}
}
compactorProcesses.removeIf(process -> !process.isAlive());
break;
case GARBAGE_COLLECTOR:
if (!gcProcess.isAlive()) {
Expand All @@ -556,20 +550,10 @@ public void refreshProcesses(ServerType type) {
}
break;
case SCAN_SERVER:
Iterator<Process> iterS = scanServerProcesses.iterator();
while (iterS.hasNext()) {
if (!iterS.next().isAlive()) {
iterS.remove();
}
}
scanServerProcesses.removeIf(process -> !process.isAlive());
break;
case TABLET_SERVER:
Iterator<Process> iterT = tabletServerProcesses.iterator();
while (iterT.hasNext()) {
if (!iterT.next().isAlive()) {
iterT.remove();
}
}
tabletServerProcesses.removeIf(process -> !process.isAlive());
break;
case ZOOKEEPER:
if (!zooKeeperProcess.isAlive()) {
Expand All @@ -586,8 +570,7 @@ public Set<Process> getProcesses(ServerType type) {
case COMPACTION_COORDINATOR:
return coordinatorProcess == null ? Set.of() : Set.of(coordinatorProcess);
case COMPACTOR:
return compactorProcesses == null ? Set.of()
: Set.of(compactorProcesses.toArray(new Process[] {}));
return Set.of(compactorProcesses.toArray(new Process[] {}));
case GARBAGE_COLLECTOR:
return gcProcess == null ? Set.of() : Set.of(gcProcess);
case MANAGER:
Expand All @@ -596,8 +579,7 @@ public Set<Process> getProcesses(ServerType type) {
case MONITOR:
return monitor == null ? Set.of() : Set.of(monitor);
case SCAN_SERVER:
return scanServerProcesses == null ? Set.of()
: Set.of(scanServerProcesses.toArray(new Process[] {}));
return Set.of(scanServerProcesses.toArray(new Process[] {}));
case TABLET_SERVER:
return tabletServerProcesses == null ? Set.of()
: Set.of(tabletServerProcesses.toArray(new Process[] {}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -963,40 +964,41 @@ public void run() {
.numMaxThreads(16).build();

ManagerClientService.Client iface = managerConnection(getManagerAddress());
boolean managerDown = false;
final AtomicBoolean managerDown = new AtomicBoolean(false);

try {
for (DataLevel level : new DataLevel[] {DataLevel.USER, DataLevel.METADATA, DataLevel.ROOT}) {
for (DataLevel level : DataLevel.values()) {
getOnlineTablets().keySet().forEach(ke -> {
if (DataLevel.of(ke.tableId()) == level) {
futures.add(
tpe.submit(new UnloadTabletHandler(this, ke, TUnloadTabletGoal.UNASSIGNED, 5000)));
}
});
while (!futures.isEmpty()) {
Iterator<Future<?>> unloads = futures.iterator();
while (unloads.hasNext()) {
Future<?> f = unloads.next();
if (f.isDone()) {
if (!managerDown) {
ManagerMessage mm = managerMessages.poll();
try {
futures.removeIf(f -> {
if (!f.isDone()) {
return false;
}
if (!managerDown.get()) {
ManagerMessage mm = managerMessages.poll();
try {
if (mm != null) {
mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
} catch (TException e) {
managerDown = true;
LOG.debug("Error sending message to Manager during tablet unloading, msg: {}",
e.getMessage());
}
} catch (TException e) {
managerDown.set(true);
log.debug("Error sending message to Manager during tablet unloading, msg: {}",
e.getMessage());
}
unloads.remove();
}
}
return true;
});
log.debug("Waiting on {} {} tablets to close.", futures.size(), level);
UtilWaitThread.sleep(1000);
}
}
} finally {
if (!managerDown) {
if (!managerDown.get()) {
try {
ManagerMessage mm = managerMessages.poll();
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
Expand Down Expand Up @@ -147,17 +148,17 @@ public void testGracefulShutdown() throws Exception {
final TableId tid = ctx.getTableId(tableName);

// Insert 10 rows, flush after every row to create 10 files
final BatchWriter writer = client.createBatchWriter(tableName);
for (int i : IntStream.rangeClosed(1, 10).toArray()) {
String val = i + "";
Mutation m = new Mutation(val);
m.put(val, val, val);
writer.addMutation(m);
writer.flush();
client.tableOperations().flush(tableName, null, null, true);
try (BatchWriter writer = client.createBatchWriter(tableName)) {
for (int i : IntStream.rangeClosed(1, 10).toArray()) {
String val = i + "";
Mutation m = new Mutation(val);
m.put(val, val, val);
writer.addMutation(m);
writer.flush();
client.tableOperations().flush(tableName, null, null, true);
}
}
final long numFiles = ctx.getAmple().readTablets().forTable(tid).build().stream()
.mapToLong(tm -> tm.getFiles().size()).sum();
long numFiles = getNumFilesForTable(ctx, tid);
assertEquals(10, numFiles);
client.instanceOperations().waitForBalance();

Expand Down Expand Up @@ -220,9 +221,8 @@ public void testGracefulShutdown() throws Exception {
cc.setIterators(List.of(is));
cc.setWait(false);

final long numFiles2 = ctx.getAmple().readTablets().forTable(tid).build().stream()
.mapToLong(tm -> tm.getFiles().size()).sum();
assertTrue(numFiles2 == numFiles);
final long numFiles2 = getNumFilesForTable(ctx, tid);
assertEquals(numFiles2, numFiles);
assertEquals(0, ExternalCompactionTestUtils.getRunningCompactions(ctx).getCompactionsSize());
client.tableOperations().compact(tableName, cc);
Wait.waitFor(
Expand All @@ -232,8 +232,7 @@ public void testGracefulShutdown() throws Exception {
control.refreshProcesses(ServerType.COMPACTOR);
return control.getProcesses(ServerType.COMPACTOR).isEmpty();
});
final long numFiles3 = ctx.getAmple().readTablets().forTable(tid).build().stream()
.mapToLong(tm -> tm.getFiles().size()).sum();
final long numFiles3 = getNumFilesForTable(ctx, tid);
assertTrue(numFiles3 < numFiles2);
assertEquals(1, numFiles3);

Expand Down Expand Up @@ -266,4 +265,10 @@ public void testGracefulShutdown() throws Exception {
}

}

/**
 * Returns the total number of files referenced by the tablets of the given table.
 *
 * <p>
 * Reads the table's tablets through {@code ctx.getAmple()} and adds up the size of each tablet's
 * file collection. The {@link TabletsMetadata} handle is opened in a try-with-resources statement
 * so it is closed before this method returns, whether or not the read succeeds.
 *
 * @param ctx server context used to obtain the tablet metadata reader
 * @param tid id of the table whose tablets are inspected
 * @return sum of {@code getFiles().size()} over every tablet read for {@code tid}
 */
long getNumFilesForTable(ServerContext ctx, TableId tid) {
  try (TabletsMetadata tabletsOfTable = ctx.getAmple().readTablets().forTable(tid).build()) {
    long fileCount =
        tabletsOfTable.stream().mapToLong(tablet -> tablet.getFiles().size()).sum();
    return fileCount;
  }
}
}

0 comments on commit 8158304

Please sign in to comment.