Skip to content

Commit

Permalink
add TaskTimeoutExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
EddeCCC committed Dec 12, 2024
1 parent de6ad66 commit ce045af
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import org.springframework.stereotype.Service;
import rocks.inspectit.ocelot.config.model.InspectitConfig;
import rocks.inspectit.ocelot.config.model.command.AgentCommandSettings;
import rocks.inspectit.ocelot.core.config.propertysources.http.TaskTimeoutExecutor;
import rocks.inspectit.ocelot.core.service.DynamicallyActivatableService;

import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,8 +38,24 @@ public class AgentCommandService extends DynamicallyActivatableService implement
*/
private ScheduledFuture<?> handlerFuture;

/**
* The interval for the scheduled task.
*/
private Duration pollingInterval;

/**
* The executor to cancel the polling task by timeout. This should prevent the HTTP thread to deadlock.
*/
private final TaskTimeoutExecutor timeoutExecutor;

/**
* The maximum time to run one polling task.
*/
private Duration pollingTimeout;

public AgentCommandService() {
super("agentCommands");
timeoutExecutor = new TaskTimeoutExecutor();
}

@Override
Expand Down Expand Up @@ -69,17 +87,19 @@ protected boolean doEnable(InspectitConfig configuration) {
}

AgentCommandSettings settings = configuration.getAgentCommands();
long pollingIntervalMs = settings.getPollingInterval().toMillis();

handlerFuture = executor.scheduleWithFixedDelay(this, pollingIntervalMs, pollingIntervalMs, TimeUnit.MILLISECONDS);
pollingInterval = settings.getPollingInterval();
pollingTimeout = settings.getTaskTimeout();
startScheduledHandler();

return true;
}

