diff --git a/docs/docs/integrations/providers/upstash.mdx b/docs/docs/integrations/providers/upstash.mdx
index 0b619dcde9526..d1bfa783c230c 100644
--- a/docs/docs/integrations/providers/upstash.mdx
+++ b/docs/docs/integrations/providers/upstash.mdx
@@ -61,6 +61,22 @@ store = UpstashVectorStore(
 See [Upstash Vector documentation](https://upstash.com/docs/vector/features/embeddingmodels)
 for more detail on embedding models.
 
+### Namespaces
+You can use namespaces to partition the data in your index. Namespaces are useful when you want to query over a large amount of data and partitioning it makes the queries faster. When you use namespaces, no post-filtering is applied to the results, which makes the query results more precise.
+
+```python
+from langchain_community.vectorstores.upstash import UpstashVectorStore
+import os
+
+os.environ["UPSTASH_VECTOR_REST_URL"] = ""
+os.environ["UPSTASH_VECTOR_REST_TOKEN"] = ""
+
+store = UpstashVectorStore(
+    embedding=embeddings,
+    namespace="my_namespace"
+)
+```
+
 ### Inserting Vectors
 
 ```python
diff --git a/libs/community/langchain_community/vectorstores/upstash.py b/libs/community/langchain_community/vectorstores/upstash.py
index 96bcc6e4e5132..23aeec0cc3cfb 100644
--- a/libs/community/langchain_community/vectorstores/upstash.py
+++ b/libs/community/langchain_community/vectorstores/upstash.py
@@ -2,7 +2,7 @@
 
 import logging
 import uuid
-from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple, Union, cast
 
 import numpy as np
 from langchain_core.documents import Document
@@ -64,6 +64,8 @@ def __init__(
         index_url: Optional[str] = None,
         index_token: Optional[str] = None,
         embedding: Optional[Union[Embeddings, bool]] = None,
+        *,
+        namespace: str = "",
     ):
         """
         Constructor for UpstashVectorStore.
@@ -83,6 +85,7 @@ def __init__(
                 is applied. If true, Upstash embeddings are used. When Upstash
                 embeddings are used, text is sent directly to Upstash and embedding
                 is applied there instead of embedding in Langchain.
+            namespace: Namespace to use from the index.
 
         Example:
             .. code-block:: python
@@ -94,7 +97,8 @@ def __init__(
                 vectorstore = UpstashVectorStore(
                     embedding=embeddings,
                     index_url="...",
-                    index_token="..."
+                    index_token="...",
+                    namespace="..."
                 )
 
                 # With an existing index
@@ -103,7 +107,8 @@ def __init__(
                 index = Index(url="...", token="...")
                 vectorstore = UpstashVectorStore(
                     embedding=embeddings,
-                    index=index
+                    index=index,
+                    namespace="..."
                 )
 
         """
@@ -145,6 +150,7 @@ def __init__(
 
         self._embeddings = embedding
         self._text_key = text_key
+        self._namespace = namespace
 
     @property
     def embeddings(self) -> Optional[Union[Embeddings, bool]]:  # type: ignore
@@ -187,6 +193,8 @@ def add_documents(
         ids: Optional[List[str]] = None,
         batch_size: int = 32,
         embedding_chunk_size: int = 1000,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[str]:
         """
@@ -202,6 +210,7 @@ def add_documents(
             batch_size: Batch size to use when upserting the embeddings.
             Upstash supports at max 1000 vectors per request.
             embedding_batch_size: Chunk size to use when embedding the texts.
+            namespace: Namespace to use from the index.
 
         Returns:
             List of ids from adding the texts into the vectorstore.
@@ -216,6 +225,7 @@ def add_documents(
             batch_size=batch_size,
             ids=ids,
             embedding_chunk_size=embedding_chunk_size,
+            namespace=namespace,
             **kwargs,
         )
 
@@ -225,6 +235,8 @@ async def aadd_documents(
         ids: Optional[List[str]] = None,
         batch_size: int = 32,
         embedding_chunk_size: int = 1000,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[str]:
         """
@@ -240,6 +252,7 @@ async def aadd_documents(
             batch_size: Batch size to use when upserting the embeddings.
             Upstash supports at max 1000 vectors per request.
             embedding_batch_size: Chunk size to use when embedding the texts.
+            namespace: Namespace to use from the index.
 
         Returns:
             List of ids from adding the texts into the vectorstore.
@@ -254,6 +267,7 @@ async def aadd_documents(
             ids=ids,
             batch_size=batch_size,
             embedding_chunk_size=embedding_chunk_size,
+            namespace=namespace,
             **kwargs,
         )
 
@@ -264,6 +278,8 @@ def add_texts(
         ids: Optional[List[str]] = None,
         batch_size: int = 32,
         embedding_chunk_size: int = 1000,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[str]:
         """
@@ -281,11 +297,15 @@ def add_texts(
             batch_size: Batch size to use when upserting the embeddings.
             Upstash supports at max 1000 vectors per request.
             embedding_batch_size: Chunk size to use when embedding the texts.
+            namespace: Namespace to use from the index.
 
         Returns:
             List of ids from adding the texts into the vectorstore.
 
         """
+        if namespace is None:
+            namespace = self._namespace
+
         texts = list(texts)
         ids = ids or [str(uuid.uuid4()) for _ in texts]
 
@@ -308,7 +328,9 @@ def add_texts(
             for batch in batch_iterate(
                 batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
             ):
-                self._index.upsert(vectors=batch, **kwargs)
+                self._index.upsert(
+                    vectors=batch, namespace=cast(str, namespace), **kwargs
+                )
 
         return ids
 
@@ -319,6 +341,8 @@ async def aadd_texts(
         ids: Optional[List[str]] = None,
         batch_size: int = 32,
         embedding_chunk_size: int = 1000,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[str]:
         """
@@ -336,11 +360,15 @@ async def aadd_texts(
             batch_size: Batch size to use when upserting the embeddings.
             Upstash supports at max 1000 vectors per request.
             embedding_batch_size: Chunk size to use when embedding the texts.
+            namespace: Namespace to use from the index.
 
         Returns:
             List of ids from adding the texts into the vectorstore.
 
         """
+        if namespace is None:
+            namespace = self._namespace
+
         texts = list(texts)
         ids = ids or [str(uuid.uuid4()) for _ in texts]
 
@@ -363,7 +391,9 @@ async def aadd_texts(
             for batch in batch_iterate(
                 batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
             ):
-                await self._async_index.upsert(vectors=batch, **kwargs)
+                await self._async_index.upsert(
+                    vectors=batch, namespace=cast(str, namespace), **kwargs
+                )
 
         return ids
 
@@ -372,6 +402,8 @@ def similarity_search_with_score(
         query: str,
         k: int = 4,
         filter: Optional[str] = None,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[Tuple[Document, float]]:
         """Retrieve texts most similar to query and
@@ -381,12 +413,13 @@ def similarity_search_with_score(
             query: Text to look up documents similar to.
             k: Number of Documents to return. Defaults to 4.
             filter: Optional metadata filter in str format
+            namespace: Namespace to use from the index.
 
         Returns:
             List of Documents most similar to the query and score for each
         """
         return self.similarity_search_by_vector_with_score(
-            self._embed_query(query), k=k, filter=filter, **kwargs
+            self._embed_query(query), k=k, filter=filter, namespace=namespace, **kwargs
         )
 
     async def asimilarity_search_with_score(
@@ -394,6 +427,8 @@ async def asimilarity_search_with_score(
         query: str,
         k: int = 4,
         filter: Optional[str] = None,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[Tuple[Document, float]]:
         """Retrieve texts most similar to query and
@@ -403,12 +438,13 @@ async def asimilarity_search_with_score(
             query: Text to look up documents similar to.
             k: Number of Documents to return. Defaults to 4.
             filter: Optional metadata filter in str format
+            namespace: Namespace to use from the index.
 
         Returns:
             List of Documents most similar to the query and score for each
         """
         return await self.asimilarity_search_by_vector_with_score(
-            self._embed_query(query), k=k, filter=filter, **kwargs
+            self._embed_query(query), k=k, filter=filter, namespace=namespace, **kwargs
         )
 
     def _process_results(self, results: List) -> List[Tuple[Document, float]]:
@@ -430,15 +466,25 @@ def similarity_search_by_vector_with_score(
         embedding: Union[List[float], str],
         k: int = 4,
         filter: Optional[str] = None,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[Tuple[Document, float]]:
         """Return texts whose embedding is closest to the given embedding"""
 
         filter = filter or ""
 
+        if namespace is None:
+            namespace = self._namespace
+
         if isinstance(embedding, str):
             results = self._index.query(
-                data=embedding, top_k=k, include_metadata=True, filter=filter, **kwargs
+                data=embedding,
+                top_k=k,
+                include_metadata=True,
+                filter=filter,
+                namespace=namespace,
+                **kwargs,
             )
         else:
             results = self._index.query(
@@ -446,6 +492,7 @@ def similarity_search_by_vector_with_score(
                 top_k=k,
                 include_metadata=True,
                 filter=filter,
+                namespace=namespace,
                 **kwargs,
             )
 
@@ -456,15 +503,25 @@ async def asimilarity_search_by_vector_with_score(
         embedding: Union[List[float], str],
         k: int = 4,
         filter: Optional[str] = None,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[Tuple[Document, float]]:
         """Return texts whose embedding is closest to the given embedding"""
 
         filter = filter or ""
 
+        if namespace is None:
+            namespace = self._namespace
+
         if isinstance(embedding, str):
             results = await self._async_index.query(
-                data=embedding, top_k=k, include_metadata=True, filter=filter, **kwargs
+                data=embedding,
+                top_k=k,
+                include_metadata=True,
+                filter=filter,
+                namespace=namespace,
+                **kwargs,
             )
         else:
             results = await self._async_index.query(
@@ -472,6 +529,7 @@ async def asimilarity_search_by_vector_with_score(
                 top_k=k,
                 include_metadata=True,
                 filter=filter,
+                namespace=namespace,
                 **kwargs,
             )
 
@@ -482,6 +540,8 @@ def similarity_search(
         query: str,
         k: int = 4,
         filter: Optional[str] = None,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[Document]:
         """Return documents most similar to query.
@@ -490,12 +550,13 @@ def similarity_search(
             query: Text to look up documents similar to.
             k: Number of Documents to return. Defaults to 4.
             filter: Optional metadata filter in str format
+            namespace: Namespace to use from the index.
 
         Returns:
             List of Documents most similar to the query and score for each
         """
         docs_and_scores = self.similarity_search_with_score(
-            query, k=k, filter=filter, **kwargs
+            query, k=k, filter=filter, namespace=namespace, **kwargs
         )
         return [doc for doc, _ in docs_and_scores]
 
@@ -504,6 +565,8 @@ async def asimilarity_search(
         query: str,
         k: int = 4,
         filter: Optional[str] = None,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[Document]:
         """Return documents most similar to query.
@@ -512,12 +575,13 @@ async def asimilarity_search(
             query: Text to look up documents similar to.
             k: Number of Documents to return. Defaults to 4.
             filter: Optional metadata filter in str format
+            namespace: Namespace to use from the index.
 
         Returns:
             List of Documents most similar to the query
         """
         docs_and_scores = await self.asimilarity_search_with_score(
-            query, k=k, filter=filter, **kwargs
+            query, k=k, filter=filter, namespace=namespace, **kwargs
         )
         return [doc for doc, _ in docs_and_scores]
 
@@ -526,6 +590,8 @@ def similarity_search_by_vector(
         embedding: Union[List[float], str],
         k: int = 4,
         filter: Optional[str] = None,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[Document]:
         """Return documents closest to the given embedding.
@@ -534,12 +600,13 @@ def similarity_search_by_vector(
             embedding: Embedding to look up documents similar to.
             k: Number of Documents to return. Defaults to 4.
             filter: Optional metadata filter in str format
+            namespace: Namespace to use from the index.
 
         Returns:
             List of Documents most similar to the query
         """
         docs_and_scores = self.similarity_search_by_vector_with_score(
-            embedding, k=k, filter=filter, **kwargs
+            embedding, k=k, filter=filter, namespace=namespace, **kwargs
         )
         return [doc for doc, _ in docs_and_scores]
 
@@ -548,6 +615,8 @@ async def asimilarity_search_by_vector(
         embedding: Union[List[float], str],
         k: int = 4,
         filter: Optional[str] = None,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[Document]:
         """Return documents closest to the given embedding.
@@ -556,12 +625,13 @@ async def asimilarity_search_by_vector(
             embedding: Embedding to look up documents similar to.
             k: Number of Documents to return. Defaults to 4.
             filter: Optional metadata filter in str format
+            namespace: Namespace to use from the index.
 
         Returns:
             List of Documents most similar to the query
         """
         docs_and_scores = await self.asimilarity_search_by_vector_with_score(
-            embedding, k=k, filter=filter, **kwargs
+            embedding, k=k, filter=filter, namespace=namespace, **kwargs
         )
         return [doc for doc, _ in docs_and_scores]
 
@@ -570,25 +640,31 @@ def _similarity_search_with_relevance_scores(
         query: str,
         k: int = 4,
         filter: Optional[str] = None,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[Tuple[Document, float]]:
         """
         Since Upstash always returns relevance scores, default implementation is used.
         """
-        return self.similarity_search_with_score(query, k=k, filter=filter, **kwargs)
+        return self.similarity_search_with_score(
+            query, k=k, filter=filter, namespace=namespace, **kwargs
+        )
 
     async def _asimilarity_search_with_relevance_scores(
         self,
         query: str,
         k: int = 4,
         filter: Optional[str] = None,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> List[Tuple[Document, float]]:
         """
         Since Upstash always returns relevance scores, default implementation is used.
""" return await self.asimilarity_search_with_score( - query, k=k, filter=filter, **kwargs + query, k=k, filter=filter, namespace=namespace, **kwargs ) def max_marginal_relevance_search_by_vector( @@ -598,6 +674,8 @@ def max_marginal_relevance_search_by_vector( fetch_k: int = 20, lambda_mult: float = 0.5, filter: Optional[str] = None, + *, + namespace: Optional[str] = None, **kwargs: Any, ) -> List[Document]: """Return docs selected using the maximal marginal relevance. @@ -614,10 +692,14 @@ def max_marginal_relevance_search_by_vector( to maximum diversity and 1 to minimum diversity. Defaults to 0.5. filter: Optional metadata filter in str format + namespace: Namespace to use from the index. Returns: List of Documents selected by maximal marginal relevance. """ + if namespace is None: + namespace = self._namespace + assert isinstance(self.embeddings, Embeddings) if isinstance(embedding, str): results = self._index.query( @@ -626,6 +708,7 @@ def max_marginal_relevance_search_by_vector( include_vectors=True, include_metadata=True, filter=filter or "", + namespace=namespace, **kwargs, ) else: @@ -635,6 +718,7 @@ def max_marginal_relevance_search_by_vector( include_vectors=True, include_metadata=True, filter=filter or "", + namespace=namespace, **kwargs, ) @@ -657,6 +741,8 @@ async def amax_marginal_relevance_search_by_vector( fetch_k: int = 20, lambda_mult: float = 0.5, filter: Optional[str] = None, + *, + namespace: Optional[str] = None, **kwargs: Any, ) -> List[Document]: """Return docs selected using the maximal marginal relevance. @@ -673,10 +759,15 @@ async def amax_marginal_relevance_search_by_vector( to maximum diversity and 1 to minimum diversity. Defaults to 0.5. filter: Optional metadata filter in str format + namespace: Namespace to use from the index. Returns: List of Documents selected by maximal marginal relevance. """ + + if namespace is None: + namespace = self._namespace + assert isinstance(self.embeddings, Embeddings) if isinstance(embedding, str): results = await self._async_index.query( @@ -685,6 +776,7 @@ async def amax_marginal_relevance_search_by_vector( include_vectors=True, include_metadata=True, filter=filter or "", + namespace=namespace, **kwargs, ) else: @@ -694,6 +786,7 @@ async def amax_marginal_relevance_search_by_vector( include_vectors=True, include_metadata=True, filter=filter or "", + namespace=namespace, **kwargs, ) @@ -716,6 +809,8 @@ def max_marginal_relevance_search( fetch_k: int = 20, lambda_mult: float = 0.5, filter: Optional[str] = None, + *, + namespace: Optional[str] = None, **kwargs: Any, ) -> List[Document]: """Return docs selected using the maximal marginal relevance. @@ -732,6 +827,7 @@ def max_marginal_relevance_search( to maximum diversity and 1 to minimum diversity. Defaults to 0.5. filter: Optional metadata filter in str format + namespace: Namespace to use from the index. Returns: List of Documents selected by maximal marginal relevance. @@ -743,6 +839,7 @@ def max_marginal_relevance_search( fetch_k=fetch_k, lambda_mult=lambda_mult, filter=filter, + namespace=namespace, **kwargs, ) @@ -753,6 +850,8 @@ async def amax_marginal_relevance_search( fetch_k: int = 20, lambda_mult: float = 0.5, filter: Optional[str] = None, + *, + namespace: Optional[str] = None, **kwargs: Any, ) -> List[Document]: """Return docs selected using the maximal marginal relevance. @@ -769,6 +868,7 @@ async def amax_marginal_relevance_search( to maximum diversity and 1 to minimum diversity. Defaults to 0.5. 
             filter: Optional metadata filter in str format
+            namespace: Namespace to use from the index.
 
         Returns:
             List of Documents selected by maximal marginal relevance.
@@ -780,6 +880,7 @@ async def amax_marginal_relevance_search(
             fetch_k=fetch_k,
             lambda_mult=lambda_mult,
             filter=filter,
+            namespace=namespace,
             **kwargs,
         )
 
@@ -797,6 +898,8 @@ def from_texts(
         async_index: Optional[AsyncIndex] = None,
         index_url: Optional[str] = None,
         index_token: Optional[str] = None,
+        *,
+        namespace: str = "",
         **kwargs: Any,
     ) -> UpstashVectorStore:
         """Create a new UpstashVectorStore from a list of texts.
@@ -819,6 +922,7 @@ def from_texts(
             async_index=async_index,
             index_url=index_url,
             index_token=index_token,
+            namespace=namespace,
             **kwargs,
         )
 
@@ -828,6 +932,7 @@ def from_texts(
             ids=ids,
             batch_size=batch_size,
            embedding_chunk_size=embedding_chunk_size,
+            namespace=namespace,
         )
         return vector_store
 
@@ -845,6 +950,8 @@ async def afrom_texts(
         async_index: Optional[AsyncIndex] = None,
         index_url: Optional[str] = None,
         index_token: Optional[str] = None,
+        *,
+        namespace: str = "",
         **kwargs: Any,
     ) -> UpstashVectorStore:
         """Create a new UpstashVectorStore from a list of texts.
@@ -865,6 +972,7 @@ async def afrom_texts(
            text_key=text_key,
             index=index,
             async_index=async_index,
+            namespace=namespace,
             index_url=index_url,
             index_token=index_token,
             **kwargs,
         )
@@ -875,6 +983,7 @@ async def afrom_texts(
             metadatas=metadatas,
             ids=ids,
             batch_size=batch_size,
+            namespace=namespace,
             embedding_chunk_size=embedding_chunk_size,
         )
         return vector_store
@@ -884,6 +993,8 @@ def delete(
         ids: Optional[List[str]] = None,
         delete_all: Optional[bool] = None,
         batch_size: Optional[int] = 1000,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> None:
         """Delete by vector IDs
@@ -892,14 +1003,17 @@ def delete(
             ids: List of ids to delete.
             delete_all: Delete all vectors in the index.
             batch_size: Batch size to use when deleting the embeddings.
             Upstash supports at max 1000 deletions per request.
+            namespace: Namespace to use from the index.
         """
+        if namespace is None:
+            namespace = self._namespace
 
         if delete_all:
-            self._index.reset()
+            self._index.reset(namespace=namespace)
         elif ids is not None:
             for batch in batch_iterate(batch_size, ids):
-                self._index.delete(ids=batch)
+                self._index.delete(ids=batch, namespace=namespace)
         else:
             raise ValueError("Either ids or delete_all should be provided")
 
@@ -910,6 +1024,8 @@ async def adelete(
         ids: Optional[List[str]] = None,
         delete_all: Optional[bool] = None,
         batch_size: Optional[int] = 1000,
+        *,
+        namespace: Optional[str] = None,
         **kwargs: Any,
     ) -> None:
         """Delete by vector IDs
@@ -918,14 +1034,17 @@ async def adelete(
             ids: List of ids to delete.
             delete_all: Delete all vectors in the index.
             batch_size: Batch size to use when deleting the embeddings.
             Upstash supports at max 1000 deletions per request.
+            namespace: Namespace to use from the index.
         """
+        if namespace is None:
+            namespace = self._namespace
 
         if delete_all:
-            await self._async_index.reset()
+            await self._async_index.reset(namespace=namespace)
         elif ids is not None:
             for batch in batch_iterate(batch_size, ids):
-                await self._async_index.delete(ids=batch)
+                await self._async_index.delete(ids=batch, namespace=namespace)
         else:
             raise ValueError("Either ids or delete_all should be provided")
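A minimal end-to-end sketch of the namespace behavior added in this PR, assuming the two `UPSTASH_VECTOR_REST_*` environment variables are set (as in the docs snippet above, the store then picks up credentials from the environment). `OpenAIEmbeddings` and the namespace and id strings here are illustrative placeholders, not part of this change; any `Embeddings` implementation works.

```python
from langchain_community.vectorstores.upstash import UpstashVectorStore
from langchain_openai import OpenAIEmbeddings  # placeholder: any Embeddings works

embeddings = OpenAIEmbeddings()

# The store-level namespace becomes the default for every operation.
store = UpstashVectorStore(embedding=embeddings, namespace="my_namespace")

# Upserted into "my_namespace" (the store default).
store.add_texts(["hello world"], ids=["doc-1"])

# No namespace passed: the methods fall back to self._namespace.
docs_default = store.similarity_search("hello", k=1)

# A per-call namespace (keyword-only) overrides the store default.
docs_other = store.similarity_search("hello", k=1, namespace="other_namespace")

# Deletion is scoped the same way.
store.delete(ids=["doc-1"], namespace="my_namespace")
```

Note the resolution rule the methods share: `if namespace is None: namespace = self._namespace`. An explicit `namespace=""` is therefore not "use the store default" but "use the index's default namespace", since the empty string is Upstash's default namespace and is not `None`.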