diff --git a/MAVProxy/modules/mavproxy_chat/chat_openai.py b/MAVProxy/modules/mavproxy_chat/chat_openai.py index 09bc0adceb..d3b0f8c4ae 100644 --- a/MAVProxy/modules/mavproxy_chat/chat_openai.py +++ b/MAVProxy/modules/mavproxy_chat/chat_openai.py @@ -8,7 +8,7 @@ ''' from pymavlink import mavutil -import time +import time, re from datetime import datetime import json import math @@ -238,6 +238,36 @@ def handle_function_call(self, run): output = "get_mavlink_message: failed to retrieve message" print("chat: get_mavlink_message: failed to retrieve message") + # get all parameters from vehicle + if tool_call.function.name == "get_all_parameters": + recognised_function = True + try: + arguments = json.loads(tool_call.function.arguments) + output = self.get_all_parameters(arguments) + except: + output = "get_all_parameters: failed to retrieve parameters" + print("chat: get_all_parameters: failed to retrieve parameters") + + # get a vehicle parameter's value + if tool_call.function.name == "get_parameter": + recognised_function = True + try: + arguments = json.loads(tool_call.function.arguments) + output = self.get_parameter(arguments) + except: + output = "get_parameter: failed to retrieve parameter value" + print("chat: get_parameters: failed to retrieve parameter value") + + # set a vehicle parameter's value + if tool_call.function.name == "set_parameter": + recognised_function = True + try: + arguments = json.loads(tool_call.function.arguments) + output = self.set_parameter(arguments) + except: + output = "set_parameter: failed to set parameter value" + print("chat: set_parameter: failed to set parameter value") + if not recognised_function: print("chat: handle_function_call: unrecognised function call: " + tool_call.function.name) output = "unrecognised function call: " + tool_call.function.name @@ -428,6 +458,62 @@ def get_mavlink_message(self, arguments): except: return "get_mavlink_message: failed to convert message to json" + # get all available parameters names and their values + def get_all_parameters(self, arguments): + # check if any parameters are available + if self.mpstate.mav_param is None or len(self.mpstate.mav_param) == 0: + return "get_all_parameters: no parameters are available" + param_list = {} + for param_name in sorted(self.mpstate.mav_param.keys()): + param_list[param_name] = self.mpstate.mav_param.get(param_name) + try: + return json.dumps(param_list) + except: + return "get_all_parameters: failed to convert parameter list to json" + + # get a vehicle parameter's value + def get_parameter(self, arguments): + param_name = arguments.get("name", None) + if param_name is None: + print("get_parameter: name not specified") + return "get_parameter: name not specified" + + # start with empty parameter list + param_list = {} + + # handle param name containing regex + if self.contains_regex(param_name): + pattern = re.compile(param_name) + for existing_param_name in sorted(self.mpstate.mav_param.keys()): + if pattern.match(existing_param_name) is not None: + param_value = self.mpstate.functions.get_mav_param(existing_param_name, None) + if param_value is None: + print("chat: get_parameter unable to get " + existing_param_name) + else: + param_list[existing_param_name] = param_value + else: + # handle simple case of a single parameter name + param_value = self.mpstate.functions.get_mav_param(param_name, None) + if param_value is None: + return "get_parameter: " + param_name + " parameter not found" + param_list[param_name] = param_value + + try: + return json.dumps(param_list) + except: + return "get_parameter: failed to convert parameter list to json" + + # set a vehicle parameter's value + def set_parameter(self, arguments): + param_name = arguments.get("name", None) + if param_name is None: + return "set_parameter: parameter name not specified" + param_value = arguments.get("value", None) + if param_value is None: + return "set_parameter: value not specified" + self.mpstate.functions.param_set(param_name, param_value, retries=3) + return "set_parameter: parameter value set" + # wrap latitude to range -90 to 90 def wrap_latitude(self, latitude_deg): if latitude_deg > 90: @@ -447,4 +533,12 @@ def wrap_longitude(self, longitude_deg): # send status to chat window via callback def send_status(self, status): if self.status_cb is not None: - self.status_cb(status) \ No newline at end of file + self.status_cb(status) + + # returns true if string contains regex characters + def contains_regex(self, string): + regex_characters = ".^$*+?{}[]\|()" + for x in regex_characters: + if string.count(x): + return True + return False