Skip to content

Commit

Permalink
bambi improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
rainer-prosi committed Nov 20, 2024
1 parent e39ae38 commit f443e56
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 56 deletions.
15 changes: 7 additions & 8 deletions src/main/java/org/cip4/bambi/core/AbstractDevice.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
package org.cip4.bambi.core;

import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Set;
Expand Down Expand Up @@ -1950,18 +1951,16 @@ protected boolean copyPhaseTimeFromCounter(final JDFResponse response, final JDF
final JDFResponse r = root.getResponse(i);
final JDFResponse rResp = respRoot.getCreateResponse(i);
final String id = rResp.getID();
rResp.setrefID(refID);
rResp.setAttributes(r);
rResp.setrefID(refID);
rResp.setID(id);
final VElement v = r.getChildElementVector(null, null);
if (v != null)
final Collection<KElement> v = r.getChildArray(null, null);
for (final KElement e : v)
{
for (final KElement e : v)
{
rResp.copyElement(e, null);
}
rResp.copyElement(e, null);
}
}

return true;
}

Expand Down Expand Up @@ -2220,7 +2219,7 @@ public boolean deleteSignal(final JDFSignal s)
for (final JDFDeviceInfo di : devInfos)
{
final EnumDeviceStatus stat = di.getDeviceStatus();
if (!EnumDeviceStatus.Idle.equals(stat) && !EnumDeviceStatus.Down.equals(stat))
if (!EnumDeviceStatus.Idle.equals(stat) && !EnumDeviceStatus.Down.equals(stat) || di.getJobPhase() != null)
{
idleCount = 0;
return false; // we have something that is a bit active
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ protected void abort()
*/
protected void reset()
{
_statusListener.signalStatus(EnumDeviceStatus.Idle, "JoIncomplete", EnumNodeStatus.Waiting, "job partially completed", true);
_statusListener.signalStatus(EnumDeviceStatus.Idle, "JobIncomplete", EnumNodeStatus.Waiting, "job partially completed", true);
}

/**
Expand All @@ -580,6 +580,7 @@ protected void reset()
*/
protected void complete()
{
_statusListener.setPercentComplete(100);
_statusListener.signalStatus(EnumDeviceStatus.Idle, "JobCompleted", EnumNodeStatus.Completed, "job completed successfully", true);
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/cip4/bambi/core/HTTPResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* The CIP4 Software License, Version 1.0
*
*
* Copyright (c) 2001-2020 The International Cooperation for the Integration of Processes in Prepress, Press and Postpress (CIP4). All rights reserved.
* Copyright (c) 2001-2024 The International Cooperation for the Integration of Processes in Prepress, Press and Postpress (CIP4). All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
*
Expand Down Expand Up @@ -297,7 +297,7 @@ public void setHttpRC(final int httpRC, final String notification)
/**
* @return the notification
*/
protected String getNotification()
public String getNotification()
{
return notification;
}
Expand Down
31 changes: 19 additions & 12 deletions src/main/java/org/cip4/bambi/core/StatusListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,12 @@
import org.cip4.jdflib.core.JDFElement.EnumNodeStatus;
import org.cip4.jdflib.core.KElement;
import org.cip4.jdflib.core.VString;
import org.cip4.jdflib.datatypes.JDFAttributeMap;
import org.cip4.jdflib.datatypes.VJDFAttributeMap;
import org.cip4.jdflib.jmf.JDFDeviceInfo;
import org.cip4.jdflib.jmf.JDFMessage;
import org.cip4.jdflib.jmf.JDFMessage.EnumType;
import org.cip4.jdflib.jmf.JDFQuery;
import org.cip4.jdflib.jmf.JDFResourceQuParams;
import org.cip4.jdflib.jmf.JDFResponse;
import org.cip4.jdflib.jmf.JDFStatusQuParams;
import org.cip4.jdflib.node.JDFNode;
import org.cip4.jdflib.node.NodeIdentifier;
Expand All @@ -75,6 +74,7 @@ public class StatusListener implements IPersistable
{

private SignalDispatcher dispatcher;

private SignalDispatcher rootDispatcher;
protected StatusCounter theCounter;
final MultiModuleStatusCounter multiCounter;
Expand Down Expand Up @@ -206,6 +206,7 @@ public void updateAmount(final String resID, final double good, final double was
theCounter.addPhase(resID, good, waste, true);
if (good + waste > 0)
{
theCounter.setPhase(null, null, null, null);
dispatcher.triggerQueueEntry(theCounter.getQueueEntryID(), theCounter.getNodeIDentifier(), (int) (good + waste), null);
}
DelayedPersist.getDelayedPersist().queue(this, 123456);
Expand All @@ -225,6 +226,7 @@ public void setPercentComplete(final double percent)
return;
}
theCounter.setPercentComplete(percent);
theCounter.setPhase(null, null, null, null);
DelayedPersist.getDelayedPersist().queue(this, 123456);
}

Expand Down Expand Up @@ -262,6 +264,7 @@ public void updateTotal(final String resID, final double amount, final boolean w
theCounter.setTotal(resID, amount, waste);
if (amount > 0)
{
theCounter.setPhase(null, null, null, null);
dispatcher.triggerQueueEntry(theCounter.getQueueEntryID(), theCounter.getNodeIDentifier(), (int) amount, null);
}
DelayedPersist.getDelayedPersist().queue(this, 123456);
Expand Down Expand Up @@ -316,6 +319,12 @@ public synchronized void setNode(final String queueEntryID, JDFNode node, final
theCounter.setTrackWaste("*", true); // always track waste
theCounter.setFirstRefID(trackResourceID);
theCounter.setQueueEntryID(queueEntryID);
if (node != null)
{
final VJDFAttributeMap partMapVector = node.getNodeInfoPartMapVector();
final JDFAttributeMap partMap = partMapVector == null ? null : partMapVector.getCommonMap();
theCounter.setPhase(node.getPartStatus(partMap, 1), node.getPartStatusDetails(partMap), EnumDeviceStatus.Running, node.getPartStatusDetails(partMap));
}
while (node != null)
{
dispatcher.addSubscriptions(node, queueEntryID);
Expand All @@ -326,7 +335,6 @@ public synchronized void setNode(final String queueEntryID, JDFNode node, final
/**
* save the currently active jdf
*
* @param timeSinceLast milliseconds time to leave between saves
*/
@Override
public boolean persist()
Expand All @@ -352,15 +360,7 @@ public boolean persist()
*/
public EnumDeviceStatus getDeviceStatus()
{
if (theCounter == null)
{
return EnumDeviceStatus.Idle;
}

final JDFDoc docJMF = theCounter.getDocJMFPhaseTime();
final JDFResponse r = docJMF == null ? null : docJMF.getJMFRoot().getResponse(-1);
final JDFDeviceInfo di = r == null ? null : r.getDeviceInfo(0);
return di == null ? EnumDeviceStatus.Idle : di.getDeviceStatus();
return multiCounter.getDeviceStatus();
}

/**
Expand Down Expand Up @@ -500,6 +500,7 @@ public boolean removeEmployee(final JDFEmployee employee)
final boolean b = theCounter.removeEmployee(employee);
if (b)
{
theCounter.setPhase(null, null, null, null);
DelayedPersist.getDelayedPersist().queue(this, -1);
}
return b;
Expand All @@ -519,6 +520,7 @@ public int addEmployee(final JDFEmployee employee)
final int n1 = theCounter.addEmployee(employee);
if (n1 != n0)
{
theCounter.setPhase(null, null, null, null);
DelayedPersist.getDelayedPersist().queue(this, -1);
}
return n1;
Expand Down Expand Up @@ -554,10 +556,15 @@ public JDFDoc getDocJMFPhaseTime()
public void removeListener(final StatusListener statusListener)
{
multiCounter.removeModule(statusListener.getStatusCounter());
if (EnumDeviceStatus.Idle.equals(multiCounter.getDeviceStatus()))
{
theCounter.setActiveNode(null, null, null);
}
}

public void addListener(final StatusListener statusListener)
{
multiCounter.addModule(statusListener.getStatusCounter());
}

}
43 changes: 21 additions & 22 deletions src/main/java/org/cip4/bambi/core/messaging/MessageSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -911,10 +911,10 @@ HttpURLConnection sendDetails(final MessageDetails messageDetails) throws IOExce
}

final HTTPDetails httpDetails = messageDetails.mimeDet == null ? null : messageDetails.mimeDet.httpDetails;
long t0 = System.currentTimeMillis();
final long t0 = System.currentTimeMillis();
final UrlPart p = UrlUtil.writeToURL(url, ByteArrayIOStream.getBufferedInputStream(is), UrlUtil.POST, contentType, httpDetails);
int rc = UrlPart.getReturnCode(p);
long t1 = System.currentTimeMillis();
final int rc = UrlPart.getReturnCode(p);
final long t1 = System.currentTimeMillis();
if (!UrlPart.isReturnCodeOK(p))
{
log.warn("Flaky RC " + rc + " in JMF response to " + url);
Expand Down Expand Up @@ -1043,7 +1043,7 @@ public boolean queuePost(final IResponseHandler responseHandler, final String ur
return queueMessageDetails(messageDetails);
}

private boolean queueMessageDetails(final MessageDetails messageDetails)
boolean queueMessageDetails(final MessageDetails messageDetails)
{
if (waitKaputt && messageDetails.isFireForget())
{
Expand All @@ -1065,28 +1065,27 @@ private boolean queueMessageDetails(final MessageDetails messageDetails)
senderQueueOptimizer.optimize(messageDetails.jmf);
}

synchronized (messageFiFo)
if (jmfFactory.isLogLots())
{
if (jmfFactory.isLogLots())
String textInfo = "queued " + messageDetails.getName() + " #" + sent + " to " + messageDetails.url;
if (messageFiFo.size() > 0)
{
String textInfo = "queued " + messageDetails.getName() + " #" + sent + " to " + messageDetails.url;
if (messageFiFo.size() > 0)
{
textInfo += " size=" + messageFiFo.size();
}
log.info(textInfo);
textInfo += " size=" + messageFiFo.size();
}
log.info(textInfo);
}

messageFiFo.add(messageDetails);

if (messageFiFo.size() >= 1000)
{
if ((messageFiFo.size() % 100) == 0)
{
log.warn("queueing message into blocked sender to " + callURL + " size=" + messageFiFo.size());
}
messageFiFo.add(messageDetails);
if (messageFiFo.size() >= 1000)
else if (jmfFactory.isLogLots())
{
if ((messageFiFo.size() % 100) == 0)
{
log.warn("queueing message into blocked sender to " + callURL + " size=" + messageFiFo.size());
}
else if (jmfFactory.isLogLots())
{
log.info("queueing message into blocked sender to " + callURL + " size=" + messageFiFo.size());
}
log.info("queueing message into blocked sender to " + callURL + " size=" + messageFiFo.size());
}
}
if (!isPaused)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,12 @@ private JDFJMF filterSenders(final JDFJMF jmfOut)
return null;
}
final VElement v = jmfOut.getMessageVector(EnumFamily.Signal, null);
final int siz = v == null ? 0 : v.size();
if (siz == 0 || v == null)
final int siz = ContainerUtil.size(v);
if (siz == 0)
{
return null;
}
for (int i = siz - 1; i >= 0; i--)
for (int i = 0; i < siz; i++)
{
final JDFSignal s = (JDFSignal) v.get(i);
if (!StringUtil.matchesSimple(s.getSenderID(), jmfDeviceID) || signalDispatcher.device.deleteSignal(s))
Expand Down
9 changes: 6 additions & 3 deletions src/test/java/org/cip4/bambi/core/AbstractDeviceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,9 @@ public void testDoSynchMulti() throws Exception
root.setJobID("J" + i % 7);
final QueueEntry qee = new QueueEntry(root, qe);
assertTrue(device.doSynchronous(qee));
ThreadUtil.sleep(1);
}
for (int i = 0; i < 1234; i++)
for (int i = 0; i < 4234; i++)
{
if (queueProcessor.getQueue().numEntries(EnumQueueEntryStatus.Waiting) > 0)
{
Expand Down Expand Up @@ -301,8 +302,9 @@ public void testDoSynchMulti4() throws Exception
root.setJobID("J" + i % 7);
final QueueEntry qee = new QueueEntry(root, qe);
assertTrue(device.doSynchronous(qee));
ThreadUtil.sleep(1);
}
for (int i = 0; i < 1234; i++)
for (int i = 0; i < 4234; i++)
{
if (queueProcessor.getQueue().numEntries(EnumQueueEntryStatus.Waiting) > 0)
{
Expand Down Expand Up @@ -332,8 +334,9 @@ public void testDoAsynchMulti() throws Exception
root.setJobID("J" + i % 7);
final QueueEntry qee = new QueueEntry(root, qe);
assertTrue(device.doSynchronous(qee));
ThreadUtil.sleep(1);
}
for (int i = 0; i < 1234; i++)
for (int i = 0; i < 4234; i++)
{
if (queueProcessor.getQueue().numEntries(EnumQueueEntryStatus.Waiting) > 0)
{
Expand Down
10 changes: 5 additions & 5 deletions src/test/java/org/cip4/bambi/core/StatusListenerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* The CIP4 Software License, Version 1.0
*
*
* Copyright (c) 2001-2023 The International Cooperation for the Integration of
* Copyright (c) 2001-2024 The International Cooperation for the Integration of
* Processes in Prepress, Press and Postpress (CIP4). All rights
* reserved.
*
Expand Down Expand Up @@ -86,20 +86,20 @@ public class StatusListenerTest extends BambiTestCaseBase
@Test
public void testCreate()
{
StatusListener sl = new StatusListener(null, null, null);
final StatusListener sl = new StatusListener(null, null, null);
assertNotNull(sl);
assertNotNull(sl.toString());
}

@Test
public void testPersist()
{
SignalDispatcher disp = Mockito.mock(SignalDispatcher.class);
StatusListener sl = new StatusListener(disp, null, null);
final SignalDispatcher disp = Mockito.mock(SignalDispatcher.class);
final StatusListener sl = new StatusListener(disp, null, null);
sl.setWantPersist(false);
assertFalse(sl.isWantPersist());
assertFalse(sl.persist());
JDFNode createRoot = JDFNode.createRoot();
final JDFNode createRoot = JDFNode.createRoot();
sl.setNode("a", createRoot, null, agentName);
assertFalse(sl.persist());
sl.setWantPersist(true);
Expand Down

0 comments on commit f443e56

Please sign in to comment.