diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py index e93fc825372..056e4ce968b 100644 --- a/backend/danswer/db/index_attempt.py +++ b/backend/danswer/db/index_attempt.py @@ -211,12 +211,12 @@ def get_latest_index_attempts( return db_session.execute(stmt).scalars().all() -def get_index_attempts_for_connector( +def count_index_attempts_for_connector( db_session: Session, connector_id: int, only_current: bool = True, disinclude_finished: bool = False, -) -> Sequence[IndexAttempt]: +) -> int: stmt = ( select(IndexAttempt) .join(ConnectorCredentialPair) @@ -232,23 +232,60 @@ def get_index_attempts_for_connector( stmt = stmt.join(SearchSettings).where( SearchSettings.status == IndexModelStatus.PRESENT ) + # Count total items for pagination + count_stmt = stmt.with_only_columns(func.count()).order_by(None) + total_count = db_session.execute(count_stmt).scalar_one() + return total_count - stmt = stmt.order_by(IndexAttempt.time_created.desc()) - return db_session.execute(stmt).scalars().all() +def get_paginated_index_attempts_for_cc_pair_id( + db_session: Session, + connector_id: int, + page: int, + page_size: int, + only_current: bool = True, + disinclude_finished: bool = False, +) -> list[IndexAttempt]: + stmt = ( + select(IndexAttempt) + .join(ConnectorCredentialPair) + .where(ConnectorCredentialPair.connector_id == connector_id) + ) + if disinclude_finished: + stmt = stmt.where( + IndexAttempt.status.in_( + [IndexingStatus.NOT_STARTED, IndexingStatus.IN_PROGRESS] + ) + ) + if only_current: + stmt = stmt.join(SearchSettings).where( + SearchSettings.status == IndexModelStatus.PRESENT + ) + + stmt = stmt.order_by(IndexAttempt.time_started.desc()) + + # Apply pagination + stmt = stmt.offset((page - 1) * page_size).limit(page_size) + + return list(db_session.execute(stmt).scalars().all()) -def get_latest_finished_index_attempt_for_cc_pair( + +def get_latest_index_attempt_for_cc_pair_id( + db_session: Session, connector_credential_pair_id: int, secondary_index: bool, - db_session: Session, + only_finished: bool = True, ) -> IndexAttempt | None: stmt = select(IndexAttempt) stmt = stmt.where( IndexAttempt.connector_credential_pair_id == connector_credential_pair_id, - IndexAttempt.status.not_in( - [IndexingStatus.NOT_STARTED, IndexingStatus.IN_PROGRESS] - ), ) + if only_finished: + stmt = stmt.where( + IndexAttempt.status.not_in( + [IndexingStatus.NOT_STARTED, IndexingStatus.IN_PROGRESS] + ), + ) if secondary_index: stmt = stmt.join(SearchSettings).where( SearchSettings.status == IndexModelStatus.FUTURE diff --git a/backend/danswer/server/documents/cc_pair.py b/backend/danswer/server/documents/cc_pair.py index 99e7bae61f9..97ed3a82812 100644 --- a/backend/danswer/server/documents/cc_pair.py +++ b/backend/danswer/server/documents/cc_pair.py @@ -1,6 +1,9 @@ +import math + from fastapi import APIRouter from fastapi import Depends from fastapi import HTTPException +from fastapi import Query from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session @@ -18,12 +21,15 @@ from danswer.db.enums import ConnectorCredentialPairStatus from danswer.db.index_attempt import cancel_indexing_attempts_for_ccpair from danswer.db.index_attempt import cancel_indexing_attempts_past_model -from danswer.db.index_attempt import get_index_attempts_for_connector +from danswer.db.index_attempt import count_index_attempts_for_connector +from danswer.db.index_attempt import get_latest_index_attempt_for_cc_pair_id +from danswer.db.index_attempt import get_paginated_index_attempts_for_cc_pair_id from danswer.db.models import User from danswer.server.documents.models import CCPairFullInfo from danswer.server.documents.models import CCStatusUpdateRequest from danswer.server.documents.models import ConnectorCredentialPairIdentifier from danswer.server.documents.models import ConnectorCredentialPairMetadata +from danswer.server.documents.models import PaginatedIndexAttempts from danswer.server.models import StatusResponse from danswer.utils.logger import setup_logger from ee.danswer.db.user_group import validate_user_creation_permissions @@ -33,6 +39,38 @@ router = APIRouter(prefix="/manage") +@router.get("/admin/cc-pair/{cc_pair_id}/index-attempts") +def get_cc_pair_index_attempts( + cc_pair_id: int, + page: int = Query(1, ge=1), + page_size: int = Query(10, ge=1, le=1000), + user: User | None = Depends(current_curator_or_admin_user), + db_session: Session = Depends(get_session), +) -> PaginatedIndexAttempts: + cc_pair = get_connector_credential_pair_from_id( + cc_pair_id, db_session, user, get_editable=False + ) + if not cc_pair: + raise HTTPException( + status_code=400, detail="CC Pair not found for current user permissions" + ) + total_count = count_index_attempts_for_connector( + db_session=db_session, + connector_id=cc_pair.connector_id, + ) + index_attempts = get_paginated_index_attempts_for_cc_pair_id( + db_session=db_session, + connector_id=cc_pair.connector_id, + page=page, + page_size=page_size, + ) + return PaginatedIndexAttempts.from_models( + index_attempt_models=index_attempts, + page=page, + total_pages=math.ceil(total_count / page_size), + ) + + @router.get("/admin/cc-pair/{cc_pair_id}") def get_cc_pair_full_info( cc_pair_id: int, @@ -56,11 +94,6 @@ def get_cc_pair_full_info( credential_id=cc_pair.credential_id, ) - index_attempts = get_index_attempts_for_connector( - db_session, - cc_pair.connector_id, - ) - document_count_info_list = list( get_document_cnts_for_cc_pairs( db_session=db_session, @@ -71,9 +104,20 @@ def get_cc_pair_full_info( document_count_info_list[0][-1] if document_count_info_list else 0 ) + latest_attempt = get_latest_index_attempt_for_cc_pair_id( + db_session=db_session, + connector_credential_pair_id=cc_pair.id, + secondary_index=False, + only_finished=False, + ) + return CCPairFullInfo.from_models( cc_pair_model=cc_pair, - index_attempt_models=list(index_attempts), + number_of_index_attempts=count_index_attempts_for_connector( + db_session=db_session, + connector_id=cc_pair.connector_id, + ), + last_index_attempt=latest_attempt, latest_deletion_attempt=get_deletion_attempt_snapshot( connector_id=cc_pair.connector_id, credential_id=cc_pair.credential_id, diff --git a/backend/danswer/server/documents/connector.py b/backend/danswer/server/documents/connector.py index 34496c62504..cc27d1cabaa 100644 --- a/backend/danswer/server/documents/connector.py +++ b/backend/danswer/server/documents/connector.py @@ -66,7 +66,7 @@ from danswer.db.engine import get_session from danswer.db.index_attempt import create_index_attempt from danswer.db.index_attempt import get_index_attempts_for_cc_pair -from danswer.db.index_attempt import get_latest_finished_index_attempt_for_cc_pair +from danswer.db.index_attempt import get_latest_index_attempt_for_cc_pair_id from danswer.db.index_attempt import get_latest_index_attempts from danswer.db.models import User from danswer.db.models import UserRole @@ -453,10 +453,11 @@ def get_connector_indexing_status( (connector.id, credential.id) ) - latest_finished_attempt = get_latest_finished_index_attempt_for_cc_pair( + latest_finished_attempt = get_latest_index_attempt_for_cc_pair_id( + db_session=db_session, connector_credential_pair_id=cc_pair.id, secondary_index=secondary_index, - db_session=db_session, + only_finished=True, ) indexing_statuses.append( diff --git a/backend/danswer/server/documents/models.py b/backend/danswer/server/documents/models.py index 3805ccc4157..df172378063 100644 --- a/backend/danswer/server/documents/models.py +++ b/backend/danswer/server/documents/models.py @@ -185,6 +185,28 @@ def from_db_model(cls, error: DbIndexAttemptError) -> "IndexAttemptError": ) +class PaginatedIndexAttempts(BaseModel): + index_attempts: list[IndexAttemptSnapshot] + page: int + total_pages: int + + @classmethod + def from_models( + cls, + index_attempt_models: list[IndexAttempt], + page: int, + total_pages: int, + ) -> "PaginatedIndexAttempts": + return cls( + index_attempts=[ + IndexAttemptSnapshot.from_index_attempt_db_model(index_attempt_model) + for index_attempt_model in index_attempt_models + ], + page=page, + total_pages=total_pages, + ) + + class CCPairFullInfo(BaseModel): id: int name: str @@ -192,7 +214,8 @@ class CCPairFullInfo(BaseModel): num_docs_indexed: int connector: ConnectorSnapshot credential: CredentialSnapshot - index_attempts: list[IndexAttemptSnapshot] + number_of_index_attempts: int + last_index_attempt_status: IndexingStatus | None latest_deletion_attempt: DeletionAttemptSnapshot | None is_public: bool is_editable_for_current_user: bool @@ -201,11 +224,27 @@ class CCPairFullInfo(BaseModel): def from_models( cls, cc_pair_model: ConnectorCredentialPair, - index_attempt_models: list[IndexAttempt], latest_deletion_attempt: DeletionAttemptSnapshot | None, + number_of_index_attempts: int, + last_index_attempt: IndexAttempt | None, num_docs_indexed: int, # not ideal, but this must be computed separately is_editable_for_current_user: bool, ) -> "CCPairFullInfo": + # figure out if we need to artificially deflate the number of docs indexed. + # This is required since the total number of docs indexed by a CC Pair is + # updated before the new docs for an indexing attempt. If we don't do this, + # there is a mismatch between these two numbers which may confuse users. + last_indexing_status = last_index_attempt.status if last_index_attempt else None + if ( + last_indexing_status == IndexingStatus.SUCCESS + and number_of_index_attempts == 1 + and last_index_attempt + and last_index_attempt.new_docs_indexed + ): + num_docs_indexed = ( + last_index_attempt.new_docs_indexed if last_index_attempt else 0 + ) + return cls( id=cc_pair_model.id, name=cc_pair_model.name, @@ -217,10 +256,8 @@ def from_models( credential=CredentialSnapshot.from_credential_db_model( cc_pair_model.credential ), - index_attempts=[ - IndexAttemptSnapshot.from_index_attempt_db_model(index_attempt_model) - for index_attempt_model in index_attempt_models - ], + number_of_index_attempts=number_of_index_attempts, + last_index_attempt_status=last_indexing_status, latest_deletion_attempt=latest_deletion_attempt, is_public=cc_pair_model.is_public, is_editable_for_current_user=is_editable_for_current_user, diff --git a/web/src/app/admin/connector/[ccPairId]/IndexingAttemptsTable.tsx b/web/src/app/admin/connector/[ccPairId]/IndexingAttemptsTable.tsx index b9861a29759..d1e6d01964b 100644 --- a/web/src/app/admin/connector/[ccPairId]/IndexingAttemptsTable.tsx +++ b/web/src/app/admin/connector/[ccPairId]/IndexingAttemptsTable.tsx @@ -1,5 +1,6 @@ "use client"; +import { useEffect, useRef } from "react"; import { Table, TableHead, @@ -8,31 +9,172 @@ import { TableBody, TableCell, Text, - Button, - Divider, } from "@tremor/react"; -import { IndexAttemptStatus } from "@/components/Status"; import { CCPairFullInfo } from "./types"; +import { IndexAttemptStatus } from "@/components/Status"; import { useState } from "react"; import { PageSelector } from "@/components/PageSelector"; +import { ThreeDotsLoader } from "@/components/Loading"; +import { buildCCPairInfoUrl } from "./lib"; import { localizeAndPrettify } from "@/lib/time"; import { getDocsProcessedPerMinute } from "@/lib/indexAttempt"; -import { Modal } from "@/components/Modal"; -import { CheckmarkIcon, CopyIcon, SearchIcon } from "@/components/icons/icons"; +import { ErrorCallout } from "@/components/ErrorCallout"; +import { SearchIcon } from "@/components/icons/icons"; import Link from "next/link"; import ExceptionTraceModal from "@/components/modals/ExceptionTraceModal"; +import { PaginatedIndexAttempts } from "./types"; +import { useRouter } from "next/navigation"; +// This is the number of index attempts to display per page const NUM_IN_PAGE = 8; +// This is the number of pages to fetch at a time +const BATCH_SIZE = 8; export function IndexingAttemptsTable({ ccPair }: { ccPair: CCPairFullInfo }) { - const [page, setPage] = useState(1); const [indexAttemptTracePopupId, setIndexAttemptTracePopupId] = useState< number | null >(null); - const indexAttemptToDisplayTraceFor = ccPair.index_attempts.find( + + const totalPages = Math.ceil(ccPair.number_of_index_attempts / NUM_IN_PAGE); + + const router = useRouter(); + const [page, setPage] = useState(() => { + if (typeof window !== "undefined") { + const urlParams = new URLSearchParams(window.location.search); + return parseInt(urlParams.get("page") || "1", 10); + } + return 1; + }); + + const [currentPageData, setCurrentPageData] = + useState(null); + const [currentPageError, setCurrentPageError] = useState(null); + const [isCurrentPageLoading, setIsCurrentPageLoading] = useState(false); + + // This is a cache of the data for each "batch" which is a set of pages + const [cachedBatches, setCachedBatches] = useState<{ + [key: number]: PaginatedIndexAttempts[]; + }>({}); + + // This is a set of the batches that are currently being fetched + // we use it to avoid duplicate requests + const ongoingRequestsRef = useRef>(new Set()); + + const batchRetrievalUrlBuilder = (batchNum: number) => + `${buildCCPairInfoUrl(ccPair.id)}/index-attempts?page=${batchNum}&page_size=${BATCH_SIZE * NUM_IN_PAGE}`; + + // This fetches and caches the data for a given batch number + const fetchBatchData = async (batchNum: number) => { + if (ongoingRequestsRef.current.has(batchNum)) return; + ongoingRequestsRef.current.add(batchNum); + + try { + const response = await fetch(batchRetrievalUrlBuilder(batchNum + 1)); + if (!response.ok) { + throw new Error("Failed to fetch data"); + } + const data = await response.json(); + + const newBatchData: PaginatedIndexAttempts[] = []; + for (let i = 0; i < BATCH_SIZE; i++) { + const startIndex = i * NUM_IN_PAGE; + const endIndex = startIndex + NUM_IN_PAGE; + const pageIndexAttempts = data.index_attempts.slice( + startIndex, + endIndex + ); + newBatchData.push({ + ...data, + index_attempts: pageIndexAttempts, + }); + } + + setCachedBatches((prev) => ({ + ...prev, + [batchNum]: newBatchData, + })); + } catch (error) { + setCurrentPageError( + error instanceof Error ? error : new Error("An error occurred") + ); + } finally { + ongoingRequestsRef.current.delete(batchNum); + } + }; + + // This fetches and caches the data for the current batch and the next and previous batches + useEffect(() => { + const batchNum = Math.floor((page - 1) / BATCH_SIZE); + + if (!cachedBatches[batchNum]) { + setIsCurrentPageLoading(true); + fetchBatchData(batchNum); + } else { + setIsCurrentPageLoading(false); + } + + const nextBatchNum = Math.min( + batchNum + 1, + Math.ceil(totalPages / BATCH_SIZE) - 1 + ); + if (!cachedBatches[nextBatchNum]) { + fetchBatchData(nextBatchNum); + } + + const prevBatchNum = Math.max(batchNum - 1, 0); + if (!cachedBatches[prevBatchNum]) { + fetchBatchData(prevBatchNum); + } + + // Always fetch the first batch if it's not cached + if (!cachedBatches[0]) { + fetchBatchData(0); + } + }, [ccPair.id, page, cachedBatches, totalPages]); + + // This updates the data on the current page + useEffect(() => { + const batchNum = Math.floor((page - 1) / BATCH_SIZE); + const batchPageNum = (page - 1) % BATCH_SIZE; + + if (cachedBatches[batchNum] && cachedBatches[batchNum][batchPageNum]) { + setCurrentPageData(cachedBatches[batchNum][batchPageNum]); + setIsCurrentPageLoading(false); + } else { + setIsCurrentPageLoading(true); + } + }, [page, cachedBatches]); + + // This updates the page number and manages the URL + const updatePage = (newPage: number) => { + setPage(newPage); + router.push(`/admin/connector/${ccPair.id}?page=${newPage}`, { + scroll: false, + }); + window.scrollTo({ + top: 0, + left: 0, + behavior: "smooth", + }); + }; + + if (isCurrentPageLoading || !currentPageData) { + return ; + } + + if (currentPageError) { + return ( + + ); + } + + // This is the index attempt that the user wants to view the trace for + const indexAttemptToDisplayTraceFor = currentPageData?.index_attempts?.find( (indexAttempt) => indexAttempt.id === indexAttemptTracePopupId ); - const [copyClicked, setCopyClicked] = useState(false); return ( <> @@ -55,101 +197,92 @@ export function IndexingAttemptsTable({ ccPair }: { ccPair: CCPairFullInfo }) { - {ccPair.index_attempts - .slice(NUM_IN_PAGE * (page - 1), NUM_IN_PAGE * page) - .map((indexAttempt) => { - const docsPerMinute = - getDocsProcessedPerMinute(indexAttempt)?.toFixed(2); - return ( - - - {indexAttempt.time_started - ? localizeAndPrettify(indexAttempt.time_started) - : "-"} - - - - {docsPerMinute && ( -
- {docsPerMinute} docs / min -
- )} -
- -
-
-
{indexAttempt.new_docs_indexed}
- {indexAttempt.docs_removed_from_index > 0 && ( -
- (also removed {indexAttempt.docs_removed_from_index}{" "} - docs that were detected as deleted in the source) -
- )} -
+ {currentPageData.index_attempts.map((indexAttempt) => { + const docsPerMinute = + getDocsProcessedPerMinute(indexAttempt)?.toFixed(2); + return ( + + + {indexAttempt.time_started + ? localizeAndPrettify(indexAttempt.time_started) + : "-"} + + + + {docsPerMinute && ( +
+ {docsPerMinute} docs / min
-
- {indexAttempt.total_docs_indexed} - -
- {indexAttempt.error_count > 0 && ( - - - -  View Errors - - + )} + + +
+
+
{indexAttempt.new_docs_indexed}
+ {indexAttempt.docs_removed_from_index > 0 && ( +
+ (also removed {indexAttempt.docs_removed_from_index}{" "} + docs that were detected as deleted in the source) +
)} +
+
+
+ {indexAttempt.total_docs_indexed} + +
+ {indexAttempt.error_count > 0 && ( + + + +  View Errors + + + )} - {indexAttempt.status === "success" && ( + {indexAttempt.status === "success" && ( + + {"-"} + + )} + + {indexAttempt.status === "failed" && + indexAttempt.error_msg && ( - {"-"} + {indexAttempt.error_msg} )} - {indexAttempt.status === "failed" && - indexAttempt.error_msg && ( - - {indexAttempt.error_msg} - - )} - - {indexAttempt.full_exception_trace && ( -
{ - setIndexAttemptTracePopupId(indexAttempt.id); - }} - className="mt-2 text-link cursor-pointer select-none" - > - View Full Trace -
- )} -
-
- - ); - })} + {indexAttempt.full_exception_trace && ( +
{ + setIndexAttemptTracePopupId(indexAttempt.id); + }} + className="mt-2 text-link cursor-pointer select-none" + > + View Full Trace +
+ )} +
+
+
+ ); + })} - {ccPair.index_attempts.length > NUM_IN_PAGE && ( + {totalPages > 1 && (
{ - setPage(newPage); - window.scrollTo({ - top: 0, - left: 0, - behavior: "smooth", - }); - }} + onPageChange={updatePage} />
diff --git a/web/src/app/admin/connector/[ccPairId]/page.tsx b/web/src/app/admin/connector/[ccPairId]/page.tsx index f5da225a867..f2e8a8de8cc 100644 --- a/web/src/app/admin/connector/[ccPairId]/page.tsx +++ b/web/src/app/admin/connector/[ccPairId]/page.tsx @@ -1,7 +1,6 @@ "use client"; import { CCPairFullInfo, ConnectorCredentialPairStatus } from "./types"; -import { HealthCheckBanner } from "@/components/health/healthcheck"; import { CCPairStatus } from "@/components/Status"; import { BackButton } from "@/components/BackButton"; import { Button, Divider, Title } from "@tremor/react"; @@ -11,7 +10,6 @@ import { ModifyStatusButtonCluster } from "./ModifyStatusButtonCluster"; import { DeletionButton } from "./DeletionButton"; import { ErrorCallout } from "@/components/ErrorCallout"; import { ReIndexButton } from "./ReIndexButton"; -import { isCurrentlyDeleting } from "@/lib/documentDeletion"; import { ValidSources } from "@/lib/types"; import useSWR, { mutate } from "swr"; import { errorHandlingFetcher } from "@/lib/fetcher"; @@ -86,24 +84,13 @@ function Main({ ccPairId }: { ccPairId: number }) { return ( ); } - const lastIndexAttempt = ccPair.index_attempts[0]; const isDeleting = ccPair.status === ConnectorCredentialPairStatus.DELETING; - // figure out if we need to artificially deflate the number of docs indexed. - // This is required since the total number of docs indexed by a CC Pair is - // updated before the new docs for an indexing attempt. If we don't do this, - // there is a mismatch between these two numbers which may confuse users. - const totalDocsIndexed = - lastIndexAttempt?.status === "in_progress" && - ccPair.index_attempts.length === 1 - ? lastIndexAttempt.total_docs_indexed - : ccPair.num_docs_indexed; - const refresh = () => { mutate(buildCCPairInfoUrl(ccPairId)); }; @@ -182,13 +169,13 @@ function Main({ ccPairId }: { ccPairId: number }) { )}
Total Documents Indexed:{" "} - {totalDocsIndexed} + {ccPair.num_docs_indexed}
{!ccPair.is_editable_for_current_user && (
diff --git a/web/src/app/admin/connector/[ccPairId]/types.ts b/web/src/app/admin/connector/[ccPairId]/types.ts index 1cc43311e21..f44b958b095 100644 --- a/web/src/app/admin/connector/[ccPairId]/types.ts +++ b/web/src/app/admin/connector/[ccPairId]/types.ts @@ -1,6 +1,10 @@ import { Connector } from "@/lib/connectors/connectors"; import { Credential } from "@/lib/connectors/credentials"; -import { DeletionAttemptSnapshot, IndexAttemptSnapshot } from "@/lib/types"; +import { + DeletionAttemptSnapshot, + IndexAttemptSnapshot, + ValidStatuses, +} from "@/lib/types"; export enum ConnectorCredentialPairStatus { ACTIVE = "ACTIVE", @@ -15,8 +19,15 @@ export interface CCPairFullInfo { num_docs_indexed: number; connector: Connector; credential: Credential; - index_attempts: IndexAttemptSnapshot[]; + number_of_index_attempts: number; + last_index_attempt_status: ValidStatuses | null; latest_deletion_attempt: DeletionAttemptSnapshot | null; is_public: boolean; is_editable_for_current_user: boolean; } + +export interface PaginatedIndexAttempts { + index_attempts: IndexAttemptSnapshot[]; + page: number; + total_pages: number; +} diff --git a/web/src/lib/ss/ccPair.ts b/web/src/lib/ss/ccPair.ts deleted file mode 100644 index 847321d1103..00000000000 --- a/web/src/lib/ss/ccPair.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { fetchSS } from "../utilsSS"; - -export async function getCCPairSS(ccPairId: number) { - return fetchSS(`/manage/admin/cc-pair/${ccPairId}`); -}