Skip to content

Commit

Permalink
Merge pull request #369 from HubSpot/move_webhook_to_leader
Browse files Browse the repository at this point in the history
move webhook processing to leader only
  • Loading branch information
tpetr committed Jan 13, 2015
2 parents 39454bb + ad415f6 commit 92d8a60
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ private void exit() {
private void sendAbortNotification(AbortReason abortReason) {
final String message = String.format("Singularity on %s is aborting due to %s", hostAndPort.getHostText(), abortReason);

LOG.error(message);

sendAbortMail(message);

exceptionNotifier.notify(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,27 @@
import com.hubspot.singularity.SingularityDeployHistory;
import com.hubspot.singularity.SingularityDeployKey;
import com.hubspot.singularity.SingularityRequestDeployState;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DeployManager;

@Singleton
public class SingularityDeployHistoryPersister {
public class SingularityDeployHistoryPersister extends SingularityHistoryPersister {

private static final Logger LOG = LoggerFactory.getLogger(SingularityDeployHistoryPersister.class);

private final DeployManager deployManager;
private final HistoryManager historyManager;

@Inject
public SingularityDeployHistoryPersister(DeployManager deployManager, HistoryManager historyManager) {
public SingularityDeployHistoryPersister(SingularityConfiguration configuration, DeployManager deployManager, HistoryManager historyManager) {
super(configuration);

this.deployManager = deployManager;
this.historyManager = historyManager;
}

public void checkInactiveDeploys() {
@Override
public void runActionOnPoll() {
LOG.info("Checking inactive deploys for deploy history persistance");

final long start = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public void configure() {
resultSetMappers.addBinding().to(SingularityMappers.SingularityTaskIdHistoryMapper.class).in(Scopes.SINGLETON);
resultSetMappers.addBinding().to(SingularityMappers.SingularityDeployHistoryLiteMapper.class).in(Scopes.SINGLETON);

bind(SingularityHistoryPersister.class).in(Scopes.SINGLETON);
bind(TaskHistoryHelper.class).in(Scopes.SINGLETON);
bind(RequestHistoryHelper.class).in(Scopes.SINGLETON);
bind(DeployHistoryHelper.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,59 +1,28 @@
package com.hubspot.singularity.data.history;

import io.dropwizard.lifecycle.Managed;

import java.util.concurrent.TimeUnit;

import javax.inject.Singleton;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.Inject;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.scheduler.SingularityLeaderOnlyPoller;
import com.hubspot.singularity.sentry.SingularityExceptionNotifier;

@Singleton
public class SingularityHistoryPersister extends SingularityLeaderOnlyPoller implements Managed {
public abstract class SingularityHistoryPersister extends SingularityLeaderOnlyPoller {

private static final Logger LOG = LoggerFactory.getLogger(SingularityHistoryPersister.class);
private final SingularityConfiguration configuration;

private final SingularityTaskHistoryPersister taskPersister;
private final SingularityDeployHistoryPersister deployPersister;
private final SingularityRequestHistoryPersister requestHistoryPersister;
private final SingularityExceptionNotifier exceptionNotifier;
public SingularityHistoryPersister(SingularityConfiguration configuration) {
super(configuration.getPersistHistoryEverySeconds(), TimeUnit.SECONDS);

@Inject
public SingularityHistoryPersister(SingularityExceptionNotifier exceptionNotifier, SingularityTaskHistoryPersister taskPersister,
SingularityRequestHistoryPersister requestHistoryPersister, SingularityDeployHistoryPersister deployPersister, SingularityConfiguration configuration) {
super(configuration.getPersistHistoryEverySeconds(), TimeUnit.SECONDS, configuration.getDatabaseConfiguration().isPresent());
this.configuration = configuration;
}

this.taskPersister = taskPersister;
this.deployPersister = deployPersister;
this.exceptionNotifier = exceptionNotifier;
this.requestHistoryPersister = requestHistoryPersister;
@Override
protected boolean abortsOnError() {
return false;
}

@Override
public void runActionOnPoll() {
try {
taskPersister.checkInactiveTaskIds();
} catch (Throwable t) {
exceptionNotifier.notify(t);
LOG.error("While persisting task history", t);
}
try {
deployPersister.checkInactiveDeploys();
} catch (Throwable t) {
exceptionNotifier.notify(t);
LOG.error("While persisting deploy history", t);
}
try {
requestHistoryPersister.checkRequestHistory();
} catch (Throwable t) {
exceptionNotifier.notify(t);
LOG.error("While persisting request history", t);
}
protected boolean isEnabled() {
return configuration.getDatabaseConfiguration().isPresent();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,27 @@
import com.google.inject.Inject;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityRequestHistory;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.RequestManager;

@Singleton
public class SingularityRequestHistoryPersister {
public class SingularityRequestHistoryPersister extends SingularityHistoryPersister {

private static final Logger LOG = LoggerFactory.getLogger(SingularityRequestHistoryPersister.class);

private final RequestManager requestManager;
private final HistoryManager historyManager;

@Inject
public SingularityRequestHistoryPersister(RequestManager requestManager, HistoryManager historyManager) {
public SingularityRequestHistoryPersister(SingularityConfiguration configuration, RequestManager requestManager, HistoryManager historyManager) {
super(configuration);

this.requestManager = requestManager;
this.historyManager = historyManager;
}

public void checkRequestHistory() {
@Override
public void runActionOnPoll() {
LOG.info("Checking request history for persistence");

final long start = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
import com.hubspot.singularity.SingularityPendingDeploy;
import com.hubspot.singularity.SingularityTaskHistory;
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DeployManager;
import com.hubspot.singularity.data.TaskManager;

@Singleton
public class SingularityTaskHistoryPersister {
public class SingularityTaskHistoryPersister extends SingularityHistoryPersister {

private static final Logger LOG = LoggerFactory.getLogger(SingularityTaskHistoryPersister.class);

Expand All @@ -29,13 +30,16 @@ public class SingularityTaskHistoryPersister {
private final HistoryManager historyManager;

@Inject
public SingularityTaskHistoryPersister(TaskManager taskManager, DeployManager deployManager, HistoryManager historyManager) {
public SingularityTaskHistoryPersister(SingularityConfiguration configuration, TaskManager taskManager, DeployManager deployManager, HistoryManager historyManager) {
super(configuration);

this.taskManager = taskManager;
this.historyManager = historyManager;
this.deployManager = deployManager;
}

public void checkInactiveTaskIds() {
@Override
public void runActionOnPoll() {
LOG.info("Checking inactive task ids for task history persistance");

final long start = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,62 +1,35 @@
package com.hubspot.singularity.hooks;

import io.dropwizard.lifecycle.Managed;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.inject.Singleton;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.scheduler.SingularityLeaderOnlyPoller;
import com.hubspot.singularity.sentry.SingularityExceptionNotifier;

@Singleton
public class SingularityWebhookPoller implements Managed {

private static final Logger LOG = LoggerFactory.getLogger(SingularityWebhookPoller.class);
public class SingularityWebhookPoller extends SingularityLeaderOnlyPoller {

private final SingularityWebhookSender webhookSender;
private final SingularityExceptionNotifier exceptionNotifier;
private final SingularityConfiguration configuration;
private final ScheduledExecutorService executorService;

@Inject
public SingularityWebhookPoller(SingularityWebhookSender webhookSender, SingularityExceptionNotifier exceptionNotifier, SingularityConfiguration configuration) {
this.webhookSender = webhookSender;
this.configuration = configuration;
this.exceptionNotifier = exceptionNotifier;
super(configuration.getCheckWebhooksEveryMillis(), TimeUnit.MILLISECONDS);

this.executorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("SingularityWebhookSender-%d").build());
this.webhookSender = webhookSender;
}

@Override
public void start() {
LOG.info("Starting a webhookPoller that executes webhooks every {}", JavaUtils.durationFromMillis(configuration.getCheckWebhooksEveryMillis()));

executorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
webhookSender.checkWebhooks();
} catch (Throwable t) {
LOG.error("Caught an unexpected exception while checking webhooks", t);
exceptionNotifier.notify(t);
}
}
}, configuration.getCheckWebhooksEveryMillis(), configuration.getCheckWebhooksEveryMillis(), TimeUnit.MILLISECONDS);
public void runActionOnPoll() {
webhookSender.checkWebhooks();
}

@Override
public void stop() {
MoreExecutors.shutdownAndAwaitTermination(executorService, 1, TimeUnit.SECONDS);
protected boolean abortsOnError() {
return false;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import javax.inject.Singleton;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.hubspot.singularity.config.SingularityConfiguration;
Expand All @@ -18,10 +17,9 @@ public class SingularityCleanupPoller extends SingularityLeaderOnlyPoller {

@Inject
SingularityCleanupPoller(SingularityConfiguration configuration, SingularityCleaner cleaner, @Named(SingularityMesosModule.SCHEDULER_LOCK_NAME) final Lock lock) {
super(configuration.getCleanupEverySeconds(), TimeUnit.SECONDS);
super(configuration.getCleanupEverySeconds(), TimeUnit.SECONDS, lock);

this.cleaner = cleaner;
this.lockHolder = Optional.of(lock);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import javax.inject.Singleton;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.hubspot.singularity.config.SingularityConfiguration;
Expand All @@ -18,10 +17,9 @@ public class SingularityCooldownPoller extends SingularityLeaderOnlyPoller {

@Inject
SingularityCooldownPoller(SingularityConfiguration configuration, SingularityCooldownChecker checker, @Named(SingularityMesosModule.SCHEDULER_LOCK_NAME) final Lock lock) {
super(TimeUnit.MINUTES.toMillis(configuration.getCooldownExpiresAfterMinutes()) / 2, TimeUnit.MILLISECONDS);
super(TimeUnit.MINUTES.toMillis(configuration.getCooldownExpiresAfterMinutes()) / 2, TimeUnit.MILLISECONDS, lock);

this.checker = checker;
this.lockHolder = Optional.of(lock);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.hubspot.mesos.JavaUtils;
Expand All @@ -24,10 +23,9 @@ public class SingularityDeployPoller extends SingularityLeaderOnlyPoller {

@Inject
SingularityDeployPoller(SingularityDeployChecker deployChecker, SingularityConfiguration configuration, @Named(SingularityMesosModule.SCHEDULER_LOCK_NAME) final Lock lock) {
super(configuration.getCheckDeploysEverySeconds(), TimeUnit.SECONDS);
super(configuration.getCheckDeploysEverySeconds(), TimeUnit.SECONDS, lock);

this.deployChecker = deployChecker;
this.lockHolder = Optional.of(lock);
}

@Override
Expand Down
Loading

0 comments on commit 92d8a60

Please sign in to comment.