Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to do sns-based updates instead of webhooks #1920

Merged
merged 14 commits into from
Apr 25, 2019
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ jdk:
- oraclejdk8

install: false
script: mvn -B -q -DskipSingularityWebUI verify
script: mvn -B -DskipSingularityWebUI verify
cache:
directories:
- $HOME/.m2
17 changes: 17 additions & 0 deletions Docs/reference/webhooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@ Singularity provides webhooks for changes to the three core types of objects in

Webhooks are managed via the [API](api.html) and a separate webhook should be added separately in order to receive updates about all three object types.

### SNS Topic Updates

By default, Singularity will use it's own internal queue of webhooks backed by zookeeper. If you would like to save load on zookeeper or have more flexibility in the consumption of task/deploy/request updates, you can instead configure Singularity to produce messages to SNS. In the configuration yaml, you can specify:

```yaml
webhookQueue:
queueType: SNS
awsAccessKey: {my key}
awsSecretKey: {my secret}
snsTopics:
TASK: singularity-task-updates
DEPLOY: singularity-deploy-updates
REQUEST: singularity-request-updates
```

This will cause Singularity to create these topics if they do not exist, and publish messages to SNS rather than sending its own webhooks. The content of these messages still follows the same format outlined below.

### Adding a Webhook

In order to create a new Webhook, post the json for the [SingularityWebhook](api.html) to the [webhook endpoint](api.html).
Expand Down
5 changes: 5 additions & 0 deletions SingularityService/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sns</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,14 @@
import com.hubspot.singularity.guice.DropwizardObjectMapperProvider;
import com.hubspot.singularity.helpers.SingularityS3Service;
import com.hubspot.singularity.helpers.SingularityS3Services;
import com.hubspot.singularity.hooks.AbstractWebhookChecker;
import com.hubspot.singularity.hooks.LoadBalancerClient;
import com.hubspot.singularity.hooks.LoadBalancerClientImpl;
import com.hubspot.singularity.hooks.SingularityWebhookPoller;
import com.hubspot.singularity.hooks.SingularityWebhookSender;
import com.hubspot.singularity.hooks.SnsWebhookManager;
import com.hubspot.singularity.hooks.SnsWebhookRetryer;
import com.hubspot.singularity.hooks.WebhookQueueType;
import com.hubspot.singularity.managed.SingularityLifecycleManaged;
import com.hubspot.singularity.mesos.OfferCache;
import com.hubspot.singularity.mesos.SingularityMesosStatusUpdateHandler;
Expand Down Expand Up @@ -148,7 +152,12 @@ public void configure(Binder binder) {

binder.bind(SingularityAbort.class).in(Scopes.SINGLETON);
binder.bind(SingularityExceptionNotifierManaged.class).in(Scopes.SINGLETON);
binder.bind(SingularityWebhookSender.class).in(Scopes.SINGLETON);
if (configuration.getWebhookQueueConfiguration().getQueueType() == WebhookQueueType.SNS) {
binder.bind(SnsWebhookManager.class).in(Scopes.SINGLETON);
binder.bind(AbstractWebhookChecker.class).to(SnsWebhookRetryer.class).in(Scopes.SINGLETON);
} else {
binder.bind(AbstractWebhookChecker.class).to(SingularityWebhookSender.class).in(Scopes.SINGLETON);
}

binder.bind(SingularityUsageHelper.class).in(Scopes.SINGLETON);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void configure(Binder binder) {
// API Docs
getEnvironment().jersey().register(SingularityOpenApiResource.class);

binder.install(new SingularityEventModule());
binder.install(new SingularityEventModule(getConfiguration().getWebhookQueueConfiguration()));
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@ public class SingularityConfiguration extends Configuration {
@Valid
private WebhookAuthConfiguration webhookAuthConfiguration = new WebhookAuthConfiguration();

@JsonProperty("webhookQueue")
@Valid
private WebhookQueueConfiguration webhookQueueConfiguration = new WebhookQueueConfiguration();

private int maxConcurrentWebhooks = 100;

@JsonProperty("auth")
Expand Down Expand Up @@ -1252,6 +1256,14 @@ public void setWebhookAuthConfiguration(WebhookAuthConfiguration webhookAuthConf
this.webhookAuthConfiguration = webhookAuthConfiguration;
}

public WebhookQueueConfiguration getWebhookQueueConfiguration() {
return webhookQueueConfiguration;
}

public void setWebhookQueueConfiguration(WebhookQueueConfiguration webhookQueueConfiguration) {
this.webhookQueueConfiguration = webhookQueueConfiguration;
}

public int getMaxConcurrentWebhooks() {
return maxConcurrentWebhooks;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.hubspot.singularity.config;

import java.util.Map;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.hubspot.singularity.WebhookType;
import com.hubspot.singularity.hooks.WebhookQueueType;

public class WebhookQueueConfiguration {
@JsonProperty
private WebhookQueueType queueType = WebhookQueueType.ZOOKEEPER;

@JsonProperty
private Map<WebhookType, String> snsTopics = ImmutableMap.of(
WebhookType.TASK, "singularity-task-updates",
WebhookType.DEPLOY, "singularity-deploy-updates",
WebhookType.REQUEST, "singularity-request-updates"
);

@JsonProperty
private Optional<String> awsAccessKey = Optional.absent();

@JsonProperty
private Optional<String> awsSecretKey = Optional.absent();

@JsonProperty
private Optional<String> awsRegion = Optional.absent();

private int snsRequestTimeout = 3000;

private int snsSocketTimeout = 3000;

private int snsConnectTimeout = 2000;

private int snsTotalTimeout = 5000;

// Protection for zookeeper so large list children calls will not take it down
private int maxZkQueuedWebhooksPerParentNode = 3000;

public WebhookQueueType getQueueType() {
return queueType;
}

public void setQueueType(WebhookQueueType queueType) {
this.queueType = queueType;
}

public Map<WebhookType, String> getSnsTopics() {
return snsTopics;
}

public void setSnsTopics(Map<WebhookType, String> snsTopics) {
this.snsTopics = snsTopics;
}

public Optional<String> getAwsAccessKey() {
return awsAccessKey;
}

public void setAwsAccessKey(Optional<String> awsAccessKey) {
this.awsAccessKey = awsAccessKey;
}

public Optional<String> getAwsSecretKey() {
return awsSecretKey;
}

public void setAwsSecretKey(Optional<String> awsSecretKey) {
this.awsSecretKey = awsSecretKey;
}

public Optional<String> getAwsRegion() {
return awsRegion;
}

public void setAwsRegion(Optional<String> awsRegion) {
this.awsRegion = awsRegion;
}

public int getSnsRequestTimeout() {
return snsRequestTimeout;
}

public void setSnsRequestTimeout(int snsRequestTimeout) {
this.snsRequestTimeout = snsRequestTimeout;
}

public int getSnsSocketTimeout() {
return snsSocketTimeout;
}

public void setSnsSocketTimeout(int snsSocketTimeout) {
this.snsSocketTimeout = snsSocketTimeout;
}

public int getSnsConnectTimeout() {
return snsConnectTimeout;
}

public void setSnsConnectTimeout(int snsConnectTimeout) {
this.snsConnectTimeout = snsConnectTimeout;
}

public int getSnsTotalTimeout() {
return snsTotalTimeout;
}

public void setSnsTotalTimeout(int snsTotalTimeout) {
this.snsTotalTimeout = snsTotalTimeout;
}

public int getMaxZkQueuedWebhooksPerParentNode() {
return maxZkQueuedWebhooksPerParentNode;
}

public void setMaxZkQueuedWebhooksPerParentNode(int maxZkQueuedWebhooksPerParentNode) {
this.maxZkQueuedWebhooksPerParentNode = maxZkQueuedWebhooksPerParentNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ protected void configure() {
bind(SingularityValidator.class).in(Scopes.SINGLETON);
bind(UserManager.class).in(Scopes.SINGLETON);
bind(UsageManager.class).in(Scopes.SINGLETON);
bind(WebhookManager.class).in(Scopes.SINGLETON);

bind(NotificationsManager.class).in(Scopes.SINGLETON);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import com.hubspot.singularity.SingularityTaskShellCommandRequestId;
import com.hubspot.singularity.SingularityTaskShellCommandUpdate;
import com.hubspot.singularity.SingularityTaskStatusHolder;
import com.hubspot.singularity.SingularityTaskWebhook;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.transcoders.IdTranscoder;
import com.hubspot.singularity.data.transcoders.StringTranscoder;
Expand Down Expand Up @@ -572,7 +573,10 @@ public SingularityCreateResult saveTaskHistoryUpdate(SingularityTaskHistoryUpdat

@Timed
public SingularityCreateResult saveTaskHistoryUpdate(SingularityTaskHistoryUpdate taskHistoryUpdate, boolean overwriteExisting) {
singularityEventListener.taskHistoryUpdateEvent(taskHistoryUpdate);
Optional<SingularityTask> task = getTask(taskHistoryUpdate.getTaskId());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only line I was hesitant about in the PR, but I still think it's a new improvement. With the previous version, we would be doing a write to zk here. Now we are doing a (mostly likely cached) read and a write to SNS/zk. The SNS update falls back to a zk write in case sns is down, so nothing should block here.

if (task.isPresent()) {
singularityEventListener.taskHistoryUpdateEvent(new SingularityTaskWebhook(task.get(), taskHistoryUpdate));
}

if (overwriteExisting) {
Optional<SingularityTaskHistoryUpdate> maybeExisting = getTaskHistoryUpdate(taskHistoryUpdate.getTaskId(), taskHistoryUpdate.getTaskState());
Expand Down Expand Up @@ -601,7 +605,10 @@ public SingularityCreateResult saveTaskHistoryUpdate(SingularityTaskHistoryUpdat

public SingularityDeleteResult deleteTaskHistoryUpdate(SingularityTaskId taskId, ExtendedTaskState state, Optional<SingularityTaskHistoryUpdate> previousStateUpdate) {
if (previousStateUpdate.isPresent()) {
singularityEventListener.taskHistoryUpdateEvent(previousStateUpdate.get());
Optional<SingularityTask> task = getTask(previousStateUpdate.get().getTaskId());
if (task.isPresent()) {
singularityEventListener.taskHistoryUpdateEvent(new SingularityTaskWebhook(task.get(), previousStateUpdate.get()));
}
}
if (leaderCache.active()) {
leaderCache.deleteTaskHistoryUpdate(taskId, state);
Expand Down
Loading