From 1197a23960fb681bc821340ae53611c827fe92ed Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Wed, 16 Oct 2024 19:34:11 -0700 Subject: [PATCH 1/4] push multi tenancy for slackbot --- .../slack/handlers/handle_buttons.py | 25 ++-- .../slack/handlers/handle_message.py | 11 +- .../slack/handlers/handle_regular_answer.py | 11 +- backend/danswer/danswerbot/slack/listener.py | 141 ++++++++++-------- backend/danswer/danswerbot/slack/utils.py | 9 +- 5 files changed, 109 insertions(+), 88 deletions(-) diff --git a/backend/danswer/danswerbot/slack/handlers/handle_buttons.py b/backend/danswer/danswerbot/slack/handlers/handle_buttons.py index 9e1c171ee4f..1edbd596c29 100644 --- a/backend/danswer/danswerbot/slack/handlers/handle_buttons.py +++ b/backend/danswer/danswerbot/slack/handlers/handle_buttons.py @@ -4,9 +4,7 @@ from slack_sdk import WebClient from slack_sdk.models.blocks import SectionBlock from slack_sdk.models.views import View -from slack_sdk.socket_mode import SocketModeClient from slack_sdk.socket_mode.request import SocketModeRequest -from sqlalchemy.orm import Session from danswer.configs.constants import MessageType from danswer.configs.constants import SearchFeedbackType @@ -26,6 +24,7 @@ from danswer.danswerbot.slack.handlers.handle_regular_answer import ( handle_regular_answer, ) +from danswer.danswerbot.slack.listener import TenantSocketModeClient from danswer.danswerbot.slack.models import SlackMessageInfo from danswer.danswerbot.slack.utils import build_feedback_id from danswer.danswerbot.slack.utils import decompose_action_id @@ -36,7 +35,7 @@ from danswer.danswerbot.slack.utils import read_slack_thread from danswer.danswerbot.slack.utils import respond_in_thread from danswer.danswerbot.slack.utils import update_emote_react -from danswer.db.engine import get_sqlalchemy_engine +from danswer.db.engine import get_session_with_tenant from danswer.db.feedback import create_chat_message_feedback from danswer.db.feedback import create_doc_retrieval_feedback from danswer.document_index.document_index_utils import get_both_index_names @@ -48,7 +47,7 @@ def handle_doc_feedback_button( req: SocketModeRequest, - client: SocketModeClient, + client: TenantSocketModeClient, ) -> None: if not (actions := req.payload.get("actions")): logger.error("Missing actions. Unable to build the source feedback view") @@ -81,7 +80,7 @@ def handle_doc_feedback_button( def handle_generate_answer_button( req: SocketModeRequest, - client: SocketModeClient, + client: TenantSocketModeClient, ) -> None: channel_id = req.payload["channel"]["id"] channel_name = req.payload["channel"]["name"] @@ -116,7 +115,7 @@ def handle_generate_answer_button( thread_ts=thread_ts, ) - with Session(get_sqlalchemy_engine()) as db_session: + with get_session_with_tenant(client.tenant_id) as db_session: slack_bot_config = get_slack_bot_config_for_channel( channel_name=channel_name, db_session=db_session ) @@ -136,6 +135,7 @@ def handle_generate_answer_button( slack_bot_config=slack_bot_config, receiver_ids=None, client=client.web_client, + tenant_id=client.tenant_id, channel=channel_id, logger=logger, feedback_reminder_id=None, @@ -150,12 +150,11 @@ def handle_slack_feedback( user_id_to_post_confirmation: str, channel_id_to_post_confirmation: str, thread_ts_to_post_confirmation: str, + tenant_id: str | None, ) -> None: - engine = get_sqlalchemy_engine() - message_id, doc_id, doc_rank = decompose_action_id(feedback_id) - with Session(engine) as db_session: + with get_session_with_tenant(tenant_id) as db_session: if feedback_type in [LIKE_BLOCK_ACTION_ID, DISLIKE_BLOCK_ACTION_ID]: create_chat_message_feedback( is_positive=feedback_type == LIKE_BLOCK_ACTION_ID, @@ -232,7 +231,7 @@ def handle_slack_feedback( def handle_followup_button( req: SocketModeRequest, - client: SocketModeClient, + client: TenantSocketModeClient, ) -> None: action_id = None if actions := req.payload.get("actions"): @@ -252,7 +251,7 @@ def handle_followup_button( tag_ids: list[str] = [] group_ids: list[str] = [] - with Session(get_sqlalchemy_engine()) as db_session: + with get_session_with_tenant(client.tenant_id) as db_session: channel_name, is_dm = get_channel_name_from_id( client=client.web_client, channel_id=channel_id ) @@ -295,7 +294,7 @@ def handle_followup_button( def get_clicker_name( req: SocketModeRequest, - client: SocketModeClient, + client: TenantSocketModeClient, ) -> str: clicker_name = req.payload.get("user", {}).get("name", "Someone") clicker_real_name = None @@ -316,7 +315,7 @@ def get_clicker_name( def handle_followup_resolved_button( req: SocketModeRequest, - client: SocketModeClient, + client: TenantSocketModeClient, immediate: bool = False, ) -> None: channel_id = req.payload["container"]["channel_id"] diff --git a/backend/danswer/danswerbot/slack/handlers/handle_message.py b/backend/danswer/danswerbot/slack/handlers/handle_message.py index 0882796204d..ffbe902c5ec 100644 --- a/backend/danswer/danswerbot/slack/handlers/handle_message.py +++ b/backend/danswer/danswerbot/slack/handlers/handle_message.py @@ -2,7 +2,6 @@ from slack_sdk import WebClient from slack_sdk.errors import SlackApiError -from sqlalchemy.orm import Session from danswer.configs.danswerbot_configs import DANSWER_BOT_FEEDBACK_REMINDER from danswer.configs.danswerbot_configs import DANSWER_REACT_EMOJI @@ -19,7 +18,7 @@ from danswer.danswerbot.slack.utils import respond_in_thread from danswer.danswerbot.slack.utils import slack_usage_report from danswer.danswerbot.slack.utils import update_emote_react -from danswer.db.engine import get_sqlalchemy_engine +from danswer.db.engine import get_session_with_tenant from danswer.db.models import SlackBotConfig from danswer.db.users import add_non_web_user_if_not_exists from danswer.utils.logger import setup_logger @@ -110,6 +109,7 @@ def handle_message( slack_bot_config: SlackBotConfig | None, client: WebClient, feedback_reminder_id: str | None, + tenant_id: str | None, ) -> bool: """Potentially respond to the user message depending on filters and if an answer was generated @@ -135,7 +135,9 @@ def handle_message( action = "slack_tag_message" elif is_bot_dm: action = "slack_dm_message" - slack_usage_report(action=action, sender_id=sender_id, client=client) + slack_usage_report( + action=action, sender_id=sender_id, client=client, tenant_id=tenant_id + ) document_set_names: list[str] | None = None persona = slack_bot_config.persona if slack_bot_config else None @@ -209,7 +211,7 @@ def handle_message( except SlackApiError as e: logger.error(f"Was not able to react to user message due to: {e}") - with Session(get_sqlalchemy_engine()) as db_session: + with get_session_with_tenant(tenant_id) as db_session: if message_info.email: add_non_web_user_if_not_exists(db_session, message_info.email) @@ -235,5 +237,6 @@ def handle_message( channel=channel, logger=logger, feedback_reminder_id=feedback_reminder_id, + tenant_id=tenant_id, ) return issue_with_regular_answer diff --git a/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py b/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py index e864d92c702..77a16bae537 100644 --- a/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py +++ b/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py @@ -9,7 +9,6 @@ from slack_sdk import WebClient from slack_sdk.models.blocks import DividerBlock from slack_sdk.models.blocks import SectionBlock -from sqlalchemy.orm import Session from danswer.configs.app_configs import DISABLE_GENERATIVE_AI from danswer.configs.danswerbot_configs import DANSWER_BOT_ANSWER_GENERATION_TIMEOUT @@ -32,7 +31,7 @@ from danswer.danswerbot.slack.utils import respond_in_thread from danswer.danswerbot.slack.utils import SlackRateLimiter from danswer.danswerbot.slack.utils import update_emote_react -from danswer.db.engine import get_sqlalchemy_engine +from danswer.db.engine import get_session_with_tenant from danswer.db.models import Persona from danswer.db.models import SlackBotConfig from danswer.db.models import SlackBotResponseType @@ -87,6 +86,7 @@ def handle_regular_answer( channel: str, logger: DanswerLoggingAdapter, feedback_reminder_id: str | None, + tenant_id: str | None, num_retries: int = DANSWER_BOT_NUM_RETRIES, answer_generation_timeout: int = DANSWER_BOT_ANSWER_GENERATION_TIMEOUT, thread_context_percent: float = DANSWER_BOT_TARGET_CHUNK_PERCENTAGE, @@ -103,8 +103,7 @@ def handle_regular_answer( user = None if message_info.is_bot_dm: if message_info.email: - engine = get_sqlalchemy_engine() - with Session(engine) as db_session: + with get_session_with_tenant(tenant_id) as db_session: user = get_user_by_email(message_info.email, db_session) document_set_names: list[str] | None = None @@ -151,7 +150,7 @@ def _get_answer(new_message_request: DirectQARequest) -> OneShotQAResponse | Non max_document_tokens: int | None = None max_history_tokens: int | None = None - with Session(get_sqlalchemy_engine()) as db_session: + with get_session_with_tenant(tenant_id) as db_session: if len(new_message_request.messages) > 1: if new_message_request.persona_config: raise RuntimeError("Slack bot does not support persona config") @@ -245,7 +244,7 @@ def _get_answer(new_message_request: DirectQARequest) -> OneShotQAResponse | Non ) # Always apply reranking settings if it exists, this is the non-streaming flow - with Session(get_sqlalchemy_engine()) as db_session: + with get_session_with_tenant(tenant_id) as db_session: saved_search_settings = get_current_search_settings(db_session) # This includes throwing out answer via reflexion diff --git a/backend/danswer/danswerbot/slack/listener.py b/backend/danswer/danswerbot/slack/listener.py index dbf6eae24cd..95d085cb872 100644 --- a/backend/danswer/danswerbot/slack/listener.py +++ b/backend/danswer/danswerbot/slack/listener.py @@ -7,8 +7,8 @@ from slack_sdk.socket_mode import SocketModeClient from slack_sdk.socket_mode.request import SocketModeRequest from slack_sdk.socket_mode.response import SocketModeResponse -from sqlalchemy.orm import Session +from danswer.background.celery.celery_app import get_all_tenant_ids from danswer.configs.constants import MessageType from danswer.configs.danswerbot_configs import DANSWER_BOT_REPHRASE_MESSAGE from danswer.configs.danswerbot_configs import DANSWER_BOT_RESPOND_EVERY_CHANNEL @@ -47,7 +47,7 @@ from danswer.danswerbot.slack.utils import remove_danswer_bot_tag from danswer.danswerbot.slack.utils import rephrase_slack_message from danswer.danswerbot.slack.utils import respond_in_thread -from danswer.db.engine import get_sqlalchemy_engine +from danswer.db.engine import get_session_with_tenant from danswer.db.search_settings import get_current_search_settings from danswer.key_value_store.interface import KvKeyNotFoundError from danswer.natural_language_processing.search_nlp_models import EmbeddingModel @@ -80,7 +80,13 @@ _OFFICIAL_SLACKBOT_USER_ID = "USLACKBOT" -def prefilter_requests(req: SocketModeRequest, client: SocketModeClient) -> bool: +class TenantSocketModeClient(SocketModeClient): + def __init__(self, tenant_id: str | None, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) + self.tenant_id = tenant_id + + +def prefilter_requests(req: SocketModeRequest, client: TenantSocketModeClient) -> bool: """True to keep going, False to ignore this Slack request""" if req.type == "events_api": # Verify channel is valid @@ -153,8 +159,7 @@ def prefilter_requests(req: SocketModeRequest, client: SocketModeClient) -> bool client=client.web_client, channel_id=channel ) - engine = get_sqlalchemy_engine() - with Session(engine) as db_session: + with get_session_with_tenant(client.tenant_id) as db_session: slack_bot_config = get_slack_bot_config_for_channel( channel_name=channel_name, db_session=db_session ) @@ -221,7 +226,7 @@ def prefilter_requests(req: SocketModeRequest, client: SocketModeClient) -> bool return True -def process_feedback(req: SocketModeRequest, client: SocketModeClient) -> None: +def process_feedback(req: SocketModeRequest, client: TenantSocketModeClient) -> None: if actions := req.payload.get("actions"): action = cast(dict[str, Any], actions[0]) feedback_type = cast(str, action.get("action_id")) @@ -243,6 +248,7 @@ def process_feedback(req: SocketModeRequest, client: SocketModeClient) -> None: user_id_to_post_confirmation=user_id, channel_id_to_post_confirmation=channel_id, thread_ts_to_post_confirmation=thread_ts, + tenant_id=client.tenant_id, ) query_event_id, _, _ = decompose_action_id(feedback_id) @@ -250,7 +256,7 @@ def process_feedback(req: SocketModeRequest, client: SocketModeClient) -> None: def build_request_details( - req: SocketModeRequest, client: SocketModeClient + req: SocketModeRequest, client: TenantSocketModeClient ) -> SlackMessageInfo: if req.type == "events_api": event = cast(dict[str, Any], req.payload["event"]) @@ -329,7 +335,7 @@ def build_request_details( def apologize_for_fail( details: SlackMessageInfo, - client: SocketModeClient, + client: TenantSocketModeClient, ) -> None: respond_in_thread( client=client.web_client, @@ -341,7 +347,7 @@ def apologize_for_fail( def process_message( req: SocketModeRequest, - client: SocketModeClient, + client: TenantSocketModeClient, respond_every_channel: bool = DANSWER_BOT_RESPOND_EVERY_CHANNEL, notify_no_answer: bool = NOTIFY_SLACKBOT_NO_ANSWER, ) -> None: @@ -357,8 +363,7 @@ def process_message( client=client.web_client, channel_id=channel ) - engine = get_sqlalchemy_engine() - with Session(engine) as db_session: + with get_session_with_tenant(client.tenant_id) as db_session: slack_bot_config = get_slack_bot_config_for_channel( channel_name=channel_name, db_session=db_session ) @@ -390,6 +395,7 @@ def process_message( slack_bot_config=slack_bot_config, client=client.web_client, feedback_reminder_id=feedback_reminder_id, + tenant_id=client.tenant_id, ) if failed: @@ -404,12 +410,12 @@ def process_message( apologize_for_fail(details, client) -def acknowledge_message(req: SocketModeRequest, client: SocketModeClient) -> None: +def acknowledge_message(req: SocketModeRequest, client: TenantSocketModeClient) -> None: response = SocketModeResponse(envelope_id=req.envelope_id) client.send_socket_mode_response(response) -def action_routing(req: SocketModeRequest, client: SocketModeClient) -> None: +def action_routing(req: SocketModeRequest, client: TenantSocketModeClient) -> None: if actions := req.payload.get("actions"): action = cast(dict[str, Any], actions[0]) @@ -429,13 +435,13 @@ def action_routing(req: SocketModeRequest, client: SocketModeClient) -> None: return handle_generate_answer_button(req, client) -def view_routing(req: SocketModeRequest, client: SocketModeClient) -> None: +def view_routing(req: SocketModeRequest, client: TenantSocketModeClient) -> None: if view := req.payload.get("view"): if view["callback_id"] == VIEW_DOC_FEEDBACK_ID: return process_feedback(req, client) -def process_slack_event(client: SocketModeClient, req: SocketModeRequest) -> None: +def process_slack_event(client: TenantSocketModeClient, req: SocketModeRequest) -> None: # Always respond right away, if Slack doesn't receive these frequently enough # it will assume the Bot is DEAD!!! :( acknowledge_message(req, client) @@ -453,17 +459,20 @@ def process_slack_event(client: SocketModeClient, req: SocketModeRequest) -> Non logger.error(f"Slack request payload: {req.payload}") -def _get_socket_client(slack_bot_tokens: SlackBotTokens) -> SocketModeClient: +def _get_socket_client( + slack_bot_tokens: SlackBotTokens, tenant_id: str | None +) -> TenantSocketModeClient: # For more info on how to set this up, checkout the docs: # https://docs.danswer.dev/slack_bot_setup - return SocketModeClient( + return TenantSocketModeClient( # This app-level token will be used only for establishing a connection app_token=slack_bot_tokens.app_token, web_client=WebClient(token=slack_bot_tokens.bot_token), + tenant_id=tenant_id, ) -def _initialize_socket_client(socket_client: SocketModeClient) -> None: +def _initialize_socket_client(socket_client: TenantSocketModeClient) -> None: socket_client.socket_mode_request_listeners.append(process_slack_event) # type: ignore # Establish a WebSocket connection to the Socket Mode servers @@ -481,8 +490,8 @@ def _initialize_socket_client(socket_client: SocketModeClient) -> None: # NOTE: we are using Web Sockets so that you can run this from within a firewalled VPC # without issue. if __name__ == "__main__": - slack_bot_tokens: SlackBotTokens | None = None - socket_client: SocketModeClient | None = None + slack_bot_tokens: dict[str | None, SlackBotTokens] = {} + socket_clients: dict[str | None, TenantSocketModeClient] = {} set_is_ee_based_on_env_variable() @@ -491,46 +500,56 @@ def _initialize_socket_client(socket_client: SocketModeClient) -> None: while True: try: - latest_slack_bot_tokens = fetch_tokens() - - if latest_slack_bot_tokens != slack_bot_tokens: - if slack_bot_tokens is not None: - logger.notice("Slack Bot tokens have changed - reconnecting") - else: - # This happens on the very first time the listener process comes up - # or the tokens have updated (set up for the first time) - with Session(get_sqlalchemy_engine()) as db_session: - search_settings = get_current_search_settings(db_session) - embedding_model = EmbeddingModel.from_db_model( - search_settings=search_settings, - server_host=MODEL_SERVER_HOST, - server_port=MODEL_SERVER_PORT, - ) - - warm_up_bi_encoder( - embedding_model=embedding_model, - ) - - slack_bot_tokens = latest_slack_bot_tokens - # potentially may cause a message to be dropped, but it is complicated - # to avoid + (1) if the user is changing tokens, they are likely okay with some - # "migration downtime" and (2) if a single message is lost it is okay - # as this should be a very rare occurrence - if socket_client: - socket_client.close() - - socket_client = _get_socket_client(slack_bot_tokens) - _initialize_socket_client(socket_client) - - # Let the handlers run in the background + re-check for token updates every 60 seconds + tenant_ids = get_all_tenant_ids() # Function to retrieve all tenant IDs + + for tenant_id in tenant_ids: + with get_session_with_tenant(tenant_id) as db_session: + try: + latest_slack_bot_tokens = fetch_tokens() + + if ( + tenant_id not in slack_bot_tokens + or latest_slack_bot_tokens != slack_bot_tokens[tenant_id] + ): + if tenant_id in slack_bot_tokens: + logger.notice( + f"Slack Bot tokens have changed for tenant {tenant_id} - reconnecting" + ) + else: + # Initial setup for this tenant + search_settings = get_current_search_settings( + db_session + ) + embedding_model = EmbeddingModel.from_db_model( + search_settings=search_settings, + server_host=MODEL_SERVER_HOST, + server_port=MODEL_SERVER_PORT, + ) + warm_up_bi_encoder(embedding_model=embedding_model) + + slack_bot_tokens[tenant_id] = latest_slack_bot_tokens + + # Close existing WebClient if any + if tenant_id in socket_clients: + socket_clients[tenant_id].close() + + socket_client = _get_socket_client( + latest_slack_bot_tokens, tenant_id + ) + _initialize_socket_client(socket_client) + + socket_clients[tenant_id] = socket_client + + except KvKeyNotFoundError: + logger.debug(f"Missing Slack Bot tokens for tenant {tenant_id}") + if tenant_id in socket_clients: + socket_clients[tenant_id].disconnect() + del socket_clients[tenant_id] + del slack_bot_tokens[tenant_id] + + # Wait before checking for updates Event().wait(timeout=60) - except KvKeyNotFoundError: - # try again every 30 seconds. This is needed since the user may add tokens - # via the UI at any point in the programs lifecycle - if we just allow it to - # fail, then the user will need to restart the containers after adding tokens - logger.debug( - "Missing Slack Bot tokens - waiting 60 seconds and trying again" - ) - if socket_client: - socket_client.disconnect() + + except Exception as e: + logger.exception(f"An error occurred: {e}") time.sleep(60) diff --git a/backend/danswer/danswerbot/slack/utils.py b/backend/danswer/danswerbot/slack/utils.py index 81209cf5c17..67c31ad0e42 100644 --- a/backend/danswer/danswerbot/slack/utils.py +++ b/backend/danswer/danswerbot/slack/utils.py @@ -12,7 +12,6 @@ from slack_sdk.errors import SlackApiError from slack_sdk.models.blocks import Block from slack_sdk.models.metadata import Metadata -from sqlalchemy.orm import Session from danswer.configs.app_configs import DISABLE_TELEMETRY from danswer.configs.constants import ID_SEPARATOR @@ -31,7 +30,7 @@ from danswer.connectors.slack.utils import SlackTextCleaner from danswer.danswerbot.slack.constants import FeedbackVisibility from danswer.danswerbot.slack.tokens import fetch_tokens -from danswer.db.engine import get_sqlalchemy_engine +from danswer.db.engine import get_session_with_tenant from danswer.db.users import get_user_by_email from danswer.llm.exceptions import GenAIDisabledException from danswer.llm.factory import get_default_llms @@ -489,7 +488,9 @@ def read_slack_thread( return thread_messages -def slack_usage_report(action: str, sender_id: str | None, client: WebClient) -> None: +def slack_usage_report( + action: str, sender_id: str | None, client: WebClient, tenant_id: str | None +) -> None: if DISABLE_TELEMETRY: return @@ -501,7 +502,7 @@ def slack_usage_report(action: str, sender_id: str | None, client: WebClient) -> logger.warning("Unable to find sender email") if sender_email is not None: - with Session(get_sqlalchemy_engine()) as db_session: + with get_session_with_tenant(tenant_id) as db_session: danswer_user = get_user_by_email(email=sender_email, db_session=db_session) optional_telemetry( From ed4ebff774227c930dfb482c288dae882ec17125 Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Thu, 17 Oct 2024 11:53:08 -0700 Subject: [PATCH 2/4] move to utils --- backend/danswer/configs/app_configs.py | 2 +- .../danswerbot/slack/handlers/handle_buttons.py | 3 ++- backend/danswer/danswerbot/slack/listener.py | 10 ++-------- backend/danswer/danswerbot/slack/utils.py | 7 +++++++ deployment/docker_compose/docker-compose.dev.yml | 2 +- deployment/docker_compose/docker-compose.gpu-dev.yml | 2 +- .../docker_compose/docker-compose.search-testing.yml | 2 +- 7 files changed, 15 insertions(+), 13 deletions(-) diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py index 0d8b7da7010..2497730bf51 100644 --- a/backend/danswer/configs/app_configs.py +++ b/backend/danswer/configs/app_configs.py @@ -134,7 +134,7 @@ os.environ.get("POSTGRES_PASSWORD") or "password" ) POSTGRES_HOST = os.environ.get("POSTGRES_HOST") or "localhost" -POSTGRES_PORT = os.environ.get("POSTGRES_PORT") or "5432" +POSTGRES_PORT = os.environ.get("POSTGRES_PORT") or "5433" POSTGRES_DB = os.environ.get("POSTGRES_DB") or "postgres" POSTGRES_API_SERVER_POOL_SIZE = int( diff --git a/backend/danswer/danswerbot/slack/handlers/handle_buttons.py b/backend/danswer/danswerbot/slack/handlers/handle_buttons.py index 1edbd596c29..f379e6af4ca 100644 --- a/backend/danswer/danswerbot/slack/handlers/handle_buttons.py +++ b/backend/danswer/danswerbot/slack/handlers/handle_buttons.py @@ -24,7 +24,6 @@ from danswer.danswerbot.slack.handlers.handle_regular_answer import ( handle_regular_answer, ) -from danswer.danswerbot.slack.listener import TenantSocketModeClient from danswer.danswerbot.slack.models import SlackMessageInfo from danswer.danswerbot.slack.utils import build_feedback_id from danswer.danswerbot.slack.utils import decompose_action_id @@ -34,6 +33,7 @@ from danswer.danswerbot.slack.utils import get_feedback_visibility from danswer.danswerbot.slack.utils import read_slack_thread from danswer.danswerbot.slack.utils import respond_in_thread +from danswer.danswerbot.slack.utils import TenantSocketModeClient from danswer.danswerbot.slack.utils import update_emote_react from danswer.db.engine import get_session_with_tenant from danswer.db.feedback import create_chat_message_feedback @@ -42,6 +42,7 @@ from danswer.document_index.factory import get_default_document_index from danswer.utils.logger import setup_logger + logger = setup_logger() diff --git a/backend/danswer/danswerbot/slack/listener.py b/backend/danswer/danswerbot/slack/listener.py index 95d085cb872..17a183ae2ae 100644 --- a/backend/danswer/danswerbot/slack/listener.py +++ b/backend/danswer/danswerbot/slack/listener.py @@ -4,7 +4,6 @@ from typing import cast from slack_sdk import WebClient -from slack_sdk.socket_mode import SocketModeClient from slack_sdk.socket_mode.request import SocketModeRequest from slack_sdk.socket_mode.response import SocketModeResponse @@ -47,6 +46,7 @@ from danswer.danswerbot.slack.utils import remove_danswer_bot_tag from danswer.danswerbot.slack.utils import rephrase_slack_message from danswer.danswerbot.slack.utils import respond_in_thread +from danswer.danswerbot.slack.utils import TenantSocketModeClient from danswer.db.engine import get_session_with_tenant from danswer.db.search_settings import get_current_search_settings from danswer.key_value_store.interface import KvKeyNotFoundError @@ -80,12 +80,6 @@ _OFFICIAL_SLACKBOT_USER_ID = "USLACKBOT" -class TenantSocketModeClient(SocketModeClient): - def __init__(self, tenant_id: str | None, *args: Any, **kwargs: Any): - super().__init__(*args, **kwargs) - self.tenant_id = tenant_id - - def prefilter_requests(req: SocketModeRequest, client: TenantSocketModeClient) -> bool: """True to keep going, False to ignore this Slack request""" if req.type == "events_api": @@ -476,7 +470,7 @@ def _initialize_socket_client(socket_client: TenantSocketModeClient) -> None: socket_client.socket_mode_request_listeners.append(process_slack_event) # type: ignore # Establish a WebSocket connection to the Socket Mode servers - logger.notice("Listening for messages from Slack...") + logger.notice(f"Listening for messages from Slack {socket_client.tenant_id }...") socket_client.connect() diff --git a/backend/danswer/danswerbot/slack/utils.py b/backend/danswer/danswerbot/slack/utils.py index 67c31ad0e42..345f3605bd5 100644 --- a/backend/danswer/danswerbot/slack/utils.py +++ b/backend/danswer/danswerbot/slack/utils.py @@ -12,6 +12,7 @@ from slack_sdk.errors import SlackApiError from slack_sdk.models.blocks import Block from slack_sdk.models.metadata import Metadata +from slack_sdk.socket_mode import SocketModeClient from danswer.configs.app_configs import DISABLE_TELEMETRY from danswer.configs.constants import ID_SEPARATOR @@ -578,3 +579,9 @@ def get_feedback_visibility() -> FeedbackVisibility: return FeedbackVisibility(DANSWER_BOT_FEEDBACK_VISIBILITY.lower()) except ValueError: return FeedbackVisibility.PRIVATE + + +class TenantSocketModeClient(SocketModeClient): + def __init__(self, tenant_id: str | None, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) + self.tenant_id = tenant_id diff --git a/deployment/docker_compose/docker-compose.dev.yml b/deployment/docker_compose/docker-compose.dev.yml index 5298859d13d..ef706306578 100644 --- a/deployment/docker_compose/docker-compose.dev.yml +++ b/deployment/docker_compose/docker-compose.dev.yml @@ -300,7 +300,7 @@ services: - POSTGRES_USER=${POSTGRES_USER:-postgres} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-password} ports: - - "5432:5432" + - "5433:5432" volumes: - db_volume:/var/lib/postgresql/data diff --git a/deployment/docker_compose/docker-compose.gpu-dev.yml b/deployment/docker_compose/docker-compose.gpu-dev.yml index 6397f657c19..03e436a2eb2 100644 --- a/deployment/docker_compose/docker-compose.gpu-dev.yml +++ b/deployment/docker_compose/docker-compose.gpu-dev.yml @@ -312,7 +312,7 @@ services: - POSTGRES_USER=${POSTGRES_USER:-postgres} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-password} ports: - - "5432:5432" + - "5433:5432" volumes: - db_volume:/var/lib/postgresql/data diff --git a/deployment/docker_compose/docker-compose.search-testing.yml b/deployment/docker_compose/docker-compose.search-testing.yml index fab950c064e..2afd54e029c 100644 --- a/deployment/docker_compose/docker-compose.search-testing.yml +++ b/deployment/docker_compose/docker-compose.search-testing.yml @@ -157,7 +157,7 @@ services: - POSTGRES_USER=${POSTGRES_USER:-postgres} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-password} ports: - - "5432" + - "5433" volumes: - db_volume:/var/lib/postgresql/data From 9000c5844df3c0cdd96a7b9970dc4a377c74cf12 Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Thu, 17 Oct 2024 13:02:51 -0700 Subject: [PATCH 3/4] k --- .../danswerbot/slack/handlers/handle_regular_answer.py | 1 + backend/danswer/danswerbot/slack/listener.py | 9 ++++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py b/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py index 77a16bae537..08b0b8a5a57 100644 --- a/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py +++ b/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py @@ -150,6 +150,7 @@ def _get_answer(new_message_request: DirectQARequest) -> OneShotQAResponse | Non max_document_tokens: int | None = None max_history_tokens: int | None = None + print(f"GETTING SESSION WITH {tenant_id}") with get_session_with_tenant(tenant_id) as db_session: if len(new_message_request.messages) > 1: if new_message_request.persona_config: diff --git a/backend/danswer/danswerbot/slack/listener.py b/backend/danswer/danswerbot/slack/listener.py index 17a183ae2ae..86e59708820 100644 --- a/backend/danswer/danswerbot/slack/listener.py +++ b/backend/danswer/danswerbot/slack/listener.py @@ -523,7 +523,10 @@ def _initialize_socket_client(socket_client: TenantSocketModeClient) -> None: slack_bot_tokens[tenant_id] = latest_slack_bot_tokens - # Close existing WebClient if any + # potentially may cause a message to be dropped, but it is complicated + # to avoid + (1) if the user is changing tokens, they are likely okay with some + # "migration downtime" and (2) if a single message is lost it is okay + # as this should be a very rare occurrence if tenant_id in socket_clients: socket_clients[tenant_id].close() @@ -544,6 +547,6 @@ def _initialize_socket_client(socket_client: TenantSocketModeClient) -> None: # Wait before checking for updates Event().wait(timeout=60) - except Exception as e: - logger.exception(f"An error occurred: {e}") + except Exception: + logger.exception("An error occurred outside of main event loop") time.sleep(60) From 2b888a66ab401002efb664f51d81579924bc7ba5 Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Thu, 17 Oct 2024 13:06:17 -0700 Subject: [PATCH 4/4] k --- backend/danswer/configs/app_configs.py | 2 +- .../danswer/danswerbot/slack/handlers/handle_regular_answer.py | 1 - deployment/docker_compose/docker-compose.dev.yml | 2 +- deployment/docker_compose/docker-compose.gpu-dev.yml | 2 +- deployment/docker_compose/docker-compose.search-testing.yml | 2 +- 5 files changed, 4 insertions(+), 5 deletions(-) diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py index 2497730bf51..0d8b7da7010 100644 --- a/backend/danswer/configs/app_configs.py +++ b/backend/danswer/configs/app_configs.py @@ -134,7 +134,7 @@ os.environ.get("POSTGRES_PASSWORD") or "password" ) POSTGRES_HOST = os.environ.get("POSTGRES_HOST") or "localhost" -POSTGRES_PORT = os.environ.get("POSTGRES_PORT") or "5433" +POSTGRES_PORT = os.environ.get("POSTGRES_PORT") or "5432" POSTGRES_DB = os.environ.get("POSTGRES_DB") or "postgres" POSTGRES_API_SERVER_POOL_SIZE = int( diff --git a/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py b/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py index 08b0b8a5a57..77a16bae537 100644 --- a/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py +++ b/backend/danswer/danswerbot/slack/handlers/handle_regular_answer.py @@ -150,7 +150,6 @@ def _get_answer(new_message_request: DirectQARequest) -> OneShotQAResponse | Non max_document_tokens: int | None = None max_history_tokens: int | None = None - print(f"GETTING SESSION WITH {tenant_id}") with get_session_with_tenant(tenant_id) as db_session: if len(new_message_request.messages) > 1: if new_message_request.persona_config: diff --git a/deployment/docker_compose/docker-compose.dev.yml b/deployment/docker_compose/docker-compose.dev.yml index ef706306578..5298859d13d 100644 --- a/deployment/docker_compose/docker-compose.dev.yml +++ b/deployment/docker_compose/docker-compose.dev.yml @@ -300,7 +300,7 @@ services: - POSTGRES_USER=${POSTGRES_USER:-postgres} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-password} ports: - - "5433:5432" + - "5432:5432" volumes: - db_volume:/var/lib/postgresql/data diff --git a/deployment/docker_compose/docker-compose.gpu-dev.yml b/deployment/docker_compose/docker-compose.gpu-dev.yml index 03e436a2eb2..6397f657c19 100644 --- a/deployment/docker_compose/docker-compose.gpu-dev.yml +++ b/deployment/docker_compose/docker-compose.gpu-dev.yml @@ -312,7 +312,7 @@ services: - POSTGRES_USER=${POSTGRES_USER:-postgres} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-password} ports: - - "5433:5432" + - "5432:5432" volumes: - db_volume:/var/lib/postgresql/data diff --git a/deployment/docker_compose/docker-compose.search-testing.yml b/deployment/docker_compose/docker-compose.search-testing.yml index 2afd54e029c..fab950c064e 100644 --- a/deployment/docker_compose/docker-compose.search-testing.yml +++ b/deployment/docker_compose/docker-compose.search-testing.yml @@ -157,7 +157,7 @@ services: - POSTGRES_USER=${POSTGRES_USER:-postgres} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-password} ports: - - "5433" + - "5432" volumes: - db_volume:/var/lib/postgresql/data