Skip to content

Commit

Permalink
improved queue handling
Browse files Browse the repository at this point in the history
  • Loading branch information
rainer-prosi committed Sep 30, 2024
1 parent 2aed56f commit 0669d00
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 14 deletions.
49 changes: 46 additions & 3 deletions src/main/java/org/cip4/bambi/core/AbstractDevice.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import org.cip4.jdflib.util.mime.BodyPartHelper;
import org.cip4.jdflib.util.mime.MimeReader;
import org.cip4.jdflib.util.mime.MimeWriter;
import org.cip4.jdflib.util.thread.MultiJobTaskQueue;
import org.cip4.jdflib.util.thread.MyMutex;

/**
Expand Down Expand Up @@ -2401,15 +2402,57 @@ public void ensureProcessor(final JDFQueueEntry newQE, final JDFDoc theJDF)

}

protected int getParallelSynch()
{
return 0;
}

class SingleProcess implements Runnable
{

private final AbstractDeviceProcessor p;

public SingleProcess(final AbstractDeviceProcessor p)
{
super();
this.p = p;
}

@Override
public void run()
{
final boolean result = p.processExistingQueueEntry();
p.stopProcessing(result ? EnumNodeStatus.Completed : EnumNodeStatus.Aborted);
log.info("completed processing " + getQueueProcessor().getTotalEntryCount());
for (final Object o : EnumQueueEntryStatus.getEnumList())
{
final EnumQueueEntryStatus s = (EnumQueueEntryStatus) o;
final int numEntries = getQueueProcessor().getQueue().numEntries(s);
if (numEntries > 0)
{
log.info("completed processing " + s.getName() + " " + numEntries);
}
}
}
}

public boolean doSynchronous(final IQueueEntry iqe)
{
final AbstractDeviceProcessor p = buildDeviceProcessor();
p.setCurrentQE(iqe);
p.setParent(this);
p.getStatusListener().setWantPersist(false);
final boolean result = p.processExistingQueueEntry();
p.stopProcessing(result ? EnumNodeStatus.Completed : EnumNodeStatus.Aborted);
return result;
if (getParallelSynch() > 0)
{
MultiJobTaskQueue.getCreateJobQueue(getClass().getSimpleName(), getParallelSynch()).queue(new SingleProcess(p), p.getJobID());
return true;
}
else
{
final boolean result = p.processExistingQueueEntry();
p.stopProcessing(result ? EnumNodeStatus.Completed : EnumNodeStatus.Aborted);
return result;
}
}

public boolean isSynchronous()
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/cip4/bambi/core/BambiContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,10 @@ public XMLResponse processJMFDoc(final XMLRequest request)
{
responseDoc = handler.processJMF(jmfDoc);
}
else
{
log.warn("No handler for " + requestURI);
}

if (responseDoc != null)
{
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/cip4/bambi/core/ServletContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void setWantDump(final boolean wantDump)
*/
public XMLResponse processError(final String requestURI, final EnumType messageType, final int returnCode, final String notification)
{
log.warn("processError- rc: " + returnCode + " " + ((notification == null) ? JDFConstants.EMPTYSTRING : notification));
log.warn("processError- rc: " + returnCode + " " + ((notification == null) ? JDFConstants.EMPTYSTRING : notification), new BambiException());
final JDFJMF error = JDFJMF.createJMF(EnumFamily.Response, messageType);
final JDFResponse r = error.getResponse(0);
r.setReturnCode(returnCode);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/cip4/bambi/core/StatusListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ public boolean flush(final String msgType)
{
final Trigger[] t2 = rootDispatcher.triggerQueueEntry(theCounter.getQueueEntryID(), theCounter.getNodeIDentifier(), -1, msgType);
rootDispatcher.flush();
if (!Trigger.waitQueued(t2, 12000))
if (!Trigger.waitQueued(t2, 420))
return false;
}
return Trigger.waitQueued(t, 12000);
return Trigger.waitQueued(t, 420);
}
return false;
}
Expand Down
19 changes: 18 additions & 1 deletion src/main/java/org/cip4/bambi/core/queues/QueueProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2955,6 +2955,11 @@ private boolean returnJDFUrl(final JDFDoc docJDF)
}

boolean returnJMF(final JDFDoc docJDF, final JDFJMF jmf)
{
return returnJMF(docJDF, jmf, 0);
}

