Skip to content

Commit

Permalink
Paginate connector page (onyx-dot-app#2328)
Browse files Browse the repository at this point in the history
* Added pagination to individual connector pages

* I cooked

* Gordon Ramsay in this b

* meepe

* properly calculated max chunk and switch dict to array

* chunks -> batches

* increased max page size

* renmaed var
  • Loading branch information
hagen-danswer authored and rajiv chodisetti committed Oct 2, 2024
1 parent 7e8dcba commit cc89c9b
Show file tree
Hide file tree
Showing 8 changed files with 383 additions and 138 deletions.
55 changes: 46 additions & 9 deletions backend/danswer/db/index_attempt.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,12 @@ def get_latest_index_attempts(
return db_session.execute(stmt).scalars().all()


def get_index_attempts_for_connector(
def count_index_attempts_for_connector(
db_session: Session,
connector_id: int,
only_current: bool = True,
disinclude_finished: bool = False,
) -> Sequence[IndexAttempt]:
) -> int:
stmt = (
select(IndexAttempt)
.join(ConnectorCredentialPair)
Expand All @@ -232,23 +232,60 @@ def get_index_attempts_for_connector(
stmt = stmt.join(SearchSettings).where(
SearchSettings.status == IndexModelStatus.PRESENT
)
# Count total items for pagination
count_stmt = stmt.with_only_columns(func.count()).order_by(None)
total_count = db_session.execute(count_stmt).scalar_one()
return total_count

stmt = stmt.order_by(IndexAttempt.time_created.desc())
return db_session.execute(stmt).scalars().all()

def get_paginated_index_attempts_for_cc_pair_id(
db_session: Session,
connector_id: int,
page: int,
page_size: int,
only_current: bool = True,
disinclude_finished: bool = False,
) -> list[IndexAttempt]:
stmt = (
select(IndexAttempt)
.join(ConnectorCredentialPair)
.where(ConnectorCredentialPair.connector_id == connector_id)
)
if disinclude_finished:
stmt = stmt.where(
IndexAttempt.status.in_(
[IndexingStatus.NOT_STARTED, IndexingStatus.IN_PROGRESS]
)
)
if only_current:
stmt = stmt.join(SearchSettings).where(
SearchSettings.status == IndexModelStatus.PRESENT
)

stmt = stmt.order_by(IndexAttempt.time_started.desc())

# Apply pagination
stmt = stmt.offset((page - 1) * page_size).limit(page_size)

return list(db_session.execute(stmt).scalars().all())

def get_latest_finished_index_attempt_for_cc_pair(

def get_latest_index_attempt_for_cc_pair_id(
db_session: Session,
connector_credential_pair_id: int,
secondary_index: bool,
db_session: Session,
only_finished: bool = True,
) -> IndexAttempt | None:
stmt = select(IndexAttempt)
stmt = stmt.where(
IndexAttempt.connector_credential_pair_id == connector_credential_pair_id,
IndexAttempt.status.not_in(
[IndexingStatus.NOT_STARTED, IndexingStatus.IN_PROGRESS]
),
)
if only_finished:
stmt = stmt.where(
IndexAttempt.status.not_in(
[IndexingStatus.NOT_STARTED, IndexingStatus.IN_PROGRESS]
),
)
if secondary_index:
stmt = stmt.join(SearchSettings).where(
SearchSettings.status == IndexModelStatus.FUTURE
Expand Down
58 changes: 51 additions & 7 deletions backend/danswer/server/documents/cc_pair.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import math

from fastapi import APIRouter
from fastapi import Depends
from fastapi import HTTPException
from fastapi import Query
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

Expand All @@ -18,12 +21,15 @@
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.index_attempt import cancel_indexing_attempts_for_ccpair
from danswer.db.index_attempt import cancel_indexing_attempts_past_model
from danswer.db.index_attempt import get_index_attempts_for_connector
from danswer.db.index_attempt import count_index_attempts_for_connector
from danswer.db.index_attempt import get_latest_index_attempt_for_cc_pair_id
from danswer.db.index_attempt import get_paginated_index_attempts_for_cc_pair_id
from danswer.db.models import User
from danswer.server.documents.models import CCPairFullInfo
from danswer.server.documents.models import CCStatusUpdateRequest
from danswer.server.documents.models import ConnectorCredentialPairIdentifier
from danswer.server.documents.models import ConnectorCredentialPairMetadata
from danswer.server.documents.models import PaginatedIndexAttempts
from danswer.server.models import StatusResponse
from danswer.utils.logger import setup_logger
from ee.danswer.db.user_group import validate_user_creation_permissions
Expand All @@ -33,6 +39,38 @@
router = APIRouter(prefix="/manage")


@router.get("/admin/cc-pair/{cc_pair_id}/index-attempts")
def get_cc_pair_index_attempts(
cc_pair_id: int,
page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1, le=1000),
user: User | None = Depends(current_curator_or_admin_user),
db_session: Session = Depends(get_session),
) -> PaginatedIndexAttempts:
cc_pair = get_connector_credential_pair_from_id(
cc_pair_id, db_session, user, get_editable=False
)
if not cc_pair:
raise HTTPException(
status_code=400, detail="CC Pair not found for current user permissions"
)
total_count = count_index_attempts_for_connector(
db_session=db_session,
connector_id=cc_pair.connector_id,
)
index_attempts = get_paginated_index_attempts_for_cc_pair_id(
db_session=db_session,
connector_id=cc_pair.connector_id,
page=page,
page_size=page_size,
)
return PaginatedIndexAttempts.from_models(
index_attempt_models=index_attempts,
page=page,
total_pages=math.ceil(total_count / page_size),
)


@router.get("/admin/cc-pair/{cc_pair_id}")
def get_cc_pair_full_info(
cc_pair_id: int,
Expand All @@ -56,11 +94,6 @@ def get_cc_pair_full_info(
credential_id=cc_pair.credential_id,
)

index_attempts = get_index_attempts_for_connector(
db_session,
cc_pair.connector_id,
)

document_count_info_list = list(
get_document_cnts_for_cc_pairs(
db_session=db_session,
Expand All @@ -71,9 +104,20 @@ def get_cc_pair_full_info(
document_count_info_list[0][-1] if document_count_info_list else 0
)

latest_attempt = get_latest_index_attempt_for_cc_pair_id(
db_session=db_session,
connector_credential_pair_id=cc_pair.id,
secondary_index=False,
only_finished=False,
)

return CCPairFullInfo.from_models(
cc_pair_model=cc_pair,
index_attempt_models=list(index_attempts),
number_of_index_attempts=count_index_attempts_for_connector(
db_session=db_session,
connector_id=cc_pair.connector_id,
),
last_index_attempt=latest_attempt,
latest_deletion_attempt=get_deletion_attempt_snapshot(
connector_id=cc_pair.connector_id,
credential_id=cc_pair.credential_id,
Expand Down
7 changes: 4 additions & 3 deletions backend/danswer/server/documents/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
from danswer.db.engine import get_session
from danswer.db.index_attempt import create_index_attempt
from danswer.db.index_attempt import get_index_attempts_for_cc_pair
from danswer.db.index_attempt import get_latest_finished_index_attempt_for_cc_pair
from danswer.db.index_attempt import get_latest_index_attempt_for_cc_pair_id
from danswer.db.index_attempt import get_latest_index_attempts
from danswer.db.models import User
from danswer.db.models import UserRole
Expand Down Expand Up @@ -453,10 +453,11 @@ def get_connector_indexing_status(
(connector.id, credential.id)
)

latest_finished_attempt = get_latest_finished_index_attempt_for_cc_pair(
latest_finished_attempt = get_latest_index_attempt_for_cc_pair_id(
db_session=db_session,
connector_credential_pair_id=cc_pair.id,
secondary_index=secondary_index,
db_session=db_session,
only_finished=True,
)

indexing_statuses.append(
Expand Down
49 changes: 43 additions & 6 deletions backend/danswer/server/documents/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,37 @@ def from_db_model(cls, error: DbIndexAttemptError) -> "IndexAttemptError":
)


class PaginatedIndexAttempts(BaseModel):
index_attempts: list[IndexAttemptSnapshot]
page: int
total_pages: int

@classmethod
def from_models(
cls,
index_attempt_models: list[IndexAttempt],
page: int,
total_pages: int,
) -> "PaginatedIndexAttempts":
return cls(
index_attempts=[
IndexAttemptSnapshot.from_index_attempt_db_model(index_attempt_model)
for index_attempt_model in index_attempt_models
],
page=page,
total_pages=total_pages,
)


class CCPairFullInfo(BaseModel):
id: int
name: str
status: ConnectorCredentialPairStatus
num_docs_indexed: int
connector: ConnectorSnapshot
credential: CredentialSnapshot
index_attempts: list[IndexAttemptSnapshot]
number_of_index_attempts: int
last_index_attempt_status: IndexingStatus | None
latest_deletion_attempt: DeletionAttemptSnapshot | None
is_public: bool
is_editable_for_current_user: bool
Expand All @@ -201,11 +224,27 @@ class CCPairFullInfo(BaseModel):
def from_models(
cls,
cc_pair_model: ConnectorCredentialPair,
index_attempt_models: list[IndexAttempt],
latest_deletion_attempt: DeletionAttemptSnapshot | None,
number_of_index_attempts: int,
last_index_attempt: IndexAttempt | None,
num_docs_indexed: int, # not ideal, but this must be computed separately
is_editable_for_current_user: bool,
) -> "CCPairFullInfo":
# figure out if we need to artificially deflate the number of docs indexed.
# This is required since the total number of docs indexed by a CC Pair is
# updated before the new docs for an indexing attempt. If we don't do this,
# there is a mismatch between these two numbers which may confuse users.
last_indexing_status = last_index_attempt.status if last_index_attempt else None
if (
last_indexing_status == IndexingStatus.SUCCESS
and number_of_index_attempts == 1
and last_index_attempt
and last_index_attempt.new_docs_indexed
):
num_docs_indexed = (
last_index_attempt.new_docs_indexed if last_index_attempt else 0
)

return cls(
id=cc_pair_model.id,
name=cc_pair_model.name,
Expand All @@ -217,10 +256,8 @@ def from_models(
credential=CredentialSnapshot.from_credential_db_model(
cc_pair_model.credential
),
index_attempts=[
IndexAttemptSnapshot.from_index_attempt_db_model(index_attempt_model)
for index_attempt_model in index_attempt_models
],
number_of_index_attempts=number_of_index_attempts,
last_index_attempt_status=last_indexing_status,
latest_deletion_attempt=latest_deletion_attempt,
is_public=cc_pair_model.is_public,
is_editable_for_current_user=is_editable_for_current_user,
Expand Down
Loading

0 comments on commit cc89c9b

Please sign in to comment.