Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alfoa/hybrid model for batching and ensemble model #2322

Open
wants to merge 33 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1879ae4
moving ahead fixing hybrid model and ensemble model (and batching wit…
alfoa May 17, 2024
1b78cd1
added test
alfoa May 17, 2024
fafedbb
Update ravenframework/JobHandler.py
alfoa May 18, 2024
e8c57bb
identifiers
alfoa May 20, 2024
d18282a
Merge branch 'alfoa/hybrid_model_for_batching_and_ensemble_model' of …
alfoa May 20, 2024
b09eacc
Update ravenframework/Models/Model.py
alfoa May 20, 2024
7114f63
Added test
alfoa May 20, 2024
b131c8a
t push
alfoa May 20, 2024
2e94d8c
Apply suggestions from code review
alfoa May 20, 2024
a332bd0
Merge branch 'devel' into alfoa/hybrid_model_for_batching_and_ensembl…
alfoa May 20, 2024
648c163
updated addFinishedJob
alfoa May 20, 2024
135bcf7
identifier is added from the job runner
alfoa May 20, 2024
1084d1e
removed identifier factory...too many objects rely on reusing identif…
alfoa May 20, 2024
3269196
forgot model file
alfoa May 20, 2024
ab91508
inputs 'types' should be requested to the users and not guessed in th…
alfoa May 20, 2024
5230636
test local jobhandler in ensemble model
alfoa May 22, 2024
09d5b63
casting the batch solution from rlz into float is not necessary and c…
alfoa May 22, 2024
e648a1d
fix ensemble
alfoa Jun 24, 2024
d7cab4d
added option to run volume calc
alfoa Jun 26, 2024
2cf8dd2
addition of command separator option
alfoa Jun 26, 2024
9c4aede
updated serpent documentation
alfoa Jul 17, 2024
71bc89c
removed some comments
alfoa Jul 17, 2024
b25a176
added doc for command separator
alfoa Jul 17, 2024
7067497
addressed comments
alfoa Jul 17, 2024
40ad61b
jobhandler only if parallelstrategy is == 2
alfoa Jul 17, 2024
8d4a3f5
serpent
alfoa Jul 17, 2024
691547e
Merge branch 'devel' into alfoa/hybrid_model_for_batching_and_ensembl…
alfoa Jul 17, 2024
5527d23
fixed doc
alfoa Jul 18, 2024
c81ffea
Merge branch 'alfoa/hybrid_model_for_batching_and_ensemble_model' of …
alfoa Jul 18, 2024
76e91c3
Merge branch 'devel' into alfoa/hybrid_model_for_batching_and_ensembl…
alfoa Aug 26, 2024
64611a5
addressed Josh's and Congjians comments
alfoa Aug 26, 2024
5193b22
added property to set if an hybrid model/logical model or ensemble mo…
alfoa Aug 27, 2024
8113d33
Merge branch 'devel' into alfoa/hybrid_model_for_batching_and_ensembl…
alfoa Sep 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 71 additions & 2 deletions ravenframework/JobHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import copy
import sys
import threading
from random import randint
import socket
import re

Expand All @@ -47,6 +46,69 @@
# FIXME: Finished jobs can bog down the queue waiting for other objects to take
# them away. Can we shove them onto a different list and free up the job queue?

class IdentifiersFactory(BaseType):
alfoa marked this conversation as resolved.
Show resolved Hide resolved
"""
Identifier Factory. This class contains the memory of identifiers used to execute
JOBS in the job handler. The identifiers are removed from the Factory once out of
scope (i.e. once the job is collected)
"""
def __init__(self):
"""
Constructor
@ In, None
@ Out, None
"""
self.__IDENTIFIERS_FACTORY = {} # {identifier:uniqueHandler}
self.__counter = 0

def __len__(self):
"""
length (number of identifiers)
"""
return len(self.__IDENTIFIERS_FACTORY)

def addIdentifier(self, identifier: str, uniqueHandler: str | None) -> None:
"""
Add identifier in factory
@ In, identifier, str, new identifier to add
@ In, uniqueHandler, str, optional, the `uniqueHandler` if associated with this identifier
@ Out, None
"""
if identifier in self.__IDENTIFIERS_FACTORY:
self.raiseAnError(RuntimeError, f"Identifier {identifier} is still in use and cannot be re-used yet!")

self.__IDENTIFIERS_FACTORY[identifier] = uniqueHandler
self.__counter += 1

def removeIdentifier(self, identifier: str) -> None:
"""
Remove identifier in factory
@ In, identifier, str, new identifier to add
@ Out, None
"""
if identifier not in self.__IDENTIFIERS_FACTORY:
self.raiseAnError(RuntimeError, f"Identifier {identifier} is not present in identifier factory. It cannot be removed!")

self.__IDENTIFIERS_FACTORY.pop(identifier)

def checkIfIdentifierIsInUse(self, identifier: str) -> bool:
"""
This method is a utility method used to check if an identifier is in use.
@ In, identifier, str, the identifier to check
@ Out, checkIfIdentifierIsInUse, bool, is the Identifier in use?
"""
return identifier in list(self.__IDENTIFIERS_FACTORY.keys())

def clear(self) -> None:
"""
Clear
@ In, None
@ Out, None
"""
self.__IDENTIFIERS_FACTORY = {}

IDENTIFIERS_COLLECTOR = IdentifiersFactory()

class JobHandler(BaseType):
"""
JobHandler class. This handles the execution of any job in the RAVEN
Expand Down Expand Up @@ -162,7 +224,7 @@ def initialize(self):
# initialize PBS
with self.__queueLock:
self.__running = [None]*self.runInfoDict['batchSize']
self.__clientRunning = [None]*self.runInfoDict['batchSize']
self.__clientRunning = [None]*self.runInfoDict['batchSize'] * 2
alfoa marked this conversation as resolved.
Show resolved Hide resolved
self._parallelLib = ParallelLibEnum.shared
if self.runInfoDict['parallelMethod'] is not None and self.runInfoDict['parallelMethod'] != ParallelLibEnum.distributed:
self._parallelLib = self.runInfoDict['parallelMethod']
Expand Down Expand Up @@ -640,6 +702,7 @@ def addJob(self, args, functionToRun, identifier, metadata=None, forceUseThreads
clientQueue
@ Out, None
"""
global IDENTIFIERS_COLLECTOR
assert "original_function" in dir(functionToRun), "to parallelize a function, it must be" \
" decorated with RAVEN Parallel decorator"
if self._server is None or forceUseThreads:
Expand Down Expand Up @@ -686,6 +749,8 @@ def addJob(self, args, functionToRun, identifier, metadata=None, forceUseThreads
self.__batching[groupId]["ids"].append(identifier)
# add the runner in the Queue
self.reAddJob(internalJob)
# update identifier factory
IDENTIFIERS_COLLECTOR.addIdentifier(identifier, uniqueHandler)

def reAddJob(self, runner):
"""
Expand Down Expand Up @@ -912,6 +977,7 @@ def getFinished(self, removeFinished=True, jobIdentifier='', uniqueHandler="any"
finished = [job1, job2, [job3.1, job3.2], job4 ] (job3.1/3.2 belong to the same groupID)
or [job1, job2, job3, job4]
"""
global IDENTIFIERS_COLLECTOR
# If the user does not specify a jobIdentifier, then set it to the empty
# string because every job will match this starting string.
if jobIdentifier is None:
Expand Down Expand Up @@ -955,6 +1021,7 @@ def getFinished(self, removeFinished=True, jobIdentifier='', uniqueHandler="any"
if removeFinished:
for i in reversed(runsToBeRemoved):
self.__finished[i].trackTime('collected')
IDENTIFIERS_COLLECTOR.removeIdentifier(self.__finished[i].identifier)
del self.__finished[i]

# end with self.__queueLock
Expand Down Expand Up @@ -1140,8 +1207,10 @@ def shutdown(self):
@ In, None
@ Out, None
"""
global IDENTIFIERS_COLLECTOR
self.completed = True
self.__shutdownParallel()
IDENTIFIERS_COLLECTOR.clear()

def terminateAll(self):
"""
Expand Down
9 changes: 8 additions & 1 deletion ravenframework/Models/EnsembleModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def localInputAndChecks(self,xmlNode):
self.raiseAnError(IOError, "Input XML node for Model" + modelName +" has not been inputted!")
if len(self.modelsInputDictionary[modelName].values()) > allowedEntriesLen:
self.raiseAnError(IOError, "TargetEvaluation, Input and metadataToTransfer XML blocks are the only XML sub-blocks allowed!")
if child.attrib['type'].strip() == "Code":
if child.attrib['type'].strip() in ["Code", 'HybridModel', 'LogicalModel']:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very ugly :(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be slightly less ugly if you used a set:

       if child.attrib['type'].strip() in {"Code", 'HybridModel', 'LogicalModel'}:

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

self.createWorkingDir = True
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is ugly and in addition this creates a sub-working directory even if, for example, the Logical/hybrid models do not use a Code. In case of HybridModel/LogicalModel using only ExternalModels/ROMs, the subdirectory is created but will stay empty. Not very elegant. @joshua-cogliati-inl @wangcj05 any ideas on how to improve this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangcj05 @joshua-cogliati-inl any ideas for this? I cannot find a better solution at this stage

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a class attribute to indicate if there is a code associated with the Model? For example, In Hybrid Model/Logical/Ensemble Model, we define a self._isCodeAvail, and assign it to true when we detect a code in the Model. @alfoa

Copy link
Contributor

@joshua-cogliati-inl joshua-cogliati-inl Jul 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I am trying to fully understand why you need to create the directory? Just to check, it is needed if the Logical/hybrid models use a Code? (Congjian's idea sounds reasonable)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes that's why. If there is a Code in the underlying Logical/Hybrid model (contained in the ensemble model) the subfolder is required.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangcj05 @joshua-cogliati-inl can you tell me how exactly you would like that flag to be coded? (I prefer not to take a code design decision (that might be needed to be modified) on my own)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangcj05 @joshua-cogliati-inl FY: if you can send feedbacks within tomorrow I can try to address them before leaving on Friday. Otherwise it will need to wait till September.

if child.tag == 'settings':
self.__readSettings(child)
Expand Down Expand Up @@ -246,6 +246,7 @@ def initialize(self,runInfo,inputs,initDict=None):
for modelClass, modelType, modelName, modelInstance in self.assemblerDict['Model']:
if not isThereACode:
isThereACode = modelType == 'Code'

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest we add these lines in Logical model and Hybrid model.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I addressed this as well.

self.modelsDictionary[modelName]['Instance'] = modelInstance
inputInstancesForModel = []
for inputName in self.modelsInputDictionary[modelName]['Input']:
Expand All @@ -267,6 +268,12 @@ def initialize(self,runInfo,inputs,initDict=None):

# initialize model
self.modelsDictionary[modelName]['Instance'].initialize(runInfo,inputInstancesForModel,initDict)
if modelType in ['HybridModel', 'LogicalModel']:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe should be replaced by issubclass(self.modelsDictionary[modelName]['Instance'], HybridModelBase)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you suggestion will be better here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for submodelInst in self.modelsDictionary[modelName]['Instance'].modelInstances.values():
if not isThereACode:
isThereACode = submodelInst.type == 'Code'


# retrieve 'TargetEvaluation' DataObjects
targetEvaluation = self.retrieveObjectFromAssemblerDict('TargetEvaluation',self.modelsInputDictionary[modelName]['TargetEvaluation'], True)
# assert acceptable TargetEvaluation types are used
Expand Down
33 changes: 29 additions & 4 deletions ravenframework/Models/HybridModels/HybridModelBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,32 @@ def submit(self,myInput,samplerType,jobHandler,**kwargs):
## Hybrid models need access to the job handler, so let's stuff it in our
## catch all kwargs where evaluateSample can pick it up, not great, but
## will suffice until we can better redesign this whole process.
prefix = kwargs['prefix']
kwargs['jobHandler'] = jobHandler
jobHandler.addClientJob((self, myInput, samplerType, kwargs), self.__class__.evaluateSample, prefix, kwargs)

nRuns = 1
alfoa marked this conversation as resolved.
Show resolved Hide resolved
batchMode = kwargs.get("batchMode", False)
if batchMode:
nRuns = kwargs["batchInfo"]['nRuns']

for index in range(nRuns):
if batchMode:
kw = kwargs['batchInfo']['batchRealizations'][index]
kw['batchMode'] = False
else:
kw = kwargs

kw['jobHandler'] = jobHandler

prefix = kw.get("prefix")
uniqueHandler = kw.get("uniqueHandler",'any')
## These kw are updated by createNewInput, so the job either should not
## have access to the metadata, or it needs to be updated from within the
## evaluateSample function, which currently is not possible since that
## function does not know about the job instance.
metadata = kw

jobHandler.addClientJob((self, myInput, samplerType, kw), self.__class__.evaluateSample, prefix, metadata=metadata,
uniqueHandler=uniqueHandler,
groupInfo={'id': kwargs['batchInfo']['batchId'], 'size': nRuns} if batchMode else None)

@Parallel()
def evaluateSample(self, myInput, samplerType, kwargs):
Expand All @@ -187,7 +210,9 @@ def evaluateSample(self, myInput, samplerType, kwargs):
# assure rlz has all metadata
rlz = dict((var,np.atleast_1d(kwargsToKeep[var])) for var in kwargsToKeep.keys())
# update rlz with input space from inRun and output space from result
rlz.update(dict((var,np.atleast_1d(kwargsToKeep['SampledVars'][var] if var in kwargs['SampledVars'] else result[var])) for var in set(itertools.chain(result.keys(),kwargsToKeep['SampledVars'].keys()))))
rlz.update(dict((var,np.atleast_1d(kwargsToKeep['SampledVars'][var]
if var in kwargs['SampledVars'] else result[var]))
for var in set(itertools.chain(result.keys(),kwargsToKeep['SampledVars'].keys()))))
return rlz

@abc.abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion ravenframework/Models/HybridModels/LogicalModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def _externalRun(self, inRun, jobHandler):
# TODO: execute control function, move this to createNewInput
modelToRun = inputKwargs.pop('modelToRun')
inputKwargs['prefix'] = modelToRun + utils.returnIdSeparator() + identifier
inputKwargs['uniqueHandler'] = self.name + identifier
inputKwargs['uniqueHandler'] = self.name + utils.returnIdSeparator() + identifier

moveOn = False
while not moveOn:
Expand Down
8 changes: 8 additions & 0 deletions ravenframework/Models/Model.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,3 +503,11 @@ def getSerializationFiles(self):
serializationFiles = set()
return serializationFiles

def constructModelBasedIdentifier(self, jobId):
"""

@ In, jobId, str, jobId (the default identifier coming from Samplers/Optimizers)
@ Out, new, set, set of filenames that are needed
"""
serializationFiles = set()
return serializationFiles
alfoa marked this conversation as resolved.
Show resolved Hide resolved
138 changes: 138 additions & 0 deletions tests/framework/hybridModel/test_logical_ensemble_model_and_code.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
<?xml version="1.0" ?>
<Simulation verbosity="debug">
<TestInfo>
<name>abcd</name>
<author>abcd</author>
alfoa marked this conversation as resolved.
Show resolved Hide resolved
<created>2020-05-07</created>
<classesTested>Models.LogicalModel</classesTested>
<description>

alfoa marked this conversation as resolved.
Show resolved Hide resolved
</description>
</TestInfo>

<RunInfo>
<JobName>testLogicalCodeAndExtModelInEnsembleModel</JobName>
<Sequence>logicalAndExtModel</Sequence>
<WorkingDir>logicalModelCodeAndExtModelInEnsembleModel</WorkingDir>
<batchSize>1</batchSize>
</RunInfo>

<Files>
<Input name="gen.one" type="">/Users/aalfonsi/projects/raven/tests/framework/hybridModel/logicalCode/gen.one</Input>
<Input name="gen.two" type="">/Users/aalfonsi/projects/raven/tests/framework/hybridModel/logicalCode/gen.two</Input>
</Files>

<Functions>
<External file="/Users/aalfonsi/projects/raven/tests/framework/hybridModel/logicalCode/control" name="control">
<variables>x, y</variables>
</External>
</Functions>

<Models>
<Code name="poly" subType="GenericCode">
<executable>/Users/aalfonsi/projects/raven/tests/framework/hybridModel/logicalCode/poly_code.py</executable>
<clargs arg="python" type="prepend"/>
<clargs arg="-i" extension=".one" type="input"/>
<fileargs arg="aux" extension=".two" type="input"/>
<fileargs arg="output" type="output"/>
</Code>
<Code name="exp" subType="GenericCode">
<executable>/Users/aalfonsi/projects/raven/tests/framework/hybridModel/logicalCode/exp_code.py</executable>
<clargs arg="python" type="prepend"/>
<clargs arg="-i" extension=".one" type="input"/>
<fileargs arg="aux" extension=".two" type="input"/>
<fileargs arg="output" type="output"/>
</Code>
<LogicalModel name="logical" subType="">
<Model class="Models" type="Code">poly</Model>
<Model class="Models" type="Code">exp</Model>
<ControlFunction class="Functions" type="External">control</ControlFunction>
</LogicalModel>

<ExternalModel ModuleToLoad="steadyState" name="steadyState" subType="">
<inputs>x</inputs>
<outputs>y</outputs>
</ExternalModel>

<EnsembleModel name="ExternalModelAndLogical" subType="">
<Model class="Models" type="LogicalModel">
logical
<Input class="Files" type="">gen.one</Input>
<Input class="Files" type="">gen.two</Input>
<TargetEvaluation class="DataObjects" type="PointSet">samplesLogical</TargetEvaluation>
</Model>
<Model class="Models" type="ExternalModel">
steadyState
<Input class="DataObjects" type="PointSet">inputHolder</Input>
<TargetEvaluation class="DataObjects" type="PointSet">samplesSteadyState</TargetEvaluation>
</Model>
</EnsembleModel>


</Models>

<Distributions>
<Uniform name="xd">
<lowerBound>0.0</lowerBound>
<upperBound>1.0</upperBound>
</Uniform>
</Distributions>

<Samplers>
<Stratified name="LHS">
<variable name="x">
<distribution>xd</distribution>
<grid construction="equal" steps="15" type="CDF">0.3 0.9</grid>
</variable>
</Stratified>
</Samplers>

<Steps>
<MultiRun name="logicalAndExtModel">
<Input class="Files" type="">gen.one</Input>
<Input class="Files" type="">gen.two</Input>
<Input class="DataObjects" type="PointSet">inputHolder</Input>

<Model class="Models" type="EnsembleModel">ExternalModelAndLogical</Model>
<Sampler class="Samplers" type="Stratified">LHS</Sampler>

<Output class="DataObjects" type="PointSet">finalResults</Output>
<Output class="DataObjects" type="PointSet">samplesLogical</Output>
<Output class="OutStreams" type="Print">samplesLogical</Output>
<Output class="DataObjects" type="PointSet">samplesSteadyState</Output>
<Output class="OutStreams" type="Print">samplesSteadyState</Output>
</MultiRun>
</Steps>

<DataObjects>
<PointSet name="inputHolder"/>

<PointSet name="samplesSteadyState">
<Input>x</Input>
<Output>y</Output>
</PointSet>
<PointSet name="samplesLogical">
<Input>x,y</Input>
<Output>poly</Output>
</PointSet>

<PointSet name="finalResults">
<Input>x,y</Input>
<Output>poly</Output>
</PointSet>
</DataObjects>

<OutStreams>
<Print name="samplesLogical">
<type>csv</type>
<source>samplesLogical</source>
<what>input, output</what>
</Print>
<Print name="samplesSteadyState">
<type>csv</type>
<source>samplesSteadyState</source>
<what>input, output</what>
</Print>
</OutStreams>

</Simulation>