add background indexing status page #3428

Open · wants to merge 7 commits into main
43 changes: 17 additions & 26 deletions backend/danswer/background/celery/tasks/vespa/tasks.py
@@ -64,6 +64,7 @@
)
from danswer.redis.redis_connector_index import RedisConnectorIndex
from danswer.redis.redis_connector_prune import RedisConnectorPrune
from danswer.redis.redis_connector_utils import RedisConnectorUtils
from danswer.redis.redis_document_set import RedisDocumentSet
from danswer.redis.redis_pool import get_redis_client
from danswer.redis.redis_usergroup import RedisUserGroup
@@ -402,7 +403,7 @@ def monitor_connector_deletion_taskset(
tenant_id: str | None, key_bytes: bytes, r: Redis
) -> None:
fence_key = key_bytes.decode("utf-8")
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
cc_pair_id_str = RedisConnectorUtils.get_id_from_fence_key(fence_key)
if cc_pair_id_str is None:
task_logger.warning(f"could not parse cc_pair_id from {fence_key}")
return
@@ -528,7 +529,7 @@ def monitor_ccpair_pruning_taskset(
tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session
) -> None:
fence_key = key_bytes.decode("utf-8")
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
cc_pair_id_str = RedisConnectorUtils.get_id_from_fence_key(fence_key)
if cc_pair_id_str is None:
task_logger.warning(
f"monitor_ccpair_pruning_taskset: could not parse cc_pair_id from {fence_key}"
@@ -566,7 +567,7 @@ def monitor_ccpair_permissions_taskset(
tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session
) -> None:
fence_key = key_bytes.decode("utf-8")
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
cc_pair_id_str = RedisConnectorUtils.get_id_from_fence_key(fence_key)
if cc_pair_id_str is None:
task_logger.warning(
f"monitor_ccpair_permissions_taskset: could not parse cc_pair_id from {fence_key}"
@@ -604,25 +605,15 @@ def monitor_ccpair_permissions_taskset(
def monitor_ccpair_indexing_taskset(
tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session
) -> None:
# if the fence doesn't exist, there's nothing to do
fence_key = key_bytes.decode("utf-8")
composite_id = RedisConnector.get_id_from_fence_key(fence_key)
if composite_id is None:
redis_ids = RedisConnectorIndex.parse_key(key_bytes)
if not redis_ids:
task_logger.warning(
f"monitor_ccpair_indexing_taskset: could not parse composite_id from {fence_key}"
f"monitor_ccpair_indexing_taskset: could not parse composite_id from {key_bytes!r}"
)
return

# parse out metadata and initialize the helper class with it
parts = composite_id.split("/")
if len(parts) != 2:
return

cc_pair_id = int(parts[0])
search_settings_id = int(parts[1])

redis_connector = RedisConnector(tenant_id, cc_pair_id)
redis_connector_index = redis_connector.new_index(search_settings_id)
redis_connector = RedisConnector(tenant_id, redis_ids.cc_pair_id)
redis_connector_index = redis_connector.new_index(redis_ids.search_settings_id)
if not redis_connector_index.fenced:
return

@@ -635,8 +626,8 @@ def monitor_ccpair_indexing_taskset(
progress = redis_connector_index.get_progress()
if progress is not None:
task_logger.info(
f"Connector indexing progress: cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"Connector indexing progress: cc_pair={redis_ids.cc_pair_id} "
f"search_settings={redis_ids.search_settings_id} "
f"progress={progress} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
)
@@ -671,8 +662,8 @@ def monitor_ccpair_indexing_taskset(
f"Connector indexing aborted or exceptioned: "
f"attempt={payload.index_attempt_id} "
f"celery_task={payload.celery_task_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"cc_pair={redis_ids.cc_pair_id} "
f"search_settings={redis_ids.search_settings_id} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f} "
f"result.state={task_state} "
f"result.result={task_result} "
@@ -699,8 +690,8 @@ def monitor_ccpair_indexing_taskset(
"monitor_ccpair_indexing_taskset - transient exception marking index attempt as failed: "
f"attempt={payload.index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
f"cc_pair={redis_ids.cc_pair_id} "
f"search_settings={redis_ids.search_settings_id}"
)

redis_connector_index.reset()
@@ -709,8 +700,8 @@ def monitor_ccpair_indexing_taskset(
status_enum = HTTPStatus(status_int)

task_logger.info(
f"Connector indexing finished: cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"Connector indexing finished: cc_pair={redis_ids.cc_pair_id} "
f"search_settings={redis_ids.search_settings_id} "
f"progress={progress} "
f"status={status_enum.name} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
2 changes: 1 addition & 1 deletion backend/danswer/indexing/chunker.py
@@ -375,6 +375,6 @@ def chunk(self, documents: list[Document]) -> list[DocAwareChunk]:
final_chunks.extend(chunks)

if self.callback:
self.callback.progress("Chunker.chunk", len(chunks))
self.callback.progress("Chunker.chunk", 0)

return final_chunks
3 changes: 2 additions & 1 deletion backend/danswer/indexing/indexing_heartbeat.py
@@ -12,4 +12,5 @@ def should_stop(self) -> bool:

@abstractmethod
def progress(self, tag: str, amount: int) -> None:
"""Send progress updates to the caller."""
"""Send progress updates to the caller.
Send 0 as the amount if you just want to do a keep-alive ping."""
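
As a quick illustration of the contract, here is a minimal concrete callback. It is a sketch only: the interface name `IndexingHeartbeatInterface` is assumed from this module, and the bookkeeping is deliberately simple.

```python
import time

from danswer.indexing.indexing_heartbeat import (
    IndexingHeartbeatInterface,  # assumed name of the ABC shown above
)


class SimpleHeartbeat(IndexingHeartbeatInterface):
    """Hypothetical callback: counts processed items and treats amount=0
    as a pure keep-alive ping that refreshes liveness only."""

    def __init__(self) -> None:
        self.total = 0
        self.last_ping: float | None = None

    def should_stop(self) -> bool:
        # a real implementation might check a cancellation flag in Redis
        return False

    def progress(self, tag: str, amount: int) -> None:
        # every call, including amount == 0, counts as a liveness signal
        self.last_ping = time.monotonic()
        if amount > 0:
            self.total += amount
```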
@@ -182,7 +182,7 @@ def _batch_encode_texts(
embeddings.extend(response.embeddings)

if self.callback:
self.callback.progress("_batch_encode_texts", 1)
self.callback.progress("_batch_encode_texts", 0)
return embeddings

def encode(
18 changes: 0 additions & 18 deletions backend/danswer/redis/redis_connector.py
@@ -72,24 +72,6 @@ def wait_for_indexing_termination(

return finished

@staticmethod
def get_id_from_fence_key(key: str) -> str | None:
"""
Extracts the object ID from a fence key in the format `PREFIX_fence_X`.

Args:
key (str): The fence key string.

Returns:
Optional[int]: The extracted ID if the key is in the correct format, otherwise None.
"""
parts = key.split("_")
if len(parts) != 3:
return None

object_id = parts[2]
return object_id

@staticmethod
def get_id_from_task_id(task_id: str) -> str | None:
"""
30 changes: 29 additions & 1 deletion backend/danswer/redis/redis_connector_index.py
@@ -5,6 +5,13 @@
import redis
from pydantic import BaseModel

from danswer.redis.redis_connector_utils import RedisConnectorUtils


class RedisConnectorIndexIdentifiers(BaseModel):
cc_pair_id: int
search_settings_id: int


class RedisConnectorIndexPayload(BaseModel):
index_attempt_id: int | None
@@ -119,7 +126,8 @@ def generator_clear(self) -> None:
self.redis.delete(self.generator_complete_key)

def get_progress(self) -> int | None:
"""Returns None if the key doesn't exist. The"""
"""Returns None if the key doesn't exist. Returns an int representing
the number of documents processed otherwise."""
# TODO: move into fence?
bytes = self.redis.get(self.generator_progress_key)
if bytes is None:
@@ -143,6 +151,26 @@ def reset(self) -> None:
self.redis.delete(self.generator_complete_key)
self.redis.delete(self.fence_key)

@staticmethod
def parse_key(key_bytes: bytes) -> RedisConnectorIndexIdentifiers | None:
"""Parses the redis key for an indexing job and extracts
the cc_pair_id and search_settings_id"""
fence_key = key_bytes.decode("utf-8")
composite_id = RedisConnectorUtils.get_id_from_fence_key(fence_key)
if composite_id is None:
return None

# parse out metadata and initialize the helper class with it
parts = composite_id.split("/")
if len(parts) != 2:
return None

cc_pair_id = int(parts[0])
search_settings_id = int(parts[1])
return RedisConnectorIndexIdentifiers(
cc_pair_id=cc_pair_id, search_settings_id=search_settings_id
)

@staticmethod
def reset_all(r: redis.Redis) -> None:
"""Deletes all redis values for all connectors"""
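A usage sketch for the new `parse_key` helper; only the `{prefix}_fence_{cc_pair_id}/{search_settings_id}` shape is implied by the parsing logic above, so the literal prefix below is a guess:

```python
from danswer.redis.redis_connector_index import RedisConnectorIndex

# hypothetical fence key; the "connectorindexing" prefix is an assumption
ids = RedisConnectorIndex.parse_key(b"connectorindexing_fence_42/7")
assert ids is not None
assert ids.cc_pair_id == 42 and ids.search_settings_id == 7

# keys with a malformed composite ID come back as None rather than raising
assert RedisConnectorIndex.parse_key(b"connectorindexing_fence_42") is None
```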
18 changes: 18 additions & 0 deletions backend/danswer/redis/redis_connector_utils.py
@@ -0,0 +1,18 @@
class RedisConnectorUtils:
@staticmethod
def get_id_from_fence_key(key: str) -> str | None:
"""
Extracts the object ID from a fence key in the format `PREFIX_fence_X`.

Args:
key (str): The fence key string.

Returns:
str | None: The extracted ID if the key is in the correct format, otherwise None.
"""
parts = key.split("_")
if len(parts) != 3:
return None

object_id = parts[2]
return object_id
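
Doctest-style examples of the extracted helper's contract (key values are made up). Moving it off `RedisConnector` lets `RedisConnectorIndex.parse_key` reuse it without importing `redis_connector.py`, which would presumably create an import cycle, since `RedisConnector.new_index` constructs a `RedisConnectorIndex`.

```python
from danswer.redis.redis_connector_utils import RedisConnectorUtils

# PREFIX_fence_X -> "X"; note the ID is returned as a string
assert RedisConnectorUtils.get_id_from_fence_key("documentset_fence_12") == "12"

# composite IDs such as "42/7" pass through for the caller to split further
assert RedisConnectorUtils.get_id_from_fence_key("connectorindexing_fence_42/7") == "42/7"

# keys that do not split into exactly three "_"-separated parts yield None
assert RedisConnectorUtils.get_id_from_fence_key("malformed") is None
```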
62 changes: 62 additions & 0 deletions backend/danswer/server/documents/connector.py
@@ -65,6 +65,7 @@
from danswer.db.connector_credential_pair import add_credential_to_connector
from danswer.db.connector_credential_pair import get_cc_pair_groups_for_ids
from danswer.db.connector_credential_pair import get_connector_credential_pair
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.connector_credential_pair import get_connector_credential_pairs
from danswer.db.credentials import cleanup_gmail_credentials
from danswer.db.credentials import cleanup_google_drive_credentials
@@ -90,8 +91,11 @@
from danswer.file_store.file_store import get_default_file_store
from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.redis.redis_connector import RedisConnector
from danswer.redis.redis_connector_index import RedisConnectorIndex
from danswer.redis.redis_pool import get_redis_client
from danswer.server.documents.models import AuthStatus
from danswer.server.documents.models import AuthUrl
from danswer.server.documents.models import ConnectorBackgroundIndexingStatus
from danswer.server.documents.models import ConnectorCredentialPairIdentifier
from danswer.server.documents.models import ConnectorIndexingStatus
from danswer.server.documents.models import ConnectorSnapshot
@@ -642,6 +646,64 @@ def get_connector_indexing_status(
return indexing_statuses


@router.get("/admin/background/indexing")
def get_background_indexing(
user: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
tenant_id: str | None = Depends(get_current_tenant_id),
) -> list[ConnectorBackgroundIndexingStatus]:
indexing_statuses: list[ConnectorBackgroundIndexingStatus] = []

r = get_redis_client(tenant_id=tenant_id)

for key_bytes in r.scan_iter(RedisConnectorIndex.FENCE_PREFIX + "*"):
redis_ids = RedisConnectorIndex.parse_key(key_bytes)
if not redis_ids:
continue

redis_connector = RedisConnector(tenant_id, redis_ids.cc_pair_id)
redis_connector_index = redis_connector.new_index(redis_ids.search_settings_id)
if not redis_connector_index.fenced:
continue

payload = redis_connector_index.payload
if not payload:
continue

if not payload.started:
continue

n_progress = redis_connector_index.get_progress()
if not n_progress:
n_progress = 0

cc_pair = get_connector_credential_pair_from_id(
redis_ids.cc_pair_id, db_session
)
if not cc_pair:
continue

indexing_statuses.append(
ConnectorBackgroundIndexingStatus(
name=cc_pair.name,
source=cc_pair.connector.source,
started=payload.started,
progress=n_progress,
index_attempt_id=payload.index_attempt_id,
cc_pair_id=redis_ids.cc_pair_id,
search_settings_id=redis_ids.search_settings_id,
)
)

# sort by (cc_pair_id, search_settings_id) for a stable ordering across requests
indexing_statuses = sorted(
indexing_statuses,
key=lambda status: (status.cc_pair_id, status.search_settings_id),
)

return indexing_statuses


def _validate_connector_allowed(source: DocumentSource) -> None:
valid_connectors = [
x for x in ENABLED_CONNECTOR_TYPES.replace("_", "").split(",") if x
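A sketch of calling the new endpoint. The base URL, `/manage` router prefix, and cookie-based admin auth are assumptions about the deployment; the response fields mirror `ConnectorBackgroundIndexingStatus` (defined below):

```python
import requests

# hypothetical local deployment; the auth cookie name/value are placeholders
resp = requests.get(
    "http://localhost:8080/manage/admin/background/indexing",
    cookies={"fastapiusersauth": "<admin-session-cookie>"},
)
resp.raise_for_status()

# one entry per fenced-and-started indexing attempt found in Redis,
# sorted by (cc_pair_id, search_settings_id) as in the handler above
for status in resp.json():
    print(
        f"cc_pair={status['cc_pair_id']} "
        f"search_settings={status['search_settings_id']} "
        f"{status['name']} ({status['source']}): {status['progress']} docs"
    )
```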
10 changes: 10 additions & 0 deletions backend/danswer/server/documents/models.py
@@ -364,6 +364,16 @@ class RunConnectorRequest(BaseModel):
from_beginning: bool = False


class ConnectorBackgroundIndexingStatus(BaseModel):
name: str
source: DocumentSource
cc_pair_id: int
search_settings_id: int
index_attempt_id: int | None
started: datetime | None
progress: int | None


class CCPropertyUpdateRequest(BaseModel):
name: str
value: str
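Since `ConnectorBackgroundIndexingStatus` is a plain Pydantic model, the JSON the status page consumes follows directly from these fields. An illustrative round-trip (field values invented; `DocumentSource` import path assumed):

```python
from datetime import datetime, timezone

from danswer.configs.constants import DocumentSource  # assumed enum location
from danswer.server.documents.models import ConnectorBackgroundIndexingStatus

status = ConnectorBackgroundIndexingStatus(
    name="Company Wiki",
    source=DocumentSource.CONFLUENCE,
    cc_pair_id=42,
    search_settings_id=7,
    index_attempt_id=1337,
    started=datetime(2024, 12, 1, 9, 30, tzinfo=timezone.utc),
    progress=2500,
)
print(status.model_dump_json(indent=2))  # pydantic v2 serialization
```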