-
Notifications
You must be signed in to change notification settings - Fork 5.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat omniparse #1408
Feat omniparse #1408
Changes from all commits
22b9990
5287e02
79334de
758acf8
f9d3a8c
6c39c80
340e148
8a4e8f8
a771112
015212d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
*.ico binary | ||
*.jpeg binary | ||
*.mp3 binary | ||
*.mp4 binary | ||
*.zip binary | ||
*.bin binary | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
import asyncio | ||
|
||
from metagpt.config2 import config | ||
from metagpt.const import EXAMPLE_DATA_PATH | ||
from metagpt.logs import logger | ||
from metagpt.rag.parsers import OmniParse | ||
from metagpt.rag.schema import OmniParseOptions, OmniParseType, ParseResultType | ||
from metagpt.utils.omniparse_client import OmniParseClient | ||
|
||
TEST_DOCX = EXAMPLE_DATA_PATH / "omniparse/test01.docx" | ||
TEST_PDF = EXAMPLE_DATA_PATH / "omniparse/test02.pdf" | ||
TEST_VIDEO = EXAMPLE_DATA_PATH / "omniparse/test03.mp4" | ||
TEST_AUDIO = EXAMPLE_DATA_PATH / "omniparse/test04.mp3" | ||
|
||
|
||
async def omniparse_client_example(): | ||
client = OmniParseClient(base_url=config.omniparse.base_url) | ||
|
||
# docx | ||
with open(TEST_DOCX, "rb") as f: | ||
file_input = f.read() | ||
document_parse_ret = await client.parse_document(file_input=file_input, bytes_filename="test_01.docx") | ||
logger.info(document_parse_ret) | ||
|
||
pdf_parse_ret = await client.parse_pdf(file_input=TEST_PDF) | ||
logger.info(pdf_parse_ret) | ||
|
||
# video | ||
video_parse_ret = await client.parse_video(file_input=TEST_VIDEO) | ||
logger.info(video_parse_ret) | ||
|
||
# audio | ||
audio_parse_ret = await client.parse_audio(file_input=TEST_AUDIO) | ||
logger.info(audio_parse_ret) | ||
|
||
|
||
async def omniparse_example(): | ||
parser = OmniParse( | ||
api_key=config.omniparse.api_key, | ||
base_url=config.omniparse.base_url, | ||
parse_options=OmniParseOptions( | ||
parse_type=OmniParseType.PDF, | ||
result_type=ParseResultType.MD, | ||
max_timeout=120, | ||
num_workers=3, | ||
), | ||
) | ||
ret = parser.load_data(file_path=TEST_PDF) | ||
logger.info(ret) | ||
|
||
file_paths = [TEST_DOCX, TEST_PDF] | ||
parser.parse_type = OmniParseType.DOCUMENT | ||
ret = await parser.aload_data(file_path=file_paths) | ||
logger.info(ret) | ||
|
||
|
||
async def main(): | ||
await omniparse_client_example() | ||
await omniparse_example() | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from metagpt.utils.yaml_model import YamlModel | ||
|
||
|
||
class OmniParseConfig(YamlModel): | ||
api_key: str = "" | ||
base_url: str = "" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
from llama_index.core.node_parser import SentenceSplitter | ||
from llama_index.core.postprocessor.types import BaseNodePostprocessor | ||
from llama_index.core.query_engine import RetrieverQueryEngine | ||
from llama_index.core.readers.base import BaseReader | ||
from llama_index.core.response_synthesizers import ( | ||
BaseSynthesizer, | ||
get_response_synthesizer, | ||
|
@@ -28,6 +29,7 @@ | |
TransformComponent, | ||
) | ||
|
||
from metagpt.config2 import config | ||
from metagpt.rag.factories import ( | ||
get_index, | ||
get_rag_embedding, | ||
|
@@ -36,6 +38,7 @@ | |
get_retriever, | ||
) | ||
from metagpt.rag.interface import NoEmbedding, RAGObject | ||
from metagpt.rag.parsers import OmniParse | ||
from metagpt.rag.retrievers.base import ModifiableRAGRetriever, PersistableRAGRetriever | ||
from metagpt.rag.retrievers.hybrid_retriever import SimpleHybridRetriever | ||
from metagpt.rag.schema import ( | ||
|
@@ -44,6 +47,9 @@ | |
BaseRetrieverConfig, | ||
BM25RetrieverConfig, | ||
ObjectNode, | ||
OmniParseOptions, | ||
OmniParseType, | ||
ParseResultType, | ||
) | ||
from metagpt.utils.common import import_class | ||
|
||
|
@@ -100,7 +106,10 @@ def from_docs( | |
if not input_dir and not input_files: | ||
raise ValueError("Must provide either `input_dir` or `input_files`.") | ||
|
||
documents = SimpleDirectoryReader(input_dir=input_dir, input_files=input_files).load_data() | ||
file_extractor = cls._get_file_extractor() | ||
documents = SimpleDirectoryReader( | ||
input_dir=input_dir, input_files=input_files, file_extractor=file_extractor | ||
).load_data() | ||
cls._fix_document_metadata(documents) | ||
|
||
transformations = transformations or cls._default_transformations() | ||
|
@@ -301,3 +310,23 @@ def _resolve_embed_model(embed_model: BaseEmbedding = None, configs: list[Any] = | |
@staticmethod | ||
def _default_transformations(): | ||
return [SentenceSplitter()] | ||
|
||
@staticmethod | ||
def _get_file_extractor() -> dict[str:BaseReader]: | ||
""" | ||
Get the file extractor. | ||
Currently, only PDF use OmniParse. Other document types use the built-in reader from llama_index. | ||
|
||
Returns: | ||
dict[file_type: BaseReader] | ||
""" | ||
file_extractor: dict[str:BaseReader] = {} | ||
if config.omniparse.base_url: | ||
pdf_parser = OmniParse( | ||
api_key=config.omniparse.api_key, | ||
base_url=config.omniparse.base_url, | ||
parse_options=OmniParseOptions(parse_type=OmniParseType.PDF, result_type=ParseResultType.MD), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No impact, use @staticmethod
def _get_file_extractor() -> dict[str:BaseReader]:
"""
Get the file extractor.
Currently, only PDF use OmniParse. Other document types use the built-in reader from llama_index.
Returns:
dict[file_type: BaseReader]
"""
file_extractor: dict[str:BaseReader] = {}
if config.omniparse.base_url:
pdf_parser = OmniParse(
api_key=config.omniparse.api_key,
base_url=config.omniparse.base_url,
parse_options=OmniParseOptions(parse_type=OmniParseType.PDF, result_type=ParseResultType.MD),
)
file_extractor[".pdf"] = pdf_parser
return file_extractor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. file_extractor = cls._get_file_extractor()
documents = SimpleDirectoryReader(
input_dir=input_dir, input_files=input_files, file_extractor=file_extractor
).load_data() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if you config with omniparse.base_url, you will use pdf_parser from OmniParse. But what if files are docs in input_files. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
file_suffix = input_file.suffix.lower()
if file_suffix in default_file_reader_suffix or file_suffix in file_extractor:
# use file readers
if file_suffix not in file_extractor:
# instantiate file reader if not already
reader_cls = default_file_reader_cls[file_suffix]
file_extractor[file_suffix] = reader_cls()
reader = file_extractor[file_suffix]
# load data -- catch all errors except for ImportError
try:
kwargs = {"extra_info": metadata}
if fs and not is_default_fs(fs):
kwargs["fs"] = fs
docs = reader.load_data(input_file, **kwargs) |
||
) | ||
file_extractor[".pdf"] = pdf_parser | ||
|
||
return file_extractor |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from metagpt.rag.parsers.omniparse import OmniParse | ||
|
||
__all__ = ["OmniParse"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
import asyncio | ||
from fileinput import FileInput | ||
from pathlib import Path | ||
from typing import List, Optional, Union | ||
|
||
from llama_index.core import Document | ||
from llama_index.core.async_utils import run_jobs | ||
from llama_index.core.readers.base import BaseReader | ||
|
||
from metagpt.logs import logger | ||
from metagpt.rag.schema import OmniParseOptions, OmniParseType, ParseResultType | ||
from metagpt.utils.async_helper import NestAsyncio | ||
from metagpt.utils.omniparse_client import OmniParseClient | ||
|
||
|
||
class OmniParse(BaseReader): | ||
"""OmniParse""" | ||
|
||
def __init__( | ||
self, api_key: str = None, base_url: str = "http://localhost:8000", parse_options: OmniParseOptions = None | ||
): | ||
""" | ||
Args: | ||
api_key: Default None, can be used for authentication later. | ||
base_url: OmniParse Base URL for the API. | ||
parse_options: Optional settings for OmniParse. Default is OmniParseOptions with default values. | ||
""" | ||
self.parse_options = parse_options or OmniParseOptions() | ||
self.omniparse_client = OmniParseClient(api_key, base_url, max_timeout=self.parse_options.max_timeout) | ||
|
||
@property | ||
def parse_type(self): | ||
return self.parse_options.parse_type | ||
|
||
@property | ||
def result_type(self): | ||
return self.parse_options.result_type | ||
|
||
@parse_type.setter | ||
def parse_type(self, parse_type: Union[str, OmniParseType]): | ||
if isinstance(parse_type, str): | ||
parse_type = OmniParseType(parse_type) | ||
self.parse_options.parse_type = parse_type | ||
|
||
@result_type.setter | ||
def result_type(self, result_type: Union[str, ParseResultType]): | ||
if isinstance(result_type, str): | ||
result_type = ParseResultType(result_type) | ||
self.parse_options.result_type = result_type | ||
|
||
async def _aload_data( | ||
self, | ||
file_path: Union[str, bytes, Path], | ||
extra_info: Optional[dict] = None, | ||
) -> List[Document]: | ||
""" | ||
Load data from the input file_path. | ||
|
||
Args: | ||
file_path: File path or file byte data. | ||
extra_info: Optional dictionary containing additional information. | ||
|
||
Returns: | ||
List[Document] | ||
""" | ||
try: | ||
if self.parse_type == OmniParseType.PDF: | ||
# pdf parse | ||
parsed_result = await self.omniparse_client.parse_pdf(file_path) | ||
else: | ||
# other parse use omniparse_client.parse_document | ||
# For compatible byte data, additional filename is required | ||
extra_info = extra_info or {} | ||
filename = extra_info.get("filename") | ||
parsed_result = await self.omniparse_client.parse_document(file_path, bytes_filename=filename) | ||
|
||
# Get the specified structured data based on result_type | ||
content = getattr(parsed_result, self.result_type) | ||
docs = [ | ||
Document( | ||
text=content, | ||
metadata=extra_info or {}, | ||
) | ||
] | ||
except Exception as e: | ||
logger.error(f"OMNI Parse Error: {e}") | ||
docs = [] | ||
|
||
return docs | ||
|
||
async def aload_data( | ||
self, | ||
file_path: Union[List[FileInput], FileInput], | ||
extra_info: Optional[dict] = None, | ||
) -> List[Document]: | ||
""" | ||
Load data from the input file_path. | ||
|
||
Args: | ||
file_path: File path or file byte data. | ||
extra_info: Optional dictionary containing additional information. | ||
|
||
Notes: | ||
This method ultimately calls _aload_data for processing. | ||
|
||
Returns: | ||
List[Document] | ||
""" | ||
docs = [] | ||
if isinstance(file_path, (str, bytes, Path)): | ||
# Processing single file | ||
docs = await self._aload_data(file_path, extra_info) | ||
elif isinstance(file_path, list): | ||
# Concurrently process multiple files | ||
parse_jobs = [self._aload_data(file_item, extra_info) for file_item in file_path] | ||
doc_ret_list = await run_jobs(jobs=parse_jobs, workers=self.parse_options.num_workers) | ||
docs = [doc for docs in doc_ret_list for doc in docs] | ||
return docs | ||
|
||
def load_data( | ||
self, | ||
file_path: Union[List[FileInput], FileInput], | ||
extra_info: Optional[dict] = None, | ||
) -> List[Document]: | ||
""" | ||
Load data from the input file_path. | ||
|
||
Args: | ||
file_path: File path or file byte data. | ||
extra_info: Optional dictionary containing additional information. | ||
|
||
Notes: | ||
This method ultimately calls aload_data for processing. | ||
|
||
Returns: | ||
List[Document] | ||
""" | ||
NestAsyncio.apply_once() # Ensure compatibility with nested async calls | ||
return asyncio.run(self.aload_data(file_path, extra_info)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we only use
parse_file
and route to different_parse_pdf
and so on inside?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact,
parse_document
has already implemented this function, but it is more troublesome to be compatible with multiple file formats. It requires the file's triple (filename, file_bytes, mime_type) information.