From f443e56e8169c69382825adde5c0b264bee2660b Mon Sep 17 00:00:00 2001 From: Rainer Prosi Date: Wed, 20 Nov 2024 16:51:51 +0100 Subject: [PATCH] bambi improvements --- .../org/cip4/bambi/core/AbstractDevice.java | 15 +++---- .../bambi/core/AbstractDeviceProcessor.java | 3 +- .../org/cip4/bambi/core/HTTPResponse.java | 4 +- .../org/cip4/bambi/core/StatusListener.java | 31 +++++++------ .../bambi/core/messaging/MessageSender.java | 43 +++++++++---------- .../bambi/core/messaging/MsgSubscription.java | 6 +-- .../cip4/bambi/core/AbstractDeviceTest.java | 9 ++-- .../cip4/bambi/core/StatusListenerTest.java | 10 ++--- 8 files changed, 65 insertions(+), 56 deletions(-) diff --git a/src/main/java/org/cip4/bambi/core/AbstractDevice.java b/src/main/java/org/cip4/bambi/core/AbstractDevice.java index 94bab0f7..dbe44ee6 100644 --- a/src/main/java/org/cip4/bambi/core/AbstractDevice.java +++ b/src/main/java/org/cip4/bambi/core/AbstractDevice.java @@ -40,6 +40,7 @@ package org.cip4.bambi.core; import java.io.File; +import java.util.Collection; import java.util.List; import java.util.Locale; import java.util.Set; @@ -1950,18 +1951,16 @@ protected boolean copyPhaseTimeFromCounter(final JDFResponse response, final JDF final JDFResponse r = root.getResponse(i); final JDFResponse rResp = respRoot.getCreateResponse(i); final String id = rResp.getID(); - rResp.setrefID(refID); rResp.setAttributes(r); + rResp.setrefID(refID); rResp.setID(id); - final VElement v = r.getChildElementVector(null, null); - if (v != null) + final Collection v = r.getChildArray(null, null); + for (final KElement e : v) { - for (final KElement e : v) - { - rResp.copyElement(e, null); - } + rResp.copyElement(e, null); } } + return true; } @@ -2220,7 +2219,7 @@ public boolean deleteSignal(final JDFSignal s) for (final JDFDeviceInfo di : devInfos) { final EnumDeviceStatus stat = di.getDeviceStatus(); - if (!EnumDeviceStatus.Idle.equals(stat) && !EnumDeviceStatus.Down.equals(stat)) + if (!EnumDeviceStatus.Idle.equals(stat) && !EnumDeviceStatus.Down.equals(stat) || di.getJobPhase() != null) { idleCount = 0; return false; // we have something that is a bit active diff --git a/src/main/java/org/cip4/bambi/core/AbstractDeviceProcessor.java b/src/main/java/org/cip4/bambi/core/AbstractDeviceProcessor.java index c14b92cd..5cb517b3 100644 --- a/src/main/java/org/cip4/bambi/core/AbstractDeviceProcessor.java +++ b/src/main/java/org/cip4/bambi/core/AbstractDeviceProcessor.java @@ -571,7 +571,7 @@ protected void abort() */ protected void reset() { - _statusListener.signalStatus(EnumDeviceStatus.Idle, "JoIncomplete", EnumNodeStatus.Waiting, "job partially completed", true); + _statusListener.signalStatus(EnumDeviceStatus.Idle, "JobIncomplete", EnumNodeStatus.Waiting, "job partially completed", true); } /** @@ -580,6 +580,7 @@ protected void reset() */ protected void complete() { + _statusListener.setPercentComplete(100); _statusListener.signalStatus(EnumDeviceStatus.Idle, "JobCompleted", EnumNodeStatus.Completed, "job completed successfully", true); } diff --git a/src/main/java/org/cip4/bambi/core/HTTPResponse.java b/src/main/java/org/cip4/bambi/core/HTTPResponse.java index 59c8f826..dd95c5a9 100644 --- a/src/main/java/org/cip4/bambi/core/HTTPResponse.java +++ b/src/main/java/org/cip4/bambi/core/HTTPResponse.java @@ -3,7 +3,7 @@ * The CIP4 Software License, Version 1.0 * * - * Copyright (c) 2001-2020 The International Cooperation for the Integration of Processes in Prepress, Press and Postpress (CIP4). All rights reserved. + * Copyright (c) 2001-2024 The International Cooperation for the Integration of Processes in Prepress, Press and Postpress (CIP4). All rights reserved. * * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: * @@ -297,7 +297,7 @@ public void setHttpRC(final int httpRC, final String notification) /** * @return the notification */ - protected String getNotification() + public String getNotification() { return notification; } diff --git a/src/main/java/org/cip4/bambi/core/StatusListener.java b/src/main/java/org/cip4/bambi/core/StatusListener.java index 1c3f0ddf..ce7d52da 100644 --- a/src/main/java/org/cip4/bambi/core/StatusListener.java +++ b/src/main/java/org/cip4/bambi/core/StatusListener.java @@ -49,13 +49,12 @@ import org.cip4.jdflib.core.JDFElement.EnumNodeStatus; import org.cip4.jdflib.core.KElement; import org.cip4.jdflib.core.VString; +import org.cip4.jdflib.datatypes.JDFAttributeMap; import org.cip4.jdflib.datatypes.VJDFAttributeMap; -import org.cip4.jdflib.jmf.JDFDeviceInfo; import org.cip4.jdflib.jmf.JDFMessage; import org.cip4.jdflib.jmf.JDFMessage.EnumType; import org.cip4.jdflib.jmf.JDFQuery; import org.cip4.jdflib.jmf.JDFResourceQuParams; -import org.cip4.jdflib.jmf.JDFResponse; import org.cip4.jdflib.jmf.JDFStatusQuParams; import org.cip4.jdflib.node.JDFNode; import org.cip4.jdflib.node.NodeIdentifier; @@ -75,6 +74,7 @@ public class StatusListener implements IPersistable { private SignalDispatcher dispatcher; + private SignalDispatcher rootDispatcher; protected StatusCounter theCounter; final MultiModuleStatusCounter multiCounter; @@ -206,6 +206,7 @@ public void updateAmount(final String resID, final double good, final double was theCounter.addPhase(resID, good, waste, true); if (good + waste > 0) { + theCounter.setPhase(null, null, null, null); dispatcher.triggerQueueEntry(theCounter.getQueueEntryID(), theCounter.getNodeIDentifier(), (int) (good + waste), null); } DelayedPersist.getDelayedPersist().queue(this, 123456); @@ -225,6 +226,7 @@ public void setPercentComplete(final double percent) return; } theCounter.setPercentComplete(percent); + theCounter.setPhase(null, null, null, null); DelayedPersist.getDelayedPersist().queue(this, 123456); } @@ -262,6 +264,7 @@ public void updateTotal(final String resID, final double amount, final boolean w theCounter.setTotal(resID, amount, waste); if (amount > 0) { + theCounter.setPhase(null, null, null, null); dispatcher.triggerQueueEntry(theCounter.getQueueEntryID(), theCounter.getNodeIDentifier(), (int) amount, null); } DelayedPersist.getDelayedPersist().queue(this, 123456); @@ -316,6 +319,12 @@ public synchronized void setNode(final String queueEntryID, JDFNode node, final theCounter.setTrackWaste("*", true); // always track waste theCounter.setFirstRefID(trackResourceID); theCounter.setQueueEntryID(queueEntryID); + if (node != null) + { + final VJDFAttributeMap partMapVector = node.getNodeInfoPartMapVector(); + final JDFAttributeMap partMap = partMapVector == null ? null : partMapVector.getCommonMap(); + theCounter.setPhase(node.getPartStatus(partMap, 1), node.getPartStatusDetails(partMap), EnumDeviceStatus.Running, node.getPartStatusDetails(partMap)); + } while (node != null) { dispatcher.addSubscriptions(node, queueEntryID); @@ -326,7 +335,6 @@ public synchronized void setNode(final String queueEntryID, JDFNode node, final /** * save the currently active jdf * - * @param timeSinceLast milliseconds time to leave between saves */ @Override public boolean persist() @@ -352,15 +360,7 @@ public boolean persist() */ public EnumDeviceStatus getDeviceStatus() { - if (theCounter == null) - { - return EnumDeviceStatus.Idle; - } - - final JDFDoc docJMF = theCounter.getDocJMFPhaseTime(); - final JDFResponse r = docJMF == null ? null : docJMF.getJMFRoot().getResponse(-1); - final JDFDeviceInfo di = r == null ? null : r.getDeviceInfo(0); - return di == null ? EnumDeviceStatus.Idle : di.getDeviceStatus(); + return multiCounter.getDeviceStatus(); } /** @@ -500,6 +500,7 @@ public boolean removeEmployee(final JDFEmployee employee) final boolean b = theCounter.removeEmployee(employee); if (b) { + theCounter.setPhase(null, null, null, null); DelayedPersist.getDelayedPersist().queue(this, -1); } return b; @@ -519,6 +520,7 @@ public int addEmployee(final JDFEmployee employee) final int n1 = theCounter.addEmployee(employee); if (n1 != n0) { + theCounter.setPhase(null, null, null, null); DelayedPersist.getDelayedPersist().queue(this, -1); } return n1; @@ -554,10 +556,15 @@ public JDFDoc getDocJMFPhaseTime() public void removeListener(final StatusListener statusListener) { multiCounter.removeModule(statusListener.getStatusCounter()); + if (EnumDeviceStatus.Idle.equals(multiCounter.getDeviceStatus())) + { + theCounter.setActiveNode(null, null, null); + } } public void addListener(final StatusListener statusListener) { multiCounter.addModule(statusListener.getStatusCounter()); } + } diff --git a/src/main/java/org/cip4/bambi/core/messaging/MessageSender.java b/src/main/java/org/cip4/bambi/core/messaging/MessageSender.java index 27633d09..a0bccaa8 100644 --- a/src/main/java/org/cip4/bambi/core/messaging/MessageSender.java +++ b/src/main/java/org/cip4/bambi/core/messaging/MessageSender.java @@ -911,10 +911,10 @@ HttpURLConnection sendDetails(final MessageDetails messageDetails) throws IOExce } final HTTPDetails httpDetails = messageDetails.mimeDet == null ? null : messageDetails.mimeDet.httpDetails; - long t0 = System.currentTimeMillis(); + final long t0 = System.currentTimeMillis(); final UrlPart p = UrlUtil.writeToURL(url, ByteArrayIOStream.getBufferedInputStream(is), UrlUtil.POST, contentType, httpDetails); - int rc = UrlPart.getReturnCode(p); - long t1 = System.currentTimeMillis(); + final int rc = UrlPart.getReturnCode(p); + final long t1 = System.currentTimeMillis(); if (!UrlPart.isReturnCodeOK(p)) { log.warn("Flaky RC " + rc + " in JMF response to " + url); @@ -1043,7 +1043,7 @@ public boolean queuePost(final IResponseHandler responseHandler, final String ur return queueMessageDetails(messageDetails); } - private boolean queueMessageDetails(final MessageDetails messageDetails) + boolean queueMessageDetails(final MessageDetails messageDetails) { if (waitKaputt && messageDetails.isFireForget()) { @@ -1065,28 +1065,27 @@ private boolean queueMessageDetails(final MessageDetails messageDetails) senderQueueOptimizer.optimize(messageDetails.jmf); } - synchronized (messageFiFo) + if (jmfFactory.isLogLots()) { - if (jmfFactory.isLogLots()) + String textInfo = "queued " + messageDetails.getName() + " #" + sent + " to " + messageDetails.url; + if (messageFiFo.size() > 0) { - String textInfo = "queued " + messageDetails.getName() + " #" + sent + " to " + messageDetails.url; - if (messageFiFo.size() > 0) - { - textInfo += " size=" + messageFiFo.size(); - } - log.info(textInfo); + textInfo += " size=" + messageFiFo.size(); + } + log.info(textInfo); + } + + messageFiFo.add(messageDetails); + + if (messageFiFo.size() >= 1000) + { + if ((messageFiFo.size() % 100) == 0) + { + log.warn("queueing message into blocked sender to " + callURL + " size=" + messageFiFo.size()); } - messageFiFo.add(messageDetails); - if (messageFiFo.size() >= 1000) + else if (jmfFactory.isLogLots()) { - if ((messageFiFo.size() % 100) == 0) - { - log.warn("queueing message into blocked sender to " + callURL + " size=" + messageFiFo.size()); - } - else if (jmfFactory.isLogLots()) - { - log.info("queueing message into blocked sender to " + callURL + " size=" + messageFiFo.size()); - } + log.info("queueing message into blocked sender to " + callURL + " size=" + messageFiFo.size()); } } if (!isPaused) diff --git a/src/main/java/org/cip4/bambi/core/messaging/MsgSubscription.java b/src/main/java/org/cip4/bambi/core/messaging/MsgSubscription.java index 9d097b5d..9e399000 100644 --- a/src/main/java/org/cip4/bambi/core/messaging/MsgSubscription.java +++ b/src/main/java/org/cip4/bambi/core/messaging/MsgSubscription.java @@ -263,12 +263,12 @@ private JDFJMF filterSenders(final JDFJMF jmfOut) return null; } final VElement v = jmfOut.getMessageVector(EnumFamily.Signal, null); - final int siz = v == null ? 0 : v.size(); - if (siz == 0 || v == null) + final int siz = ContainerUtil.size(v); + if (siz == 0) { return null; } - for (int i = siz - 1; i >= 0; i--) + for (int i = 0; i < siz; i++) { final JDFSignal s = (JDFSignal) v.get(i); if (!StringUtil.matchesSimple(s.getSenderID(), jmfDeviceID) || signalDispatcher.device.deleteSignal(s)) diff --git a/src/test/java/org/cip4/bambi/core/AbstractDeviceTest.java b/src/test/java/org/cip4/bambi/core/AbstractDeviceTest.java index f8b70c4e..93c37d2e 100644 --- a/src/test/java/org/cip4/bambi/core/AbstractDeviceTest.java +++ b/src/test/java/org/cip4/bambi/core/AbstractDeviceTest.java @@ -270,8 +270,9 @@ public void testDoSynchMulti() throws Exception root.setJobID("J" + i % 7); final QueueEntry qee = new QueueEntry(root, qe); assertTrue(device.doSynchronous(qee)); + ThreadUtil.sleep(1); } - for (int i = 0; i < 1234; i++) + for (int i = 0; i < 4234; i++) { if (queueProcessor.getQueue().numEntries(EnumQueueEntryStatus.Waiting) > 0) { @@ -301,8 +302,9 @@ public void testDoSynchMulti4() throws Exception root.setJobID("J" + i % 7); final QueueEntry qee = new QueueEntry(root, qe); assertTrue(device.doSynchronous(qee)); + ThreadUtil.sleep(1); } - for (int i = 0; i < 1234; i++) + for (int i = 0; i < 4234; i++) { if (queueProcessor.getQueue().numEntries(EnumQueueEntryStatus.Waiting) > 0) { @@ -332,8 +334,9 @@ public void testDoAsynchMulti() throws Exception root.setJobID("J" + i % 7); final QueueEntry qee = new QueueEntry(root, qe); assertTrue(device.doSynchronous(qee)); + ThreadUtil.sleep(1); } - for (int i = 0; i < 1234; i++) + for (int i = 0; i < 4234; i++) { if (queueProcessor.getQueue().numEntries(EnumQueueEntryStatus.Waiting) > 0) { diff --git a/src/test/java/org/cip4/bambi/core/StatusListenerTest.java b/src/test/java/org/cip4/bambi/core/StatusListenerTest.java index d0c76b99..4a97544b 100644 --- a/src/test/java/org/cip4/bambi/core/StatusListenerTest.java +++ b/src/test/java/org/cip4/bambi/core/StatusListenerTest.java @@ -3,7 +3,7 @@ * The CIP4 Software License, Version 1.0 * * - * Copyright (c) 2001-2023 The International Cooperation for the Integration of + * Copyright (c) 2001-2024 The International Cooperation for the Integration of * Processes in Prepress, Press and Postpress (CIP4). All rights * reserved. * @@ -86,7 +86,7 @@ public class StatusListenerTest extends BambiTestCaseBase @Test public void testCreate() { - StatusListener sl = new StatusListener(null, null, null); + final StatusListener sl = new StatusListener(null, null, null); assertNotNull(sl); assertNotNull(sl.toString()); } @@ -94,12 +94,12 @@ public void testCreate() @Test public void testPersist() { - SignalDispatcher disp = Mockito.mock(SignalDispatcher.class); - StatusListener sl = new StatusListener(disp, null, null); + final SignalDispatcher disp = Mockito.mock(SignalDispatcher.class); + final StatusListener sl = new StatusListener(disp, null, null); sl.setWantPersist(false); assertFalse(sl.isWantPersist()); assertFalse(sl.persist()); - JDFNode createRoot = JDFNode.createRoot(); + final JDFNode createRoot = JDFNode.createRoot(); sl.setNode("a", createRoot, null, agentName); assertFalse(sl.persist()); sl.setWantPersist(true);