From aa84846298963ce412a10745441c4bcc6ead2cb7 Mon Sep 17 00:00:00 2001
From: hagen-danswer
Date: Sun, 1 Sep 2024 23:32:20 -0700
Subject: [PATCH] Connector deletion fix (#2293)

---------

Co-authored-by: Weves
---
 backend/danswer/background/update.py          |  6 +-
 backend/danswer/db/document_set.py            | 71 +++++++++++++------
 .../danswer/server/manage/administrative.py   |  8 ++-
 .../tests/connector/test_deletion.py          | 42 +++++------
 4 files changed, 82 insertions(+), 45 deletions(-)

diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py
index 28abb481143..10fa36a1d8c 100755
--- a/backend/danswer/background/update.py
+++ b/backend/danswer/background/update.py
@@ -17,6 +17,7 @@
 from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP
 from danswer.configs.app_configs import NUM_INDEXING_WORKERS
 from danswer.configs.app_configs import NUM_SECONDARY_INDEXING_WORKERS
+from danswer.configs.constants import DocumentSource
 from danswer.configs.constants import POSTGRES_INDEXER_APP_NAME
 from danswer.db.connector import fetch_connectors
 from danswer.db.connector_credential_pair import fetch_connector_credential_pairs
@@ -46,7 +47,6 @@
 from shared_configs.configs import LOG_LEVEL
 from shared_configs.configs import MODEL_SERVER_PORT
 
-
 logger = setup_logger()
 
 # If the indexing dies, it's most likely due to resource constraints,
@@ -67,6 +67,10 @@ def _should_create_new_indexing(
 ) -> bool:
     connector = cc_pair.connector
 
+    # don't kick off indexing for `NOT_APPLICABLE` sources
+    if connector.source == DocumentSource.NOT_APPLICABLE:
+        return False
+
     # User can still manually create single indexing attempts via the UI for the
     # currently in use index
     if DISABLE_INDEX_UPDATE_ON_SWAP:
diff --git a/backend/danswer/db/document_set.py b/backend/danswer/db/document_set.py
index 2de61a491f9..c2900593835 100644
--- a/backend/danswer/db/document_set.py
+++ b/backend/danswer/db/document_set.py
@@ -524,37 +524,66 @@ def fetch_document_sets_for_documents(
     db_session: Session,
 ) -> Sequence[tuple[str, list[str]]]:
     """Gives back a list of (document_id, list[document_set_names]) tuples"""
+
+    """Building subqueries"""
+    # NOTE: have to build these subqueries first in order to guarantee that we get one
+    # returned row for each specified document_id. Basically, we want to do the filters first,
+    # then the outer joins.
+
+    # don't include CC pairs that are being deleted
+    # NOTE: CC pairs can never go from DELETING to any other state -> it's safe to ignore them
+    # as we can assume their document sets are no longer relevant
+    valid_cc_pairs_subquery = aliased(
+        ConnectorCredentialPair,
+        select(ConnectorCredentialPair)
+        .where(
+            ConnectorCredentialPair.status != ConnectorCredentialPairStatus.DELETING
+        )  # noqa: E712
+        .subquery(),
+    )
+
+    valid_document_set__cc_pairs_subquery = aliased(
+        DocumentSet__ConnectorCredentialPair,
+        select(DocumentSet__ConnectorCredentialPair)
+        .where(DocumentSet__ConnectorCredentialPair.is_current == True)  # noqa: E712
+        .subquery(),
+    )
+    """End building subqueries"""
+
     stmt = (
-        select(Document.id, func.array_agg(DocumentSetDBModel.name))
-        .join(
-            DocumentSet__ConnectorCredentialPair,
-            DocumentSetDBModel.id
-            == DocumentSet__ConnectorCredentialPair.document_set_id,
-        )
-        .join(
-            ConnectorCredentialPair,
-            ConnectorCredentialPair.id
-            == DocumentSet__ConnectorCredentialPair.connector_credential_pair_id,
+        select(
+            Document.id,
+            func.coalesce(
+                func.array_remove(func.array_agg(DocumentSetDBModel.name), None), []
+            ).label("document_set_names"),
         )
-        .join(
+        # Here we select document sets by relation:
+        # Document -> DocumentByConnectorCredentialPair -> ConnectorCredentialPair ->
+        # DocumentSet__ConnectorCredentialPair -> DocumentSet
+        .outerjoin(
             DocumentByConnectorCredentialPair,
+            Document.id == DocumentByConnectorCredentialPair.id,
+        )
+        .outerjoin(
+            valid_cc_pairs_subquery,
             and_(
                 DocumentByConnectorCredentialPair.connector_id
-                == ConnectorCredentialPair.connector_id,
+                == valid_cc_pairs_subquery.connector_id,
                 DocumentByConnectorCredentialPair.credential_id
-                == ConnectorCredentialPair.credential_id,
+                == valid_cc_pairs_subquery.credential_id,
             ),
         )
-        .join(
-            Document,
-            Document.id == DocumentByConnectorCredentialPair.id,
+        .outerjoin(
+            valid_document_set__cc_pairs_subquery,
+            valid_cc_pairs_subquery.id
+            == valid_document_set__cc_pairs_subquery.connector_credential_pair_id,
+        )
+        .outerjoin(
+            DocumentSetDBModel,
+            DocumentSetDBModel.id
+            == valid_document_set__cc_pairs_subquery.document_set_id,
         )
         .where(Document.id.in_(document_ids))
-        # don't include CC pairs that are being deleted
-        # NOTE: CC pairs can never go from DELETING to any other state -> it's safe to ignore them
-        # as we can assume their document sets are no longer relevant
-        .where(ConnectorCredentialPair.status != ConnectorCredentialPairStatus.DELETING)
-        .where(DocumentSet__ConnectorCredentialPair.is_current == True)  # noqa: E712
         .group_by(Document.id)
     )
     return db_session.execute(stmt).all()  # type: ignore
diff --git a/backend/danswer/server/manage/administrative.py b/backend/danswer/server/manage/administrative.py
index 0ac90ba8d11..a2d7156892c 100644
--- a/backend/danswer/server/manage/administrative.py
+++ b/backend/danswer/server/manage/administrative.py
@@ -166,10 +166,14 @@ def create_deletion_attempt_for_connector_id(
         get_editable=True,
     )
     if cc_pair is None:
+        error = (
+            f"Connector with ID '{connector_id}' and credential ID "
+            f"'{credential_id}' does not exist. Has it already been deleted?"
+        )
+        logger.error(error)
         raise HTTPException(
             status_code=404,
-            detail=f"Connector with ID '{connector_id}' and credential ID "
-            f"'{credential_id}' does not exist. Has it already been deleted?",
+            detail=error,
         )
 
     # Cancel any scheduled indexing attempts
diff --git a/backend/tests/integration/tests/connector/test_deletion.py b/backend/tests/integration/tests/connector/test_deletion.py
index a7708a02418..08d34c4c1a8 100644
--- a/backend/tests/integration/tests/connector/test_deletion.py
+++ b/backend/tests/integration/tests/connector/test_deletion.py
@@ -264,6 +264,8 @@ def test_connector_deletion_for_overlapping_connectors(
         doc_creating_user=admin_user,
     )
 
+    # EVERYTHING BELOW HERE IS CURRENTLY BROKEN AND NEEDS TO BE FIXED SERVER SIDE
+
     # delete connector 1
     CCPairManager.pause_cc_pair(
         cc_pair=cc_pair_1,
@@ -274,32 +276,30 @@ def test_connector_deletion_for_overlapping_connectors(
         user_performing_action=admin_user,
     )
 
-    # EVERYTHING BELOW HERE IS CURRENTLY BROKEN AND NEEDS TO BE FIXED SERVER SIDE
-
     # wait for deletion to finish
-    # CCPairManager.wait_for_deletion_completion(user_performing_action=admin_user)
+    CCPairManager.wait_for_deletion_completion(user_performing_action=admin_user)
 
-    # print("Connector 1 deleted")
+    print("Connector 1 deleted")
 
     # check that only connector 1 is deleted
     # TODO: check for the CC pair rather than the connector once the refactor is done
-    # CCPairManager.verify(
-    #     cc_pair=cc_pair_1,
-    #     verify_deleted=True,
-    #     user_performing_action=admin_user,
-    # )
-    # CCPairManager.verify(
-    #     cc_pair=cc_pair_2,
-    #     user_performing_action=admin_user,
-    # )
+    CCPairManager.verify(
+        cc_pair=cc_pair_1,
+        verify_deleted=True,
+        user_performing_action=admin_user,
+    )
+    CCPairManager.verify(
+        cc_pair=cc_pair_2,
+        user_performing_action=admin_user,
+    )
 
     # verify the document is not in any document sets
     # verify the document is only in user group 2
-    # DocumentManager.verify(
-    #     vespa_client=vespa_client,
-    #     cc_pair=cc_pair_2,
-    #     doc_set_names=[],
-    #     group_names=[user_group_2.name],
-    #     doc_creating_user=admin_user,
-    #     verify_deleted=False,
-    # )
+    DocumentManager.verify(
+        vespa_client=vespa_client,
+        cc_pair=cc_pair_2,
+        doc_set_names=[],
+        group_names=[user_group_2.name],
+        doc_creating_user=admin_user,
+        verify_deleted=False,
+    )
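
The substance of this fix is the rewritten fetch_document_sets_for_documents query. The old version applied its filters after plain inner joins, so a document whose only CC pairs were DELETING (or whose document-set links were not current) was dropped from the result entirely and returned no row at all, leaving its stale document set names in the index. The new version filters first, inside subqueries, and only then LEFT OUTER JOINs, so every requested document id always comes back exactly once; array_remove(array_agg(...), NULL) plus coalesce turns the unmatched join's NULLs into an empty list. Below is a minimal standalone sketch of the same pattern. The document/tag/document_tag schema and the fetch_tags_for_documents name are invented for illustration, they are not the Danswer models, and the array aggregate functions assume a PostgreSQL backend.

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, func, select
from sqlalchemy.orm import DeclarativeBase, Session, aliased


class Base(DeclarativeBase):
    pass


class Document(Base):
    __tablename__ = "document"
    id = Column(String, primary_key=True)


class Tag(Base):
    __tablename__ = "tag"
    id = Column(Integer, primary_key=True)
    name = Column(String)


class DocumentTag(Base):
    __tablename__ = "document_tag"
    document_id = Column(String, ForeignKey("document.id"), primary_key=True)
    tag_id = Column(Integer, ForeignKey("tag.id"), primary_key=True)
    is_current = Column(Boolean, default=True)


def fetch_tags_for_documents(
    document_ids: list[str], db_session: Session
) -> list[tuple[str, list[str]]]:
    # Filter FIRST: only current links survive into the subquery entity
    current_links = aliased(
        DocumentTag,
        select(DocumentTag)
        .where(DocumentTag.is_current == True)  # noqa: E712
        .subquery(),
    )
    stmt = (
        select(
            Document.id,
            # array_agg over an outer join emits NULL for unmatched rows;
            # array_remove strips them so an unmatched document gets [] not [None]
            func.coalesce(
                func.array_remove(func.array_agg(Tag.name), None), []
            ).label("tag_names"),
        )
        # THEN outer join: a document with no surviving links still yields a row
        .outerjoin(current_links, Document.id == current_links.document_id)
        .outerjoin(Tag, Tag.id == current_links.tag_id)
        .where(Document.id.in_(document_ids))
        .group_by(Document.id)
    )
    return db_session.execute(stmt).all()  # type: ignore

With inner joins, a document with zero surviving links vanishes from the result; with the filter-then-outer-join form it yields (doc_id, []), which is exactly what the deletion path needs in order to clear stale document set names rather than silently skipping the document.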