From 9edebe07766226508b84d7bb71239d1e735616e0 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Thu, 4 Apr 2024 09:05:09 -0700 Subject: [PATCH] Backport ConverseService changes from ovos-core 0.0.8 --- neon_core/skills/intent_service.py | 267 ++++++++++++++++++++++++++++- 1 file changed, 263 insertions(+), 4 deletions(-) diff --git a/neon_core/skills/intent_service.py b/neon_core/skills/intent_service.py index 6d75c546b..88ba42228 100644 --- a/neon_core/skills/intent_service.py +++ b/neon_core/skills/intent_service.py @@ -28,22 +28,26 @@ import time import wave +from threading import Event from typing import List +from ovos_bus_client.util import get_message_lang +from ovos_utils import flatten_list + from neon_transformers.text_transformers import UtteranceTransformersService -from ovos_bus_client import Message, MessageBusClient -from neon_utils.message_utils import get_message_user +from ovos_bus_client import Message, MessageBusClient, SessionManager, UtteranceState +from neon_utils.message_utils import get_message_user, dig_for_message from neon_utils.metrics_utils import Stopwatch from neon_utils.user_utils import apply_local_user_profile_updates from neon_utils.configuration_utils import get_neon_user_config from lingua_franca.parse import get_full_lang_code -from ovos_config.locale import set_default_lang +from ovos_config.locale import set_default_lang, setup_locale from ovos_utils.log import LOG from neon_core.configuration import Configuration from neon_core.language import get_lang_config from mycroft.skills.intent_service import IntentService -from mycroft.skills.intent_services import ConverseService +from mycroft.skills.intent_services import ConverseService, IntentMatch try: from neon_utterance_translator_plugin import UtteranceTranslator @@ -281,6 +285,261 @@ def handle_get_padatious(self, message): {"intent": intent})) +# TODO: Overrides below backporting 0.0.8 compat class NeonConverseService(ConverseService): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self.bus.on('mycroft.speech.recognition.unknown', self.reset_converse) + self.bus.on('intent.service.skills.deactivate', self.handle_deactivate_skill_request) + self.bus.on('intent.service.skills.activate', self.handle_activate_skill_request) + self.bus.on('active_skill_request', self.handle_activate_skill_request) # TODO backwards compat, deprecate + self.bus.on('intent.service.active_skills.get', self.handle_get_active_skills) + self.bus.on("skill.converse.get_response.enable", self.handle_get_response_enable) + self.bus.on("skill.converse.get_response.disable", self.handle_get_response_disable) + + @property + def active_skills(self): + session = SessionManager.get() + return session.active_skills + + @active_skills.setter + def active_skills(self, val): + session = SessionManager.get() + session.active_skills = [] + for skill_id, ts in val: + session.activate_skill(skill_id) + + def get_active_skills(self, message=None): + """Active skill ids ordered by converse priority + this represents the order in which converse will be called + + Returns: + active_skills (list): ordered list of skill_ids + """ + session = SessionManager.get(message) + return [skill[0] for skill in session.active_skills] + + def deactivate_skill(self, skill_id, source_skill=None, message=None): + """Remove a skill from being targetable by converse. + + Args: + skill_id (str): skill to remove + source_skill (str): skill requesting the removal + """ + source_skill = source_skill or skill_id + if self._deactivate_allowed(skill_id, source_skill): + session = SessionManager.get(message) + if session.is_active(skill_id): + # update converse session + session.deactivate_skill(skill_id) + + # keep message.context + message = message or Message("") + message.context["session"] = session.serialize() # update session active skills + # send bus event + self.bus.emit( + message.forward("intent.service.skills.deactivated", + data={"skill_id": skill_id})) + if skill_id in self._consecutive_activations: + self._consecutive_activations[skill_id] = 0 + + def activate_skill(self, skill_id, source_skill=None, message=None): + """Add a skill or update the position of an active skill. + + The skill is added to the front of the list, if it's already in the + list it's removed so there is only a single entry of it. + + Args: + skill_id (str): identifier of skill to be added. + source_skill (str): skill requesting the removal + """ + source_skill = source_skill or skill_id + if self._activate_allowed(skill_id, source_skill): + # update converse session + session = SessionManager.get(message) + session.activate_skill(skill_id) + + # keep message.context + message = message or Message("") + message.context["session"] = session.serialize() # update session active skills + message = message.forward("intent.service.skills.activated", + {"skill_id": skill_id}) + # send bus event + self.bus.emit(message) + # update activation counter + self._consecutive_activations[skill_id] += 1 + + def _collect_converse_skills(self, message=None): + """use the messagebus api to determine which skills want to converse + This includes all skills and external applications""" + message = message or dig_for_message() + session = SessionManager.get(message) + + skill_ids = [] + # include all skills in get_response state + want_converse = [skill_id for skill_id, state in session.utterance_states.items() + if state == UtteranceState.RESPONSE] + skill_ids += want_converse # dont wait for these pong answers (optimization) + + active_skills = self.get_active_skills() + + if not active_skills: + return want_converse + + event = Event() + + def handle_ack(msg): + nonlocal event + skill_id = msg.data["skill_id"] + + # validate the converse pong + if all((skill_id not in want_converse, + msg.data.get("can_handle", True), + skill_id in active_skills)): + want_converse.append(skill_id) + + if skill_id not in skill_ids: # track which answer we got + skill_ids.append(skill_id) + + if all(s in skill_ids for s in active_skills): + # all skills answered the ping! + event.set() + + self.bus.on("skill.converse.pong", handle_ack) + + # ask skills if they want to converse + for skill_id in active_skills: + self.bus.emit(message.forward(f"{skill_id}.converse.ping", + {"skill_id": skill_id})) + + # wait for all skills to acknowledge they want to converse + event.wait(timeout=0.5) + + self.bus.remove("skill.converse.pong", handle_ack) + return want_converse + + def _check_converse_timeout(self, message=None): + """ filter active skill list based on timestamps """ + message = message or dig_for_message() + timeouts = self.config.get("skill_timeouts") or {} + def_timeout = self.config.get("timeout", 300) + session = SessionManager.get(message) + session.active_skills = [ + skill for skill in session.active_skills + if time.time() - skill[1] <= timeouts.get(skill[0], def_timeout)] + + def converse(self, utterances, skill_id, lang, message): + """Call skill and ask if they want to process the utterance. + + Args: + utterances (list of tuples): utterances paired with normalized + versions. + skill_id: skill to query. + lang (str): current language + message (Message): message containing interaction info. + + Returns: + handled (bool): True if handled otherwise False. + """ + session = SessionManager.get(message) + session.lang = lang + + state = session.utterance_states.get(skill_id, UtteranceState.INTENT) + if state == UtteranceState.RESPONSE: + converse_msg = message.reply(f"{skill_id}.converse.get_response", + {"utterances": utterances, + "lang": lang}) + self.bus.emit(converse_msg) + return True + + if self._converse_allowed(skill_id): + converse_msg = message.reply(f"{skill_id}.converse.request", + {"utterances": utterances, + "lang": lang}) + result = self.bus.wait_for_response(converse_msg, + 'skill.converse.response') + if result and 'error' in result.data: + error_msg = result.data['error'] + LOG.error(f"{skill_id}: {error_msg}") + return False + elif result is not None: + return result.data.get('result', False) + return False + + def converse_with_skills(self, utterances, lang, message): + """Give active skills a chance at the utterance + + Args: + utterances (list): list of utterances + lang (string): 4 letter ISO language code + message (Message): message to use to generate reply + + Returns: + IntentMatch if handled otherwise None. + """ + # we call flatten in case someone is sending the old style list of tuples + utterances = flatten_list(utterances) + # filter allowed skills + self._check_converse_timeout(message) + # check if any skill wants to handle utterance + for skill_id in self._collect_converse_skills(message): + if self.converse(utterances, skill_id, lang, message): + return IntentMatch('Converse', None, None, skill_id) + return None + + def handle_get_response_enable(self, message): + skill_id = message.data["skill_id"] + session = SessionManager.get(message) + session.enable_response_mode(skill_id) + if session.session_id == "default": + SessionManager.sync(message) + + def handle_get_response_disable(self, message): + skill_id = message.data["skill_id"] + session = SessionManager.get(message) + session.disable_response_mode(skill_id) + if session.session_id == "default": + SessionManager.sync(message) + + def handle_activate_skill_request(self, message): + # TODO imperfect solution - only a skill can activate itself + # someone can forge this message and emit it raw, but in OpenVoiceOS all + # skill messages should have skill_id in context, so let's make sure + # this doesnt happen accidentally at very least + skill_id = message.data['skill_id'] + source_skill = message.context.get("skill_id") + self.activate_skill(skill_id, source_skill, message) + sess = SessionManager.get(message) + if sess.session_id == "default": + SessionManager.sync(message) + + def handle_deactivate_skill_request(self, message): + # TODO imperfect solution - only a skill can deactivate itself + # someone can forge this message and emit it raw, but in ovos-core all + # skill message should have skill_id in context, so let's make sure + # this doesnt happen accidentally + skill_id = message.data['skill_id'] + source_skill = message.context.get("skill_id") or skill_id + self.deactivate_skill(skill_id, source_skill, message) + sess = SessionManager.get(message) + if sess.session_id == "default": + SessionManager.sync(message) + + def reset_converse(self, message): + """Let skills know there was a problem with speech recognition""" + lang = get_message_lang(message) + try: + setup_locale(lang) # restore default lang + except Exception as e: + LOG.exception(f"Failed to set lingua_franca default lang to {lang}") + + self.converse_with_skills([], lang, message) + + def handle_get_active_skills(self, message): + """Send active skills to caller. + + Argument: + message: query message to reply to. + """ + self.bus.emit(message.reply("intent.service.active_skills.reply", + {"skills": self.get_active_skills(message)}))