Skip to content

Commit

Permalink
Revert TabletServer
Browse files Browse the repository at this point in the history
  • Loading branch information
DomGarguilo committed Dec 23, 2024
1 parent 8158304 commit fc7ab16
Showing 1 changed file with 16 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -964,41 +963,40 @@ public void run() {
.numMaxThreads(16).build();

ManagerClientService.Client iface = managerConnection(getManagerAddress());
final AtomicBoolean managerDown = new AtomicBoolean(false);
boolean managerDown = false;

try {
for (DataLevel level : DataLevel.values()) {
for (DataLevel level : new DataLevel[] {DataLevel.USER, DataLevel.METADATA, DataLevel.ROOT}) {
getOnlineTablets().keySet().forEach(ke -> {
if (DataLevel.of(ke.tableId()) == level) {
futures.add(
tpe.submit(new UnloadTabletHandler(this, ke, TUnloadTabletGoal.UNASSIGNED, 5000)));
}
});
while (!futures.isEmpty()) {
futures.removeIf(f -> {
if (!f.isDone()) {
return false;
}
if (!managerDown.get()) {
ManagerMessage mm = managerMessages.poll();
try {
if (mm != null) {
Iterator<Future<?>> unloads = futures.iterator();
while (unloads.hasNext()) {
Future<?> f = unloads.next();
if (f.isDone()) {
if (!managerDown) {
ManagerMessage mm = managerMessages.poll();
try {
mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
} catch (TException e) {
managerDown = true;
LOG.debug("Error sending message to Manager during tablet unloading, msg: {}",
e.getMessage());
}
} catch (TException e) {
managerDown.set(true);
log.debug("Error sending message to Manager during tablet unloading, msg: {}",
e.getMessage());
}
unloads.remove();
}
return true;
});
}
log.debug("Waiting on {} {} tablets to close.", futures.size(), level);
UtilWaitThread.sleep(1000);
}
}
} finally {
if (!managerDown.get()) {
if (!managerDown) {
try {
ManagerMessage mm = managerMessages.poll();
do {
Expand Down

0 comments on commit fc7ab16

Please sign in to comment.