Skip to content

Commit

Permalink
add ccpair id to logging (#2391)
Browse files Browse the repository at this point in the history
  • Loading branch information
pablonyx authored Sep 11, 2024
1 parent 9f6e8bd commit 0d749eb
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 3 deletions.
9 changes: 7 additions & 2 deletions backend/danswer/background/indexing/run_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,17 +384,22 @@ def _prepare_index_attempt(db_session: Session, index_attempt_id: int) -> IndexA
return attempt


def run_indexing_entrypoint(index_attempt_id: int, is_ee: bool = False) -> None:
def run_indexing_entrypoint(
index_attempt_id: int, connector_credential_pair_id: int, is_ee: bool = False
) -> None:
"""Entrypoint for indexing run when using dask distributed.
Wraps the actual logic in a `try` block so that we can catch any exceptions
and mark the attempt as failed."""

try:
if is_ee:
global_version.set_ee()

# set the indexing attempt ID so that all log messages from this process
# will have it added as a prefix
IndexAttemptSingleton.set_index_attempt_id(index_attempt_id)
IndexAttemptSingleton.set_cc_and_index_id(
index_attempt_id, connector_credential_pair_id
)

with Session(get_sqlalchemy_engine()) as db_session:
# make sure that it is valid to run this indexing attempt + mark it
Expand Down
2 changes: 2 additions & 0 deletions backend/danswer/background/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,15 @@ def kickoff_indexing_jobs(
run = secondary_client.submit(
run_indexing_entrypoint,
attempt.id,
attempt.connector_credential_pair_id,
global_version.get_is_ee_version(),
pure=False,
)
else:
run = client.submit(
run_indexing_entrypoint,
attempt.id,
attempt.connector_credential_pair_id,
global_version.get_is_ee_version(),
pure=False,
)
Expand Down
15 changes: 14 additions & 1 deletion backend/danswer/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,22 @@ class IndexAttemptSingleton:
main background job (scheduler), etc. this will not be used."""

_INDEX_ATTEMPT_ID: None | int = None
_CONNECTOR_CREDENTIAL_PAIR_ID: None | int = None

@classmethod
def get_index_attempt_id(cls) -> None | int:
return cls._INDEX_ATTEMPT_ID

@classmethod
def set_index_attempt_id(cls, index_attempt_id: int) -> None:
def get_connector_credential_pair_id(cls) -> None | int:
return cls._CONNECTOR_CREDENTIAL_PAIR_ID

@classmethod
def set_cc_and_index_id(
cls, index_attempt_id: int, connector_credential_pair_id: int
) -> None:
cls._INDEX_ATTEMPT_ID = index_attempt_id
cls._CONNECTOR_CREDENTIAL_PAIR_ID = connector_credential_pair_id


def get_log_level_from_str(log_level_str: str = LOG_LEVEL) -> int:
Expand All @@ -50,9 +58,14 @@ def process(
# If this is an indexing job, add the attempt ID to the log message
# This helps filter the logs for this specific indexing
attempt_id = IndexAttemptSingleton.get_index_attempt_id()
cc_pair_id = IndexAttemptSingleton.get_connector_credential_pair_id()

if attempt_id is not None:
msg = f"[Attempt ID: {attempt_id}] {msg}"

if cc_pair_id is not None:
msg = f"[CC Pair ID: {cc_pair_id}] {msg}"

# For Slack Bot, logs the channel relevant to the request
channel_id = self.extra.get(SLACK_CHANNEL_ID) if self.extra else None
if channel_id:
Expand Down

0 comments on commit 0d749eb

Please sign in to comment.