-
Notifications
You must be signed in to change notification settings - Fork 27
/
action-app_template.py
executable file
·76 lines (57 loc) · 2.34 KB
/
action-app_template.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/usr/bin/env python3.7
from snipsTools import SnipsConfigParser
from hermes_python.hermes import Hermes
# imported to get type check and IDE completion
from hermes_python.ontology.dialogue.intent import IntentMessage
CONFIG_INI = "config.ini"
# if this skill is supposed to run on the satellite,
# please get this mqtt connection info from <config.ini>
#
# hint: MQTT server is always running on the master device
MQTT_IP_ADDR: str = "localhost"
MQTT_PORT: int = 1883
MQTT_ADDR: str = "{}:{}".format(MQTT_IP_ADDR, str(MQTT_PORT))
class Template:
"""class used to wrap action code with mqtt connection
please change the name referring to your application
"""
def __init__(self):
# get the configuration if needed
try:
self.config = SnipsConfigParser.read_configuration_file(CONFIG_INI)
except Exception:
self.config = None
# start listening to MQTT
self.start_blocking()
@staticmethod
def intent_1_callback(hermes: Hermes,
intent_message: IntentMessage):
# terminate the session first if not continue
hermes.publish_end_session(intent_message.session_id, "")
# action code goes here...
print('[Received] intent: {}'.format(
intent_message.intent.intent_name))
# if need to speak the execution result by tts
hermes.publish_start_session_notification(
intent_message.site_id,
"Action 1", "")
@staticmethod
def intent_2_callback(hermes: Hermes,
intent_message: IntentMessage):
# terminate the session first if not continue
hermes.publish_end_session(intent_message.session_id, "")
# action code goes here...
print('[Received] intent: {}'.format(
intent_message.intent.intent_name))
# if need to speak the execution result by tts
hermes.publish_start_session_notification(
intent_message.site_id,
"Action 2", "")
# register callback function to its intent and start listen to MQTT bus
def start_blocking(self):
with Hermes(MQTT_ADDR) as h:
h.subscribe_intent('intent_1', self.intent_1_callback)\
.subscribe_intent('intent_2', self.intent_2_callback)\
.loop_forever()
if __name__ == "__main__":
Template()