From 0669d0065bbcd5c9a9442272abaaee1c213ecce7 Mon Sep 17 00:00:00 2001 From: Rainer Prosi Date: Mon, 30 Sep 2024 15:28:06 +0200 Subject: [PATCH] improved queue handling --- .../org/cip4/bambi/core/AbstractDevice.java | 49 +++++++++++++- .../org/cip4/bambi/core/BambiContainer.java | 4 ++ .../org/cip4/bambi/core/ServletContainer.java | 2 +- .../org/cip4/bambi/core/StatusListener.java | 4 +- .../bambi/core/queues/QueueProcessor.java | 19 +++++- .../org/cip4/bambi/BambiTestProcessor.java | 14 ++-- .../cip4/bambi/core/AbstractDeviceTest.java | 67 ++++++++++++++++++- .../bambi/core/queues/QueueProcessorTest.java | 12 ++++ 8 files changed, 157 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/cip4/bambi/core/AbstractDevice.java b/src/main/java/org/cip4/bambi/core/AbstractDevice.java index 85347b6c..03713c2a 100644 --- a/src/main/java/org/cip4/bambi/core/AbstractDevice.java +++ b/src/main/java/org/cip4/bambi/core/AbstractDevice.java @@ -116,6 +116,7 @@ import org.cip4.jdflib.util.mime.BodyPartHelper; import org.cip4.jdflib.util.mime.MimeReader; import org.cip4.jdflib.util.mime.MimeWriter; +import org.cip4.jdflib.util.thread.MultiJobTaskQueue; import org.cip4.jdflib.util.thread.MyMutex; /** @@ -2401,15 +2402,57 @@ public void ensureProcessor(final JDFQueueEntry newQE, final JDFDoc theJDF) } + protected int getParallelSynch() + { + return 0; + } + + class SingleProcess implements Runnable + { + + private final AbstractDeviceProcessor p; + + public SingleProcess(final AbstractDeviceProcessor p) + { + super(); + this.p = p; + } + + @Override + public void run() + { + final boolean result = p.processExistingQueueEntry(); + p.stopProcessing(result ? EnumNodeStatus.Completed : EnumNodeStatus.Aborted); + log.info("completed processing " + getQueueProcessor().getTotalEntryCount()); + for (final Object o : EnumQueueEntryStatus.getEnumList()) + { + final EnumQueueEntryStatus s = (EnumQueueEntryStatus) o; + final int numEntries = getQueueProcessor().getQueue().numEntries(s); + if (numEntries > 0) + { + log.info("completed processing " + s.getName() + " " + numEntries); + } + } + } + } + public boolean doSynchronous(final IQueueEntry iqe) { final AbstractDeviceProcessor p = buildDeviceProcessor(); p.setCurrentQE(iqe); p.setParent(this); p.getStatusListener().setWantPersist(false); - final boolean result = p.processExistingQueueEntry(); - p.stopProcessing(result ? EnumNodeStatus.Completed : EnumNodeStatus.Aborted); - return result; + if (getParallelSynch() > 0) + { + MultiJobTaskQueue.getCreateJobQueue(getClass().getSimpleName(), getParallelSynch()).queue(new SingleProcess(p), p.getJobID()); + return true; + } + else + { + final boolean result = p.processExistingQueueEntry(); + p.stopProcessing(result ? EnumNodeStatus.Completed : EnumNodeStatus.Aborted); + return result; + } } public boolean isSynchronous() diff --git a/src/main/java/org/cip4/bambi/core/BambiContainer.java b/src/main/java/org/cip4/bambi/core/BambiContainer.java index d95a40d8..3c188a8e 100644 --- a/src/main/java/org/cip4/bambi/core/BambiContainer.java +++ b/src/main/java/org/cip4/bambi/core/BambiContainer.java @@ -620,6 +620,10 @@ public XMLResponse processJMFDoc(final XMLRequest request) { responseDoc = handler.processJMF(jmfDoc); } + else + { + log.warn("No handler for " + requestURI); + } if (responseDoc != null) { diff --git a/src/main/java/org/cip4/bambi/core/ServletContainer.java b/src/main/java/org/cip4/bambi/core/ServletContainer.java index f6276bd5..ea619d61 100644 --- a/src/main/java/org/cip4/bambi/core/ServletContainer.java +++ b/src/main/java/org/cip4/bambi/core/ServletContainer.java @@ -133,7 +133,7 @@ public void setWantDump(final boolean wantDump) */ public XMLResponse processError(final String requestURI, final EnumType messageType, final int returnCode, final String notification) { - log.warn("processError- rc: " + returnCode + " " + ((notification == null) ? JDFConstants.EMPTYSTRING : notification)); + log.warn("processError- rc: " + returnCode + " " + ((notification == null) ? JDFConstants.EMPTYSTRING : notification), new BambiException()); final JDFJMF error = JDFJMF.createJMF(EnumFamily.Response, messageType); final JDFResponse r = error.getResponse(0); r.setReturnCode(returnCode); diff --git a/src/main/java/org/cip4/bambi/core/StatusListener.java b/src/main/java/org/cip4/bambi/core/StatusListener.java index 038b9677..0958d2db 100644 --- a/src/main/java/org/cip4/bambi/core/StatusListener.java +++ b/src/main/java/org/cip4/bambi/core/StatusListener.java @@ -118,10 +118,10 @@ public boolean flush(final String msgType) { final Trigger[] t2 = rootDispatcher.triggerQueueEntry(theCounter.getQueueEntryID(), theCounter.getNodeIDentifier(), -1, msgType); rootDispatcher.flush(); - if (!Trigger.waitQueued(t2, 12000)) + if (!Trigger.waitQueued(t2, 420)) return false; } - return Trigger.waitQueued(t, 12000); + return Trigger.waitQueued(t, 420); } return false; } diff --git a/src/main/java/org/cip4/bambi/core/queues/QueueProcessor.java b/src/main/java/org/cip4/bambi/core/queues/QueueProcessor.java index 9d17e611..8a47bce4 100644 --- a/src/main/java/org/cip4/bambi/core/queues/QueueProcessor.java +++ b/src/main/java/org/cip4/bambi/core/queues/QueueProcessor.java @@ -2955,6 +2955,11 @@ private boolean returnJDFUrl(final JDFDoc docJDF) } boolean returnJMF(final JDFDoc docJDF, final JDFJMF jmf) + { + return returnJMF(docJDF, jmf, 0); + } + + boolean returnJMF(final JDFDoc docJDF, final JDFJMF jmf, int retry) { final JDFNode jdfRoot = docJDF == null ? null : docJDF.getJDFRoot(); if (jdfRoot == null || jmf == null) @@ -2968,7 +2973,7 @@ boolean returnJMF(final JDFDoc docJDF, final JDFJMF jmf) final String returnJMF = BambiNSExtension.getReturnJMF(qe); final JDFReturnQueueEntryParams returnQEParams = jmf.getCommand(0).getReturnQueueEntryParams(0); - qLog.info("MIME/ZIP ReturnQueueEntry for " + queueEntryID + " is being been sent to " + returnJMF); + qLog.info("MIME/ZIP ReturnQueueEntry for " + queueEntryID + " is being been sent to " + returnJMF + " retry=" + retry); returnQEParams.setURL("cid:dummy"); // will be overwritten by buildMimePackage final MIMEDetails mimeDetails = new MIMEDetails(); @@ -3002,6 +3007,13 @@ boolean returnJMF(final JDFDoc docJDF, final JDFJMF jmf) { qLog.error("failed to send ReturnQueueEntry for " + queueEntryID + " to " + returnJMF); } + if (retry < 2) + { + ++retry; + ThreadUtil.sleep(1234 * retry); + qLog.warn("retry ReturnQueueEntry for " + queueEntryID + " to " + returnJMF + " # " + retry); + return returnJMF(docJDF, jmf, retry); + } return false; } @@ -3399,4 +3411,9 @@ protected JDFDoc getDocFromXJDFZip(final String url, final ZipReader zipReader) } return doc; } + + public int getTotalEntryCount() + { + return BambiNSExtension.getTotal(getQueue()); + } } diff --git a/src/test/java/org/cip4/bambi/BambiTestProcessor.java b/src/test/java/org/cip4/bambi/BambiTestProcessor.java index 1dd95b47..9bc1fec9 100644 --- a/src/test/java/org/cip4/bambi/BambiTestProcessor.java +++ b/src/test/java/org/cip4/bambi/BambiTestProcessor.java @@ -84,30 +84,32 @@ public class BambiTestProcessor extends WorkerDeviceProcessor { - private EnumQueueEntryStatus finalStatus; + private final EnumQueueEntryStatus finalStatus; - public BambiTestProcessor(EnumQueueEntryStatus finalStatus) + public BambiTestProcessor(final EnumQueueEntryStatus finalStatus) { super(); _doShutdown = true; // avoid start of proc loop this.finalStatus = finalStatus; - JDFNode n = JDFNode.createRoot(); - JDFQueueEntry qe = ((JDFQueue) JDFElement.createRoot(ElementName.QUEUE)).appendQueueEntry(); + final JDFNode n = JDFNode.createRoot(); + final JDFQueueEntry qe = ((JDFQueue) JDFElement.createRoot(ElementName.QUEUE)).appendQueueEntry(); + qe.setQueueEntryStatus(EnumQueueEntryStatus.Waiting); setCurrentQE(new QueueEntry(n, qe)); } static int wait = 42; @Override - public EnumQueueEntryStatus processDoc(JDFNode n, JDFQueueEntry qe) + public EnumQueueEntryStatus processDoc(final JDFNode n, final JDFQueueEntry qe) { ThreadUtil.sleep(wait); return finalStatus; } @Override - public EnumNodeStatus stopProcessing(EnumNodeStatus newStatus) + public EnumNodeStatus stopProcessing(final EnumNodeStatus newStatus) { + getCurrentQE().getQueueEntry().setStatus(newStatus); return newStatus; } diff --git a/src/test/java/org/cip4/bambi/core/AbstractDeviceTest.java b/src/test/java/org/cip4/bambi/core/AbstractDeviceTest.java index 166d2978..baa266d6 100644 --- a/src/test/java/org/cip4/bambi/core/AbstractDeviceTest.java +++ b/src/test/java/org/cip4/bambi/core/AbstractDeviceTest.java @@ -41,9 +41,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; import java.io.File; @@ -51,6 +53,7 @@ import org.cip4.bambi.BambiTestDevice; import org.cip4.bambi.core.IDeviceProperties.EWatchFormat; import org.cip4.bambi.core.queues.QueueEntry; +import org.cip4.jdflib.auto.JDFAutoQueueEntry.EnumQueueEntryStatus; import org.cip4.jdflib.core.AttributeName; import org.cip4.jdflib.core.ElementName; import org.cip4.jdflib.core.JDFDoc; @@ -62,7 +65,9 @@ import org.cip4.jdflib.jmf.JDFQueueEntry; import org.cip4.jdflib.jmf.JMFBuilderFactory; import org.cip4.jdflib.node.JDFNode; +import org.cip4.jdflib.util.ThreadUtil; import org.junit.Test; +import org.mockito.Mockito; public class AbstractDeviceTest extends BambiTestCaseBase { @@ -119,7 +124,7 @@ public void testUpdateWatchURL() throws Exception * @throws Exception */ @Test - public void testEWatchFormart() throws Exception + public void testEWatchFormat() throws Exception { assertEquals(EWatchFormat.JMF, EWatchFormat.getEnum(null)); assertEquals(EWatchFormat.XJMF, EWatchFormat.getEnum("xJmF")); @@ -245,6 +250,66 @@ public void testDoSynch() throws Exception assertTrue(device.doSynchronous(qe)); } + /** + * @throws Exception + */ + @Test + public void testDoSynchMulti() throws Exception + { + final BambiTestDevice device = Mockito.spy(new BambiTestDevice()); + device.setSynchronous(true); + when(device.getParallelSynch()).thenReturn(4); + for (int i = 0; i < 420; i++) + { + final JDFQueueEntry qe = device.getQueueProcessor().getQueue().appendQueueEntry(); + qe.setQueueEntryID("qe" + i); + qe.setQueueEntryStatus(EnumQueueEntryStatus.Waiting); + final JDFNode root = JDFNode.createRoot(); + root.setJobID("J" + i % 7); + final QueueEntry qee = new QueueEntry(root, qe); + assertTrue(device.doSynchronous(qee)); + } + for (int i = 0; i < 1234; i++) + { + if (device.getQueueProcessor().getQueue().numEntries(EnumQueueEntryStatus.Waiting) > 0) + { + ThreadUtil.sleep(123); + } + } + assertEquals(0, device.getQueueProcessor().getQueue().numEntries(EnumQueueEntryStatus.Waiting)); + assertNotEquals(0, device.getQueueProcessor().getQueue().numEntries(EnumQueueEntryStatus.Completed)); + } + + /** + * @throws Exception + */ + @Test + public void testDoAsynchMulti() throws Exception + { + final BambiTestDevice device = Mockito.spy(new BambiTestDevice()); + device.setSynchronous(false); + when(device.getParallelSynch()).thenReturn(4); + for (int i = 0; i < 420; i++) + { + final JDFQueueEntry qe = device.getQueueProcessor().getQueue().appendQueueEntry(); + qe.setQueueEntryID("qe" + i); + qe.setQueueEntryStatus(EnumQueueEntryStatus.Waiting); + final JDFNode root = JDFNode.createRoot(); + root.setJobID("J" + i % 7); + final QueueEntry qee = new QueueEntry(root, qe); + assertTrue(device.doSynchronous(qee)); + } + for (int i = 0; i < 1234; i++) + { + if (device.getQueueProcessor().getQueue().numEntries(EnumQueueEntryStatus.Waiting) > 0) + { + ThreadUtil.sleep(123); + } + } + assertEquals(0, device.getQueueProcessor().getQueue().numEntries(EnumQueueEntryStatus.Waiting)); + assertNotEquals(0, device.getQueueProcessor().getQueue().numEntries(EnumQueueEntryStatus.Completed)); + } + /** * @throws Exception */ diff --git a/src/test/java/org/cip4/bambi/core/queues/QueueProcessorTest.java b/src/test/java/org/cip4/bambi/core/queues/QueueProcessorTest.java index 4b3b4676..312a304b 100644 --- a/src/test/java/org/cip4/bambi/core/queues/QueueProcessorTest.java +++ b/src/test/java/org/cip4/bambi/core/queues/QueueProcessorTest.java @@ -185,6 +185,7 @@ public void testQEReturn1() qe.setSubmissionTime(new JDFDate()); final QueueEntryReturn r = qp.new QueueEntryReturn(qe, EnumQueueEntryStatus.Completed); assertFalse(r.returnJMF(null, null)); + assertFalse(r.returnJMF(null, null, 4)); assertFalse(r.returnJMF(doc, null)); } @@ -290,6 +291,17 @@ public void testRemoveQE() qp.new RemoveQueueEntryHandler().handleMessage(jmf.getMessageElement(null, null, 0), JDFJMF.createJMF(EnumFamily.Response, EnumType.RemoveQueueEntry).getResponse()); } + /** + * + * + */ + @Test + public void testTotalCount() + { + final QueueProcessor qp = getDevice().getQueueProcessor(); + assertTrue(qp.getTotalEntryCount() >= 0); + } + /** * *