Refact: StreamTask, OATask - order of processing/adaption on new/obsolete instances #988
detlefarend committed May 22, 2024
1 parent 5498c47 commit 3a3ed74
Showing 11 changed files with 124 additions and 134 deletions.
20 changes: 7 additions & 13 deletions src/mlpro/bf/streams/basics.py
@@ -846,7 +846,6 @@ class StreamTask (Task):
C_PLOT_VALID_VIEWS = [ PlotSettings.C_VIEW_2D, PlotSettings.C_VIEW_3D, PlotSettings.C_VIEW_ND ]
C_PLOT_DEFAULT_VIEW = PlotSettings.C_VIEW_ND

C_PLOT_ND_XLABEL_INST = 'Instance index'
C_PLOT_ND_XLABEL_TIME = 'Time index'
C_PLOT_ND_YLABEL = 'Feature Data'

@@ -1012,8 +1011,7 @@ def _init_plot_nd(self, p_figure: Figure, p_settings: PlotSettings):

Task._init_plot_nd( self, p_figure=p_figure, p_settings=p_settings )

self._plot_nd_xlabel = self.C_PLOT_ND_XLABEL_INST
p_settings.axes.set_xlabel(self.C_PLOT_ND_XLABEL_INST)
p_settings.axes.set_xlabel(self.C_PLOT_ND_XLABEL_TIME)
p_settings.axes.set_ylabel(self.C_PLOT_ND_YLABEL)
p_settings.axes.grid(visible=True)
p_settings.axes.set_xlim(0,1)
@@ -1361,11 +1359,7 @@ def _update_plot_nd( self,

inst_ref = next(iter(p_inst.values()))[1]

# 2.1 Check whether x label needs to be changed to time index
if ( self._plot_nd_xlabel == self.C_PLOT_ND_XLABEL_INST ) and ( inst_ref.get_tstamp() is not None ):
p_settings.axes.set_xlabel(self.C_PLOT_ND_XLABEL_TIME)

# 2.2 Add plot for each feature
# 2.1 Add plot for each feature
self._plot_nd_plots = []
feature_space = inst_ref.get_feature_data().get_related_set()

@@ -1381,10 +1375,7 @@


# 3 Update plot data
for inst_id, inst_entry in sorted(p_inst.items()):

inst_type = inst_entry[0]
inst = inst_entry[1]
for inst_id, (inst_type, inst) in sorted(p_inst.items()):

if inst_type == InstTypeNew:
self._plot_inst_ids.append(inst_id)
@@ -1412,7 +1403,10 @@ def _update_plot_nd( self,
except:
pass


if len(self._plot_nd_xdata)==0:
return


# 4 If buffer size is limited, remove obsolete data
if p_settings.data_horizon > 0:
num_del = max(0, len(self._plot_nd_xdata) - p_settings.data_horizon )
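For orientation: the hunks above drop the separate 'Instance index' x-label (the ND plot now always uses the time index), unpack the `(inst_type, inst)` tuples directly in the loop header, and return early when no plot data has been collected yet. A minimal sketch of the new iteration pattern, using plain stand-in values rather than MLPro's `InstDict`/`Instance` types:

```python
# Minimal sketch of the reworked instance loop (stand-in values only,
# not MLPro's actual Instance/InstDict types).
InstTypeNew = 'N'
InstTypeDel = 'D'

p_inst = { 2: (InstTypeDel, 'obsolete instance'),
           3: (InstTypeNew, 'new instance A'),
           5: (InstTypeNew, 'new instance B') }

# Old style: fetch the entry tuple and index into it manually
for inst_id, inst_entry in sorted(p_inst.items()):
    inst_type = inst_entry[0]
    inst      = inst_entry[1]

# New style: unpack the (type, instance) tuple in the loop header
for inst_id, (inst_type, inst) in sorted(p_inst.items()):
    if inst_type == InstTypeNew:
        print(f'plot new instance {inst_id}: {inst}')
    else:
        print(f'drop obsolete instance {inst_id}: {inst}')
```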
1 change: 0 additions & 1 deletion src/mlpro/bf/streams/tasks/windows/__init__.py
@@ -1,2 +1 @@
from mlpro.bf.streams.tasks.windows.basics import Window
from mlpro.bf.streams.tasks.windows.ringbuffer import RingBuffer
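The sub-package no longer re-exports `Window`; code that instantiated a `Window` task switches to `RingBuffer`, as the adapted howtos further down do. A migration sketch, assuming the constructor parameters are unchanged apart from the class name (which is what the diffs below suggest):

```python
# Old import, removed in this commit:
#   from mlpro.bf.streams.tasks import Window
#   task_window = Window( p_buffer_size=50, p_name='t2', p_visualize=False, p_logging=False )

# New class name; parameters as shown in the adapted howtos below
from mlpro.bf.streams.tasks import RingBuffer

task_window = RingBuffer( p_buffer_size       = 50,
                          p_delay             = True,
                          p_enable_statistics = True,
                          p_name              = 't2',
                          p_duplicate_data    = True,
                          p_visualize         = False,
                          p_logging           = False )
```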
89 changes: 42 additions & 47 deletions src/mlpro/bf/streams/tasks/windows/ringbuffer.py
@@ -102,7 +102,6 @@ def __init__(self,
self._statistics_enabled = p_enable_statistics or p_visualize
self._numeric_buffer:np.ndarray = None
self._numeric_features = []
self._raise_event_buffer_full = False
self._raise_event_data_removed = False
self._buffer_full = False

@@ -126,71 +125,67 @@ def _run(self, p_inst : InstDict ):
# 1 Main processing loop
for inst_id, (inst_type, inst) in sorted(inst.items()):

if inst_type == InstTypeNew:

# 1.1 A new instance is to be buffered
feature_value = inst.get_feature_data()
if inst_type != InstTypeNew:
# Obsolete instances need to be removed from the buffer (not yet implemented)
self.log(self.C_LOG_TYPE_W, 'Handling of obsolete data not yet implemented')
continue


# 1.1.1 Checking the numeric dimensions/features in Stream
if self._numeric_buffer is None and self._statistics_enabled:
for j in feature_value.get_dim_ids():
if feature_value.get_related_set().get_dim(j).get_base_set() in [Dimension.C_BASE_SET_N,
Dimension.C_BASE_SET_R,
Dimension.C_BASE_SET_Z]:
self._numeric_features.append(j)
# 1.1 A new instance is to be buffered
feature_value = inst.get_feature_data()

self._numeric_buffer = np.zeros((self.buffer_size, len(self._numeric_features)))

# 1.2 Checking the numeric dimensions/features in Stream
if self._numeric_buffer is None and self._statistics_enabled:
for j in feature_value.get_dim_ids():
if feature_value.get_related_set().get_dim(j).get_base_set() in [Dimension.C_BASE_SET_N,
Dimension.C_BASE_SET_R,
Dimension.C_BASE_SET_Z]:
self._numeric_features.append(j)

# 1.1.2 Internal ring buffer full?
if len(self._buffer) == self.buffer_size:
self._numeric_buffer = np.zeros((self.buffer_size, len(self._numeric_features)))

# The oldest instance is extracted from the buffer and forwarded
inst_del = self._buffer[self._buffer_pos]
p_inst[inst_del.id] = ( InstTypeDel, inst_del )

if not self._buffer_full:
self._raise_event_buffer_full = True
self._buffer_full = True
# 1.3 Internal ring buffer already filled?
if len(self._buffer) == self.buffer_size:

self._raise_event_data_removed = True
# The oldest instance is extracted from the buffer and forwarded
inst_del = self._buffer[self._buffer_pos]
p_inst[inst_del.id] = ( InstTypeDel, inst_del )
self._raise_event_data_removed = True


# 1.1.3 New instance is buffered
self._buffer[self._buffer_pos] = inst
# 1.4 New instance is buffered
self._buffer[self._buffer_pos] = inst


# 1.1.4 Update of internal statistics
if self._statistics_enabled:
self._numeric_buffer[self._buffer_pos] = [feature_value.get_value(k) for k in self._numeric_features]
# 1.5 Update of internal statistics
if self._statistics_enabled:
self._numeric_buffer[self._buffer_pos] = [feature_value.get_value(k) for k in self._numeric_features]


# 1.1.5 Increment of buffer position
self._buffer_pos = (self._buffer_pos + 1) % self.buffer_size
# 1.6 Increment of buffer position
self._buffer_pos = (self._buffer_pos + 1) % self.buffer_size


# 1.1.6 Raise events at the end of instance processing
if self._raise_event_buffer_full:
if self._delay:
for i in range(self.buffer_size):
inst_fwd = self._buffer[i]
p_inst[inst_fwd.id] = ( InstTypeNew, inst_fwd )
# 1.7 Raise events at the end of instance processing
if ( not self._buffer_full ) and ( len(self._buffer) == self.buffer_size ):
self._buffer_full = True

self._raise_event( p_event_id = self.C_EVENT_BUFFER_FULL,
p_event_object = Event( p_raising_object=self,
p_related_set=feature_value.get_related_set() ) )
self._raise_event_buffer_full = False
if self._delay:
for i in range(self.buffer_size):
inst_fwd = self._buffer[i]
p_inst[inst_fwd.id] = ( InstTypeNew, inst_fwd )

if self._raise_event_data_removed:
self._raise_event( p_event_id = self.C_EVENT_DATA_REMOVED,
p_event_object = Event( p_raising_object=self,
p_related_set=feature_value.get_related_set() ) )
self._raise_event( p_event_id = self.C_EVENT_BUFFER_FULL,
p_event_object = Event( p_raising_object=self,
p_related_set=feature_value.get_related_set() ) )

else:
# 1.2 Obsolete instances need to be removed from the buffer (not yet implemented)
self.log(self.C_LOG_TYPE_W, 'Handling of obsolete data not yet implemented')
pass

if self._raise_event_data_removed:
self._raise_event( p_event_id = self.C_EVENT_DATA_REMOVED,
p_event_object = Event( p_raising_object=self,
p_related_set=feature_value.get_related_set() ) )


## -------------------------------------------------------------------------------------------------
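In short, `_run` now skips non-new instances with an early `continue`, forwards the oldest buffered instance as deleted before it is overwritten, and raises the buffer-full and data-removed events once after the loop instead of flagging them per instance. A compact, self-contained sketch of that control flow (a simplified stand-in, not MLPro's actual `RingBuffer`/`Event` API):

```python
# Simplified stand-in for the refactored ring-buffer control flow; events are
# reduced to print statements, instances to plain dicts.
INST_NEW, INST_DEL = 'N', 'D'

class MiniRingBuffer:

    def __init__(self, p_buffer_size: int = 3):
        self.buffer_size  = p_buffer_size
        self._buffer      = {}        # ring position -> instance
        self._buffer_pos  = 0
        self._buffer_full = False

    def run(self, p_inst: dict):
        raise_event_data_removed = False

        # 1 Main processing loop
        for inst_id, (inst_type, inst) in sorted(p_inst.items()):

            if inst_type != INST_NEW:
                # Obsolete instances are skipped early (removal not implemented)
                continue

            # 1.1 Buffer already filled? Forward the oldest instance as deleted
            if len(self._buffer) == self.buffer_size:
                inst_del = self._buffer[self._buffer_pos]
                p_inst[inst_del['id']] = ( INST_DEL, inst_del )
                raise_event_data_removed = True

            # 1.2 Buffer the new instance and advance the ring position
            self._buffer[self._buffer_pos] = inst
            self._buffer_pos = (self._buffer_pos + 1) % self.buffer_size

        # 2 Events are raised once, after the loop
        if ( not self._buffer_full ) and ( len(self._buffer) == self.buffer_size ):
            self._buffer_full = True
            print('event: BUFFER_FULL')

        if raise_event_data_removed:
            print('event: DATA_REMOVED')


buf = MiniRingBuffer(p_buffer_size=2)
for i in range(4):
    batch = { i: (INST_NEW, {'id': i, 'value': 0.5 * i}) }
    buf.run(batch)
    print(f'cycle {i}:', batch)
```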
6 changes: 4 additions & 2 deletions test/howtos/bf/howto_bf_streams_110_stream_task_window.py
@@ -92,7 +92,7 @@ def _setup(self, p_mode, p_visualize:bool, p_logging):

if __name__ == "__main__":
# 1.1 Parameters for demo mode
cycle_limit = 100
cycle_limit = 200
logging = Log.C_LOG_ALL
visualize = True

@@ -114,7 +114,9 @@ def _setup(self, p_mode, p_visualize:bool, p_logging):
myscenario.reset()

if __name__ == '__main__':
myscenario.init_plot()
myscenario.init_plot( p_plot_settings=PlotSettings( p_view=PlotSettings.C_VIEW_ND,
p_plot_horizon=100,
p_data_horizon=150) )
input('Press ENTER to start stream processing...')

myscenario.run()
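The howto now also configures the plot explicitly: the ND view is selected, `p_plot_horizon` bounds the visible plot window, and `p_data_horizon` bounds the buffered plot data (compare the `data_horizon` handling in `_update_plot_nd` above). A usage sketch, assuming `PlotSettings` is available through the howto's MLPro imports and `myscenario` is the scenario created above:

```python
# Plot configuration as used in the updated howto; the horizon comments are an
# interpretation of the change, not official documentation.
myscenario.init_plot(
    p_plot_settings = PlotSettings( p_view         = PlotSettings.C_VIEW_ND,
                                    p_plot_horizon = 100,     # instances visible in the plot window
                                    p_data_horizon = 150 ) )  # instances kept as plot data
```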
@@ -33,7 +33,7 @@
from mlpro.oa.streams.tasks.boundarydetectors import *
from mlpro.oa.streams import *
from mlpro.bf.streams.streams import *
from mlpro.bf.streams.tasks import Window
from mlpro.bf.streams.tasks.windows import RingBuffer



@@ -57,10 +57,10 @@ def _setup(self, p_mode, p_ada: bool, p_visualize: bool, p_logging):
# 2 Set up the stream workflow

# 2.1 Creation of a tasks
task_window = Window( p_name = 'T1 - Window',
p_buffer_size=10,
p_visualize=p_visualize,
p_logging=p_logging )
task_window = RingBuffer( p_name = 'T1 - Window',
p_buffer_size=10,
p_visualize=p_visualize,
p_logging=p_logging )

task_norm = NormalizerZTransform( p_name='T2 - Z-transformation',
p_ada=p_ada,
@@ -34,7 +34,7 @@


from mlpro.bf.streams.streams import *
from mlpro.bf.streams.tasks import Window, Rearranger
from mlpro.bf.streams.tasks import RingBuffer, Rearranger
from mlpro.oa.streams import *


@@ -81,13 +81,13 @@ def _setup(self, p_mode, p_ada: bool, p_visualize: bool, p_logging):
workflow.add_task( p_task=task_rearranger )

# 2.2.2 Window to buffer some data
task_window = Window( p_buffer_size=50,
p_delay=True,
p_enable_statistics=True,
p_name='t2',
p_duplicate_data=True,
p_visualize=p_visualize,
p_logging=p_logging )
task_window = RingBuffer( p_buffer_size=50,
p_delay=True,
p_enable_statistics=True,
p_name='t2',
p_duplicate_data=True,
p_visualize=p_visualize,
p_logging=p_logging )

workflow.add_task(p_task=task_window, p_pred_tasks=[task_rearranger])

@@ -97,7 +97,7 @@ def _setup(self, p_mode, p_ada: bool, p_visualize: bool, p_logging):
p_visualize=p_visualize,
p_logging=p_logging )

task_window.register_event_handler( p_event_id=Window.C_EVENT_DATA_REMOVED, p_event_handler=task_bd.adapt_on_event )
task_window.register_event_handler( p_event_id=RingBuffer.C_EVENT_DATA_REMOVED, p_event_handler=task_bd.adapt_on_event )
workflow.add_task(p_task = task_bd, p_pred_tasks=[task_window])

# # 2.2.4 MinMax-Normalizer
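Beyond the rename, the event-based coupling stays the same: the ring buffer announces removed data via `C_EVENT_DATA_REMOVED`, and the boundary detector's `adapt_on_event` is registered as handler. A self-contained sketch of that observer wiring with illustrative stand-in classes (not MLPro's actual `Task`/`Event` API):

```python
# Illustrative stand-ins for the register_event_handler / adapt_on_event wiring;
# not MLPro's actual Task, EventManager or Event classes.
class MiniEvent:
    def __init__(self, p_raising_object, p_data):
        self.raising_object = p_raising_object
        self.data           = p_data


class MiniRingBufferTask:
    C_EVENT_DATA_REMOVED = 'DATA_REMOVED'

    def __init__(self):
        self._handlers = {}

    def register_event_handler(self, p_event_id, p_event_handler):
        self._handlers.setdefault(p_event_id, []).append(p_event_handler)

    def _raise_event(self, p_event_id, p_event_object):
        for handler in self._handlers.get(p_event_id, []):
            handler(p_event_id, p_event_object)

    def remove_oldest(self, p_inst):
        # After dropping buffered data, notify subscribers so they can re-adapt
        self._raise_event(self.C_EVENT_DATA_REMOVED, MiniEvent(self, p_inst))


class MiniBoundaryDetector:
    def adapt_on_event(self, p_event_id, p_event_object):
        print('re-adapting boundaries after', p_event_id, 'for', p_event_object.data)


task_window = MiniRingBufferTask()
task_bd     = MiniBoundaryDetector()

task_window.register_event_handler( p_event_id      = MiniRingBufferTask.C_EVENT_DATA_REMOVED,
                                    p_event_handler = task_bd.adapt_on_event )
task_window.remove_oldest({'id': 42})
```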
@@ -35,7 +35,7 @@

from mlpro.bf.streams import *
from mlpro.bf.streams.streams import *
from mlpro.bf.streams.tasks import Window, Rearranger
from mlpro.bf.streams.tasks import RingBuffer, Rearranger
from mlpro.oa.streams import *


@@ -82,13 +82,13 @@ def _setup(self, p_mode, p_ada: bool, p_visualize: bool, p_logging):
workflow.add_task( p_task=task_rearranger )

# 2.2.2 Window to buffer some data
task_window = Window( p_buffer_size=50,
p_delay=True,
p_enable_statistics=True,
p_name='t2',
p_duplicate_data=True,
p_visualize=p_visualize,
p_logging=p_logging )
task_window = RingBuffer( p_buffer_size=50,
p_delay=True,
p_enable_statistics=True,
p_name='t2',
p_duplicate_data=True,
p_visualize=p_visualize,
p_logging=p_logging )

workflow.add_task(p_task=task_window, p_pred_tasks=[task_rearranger])

@@ -98,7 +98,7 @@ def _setup(self, p_mode, p_ada: bool, p_visualize: bool, p_logging):
p_visualize=True,
p_logging=p_logging )

task_window.register_event_handler( p_event_id=Window.C_EVENT_DATA_REMOVED, p_event_handler=task_bd.adapt_on_event )
task_window.register_event_handler( p_event_id=RingBuffer.C_EVENT_DATA_REMOVED, p_event_handler=task_bd.adapt_on_event )
workflow.add_task(p_task = task_bd, p_pred_tasks=[task_window])

# # 2.2.4 MinMax-Normalizer
@@ -35,7 +35,7 @@

from mlpro.bf.streams import *
from mlpro.bf.streams.streams import *
from mlpro.bf.streams.tasks import Window, Rearranger
from mlpro.bf.streams.tasks import RingBuffer, Rearranger
from mlpro.oa.streams import *
from mlpro.oa.streams.tasks import *

@@ -83,13 +83,13 @@ def _setup(self, p_mode, p_ada: bool, p_visualize: bool, p_logging):
workflow.add_task( p_task=task_rearranger )

# 2.2.2 Window to buffer some data
task_window = Window( p_buffer_size=50,
p_delay=True,
p_enable_statistics=True,
p_name='t2',
p_duplicate_data=True,
p_visualize=p_visualize,
p_logging=p_logging )
task_window = RingBuffer( p_buffer_size=50,
p_delay=True,
p_enable_statistics=True,
p_name='t2',
p_duplicate_data=True,
p_visualize=p_visualize,
p_logging=p_logging )

workflow.add_task(p_task=task_window, p_pred_tasks=[task_rearranger])

@@ -99,7 +99,7 @@ def _setup(self, p_mode, p_ada: bool, p_visualize: bool, p_logging):
p_visualize=True,
p_logging=p_logging )

task_window.register_event_handler( p_event_id=Window.C_EVENT_DATA_REMOVED, p_event_handler=task_bd.adapt_on_event )
task_window.register_event_handler( p_event_id=RingBuffer.C_EVENT_DATA_REMOVED, p_event_handler=task_bd.adapt_on_event )
workflow.add_task(p_task = task_bd, p_pred_tasks=[task_window])

# # 2.2.4 MinMax-Normalizer