Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Make offer processing thread immortal (#5)
Browse files Browse the repository at this point in the history
* Make offer processing thread immortal

* Add offer processing thread unit tests

* Bump version number to 0.16.100

* Add offer processing failure logging
  • Loading branch information
gabrielhartmann authored Apr 9, 2018
1 parent d4b960c commit 083492e
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</parent>

<artifactId>mesos</artifactId>
<version>0.16.99</version>
<version>0.16.100</version>
<packaging>hpi</packaging>
<url>https://wiki.jenkins-ci.org/display/JENKINS/Mesos+Plugin</url>
<description>Allows the dynamic launch Jenkins agent on a Mesos cluster, depending on workload</description>
Expand Down
84 changes: 56 additions & 28 deletions src/main/java/org/jenkinsci/plugins/mesos/JenkinsScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.commons.collections4.OrderedMapIterator;
import org.apache.commons.collections4.map.LRUMap;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.mesos.MesosSchedulerDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.*;
Expand All @@ -40,9 +41,6 @@

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -66,6 +64,7 @@ public class JenkinsScheduler implements Scheduler {
private static final String JNLP_SECRET_FORMAT = "-secret %s";
public static final String PORT_RESOURCE_NAME = "ports";
public static final String MESOS_DEFAULT_ROLE = "*";
public static final String NULL_FRAMEWORK_ID = "null-framework-id";
private final boolean multiThreaded;

private Queue<Request> requests;
Expand All @@ -88,19 +87,10 @@ public class JenkinsScheduler implements Scheduler {
private static LRUMap<String, Object> recentlyAcceptedOffers = new LRUMap<String, Object>(lruCacheSize);

private static final Object IGNORE = new Object();
private static final ExecutorService offersService = Executors.newSingleThreadExecutor(new ThreadFactory(){

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("mesos-offer-processing-thread");

return thread;
}
});

private static final OfferQueue offerQueue = new OfferQueue();
private Thread offerProcessingThread = null;
private volatile FrameworkID frameworkId;

public JenkinsScheduler(String jenkinsMaster, MesosCloud mesosCloud, boolean multiThreaded) {
startedTime = System.currentTimeMillis();
Expand All @@ -114,18 +104,6 @@ public JenkinsScheduler(String jenkinsMaster, MesosCloud mesosCloud, boolean mul
this.unmatchedLabels = new HashSet<String>();
this.results = new HashMap<TaskID, Result>();
this.finishedTasks = Collections.newSetFromMap(new ConcurrentHashMap<TaskID, Boolean>());

if (multiThreaded) {
LOGGER.info("Processing offers on a separate thread.");
// Start consumption of the offer queue. This will idle until offers start arriving.
offersService.execute(() -> {
while (true) {
processOffers();
}
});
} else {
LOGGER.severe("NOT PROCESSING OFFERS");
}
}

public synchronized void init() {
Expand Down Expand Up @@ -303,6 +281,7 @@ public synchronized void terminateJenkinsSlave(String name) {
@Override
public void registered(SchedulerDriver driver, FrameworkID frameworkId, MasterInfo masterInfo) {
LOGGER.info("Framework registered! ID = " + frameworkId.getValue());
this.frameworkId = frameworkId;
}

@Override
Expand Down Expand Up @@ -385,21 +364,70 @@ private void processOffers() {

@Override
public synchronized void resourceOffers(SchedulerDriver driver, List<Offer> offers) {
if (multiThreaded && !isProcessing()) {
startProcessing();
}

for (Protos.Offer offer : offers) {
boolean queued = offerQueue.offer(offer);
if (!queued) {
LOGGER.warning("Offer queue is full.");
declineShort(offer);
} else {
LOGGER.info("Queued offer " + offer.getId().getValue());
}

LOGGER.info("Queued offer " + offer.getId().getValue());
}

if (!multiThreaded) {
processOffers();
}
}

@VisibleForTesting
String getFrameworkId() {
if (frameworkId != null) {
return frameworkId.getValue();
} else {
return NULL_FRAMEWORK_ID;
}
}

@VisibleForTesting
void startProcessing() {
String threadName = "mesos-offer-processor-" + getFrameworkId();
LOGGER.info("Starting offer processing thread: " + threadName);

offerProcessingThread = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("Started offer processing thread: " + threadName);
try {
while (true) {
processOffers();
}
} catch (Throwable t) {
LOGGER.severe("Offer processing thread failed with exception: " + ExceptionUtils.getStackTrace(t));
}
}
}, threadName);
offerProcessingThread.start();
}

@VisibleForTesting
boolean isProcessing() {
if (offerProcessingThread == null) {
LOGGER.info("Initializing offer processing thread.");
return false;
}

if (!offerProcessingThread.isAlive()) {
LOGGER.info("Offer processing thread is not alive.");
return false;
}

return true;
}

/**
* Determines whether an offer of a Mesos Agent has an unavailability set and is currently scheduled for maintenance.
* <br /><br />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import jenkins.model.Jenkins;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -41,11 +42,13 @@ public class JenkinsSchedulerTest {

private Jenkins jenkins;

private static int TEST_JENKINS_SLAVE_MEM = 512;
private static String TEST_JENKINS_SLAVE_ARG = "-Xms16m -XX:+UseConcMarkSweepGC -Djava.net.preferIPv4Stack=true";
private static String TEST_JENKINS_JNLP_ARG = "";
private static String TEST_JENKINS_SLAVE_NAME = "testSlave1";
private static String TEST_MESOS_ROLE_NAME = "test_role";
private static final int TEST_JENKINS_SLAVE_MEM = 512;
private static final String TEST_JENKINS_SLAVE_ARG = "-Xms16m -XX:+UseConcMarkSweepGC -Djava.net.preferIPv4Stack=true";
private static final String TEST_JENKINS_JNLP_ARG = "";
private static final String TEST_JENKINS_SLAVE_NAME = "testSlave1";
private static final String TEST_MESOS_ROLE_NAME = "test_role";
private static final Protos.FrameworkID TEST_FRAMEWORK_ID =
Protos.FrameworkID.newBuilder().setValue("test-framework-id").build();


@Before
Expand Down Expand Up @@ -402,6 +405,36 @@ public void testConstructMesosCommandInfoWithNullCustomDockerShell() throws Exce
jenkinsScheduler.getCommandInfoBuilder(request);
}

@Test
public void isProcessing() {
jenkinsScheduler = new JenkinsScheduler("jenkinsMaster", mesosCloud, false);
Assert.assertFalse(jenkinsScheduler.isProcessing());

jenkinsScheduler.startProcessing();
Assert.assertTrue(jenkinsScheduler.isProcessing());
}

@Test
public void constructMultiThreaded() {
SchedulerDriver driver = Mockito.mock(SchedulerDriver.class);

jenkinsScheduler = new JenkinsScheduler("jenkinsMaster", mesosCloud, true);
jenkinsScheduler.setDriver(driver);
Assert.assertFalse(jenkinsScheduler.isProcessing());

jenkinsScheduler.registered(driver, TEST_FRAMEWORK_ID, null);
jenkinsScheduler.resourceOffers(driver, Collections.emptyList());

Assert.assertTrue(jenkinsScheduler.isProcessing());
}

@Test
public void getFrameworkId() {
Assert.assertEquals(JenkinsScheduler.NULL_FRAMEWORK_ID, jenkinsScheduler.getFrameworkId());
jenkinsScheduler.registered(null, TEST_FRAMEWORK_ID, null);
Assert.assertEquals(TEST_FRAMEWORK_ID.getValue(), jenkinsScheduler.getFrameworkId());
}

private Mesos.SlaveRequest mockSlaveRequest(
Boolean useDocker,
Boolean useCustomDockerCommandShell,
Expand Down

0 comments on commit 083492e

Please sign in to comment.