feat: Add Indexing Pipeline #6424

Merged 7 commits on Dec 4, 2023
14 changes: 14 additions & 0 deletions examples/getting_started/indexing.py
@@ -0,0 +1,14 @@
from pathlib import Path

from haystack.document_stores import InMemoryDocumentStore
from haystack.pipeline_utils import build_indexing_pipeline

# We support many different databases. Here we load a simple and lightweight in-memory document store.
document_store = InMemoryDocumentStore()

# Let's now build an indexing pipeline that indexes PDFs and text files from a test folder.
indexing_pipeline = build_indexing_pipeline(
    document_store=document_store, embedding_model="sentence-transformers/all-mpnet-base-v2"
)
result = indexing_pipeline.run(files=list(Path("../../test/test_files").iterdir()))
print(result)
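For context, the `run()` method defined in `haystack/pipeline_utils/indexing.py` below expands directories recursively and de-duplicates the result, so the same folder could be passed directly instead of listing its entries; a minimal sketch under that assumption:

```python
from pathlib import Path

from haystack.document_stores import InMemoryDocumentStore
from haystack.pipeline_utils import build_indexing_pipeline

document_store = InMemoryDocumentStore()
indexing_pipeline = build_indexing_pipeline(
    document_store=document_store, embedding_model="sentence-transformers/all-mpnet-base-v2"
)
# Passing the directory itself indexes every file found under it, recursively.
result = indexing_pipeline.run(files=[Path("../../test/test_files")])
print(result)  # a dict such as {'documents_written': <number of documents written>}
```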
3 changes: 2 additions & 1 deletion haystack/pipeline_utils/__init__.py
@@ -1,3 +1,4 @@
from haystack.pipeline_utils.rag import build_rag_pipeline
from haystack.pipeline_utils.indexing import build_indexing_pipeline

__all__ = ["build_rag_pipeline"]
__all__ = ["build_rag_pipeline", "build_indexing_pipeline"]
223 changes: 223 additions & 0 deletions haystack/pipeline_utils/indexing.py
@@ -0,0 +1,223 @@
import inspect
import os
import re
from pathlib import Path
from typing import Optional, List, Any, Dict
from typing import Union, Type

from haystack.document_stores.protocol import DocumentStore

from haystack import Pipeline
from haystack.components.converters import TextFileToDocument
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, OpenAIDocumentEmbedder
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.routers import FileTypeRouter, DocumentJoiner
from haystack.components.writers import DocumentWriter


def build_indexing_pipeline(
    document_store: Any,
    embedding_model: Optional[str] = None,
    embedding_model_kwargs: Optional[Dict[str, Any]] = None,
    supported_mime_types: Optional[List[str]] = None,
):
    """
    Returns a prebuilt pipeline for indexing documents into a DocumentStore. The indexing pipeline automatically
    detects the file type of the input files and converts them into Documents. The supported file types are: .txt,
    .pdf, and .html.

    Example usage:

    ```python
    from haystack.pipeline_utils import build_indexing_pipeline
    indexing_pipe = build_indexing_pipeline(document_store=your_document_store_instance)
    indexing_pipe.run(files=["path/to/file1", "path/to/file2"])
    >>> {'documents_written': 2}
    ```

    One can also pass an embedding model to the pipeline, which will then calculate embeddings for the documents
    and store them in the DocumentStore. Example usage:

    ```python
    indexing_pipe = build_indexing_pipeline(
        document_store=your_document_store_instance,
        embedding_model="sentence-transformers/all-mpnet-base-v2",
    )
    indexing_pipe.run(files=["path/to/file1", "path/to/file2"])
    >>> {'documents_written': 2}
    ```

    After running the indexing pipeline, the documents are indexed in the DocumentStore and can be used for querying.

    :param document_store: An instance of a DocumentStore to index documents into.
    :param embedding_model: The name of the model to use for document embeddings.
    :param embedding_model_kwargs: Keyword arguments to pass to the embedding model class.
    :param supported_mime_types: List of MIME types to support in the pipeline. If not given,
        defaults to ["text/plain", "application/pdf", "text/html"].
    """
    return _IndexingPipeline(
        document_store=document_store,
        embedding_model=embedding_model,
        embedding_model_kwargs=embedding_model_kwargs,
        supported_mime_types=supported_mime_types,
    )


class _IndexingPipeline:
    """
    An internal class to simplify creation of a prebuilt pipeline for indexing documents into a DocumentStore. The
    indexing pipeline automatically detects the file type of the input files and converts them into Documents. The
    supported file types are: .txt, .pdf, and .html.
    """

    def __init__(
        self,
        document_store: DocumentStore,
        embedding_model: Optional[str] = None,
        embedding_model_kwargs: Optional[Dict[str, Any]] = None,
        supported_mime_types: Optional[List[str]] = None,
    ):
        """
        :param document_store: An instance of a DocumentStore to index documents into.
        :param embedding_model: The name of the model to use for document embeddings.
        :param embedding_model_kwargs: Keyword arguments to pass to the embedding model class.
        :param supported_mime_types: List of MIME types to support in the pipeline. If not given,
            defaults to ["text/plain", "application/pdf", "text/html"].
        """
        if supported_mime_types is None:
            supported_mime_types = ["text/plain", "application/pdf", "text/html"]

        self.pipeline = Pipeline()
        self.pipeline.add_component("file_type_router", FileTypeRouter(mime_types=supported_mime_types))
        converters_used: List[str] = []
        # Add converters dynamically based on the supported MIME types
        if "text/plain" in supported_mime_types:
            self.pipeline.add_component("text_file_converter", TextFileToDocument())
            self.pipeline.connect("file_type_router.text/plain", "text_file_converter.sources")
            converters_used.append("text_file_converter")

        if "application/pdf" in supported_mime_types:
            from haystack.components.converters import PyPDFToDocument

            self.pipeline.add_component("pdf_file_converter", PyPDFToDocument())
            self.pipeline.connect("file_type_router.application/pdf", "pdf_file_converter.sources")
            converters_used.append("pdf_file_converter")

        if "text/html" in supported_mime_types:
            from haystack.components.converters import HTMLToDocument

            self.pipeline.add_component("html_file_converter", HTMLToDocument())
            self.pipeline.connect("file_type_router.text/html", "html_file_converter.sources")
            converters_used.append("html_file_converter")

        # Add remaining common components
        self.pipeline.add_component("document_joiner", DocumentJoiner())
        self.pipeline.add_component("document_cleaner", DocumentCleaner())
        self.pipeline.add_component("document_splitter", DocumentSplitter())

        # Connect converters to joiner, if they exist
        for converter_name in converters_used:
            self.pipeline.connect(f"{converter_name}.documents", "document_joiner.documents")

        # Connect joiner to cleaner and splitter
        self.pipeline.connect("document_joiner.documents", "document_cleaner.documents")
        self.pipeline.connect("document_cleaner.documents", "document_splitter.documents")

        if embedding_model:
            embedder_instance = self._find_embedder(embedding_model, embedding_model_kwargs)
            self.pipeline.add_component("storage_sink", DocumentWriter(document_store=document_store))
            self.pipeline.add_component("writer", embedder_instance)
            self.pipeline.connect("writer", "storage_sink")
        else:
            self.pipeline.add_component("writer", DocumentWriter(document_store=document_store))

        self.pipeline.connect("document_splitter.documents", "writer.documents")

        # This is more of a sanity check for the maintainer of the pipeline, to make sure that the pipeline is
        # configured correctly.
        if len(self.pipeline.inputs()) < 1:
            raise RuntimeError("IndexingPipeline needs at least one input component.")
        if len(self.pipeline.outputs()) < 1:
            raise RuntimeError("IndexingPipeline needs at least one output component.")

    def run(self, files: List[Union[str, Path]]) -> Dict[str, Any]:
        """
        Performs indexing of the given list of files into the DocumentStore.

        :param files: A list of paths to files to index.
        :type files: List[Union[str, Path]]
        :return: The output of the pipeline run, which is a dictionary containing the number of documents written.
        """
        if not files:
            return {"documents_written": 0}
        input_files = self._process_files(files)
        pipeline_output = self.pipeline.run(data={"file_type_router": {"sources": input_files}})
        aggregated_results = {}
        # combine the results of all pipeline outputs into one dictionary
        for component_result in pipeline_output.values():
            aggregated_results.update(component_result)
        return aggregated_results

    def _find_embedder(self, embedding_model: str, init_kwargs: Optional[Dict[str, Any]] = None) -> Any:
        embedder_patterns = {
            r"^text-embedding.*": OpenAIDocumentEmbedder,
            r"^sentence-transformers.*": SentenceTransformersDocumentEmbedder,
            # add more patterns or adjust them here
        }
        embedder_class = next((val for pat, val in embedder_patterns.items() if re.match(pat, embedding_model)), None)
        if not embedder_class:
            raise ValueError(
                f"Could not find an embedder for the given embedding model name {embedding_model}. "
                f"Please provide a valid embedding model name. "
                f"Valid embedder classes are {embedder_patterns.values()}."
            )
        return self._create_embedder(embedder_class, embedding_model, init_kwargs)

    def _create_embedder(
        self, embedder_class: Type, model_name: str, init_kwargs: Optional[Dict[str, Any]] = None
    ) -> Any:
        init_signature = inspect.signature(embedder_class.__init__)

        kwargs = {**(init_kwargs or {})}

        # Determine the correct parameter name and set it
        if "model_name_or_path" in init_signature.parameters:
            kwargs["model_name_or_path"] = model_name
        elif "model_name" in init_signature.parameters:
            kwargs["model_name"] = model_name
        else:
            raise ValueError(f"Could not find a parameter for the model name in the embedder class {embedder_class}")

        # Instantiate the class
        return embedder_class(**kwargs)

    def _list_files_recursively(self, path: Union[str, Path]) -> List[str]:
        """
        List all files in a directory recursively as a list of strings, or return the file itself
        if it's not a directory.

        :param path: the path to list files from
        :type path: Union[str, Path]
        :return: a list of strings, where each string is a path to a file
        """
        if os.path.isfile(path):
            return [str(path)]
        elif os.path.isdir(path):
            file_list: List[str] = []
            for root, _, files in os.walk(path):
                for file in files:
                    file_list.append(os.path.join(root, file))
            return file_list
        else:
            return []

    def _process_files(self, files: List[Union[str, Path]]) -> List[str]:
        """
        Process a list of files and directories, listing all files recursively and removing duplicates.

        :param files: A list of files and directories to process.
        :type files: List[Union[str, Path]]
        :return: A list of unique files.
        """
        nested_file_lists = [self._list_files_recursively(file) for file in files]
        combined_files = [item for sublist in nested_file_lists for item in sublist]
        unique_files = list(set(combined_files))
        return unique_files
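For reference, a model name matching the `^text-embedding.*` pattern above selects the `OpenAIDocumentEmbedder`; a minimal sketch, assuming an OpenAI API key is available to the embedder (e.g. via the environment) and that the placeholder file paths exist:

```python
from haystack.document_stores import InMemoryDocumentStore
from haystack.pipeline_utils import build_indexing_pipeline

document_store = InMemoryDocumentStore()
# "text-embedding-ada-002" matches ^text-embedding.*, so OpenAIDocumentEmbedder is chosen;
# any embedding_model_kwargs would be forwarded to the embedder's __init__.
indexing_pipe = build_indexing_pipeline(
    document_store=document_store,
    embedding_model="text-embedding-ada-002",
)
indexing_pipe.run(files=["path/to/file1", "path/to/file2"])
```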
4 changes: 4 additions & 0 deletions (new release note file)
@@ -0,0 +1,4 @@
---
features:
  - |
    Add an indexing `build_indexing_pipeline` utility function.
69 changes: 69 additions & 0 deletions test/pipelines/test_indexing_pipeline.py
@@ -0,0 +1,69 @@
import pytest

from haystack.pipeline_utils.indexing import build_indexing_pipeline
from haystack.document_stores import InMemoryDocumentStore


class TestIndexingPipeline:
    # indexing files without embeddings
    def test_indexing_files_without_embeddings(self, test_files_path):
        file_paths = [test_files_path / "txt" / "doc_1.txt", test_files_path / "txt" / "doc_2.txt"]
        document_store = InMemoryDocumentStore()
        pipeline = build_indexing_pipeline(document_store=document_store)
        result = pipeline.run(files=file_paths)
        assert result == {"documents_written": 2}

    # indexing files with embeddings
    @pytest.mark.integration
    def test_indexing_files_with_embeddings(self, test_files_path):
        document_store = InMemoryDocumentStore()
        pipeline = build_indexing_pipeline(
            document_store=document_store, embedding_model="sentence-transformers/all-mpnet-base-v2"
        )
        file_paths = [test_files_path / "txt" / "doc_1.txt", test_files_path / "txt" / "doc_2.txt"]
        result = pipeline.run(files=file_paths)
        assert result == {"documents_written": 2}

    @pytest.mark.integration
    def test_indexing_dirs_with_embeddings(self, test_files_path):
        document_store = InMemoryDocumentStore()
        pipeline = build_indexing_pipeline(
            document_store=document_store, embedding_model="sentence-transformers/all-mpnet-base-v2"
        )
        file_paths = [test_files_path / "txt"]
        result = pipeline.run(files=file_paths)
        assert "documents_written" in result
        assert result["documents_written"] >= 3

    # indexing multiple file types
    def test_indexing_multiple_file_types(self, test_files_path):
        document_store = InMemoryDocumentStore()
        pipeline = build_indexing_pipeline(document_store=document_store)
        file_paths = [
            test_files_path / "txt" / "doc_1.txt",
            test_files_path / "txt" / "doc_2.txt",
            test_files_path / "pdf" / "sample_pdf_1.pdf",
        ]
        result = pipeline.run(files=file_paths)
        # pdf gets split into 2 documents
        assert result == {"documents_written": 4}

    # indexing empty list of files
    def test_indexing_empty_list_of_files(self):
        document_store = InMemoryDocumentStore()
        pipeline = build_indexing_pipeline(document_store=document_store)
        result = pipeline.run(files=[])
        assert result == {"documents_written": 0}

    # embedding model is not found
    def test_embedding_model_not_found(self):
        document_store = InMemoryDocumentStore()
        with pytest.raises(ValueError, match="Could not find an embedder"):
            build_indexing_pipeline(document_store=document_store, embedding_model="invalid_model")

    @pytest.mark.integration
    def test_open_ai_embedding_model(self):
        document_store = InMemoryDocumentStore()
        pipe = build_indexing_pipeline(document_store=document_store, embedding_model="text-embedding-ada-002")
        # don't run the pipeline and waste credits, just check that it was created correctly
        assert pipe is not None