Skip to content

Commit

Permalink
Fixing up the router tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mdemoret-nv committed Oct 25, 2024
1 parent 83b3aff commit cd9081d
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
9 changes: 4 additions & 5 deletions python/morpheus/morpheus/_lib/stages/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@
#include <rxcpp/rx.hpp> // for trace_activity, decay_t

#include <filesystem> // for path
#include <map>
#include <memory> // for shared_ptr, allocator
#include <sstream> // for operator<<, basic_ostringstream
#include <string> // for string
#include <vector> // for vector
#include <memory> // for shared_ptr, allocator
#include <sstream> // for operator<<, basic_ostringstream
#include <string> // for string
#include <vector> // for vector

namespace morpheus {
namespace py = pybind11;
Expand Down
9 changes: 5 additions & 4 deletions tests/morpheus/stages/test_router_stage_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage


@pytest.mark.parametrize("is_runnable", [True, False])
def test_router_stage_pipe(config, filter_probs_df, is_runnable: bool):
@pytest.mark.parametrize("processing_engines", [0, 4])
def test_router_stage_pipe(config, filter_probs_df, processing_engines: bool):

keys = ["odd", "even"]

Expand All @@ -44,7 +44,8 @@ def determine_route_fn(_: ControlMessage):
pipe = Pipeline(config)
source = pipe.add_stage(InMemorySourceStage(config, dataframes=[filter_probs_df], repeat=5))
deserialize = pipe.add_stage(DeserializeStage(config))
router_stage = pipe.add_stage(RouterStage(config, keys=keys, key_fn=determine_route_fn, is_runnable=is_runnable))
router_stage = pipe.add_stage(
RouterStage(config, keys=keys, key_fn=determine_route_fn, processing_engines=processing_engines))
sink1 = pipe.add_stage(InMemorySinkStage(config))
sink2 = pipe.add_stage(InMemorySinkStage(config))

Expand Down Expand Up @@ -101,7 +102,7 @@ def determine_route_fn(_: ControlMessage):
pipe = Pipeline(config)

source = pipe.add_stage(InMemoryDataGenStage(config, data_source=source_fn, output_data_type=ControlMessage))
router_stage = pipe.add_stage(RouterStage(config, keys=keys, key_fn=determine_route_fn, is_runnable=True))
router_stage = pipe.add_stage(RouterStage(config, keys=keys, key_fn=determine_route_fn, processing_engines=10))
sink1 = pipe.add_stage(InMemorySinkStage(config))
sink2 = pipe.add_stage(InMemorySinkStage(config))

Expand Down

0 comments on commit cd9081d

Please sign in to comment.