boolean returnJMF(final JDFDoc docJDF, final JDFJMF jmf, int retry)
{
final JDFNode jdfRoot = docJDF == null ? null : docJDF.getJDFRoot();
if (jdfRoot == null || jmf == null)
Expand All @@ -2968,7 +2973,7 @@ boolean returnJMF(final JDFDoc docJDF, final JDFJMF jmf)
final String returnJMF = BambiNSExtension.getReturnJMF(qe);
final JDFReturnQueueEntryParams returnQEParams = jmf.getCommand(0).getReturnQueueEntryParams(0);

qLog.info("MIME/ZIP ReturnQueueEntry for " + queueEntryID + " is being been sent to " + returnJMF);
qLog.info("MIME/ZIP ReturnQueueEntry for " + queueEntryID + " is being been sent to " + returnJMF + " retry=" + retry);
returnQEParams.setURL("cid:dummy"); // will be overwritten by buildMimePackage

final MIMEDetails mimeDetails = new MIMEDetails();
Expand Down Expand Up @@ -3002,6 +3007,13 @@ boolean returnJMF(final JDFDoc docJDF, final JDFJMF jmf)
{
qLog.error("failed to send ReturnQueueEntry for " + queueEntryID + " to " + returnJMF);
}
if (retry < 2)
{
++retry;
ThreadUtil.sleep(1234 * retry);
qLog.warn("retry ReturnQueueEntry for " + queueEntryID + " to " + returnJMF + " # " + retry);
return returnJMF(docJDF, jmf, retry);
}
return false;
}

Expand Down Expand Up @@ -3399,4 +3411,9 @@ protected JDFDoc getDocFromXJDFZip(final String url, final ZipReader zipReader)
}
return doc;
}

