diff --git a/panther_battery/README.md b/panther_battery/README.md index b062451d7..7118bb3e4 100644 --- a/panther_battery/README.md +++ b/panther_battery/README.md @@ -29,3 +29,7 @@ Node publishing Panther battery state read from motor controllers. Used in Panth #### Subscribes - `/panther/driver/motor_controllers_state` [*panther_msgs/DriverState*]: current motor controllers' state and error flags. + +#### Parameters + +- `~high_bat_temp` [*float*, default: **55.0**]: The temperature of the battery at which the battery health state is incorrect. \ No newline at end of file diff --git a/panther_battery/src/adc_node.py b/panther_battery/src/adc_node.py index 8339b6795..9d5263aa2 100755 --- a/panther_battery/src/adc_node.py +++ b/panther_battery/src/adc_node.py @@ -1,6 +1,9 @@ #!/usr/bin/python3 +from collections import defaultdict import math +from threading import Lock +from typing import Optional, Union import rospy @@ -11,14 +14,22 @@ class ADCNode: BAT02_DETECT_THRESH = 3.03 + V_BAT_FATAL_MIN = 27.0 + V_BAT_FATAL_MAX = 43.0 def __init__(self, name: str) -> None: rospy.init_node(name, anonymous=False) - self._V_driv_front = None - self._V_driv_rear = None - self._I_driv_front = None - self._I_driv_rear = None + self._high_bat_temp = rospy.get_param('~high_bat_temp', 55.0) + + self._driver_battery_last_info_time: Optional[float] = None + self._I_driv: Optional[float] = None + self._V_driv: Optional[float] = None + + self._volt_mean_length = 10 + self._V_bat_hist = defaultdict(lambda: [37.0] * self._volt_mean_length) + self._V_bat_mean = defaultdict(lambda: 37.0) + self._V_driv_mean: Optional[float] = None self._A = 298.15 self._B = 3977.0 @@ -29,6 +40,8 @@ def __init__(self, name: str) -> None: self._battery_count = self._check_battery_count() self._battery_charging = True # const for now, later this will be evaluated + self._lock = Lock() + # ------------------------------- # Subscribers # ------------------------------- @@ -59,10 +72,14 @@ def __init__(self, name: str) -> None: rospy.loginfo(f'[{rospy.get_name()}] Node started') def _motor_controllers_state_cb(self, driver_state: DriverState) -> None: - self._V_driv_front = driver_state.front.voltage - self._V_driv_rear = driver_state.front.current - self._I_driv_front = driver_state.rear.voltage - self._I_driv_rear = driver_state.rear.current + with self._lock: + self._driver_battery_last_info_time = rospy.get_time() + + driver_voltage = (driver_state.front.voltage + driver_state.rear.voltage) / 2.0 + self._V_driv = driver_voltage + self._V_driv_mean = self._count_volt_mean('V_driv', driver_voltage) + + self._I_driv = driver_state.front.current + driver_state.rear.current def _io_state_cb(self, io_state: IOState) -> None: self._charger_connected = io_state.charger_connected @@ -175,6 +192,8 @@ def _publish_battery_msg( battery_msg.power_supply_technology = BatteryState.POWER_SUPPLY_TECHNOLOGY_LIPO battery_msg.present = True + V_bat_mean = self._count_volt_mean(bat_pub, V_bat) + # check battery status if self._charger_connected: if battery_msg.percentage >= 1.0: @@ -186,8 +205,43 @@ def _publish_battery_msg( else: battery_msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING + # check battery health + with self._lock: + error_msg = None + if V_bat_mean < self.V_BAT_FATAL_MIN: + battery_msg.power_supply_health = BatteryState.POWER_SUPPLY_HEALTH_DEAD + error_msg = 'Battery voltage is critically low!' + elif V_bat_mean > self.V_BAT_FATAL_MAX: + battery_msg.power_supply_health = BatteryState.POWER_SUPPLY_HEALTH_OVERVOLTAGE + error_msg = 'Battery overvoltage!' + elif temp_bat >= self._high_bat_temp: + battery_msg.power_supply_health = BatteryState.POWER_SUPPLY_HEALTH_OVERHEAT + error_msg = 'Battery is overheating!' + elif self._driver_battery_last_info_time is None: + battery_msg.power_supply_health = BatteryState.POWER_SUPPLY_HEALTH_UNKNOWN + elif ( + rospy.get_time() - self._driver_battery_last_info_time < 0.1 + and abs(V_bat_mean - self._V_driv_mean) > self.V_BAT_FATAL_MAX * 0.1 + ): + battery_msg.power_supply_health = BatteryState.POWER_SUPPLY_HEALTH_UNSPEC_FAILURE + else: + battery_msg.power_supply_health = BatteryState.POWER_SUPPLY_HEALTH_GOOD + + if error_msg is not None: + rospy.logerr_throttle_identical(10.0, f'[{rospy.get_name()}] {error_msg}') + bat_pub.publish(battery_msg) + def _count_volt_mean(self, label: Union[rospy.Publisher, str], new_val: float) -> float: + # Updates the average by adding the newest and removing the oldest component of mean value, + # in order to avoid recalculating the entire sum every time. + self._V_bat_mean[label] += (new_val - self._V_bat_hist[label][0]) / self._volt_mean_length + + self._V_bat_hist[label].pop(0) + self._V_bat_hist[label].append(new_val) + + return self._V_bat_mean[label] + @staticmethod def _read_file(path: str) -> int: with open(path, 'r') as file: diff --git a/panther_battery/src/roboteq_republisher_node.py b/panther_battery/src/roboteq_republisher_node.py index 0f272982f..5818f882c 100755 --- a/panther_battery/src/roboteq_republisher_node.py +++ b/panther_battery/src/roboteq_republisher_node.py @@ -1,6 +1,7 @@ #!/usr/bin/python3 from threading import Lock +from typing import Optional import rospy @@ -10,11 +11,19 @@ class RoboteqRepublisherNode: + V_BAT_FATAL_MIN = 27.0 + V_BAT_FATAL_MAX = 43.0 + def __init__(self, name: str) -> None: rospy.init_node(name, anonymous=False) - self._battery_voltage = None - self._battery_current = None + self._battery_voltage: Optional[float] = None + self._battery_current: Optional[float] = None + self._battery_voltage_mean = 37.0 + + self._volt_mean_length = 10 + self._battery_voltage_hist = [37.0] * self._volt_mean_length + self._battery_timeout = 1.0 self._last_battery_info_time = rospy.get_time() self._lock = Lock() @@ -44,9 +53,12 @@ def __init__(self, name: str) -> None: def _motor_controllers_state_cb(self, msg: DriverState) -> None: with self._lock: + new_voltage = (msg.front.voltage + msg.rear.voltage) / 2.0 + self._last_battery_info_time = rospy.get_time() - self._battery_voltage = (msg.front.voltage + msg.rear.voltage) / 2.0 + self._battery_voltage = new_voltage self._battery_current = msg.front.current + msg.rear.current + self._update_volt_mean(new_voltage) def _battery_pub_timer_cb(self, *args) -> None: with self._lock: @@ -55,6 +67,7 @@ def _battery_pub_timer_cb(self, *args) -> None: battery_msg.capacity = 20.0 battery_msg.design_capacity = 20.0 battery_msg.power_supply_technology = BatteryState.POWER_SUPPLY_TECHNOLOGY_LIPO + if ( self._battery_voltage == None or self._battery_current == None @@ -62,18 +75,40 @@ def _battery_pub_timer_cb(self, *args) -> None: ): battery_msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_UNKNOWN else: - battery_msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING battery_msg.voltage = self._battery_voltage battery_msg.temperature = float('nan') battery_msg.current = self._battery_current battery_msg.percentage = (battery_msg.voltage - 32.0) / 10.0 battery_msg.charge = battery_msg.percentage * battery_msg.design_capacity battery_msg.present = True - # TODO: - # battery_msg.power_supply_health + + # TODO: check battery status + battery_msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING + + # check battery health + error_msg = None + if self._battery_voltage < self.V_BAT_FATAL_MIN: + battery_msg.power_supply_health = BatteryState.POWER_SUPPLY_HEALTH_DEAD + error_msg = 'Battery voltage is critically low!' + elif self._battery_voltage > self.V_BAT_FATAL_MAX: + battery_msg.power_supply_health = BatteryState.POWER_SUPPLY_HEALTH_OVERVOLTAGE + error_msg = 'Battery overvoltage!' + else: + battery_msg.power_supply_health = BatteryState.POWER_SUPPLY_HEALTH_GOOD + + if error_msg is not None: + rospy.logerr_throttle_identical(10.0, f'[{rospy.get_name()}] {error_msg}') self._battery_pub.publish(battery_msg) + def _update_volt_mean(self, new_val: float) -> float: + # Updates the average by adding the newest and removing the oldest component of mean value, + # in order to avoid recalculating the entire sum every time. + self._battery_voltage_mean += (new_val - self._battery_voltage_hist[0]) / self._volt_mean_length + + self._battery_voltage_hist.pop(0) + self._battery_voltage_hist.append(new_val) + def main(): roboteq_republisher_node = RoboteqRepublisherNode('roboteq_republisher_node')