Alfoa/hybrid model for batching and ensemble model #2322

Status: Open — wants to merge 33 commits into base: devel

Commits (33)
1879ae4
moving ahead fixing hybrid model and ensemble model (and batching wit…
alfoa May 17, 2024
1b78cd1
added test
alfoa May 17, 2024
fafedbb
Update ravenframework/JobHandler.py
alfoa May 18, 2024
e8c57bb
identifiers
alfoa May 20, 2024
d18282a
Merge branch 'alfoa/hybrid_model_for_batching_and_ensemble_model' of …
alfoa May 20, 2024
b09eacc
Update ravenframework/Models/Model.py
alfoa May 20, 2024
7114f63
Added test
alfoa May 20, 2024
b131c8a
t push
alfoa May 20, 2024
2e94d8c
Apply suggestions from code review
alfoa May 20, 2024
a332bd0
Merge branch 'devel' into alfoa/hybrid_model_for_batching_and_ensembl…
alfoa May 20, 2024
648c163
updated addFinishedJob
alfoa May 20, 2024
135bcf7
identifier is added from the job runner
alfoa May 20, 2024
1084d1e
removed identifier factory...too many objects rely on reusing identif…
alfoa May 20, 2024
3269196
forgot model file
alfoa May 20, 2024
ab91508
inputs 'types' should be requested to the users and not guessed in th…
alfoa May 20, 2024
5230636
test local jobhandler in ensemble model
alfoa May 22, 2024
09d5b63
casting the batch solution from rlz into float is not necessary and c…
alfoa May 22, 2024
e648a1d
fix ensemble
alfoa Jun 24, 2024
d7cab4d
added option to run volume calc
alfoa Jun 26, 2024
2cf8dd2
addition of command separator option
alfoa Jun 26, 2024
9c4aede
updated serpent documentation
alfoa Jul 17, 2024
71bc89c
removed some comments
alfoa Jul 17, 2024
b25a176
added doc for command separator
alfoa Jul 17, 2024
7067497
addressed comments
alfoa Jul 17, 2024
40ad61b
jobhandler only if parallelstrategy is == 2
alfoa Jul 17, 2024
8d4a3f5
serpent
alfoa Jul 17, 2024
691547e
Merge branch 'devel' into alfoa/hybrid_model_for_batching_and_ensembl…
alfoa Jul 17, 2024
5527d23
fixed doc
alfoa Jul 18, 2024
c81ffea
Merge branch 'alfoa/hybrid_model_for_batching_and_ensemble_model' of …
alfoa Jul 18, 2024
76e91c3
Merge branch 'devel' into alfoa/hybrid_model_for_batching_and_ensembl…
alfoa Aug 26, 2024
64611a5
addressed Josh's and Congjians comments
alfoa Aug 26, 2024
5193b22
added property to set if an hybrid model/logical model or ensemble mo…
alfoa Aug 27, 2024
8113d33
Merge branch 'devel' into alfoa/hybrid_model_for_batching_and_ensembl…
alfoa Sep 11, 2024
7 changes: 6 additions & 1 deletion doc/user_manual/code_interfaces/serpent.tex
@@ -44,6 +44,8 @@ \subsubsection{Models}
<!-- EOL parameters -->
<EOL target="absKeff">1.0</EOL>
<EOL target="impKeff">1.0</EOL>
<!-- Volume calculation -->
<volumeCalculation nPoints="1e9">True</volumeCalculation>
</Code>
</Models>
\end{lstlisting}
@@ -82,7 +84,10 @@ \subsubsection{Models}
\nb The output parser will generate an output variable called ``EOL\_\textit{target}'' that represents the time at which the
parameter ``target'' takes the value indicated in this node. For example, if ``target'' == absKeff and the value of the XML node is 1.0,
the variable ``EOL\_absKeff'' will contain the time (burnDays) at which the ``absKeff'' == 1.0.

\item \xmlNode{volumeCalculation}, \xmlDesc{bool, optional parameter}, true to activate the stochastic material volume calculation via the Serpent ``checkvolumes'' command. If true, the execution of
SERPENT is always preceded by a volume calculation. Users who want to use the volume calculation output file in their SERPENT template input need to include
the filename there (e.g. include ``myInputFile.mvol'').
The \xmlNode{volumeCalculation} node must contain the attribute \xmlAttr{nPoints}, which indicates the number of samples to be used by SERPENT for the calculation of the volumes (e.g. 1e8).
\end{itemize}

\subsubsection{Files}
5 changes: 5 additions & 0 deletions doc/user_manual/model.tex
@@ -998,6 +998,11 @@ \subsection{Code}
pre-executable to be used.
\nb Both absolute and relative path can be used. In addition, the relative path
to the working directory can also be used.
%
\item \xmlNode{commandSeparator} \xmlDesc{string enumerator, optional field} specifies the separator used between multiple commands
that are chained (in cascade) to execute the code. Available options are $\&\&$, $||$, and $;$.
\default{$\&\&$}

%
\item \aliasSystemDescription{Code}
%
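As an illustrative (hypothetical) input fragment — the Code name, subType, and executable path below are placeholders, not from the PR — selecting `;` instead of the default `&&` would look like:

```xml
<Models>
  <Code name="myCode" subType="GenericCode">
    <executable>path/to/myExecutable</executable>
    <!-- commands produced by the interface are joined with ';' instead of '&&' -->
    <commandSeparator>;</commandSeparator>
  </Code>
</Models>
```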
15 changes: 14 additions & 1 deletion ravenframework/CodeInterfaceClasses/SERPENT/SerpentInterface.py
@@ -51,6 +51,9 @@ def __init__(self):
# in case of burnup calc, the interface can compute the time at which FOMs (e.g. keff) crosses
# a target. For example (default), we can compute the time (burnDays) at which absKeff crosses 1.0
self.eolTarget = {}
# volume calculation?
self.volumeCalc = False
self.nVolumePoints = None

def _findInputFile(self, inputFiles):
"""
@@ -75,6 +78,15 @@ def _readMoreXML(self,xmlNode):
@ In, xmlNode, xml.etree.ElementTree.Element, Xml element node
@ Out, None.
"""
preVolumeCalc = xmlNode.find("volumeCalculation")
if preVolumeCalc is not None:
self.volumeCalc = utils.interpretBoolean(preVolumeCalc.text)
nPoints = preVolumeCalc.attrib.get("nPoints")
if nPoints is not None:
self.nVolumePoints = utils.intConversion(utils.floatConversion(nPoints))
if self.nVolumePoints is None:
raise ValueError(self.printTag+' ERROR: "nPoints" attribute in <volumeCalculation> must be present (and integer) if <volumeCalculation> node is inputted')

eolNodes = xmlNode.findall("EOL")
for eolNode in eolNodes:
if eolNode is not None:
@@ -132,8 +144,9 @@ def generateCommand(self,inputFiles,executable,clargs=None, fargs=None, preExec=
addflags = clargs['text']
else:
addflags = ''

executeCommand = [('parallel',executable+' '+inputFile.getFilename()+' '+addflags)]
if self.volumeCalc:
executeCommand.insert(0, ('parallel',executable+' '+inputFile.getFilename()+f' -checkvolumes {self.nVolumePoints}') )
returnCommand = executeCommand, inputFile.getFilename()+"_res"
return returnCommand

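The command assembly above can be sketched in isolation. This is a minimal stand-in, not the real interface: the tuple format and the `-checkvolumes` flag are taken from the diff, but the helper name and the surrounding class are omitted here:

```python
def generate_serpent_commands(input_filename, executable, add_flags="",
                              volume_calc=False, n_volume_points=None):
    """Mimic the generateCommand logic: optionally prepend a volume-check run."""
    commands = [("parallel", f"{executable} {input_filename} {add_flags}".rstrip())]
    if volume_calc:
        # the stochastic volume check must run before the main SERPENT execution
        commands.insert(0, ("parallel",
                            f"{executable} {input_filename} -checkvolumes {n_volume_points}"))
    return commands, input_filename + "_res"

cmds, out_root = generate_serpent_commands("myInput", "sss2",
                                           volume_calc=True, n_volume_points=100000000)
```

Note the output-file root is derived from the input filename plus `_res`, mirroring `returnCommand` in the diff.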
76 changes: 75 additions & 1 deletion ravenframework/JobHandler.py
@@ -22,7 +22,6 @@
import copy
import sys
import threading
from random import randint
import socket
import re

@@ -47,6 +46,70 @@
# FIXME: Finished jobs can bog down the queue waiting for other objects to take
# them away. Can we shove them onto a different list and free up the job queue?

#class IdentifiersFactory(BaseType):
#"""
#Identifier Factory. This class contains the memory of identifiers used to execute
#JOBS in the job handler. The identifiers are removed from the Factory once out of
#scope (i.e. once the job is collected)
#"""
#def __init__(self, **kwargs):
#"""
#Constructor
#@ In, None
#@ Out, None
#"""
#super().__init__(**kwargs)
#self.__IDENTIFIERS_FACTORY = {} # {identifier:uniqueHandler}
#self.__counter = 0

#def __len__(self):
#"""
#length (number of identifiers)
#"""
#return len(self.__IDENTIFIERS_FACTORY)

#def addIdentifier(self, identifier: str, uniqueHandler: str | None) -> None:
#"""
#Add identifier in factory
#@ In, identifier, str, new identifier to add
#@ In, uniqueHandler, str, optional, the `uniqueHandler` if associated with this identifier
#@ Out, None
#"""
#if identifier in self.__IDENTIFIERS_FACTORY:
#self.raiseAnError(RuntimeError, f"Identifier {identifier} is still in use and cannot be re-used yet!")

#self.__IDENTIFIERS_FACTORY[identifier] = uniqueHandler
#self.__counter += 1

#def removeIdentifier(self, identifier: str) -> None:
#"""
#Remove identifier in factory
#@ In, identifier, str, new identifier to add
#@ Out, None
#"""
#if identifier not in self.__IDENTIFIERS_FACTORY:
#self.raiseAnError(RuntimeError, f"Identifier {identifier} is not present in identifier factory. It cannot be removed!")

#self.__IDENTIFIERS_FACTORY.pop(identifier)

#def checkIfIdentifierIsInUse(self, identifier: str) -> bool:
#"""
#This method is a utility method used to check if an identifier is in use.
#@ In, identifier, str, the identifier to check
#@ Out, checkIfIdentifierIsInUse, bool, is the Identifier in use?
#"""
#return identifier in list(self.__IDENTIFIERS_FACTORY.keys())

#def clear(self) -> None:
#"""
#Clear
#@ In, None
#@ Out, None
#"""
#self.__IDENTIFIERS_FACTORY = {}

#IDENTIFIERS_COLLECTOR = IdentifiersFactory()

class JobHandler(BaseType):
"""
JobHandler class. This handles the execution of any job in the RAVEN
@@ -134,6 +197,17 @@ def __setstate__(self, d):
self.__dict__.update(d)
self.__queueLock = threading.RLock()

def createCloneJobHandler(self):
"""
Method to create a clone of this JobHandler.
The clone is a copy of the jobhandler (initialized)
@ In, None
@ Out, clone, JobHandler, a clone of the current JobHandler
"""
clone = copy.deepcopy(self)
clone.terminateAll()
return clone

def applyRunInfo(self, runInfo):
"""
Allows access to the RunInfo data
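The clone-and-reset pattern used by createCloneJobHandler can be sketched generically. The toy handler below is hypothetical; the real JobHandler also carries thread locks and run queues that its `__setstate__`/deepcopy machinery must handle:

```python
import copy

class ToyJobHandler:
    """Minimal stand-in for a job handler with a pending-work queue."""
    def __init__(self):
        self.queue = []

    def terminate_all(self):
        # drop any pending work so the clone starts from a clean, initialized state
        self.queue.clear()

    def create_clone(self):
        # deep-copy so the clone's queue is fully independent of the original's
        clone = copy.deepcopy(self)
        clone.terminate_all()
        return clone

parent = ToyJobHandler()
parent.queue.append("job-1")
child = parent.create_clone()
```

The key design point mirrored from the diff: the clone is terminated immediately after copying, so it inherits configuration but no in-flight jobs.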
16 changes: 12 additions & 4 deletions ravenframework/Models/Code.py
@@ -57,7 +57,9 @@ class cls.
inputSpecification.addSub(InputData.parameterInputFactory("executable", contentType=InputTypes.StringType))
inputSpecification.addSub(InputData.parameterInputFactory("walltime", contentType=InputTypes.FloatType))
inputSpecification.addSub(InputData.parameterInputFactory("preexec", contentType=InputTypes.StringType))

inputSpecification.addSub(InputData.parameterInputFactory("commandSeparator", contentType=InputTypes.makeEnumType("commandSeparator",
"commandSeparatorType",
["&&","||",";"]), default="&&"))
## Begin command line arguments tag
ClargsInput = InputData.parameterInputFactory("clargs")

@@ -118,6 +120,8 @@ def __init__(self):
self.foundPreExec = True # True indicates the pre-executable is found, otherwise not found
self.maxWallTime = None # If set, this indicates the maximum CPU time a job can take.
self._ravenWorkingDir = None # RAVEN's working dir
self.commandSeparator = "&&" # command separator
self._isThereACode = True # it is a code

def applyRunInfo(self, runInfo):
"""
@@ -146,6 +150,8 @@ def _readMoreXML(self,xmlNode):
self.maxWallTime = child.value
if child.getName() =='preexec':
self.preExec = child.value
elif child.getName() =='commandSeparator':
self.commandSeparator = child.value
elif child.getName() == 'clargs':
argtype = child.parameterValues['type'] if 'type' in child.parameterValues else None
arg = child.parameterValues['arg'] if 'arg' in child.parameterValues else None
@@ -241,6 +247,7 @@ def _readMoreXML(self,xmlNode):
else:
self.foundExecutable = False
self.raiseAMessage('InterfaceCheck: ignored executable '+self.executable, 'ExceptedError')

if self.preExec is not None:
if '~' in self.preExec:
self.preExec = os.path.expanduser(self.preExec)
@@ -475,7 +482,7 @@ def findMsys():
self.raiseAWarning("Could not find msys in "+os.getcwd())
commandSplit = realExecutable + [executable] + commandSplit[1:]
return commandSplit
except PermissionError as e:
except PermissionError:
self.raiseAWarning("Permission denied to open executable ! Skipping!")
return origCommand

@@ -542,9 +549,10 @@ def evaluateSample(self, myInput, samplerType, kwargs):
elif runtype.lower() == 'serial':
commands.append(cmd)
else:
self.raiseAnError(IOError,'For execution command <'+cmd+'> the run type was neither "serial" nor "parallel"! Instead received: ',runtype,'\nPlease check the code interface.')
self.raiseAnError(IOError,'For execution command <'+cmd+'> the run type was neither "serial" nor "parallel"! Instead received: ',
runtype,'\nPlease check the code interface.')

command = ' && '.join(commands)+' '
command = f' {self.commandSeparator} '.join(commands)+' '

command = command.replace("%INDEX%",kwargs['INDEX'])
command = command.replace("%INDEX1%",kwargs['INDEX1'])
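The separator-joining step in evaluateSample can be illustrated on its own. This is a simplified sketch (the real method also applies `%INDEX%`-style substitutions and serial/parallel command handling):

```python
def join_commands(commands, separator="&&"):
    """Join a list of shell command strings with the configured separator,
    matching the f' {self.commandSeparator} '.join(commands)+' ' form in the diff."""
    return f" {separator} ".join(commands) + " "

joined = join_commands(["./pre.sh", "./run.sh out.txt"], separator=";")
```

With the default separator, `./pre.sh && ./run.sh out.txt` stops at the first failure; with `;`, both commands run regardless — which is the behavioral choice the new node exposes.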
65 changes: 49 additions & 16 deletions ravenframework/Models/EnsembleModel.py
@@ -35,6 +35,7 @@
from ..utils import utils, InputData
from ..utils.graphStructure import evaluateModelsOrder
from ..Runners import Error as rerror
from ..Runners.SharedMemoryRunner import InterruptibleThread
#Internal Modules End--------------------------------------------------------------------------------

class EnsembleModel(Dummy):
@@ -73,6 +74,8 @@ def __init__(self):
@ Out, None
"""
super().__init__()
self.localJobHandler = None # local jobhandler used in case of parallelStrategy == 2
self.localPollingThread = None # local jobhandler thread used in case of parallelStrategy == 2
self.modelsDictionary = {} # dictionary of models that are going to be assembled
# {'modelName':{'Input':[in1,in2,..,inN],'Output':[out1,out2,..,outN],'Instance':Instance}}
self.modelsInputDictionary = {} # to allow reusability of ensemble modes (similar in construction to self.modelsDictionary)
@@ -136,8 +139,6 @@ def localInputAndChecks(self,xmlNode):
self.raiseAnError(IOError, "Input XML node for Model" + modelName +" has not been inputted!")
if len(self.modelsInputDictionary[modelName].values()) > allowedEntriesLen:
self.raiseAnError(IOError, "TargetEvaluation, Input and metadataToTransfer XML blocks are the only XML sub-blocks allowed!")
if child.attrib['type'].strip() == "Code":
self.createWorkingDir = True
if child.tag == 'settings':
self.__readSettings(child)
if len(self.modelsInputDictionary.keys()) < 2:
@@ -244,8 +245,6 @@ def initialize(self,runInfo,inputs,initDict=None):
# collect the models
self.allOutputs = set()
for modelClass, modelType, modelName, modelInstance in self.assemblerDict['Model']:
if not isThereACode:
isThereACode = modelType == 'Code'
self.modelsDictionary[modelName]['Instance'] = modelInstance
inputInstancesForModel = []
for inputName in self.modelsInputDictionary[modelName]['Input']:
@@ -267,6 +266,9 @@

# initialize model
self.modelsDictionary[modelName]['Instance'].initialize(runInfo,inputInstancesForModel,initDict)
if not isThereACode:
isThereACode = self.modelsDictionary[modelName]['Instance'].containsACode

# retrieve 'TargetEvaluation' DataObjects
targetEvaluation = self.retrieveObjectFromAssemblerDict('TargetEvaluation',self.modelsInputDictionary[modelName]['TargetEvaluation'], True)
# assert acceptable TargetEvaluation types are used
@@ -292,6 +294,7 @@
# END loop to collect models
self.allOutputs = list(self.allOutputs)
if isThereACode:
self.createWorkingDir = True
# FIXME: LEAVE IT HERE...WE NEED TO MODIFY HOW THE CODE GET RUN INFO...IT NEEDS TO BE ENCAPSULATED
## collect some run info
## self.runInfoDict = runInfo
@@ -456,27 +459,39 @@ def collectOutput(self,finishedJob,output):
@ Out, None
"""
evaluation = finishedJob.getEvaluation()
outcomes, targetEvaluations, optionalOutputs = evaluation[1]

isPassthroughRunner = type(finishedJob).__name__ == 'PassthroughRunner'
if not isPassthroughRunner:
outcomes, targetEvaluations, optionalOutputs = evaluation[1]
else:
outcomes = evaluation
optionalOutputs = {}

joinedResponse = {}
joinedGeneralMetadata = {}
targetEvaluationNames = {}
optionalOutputNames = {}
joinedIndexMap = {} # collect all the index maps, then we can keep the ones we want?
for modelIn in self.modelsDictionary.keys():
targetEvaluationNames[self.modelsDictionary[modelIn]['TargetEvaluation']] = modelIn
# collect data
newIndexMap = outcomes[modelIn]['response'].get('_indexMap', None)
if newIndexMap:
joinedIndexMap.update(newIndexMap[0])
joinedResponse.update(outcomes[modelIn]['response'])
joinedGeneralMetadata.update(outcomes[modelIn]['general_metadata'])
if not isPassthroughRunner:
# collect data
newIndexMap = outcomes[modelIn]['response'].get('_indexMap', None)
if newIndexMap:
joinedIndexMap.update(newIndexMap[0])
joinedResponse.update(outcomes[modelIn]['response'])
joinedGeneralMetadata.update(outcomes[modelIn]['general_metadata'])
# collect the output of the STEP
optionalOutputNames.update({outName : modelIn for outName in self.modelsDictionary[modelIn]['OutputObject']})
if isPassthroughRunner:
optionalOutputs[modelIn] = outcomes
# the prefix is re-set here
joinedResponse['prefix'] = np.asarray([finishedJob.identifier])
if joinedIndexMap:
joinedResponse['_indexMap'] = np.atleast_1d(joinedIndexMap)

if not isPassthroughRunner:
joinedResponse['prefix'] = np.asarray([finishedJob.identifier])
if joinedIndexMap:
joinedResponse['_indexMap'] = np.atleast_1d(joinedIndexMap)
else:
joinedResponse = outcomes
if output.name not in optionalOutputNames:
if output.name not in targetEvaluationNames.keys():
# in the event a batch is run, the evaluations will be a dict as {'RAVEN_isBatch':True, 'realizations': [...]}
@@ -545,7 +560,13 @@ def submit(self,myInput,samplerType,jobHandler,**kwargs):
## works, we are unable to pass a member function as a job because the
## pp library loses track of what self is, so instead we call it from the
## class and pass self in as the first parameter

if self.localJobHandler is None and self.parallelStrategy == 2:
# create local clone of jobhandler
self.localJobHandler = jobHandler.createCloneJobHandler()
# start the job handler
self.localPollingThread = InterruptibleThread(target=self.localJobHandler.startLoop)
self.localPollingThread.daemon = True
self.localPollingThread.start()
nRuns = 1
batchMode = kwargs.get("batchMode", False)
if batchMode:
@@ -571,13 +592,15 @@
else:
# for parallel strategy 2, the ensemble model works as a step => it needs the jobHandler
kw['jobHandler'] = jobHandler
kw['jobHandler'] = self.localJobHandler
# for parallel strategy 2, we need to make sure that the batchMode is set to False in the inner runs since only the
# ensemble model evaluation should be batched (THIS IS REQUIRED because the CODE does not submit runs like the other models)
kw['batchMode'] = False
jobHandler.addClientJob((self, myInput, samplerType, kw), self.__class__.evaluateSample, prefix, metadata=metadata,
uniqueHandler=uniqueHandler,
groupInfo={'id': kwargs['batchInfo']['batchId'], 'size': nRuns} if batchMode else None)


def __retrieveDependentOutput(self,modelIn,listOfOutputs, typeOutputs):
"""
This method is aimed to retrieve the values of the output of the models on which the modelIn depends on
@@ -829,3 +852,13 @@ def __advanceModel(self, identifier, modelToExecute, origInputList, inputKwargs,
returnDict['general_metadata'] = inRunTargetEvaluations.getMeta(general=True)

return returnDict, gotOutputs, evaluation

def endStepActions(self):
"""
This method is intended for performing actions (within the EnsembleModel) at the end of a step
@ In, None
@ Out, None
"""
super().endStepActions()
if self.localPollingThread is not None:
self.localPollingThread.kill()
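The local polling-thread lifecycle (started in submit, torn down in endStepActions) can be approximated with a standard stop event, since the stdlib `threading` module has no `kill()` — `InterruptibleThread` is RAVEN-specific, so this is only a behavioral sketch:

```python
import threading
import time

class PollingLoop:
    """Stand-in for JobHandler.startLoop: poll until asked to stop."""
    def __init__(self):
        self._stop = threading.Event()
        self.polls = 0

    def start_loop(self):
        while not self._stop.is_set():
            self.polls += 1
            time.sleep(0.01)

    def stop(self):
        # cooperative shutdown, analogous to endStepActions killing the thread
        self._stop.set()

loop = PollingLoop()
thread = threading.Thread(target=loop.start_loop, daemon=True)  # daemon, as in the diff
thread.start()
time.sleep(0.05)   # let the loop poll a few times
loop.stop()
thread.join(timeout=1)
```

Marking the thread as a daemon (as the PR does) guarantees it cannot keep the interpreter alive even if the stop path is never reached.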