Add support for multiple devices #102

Merged · 1 commit · Jan 18, 2025
1 change: 1 addition & 0 deletions .github/workflows/main.yaml
@@ -33,6 +33,7 @@ jobs:
- model_name: sentence-transformers/all-mpnet-base-v2
model_tag_name: sentence-transformers-all-mpnet-base-v2
onnx_runtime: false
use_sentence_transformers_vectorizer: true
- model_name: sentence-transformers/all-MiniLM-L12-v2
model_tag_name: sentence-transformers-all-MiniLM-L12-v2
onnx_runtime: false
87 changes: 46 additions & 41 deletions app.py
@@ -4,7 +4,12 @@
from fastapi import FastAPI, Depends, Response, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from typing import Union
from config import TRUST_REMOTE_CODE, get_allowed_tokens
from config import (
TRUST_REMOTE_CODE,
get_allowed_tokens,
get_use_sentence_transformers_multi_process,
get_t2v_transformers_direct_tokenize,
)
from vectorizer import Vectorizer, VectorInput
from meta import Meta

@@ -33,43 +38,6 @@ async def lifespan(app: FastAPI):

allowed_tokens = get_allowed_tokens()

cuda_env = os.getenv("ENABLE_CUDA")
cuda_per_process_memory_fraction = 1.0
if "CUDA_PER_PROCESS_MEMORY_FRACTION" in os.environ:
try:
cuda_per_process_memory_fraction = float(
os.getenv("CUDA_PER_PROCESS_MEMORY_FRACTION")
)
except ValueError:
logger.error(
f"Invalid CUDA_PER_PROCESS_MEMORY_FRACTION (should be between 0.0-1.0)"
)
if 0.0 <= cuda_per_process_memory_fraction <= 1.0:
logger.info(
f"CUDA_PER_PROCESS_MEMORY_FRACTION set to {cuda_per_process_memory_fraction}"
)
cuda_support = False
cuda_core = ""

if cuda_env is not None and cuda_env == "true" or cuda_env == "1":
cuda_support = True
cuda_core = os.getenv("CUDA_CORE")
if cuda_core is None or cuda_core == "":
cuda_core = "cuda:0"
logger.info(f"CUDA_CORE set to {cuda_core}")
else:
logger.info("Running on CPU")

# Batch text tokenization enabled by default
direct_tokenize = False
transformers_direct_tokenize = os.getenv("T2V_TRANSFORMERS_DIRECT_TOKENIZE")
if (
transformers_direct_tokenize is not None
and transformers_direct_tokenize == "true"
or transformers_direct_tokenize == "1"
):
direct_tokenize = True

model_dir = "./models/model"

def get_model_name() -> Union[str, bool]:
@@ -104,15 +72,51 @@ def log_info_about_onnx(onnx_runtime: bool):
f"Running ONNX vectorizer with quantized model for {onnx_quantization_info}"
)

model_name, use_sentence_transformer_vectorizer = get_model_name()
model_name, use_sentence_transformers_vectorizer = get_model_name()
onnx_runtime = get_onnx_runtime()
trust_remote_code = get_trust_remote_code()

cuda_env = os.getenv("ENABLE_CUDA")
cuda_per_process_memory_fraction = 1.0
if "CUDA_PER_PROCESS_MEMORY_FRACTION" in os.environ:
try:
cuda_per_process_memory_fraction = float(
os.getenv("CUDA_PER_PROCESS_MEMORY_FRACTION")
)
except ValueError:
logger.error(
f"Invalid CUDA_PER_PROCESS_MEMORY_FRACTION (should be between 0.0-1.0)"
)
if 0.0 <= cuda_per_process_memory_fraction <= 1.0:
logger.info(
f"CUDA_PER_PROCESS_MEMORY_FRACTION set to {cuda_per_process_memory_fraction}"
)
cuda_support = False
cuda_core = ""

if cuda_env is not None and cuda_env == "true" or cuda_env == "1":
cuda_support = True
cuda_core = os.getenv("CUDA_CORE")
if cuda_core is None or cuda_core == "":
cuda_core = "cuda:0"
logger.info(f"CUDA_CORE set to {cuda_core}")
else:
logger.info("Running on CPU")

# Use all available cores
use_sentence_transformers_multi_process = (
get_use_sentence_transformers_multi_process()
)

# Batch text tokenization enabled by default
direct_tokenize = get_t2v_transformers_direct_tokenize()

log_info_about_onnx(onnx_runtime)

meta_config = Meta(
model_dir,
model_name,
use_sentence_transformer_vectorizer,
use_sentence_transformers_vectorizer,
trust_remote_code,
)
vec = Vectorizer(
@@ -124,7 +128,8 @@ def log_info_about_onnx(onnx_runtime: bool):
meta_config.get_architecture(),
direct_tokenize,
onnx_runtime,
use_sentence_transformer_vectorizer,
use_sentence_transformers_vectorizer,
use_sentence_transformers_multi_process,
model_name,
trust_remote_code,
)
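
For context, a minimal sketch of the environment that drives the lifespan setup above; the concrete values are illustrative assumptions, not taken from this PR. Either a single CUDA core is pinned as before, or the new USE_SENTENCE_TRANSFORMERS_MULTI_PROCESS flag lets the sentence-transformers pool claim every visible device.

import os

# Illustrative values only; nothing here is taken from the PR itself.

# Option A: pin a single device, as before this change.
# os.environ["ENABLE_CUDA"] = "1"
# os.environ["CUDA_CORE"] = "cuda:0"
# os.environ["CUDA_PER_PROCESS_MEMORY_FRACTION"] = "0.9"

# Option B: the flag added in this PR. SentenceTransformerVectorizer.get_device()
# then returns None, and the constructor starts a multi-process pool that spans
# every available device.
os.environ["USE_SENTENCE_TRANSFORMERS_MULTI_PROCESS"] = "true"

# Batched tokenization stays the default; "true" or "1" switches to direct
# tokenization of each request.
os.environ["T2V_TRANSFORMERS_DIRECT_TOKENIZE"] = "false"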
22 changes: 22 additions & 0 deletions config.py
@@ -9,3 +9,25 @@ def get_allowed_tokens() -> List[str] | None:
tokens := os.getenv("AUTHENTICATION_ALLOWED_TOKENS", "").strip()
) and tokens != "":
return tokens.strip().split(",")


def get_use_sentence_transformers_multi_process():
enable_multi_process = os.getenv("USE_SENTENCE_TRANSFORMERS_MULTI_PROCESS")
if (
enable_multi_process is not None
and enable_multi_process == "true"
or enable_multi_process == "1"
):
return True
return False


def get_t2v_transformers_direct_tokenize():
transformers_direct_tokenize = os.getenv("T2V_TRANSFORMERS_DIRECT_TOKENIZE")
if (
transformers_direct_tokenize is not None
and transformers_direct_tokenize == "true"
or transformers_direct_tokenize == "1"
):
return True
return False
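
A quick, hypothetical check (not part of the PR) of how the two new helpers parse their environment variables: only the exact strings "true" and "1" enable a flag, and an unset variable falls back to False.

import os
from config import (
    get_use_sentence_transformers_multi_process,
    get_t2v_transformers_direct_tokenize,
)

os.environ["USE_SENTENCE_TRANSFORMERS_MULTI_PROCESS"] = "true"
assert get_use_sentence_transformers_multi_process() is True

os.environ["USE_SENTENCE_TRANSFORMERS_MULTI_PROCESS"] = "1"
assert get_use_sentence_transformers_multi_process() is True

# Any other value, e.g. "yes" or "0", is treated as disabled.
os.environ["USE_SENTENCE_TRANSFORMERS_MULTI_PROCESS"] = "0"
assert get_use_sentence_transformers_multi_process() is False

# An unset variable also falls back to False.
os.environ.pop("T2V_TRANSFORMERS_DIRECT_TOKENIZE", None)
assert get_t2v_transformers_direct_tokenize() is False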
53 changes: 45 additions & 8 deletions vectorizer.py
@@ -2,7 +2,8 @@
import math
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Optional
from typing import Optional, Any, Literal
from logging import getLogger, Logger

import nltk
import torch
@@ -50,17 +51,22 @@ def __init__(
architecture: str,
direct_tokenize: bool,
onnx_runtime: bool,
use_sentence_transformer_vectorizer: bool,
use_sentence_transformers_vectorizer: bool,
use_sentence_transformers_multi_process: bool,
model_name: str,
trust_remote_code: bool,
):
self.executor = ThreadPoolExecutor()
if onnx_runtime:
self.vectorizer = ONNXVectorizer(model_path, trust_remote_code)
else:
if model_type == "t5" or use_sentence_transformer_vectorizer:
if model_type == "t5" or use_sentence_transformers_vectorizer:
self.vectorizer = SentenceTransformerVectorizer(
model_path, model_name, cuda_core, trust_remote_code
model_path,
model_name,
cuda_core,
trust_remote_code,
use_sentence_transformers_multi_process,
)
else:
self.vectorizer = HuggingFaceVectorizer(
@@ -83,13 +89,25 @@ async def vectorize(self, text: str, config: VectorInputConfig):
class SentenceTransformerVectorizer:
model: SentenceTransformer
cuda_core: str
use_sentence_transformers_multi_process: bool
pool: dict[Literal["input", "output", "processes"], Any]
logger: Logger

def __init__(
self, model_path: str, model_name: str, cuda_core: str, trust_remote_code: bool
self,
model_path: str,
model_name: str,
cuda_core: str,
trust_remote_code: bool,
use_sentence_transformers_multi_process: bool,
):
self.logger = getLogger("uvicorn")
self.cuda_core = cuda_core
print(
f"model_name={model_name}, cache_folder={model_path} device:{self.get_device()} trust_remote_code:{trust_remote_code}"
self.use_sentence_transformers_multi_process = (
use_sentence_transformers_multi_process
)
self.logger.info(
f"Sentence transformer vectorizer running with model_name={model_name}, cache_folder={model_path} device:{self.get_device()} trust_remote_code:{trust_remote_code} use_sentence_transformers_multi_process: {self.use_sentence_transformers_multi_process}"
)
self.model = SentenceTransformer(
model_name,
@@ -98,18 +116,37 @@ def __init__(
trust_remote_code=trust_remote_code,
)
self.model.eval() # make sure we're in inference mode, not training
if self.use_sentence_transformers_multi_process:
self.pool = self.model.start_multi_process_pool()
self.logger.info(
"Sentence transformer vectorizer is set to use all available devices"
)
self.logger.info(
f"Created pool of {len(self.pool['processes'])} available {'CUDA' if torch.cuda.is_available() else 'CPU'} devices"
)

def get_device(self) -> Optional[str]:
if self.cuda_core is not None and self.cuda_core != "":
if (
not self.use_sentence_transformers_multi_process
and self.cuda_core is not None
and self.cuda_core != ""
):
return self.cuda_core
return None

def vectorize(self, text: str, config: VectorInputConfig):
if self.use_sentence_transformers_multi_process:
embedding = self.model.encode_multi_process(
[text], pool=self.pool, normalize_embeddings=True
)
return embedding[0]

embedding = self.model.encode(
[text],
device=self.get_device(),
convert_to_tensor=False,
convert_to_numpy=True,
normalize_embeddings=True,
)
return embedding[0]

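
For readers unfamiliar with the sentence-transformers multi-process API used above, a standalone sketch of the underlying pattern follows; the model name and input texts are placeholder assumptions, not values from this PR.

from sentence_transformers import SentenceTransformer

model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
model.eval()

# start_multi_process_pool() spawns one worker per visible CUDA device (or
# several CPU workers when none is available) and returns a dict with
# "input", "output" and "processes" entries, matching the pool annotation above.
pool = model.start_multi_process_pool()

texts = ["first text", "second text", "third text"]

# encode_multi_process() splits the batch across the pool's workers; the PR
# passes normalize_embeddings=True, which requires a sentence-transformers
# release that supports that keyword.
embeddings = model.encode_multi_process(texts, pool=pool, normalize_embeddings=True)
print(embeddings.shape)

# The pool holds live worker processes and must be shut down explicitly.
model.stop_multi_process_pool(pool)

In the service itself, the pool is created once in SentenceTransformerVectorizer's constructor and reused for every vectorize() call.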