@Override
protected boolean doDisable() {
log.info("Stopping agent command polling service.");

if (timeoutExecutor != null) {
timeoutExecutor.cancelTimeout();
}
if (handlerFuture != null) {
handlerFuture.cancel(true);
}
Expand All @@ -91,11 +111,23 @@ public void run() {
log.debug("Trying to fetch new agent commands.");
try {
commandHandler.nextCommand();
// After the command was fetched, the task should no longer timeout
timeoutExecutor.cancelTimeout();
} catch (Exception exception) {
log.error("Error while fetching agent command.", exception);
}
}

/**
* Start the scheduled fetching of the next agent command.
*/
private void startScheduledHandler() {
handlerFuture = executor.scheduleWithFixedDelay(this,
pollingInterval.toMillis(), pollingInterval.toMillis(), TimeUnit.MILLISECONDS);
// Setup timeout for fetching a command
timeoutExecutor.scheduleCancelling(handlerFuture, "agentcommand", this::startScheduledHandler, pollingTimeout);
}

@VisibleForTesting
URI getCommandUri(InspectitConfig configuration) throws URISyntaxException {
AgentCommandSettings settings = configuration.getAgentCommands();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import rocks.inspectit.ocelot.core.config.InspectitEnvironment;
import rocks.inspectit.ocelot.core.service.DynamicallyActivatableService;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -29,6 +30,21 @@ public class HttpConfigurationPoller extends DynamicallyActivatableService imple
*/
private ScheduledFuture<?> pollerFuture;

/**
* The interval for the scheduled task.
*/
private Duration pollingInterval;

/**
* The executor to cancel the polling task by timeout. This should prevent the HTTP thread to deadlock.
*/
private final TaskTimeoutExecutor timeoutExecutor;

/**
* The maximum time to run one polling task.
*/
private Duration pollingTimeout;

/**
* The state of the used HTTP property source configuration.
*/
Expand All @@ -37,6 +53,7 @@ public class HttpConfigurationPoller extends DynamicallyActivatableService imple

public HttpConfigurationPoller() {
super("config.http");
timeoutExecutor = new TaskTimeoutExecutor();
}

@Override
Expand All @@ -52,15 +69,19 @@ protected boolean doEnable(InspectitConfig configuration) {

currentState = new HttpPropertySourceState(InspectitEnvironment.HTTP_BASED_CONFIGURATION, httpSettings);

long frequencyMs = httpSettings.getFrequency().toMillis();
pollerFuture = executor.scheduleWithFixedDelay(this, frequencyMs, frequencyMs, TimeUnit.MILLISECONDS);
pollingInterval = httpSettings.getFrequency();
pollingTimeout = httpSettings.getTaskTimeout();
startScheduledPolling();

return true;
}

@Override
protected boolean doDisable() {
log.info("Stopping HTTP configuration polling service.");
if (timeoutExecutor != null) {
timeoutExecutor.cancelTimeout();
}
if (pollerFuture != null) {
pollerFuture.cancel(true);
}
Expand All @@ -74,7 +95,10 @@ protected boolean doDisable() {
@Override
public void run() {
log.debug("Updating HTTP property source.");
// Fetch configuration
boolean wasUpdated = currentState.update(false);
// After the configuration was fetched, the task should no longer timeout
timeoutExecutor.cancelTimeout();
if (wasUpdated) {
env.updatePropertySources(propertySources -> {
if (propertySources.contains(InspectitEnvironment.HTTP_BASED_CONFIGURATION)) {
Expand All @@ -84,6 +108,16 @@ public void run() {
}
}

/**
* Start the scheduled HTTP polling.
*/
private void startScheduledPolling() {
pollerFuture = executor.scheduleWithFixedDelay(this,
pollingInterval.toMillis(), pollingInterval.toMillis(), TimeUnit.MILLISECONDS);
// Setup timeout for fetching the configuration
timeoutExecutor.scheduleCancelling(pollerFuture, "http.config", this::startScheduledPolling, pollingTimeout);
}

public void updateAgentHealthState(AgentHealthState agentHealth) {
if (currentState != null) {
currentState.updateAgentHealthState(agentHealth);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private String fetchConfiguration(boolean fallBackToFile) {
CloseableHttpClient httpClient = clientHolder.getHttpClient(currentSettings);
Retry retry;
if (fallBackToFile) {
// fallBackToFile == true means the agent started.
// fallBackToFile == true means the agent has just started.
// If the configuration is not reachable, we want to fail fast and use the possibly existing backup file
// as soon as possible.
// If there is no backup standard polling mechanism will kick in and the agent will soon try again with
Expand All @@ -208,6 +208,9 @@ private String fetchConfiguration(boolean fallBackToFile) {
logFetchError("HTTP protocol error occurred while fetching configuration.", e);
} catch (IOException e) {
logFetchError("A IO problem occurred while fetching configuration.", e);
} catch (InterruptedException e) {
logFetchError("Thread was interrupted while fetching configuration.", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
logFetchError("Exception occurred while fetching configuration.", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package rocks.inspectit.ocelot.core.config.propertysources.http;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.concurrent.*;

/**
* Cancels internal tasks after a timeout has been exceeded.
*/
@Slf4j
public class TaskTimeoutExecutor {

/**
* The future to cancel the task by timeout.
*/
private Future<?> timeoutExecutor;

/**
* Cancel the task after a specific timeout. This should prevent, that the thread stays deadlocked.
* The task will be restarted after successful cancel.
*
* @param task the task to cancel by timeout
* @param taskName the name of the task
* @param restartTask the runnable to restart the task
* @param timeout the time after which the task should be cancelled
*/
public void scheduleCancelling(Future<?> task, String taskName, Runnable restartTask, Duration timeout) {
ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat("timeout-" + taskName)
.build();
ScheduledExecutorService cancelExecutor = Executors.newSingleThreadScheduledExecutor(factory);

// Execute when timeout is reached
Runnable cancelRunnable = () -> {
task.cancel(true);
boolean isCancelled = task.isCancelled();
log.warn("Cancelled {}: {}", taskName, isCancelled);
if (isCancelled) {
log.info("Restarting {}...", taskName);
restartTask.run();
}
};

// Schedule the cancelling just once
log.debug("Scheduling {} timeout with: {}", taskName, timeout);
timeoutExecutor = cancelExecutor.schedule(cancelRunnable, timeout.toMillis(), TimeUnit.MILLISECONDS);
}

/**
* Cancel the timeout executor
*/
public void cancelTimeout() {
if(timeoutExecutor != null) timeoutExecutor.cancel(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,39 @@ public void verifyPrioritization() throws Exception {
assertThat(result.toString()).isEqualTo("http://example.org/api/command");
}
}

@Nested
public class TaskTimeout {

@Test
void shouldCancelFutureAndRestartWhenTimeoutExceeded() throws MalformedURLException {
Duration timeout = Duration.ofMillis(500);
when(configuration.getAgentCommands().getPollingInterval()).thenReturn(Duration.ofSeconds(5));
when(configuration.getAgentCommands().getTaskTimeout()).thenReturn(timeout);
when(configuration.getAgentCommands().getUrl()).thenReturn(new URL("http://example.org"));
ScheduledFuture future = mock(ScheduledFuture.class);
when(future.isCancelled()).thenReturn(true);
when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future);

service.doEnable(configuration);

verify(future, timeout(timeout.toMillis() + 100)).cancel(true);
verify(executor, times(2)).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
}

@Test
void shouldNotCancelFutureWhenNoTimeout() throws MalformedURLException {
Duration timeout = Duration.ofMillis(5000);
when(configuration.getAgentCommands().getPollingInterval()).thenReturn(Duration.ofSeconds(1));
when(configuration.getAgentCommands().getTaskTimeout()).thenReturn(timeout);
when(configuration.getAgentCommands().getUrl()).thenReturn(new URL("http://example.org"));
ScheduledFuture future = mock(ScheduledFuture.class);
when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future);

service.doEnable(configuration);

verify(future, never()).cancel(true);
verify(executor, times(1)).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public void successfullyEnabled() {
configuration.setConfig(new ConfigSettings());
configuration.getConfig().setHttp(new HttpConfigSettings());
configuration.getConfig().getHttp().setFrequency(Duration.ofMillis(5000L));
configuration.getConfig().getHttp().setTaskTimeout(Duration.ofMillis(50000L));
ScheduledFuture future = Mockito.mock(ScheduledFuture.class);
when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future);

Expand Down Expand Up @@ -71,6 +72,7 @@ public void isEnabled() {
configuration.setConfig(new ConfigSettings());
configuration.getConfig().setHttp(new HttpConfigSettings());
configuration.getConfig().getHttp().setFrequency(Duration.ofMillis(5000L));
configuration.getConfig().getHttp().setTaskTimeout(Duration.ofMillis(50000L));
ScheduledFuture future = Mockito.mock(ScheduledFuture.class);
when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future);

Expand Down Expand Up @@ -117,4 +119,43 @@ public void stateUpdated() {
verifyNoMoreInteractions(currentState, env);
}
}
}

@Nested
public class TaskTimeout {

@Test
void shouldCancelFutureAndRestartWhenTimeoutExceeded() {
Duration timeout = Duration.ofMillis(500);
InspectitConfig configuration = new InspectitConfig();
configuration.setConfig(new ConfigSettings());
configuration.getConfig().setHttp(new HttpConfigSettings());
configuration.getConfig().getHttp().setFrequency(Duration.ofMillis(5000));
configuration.getConfig().getHttp().setTaskTimeout(timeout);
ScheduledFuture future = Mockito.mock(ScheduledFuture.class);
when(future.isCancelled()).thenReturn(true);
when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future);

poller.doEnable(configuration);

verify(future, timeout(timeout.toMillis() + 100)).cancel(true);
verify(executor, times(2)).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
}

@Test
void shouldNotCancelFutureWhenNoTimeout() {
Duration timeout = Duration.ofMillis(5000);
InspectitConfig configuration = new InspectitConfig();
configuration.setConfig(new ConfigSettings());
configuration.getConfig().setHttp(new HttpConfigSettings());
configuration.getConfig().getHttp().setFrequency(Duration.ofMillis(500));
configuration.getConfig().getHttp().setTaskTimeout(timeout);
ScheduledFuture future = Mockito.mock(ScheduledFuture.class);
when(executor.scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class))).thenReturn(future);

poller.doEnable(configuration);

verify(future, never()).cancel(true);
verify(executor, times(1)).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
}
}
}
Loading

0 comments on commit ce045af

Please sign in to comment.