From 7928b5595d773b2eb6403973095e6acb1d9c8a4d Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 16 Oct 2023 11:05:11 -0700 Subject: [PATCH 01/12] Add missing dependency for requests-cache --- docker/conda/environments/cuda11.8_dev.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker/conda/environments/cuda11.8_dev.yml b/docker/conda/environments/cuda11.8_dev.yml index c20589a589..2b44feb419 100644 --- a/docker/conda/environments/cuda11.8_dev.yml +++ b/docker/conda/environments/cuda11.8_dev.yml @@ -94,6 +94,7 @@ dependencies: - pytorch=2.0.1 - rapidjson=1.1.0 - requests=2.31 + - requests-cache=1.1 - scikit-build=0.17.1 - scikit-learn=1.2.2 - sphinx From 0ee8f7758fef8493c6683d9ad60efec4e17b1de5 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 16 Oct 2023 12:01:21 -0700 Subject: [PATCH 02/12] Exclude types from CLI args which cannot be expressed on the CLI --- morpheus/stages/preprocess/deserialize_stage.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/morpheus/stages/preprocess/deserialize_stage.py b/morpheus/stages/preprocess/deserialize_stage.py index fc21f8f3ef..297ad14772 100644 --- a/morpheus/stages/preprocess/deserialize_stage.py +++ b/morpheus/stages/preprocess/deserialize_stage.py @@ -34,7 +34,9 @@ logger = logging.getLogger(__name__) -@register_stage("deserialize", modes=[PipelineModes.FIL, PipelineModes.NLP, PipelineModes.OTHER]) +@register_stage("deserialize", + modes=[PipelineModes.FIL, PipelineModes.NLP, PipelineModes.OTHER], + ignore_args=["message_type", "task_type", "task_payload"]) class DeserializeStage(MultiMessageStage): """ Messages are logically partitioned based on the pipeline config's `pipeline_batch_size` parameter. From 54ac77c075b5239de2d88322272c7275ee0dc20c Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 16 Oct 2023 12:01:40 -0700 Subject: [PATCH 03/12] Fix handling of union types in CLI --- morpheus/cli/register_stage.py | 5 +++++ morpheus/cli/utils.py | 5 ++++- morpheus/controllers/rss_controller.py | 2 +- morpheus/stages/input/rss_source_stage.py | 4 ++-- morpheus/utils/type_utils.py | 10 ++++++++++ 5 files changed, 22 insertions(+), 4 deletions(-) diff --git a/morpheus/cli/register_stage.py b/morpheus/cli/register_stage.py index f404c752c4..25adcf29fc 100644 --- a/morpheus/cli/register_stage.py +++ b/morpheus/cli/register_stage.py @@ -37,6 +37,7 @@ from morpheus.config import PipelineModes from morpheus.utils.type_utils import _DecoratorType from morpheus.utils.type_utils import get_full_qualname +from morpheus.utils.type_utils import is_union_type def class_name_to_command_name(class_name: str) -> str: @@ -177,6 +178,10 @@ def set_options_param_type(options_kwargs: dict, annotation, doc_type: str): if (annotation == inspect.Parameter.empty): raise RuntimeError("All types must be specified to auto register stage.") + if (is_union_type(annotation)): + # For the purposes of the CLI, we treat unions as the first type + annotation = typing.get_args(annotation)[0] + if (issubtype(annotation, typing.List)): # For variable length array, use multiple=True options_kwargs["multiple"] = True diff --git a/morpheus/cli/utils.py b/morpheus/cli/utils.py index 5b04911d13..8552b37e94 100644 --- a/morpheus/cli/utils.py +++ b/morpheus/cli/utils.py @@ -159,7 +159,10 @@ def parse_log_level(ctx, param, value): def is_enum(enum_class: typing.Type): """Returns True if the given class is an enum.""" - return issubclass(enum_class, Enum) or is_pybind_enum(enum_class) + try: + return issubclass(enum_class, Enum) or is_pybind_enum(enum_class) + except TypeError: + return False def get_enum_members(enum_class: typing.Type): diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index c9ca3ec456..df53d798fb 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -33,7 +33,7 @@ class RSSController: Parameters ---------- - feed_input : str + feed_input : str, list[str] The URL or file path of the RSS feed. batch_size : int, optional, default = 128 Number of feed items to accumulate before creating a DataFrame. diff --git a/morpheus/stages/input/rss_source_stage.py b/morpheus/stages/input/rss_source_stage.py index 7dd240a286..39f7c38cd3 100644 --- a/morpheus/stages/input/rss_source_stage.py +++ b/morpheus/stages/input/rss_source_stage.py @@ -37,7 +37,7 @@ class RSSSourceStage(PreallocatorMixin, SingleOutputSource): ---------- c : morpheus.config.Config Pipeline configuration instance. - feed_input : str + feed_input : list[str] or str The URL or file path of the RSS feed. interval_secs : float, optional, default = 600 Interval in seconds between fetching new feed items. @@ -49,7 +49,7 @@ class RSSSourceStage(PreallocatorMixin, SingleOutputSource): def __init__(self, c: Config, - feed_input: str | list[str], + feed_input: list[str] | str, interval_secs: float = 600, stop_after: int = 0, max_retries: int = 5, diff --git a/morpheus/utils/type_utils.py b/morpheus/utils/type_utils.py index d35e7b3a12..e88b807dba 100644 --- a/morpheus/utils/type_utils.py +++ b/morpheus/utils/type_utils.py @@ -13,6 +13,7 @@ # limitations under the License. import inspect +import types import typing from collections import defaultdict @@ -45,6 +46,15 @@ def greatest_ancestor(*cls_list): return None # or raise, if that's more appropriate +def is_union_type(type_: type) -> bool: + """ + Returns True if the type is a `typing.Union` or a `types.UnionType`. + """ + # Unions in the form of `(float | int)` are instances of `types.UnionType`. + # However, unions in the form of `typing.Union[float, int]` are instances of `typing._UnionGenericAlias`. + return isinstance(type_, (types.UnionType, typing._UnionGenericAlias)) + + @typing.overload def unpack_union(cls_1: typing.Type[T]) -> typing.Union[typing.Type[T]]: ... From 2e63361ccd9710c4b09ec98722b43eb4b32a7a24 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 16 Oct 2023 12:05:51 -0700 Subject: [PATCH 04/12] Update deserialize stage tests to reflect the rename of process_dataframe to process_dataframe_to_multi_message. TODO: Add test for process_dataframe_to_control_message --- tests/test_deserialize_stage_pipe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_deserialize_stage_pipe.py b/tests/test_deserialize_stage_pipe.py index d19d60f9ac..5dca9c37f2 100755 --- a/tests/test_deserialize_stage_pipe.py +++ b/tests/test_deserialize_stage_pipe.py @@ -40,14 +40,14 @@ def test_fixing_non_unique_indexes(dataset: DatasetManager): # When processing the dataframe, a warning should be generated when there are non-unique IDs with pytest.warns(RuntimeWarning): - DeserializeStage.process_dataframe(meta, 5, ensure_sliceable_index=False) + DeserializeStage.process_dataframe_to_multi_message(meta, 5, ensure_sliceable_index=False) assert not meta.has_sliceable_index() assert "_index_" not in meta.df.columns dataset.assert_df_equal(meta.df, df) - DeserializeStage.process_dataframe(meta, 5, ensure_sliceable_index=True) + DeserializeStage.process_dataframe_to_multi_message(meta, 5, ensure_sliceable_index=True) assert meta.has_sliceable_index() assert "_index_" in meta.df.columns From a429f68c47bc3e1b297489bdd2095c226c51f560 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 16 Oct 2023 12:30:28 -0700 Subject: [PATCH 05/12] Rename relingo word --- morpheus/controllers/rss_controller.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index df53d798fb..fc5b892d9a 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -50,14 +50,14 @@ def __init__(self, feed_input: str | list[str], batch_size: int = 128, run_indef if (run_indefinitely is None): # If feed_input is URL. Runs indefinitely - run_indefinitely = any([RSSController.is_url(f) for f in self._feed_input]) + run_indefinitely = any(RSSController.is_url(f) for f in self._feed_input) self._run_indefinitely = run_indefinitely self._session = requests_cache.CachedSession(os.path.join("./.cache/http", "RSSController.sqlite"), backend="sqlite") - self._blacklisted_feeds = [] # Feeds that have thrown an error and wont be retried + self._errored_feeds = [] # Feeds that have thrown an error and wont be retried @property def run_indefinitely(self): @@ -127,7 +127,7 @@ def parse_feeds(self): """ for url in self._feed_input: try: - if (url in self._blacklisted_feeds): + if (url in self._errored_feeds): continue feed = self._try_parse_feed(url) @@ -138,9 +138,9 @@ def parse_feeds(self): yield feed except Exception as ex: - logger.warning(f"Failed to parse feed: {url}. The feed will be blacklisted and not retried.") + logger.warning(f"Failed to parse feed: {url}. The feed will be not be retried.") - self._blacklisted_feeds.append(url) + self._errored_feeds.append(url) def fetch_dataframes(self): """ From f20de0f9a8776183fabac427bd96168b0476402b Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 16 Oct 2023 12:44:50 -0700 Subject: [PATCH 06/12] Fix handling of local rss files --- morpheus/controllers/rss_controller.py | 28 +++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index fc5b892d9a..ab98b8abd5 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -86,23 +86,30 @@ def parse_feed(self) -> list[dict]: raise RuntimeError(f"Invalid feed input: {self._feed_input}. No entries found.") def _try_parse_feed(self, url: str): + is_url = RSSController.is_url(url) + if (is_url): + response = self._session.get(url) + cache_hit = response.from_cache - response = self._session.get(url) + feed_input = response.text + else: + cache_hit = False + feed_input = url # Try to use requests to get the object - feed = feedparser.parse(response.text) + feed = feedparser.parse(feed_input) - cache_hit = response.from_cache fallback = False if (feed["bozo"]): cache_hit = False - fallback = True - logger.info(f"Failed to parse feed: {url}. Trying to parse using feedparser directly.") + if (is_url): + fallback = True + logger.info(f"Failed to parse feed: {url}. Trying to parse using feedparser directly.") - # If that fails, use feedparser directly (cant cache this) - feed = feedparser.parse(url) + # If that fails, use feedparser directly (cant cache this) + feed = feedparser.parse(url) if (feed["bozo"]): raise RuntimeError(f"Invalid feed input: {url}. Error: {feed['bozo_exception']}") @@ -138,9 +145,12 @@ def parse_feeds(self): yield feed except Exception as ex: - logger.warning(f"Failed to parse feed: {url}. The feed will be not be retried.") + if (RSSController.is_url(url)): + logger.warning("Failed to parse feed: %s: %s. The feed will be not be retried.", url, ex) - self._errored_feeds.append(url) + self._errored_feeds.append(url) + else: + raise def fetch_dataframes(self): """ From 66854a707653c7a869b2d8c0c202a851378e1624 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 16 Oct 2023 12:45:00 -0700 Subject: [PATCH 07/12] Update tests --- tests/test_rss_source_stage_pipe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_rss_source_stage_pipe.py b/tests/test_rss_source_stage_pipe.py index 50d475b505..9230152671 100644 --- a/tests/test_rss_source_stage_pipe.py +++ b/tests/test_rss_source_stage_pipe.py @@ -32,7 +32,7 @@ def test_constructor_with_feed_url(config): ctlr = rss_source_stage._controller - assert ctlr._feed_input == "https://realpython.com/atom.xml" + assert ctlr._feed_input == ["https://realpython.com/atom.xml"] assert ctlr._run_indefinitely is True assert ctlr._batch_size == config.pipeline_batch_size assert rss_source_stage._interval_secs == 600 @@ -47,7 +47,7 @@ def test_constructor_with_feed_file(config): ctlr = rss_source_stage._controller - assert ctlr._feed_input == file_feed_input + assert ctlr._feed_input == [file_feed_input] assert ctlr._run_indefinitely is False assert ctlr._batch_size == config.pipeline_batch_size assert rss_source_stage._interval_secs == 5 From 477a1668345591e4aece69d53b977d8c24bc558e Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 16 Oct 2023 13:00:32 -0700 Subject: [PATCH 08/12] Treat file paths and urls the same --- morpheus/controllers/rss_controller.py | 7 ++----- tests/controllers/test_rss_controller.py | 6 +++--- tests/test_rss_source_stage_pipe.py | 6 ++++-- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index ab98b8abd5..ae3b06cf73 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -145,12 +145,9 @@ def parse_feeds(self): yield feed except Exception as ex: - if (RSSController.is_url(url)): - logger.warning("Failed to parse feed: %s: %s. The feed will be not be retried.", url, ex) + logger.warning("Failed to parse feed: %s: %s. The feed will be not be retried.", url, ex) - self._errored_feeds.append(url) - else: - raise + self._errored_feeds.append(url) def fetch_dataframes(self): """ diff --git a/tests/controllers/test_rss_controller.py b/tests/controllers/test_rss_controller.py index dab7571cf4..f421703fff 100644 --- a/tests/controllers/test_rss_controller.py +++ b/tests/controllers/test_rss_controller.py @@ -53,15 +53,15 @@ def test_run_indefinitely_false(feed_input): @pytest.mark.parametrize("feed_input", test_urls) def test_parse_feed_valid_url(feed_input): controller = RSSController(feed_input=feed_input) - feed = controller.parse_feed() + feed = list(controller.parse_feeds())[0] assert feed.entries @pytest.mark.parametrize("feed_input", test_invalid_urls + test_invalid_file_paths) def test_parse_feed_invalid_input(feed_input): controller = RSSController(feed_input=feed_input) - with pytest.raises(RuntimeError): - controller.parse_feed() + list(controller.parse_feeds()) + assert controller._errored_feeds == [feed_input] @pytest.mark.parametrize("feed_input", test_urls + test_file_paths) diff --git a/tests/test_rss_source_stage_pipe.py b/tests/test_rss_source_stage_pipe.py index 9230152671..7e7c29515c 100644 --- a/tests/test_rss_source_stage_pipe.py +++ b/tests/test_rss_source_stage_pipe.py @@ -94,5 +94,7 @@ def test_invalid_input_rss_source_stage_pipe(config) -> None: pipe.add_edge(rss_source_stage, sink_stage) - with pytest.raises(RuntimeError): - pipe.run() + pipe.run() + + assert len(sink_stage.get_messages()) == 0 + assert rss_source_stage._controller._errored_feeds == [feed_input] From d39e165f8a5312e4f2de8c9f8a4e72a57441a2f1 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 16 Oct 2023 14:03:38 -0700 Subject: [PATCH 09/12] Restore original impl for this method --- morpheus/service/milvus_vector_db_service.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/morpheus/service/milvus_vector_db_service.py b/morpheus/service/milvus_vector_db_service.py index e1b9a87729..7d90134489 100644 --- a/morpheus/service/milvus_vector_db_service.py +++ b/morpheus/service/milvus_vector_db_service.py @@ -58,16 +58,14 @@ def loads(field: str): @staticmethod def from_dict(field: dict): - # # FieldSchema converts dtype -> type when serialized. We need to convert any dtype to type before deserilaizing - # if ("dtype" in field and isinstance(field["dtype"], str)): - # try: - # field["type"] = MILVUS_DATA_TYPE_MAP[field["dtype"]] - # del field["dtype"] - # except KeyError: - # raise ValueError( - # f"Invalid string data type: {field['dtype']}. Must be one of: {[MILVUS_DATA_TYPE_MAP.keys()]}") - - return pymilvus.FieldSchema.construct_from_dict(field) + name = field.pop("name") + dtype = field.pop("dtype") + + dtype = MILVUS_DATA_TYPE_MAP[dtype.lower()] + + field_schema = pymilvus.FieldSchema(name=name, dtype=dtype, **field) + + return field_schema def with_collection_lock(func: typing.Callable) -> typing.Callable: From bdd9695083ccf10ca2168bf73122aa1102256725 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 17 Oct 2023 06:28:12 -0700 Subject: [PATCH 10/12] Update test data to match changes made to milvus_idx_part_collection_conf.json --- tests/tests_data/service/milvus_simple_collection_conf.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/tests_data/service/milvus_simple_collection_conf.json b/tests/tests_data/service/milvus_simple_collection_conf.json index 399463cfeb..fcdf59c7c1 100644 --- a/tests/tests_data/service/milvus_simple_collection_conf.json +++ b/tests/tests_data/service/milvus_simple_collection_conf.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:c4d5984904a207dad3a58515b1327f309542b80729a6172b02399ad67a6a83cd -size 766 +oid sha256:b547c0e1e72b93d477af3f5a77a2d742e24694497510a62ffa67663521118fcf +size 826 From 745eda4f274e9efbf1f66e820860069fef3b9fd5 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 17 Oct 2023 07:50:48 -0700 Subject: [PATCH 11/12] Remove unused imports --- morpheus/controllers/rss_controller.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index ae3b06cf73..736cb3cbe6 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -15,11 +15,9 @@ import logging import os import typing -import urllib from urllib.parse import urlparse import feedparser -import requests import requests_cache import cudf From cf9ce98c0ec6670fa167ed07f9ff0e5805aa4689 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 18 Oct 2023 14:21:17 -0700 Subject: [PATCH 12/12] Raise an error if the cli gets a union type, update the RSSSourceStage api to just accept a list of str --- morpheus/cli/register_stage.py | 3 +-- morpheus/controllers/rss_controller.py | 2 +- morpheus/stages/input/rss_source_stage.py | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/morpheus/cli/register_stage.py b/morpheus/cli/register_stage.py index 25adcf29fc..49e01c7248 100644 --- a/morpheus/cli/register_stage.py +++ b/morpheus/cli/register_stage.py @@ -179,8 +179,7 @@ def set_options_param_type(options_kwargs: dict, annotation, doc_type: str): raise RuntimeError("All types must be specified to auto register stage.") if (is_union_type(annotation)): - # For the purposes of the CLI, we treat unions as the first type - annotation = typing.get_args(annotation)[0] + raise RuntimeError("Union types are not supported for auto registering stages.") if (issubtype(annotation, typing.List)): # For variable length array, use multiple=True diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py index 736cb3cbe6..f739c25979 100644 --- a/morpheus/controllers/rss_controller.py +++ b/morpheus/controllers/rss_controller.py @@ -31,7 +31,7 @@ class RSSController: Parameters ---------- - feed_input : str, list[str] + feed_input : str or list[str] The URL or file path of the RSS feed. batch_size : int, optional, default = 128 Number of feed items to accumulate before creating a DataFrame. diff --git a/morpheus/stages/input/rss_source_stage.py b/morpheus/stages/input/rss_source_stage.py index 39f7c38cd3..abd4bcd0b4 100644 --- a/morpheus/stages/input/rss_source_stage.py +++ b/morpheus/stages/input/rss_source_stage.py @@ -37,7 +37,7 @@ class RSSSourceStage(PreallocatorMixin, SingleOutputSource): ---------- c : morpheus.config.Config Pipeline configuration instance. - feed_input : list[str] or str + feed_input : list[str] The URL or file path of the RSS feed. interval_secs : float, optional, default = 600 Interval in seconds between fetching new feed items. @@ -49,7 +49,7 @@ class RSSSourceStage(PreallocatorMixin, SingleOutputSource): def __init__(self, c: Config, - feed_input: list[str] | str, + feed_input: list[str], interval_secs: float = 600, stop_after: int = 0, max_retries: int = 5,