# Misc updates & fixes for the vdb_upload pipeline #1289 (Merged)
2 changes: 1 addition & 1 deletion docker/conda/environments/cuda11.8_dev.yml
```diff
@@ -68,7 +68,7 @@ dependencies:
   - libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
   - mlflow>=2.2.1,<3
   - mrc=23.11
-  - networkx=3.1
+  - networkx>=2.8
   - ninja=1.10
   - nodejs=18.*
   - numba>=0.56.2
```
23 changes: 23 additions & 0 deletions docker/conda/environments/cuda11.8_examples.yml
```diff
@@ -22,16 +22,39 @@
 channels:
   - rapidsai
   - nvidia
+  - huggingface
   - conda-forge
   - dglteam/label/cu118
 dependencies:
+  - arxiv=1.4
+  - boto3
   - cuml=23.06
   - dask>=2023.1.1
   - dgl=1.0.2
   - dill=0.3.6
   - distributed>=2023.1.1
   - huggingface_hub=0.10.1 # work-around for https://github.com/UKPLab/sentence-transformers/issues/1762
   - langchain=0.0.190
   - libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
   - mlflow>=2.2.1,<3
+  - newspaper3k=0.2
   - papermill=2.3.4
+  - pypdf=3.16
+  - requests-cache=1.1
+  - s3fs>=2023.6
+  - sentence-transformers
+  - transformers
+
+  ####### Pip Transitive Dependencies (keep sorted!) #######
+  # These are dependencies that are available on conda, but are required by the pip packages listed below. It's much
+  # better to install them with conda than pip to allow for better dependency resolution.
+  - environs=9.5
+  - minio=7.1
+  - python-dotenv=1.0
+  - ujson=5.8
+
+
+  ####### Pip Dependencies (keep sorted!) #######
+  - pip:
+      - grpcio-status==1.58 # To keep in sync with 1.58 grpcio which is installed for Morpheus
+      - nemollm
```
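To pull these additions into an existing Morpheus Conda environment, a typical workflow (illustrative, not prescribed by this PR) is:

```bash
# Assumes an existing environment named "morpheus"; adjust -n to match yours.
conda env update -n morpheus --file docker/conda/environments/cuda11.8_examples.yml
```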
28 changes: 28 additions & 0 deletions examples/llm/vdb_upload/README.md
## Launching Triton

Pull the Docker image for Triton:
```bash
docker pull nvcr.io/nvidia/tritonserver:23.06-py3
```

From the Morpheus repo root directory, run the following to launch Triton and load the `all-MiniLM-L6-v2` model:
```bash
docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v $PWD/models:/models nvcr.io/nvidia/tritonserver:23.06-py3 tritonserver --model-repository=/models/triton-model-repo --exit-on-error=false --model-control-mode=explicit --load-model all-MiniLM-L6-v2
```

This will launch Triton and only load the `all-MiniLM-L6-v2` model. Once Triton has loaded the model, the following will be displayed:
```
+------------------+---------+--------+
| Model | Version | Status |
+------------------+---------+--------+
| all-MiniLM-L6-v2 | 1 | READY |
+------------------+---------+--------+
```
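
To verify readiness without tailing the container logs, Triton's standard HTTP endpoint can be polled (a suggested check, assuming the port 8000 mapping from the `docker run` command above):

```bash
# Prints 200 once the all-MiniLM-L6-v2 model is loaded and ready to serve.
curl -s -o /dev/null -w "%{http_code}\n" localhost:8000/v2/models/all-MiniLM-L6-v2/ready
```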

## Running the Pipeline

```bash
python examples/llm/main.py vdb_upload pipeline
```

> **Note**: By default, this pipeline runs continuously, repeatedly polling the configured RSS sources. To run for a fixed number of records instead, add the `--stop_after=N` flag.
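
For example, a bounded test run (illustrative value; the flag is defined by this PR in `run.py`):

```bash
python examples/llm/main.py vdb_upload pipeline --stop_after=128
```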
68 changes: 35 additions & 33 deletions examples/llm/vdb_upload/run.py
```diff
@@ -135,7 +135,7 @@ def run():
 )
 @click.option(
     "--model_fea_length",
-    default=256,
+    default=512,
     type=click.IntRange(min=1),
     help="Features length to use for the model",
 )
@@ -159,21 +159,23 @@ def run():
 @click.option(
     "--model_name",
     required=True,
-    default='all-mpnet-base-v2',
+    default='all-MiniLM-L6-v2',
     help="The name of the model that is deployed on Triton server",
 )
-@click.option("--pre_calc_embeddings",
-              is_flag=True,
-              default=False,
-              help="Whether to pre-calculate the embeddings using Triton")
 @click.option("--isolate_embeddings",
               is_flag=True,
               default=False,
-              help="Whether to pre-calculate the embeddings using Triton")
+              help="Whether to fetch all data prior to executing the rest of the pipeline.")
 @click.option("--use_cache",
               type=click.Path(file_okay=True, dir_okay=False),
               default=None,
               help="What cache to use for the confluence documents")
+@click.option(
+    "--stop_after",
+    default=0,
+    type=click.IntRange(min=0),
+    help="Stop after emitting this many records from the RSS source stage. Useful for testing. Disabled if `0`",
+)
 def pipeline(num_threads,
              pipeline_batch_size,
              model_max_batch_size,
@@ -183,13 +185,15 @@ def pipeline(num_threads,
              output_file,
              server_url,
              model_name,
-             pre_calc_embeddings,
              isolate_embeddings,
-             use_cache):
+             use_cache,
+             stop_after: int):
 
     from morpheus.config import Config
     from morpheus.config import CppConfig
     from morpheus.config import PipelineModes
+    from morpheus.messages import ControlMessage
+    from morpheus.messages import MessageMeta
     from morpheus.pipeline.linear_pipeline import LinearPipeline
     from morpheus.stages.general.monitor_stage import MonitorStage
     from morpheus.stages.general.trigger_stage import TriggerStage
@@ -219,40 +223,38 @@ def pipeline(num_threads,
     pipe = LinearPipeline(config)
 
     # add doca source stage
-    pipe.set_source(RSSSourceStage(config, feed_input=_build_rss_urls(), batch_size=128))
+    pipe.set_source(RSSSourceStage(config, feed_input=_build_rss_urls(), batch_size=128, stop_after=stop_after))
 
     pipe.add_stage(MonitorStage(config, description="Source rate", unit='pages'))
 
     pipe.add_stage(WebScraperStage(config, chunk_size=model_fea_length))
 
     pipe.add_stage(MonitorStage(config, description="Download rate", unit='pages'))
 
+    # add deserialize stage
+    pipe.add_stage(DeserializeStage(config))
+
     if (isolate_embeddings):
         pipe.add_stage(TriggerStage(config))
 
-    if (pre_calc_embeddings):
-
-        # add deserialize stage
-        pipe.add_stage(DeserializeStage(config))
-
-        # add preprocessing stage
-        pipe.add_stage(
-            PreprocessNLPStage(config,
-                               vocab_hash_file="data/bert-base-uncased-hash.txt",
-                               do_lower_case=True,
-                               truncation=True,
-                               add_special_tokens=False,
-                               column='page_content'))
+    # add preprocessing stage
+    pipe.add_stage(
+        PreprocessNLPStage(config,
+                           vocab_hash_file="data/bert-base-uncased-hash.txt",
+                           do_lower_case=True,
+                           truncation=True,
+                           add_special_tokens=False,
+                           column='page_content'))
 
-        pipe.add_stage(MonitorStage(config, description="Tokenize rate", unit='events', delayed_start=True))
+    pipe.add_stage(MonitorStage(config, description="Tokenize rate", unit='events', delayed_start=True))
 
-        pipe.add_stage(
-            TritonInferenceStage(config,
-                                 model_name=model_name,
-                                 server_url="localhost:8001",
-                                 force_convert_inputs=True,
-                                 use_shared_memory=True))
-        pipe.add_stage(MonitorStage(config, description="Inference rate", unit="events", delayed_start=True))
+    pipe.add_stage(
+        TritonInferenceStage(config,
+                             model_name=model_name,
+                             server_url="localhost:8001",
+                             force_convert_inputs=True,
+                             use_shared_memory=True))
+    pipe.add_stage(MonitorStage(config, description="Inference rate", unit="events", delayed_start=True))
 
     pipe.add_stage(
         WriteToVectorDBStage(config,
@@ -277,7 +279,7 @@ def pipeline(num_threads,
 @click.option(
     "--model_name",
     required=True,
-    default='all-mpnet-base-v2',
+    default='all-MiniLM-L6-v2',
     help="The name of the model that is deployed on Triton server",
 )
 @click.option(
@@ -372,7 +374,7 @@ def _save_model(model, max_seq_length: int, batch_size: int, output_model_path:
 @click.option(
     "--model_name",
     required=True,
-    default='all-mpnet-base-v2',
+    default='all-MiniLM-L6-v2',
     help="The name of the model that is deployed on Triton server",
 )
 @click.option(
```
18 changes: 13 additions & 5 deletions morpheus/stages/input/rss_source_stage.py
```diff
@@ -64,6 +64,12 @@ def __init__(self,
         if (batch_size is None):
             batch_size = c.pipeline_batch_size
 
+        if (stop_after > 0):
+            if (run_indefinitely):
+                raise ValueError("Cannot set both `stop_after` and `run_indefinitely` to True.")
+
+            run_indefinitely = False
+
         self._records_emitted = 0
         self._controller = RSSController(feed_input=feed_input,
                                          batch_size=batch_size,
@@ -101,13 +107,13 @@ def _fetch_feeds(self) -> MessageMeta:
 
                 yield MessageMeta(df=df)
 
-            if not self._controller.run_indefinitely:
+            if (self._stop_after > 0 and self._records_emitted >= self._stop_after):
                 self._stop_requested = True
-                continue
+                logger.debug("Stop limit reached...preparing to halt the source.")
+                break
 
-            if (self._stop_after > 0 and self._records_emitted >= self._stop_after):
+            if not self._controller.run_indefinitely:
                 self._stop_requested = True
-                logger.debug("Stop limit reached...preparing to halt the source.")
                 continue
 
             logger.debug("Waiting for %d seconds before fetching again...", self._interval_secs)
@@ -128,6 +134,8 @@ def _fetch_feeds(self) -> MessageMeta:
             logger.error("Max retries reached. Unable to fetch feed entries.")
             raise RuntimeError(f"Failed to fetch feed entries after max retries: {exc}") from exc
 
+        logger.debug("Source stopped.")
+
     def _build_source(self, builder: mrc.Builder) -> StreamPair:
         source = builder.make_source(self.unique_name, self._fetch_feeds)
         return source, MessageMeta
```
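
Together with the `run.py` changes above, the new `stop_after` plumbing lets the source be used for bounded runs. A minimal sketch of that usage (assuming only the constructor arguments visible in this diff; the feed URL and limit are illustrative):

```python
# Minimal sketch: bounded ingest with the updated RSSSourceStage.
from morpheus.config import Config
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.rss_source_stage import RSSSourceStage

config = Config()
pipe = LinearPipeline(config)

# stop_after > 0 forces run_indefinitely=False in __init__, and the fetch
# loop now breaks once the emitted-record count reaches the limit.
pipe.set_source(
    RSSSourceStage(config,
                   feed_input=["https://developer.nvidia.com/blog/feed/"],
                   batch_size=128,
                   stop_after=128))
pipe.run()
```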