Skip to content

Commit

Permalink
Refact: StreamTask, OATask - order of processing/adaption on new/obso…
Browse files Browse the repository at this point in the history
…lete instances #988
  • Loading branch information
detlefarend committed May 22, 2024
1 parent 81a7f0f commit 7405d78
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 36 deletions.
26 changes: 16 additions & 10 deletions src/mlpro/bf/mt.py
Original file line number Diff line number Diff line change
Expand Up @@ -960,16 +960,22 @@ def init_plot( self,
task_pos_x = 1
task_pos_y = 1
task_ax_id = 1
task_plot_settings = PlotSettings( p_view = ps.view,
p_axes = task_axes,
p_pos_x = task_pos_x,
p_pos_y = task_pos_y,
p_step_rate = ps.step_rate,
p_plot_depth = ps.plot_depth,
p_detail_level = ps.detail_level,
p_force_fg = ps.force_fg,
p_id=task_ax_id,
p_view_autoselect = ps.view_autoselect )
# task_plot_settings = PlotSettings( p_view = ps.view,
# p_axes = task_axes,
# p_pos_x = task_pos_x,
# p_pos_y = task_pos_y,
# p_step_rate = ps.step_rate,
# p_plot_depth = ps.plot_depth,
# p_detail_level = ps.detail_level,
# p_force_fg = ps.force_fg,
# p_id=task_ax_id,
# p_view_autoselect = ps.view_autoselect )
task_plot_settings = ps.copy()
task_plot_settings.axes = task_axes
task_plot_settings.pos_x = task_pos_x
task_plot_settings.pos_y = task_pos_y
task_plot_settings.id = task_ax_id


else:
# Task plots embedded in the predecessor/workflow figure/subplot
Expand Down
22 changes: 21 additions & 1 deletion src/mlpro/bf/plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
## -- - parameter horizon replaced by plot_horizon with new default
## -- value 500
## -- - new parameter data_horizon with default value 1000
## -- 2024-05-22 2.13.0 DA New method PlotSettings.copy()
## -------------------------------------------------------------------------------------------------

"""
Ver. 2.12.0 (2024-05-21)
Ver. 2.13.0 (2024-05-22)
This module provides various classes related to data plotting.
"""


Expand Down Expand Up @@ -168,6 +170,24 @@ def __init__( self,
self.data_horizon = p_data_horizon


## -------------------------------------------------------------------------------------------------
def copy(self):
return self.__class__( p_view = self.view,
p_axes = self.axes,
p_pos_x = self.pos_x,
p_pos_y = self.pos_y,
p_size_x = self.size_x,
p_size_y = self.size_y,
p_step_rate = self.step_rate,
p_plot_horizon = self.plot_horizon,
p_plot_depth = self.plot_depth,
p_data_horizon = self.data_horizon,
p_force_fg = self.force_fg,
p_id = self.id,
p_view_autoselect = self.view_autoselect,
p_kwargs = self.kwargs )





Expand Down
29 changes: 12 additions & 17 deletions src/mlpro/bf/streams/tasks/rearranger.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,23 @@
## -- 2022-12-14 1.0.1 DA Corrections
## -- 2022-12-16 1.0.2 DA Little refactoring
## -- 2022-12-19 1.0.3 DA New parameter p_duplicate_data
## -- 2024-05-22 1.1.0 DA Refactoring
## -------------------------------------------------------------------------------------------------

"""
Ver. 1.0.3 (2022-12-19)
Ver. 1.1.0 (2024-05-22)
This module provides a stream task class Rearranger to rearrange the feature and label space of
instances.
"""


from mlpro.bf.exceptions import *
from mlpro.bf.various import Log
from mlpro.bf.mt import Task
from mlpro.bf.math import Set, Element
from mlpro.bf.streams import Instance, StreamTask
from mlpro.bf.math import Element, Set
from mlpro.bf.streams import Instance, InstDict, StreamTask



Expand Down Expand Up @@ -192,24 +194,17 @@ def _rearrange(self, p_inst:Instance):


## -------------------------------------------------------------------------------------------------
def _run(self, p_inst_new: set, p_inst_del: set):
def _run(self, p_inst : InstDict):

# 1 Late preparation based on first incoming instance
if not self._prepared:
try:
inst = p_inst_new[0]
(inst_type, inst) = next(iter(p_inst.values()))
self._prepare_rearrangement(p_inst=inst)
self._prepared = True
except:
inst = p_inst_del[0]

self._prepare_rearrangement(p_inst=inst)
self._prepared = True


# 2 Rearrange new instances
for inst in p_inst_new:
self._rearrange(p_inst=inst)

return

# 3 Rearrange instances to be deleted
for inst in p_inst_del:
# 2 Rearrange new instances (order doesn't matter)
for (inst_type,inst) in p_inst.values():
self._rearrange(p_inst=inst)
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
## -- yyyy-mm-dd Ver. Auth. Description
## -- 2022-10-27 0.0.0 DA Creation
## -- 2022-12-14 1.0.0 DA First implementation
## -- 2024-05-22 1.1.0 DA Refactoring
## -------------------------------------------------------------------------------------------------

"""
Ver. 1.0.0 (2022-12-14)
Ver. 1.1.0 (2024-05-22)
This module demonstrates the principles of stream processing with MLPro. To this regard, a stream of
a stream provider is combined with a stream workflow to a stream scenario. The workflow consists of
Expand Down Expand Up @@ -46,7 +47,7 @@ class MyTask (StreamTask):
C_NAME = 'Custom'

## -------------------------------------------------------------------------------------------------
def _run(self, p_inst_new: list, p_inst_del: list):
def _run(self, p_inst : InstDict):
pass


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
## -- 2022-10-27 0.0.0 DA Creation
## -- 2022-12-14 1.0.0 DA First implementation
## -- 2023-02-07 1.0.1 SY Refactoring module name
## -- 2024-05-22 1.1.0 DA Refactoring
## -------------------------------------------------------------------------------------------------

"""
Ver. 1.0.1 (2023-02-07)
Ver. 1.1.0 (2024-05-22)
This module demonstrates the principles of stream processing with MLPro. To this regard, a stream of
a stream provider is combined with a stream workflow to a stream scenario. The workflow consists of
Expand Down Expand Up @@ -47,7 +48,7 @@ class MyTask (StreamTask):
C_NAME = 'Custom'

## -------------------------------------------------------------------------------------------------
def _run(self, p_inst_new: list, p_inst_del: list):
def _run(self, p_inst : InstDict):
pass


Expand Down
14 changes: 10 additions & 4 deletions test/howtos/bf/howto_bf_streams_113_stream_task_rearranger_nd.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
## -- 2022-10-27 0.0.0 DA Creation
## -- 2022-12-14 1.0.0 DA First implementation
## -- 2023-02-07 1.0.1 SY Refactoring module name
## -- 2024-05-22 1.1.0 DA Refactoring
## -------------------------------------------------------------------------------------------------

"""
Ver. 1.0.1 (2023-02-07)
Ver. 1.1.0 (2024-05-22)
This module demonstrates the principles of stream processing with MLPro. To this regard, a stream of
a stream provider is combined with a stream workflow to a stream scenario. The workflow consists of
Expand Down Expand Up @@ -47,7 +48,7 @@ class MyTask (StreamTask):
C_NAME = 'Custom'

## -------------------------------------------------------------------------------------------------
def _run(self, p_inst_new: list, p_inst_del: list):
def _run(self, p_inst: InstDict):
pass


Expand Down Expand Up @@ -85,7 +86,7 @@ def _setup(self, p_mode, p_visualize: bool, p_logging):
labels_new = [ ( 'L', [ labels[0] ] ),
( 'F', features[4:6] ) ]

task_rearranger = Rearranger( p_name='t1',
task_rearranger = Rearranger( p_name='T1 - Rearranger',
p_range_max=Task.C_RANGE_THREAD,
p_visualize=p_visualize,
p_logging=p_logging,
Expand All @@ -95,7 +96,10 @@ def _setup(self, p_mode, p_visualize: bool, p_logging):
workflow.add_task( p_task=task_rearranger )

# 2.2 Set up and add an own custom task
task_custom = MyTask( p_name='t2', p_visualize=p_visualize, p_logging=logging )
task_custom = MyTask( p_name='T2 - My task',
p_visualize=p_visualize,
p_logging=logging )

workflow.add_task( p_task=task_custom, p_pred_tasks=[task_rearranger] )


Expand Down Expand Up @@ -131,6 +135,8 @@ def _setup(self, p_mode, p_visualize: bool, p_logging):

if __name__ == '__main__':
myscenario.init_plot( p_plot_settings=PlotSettings( p_view = PlotSettings.C_VIEW_ND,
p_plot_horizon = 50,
p_data_horizon = 100,
p_step_rate = 2 ) )
input('Press ENTER to start stream processing...')

Expand Down

0 comments on commit 7405d78

Please sign in to comment.