Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Paginate connector page #2328

Merged
merged 8 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 46 additions & 9 deletions backend/danswer/db/index_attempt.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,12 @@ def get_latest_index_attempts(
return db_session.execute(stmt).scalars().all()


def get_index_attempts_for_connector(
def count_index_attempts_for_connector(
db_session: Session,
connector_id: int,
only_current: bool = True,
disinclude_finished: bool = False,
) -> Sequence[IndexAttempt]:
) -> int:
stmt = (
select(IndexAttempt)
.join(ConnectorCredentialPair)
Expand All @@ -232,23 +232,60 @@ def get_index_attempts_for_connector(
stmt = stmt.join(SearchSettings).where(
SearchSettings.status == IndexModelStatus.PRESENT
)
# Count total items for pagination
count_stmt = stmt.with_only_columns(func.count()).order_by(None)
total_count = db_session.execute(count_stmt).scalar_one()
return total_count

stmt = stmt.order_by(IndexAttempt.time_created.desc())
return db_session.execute(stmt).scalars().all()

def get_paginated_index_attempts_for_cc_pair_id(
db_session: Session,
connector_id: int,
page: int,
page_size: int,
only_current: bool = True,
disinclude_finished: bool = False,
) -> list[IndexAttempt]:
stmt = (
select(IndexAttempt)
.join(ConnectorCredentialPair)
.where(ConnectorCredentialPair.connector_id == connector_id)
)
if disinclude_finished:
stmt = stmt.where(
IndexAttempt.status.in_(
[IndexingStatus.NOT_STARTED, IndexingStatus.IN_PROGRESS]
)
)
if only_current:
stmt = stmt.join(SearchSettings).where(
SearchSettings.status == IndexModelStatus.PRESENT
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pretty identical except for this pagination logic

stmt = stmt.order_by(IndexAttempt.time_started.desc())

# Apply pagination
stmt = stmt.offset((page - 1) * page_size).limit(page_size)

return list(db_session.execute(stmt).scalars().all())

def get_latest_finished_index_attempt_for_cc_pair(

def get_latest_index_attempt_for_cc_pair_id(
db_session: Session,
connector_credential_pair_id: int,
secondary_index: bool,
db_session: Session,
only_finished: bool = True,
) -> IndexAttempt | None:
stmt = select(IndexAttempt).distinct()
stmt = stmt.where(
IndexAttempt.connector_credential_pair_id == connector_credential_pair_id,
IndexAttempt.status.not_in(
[IndexingStatus.NOT_STARTED, IndexingStatus.IN_PROGRESS]
),
)
if only_finished:
stmt = stmt.where(
IndexAttempt.status.not_in(
[IndexingStatus.NOT_STARTED, IndexingStatus.IN_PROGRESS]
),
)
if secondary_index:
stmt = stmt.join(SearchSettings).where(
SearchSettings.status == IndexModelStatus.FUTURE
Expand Down
58 changes: 51 additions & 7 deletions backend/danswer/server/documents/cc_pair.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import math

from fastapi import APIRouter
from fastapi import Depends
from fastapi import HTTPException
from fastapi import Query
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

Expand All @@ -18,12 +21,15 @@
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.index_attempt import cancel_indexing_attempts_for_ccpair
from danswer.db.index_attempt import cancel_indexing_attempts_past_model
from danswer.db.index_attempt import get_index_attempts_for_connector
from danswer.db.index_attempt import count_index_attempts_for_connector
from danswer.db.index_attempt import get_latest_index_attempt_for_cc_pair_id
from danswer.db.index_attempt import get_paginated_index_attempts_for_cc_pair_id
from danswer.db.models import User
from danswer.server.documents.models import CCPairFullInfo
from danswer.server.documents.models import CCStatusUpdateRequest
from danswer.server.documents.models import ConnectorCredentialPairIdentifier
from danswer.server.documents.models import ConnectorCredentialPairMetadata
from danswer.server.documents.models import PaginatedIndexAttempts
from danswer.server.models import StatusResponse
from danswer.utils.logger import setup_logger
from ee.danswer.db.user_group import validate_user_creation_permissions
Expand All @@ -33,6 +39,38 @@
router = APIRouter(prefix="/manage")


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created a seperate endpoint to get the index attempts for a couple reasons:

  • enable pagination of the index attempts
  • allow the front end to load the rest of the connector data without having to wait for the index attempts

@router.get("/admin/cc-pair/{cc_pair_id}/index-attempts")
def get_cc_pair_index_attempts(
cc_pair_id: int,
page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1, le=1000),
user: User | None = Depends(current_curator_or_admin_user),
db_session: Session = Depends(get_session),
) -> PaginatedIndexAttempts:
cc_pair = get_connector_credential_pair_from_id(
cc_pair_id, db_session, user, get_editable=False
)
if not cc_pair:
raise HTTPException(
status_code=400, detail="CC Pair not found for current user permissions"
)
total_count = count_index_attempts_for_connector(
db_session=db_session,
connector_id=cc_pair.connector_id,
)
index_attempts = get_paginated_index_attempts_for_cc_pair_id(
db_session=db_session,
connector_id=cc_pair.connector_id,
page=page,
page_size=page_size,
)
return PaginatedIndexAttempts.from_models(
index_attempt_models=index_attempts,
page=page,
total_pages=math.ceil(total_count / page_size),
)


@router.get("/admin/cc-pair/{cc_pair_id}")
def get_cc_pair_full_info(
cc_pair_id: int,
Expand All @@ -56,11 +94,6 @@ def get_cc_pair_full_info(
credential_id=cc_pair.credential_id,
)

index_attempts = get_index_attempts_for_connector(
db_session,
cc_pair.connector_id,
)

document_count_info_list = list(
get_document_cnts_for_cc_pairs(
db_session=db_session,
Expand All @@ -71,9 +104,20 @@ def get_cc_pair_full_info(
document_count_info_list[0][-1] if document_count_info_list else 0
)

latest_attempt = get_latest_index_attempt_for_cc_pair_id(
db_session=db_session,
connector_credential_pair_id=cc_pair.id,
secondary_index=False,
only_finished=False,
)

return CCPairFullInfo.from_models(
cc_pair_model=cc_pair,
index_attempt_models=list(index_attempts),
number_of_index_attempts=count_index_attempts_for_connector(
db_session=db_session,
connector_id=cc_pair.connector_id,
),
last_index_attempt=latest_attempt,
latest_deletion_attempt=get_deletion_attempt_snapshot(
connector_id=cc_pair.connector_id,
credential_id=cc_pair.credential_id,
Expand Down
7 changes: 4 additions & 3 deletions backend/danswer/server/documents/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
from danswer.db.engine import get_session
from danswer.db.index_attempt import create_index_attempt
from danswer.db.index_attempt import get_index_attempts_for_cc_pair
from danswer.db.index_attempt import get_latest_finished_index_attempt_for_cc_pair
from danswer.db.index_attempt import get_latest_index_attempt_for_cc_pair_id
from danswer.db.index_attempt import get_latest_index_attempts
from danswer.db.models import User
from danswer.db.models import UserRole
Expand Down Expand Up @@ -444,10 +444,11 @@ def get_connector_indexing_status(
(connector.id, credential.id)
)

latest_finished_attempt = get_latest_finished_index_attempt_for_cc_pair(
latest_finished_attempt = get_latest_index_attempt_for_cc_pair_id(
db_session=db_session,
connector_credential_pair_id=cc_pair.id,
secondary_index=secondary_index,
db_session=db_session,
only_finished=True,
)

indexing_statuses.append(
Expand Down
49 changes: 43 additions & 6 deletions backend/danswer/server/documents/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,37 @@ def from_db_model(cls, error: DbIndexAttemptError) -> "IndexAttemptError":
)


class PaginatedIndexAttempts(BaseModel):
index_attempts: list[IndexAttemptSnapshot]
page: int
total_pages: int

@classmethod
def from_models(
cls,
index_attempt_models: list[IndexAttempt],
page: int,
total_pages: int,
) -> "PaginatedIndexAttempts":
return cls(
index_attempts=[
IndexAttemptSnapshot.from_index_attempt_db_model(index_attempt_model)
for index_attempt_model in index_attempt_models
],
page=page,
total_pages=total_pages,
)


class CCPairFullInfo(BaseModel):
id: int
name: str
status: ConnectorCredentialPairStatus
num_docs_indexed: int
connector: ConnectorSnapshot
credential: CredentialSnapshot
index_attempts: list[IndexAttemptSnapshot]
number_of_index_attempts: int
last_index_attempt_status: IndexingStatus | None
latest_deletion_attempt: DeletionAttemptSnapshot | None
is_public: bool
is_editable_for_current_user: bool
Expand All @@ -201,11 +224,27 @@ class CCPairFullInfo(BaseModel):
def from_models(
cls,
cc_pair_model: ConnectorCredentialPair,
index_attempt_models: list[IndexAttempt],
latest_deletion_attempt: DeletionAttemptSnapshot | None,
number_of_index_attempts: int,
last_index_attempt: IndexAttempt | None,
num_docs_indexed: int, # not ideal, but this must be computed separately
is_editable_for_current_user: bool,
) -> "CCPairFullInfo":
# figure out if we need to artificially deflate the number of docs indexed.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic was originally in the front end but i thought it made more sense to be here

# This is required since the total number of docs indexed by a CC Pair is
# updated before the new docs for an indexing attempt. If we don't do this,
# there is a mismatch between these two numbers which may confuse users.
last_indexing_status = last_index_attempt.status if last_index_attempt else None
if (
last_indexing_status == IndexingStatus.SUCCESS
and number_of_index_attempts == 1
and last_index_attempt
and last_index_attempt.new_docs_indexed
):
num_docs_indexed = (
last_index_attempt.new_docs_indexed if last_index_attempt else 0
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we now return the last indexing status and the number of docs to facilitate some of the original display logic on the frontend without having to send back the indexattempts themselves

return cls(
id=cc_pair_model.id,
name=cc_pair_model.name,
Expand All @@ -217,10 +256,8 @@ def from_models(
credential=CredentialSnapshot.from_credential_db_model(
cc_pair_model.credential
),
index_attempts=[
IndexAttemptSnapshot.from_index_attempt_db_model(index_attempt_model)
for index_attempt_model in index_attempt_models
],
number_of_index_attempts=number_of_index_attempts,
last_index_attempt_status=last_indexing_status,
latest_deletion_attempt=latest_deletion_attempt,
is_public=cc_pair_model.is_public,
is_editable_for_current_user=is_editable_for_current_user,
Expand Down
Loading
Loading