Skip to content

Commit

Permalink
google#5842 stopTimers capture in ServiceManager
Browse files Browse the repository at this point in the history
  • Loading branch information
Ayush Jaiswal committed Jan 17, 2022
1 parent a4fabc0 commit 3232403
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,18 @@ public void testServiceStartupDurations() {
assertThat(startupTimes.get(b)).isAtLeast(Duration.ofMillis(353));
}

public void testServiceStopDurations() {
Service a = new NoOpDelayedService(150);
Service b = new NoOpDelayedService(353);
ServiceManager serviceManager = new ServiceManager(asList(a, b));
serviceManager.startAsync().awaitHealthy();
serviceManager.stopAsync().awaitStopped();
ImmutableMap<Service, Duration> stopDurations = serviceManager.stopDurations();
assertThat(stopDurations).hasSize(2);
assertThat(stopDurations.get(a)).isAtLeast(Duration.ofMillis(150));
assertThat(stopDurations.get(b)).isAtLeast(Duration.ofMillis(353));
}


public void testServiceStartupTimes_selfStartingServices() {
// This tests to ensure that:
Expand Down
194 changes: 142 additions & 52 deletions guava/src/com/google/common/util/concurrent/ServiceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@
import com.google.j2objc.annotations.WeakOuter;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -334,7 +331,8 @@ public void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException {
@CanIgnoreReturnValue
public ServiceManager stopAsync() {
for (Service service : services) {
service.stopAsync();
state.tryStartStopTiming(service);
service.stopAsync();
}
return this;
}
Expand Down Expand Up @@ -414,6 +412,17 @@ public ImmutableMap<Service, Long> startupTimes() {
return state.startupTimes();
}

/**
* Returns the service stop times. This value will only return stop times for services that
* have finished Stopping.
*
* @return Map of services and their corresponding stop time in millis, the map entries will be
* ordered by stop time.
*/
public ImmutableMap<Service, Long> stopTimes() {
return state.stopTimes();
}

/**
* Returns the service load times. This value will only return startup times for services that
* have finished starting.
Expand All @@ -428,11 +437,25 @@ public ImmutableMap<Service, Duration> startupDurations() {
Maps.<Service, Long, Duration>transformValues(startupTimes(), Duration::ofMillis));
}

/**
* Returns the service stopping termination times. This method returns values for only those services
* that have finished stopping.
*
* @return Map of services and their corresponding stop time, the map entries will be ordered
* by stop time.
*/
@J2ObjCIncompatible
public ImmutableMap<Service, Duration> stopDurations() {
return ImmutableMap.copyOf(
Maps.<Service, Long, Duration>transformValues(stopTimes(), Duration::ofMillis));
}


@Override
public String toString() {
return MoreObjects.toStringHelper(ServiceManager.class)
.add("services", Collections2.filter(services, not(instanceOf(NoOpService.class))))
.toString();
return MoreObjects.toStringHelper(ServiceManager.class)
.add("services", Collections2.filter(services, not(instanceOf(NoOpService.class))))
.toString();
}

/**
Expand All @@ -452,6 +475,9 @@ private static final class ServiceManagerState {
@GuardedBy("monitor")
final Map<Service, Stopwatch> startupTimers = Maps.newIdentityHashMap();

@GuardedBy("monitor")
final Map<Service, Stopwatch> stopTimers = new IdentityHashMap<>();

/**
* These two booleans are used to mark the state as ready to start.
*
Expand Down Expand Up @@ -542,6 +568,23 @@ void tryStartTiming(Service service) {
}
}

/**
* Attempts to start the stop timer immediately prior to the service being stopped via {@link Service#stopAsync()}
*
* @param service
*/
void tryStartStopTiming(Service service) {
monitor.enter();
try {
Stopwatch stopwatch = stopTimers.get(service);
if (stopwatch == null) {
stopTimers.put(service, Stopwatch.createStarted());
}
} finally {
monitor.leave();
}
}

/**
* Marks the {@link State} as ready to receive transitions. Returns true if no transitions have
* been observed yet.
Expand Down Expand Up @@ -661,6 +704,35 @@ public Long apply(Entry<Service, Long> input) {
return ImmutableMap.copyOf(loadTimes);
}

ImmutableMap<Service, Long> stopTimes() {
List<Entry<Service, Long>> stopTimes;
monitor.enter();
try {
stopTimes = Lists.newArrayListWithCapacity(stopTimers.size());
// N.B. There will only be an entry in the map if the service has stopped
for (Entry<Service, Stopwatch> entry : stopTimers.entrySet()) {
Service service = entry.getKey();
Stopwatch stopwatch = entry.getValue();
if (!stopwatch.isRunning() && !(service instanceof NoOpService)) {
stopTimes.add(Maps.immutableEntry(service, stopwatch.elapsed(MILLISECONDS)));
}
}
} finally {
monitor.leave();
}
Collections.sort(
stopTimes,
Ordering.natural()
.onResultOf(
new Function<Entry<Service, Long>, Long>() {
@Override
public Long apply(Entry<Service, Long> input) {
return input.getValue();
}
}));
return ImmutableMap.copyOf(stopTimes);
}

/**
* Updates the state with the given service transition.
*
Expand All @@ -674,58 +746,76 @@ public Long apply(Entry<Service, Long> input) {
* </ol>
*/
void transitionService(final Service service, State from, State to) {
checkNotNull(service);
checkArgument(from != to);
monitor.enter();
try {
transitioned = true;
if (!ready) {
return;
}
// Update state.
checkState(
servicesByState.remove(from, service),
"Service %s not at the expected location in the state map %s",
service,
from);
checkState(
servicesByState.put(to, service),
"Service %s in the state map unexpectedly at %s",
service,
to);
checkNotNull(service);
checkArgument(from != to);
monitor.enter();
try {
transitioned = true;
if (!ready) {
return;
}
// Update state.
checkState(
servicesByState.remove(from, service),
"Service %s not at the expected location in the state map %s",
service,
from);
checkState(
servicesByState.put(to, service),
"Service %s in the state map unexpectedly at %s",
service,
to);

updateStartAndStopTimersIfRequired(service, to);
// Queue our listeners

// Did a service fail?
if (to == FAILED) {
enqueueFailedEvent(service);
}

if (states.count(RUNNING) == numberOfServices) {
// This means that the manager is currently healthy. N.B. If other threads call isHealthy
// they are not guaranteed to get 'true', because any service could fail right now.
enqueueHealthyEvent();
} else if (states.count(TERMINATED) + states.count(FAILED) == numberOfServices) {
enqueueStoppedEvent();
}
} finally {
monitor.leave();
// Run our executors outside of the lock
dispatchListenerEvents();
}
}


void updateStartAndStopTimersIfRequired(Service service, State state) {
// Update the timer
Stopwatch stopwatch = startupTimers.get(service);
if (stopwatch == null) {
// This means the service was started by some means other than ServiceManager.startAsync
stopwatch = Stopwatch.createStarted();
startupTimers.put(service, stopwatch);
// This means the service was started by some means other than ServiceManager.startAsync
stopwatch = Stopwatch.createStarted();
startupTimers.put(service, stopwatch);
}
if (to.compareTo(RUNNING) >= 0 && stopwatch.isRunning()) {
// N.B. if we miss the STARTING event then we may never record a startup time.
stopwatch.stop();
if (!(service instanceof NoOpService)) {
logger.log(Level.FINE, "Started {0} in {1}.", new Object[] {service, stopwatch});
}
if (state.compareTo(RUNNING) >= 0 && stopwatch.isRunning()) {
// N.B. if we miss the STARTING event then we may never record a startup time.
stopwatch.stop();
if (!(service instanceof NoOpService)) {
logger.log(Level.FINE, "Started {0} in {1}.", new Object[]{service, stopwatch});
}
}
// Queue our listeners

// Did a service fail?
if (to == FAILED) {
enqueueFailedEvent(service);
stopwatch = stopTimers.get(service);
// can be null only if the service has been triggered stop from somewhere else than ServiceManager.stopAsync
if (stopwatch == null) {
stopTimers.put(service, Stopwatch.createStarted());
}

if (states.count(RUNNING) == numberOfServices) {
// This means that the manager is currently healthy. N.B. If other threads call isHealthy
// they are not guaranteed to get 'true', because any service could fail right now.
enqueueHealthyEvent();
} else if (states.count(TERMINATED) + states.count(FAILED) == numberOfServices) {
enqueueStoppedEvent();
if (state.compareTo(TERMINATED) >= 0 && stopwatch.isRunning()) {
stopwatch.stop();
if (!(service instanceof NoOpService)) {
logger.log(Level.FINE, "Stopped {0} in {1}.", new Object[]{service, stopwatch});
}
}
} finally {
monitor.leave();
// Run our executors outside of the lock
dispatchListenerEvents();
}
}

void enqueueStoppedEvent() {
Expand Down

0 comments on commit 3232403

Please sign in to comment.