diff --git a/pom.xml b/pom.xml
index e1571706f..1a25a6dc4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,7 +8,7 @@
mesos
- 0.16.99
+ 0.16.100
hpi
https://wiki.jenkins-ci.org/display/JENKINS/Mesos+Plugin
Allows the dynamic launch Jenkins agent on a Mesos cluster, depending on workload
diff --git a/src/main/java/org/jenkinsci/plugins/mesos/JenkinsScheduler.java b/src/main/java/org/jenkinsci/plugins/mesos/JenkinsScheduler.java
index 12a995515..8cccbc47d 100644
--- a/src/main/java/org/jenkinsci/plugins/mesos/JenkinsScheduler.java
+++ b/src/main/java/org/jenkinsci/plugins/mesos/JenkinsScheduler.java
@@ -27,6 +27,7 @@
import org.apache.commons.collections4.OrderedMapIterator;
import org.apache.commons.collections4.map.LRUMap;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.mesos.MesosSchedulerDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.*;
@@ -40,9 +41,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -66,6 +64,7 @@ public class JenkinsScheduler implements Scheduler {
private static final String JNLP_SECRET_FORMAT = "-secret %s";
public static final String PORT_RESOURCE_NAME = "ports";
public static final String MESOS_DEFAULT_ROLE = "*";
+ public static final String NULL_FRAMEWORK_ID = "null-framework-id";
private final boolean multiThreaded;
private Queue requests;
@@ -88,19 +87,10 @@ public class JenkinsScheduler implements Scheduler {
private static LRUMap recentlyAcceptedOffers = new LRUMap(lruCacheSize);
private static final Object IGNORE = new Object();
- private static final ExecutorService offersService = Executors.newSingleThreadExecutor(new ThreadFactory(){
-
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r);
- thread.setDaemon(true);
- thread.setName("mesos-offer-processing-thread");
-
- return thread;
- }
- });
private static final OfferQueue offerQueue = new OfferQueue();
+ private Thread offerProcessingThread = null;
+ private volatile FrameworkID frameworkId;
public JenkinsScheduler(String jenkinsMaster, MesosCloud mesosCloud, boolean multiThreaded) {
startedTime = System.currentTimeMillis();
@@ -114,18 +104,6 @@ public JenkinsScheduler(String jenkinsMaster, MesosCloud mesosCloud, boolean mul
this.unmatchedLabels = new HashSet();
this.results = new HashMap();
this.finishedTasks = Collections.newSetFromMap(new ConcurrentHashMap());
-
- if (multiThreaded) {
- LOGGER.info("Processing offers on a separate thread.");
- // Start consumption of the offer queue. This will idle until offers start arriving.
- offersService.execute(() -> {
- while (true) {
- processOffers();
- }
- });
- } else {
- LOGGER.severe("NOT PROCESSING OFFERS");
- }
}
public synchronized void init() {
@@ -303,6 +281,7 @@ public synchronized void terminateJenkinsSlave(String name) {
@Override
public void registered(SchedulerDriver driver, FrameworkID frameworkId, MasterInfo masterInfo) {
LOGGER.info("Framework registered! ID = " + frameworkId.getValue());
+ this.frameworkId = frameworkId;
}
@Override
@@ -385,14 +364,18 @@ private void processOffers() {
@Override
public synchronized void resourceOffers(SchedulerDriver driver, List offers) {
+ if (multiThreaded && !isProcessing()) {
+ startProcessing();
+ }
+
for (Protos.Offer offer : offers) {
boolean queued = offerQueue.offer(offer);
if (!queued) {
LOGGER.warning("Offer queue is full.");
declineShort(offer);
+ } else {
+ LOGGER.info("Queued offer " + offer.getId().getValue());
}
-
- LOGGER.info("Queued offer " + offer.getId().getValue());
}
if (!multiThreaded) {
@@ -400,6 +383,51 @@ public synchronized void resourceOffers(SchedulerDriver driver, List offe
}
}
+ @VisibleForTesting
+ String getFrameworkId() {
+ if (frameworkId != null) {
+ return frameworkId.getValue();
+ } else {
+ return NULL_FRAMEWORK_ID;
+ }
+ }
+
+ @VisibleForTesting
+ void startProcessing() {
+ String threadName = "mesos-offer-processor-" + getFrameworkId();
+ LOGGER.info("Starting offer processing thread: " + threadName);
+
+ offerProcessingThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ LOGGER.info("Started offer processing thread: " + threadName);
+ try {
+ while (true) {
+ processOffers();
+ }
+ } catch (Throwable t) {
+ LOGGER.severe("Offer processing thread failed with exception: " + ExceptionUtils.getStackTrace(t));
+ }
+ }
+ }, threadName);
+ offerProcessingThread.start();
+ }
+
+ @VisibleForTesting
+ boolean isProcessing() {
+ if (offerProcessingThread == null) {
+ LOGGER.info("Initializing offer processing thread.");
+ return false;
+ }
+
+ if (!offerProcessingThread.isAlive()) {
+ LOGGER.info("Offer processing thread is not alive.");
+ return false;
+ }
+
+ return true;
+ }
+
/**
* Determines whether an offer of a Mesos Agent has an unavailability set and is currently scheduled for maintenance.
*
diff --git a/src/test/java/org/jenkinsci/plugins/mesos/JenkinsSchedulerTest.java b/src/test/java/org/jenkinsci/plugins/mesos/JenkinsSchedulerTest.java
index 281df4814..92508250e 100644
--- a/src/test/java/org/jenkinsci/plugins/mesos/JenkinsSchedulerTest.java
+++ b/src/test/java/org/jenkinsci/plugins/mesos/JenkinsSchedulerTest.java
@@ -8,6 +8,7 @@
import jenkins.model.Jenkins;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -41,11 +42,13 @@ public class JenkinsSchedulerTest {
private Jenkins jenkins;
- private static int TEST_JENKINS_SLAVE_MEM = 512;
- private static String TEST_JENKINS_SLAVE_ARG = "-Xms16m -XX:+UseConcMarkSweepGC -Djava.net.preferIPv4Stack=true";
- private static String TEST_JENKINS_JNLP_ARG = "";
- private static String TEST_JENKINS_SLAVE_NAME = "testSlave1";
- private static String TEST_MESOS_ROLE_NAME = "test_role";
+ private static final int TEST_JENKINS_SLAVE_MEM = 512;
+ private static final String TEST_JENKINS_SLAVE_ARG = "-Xms16m -XX:+UseConcMarkSweepGC -Djava.net.preferIPv4Stack=true";
+ private static final String TEST_JENKINS_JNLP_ARG = "";
+ private static final String TEST_JENKINS_SLAVE_NAME = "testSlave1";
+ private static final String TEST_MESOS_ROLE_NAME = "test_role";
+ private static final Protos.FrameworkID TEST_FRAMEWORK_ID =
+ Protos.FrameworkID.newBuilder().setValue("test-framework-id").build();
@Before
@@ -402,6 +405,36 @@ public void testConstructMesosCommandInfoWithNullCustomDockerShell() throws Exce
jenkinsScheduler.getCommandInfoBuilder(request);
}
+ @Test
+ public void isProcessing() {
+ jenkinsScheduler = new JenkinsScheduler("jenkinsMaster", mesosCloud, false);
+ Assert.assertFalse(jenkinsScheduler.isProcessing());
+
+ jenkinsScheduler.startProcessing();
+ Assert.assertTrue(jenkinsScheduler.isProcessing());
+ }
+
+ @Test
+ public void constructMultiThreaded() {
+ SchedulerDriver driver = Mockito.mock(SchedulerDriver.class);
+
+ jenkinsScheduler = new JenkinsScheduler("jenkinsMaster", mesosCloud, true);
+ jenkinsScheduler.setDriver(driver);
+ Assert.assertFalse(jenkinsScheduler.isProcessing());
+
+ jenkinsScheduler.registered(driver, TEST_FRAMEWORK_ID, null);
+ jenkinsScheduler.resourceOffers(driver, Collections.emptyList());
+
+ Assert.assertTrue(jenkinsScheduler.isProcessing());
+ }
+
+ @Test
+ public void getFrameworkId() {
+ Assert.assertEquals(JenkinsScheduler.NULL_FRAMEWORK_ID, jenkinsScheduler.getFrameworkId());
+ jenkinsScheduler.registered(null, TEST_FRAMEWORK_ID, null);
+ Assert.assertEquals(TEST_FRAMEWORK_ID.getValue(), jenkinsScheduler.getFrameworkId());
+ }
+
private Mesos.SlaveRequest mockSlaveRequest(
Boolean useDocker,
Boolean useCustomDockerCommandShell,