Skip to content

Commit

Permalink
Backport ConverseService changes from ovos-core 0.0.8
Browse files Browse the repository at this point in the history
  • Loading branch information
NeonDaniel committed Apr 4, 2024
1 parent ffa538f commit 9edebe0
Showing 1 changed file with 263 additions and 4 deletions.
267 changes: 263 additions & 4 deletions neon_core/skills/intent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,26 @@

import time
import wave
from threading import Event
from typing import List

from ovos_bus_client.util import get_message_lang
from ovos_utils import flatten_list

from neon_transformers.text_transformers import UtteranceTransformersService
from ovos_bus_client import Message, MessageBusClient
from neon_utils.message_utils import get_message_user
from ovos_bus_client import Message, MessageBusClient, SessionManager, UtteranceState
from neon_utils.message_utils import get_message_user, dig_for_message
from neon_utils.metrics_utils import Stopwatch
from neon_utils.user_utils import apply_local_user_profile_updates
from neon_utils.configuration_utils import get_neon_user_config
from lingua_franca.parse import get_full_lang_code
from ovos_config.locale import set_default_lang
from ovos_config.locale import set_default_lang, setup_locale
from ovos_utils.log import LOG
from neon_core.configuration import Configuration
from neon_core.language import get_lang_config

from mycroft.skills.intent_service import IntentService
from mycroft.skills.intent_services import ConverseService
from mycroft.skills.intent_services import ConverseService, IntentMatch

try:
from neon_utterance_translator_plugin import UtteranceTranslator
Expand Down Expand Up @@ -281,6 +285,261 @@ def handle_get_padatious(self, message):
{"intent": intent}))


# TODO: Overrides below backporting 0.0.8 compat
class NeonConverseService(ConverseService):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.bus.on('mycroft.speech.recognition.unknown', self.reset_converse)
self.bus.on('intent.service.skills.deactivate', self.handle_deactivate_skill_request)
self.bus.on('intent.service.skills.activate', self.handle_activate_skill_request)
self.bus.on('active_skill_request', self.handle_activate_skill_request) # TODO backwards compat, deprecate
self.bus.on('intent.service.active_skills.get', self.handle_get_active_skills)
self.bus.on("skill.converse.get_response.enable", self.handle_get_response_enable)
self.bus.on("skill.converse.get_response.disable", self.handle_get_response_disable)

@property
def active_skills(self):
session = SessionManager.get()
return session.active_skills

@active_skills.setter
def active_skills(self, val):
session = SessionManager.get()
session.active_skills = []
for skill_id, ts in val:
session.activate_skill(skill_id)

def get_active_skills(self, message=None):
"""Active skill ids ordered by converse priority
this represents the order in which converse will be called
Returns:
active_skills (list): ordered list of skill_ids
"""
session = SessionManager.get(message)
return [skill[0] for skill in session.active_skills]

def deactivate_skill(self, skill_id, source_skill=None, message=None):
"""Remove a skill from being targetable by converse.
Args:
skill_id (str): skill to remove
source_skill (str): skill requesting the removal
"""
source_skill = source_skill or skill_id
if self._deactivate_allowed(skill_id, source_skill):
session = SessionManager.get(message)
if session.is_active(skill_id):
# update converse session
session.deactivate_skill(skill_id)

# keep message.context
message = message or Message("")
message.context["session"] = session.serialize() # update session active skills
# send bus event
self.bus.emit(
message.forward("intent.service.skills.deactivated",
data={"skill_id": skill_id}))
if skill_id in self._consecutive_activations:
self._consecutive_activations[skill_id] = 0

def activate_skill(self, skill_id, source_skill=None, message=None):
"""Add a skill or update the position of an active skill.
The skill is added to the front of the list, if it's already in the
list it's removed so there is only a single entry of it.
Args:
skill_id (str): identifier of skill to be added.
source_skill (str): skill requesting the removal
"""
source_skill = source_skill or skill_id
if self._activate_allowed(skill_id, source_skill):
# update converse session
session = SessionManager.get(message)
session.activate_skill(skill_id)

# keep message.context
message = message or Message("")
message.context["session"] = session.serialize() # update session active skills
message = message.forward("intent.service.skills.activated",
{"skill_id": skill_id})
# send bus event
self.bus.emit(message)
# update activation counter
self._consecutive_activations[skill_id] += 1

def _collect_converse_skills(self, message=None):
"""use the messagebus api to determine which skills want to converse
This includes all skills and external applications"""
message = message or dig_for_message()
session = SessionManager.get(message)

skill_ids = []
# include all skills in get_response state
want_converse = [skill_id for skill_id, state in session.utterance_states.items()
if state == UtteranceState.RESPONSE]
skill_ids += want_converse # dont wait for these pong answers (optimization)

active_skills = self.get_active_skills()

if not active_skills:
return want_converse

event = Event()

