Skip to content

Commit

Permalink
feat(cats): Support for selectively enabling agents (#2177)
Browse files Browse the repository at this point in the history
This is an assist to local development when you do not want _all_
agents to run by default.

```
redis:
  agent:
    enabledPattern: .*reservation.*
```

By default `enabledPattern` is `.*` and all agents will be scheduled.
  • Loading branch information
ajordens authored Nov 21, 2017
1 parent c05a292 commit d91b340
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.netflix.spinnaker.cats.redis.RedisClientDelegate;
import com.netflix.spinnaker.cats.thread.NamedThreadFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -37,6 +39,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

@SuppressFBWarnings
public class ClusteredAgentScheduler extends CatsModuleAware implements AgentScheduler<AgentLock>, Runnable {
Expand All @@ -45,24 +48,48 @@ private static enum Status {
FAILURE
}

private static final Logger logger = LoggerFactory.getLogger(ClusteredAgentScheduler.class);

private final RedisClientDelegate redisClientDelegate;
private final NodeIdentity nodeIdentity;
private final AgentIntervalProvider intervalProvider;
private final ExecutorService agentExecutionPool;
private final Pattern enabledAgentPattern;

private final Map<String, AgentExecutionAction> agents = new ConcurrentHashMap<>();
private final Map<String, NextAttempt> activeAgents = new ConcurrentHashMap<>();
private final NodeStatusProvider nodeStatusProvider;

public ClusteredAgentScheduler(RedisClientDelegate redisClientDelegate, NodeIdentity nodeIdentity, AgentIntervalProvider intervalProvider, NodeStatusProvider nodeStatusProvider) {
this(redisClientDelegate, nodeIdentity, intervalProvider, nodeStatusProvider, Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(ClusteredAgentScheduler.class.getSimpleName())), Executors.newCachedThreadPool(new NamedThreadFactory(AgentExecutionAction.class.getSimpleName())));
public ClusteredAgentScheduler(RedisClientDelegate redisClientDelegate,
NodeIdentity nodeIdentity,
AgentIntervalProvider intervalProvider,
NodeStatusProvider nodeStatusProvider,
String enabledAgentPattern) {
this(
redisClientDelegate,
nodeIdentity,
intervalProvider,
nodeStatusProvider,
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(ClusteredAgentScheduler.class.getSimpleName())),
Executors.newCachedThreadPool(new NamedThreadFactory(AgentExecutionAction.class.getSimpleName())),
enabledAgentPattern
);
}

public ClusteredAgentScheduler(RedisClientDelegate redisClientDelegate, NodeIdentity nodeIdentity, AgentIntervalProvider intervalProvider, NodeStatusProvider nodeStatusProvider, ScheduledExecutorService lockPollingScheduler, ExecutorService agentExecutionPool) {
public ClusteredAgentScheduler(RedisClientDelegate redisClientDelegate,
NodeIdentity nodeIdentity,
AgentIntervalProvider intervalProvider,
NodeStatusProvider nodeStatusProvider,
ScheduledExecutorService lockPollingScheduler,
ExecutorService agentExecutionPool,
String enabledAgentPattern) {
this.redisClientDelegate = redisClientDelegate;
this.nodeIdentity = nodeIdentity;
this.intervalProvider = intervalProvider;
this.nodeStatusProvider = nodeStatusProvider;
this.agentExecutionPool = agentExecutionPool;
this.enabledAgentPattern = Pattern.compile(enabledAgentPattern);

lockPollingScheduler.scheduleAtFixedRate(this, 0, 1, TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -152,12 +179,26 @@ private void agentCompleted(String agentType, long nextExecutionTime) {
}

@Override
public void schedule(Agent agent, AgentExecution agentExecution, ExecutionInstrumentation executionInstrumentation) {
public void schedule(Agent agent,
AgentExecution agentExecution,
ExecutionInstrumentation executionInstrumentation) {
if (!enabledAgentPattern.matcher(agent.getClass().getSimpleName().toLowerCase()).matches()) {
logger.debug(
"Agent is not enabled (agent: {}, agentType: {}, pattern: {})",
agent.getClass().getSimpleName(),
agent.getAgentType(),
enabledAgentPattern.pattern()
);
return;
}

if (agent instanceof AgentSchedulerAware) {
((AgentSchedulerAware)agent).setAgentScheduler(this);
}

final AgentExecutionAction agentExecutionAction = new AgentExecutionAction(agent, agentExecution, executionInstrumentation);
AgentExecutionAction agentExecutionAction = new AgentExecutionAction(
agent, agentExecution, executionInstrumentation
);
agents.put(agent.getAgentType(), agentExecutionAction);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,15 @@ class ClusteredAgentSchedulerSpec extends Specification {
}
lockPollingScheduler = new ManualRunnableScheduler()
agentExecutionScheduler = new ManualRunnableScheduler()
scheduler = new ClusteredAgentScheduler(new JedisClientDelegate(jedisPool), new DefaultNodeIdentity(), interval, new DefaultNodeStatusProvider(), lockPollingScheduler, agentExecutionScheduler)
scheduler = new ClusteredAgentScheduler(
new JedisClientDelegate(jedisPool),
new DefaultNodeIdentity(),
interval,
new DefaultNodeStatusProvider(),
lockPollingScheduler,
agentExecutionScheduler,
".*"
)
}

def 'cache run aborted if agent doesnt acquire execution token'() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,25 @@ class JedisCacheConfig {

@Bean
@ConditionalOnProperty(value = "caching.writeEnabled", matchIfMissing = true)
AgentScheduler agentScheduler(RedisConfigurationProperties redisConfigurationProperties, RedisClientDelegate redisClientDelegate, JedisPool jedisPool, AgentIntervalProvider agentIntervalProvider, NodeStatusProvider nodeStatusProvider) {
AgentScheduler agentScheduler(RedisConfigurationProperties redisConfigurationProperties,
RedisClientDelegate redisClientDelegate,
JedisPool jedisPool,
AgentIntervalProvider agentIntervalProvider,
NodeStatusProvider nodeStatusProvider) {
if (redisConfigurationProperties.scheduler.equalsIgnoreCase("default")) {
URI redisUri = URI.create(redisConfigurationProperties.connection)
String redisHost = redisUri.host
int redisPort = redisUri.port
if (redisPort == -1) {
redisPort = 6379
}
new ClusteredAgentScheduler(redisClientDelegate, new DefaultNodeIdentity(redisHost, redisPort), agentIntervalProvider, nodeStatusProvider);
new ClusteredAgentScheduler(
redisClientDelegate,
new DefaultNodeIdentity(redisHost, redisPort),
agentIntervalProvider,
nodeStatusProvider,
redisConfigurationProperties.agent.enabledPattern
);
} else if (redisConfigurationProperties.scheduler.equalsIgnoreCase("sort")) {
new ClusteredSortAgentScheduler(jedisPool, nodeStatusProvider, agentIntervalProvider, redisConfigurationProperties.parallelism ?: -1);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,17 @@ class RedisConfigurationProperties {
int timeoutSeconds = 300
}

@Canonical
static class AgentConfiguration {
String enabledPattern = ".*"
}

@NestedConfigurationProperty
final PollConfiguration poll = new PollConfiguration()

@NestedConfigurationProperty
final AgentConfiguration agent = new AgentConfiguration()

String connection = "redis://localhost:6379"
String connectionPrevious = null

Expand Down

0 comments on commit d91b340

Please sign in to comment.