public int getTotalEntryCount()
{
return BambiNSExtension.getTotal(getQueue());
}
}
14 changes: 8 additions & 6 deletions src/test/java/org/cip4/bambi/BambiTestProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,30 +84,32 @@
public class BambiTestProcessor extends WorkerDeviceProcessor
{

private EnumQueueEntryStatus finalStatus;
private final EnumQueueEntryStatus finalStatus;

public BambiTestProcessor(EnumQueueEntryStatus finalStatus)
public BambiTestProcessor(final EnumQueueEntryStatus finalStatus)
{
super();
_doShutdown = true; // avoid start of proc loop
this.finalStatus = finalStatus;
JDFNode n = JDFNode.createRoot();
JDFQueueEntry qe = ((JDFQueue) JDFElement.createRoot(ElementName.QUEUE)).appendQueueEntry();
final JDFNode n = JDFNode.createRoot();
final JDFQueueEntry qe = ((JDFQueue) JDFElement.createRoot(ElementName.QUEUE)).appendQueueEntry();
qe.setQueueEntryStatus(EnumQueueEntryStatus.Waiting);
setCurrentQE(new QueueEntry(n, qe));
}

static int wait = 42;

@Override
public EnumQueueEntryStatus processDoc(JDFNode n, JDFQueueEntry qe)
public EnumQueueEntryStatus processDoc(final JDFNode n, final JDFQueueEntry qe)
{
ThreadUtil.sleep(wait);
return finalStatus;
}

@Override
public EnumNodeStatus stopProcessing(EnumNodeStatus newStatus)
public EnumNodeStatus stopProcessing(final EnumNodeStatus newStatus)
{
getCurrentQE().getQueueEntry().setStatus(newStatus);
return newStatus;
}

Expand Down
67 changes: 66 additions & 1 deletion src/test/java/org/cip4/bambi/core/AbstractDeviceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,19 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;

import java.io.File;

import org.cip4.bambi.BambiTestCaseBase;
import org.cip4.bambi.BambiTestDevice;
import org.cip4.bambi.core.IDeviceProperties.EWatchFormat;
import org.cip4.bambi.core.queues.QueueEntry;
import org.cip4.jdflib.auto.JDFAutoQueueEntry.EnumQueueEntryStatus;
import org.cip4.jdflib.core.AttributeName;
import org.cip4.jdflib.core.ElementName;
import org.cip4.jdflib.core.JDFDoc;
Expand All @@ -62,7 +65,9 @@
import org.cip4.jdflib.jmf.JDFQueueEntry;
import org.cip4.jdflib.jmf.JMFBuilderFactory;
import org.cip4.jdflib.node.JDFNode;
import org.cip4.jdflib.util.ThreadUtil;
import org.junit.Test;
import org.mockito.Mockito;

public class AbstractDeviceTest extends BambiTestCaseBase
{
Expand Down Expand Up @@ -119,7 +124,7 @@ public void testUpdateWatchURL() throws Exception
* @throws Exception
*/
@Test
public void testEWatchFormart() throws Exception
public void testEWatchFormat() throws Exception
{
assertEquals(EWatchFormat.JMF, EWatchFormat.getEnum(null));
assertEquals(EWatchFormat.XJMF, EWatchFormat.getEnum("xJmF"));
Expand Down Expand Up @@ -245,6 +250,66 @@ public void testDoSynch() throws Exception
assertTrue(device.doSynchronous(qe));
}

/**
* @throws Exception
*/
@Test
public void testDoSynchMulti() throws Exception
{
final BambiTestDevice device = Mockito.spy(new BambiTestDevice());
device.setSynchronous(true);
when(device.getParallelSynch()).thenReturn(4);
for (int i = 0; i < 420; i++)
{
final JDFQueueEntry qe = device.getQueueProcessor().getQueue().appendQueueEntry();
qe.setQueueEntryID("qe" + i);
qe.setQueueEntryStatus(EnumQueueEntryStatus.Waiting);
final JDFNode root = JDFNode.createRoot();
root.setJobID("J" + i % 7);
final QueueEntry qee = new QueueEntry(root, qe);
assertTrue(device.doSynchronous(qee));
}
for (int i = 0; i < 1234; i++)
{
if (device.getQueueProcessor().getQueue().numEntries(EnumQueueEntryStatus.Waiting) > 0)
{
ThreadUtil.sleep(123);
}
}
assertEquals(0, device.getQueueProcessor().getQueue().numEntries(EnumQueueEntryStatus.Waiting));
assertNotEquals(0, device.getQueueProcessor().getQueue().numEntries(EnumQueueEntryStatus.Completed));
}

/**
* @throws Exception
*/
@Test
public void testDoAsynchMulti() throws Exception
{
final BambiTestDevice device = Mockito.spy(new BambiTestDevice());
device.setSynchronous(false);
when(device.getParallelSynch()).thenReturn(4);
for (int i = 0; i < 420; i++)
{
final JDFQueueEntry qe = device.getQueueProcessor().getQueue().appendQueueEntry();
qe.setQueueEntryID("qe" + i);
qe.setQueueEntryStatus(EnumQueueEntryStatus.Waiting);
final JDFNode root = JDFNode.createRoot();
root.setJobID("J" + i % 7);
final QueueEntry qee = new QueueEntry(root, qe);
assertTrue(device.doSynchronous(qee));
}
for (int i = 0; i < 1234; i++)
{
if (device.getQueueProcessor().getQueue().numEntries(EnumQueueEntryStatus.Waiting) > 0)
{
ThreadUtil.sleep(123);
}
}
assertEquals(0, device.getQueueProcessor().getQueue().numEntries(EnumQueueEntryStatus.Waiting));
assertNotEquals(0, device.getQueueProcessor().getQueue().numEntries(EnumQueueEntryStatus.Completed));
}

/**
* @throws Exception
*/
Expand Down
12 changes: 12 additions & 0 deletions src/test/java/org/cip4/bambi/core/queues/QueueProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public void testQEReturn1()
qe.setSubmissionTime(new JDFDate());
final QueueEntryReturn r = qp.new QueueEntryReturn(qe, EnumQueueEntryStatus.Completed);
assertFalse(r.returnJMF(null, null));
assertFalse(r.returnJMF(null, null, 4));
assertFalse(r.returnJMF(doc, null));
}

Expand Down Expand Up @@ -290,6 +291,17 @@ public void testRemoveQE()
qp.new RemoveQueueEntryHandler().handleMessage(jmf.getMessageElement(null, null, 0), JDFJMF.createJMF(EnumFamily.Response, EnumType.RemoveQueueEntry).getResponse());
}

/**
*
*
*/
@Test
public void testTotalCount()
{
final QueueProcessor qp = getDevice().getQueueProcessor();
assertTrue(qp.getTotalEntryCount() >= 0);
}

/**
*
*
Expand Down

0 comments on commit 0669d00

Please sign in to comment.