def handle_ack(msg):
nonlocal event
skill_id = msg.data["skill_id"]

# validate the converse pong
if all((skill_id not in want_converse,
msg.data.get("can_handle", True),
skill_id in active_skills)):
want_converse.append(skill_id)

if skill_id not in skill_ids: # track which answer we got
skill_ids.append(skill_id)

if all(s in skill_ids for s in active_skills):
# all skills answered the ping!
event.set()

self.bus.on("skill.converse.pong", handle_ack)

# ask skills if they want to converse
for skill_id in active_skills:
self.bus.emit(message.forward(f"{skill_id}.converse.ping",
{"skill_id": skill_id}))

# wait for all skills to acknowledge they want to converse
event.wait(timeout=0.5)

self.bus.remove("skill.converse.pong", handle_ack)
return want_converse

def _check_converse_timeout(self, message=None):
""" filter active skill list based on timestamps """
message = message or dig_for_message()
timeouts = self.config.get("skill_timeouts") or {}
def_timeout = self.config.get("timeout", 300)
session = SessionManager.get(message)
session.active_skills = [
skill for skill in session.active_skills
if time.time() - skill[1] <= timeouts.get(skill[0], def_timeout)]

def converse(self, utterances, skill_id, lang, message):
"""Call skill and ask if they want to process the utterance.
Args:
utterances (list of tuples): utterances paired with normalized
versions.
skill_id: skill to query.
lang (str): current language
message (Message): message containing interaction info.
Returns:
handled (bool): True if handled otherwise False.
"""
session = SessionManager.get(message)
session.lang = lang

state = session.utterance_states.get(skill_id, UtteranceState.INTENT)
if state == UtteranceState.RESPONSE:
converse_msg = message.reply(f"{skill_id}.converse.get_response",
{"utterances": utterances,
"lang": lang})
self.bus.emit(converse_msg)
return True

if self._converse_allowed(skill_id):
converse_msg = message.reply(f"{skill_id}.converse.request",
{"utterances": utterances,
"lang": lang})
result = self.bus.wait_for_response(converse_msg,
'skill.converse.response')
if result and 'error' in result.data:
error_msg = result.data['error']
LOG.error(f"{skill_id}: {error_msg}")
return False
elif result is not None:
return result.data.get('result', False)
return False

def converse_with_skills(self, utterances, lang, message):
"""Give active skills a chance at the utterance
Args:
utterances (list): list of utterances
lang (string): 4 letter ISO language code
message (Message): message to use to generate reply
Returns:
IntentMatch if handled otherwise None.
"""
# we call flatten in case someone is sending the old style list of tuples
utterances = flatten_list(utterances)
# filter allowed skills
self._check_converse_timeout(message)
# check if any skill wants to handle utterance
for skill_id in self._collect_converse_skills(message):
if self.converse(utterances, skill_id, lang, message):
return IntentMatch('Converse', None, None, skill_id)
return None

def handle_get_response_enable(self, message):
skill_id = message.data["skill_id"]
session = SessionManager.get(message)
session.enable_response_mode(skill_id)
if session.session_id == "default":
SessionManager.sync(message)

def handle_get_response_disable(self, message):
skill_id = message.data["skill_id"]
session = SessionManager.get(message)
session.disable_response_mode(skill_id)
if session.session_id == "default":
SessionManager.sync(message)

def handle_activate_skill_request(self, message):
# TODO imperfect solution - only a skill can activate itself
# someone can forge this message and emit it raw, but in OpenVoiceOS all
# skill messages should have skill_id in context, so let's make sure
# this doesnt happen accidentally at very least
skill_id = message.data['skill_id']
source_skill = message.context.get("skill_id")
self.activate_skill(skill_id, source_skill, message)
sess = SessionManager.get(message)
if sess.session_id == "default":
SessionManager.sync(message)

def handle_deactivate_skill_request(self, message):
# TODO imperfect solution - only a skill can deactivate itself
# someone can forge this message and emit it raw, but in ovos-core all
# skill message should have skill_id in context, so let's make sure
# this doesnt happen accidentally
skill_id = message.data['skill_id']
source_skill = message.context.get("skill_id") or skill_id
self.deactivate_skill(skill_id, source_skill, message)
sess = SessionManager.get(message)
if sess.session_id == "default":
SessionManager.sync(message)

def reset_converse(self, message):
"""Let skills know there was a problem with speech recognition"""
lang = get_message_lang(message)
try:
setup_locale(lang) # restore default lang
except Exception as e:
LOG.exception(f"Failed to set lingua_franca default lang to {lang}")

self.converse_with_skills([], lang, message)

def handle_get_active_skills(self, message):
"""Send active skills to caller.
Argument:
message: query message to reply to.
"""
self.bus.emit(message.reply("intent.service.active_skills.reply",
{"skills": self.get_active_skills(message)}))

0 comments on commit 9edebe0

Please sign in to comment.