Google Drive V2 Integration (#602)
* first commit

* revert format pre commit

* revert flag

* set up pub/sub flow

* revert accidental remove

* saving progress

* nit

* save

* save

* save

* save

* save

* save

* save

* make lint

* a few nits re: error handling and database

* resolve circular imports and .env in configuration.yaml

* nit

* make merge work

* fix branch state

* make lint

* save

* make lint

* fix logger

* update docker compose

* save

* Adopt new logging

* sync_agent and sync_agent_activity jobs

* revert refresh

* fix publish script

* fix re-auth

* ensure files artifact sync works properly

* introduce consolidation logic

* Remove nits

* more nits

* nits

* trying to add deployment for publisher

* save

* save

* nit

* comment

* save

* File Service: fix empty generator

* add fallback default GDrive option

* patch move for file artifacts

* trigger sync_agent at agent creation

* Logging

* Fixes

* Fix

* fixing

* Format

* testing issues

* Lint

---------

Co-authored-by: lusmoura <luisa@cohere.com>
Co-authored-by: Beatrix De Wilde <128378696+BeatrixCohere@users.noreply.github.com>
3 people authored Aug 15, 2024
1 parent a332c51 commit ee97d0f
Showing 50 changed files with 3,541 additions and 1,390 deletions.
8 changes: 6 additions & 2 deletions Makefile
@@ -8,7 +8,7 @@ watch:

.PHONY: up
up:
@docker compose up --build
@docker compose up --build -d

.PHONY: down
down:
@@ -32,7 +32,7 @@ run-tests: run-unit-tests
attach:
@docker attach cohere-toolkit-backend-1
logs:
@docker compose logs -f backend
@@docker-compose logs --follow --tail 100 $(service)

.PHONY: exec-backend
exec-backend:
@@ -107,3 +107,7 @@ install-web:
.PHONY: build-web
build-web:
cd src/interfaces/coral_web && npm run build

.PHONY: dev-sync
dev-sync:
@docker compose up --build sync_worker sync_publisher flower -d
65 changes: 65 additions & 0 deletions docker-compose.yml
@@ -85,6 +85,59 @@ services:
    networks:
      - proxynet

  sync_worker:
    build:
      context: .
      args:
        DOCKER_BUILDKIT: 1
      dockerfile: ./src/backend/Dockerfile
    command: ["./src/backend/services/sync/executor.sh", "${ENVIRONMENT}"]
    restart: unless-stopped
    profiles:
      - sync
    develop:
      watch:
        - action: sync
          path: ./src/backend
          target: /workspace/src/backend
          ignore:
            - __pycache__/
            - alembic/
            - data/
    stdin_open: true
    tty: true
    depends_on:
      - db
      - redis
    networks:
      - proxynet

  sync_publisher:
    build:
      context: .
      args:
        DOCKER_BUILDKIT: 1
      dockerfile: ./src/backend/services/sync/Dockerfile
    restart: unless-stopped
    profiles:
      - sync
    develop:
      watch:
        - action: sync
          path: ./src/backend
          target: /workspace/src/backend
          ignore:
            - __pycache__/
            - alembic/
            - data/
    stdin_open: true
    tty: true
    depends_on:
      - db
      - redis
    networks:
      - proxynet

  frontend:
    build:
      target: ${BUILD_TARGET:-prod}
@@ -119,6 +172,18 @@ services:
    networks:
      - proxynet

  flower:
    image: mher/flower
    profiles:
      - sync
    environment:
      - CELERY_BROKER_URL=redis://:redis@redis:6379
      - FLOWER_PORT=5555
    ports:
      - 5555:5555
    networks:
      - proxynet

volumes:
  db:
    name: cohere_toolkit_db
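Together the sync_worker, sync_publisher, and flower services make up the new pub/sub sync pipeline. As a rough sketch of how the pieces connect (task and module names here are assumptions based on the commit messages; the real code lives under src/backend/services/sync), a Celery worker pointed at the same Redis broker the flower service uses could be declared like this:

from celery import Celery

# Broker URL mirrors CELERY_BROKER_URL passed to the flower service above.
app = Celery(
    "sync",
    broker="redis://:redis@redis:6379",
    backend="redis://:redis@redis:6379",
)

@app.task(name="sync_agent")
def sync_agent(agent_id: str) -> None:
    # Placeholder body: the publisher enqueues one of these per agent and the
    # worker pulls that agent's Google Drive changes.
    ...

The publisher side would then enqueue work with sync_agent.delay(agent_id), and Flower, exposed on port 5555, shows queue depth and task state.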
2,270 changes: 1,399 additions & 871 deletions poetry.lock

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions pyproject.toml
@@ -22,7 +22,7 @@ psycopg2-binary = "^2.9.9"
python-multipart = "^0.0.9"
sse-starlette = "^2.0.0"
boto3 = "^1.0.0"
httpx = "^0.27.0"
httpx = {extras = ["http2"], version = "^0.27.0"}
chromadb = "^0.4.16"
cohere = "^5.5.7"
llama-index = "^0.10.11"
@@ -45,17 +45,20 @@ google_auth_oauthlib = "^1.2.0"
google-auth-httplib2 = "^0.2.0"
google-api-python-client = "^2.133.0"
openpyxl = "^3.1.5"
celery = {version = "^5.4.0", extras = ["gevent"]}
kombu = "^5.3.7"
watchdog = "^4.0.1"
redis = {extras = ["hiredis"], version = "^5.0.7"}
python-docx = "^1.1.2"
python-calamine = "^0.2.3"

# Compass dependencies - To be removed once Compass is OSS
structlog = "^24.4.0"
pyyaml = "^6.0.1"

[tool.poetry.group.compass]
optional = false

[tool.poetry.group.compass.dependencies]
# Compass dependencies - To be removed once Compass is OSS
fsspec = "2024.2.0"
joblib = "*"
pydantic = ">=2.6.3"
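Two dependency changes are worth calling out: httpx now installs the http2 extra (the h2 package), and celery, kombu, and redis[hiredis] back the new sync services declared in docker-compose.yml. A minimal sketch of what the http2 extra enables — the URL is only a placeholder:

import httpx

# http2=True is only usable when the "h2" package from httpx[http2] is installed.
with httpx.Client(http2=True) as client:
    response = client.get("https://www.example.com")
    print(response.http_version)  # "HTTP/2" when the server negotiates it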
4 changes: 3 additions & 1 deletion src/backend/chat/collate.py
@@ -68,7 +68,9 @@ async def rerank_and_chunk(
text = output.get("text")

if not text:
chunked_outputs.append([output])
# If one doesn't have text, skip it
chunked_outputs = None
reranked_results[tool_call_hashable] = tool_result
continue

chunks = chunk(text)
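The collate change stops chunking tool outputs that have no text: the raw tool result is kept for that tool call and reranking/chunking is skipped, instead of wrapping an empty output. A simplified sketch of the rule, with assumed helper names:

def collate_tool_result(tool_result: dict, chunk):
    # chunk(text) is assumed to split a string into a list of smaller strings.
    outputs = tool_result.get("outputs", [])
    if any(not output.get("text") for output in outputs):
        # At least one output has no text: keep the raw result and skip rerank/chunk.
        return tool_result
    return [chunk(output["text"]) for output in outputs]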
7 changes: 3 additions & 4 deletions src/backend/compass_sdk/__init__.py
@@ -6,9 +6,9 @@
from pydantic import BaseModel

from backend.compass_sdk.constants import (
COHERE_API_ENV_VAR,
DEFAULT_COMMANDR_EXTRACTABLE_ATTRIBUTES,
DEFAULT_COMMANDR_PROMPT,
DEFAULT_MAX_TOKENS_METADATA,
DEFAULT_MIN_CHARS_PER_ELEMENT,
DEFAULT_MIN_NUM_CHUNKS_IN_TITLE,
DEFAULT_MIN_NUM_TOKENS_CHUNK,
@@ -17,7 +17,6 @@
METADATA_HEURISTICS_ATTRIBUTES,
SKIP_INFER_TABLE_TYPES,
)
from backend.model_deployments.cohere_platform import CohereDeployment


class Logger:
@@ -237,7 +236,7 @@ class MetadataConfig(ValidatedModel):

pre_build_detectors: bool = False
metadata_strategy: MetadataStrategy = MetadataStrategy.No_Metadata
cohere_api_key: Optional[str] = CohereDeployment.api_key
cohere_api_key: Optional[str] = getenv(COHERE_API_ENV_VAR, None)
commandr_model_name: str = "command-r"
commandr_prompt: str = DEFAULT_COMMANDR_PROMPT
commandr_max_tokens: int = 500
@@ -313,7 +312,7 @@ class ParserConfig(ValidatedModel):
num_tokens_overlap: int = DEFAULT_NUM_TOKENS_CHUNK_OVERLAP
min_chunk_tokens: int = DEFAULT_MIN_NUM_TOKENS_CHUNK
num_chunks_in_title: int = DEFAULT_MIN_NUM_CHUNKS_IN_TITLE
max_tokens_metadata: int = DEFAULT_MAX_TOKENS_METADATA
max_tokens_metadata: int = 1000
include_tables: bool = True

# Formatting configuration
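MetadataConfig now defaults cohere_api_key from the COHERE_API_KEY environment variable instead of reading it off CohereDeployment, which breaks the circular import between compass_sdk and model_deployments. A minimal sketch of the pattern (class name shortened for illustration):

from os import getenv
from typing import Optional

from pydantic import BaseModel

COHERE_API_ENV_VAR = "COHERE_API_KEY"  # mirrors the constant kept in compass_sdk/constants.py

class MetadataConfigSketch(BaseModel):
    # Evaluated once when the class is defined; None when the variable is unset.
    cohere_api_key: Optional[str] = getenv(COHERE_API_ENV_VAR, None)

print(MetadataConfigSketch().cohere_api_key)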
57 changes: 38 additions & 19 deletions src/backend/compass_sdk/compass.py
@@ -29,14 +29,16 @@
PutDocumentsInput,
SearchFilter,
SearchInput,
logger,
)
from backend.compass_sdk.constants import (
DEFAULT_MAX_CHUNKS_PER_REQUEST,
DEFAULT_MAX_ERROR_RATE,
DEFAULT_MAX_RETRIES,
DEFAULT_SLEEP_RETRY_SECONDS,
)
from backend.services.logger.utils import LoggerFactory

logger = LoggerFactory().get_logger()


@dataclass
@@ -75,7 +77,8 @@ def __init__(
class CompassClient:
def __init__(
self,
index_url: str = "http://localhost:80",
*,
index_url: str,
username: Optional[str] = None,
password: Optional[str] = None,
logger_level: LoggerLevel = LoggerLevel.INFO,
@@ -115,9 +118,8 @@ def __init__(
"add_context": "/api/v1/indexes/{index_name}/documents/add_context/{doc_id}",
"refresh": "/api/v1/indexes/{index_name}/refresh",
}
logger.setLevel(logger_level.value)

def create_index(self, index_name: str):
def create_index(self, *, index_name: str):
"""
Create an index in Compass
:param index_name: the name of the index
@@ -130,7 +132,7 @@ def create_index(self, index_name: str):
sleep_retry_seconds=DEFAULT_SLEEP_RETRY_SECONDS,
)

def refresh(self, index_name: str):
def refresh(self, *, index_name: str):
"""
Refresh index
:param index_name: the name of the index
@@ -143,7 +145,7 @@ def refresh(self, index_name: str):
sleep_retry_seconds=DEFAULT_SLEEP_RETRY_SECONDS,
)

def delete_index(self, index_name: str):
def delete_index(self, *, index_name: str):
"""
Delete an index from Compass
:param index_name: the name of the index
@@ -156,7 +158,7 @@ def delete_index(self, index_name: str):
sleep_retry_seconds=DEFAULT_SLEEP_RETRY_SECONDS,
)

def delete_document(self, index_name: str, doc_id: str):
def delete_document(self, *, index_name: str, doc_id: str):
"""
Delete a document from Compass
:param index_name: the name of the index
@@ -171,7 +173,7 @@ def delete_document(self, index_name: str, doc_id: str):
sleep_retry_seconds=DEFAULT_SLEEP_RETRY_SECONDS,
)

def get_document(self, index_name: str, doc_id: str):
def get_document(self, *, index_name: str, doc_id: str):
"""
Get a document from Compass
:param index_name: the name of the index
@@ -200,6 +202,7 @@ def list_indexes(self):

def add_context(
self,
*,
index_name: str,
doc_id: str,
context: Dict,
@@ -227,6 +230,7 @@ def add_context(

def insert_doc(
self,
*,
index_name: str,
doc: CompassDocument,
max_retries: int = DEFAULT_MAX_RETRIES,
@@ -246,7 +250,7 @@ def insert_doc(
sleep_retry_seconds=sleep_retry_seconds,
)

def insert_docs_batch(self, uuid: str, index_name: str):
def insert_docs_batch(self, *, uuid: str, index_name: str):
"""
Insert a batch of parsed documents into an index in Compass
:param uuid: the uuid of the batch
@@ -260,7 +264,7 @@ def insert_docs_batch(self, uuid: str, index_name: str):
sleep_retry_seconds=DEFAULT_SLEEP_RETRY_SECONDS,
)

def batch_status(self, uuid: str):
def batch_status(self, *, uuid: str):
"""
Get the status of a batch
:param uuid: the uuid of the batch
@@ -282,6 +286,7 @@ def batch_status(self, uuid: str):

def insert_docs(
self,
*,
index_name: str,
docs: Iterator[CompassDocument],
max_chunks_per_request: int = DEFAULT_MAX_CHUNKS_PER_REQUEST,
@@ -373,7 +378,7 @@ def put_request(
if i > skip_first_n_docs
)
except CompassMaxErrorRateExceeded as e:
logger.error(e.message)
logger.error(event="[CompassError]", error=e.message)
return errors if len(errors) > 0 else None

@staticmethod
@@ -393,7 +398,10 @@ def _get_request_blocks(
for num_doc, doc in enumerate(docs, 1):
if doc.status != CompassDocumentStatus.Success:
logger.error(
f"[Thread {threading.get_native_id()}] Document #{num_doc} has errors: {doc.errors}"
event="Document errors",
errors=doc.errors,
num_doc=num_doc,
thread_id=threading.get_native_id(),
)
errors.append(doc)
else:
@@ -425,6 +433,7 @@ def _get_request_blocks(

def search(
self,
*,
index_name: str,
query: str,
top_k: int = 10,
@@ -498,18 +507,26 @@ def _send_request_with_retry():
else:
error = str(e) + " " + e.response.text
logger.error(
f"[Thread {threading.get_native_id()}] Failed to send request to "
f"{function} {target_path}: {type(e)} {error}. Going to sleep for "
f"{sleep_retry_seconds} seconds and retrying."
event="Failed to send request to",
function=function,
target_path=target_path,
type=type(e),
error=error,
sleep_retry_seconds=sleep_retry_seconds,
thread_id=threading.get_native_id(),
)
raise e

except Exception as e:
error = str(e)
logger.error(
f"[Thread {threading.get_native_id()}] Failed to send request to "
f"{function} {target_path}: {type(e)} {error}. Going to sleep for "
f"{sleep_retry_seconds} seconds and retrying."
event="Failed to send request to",
function=function,
target_path=target_path,
type=type(e),
error=error,
sleep_retry_seconds=sleep_retry_seconds,
thread_id=threading.get_native_id(),
)
raise e

@@ -525,6 +542,8 @@ def _send_request_with_retry():
return RetryResult(result=None, error=error)
except RetryError:
logger.error(
f"[Thread {threading.get_native_id()}] Failed to send request after {max_retries} attempts. Aborting."
event="Failed to send request after max_retries attempts. Aborting.",
max_retries=max_retries,
thread_id=threading.get_native_id(),
)
return RetryResult(result=None, error=error)
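With index_url now required and the public methods switched to keyword-only parameters, call sites must name every argument. A short usage sketch — the URL, credentials, and index name are placeholders:

# Assumes: from backend.compass_sdk.compass import CompassClient
client = CompassClient(index_url="http://compass:80", username=None, password=None)
client.create_index(index_name="agent-artifacts")
status = client.batch_status(uuid="some-batch-uuid")
client.delete_index(index_name="agent-artifacts")

The structured logger.error(event=..., ...) calls replace the old f-string messages, so the LoggerFactory logger receives key/value fields rather than pre-formatted strings.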
3 changes: 1 addition & 2 deletions src/backend/compass_sdk/constants.py
@@ -8,14 +8,13 @@
DEFAULT_NUM_TOKENS_PER_CHUNK = 500
DEFAULT_NUM_TOKENS_CHUNK_OVERLAP = 15
DEFAULT_MIN_NUM_TOKENS_CHUNK = 5
DEFAULT_MAX_TOKENS_METADATA = 50
DEFAULT_MIN_NUM_CHUNKS_IN_TITLE = 1

DEFAULT_WIDTH_HEIGHT_VERTICAL_RATIO = 0.6
NUM_ADDITIONAL_CHARS_FOR_METADATA = 100
SKIP_INFER_TABLE_TYPES = ["jpg", "png", "xls", "xlsx", "heic"]

# Metadata detection constants
COHERE_API_ENV_VAR = "COHERE_API_KEY"
DEFAULT_COMMANDR_EXTRACTABLE_ATTRIBUTES = ["title", "authors", "date"]
DEFAULT_COMMANDR_PROMPT = """
Given the following document: