Skip to content

Commit

Permalink
Refact: StreamTask, OATask - order of processing/adaption on new/obso…
Browse files Browse the repository at this point in the history
…lete instances #988
  • Loading branch information
detlefarend committed May 21, 2024
1 parent d24280c commit cbe327f
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 70 deletions.
12 changes: 3 additions & 9 deletions src/mlpro/bf/streams/basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -1131,11 +1131,8 @@ def _update_plot_2d( self,
ymin = None
ymax = None

for inst_id, inst_entry in p_inst.items():

inst_type = inst_entry[0]
inst = inst_entry[1]

for inst_id, (inst_type, inst) in p_inst.items():

if inst_type == InstTypeNew:
feature_values = inst.get_feature_data().get_values()
x = feature_values[0]
Expand Down Expand Up @@ -1243,10 +1240,7 @@ def _update_plot_3d( self,
zmin = None
zmax = None

for inst_id, inst_entry in p_inst.items():

inst_type = inst_entry[0]
inst = inst_entry[1]
for inst_id, (inst_type, inst) in p_inst.items():

if inst_type == InstTypeNew:
feature_values = inst.get_feature_data().get_values()
Expand Down
113 changes: 60 additions & 53 deletions src/mlpro/bf/streams/tasks/windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,27 @@
## -- 2022-12-29 1.1.3 DA Removed method Window.init_plot()
## -- 2022-12-31 1.1.4 LSB Refactoring
## -- 2023-02-02 1.1.5 DA Methods Window._init_plot_*: removed figure creation
## -- 2024-05-21 1.2.0 DA Refactoring
## -------------------------------------------------------------------------------------------------

"""
Ver. 1.1.5 (2023-02-02)
Ver. 1.2.0 (2024-05-21)
This module provides pool of window objects further used in the context of online adaptivity.
"""


from matplotlib.axes import Axes
from mpl_toolkits.mplot3d import Axes3D
# from mpl_toolkits.mplot3d import Axes3D
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
from matplotlib.collections import PolyCollection
# from matplotlib.collections import PolyCollection
from matplotlib.patches import Rectangle
import matplotlib.pyplot as plt
# import matplotlib.pyplot as plt
import numpy as np
from mlpro.bf.streams.basics import *
from mlpro.bf.events import *
from typing import Union, List, Iterable
import matplotlib.colors as colors
# from typing import Union, List, Iterable
# import matplotlib.colors as colors



Expand Down Expand Up @@ -115,83 +116,89 @@ def __init__(self,


## -------------------------------------------------------------------------------------------------
def _run(self, p_inst_new:list, p_inst_del:list ):
def _run(self, p_inst : InstDict ):
"""
Method to run the window including adding and deleting of elements
Parameters
----------
p_inst_new : list
Instance/s to be added to the window
p_inst_del : list
Instance/s to be deleted from the window
p_inst : InstDict
Instances to be processed.
"""

# 0 Intro
new_inst_found = False
inst = p_inst.copy()
p_inst.clear()


# 1 Checking if there are new instances
if p_inst_new:
new_instances = p_inst_new.copy()
num_inst = len(new_instances)
for inst_id, (inst_type, inst) in sorted(inst.items()):

if inst_type != InstTypeNew: continue

for i in range(num_inst):
inst = new_instances[i]
new_inst_found = True

# new_instances = p_inst_new.copy()
# num_inst = len(new_instances)

# Compatibility with Instance/State
if isinstance(inst, Instance):
feature_value = inst.get_feature_data()
else:
feature_value = inst

# for i in range(num_inst):
# inst = new_instances[i]

# Checking the numeric dimensions/features in Stream
if self._numeric_buffer is None and self._statistics_enabled:
for j in feature_value.get_dim_ids():
if feature_value.get_related_set().get_dim(j).get_base_set() in [Dimension.C_BASE_SET_N,
Dimension.C_BASE_SET_R,
Dimension.C_BASE_SET_Z]:
self._numeric_features.append(j)

self._numeric_buffer = np.zeros((self.buffer_size, len(self._numeric_features)))
feature_value = inst.get_feature_data()


# Increment in buffer position
self._buffer_pos = (self._buffer_pos + 1) % self.buffer_size
# Checking the numeric dimensions/features in Stream
if self._numeric_buffer is None and self._statistics_enabled:
for j in feature_value.get_dim_ids():
if feature_value.get_related_set().get_dim(j).get_base_set() in [Dimension.C_BASE_SET_N,
Dimension.C_BASE_SET_R,
Dimension.C_BASE_SET_Z]:
self._numeric_features.append(j)

self._numeric_buffer = np.zeros((self.buffer_size, len(self._numeric_features)))

if len(self._buffer) == self.buffer_size:
# if the buffer is already full,obsolete data is going to be deleted
# raises an event, stores the new instances and skips the iteration
self._raise_event(self.C_EVENT_DATA_REMOVED, Event(p_raising_object=self,
p_related_set=feature_value.get_related_set()))
p_inst_del.append(self._buffer[self._buffer_pos])
self._buffer[self._buffer_pos] = inst
if self._statistics_enabled:
self._numeric_buffer[self._buffer_pos] = [feature_value.get_value(k) for k in
self._numeric_features]
continue

# Increment in buffer position
self._buffer_pos = (self._buffer_pos + 1) % self.buffer_size

# adds element to the buffer in this code block only if the buffer is not already full

if len(self._buffer) == self.buffer_size:
# if the buffer is already full,obsolete data is going to be deleted
# raises an event, stores the new instances and skips the iteration
self._raise_event(self.C_EVENT_DATA_REMOVED, Event(p_raising_object=self,
p_related_set=feature_value.get_related_set()))

p_inst_del.append(self._buffer[self._buffer_pos])
self._buffer[self._buffer_pos] = inst
if self._statistics_enabled:
self._numeric_buffer[self._buffer_pos] = [feature_value.get_value(k) for k in
self._numeric_features]
continue


# if the buffer is full after adding an element, raises event
if len(self._buffer) == self.buffer_size:
if self._delay:
p_inst_new.clear()
for instance in self._buffer.values():
p_inst_new.append(instance)
self._raise_event(self.C_EVENT_BUFFER_FULL, Event(self))
# adds element to the buffer in this code block only if the buffer is not already full
self._buffer[self._buffer_pos] = inst
if self._statistics_enabled:
self._numeric_buffer[self._buffer_pos] = [feature_value.get_value(k) for k in
self._numeric_features]


# If delay is true, clear the set p_inst_new for any following tasks
if self._delay:
if len(self._buffer) < self.buffer_size:
# if the buffer is full after adding an element, raises event
if len(self._buffer) == self.buffer_size:
if self._delay:
p_inst_new.clear()
for instance in self._buffer.values():
p_inst_new.append(instance)
self._raise_event(self.C_EVENT_BUFFER_FULL, Event(self))


# 2 If delay is true, clear the set p_inst_new for any following tasks
if self._delay and new_inst_found:
if len(self._buffer) < self.buffer_size:
p_inst.clear()


## -------------------------------------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions test/howtos/bf/howto_bf_streams_101_basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class MyTask (StreamTask):
C_NAME = 'Custom'

## -------------------------------------------------------------------------------------------------
def _run(self, p_inst_new: list, p_inst_del: list):
def _run(self, p_inst : InstDict):
pass


Expand Down Expand Up @@ -88,7 +88,7 @@ def _setup(self, p_mode, p_visualize: bool, p_logging):
# 1 Preparation of demo/unit test mode
if __name__ == '__main__':
# 1.1 Parameters for demo mode
cycle_limit = 721
cycle_limit = 500
logging = Log.C_LOG_ALL
visualize = True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
## -- yyyy-mm-dd Ver. Auth. Description
## -- 2022-10-27 0.0.0 DA Creation
## -- 2022-11-22 1.0.0 DA First implementation
## -- 2024-05-21 1.1.0 DA Refactoring
## -------------------------------------------------------------------------------------------------

"""
Ver. 1.0.0 (2022-11-22)
Ver. 1.1.0 (2024-05-21)
This module demonstrates the principles of stream processing with MLPro. To this regard, stream tasks
are added to a stream workflow. This in turn is combined with a stream of a stream provider to a
Expand All @@ -30,7 +31,6 @@


from mlpro.bf.streams import *
from mlpro.wrappers.openml import WrStreamProviderOpenML



Expand All @@ -46,7 +46,7 @@ class MyTask (StreamTask):


## -------------------------------------------------------------------------------------------------
def _run(self, p_inst_new: list, p_inst_del: list):
def _run(self, p_inst : InstDict):
pass


Expand All @@ -65,8 +65,8 @@ class MyScenario (StreamScenario):
def _setup(self, p_mode, p_visualize: bool, p_logging):

# 1 Import a stream from OpenML
openml = WrStreamProviderOpenML(p_logging=p_logging)
stream = openml.get_stream(p_id=75, p_mode=p_mode, p_logging=p_logging)
provider_mlpro = StreamProviderMLPro(p_logging=p_logging)
stream = provider_mlpro.get_stream('Clouds2D4C1000Static', p_logging=p_logging)


# 2 Set up a stream workflow based on a custom stream task
Expand Down Expand Up @@ -115,7 +115,7 @@ def _setup(self, p_mode, p_visualize: bool, p_logging):
# 1 Preparation of demo/unit test mode
if __name__ == "__main__":
# 1.1 Parameters for demo mode
cycle_limit = 10
cycle_limit = 100
logging = Log.C_LOG_ALL
visualize = True

Expand Down

0 comments on commit cbe327f

Please sign in to comment.