Skip to content

Commit

Permalink
Feature/background processing (onyx-dot-app#2275)
Browse files Browse the repository at this point in the history
* first cut at redis

* some new helper functions for the db

* ignore kombu tables in alembic migrations (used by celery)

* multiline commands for readability, add vespa_metadata_sync queue to worker

* typo fix

* fix returning tuple fields

* add constants

* fix _get_access_for_document

* docstrings!

* fix double function declaration and typing

* fix type hinting

* add a global redis pool

* Add get_document function

* use task_logger in various celery tasks

* add celeryconfig.py to simplify configuration. Will be used in a subsequent commit

* Add celery redis helper. used in a subsequent PR

* kombu warning getting spammy since celery is not self managing its queue in Postgres any more

* add last_modified and last_synced to documents

* fix task naming convention

* use celeryconfig.py

* the big one. adds queues and tasks, updates functions to use the queues with priorities, etc

* change vespa index log line to debug

* mypy fixes

* update alembic migration

* fix fence ordering, rename to "monitor", fix fetch_versioned_implementation call

* mypy

* switch to monotonic time

* fix startup dependencies on redis

* rebase alembic migration

* kombu cleanup - fail silently

* mypy

* add redis_host environment override

* update REDIS_HOST env var in docker-compose.dev.yml

* update the rest of the docker files

* harden indexing-status endpoint against db changes happening in the background.  Needs further improvement but OK for now.

* allow no task syncs to run because we create certain objects with no entries but initially marked as out of date

* add back writing to vespa on indexing

* update contributing guide

* backporting fixes from background_deletion

* renaming cache to cache_volume

* add redis password to various deployments

* try setting up pr testing for helm

* fix indent

* hopefully this release version actually exists

* fix command line option to --chart-dirs

* fetch-depth 0

* edit values.yaml

* try setting ct working directory

* bypass testing only on change for now

* move files and lint them

* update helm testing

* some issues suggest using --config works

* add vespa repo

* add postgresql repo

* increase timeout

* try amd64 runner

* fix redis password reference

* add comment to helm chart testing workflow

* rename helm testing workflow to disable it

* adding clarifying comments

* address code review

* missed a file

* remove commented warning ... just not needed

---------

Co-authored-by: Richard Kuo <rkuo@rkuo.com>
  • Loading branch information
2 people authored and rajiv chodisetti committed Oct 2, 2024
1 parent d696fd0 commit b78d002
Show file tree
Hide file tree
Showing 26 changed files with 1,431 additions and 353 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Add last synced and last modified to document table
Revision ID: 52a219fb5233
Revises: f17bf3b0d9f1
Create Date: 2024-08-28 17:40:46.077470
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import func

# revision identifiers, used by Alembic.
revision = "52a219fb5233"
down_revision = "f7e58d357687"
branch_labels = None
depends_on = None


def upgrade() -> None:
# last modified represents the last time anything needing syncing to vespa changed
# including row metadata and the document itself. This obviously does not include
# the last_synced column.
op.add_column(
"document",
sa.Column(
"last_modified",
sa.DateTime(timezone=True),
nullable=False,
server_default=func.now(),
),
)

# last synced represents the last time this document was synced to Vespa
op.add_column(
"document",
sa.Column("last_synced", sa.DateTime(timezone=True), nullable=True),
)

# Set last_synced to the same value as last_modified for existing rows
op.execute(
"""
UPDATE document
SET last_synced = last_modified
"""
)

op.create_index(
op.f("ix_document_last_modified"),
"document",
["last_modified"],
unique=False,
)

op.create_index(
op.f("ix_document_last_synced"),
"document",
["last_synced"],
unique=False,
)


def downgrade() -> None:
op.drop_index(op.f("ix_document_last_synced"), table_name="document")
op.drop_index(op.f("ix_document_last_modified"), table_name="document")
op.drop_column("document", "last_synced")
op.drop_column("document", "last_modified")
34 changes: 31 additions & 3 deletions backend/danswer/access/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,49 @@
from danswer.access.models import DocumentAccess
from danswer.access.utils import prefix_user
from danswer.configs.constants import PUBLIC_DOC_PAT
from danswer.db.document import get_acccess_info_for_documents
from danswer.db.document import get_access_info_for_document
from danswer.db.document import get_access_info_for_documents
from danswer.db.models import User
from danswer.utils.variable_functionality import fetch_versioned_implementation


def _get_access_for_document(
document_id: str,
db_session: Session,
) -> DocumentAccess:
info = get_access_info_for_document(
db_session=db_session,
document_id=document_id,
)

if not info:
return DocumentAccess.build(user_ids=[], user_groups=[], is_public=False)

return DocumentAccess.build(user_ids=info[1], user_groups=[], is_public=info[2])


def get_access_for_document(
document_id: str,
db_session: Session,
) -> DocumentAccess:
versioned_get_access_for_document_fn = fetch_versioned_implementation(
"danswer.access.access", "_get_access_for_document"
)
return versioned_get_access_for_document_fn(document_id, db_session) # type: ignore


def _get_access_for_documents(
document_ids: list[str],
db_session: Session,
) -> dict[str, DocumentAccess]:
document_access_info = get_acccess_info_for_documents(
document_access_info = get_access_info_for_documents(
db_session=db_session,
document_ids=document_ids,
)
return {
document_id: DocumentAccess.build(user_ids, [], is_public)
document_id: DocumentAccess.build(
user_ids=user_ids, user_groups=[], is_public=is_public
)
for document_id, user_ids, is_public in document_access_info
}

Expand Down
Loading

0 comments on commit b78d002

Please sign in to comment.