Skip to content

Commit

Permalink
feat: add upload from object storage (#113)
Browse files Browse the repository at this point in the history
Signed-off-by: Tiago Santana <54704492+SantanaTiago@users.noreply.github.com>
  • Loading branch information
SantanaTiago authored Jul 26, 2023
1 parent aaf0d10 commit e010abc
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 15 deletions.
8 changes: 8 additions & 0 deletions deepsearch/cps/cli/cli_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@
multiple urls separated by empty lines.""",
)

COORDINATES_PATH = typer.Option(
None,
"--coordinates-file",
"-c",
help="""Provide absolute path to local json file
containing coordinates of COS.""",
)

SOURCE_PATH = typer.Option(
None,
"--input-file",
Expand Down
26 changes: 23 additions & 3 deletions deepsearch/cps/cli/data_indices_typer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
from enum import Enum
from pathlib import Path
from typing import List
from typing import List, Optional

import typer

Expand All @@ -10,13 +11,15 @@
from deepsearch.cps.cli.cli_options import (
ATTACHMENT_KEY,
ATTACHMENT_PATH,
COORDINATES_PATH,
INDEX_ITEM_ID,
INDEX_KEY,
PROJ_KEY,
SOURCE_PATH,
URL,
)
from deepsearch.cps.client.api import CpsApi
from deepsearch.cps.client.components.data_indices import S3Coordinates
from deepsearch.cps.client.components.elastic import ElasticProjectDataCollectionSource
from deepsearch.cps.data_indices import utils
from deepsearch.documents.core.common_routines import ERROR_MSG
Expand Down Expand Up @@ -131,19 +134,36 @@ def upload_files(
url: str = URL,
local_file: Path = SOURCE_PATH,
index_key: str = INDEX_KEY,
s3_coordinates: Path = COORDINATES_PATH,
):
"""
Upload pdfs, zips, or online documents to a data index in a project
"""

api = CpsApi.from_env()

urls = None
if url is not None:
p = Path(url)
urls = get_urls(p) if p.exists() else [url]

cos_coordinates: Optional[S3Coordinates] = None
if s3_coordinates is not None:
try:
cos_coordinates = json.load(open(s3_coordinates))
except Exception as e:
typer.echo(f"Error occurred: {e}")
typer.echo(ERROR_MSG)
raise typer.Abort()

coords = ElasticProjectDataCollectionSource(proj_key=proj_key, index_key=index_key)
utils.upload_files(coords=coords, url=url, local_file=local_file)
return
utils.upload_files(
api=api,
coords=coords,
url=urls,
local_file=local_file,
s3_coordinates=cos_coordinates,
)


@app.command(
Expand Down
14 changes: 13 additions & 1 deletion deepsearch/cps/client/components/data_indices.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def delete(
def upload_file(
self,
coords: ElasticProjectDataCollectionSource,
body: Dict[str, List[str]],
body: Union[Dict[str, List[str]], Dict[str, Dict[str, S3Coordinates]]],
) -> str:
"""
Call api for converting and uploading file to a project's data index.
Expand Down Expand Up @@ -197,3 +197,15 @@ def add_item_attachment(
@dataclass
class CpsApiDataIndex(ApiConnectedObject):
project: str


class S3Coordinates(BaseModel):
host: str
port: int
ssl: bool
verify_ssl: bool
access_key: str
secret_key: str
bucket: str
location: str
key_prefix: str = ""
62 changes: 51 additions & 11 deletions deepsearch/cps/data_indices/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from tqdm import tqdm

from deepsearch.cps.client.api import CpsApi
from deepsearch.cps.client.components.data_indices import S3Coordinates
from deepsearch.cps.client.components.elastic import ElasticProjectDataCollectionSource
from deepsearch.documents.core import convert, input_process
from deepsearch.documents.core.common_routines import progressbar, success_message
Expand All @@ -19,39 +20,41 @@


def upload_files(
api: CpsApi,
coords: ElasticProjectDataCollectionSource,
url: Optional[Union[str, List[str]]] = None,
local_file: Optional[Union[str, Path]] = None,
api: Optional[CpsApi] = None,
s3_coordinates: Optional[S3Coordinates] = None,
):
"""
Orchestrate document conversion and upload to an index in a project
"""

# initialize default Api if not specified
if api is None:
api = CpsApi.from_env()

# check required inputs are present
if url is None and local_file is None:
if url is None and local_file is None and s3_coordinates is None:
raise ValueError(
"No input provided. Please provide either a url or a local file for conversion."
"No input provided. Please provide either a url, a local file, or coordinates to COS."
)
elif url is not None and local_file is None:
elif url is not None and local_file is None and s3_coordinates is None:
if isinstance(url, str):
urls = [url]
else:
urls = url

return process_url_input(api=api, coords=coords, urls=urls)
elif url is None and local_file is not None:
elif url is None and local_file is not None and s3_coordinates is None:
return process_local_file(
api=api,
coords=coords,
local_file=Path(local_file),
)

raise ValueError("Please provide only one input: url or local file.")
elif url is None and local_file is None and s3_coordinates is not None:
return process_external_cos(
api=api, coords=coords, s3_coordinates=s3_coordinates
)
raise ValueError(
"Please provide only one input: url, local file, or coordinates to COS."
)


def process_url_input(
Expand Down Expand Up @@ -158,3 +161,40 @@ def process_local_file(
print(success_message)
cleanup(root_dir=root_dir)
return


def process_external_cos(
api: CpsApi,
coords: ElasticProjectDataCollectionSource,
s3_coordinates: S3Coordinates,
progress_bar=False,
):
"""
Individual files are processed before upload.
"""
# container for task_ids
task_ids = []

with tqdm(
total=1,
desc=f"{'Submitting input:': <{progressbar.padding}}",
disable=not (progress_bar),
colour=progressbar.colour,
bar_format=progressbar.bar_format,
) as progress:
# upload using coordinates
payload = {"s3_source": {"coordinates": s3_coordinates}}
task_id = api.data_indices.upload_file(
coords=coords,
body=payload,
)
task_ids.append(task_id)
progress.update(1)

# check status of running tasks
# TODO: add failure handling
statuses = convert.check_cps_status_running_tasks(
api=api, cps_proj_key=coords.proj_key, task_ids=task_ids
)
print(success_message)
return

0 comments on commit e010abc

Please sign in to comment.