Skip to content

Commit

Permalink
set points for platform.driver
Browse files Browse the repository at this point in the history
  • Loading branch information
craig8 committed Nov 3, 2023
1 parent 7c7b552 commit 2547a92
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 77 deletions.
15 changes: 15 additions & 0 deletions services/core/IEEE_2030_5/demo/inverter1.points.csv
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,18 @@ EKG3,INV_REACT_PWR,waveform,waveform,TRUE,tan,float,Cosine wave
SampleBool3,energized,On / Off,on/off,FALSE,TRUE,boolean,Status indidcator of cooling stage 1
SampleWritableBool3,connected,On / Off,on/off,TRUE,TRUE,boolean,Status indicator
SampleLong3,INV_OP_STATUS_MODE,Enumeration,1-4,FALSE,3,int,Mode of Inverter
ctrl_freq_max,ctrl_freq_max,,,TRUE,,int,
ctrl_volt_max,ctrl_volt_max,,,TRUE,,int,
ctrl_freq_min,ctrl_freq_min,,,TRUE,,int,
ctrl_volt_min,ctrl_volt_min,,,TRUE,,int,
ctrl_ramp_tms,ctrl_ramp_tms,,,TRUE,,int,
ctrl_rand_delay,ctrl_rand_delay,,,TRUE,,int,
ctrl_grad_w,ctrl_grad_w,,,TRUE,,int,
ctrl_soft_grad_w,ctrl_soft_grad_w,,,TRUE,,int,
ctrl_connected,ctrl_connected,,,TRUE,,boolean,
ctrl_energized,ctrl_energized,,,TRUE,,boolean,
ctrl_fixed_pf_absorb_w,ctrl_fixed_pf_absorb_w,,,TRUE,,int,
ctrl_fixed_pf_ingect_w,ctrl_fixed_pf_ingect_w,,,TRUE,,int,
ctrl_fixed_var,ctrl_fixed_var,,,TRUE,,int,
ctrl_fixed_w,ctrl_fixed_w,,,TRUE,,int,
ctrl_es_delay,ctrl_es_delay,,,TRUE,,int,
119 changes: 62 additions & 57 deletions services/core/IEEE_2030_5/ieee_2030_5/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@
from volttron.platform.vip.agent.subsystems.query import Query

# from . import __version__
__version__ = "0.1.0"
__version__ = '0.1.0'

# Setup logging so that it runs within the platform
utils.setup_logging()

logging.getLogger("ieee_2030_5.client.req_resp").setLevel(logging.INFO)
logging.getLogger('ieee_2030_5.client.req_resp').setLevel(logging.INFO)
# The logger for this agent is _log and can be used throughout this file.
_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -98,7 +98,7 @@ def set_value(self, value: Any):
self.changed = True

def __post_init__(self):
params = self.parameter_type.split("::")
params = self.parameter_type.split('::')

# Only if we have a proper object specifier that we know about
if len(params) == 2:
Expand All @@ -113,10 +113,10 @@ def __post_init__(self):
elif params[0] == 'DefaultDERControl':
self.parent_object = DEFAULT_DER_CONTROL

assert self.parent_object is not None, f"The parent object type {params[0]} is not known, please check spelling in configuration file."
assert self.parent_object is not None, f'The parent object type {params[0]} is not known, please check spelling in configuration file.'
assert hasattr(
self.parent_object, params[1]
), f"{params[0]} does not have property {params[1]}, please check spelling in configuration file."
), f'{params[0]} does not have property {params[1]}, please check spelling in configuration file.'
self.parameter = params[1]

@staticmethod
Expand All @@ -137,7 +137,7 @@ class IEEE_2030_5_Agent(Agent):

def __init__(self, config_path: str, **kwargs):
super().__init__(**kwargs)
_log.debug("vip_identity: " + self.core.identity)
_log.debug('vip_identity: ' + self.core.identity)

config = utils.load_config(config_path)

Expand All @@ -146,22 +146,27 @@ def __init__(self, config_path: str, **kwargs):
self._certfile = Path(config['certfile']).expanduser()
self._pin = config['pin']
self._log_req_resp = bool(config.get('log_req_resp', False))
self._device_topic = config["device_topic"]
self._server_hostname = config["server_hostname"]
self._server_ssl_port = config.get("server_ssl_port", 443)
self._server_http_port = config.get("server_http_port", None)
self._mirror_usage_point_list = config.get("MirrorUsagePointList", [])
self._der_capabilities_info = config.get("DERCapability")
self._der_settings_info = config.get("DERSettings")
self._der_status_info = config.get("DERStatus")
self._device_topic = config['device_topic']
# Remove devices/ from prefix for sending data.
if self._device_topic.startswith('devices/'):
self._device_topic = self._device_topic[len('devices/'):]
self._server_hostname = config['server_hostname']
self._server_ssl_port = config.get('server_ssl_port', 443)
self._server_http_port = config.get('server_http_port', None)
self._mirror_usage_point_list = config.get('MirrorUsagePointList', [])
self._der_capabilities_info = config.get('DERCapability')
self._der_settings_info = config.get('DERSettings')
self._der_status_info = config.get('DERStatus')
#self._point_map = config.get("point_map")
self._mapped_points: Dict[str, MappedPoint] = {}
self._default_config = {
"device_topic": self._device_topic,
"MirrorUsagePointList": self._mirror_usage_point_list,
"point_map": config.get("point_map"),
"default_der_control_poll": int(config.get('default_der_control_poll', 60))
'device_topic': self._device_topic,
'MirrorUsagePointList': self._mirror_usage_point_list,
'point_map': config.get('point_map'),
'default_der_control_poll': int(config.get('default_der_control_poll', 60))
}
self._topic_without_prefix: str = self._device_topic[self._device_topic.find('devices/') +
len('devices/'):]
self._server_usage_points: m.UsagePointList

self._client = IEEE2030_5_Client(cafile=self._cacertfile,
Expand Down Expand Up @@ -191,7 +196,7 @@ def __init__(self, config_path: str, **kwargs):
try:
self._client.start(config=self._default_config)
except ConnectionRefusedError:
_log.error(f"Could not connect to server {self._server_hostname} agent exiting.")
_log.error(f'Could not connect to server {self._server_hostname} agent exiting.')
sys.exit(1)
except ValueError as e:
_log.error(e)
Expand All @@ -208,9 +213,9 @@ def __init__(self, config_path: str, **kwargs):

# Set a default configuration to ensure that self.configure is called immediately to setup
# the agent.
self.vip.config.set_default("config", self._default_config)
self.vip.config.set_default('config', self._default_config)
# Hook self.configure up to changes to the configuration file "config".
self.vip.config.subscribe(self.configure, actions=["NEW", "UPDATE"], pattern="config")
self.vip.config.subscribe(self.configure, actions=['NEW', 'UPDATE'], pattern='config')

@RPC.export
def update_der_settings(self, href: str, new_settings: m.DERSettings) -> int:
Expand All @@ -233,19 +238,19 @@ def get_der_references(self) -> List[str]:

def _active_controls_changed(self, active: m.DERControlList):
if not isinstance(active, m.DERControlList):
_log.error("Invalid instance passed to active control changed")
_log.error('Invalid instance passed to active control changed')
return
_log.debug(active)

def _default_control_changed(self, default_control: m.DefaultDERControl):
if not isinstance(default_control, m.DefaultDERControl):
_log.error("Invalid instance of default control")
raise ValueError(f"Invalid instance of default control was {type(default_control)}")
_log.error('Invalid instance of default control')
raise ValueError(f'Invalid instance of default control was {type(default_control)}')

if self._current_control is not None:
_log.info("Default config has been overwritten by event.")
_log.info('Default config has been overwritten by event.')
return
_log.info("Sending controls to platform.driver")
_log.info('Sending controls to platform.driver')

self._default_der_control = default_control

Expand All @@ -261,15 +266,15 @@ def _default_control_changed(self, default_control: m.DefaultDERControl):
try:
if point_value:
if not isinstance(point_value, (float, int, bool)):
point_value = getattr(point_value, "value")
point_value = getattr(point_value, 'value')

if point_value:
self.vip.rpc.call(PLATFORM_DRIVER, "set_point", self._device_topic,
self.vip.rpc.call(PLATFORM_DRIVER, 'set_point', self._device_topic,
point.point_on_bus, point_value)
except TypeError:
_log.error(f"Error setting point {point.point_on_bus} to {point_value}")
_log.error(f'Error setting point {point.point_on_bus} to {point_value}')
except KeyError:
_log.error(f"Error setting point {point.point_on_bus} to {point_value}")
_log.error(f'Error setting point {point.point_on_bus} to {point_value}')

for point in der_base_points:

Expand All @@ -278,20 +283,20 @@ def _default_control_changed(self, default_control: m.DefaultDERControl):
try:
if point_value:
if not isinstance(point_value, (float, int, bool)):
point_value = getattr(point_value, "value")
point_value = getattr(point_value, 'value')

if point_value:
self.vip.rpc.call(PLATFORM_DRIVER, "set_point", point.point_on_bus,
point_value)
self.vip.rpc.call(PLATFORM_DRIVER, 'set_point', self._device_topic,
point.point_on_bus, point_value)
except TypeError:
_log.error(f"Error setting point {point.point_on_bus} to {point_value}")
_log.error(f'Error setting point {point.point_on_bus} to {point_value}')

def _control_event_started(self, sender: m.DERControl):
_log.debug(f"{'='*50}Control event started\n{sender}")
if not isinstance(sender, m.DERControl):
_log.error("Invalid control event passed to event_started")
_log.error('Invalid control event passed to event_started')
raise ValueError(
f"Invalid type passed to event_started {type(sender)} instead of {type(m.DERControl)}"
f'Invalid type passed to event_started {type(sender)} instead of {type(m.DERControl)}'
)

self._current_control = sender
Expand Down Expand Up @@ -337,11 +342,11 @@ def _control_event_started(self, sender: m.DERControl):
point_value = 1 if point_value else 0

if point_value:
_log.debug(f"Setting point: {point.point_on_bus} to {point_value}")
self.vip.rpc.call(PLATFORM_DRIVER, "set_point", point.point_on_bus,
_log.debug(f'Setting point: {point.point_on_bus} to {point_value}')
self.vip.rpc.call(PLATFORM_DRIVER, 'set_point', point.point_on_bus,
point_value)
except TypeError:
_log.error(f"Error setting point {point.point_on_bus} to {point_value}")
_log.error(f'Error setting point {point.point_on_bus} to {point_value}')

def _control_event_ended(self, sender: m.DERControl):
_log.debug(f"{'='*50}Control event ended\n{sender}")
Expand All @@ -359,7 +364,7 @@ def configure(self, config_name, action, contents):

if not config.get('point_map'):
raise ValueError(
"Must have point_map specified in config store or referenced to a config store entry!"
'Must have point_map specified in config store or referenced to a config store entry!'
)
# Only deal with points that have both on bus point and
# a 2030.5 parameter type
Expand All @@ -383,7 +388,7 @@ def configure(self, config_name, action, contents):
device_topic = config['device_topic']
new_usage_points: Dict[str, m.MirrorUsagePoint] = {}

for mup in config.get("MirrorUsagePointList", []):
for mup in config.get('MirrorUsagePointList', []):
device_topic_point = mup.pop('device_point')
new_usage_points[mup['mRID']] = m.MirrorUsagePoint(**mup)
new_usage_points[mup['mRID']].deviceLFDI = self._client.lfdi
Expand All @@ -393,11 +398,11 @@ def configure(self, config_name, action, contents):
mup['device_point'] = device_topic_point

except ValueError as e:
_log.error("ERROR PROCESSING CONFIGURATION: {}".format(e))
_log.error('ERROR PROCESSING CONFIGURATION: {}'.format(e))
return

all_message = "/".join([self._device_topic, 'all'])
self.vip.pubsub.unsubscribe(peer="pubsub",
all_message = '/'.join([self._device_topic, 'all'])
self.vip.pubsub.unsubscribe(peer='pubsub',
prefix=all_message,
callback=self._data_published)

Expand All @@ -421,7 +426,7 @@ def configure(self, config_name, action, contents):
mRID=mup.MirrorMeterReading[0].mRID,
href=location,
description=mup.MirrorMeterReading[0].description)
rs = m.MirrorReadingSet(mRID=mup_reading.mRID + "1",
rs = m.MirrorReadingSet(mRID=mup_reading.mRID + '1',
timePeriod=m.DateTimeInterval())
rs.timePeriod.start = int(round(datetime.utcnow().timestamp()))
rs.timePeriod.duration = self._mup_pollRate
Expand All @@ -434,14 +439,14 @@ def configure(self, config_name, action, contents):

self._server_usage_points = self._client.mirror_usage_point_list()

all_message = "/".join([self._device_topic, 'all'])
self.vip.pubsub.subscribe(peer="pubsub", prefix=all_message, callback=self._data_published)
all_message = '/'.join([self._device_topic, 'all'])
self.vip.pubsub.subscribe(peer='pubsub', prefix=all_message, callback=self._data_published)

def _cast_multipler(self, value: str) -> int:
try:
return int(value)
except ValueError:
_log.warning(f"Casting multiplier to int failed: {value}")
_log.warning(f'Casting multiplier to int failed: {value}')
return 1

def _transform_settings(self, points: List[MappedPoint]) -> m.DERSettings:
Expand All @@ -463,7 +468,7 @@ def _transform_settings(self, points: List[MappedPoint]) -> m.DERSettings:
for point in points:
assert isinstance(
point.parent_object,
m.DERSettings), f"Parent object is not a DERSettings object: {p.parent_object}"
m.DERSettings), f'Parent object is not a DERSettings object: {p.parent_object}'

settings: m.DERSettings = point.parent_object

Expand Down Expand Up @@ -584,7 +589,7 @@ def _transform_status(self, points: List[MappedPoint]) -> m.DERStatus:
for point in points:
assert isinstance(
point.parent_object,
m.DERStatus), f"Parent object is not a DERStatus object: {p.parent_object}"
m.DERStatus), f'Parent object is not a DERStatus object: {p.parent_object}'

status: m.DERStatus = point.parent_object

Expand Down Expand Up @@ -642,7 +647,7 @@ def _transform_capabilities(self, points: List[MappedPoint]) -> m.DERCapability:

assert isinstance(
point.parent_object, m.DERCapability
), f"Parent object is not a DERCapability object: {point.parent_object}"
), f'Parent object is not a DERCapability object: {point.parent_object}'

capabilities: m.DERCapability = point.parent_object

Expand Down Expand Up @@ -731,7 +736,7 @@ def _data_published(self, peer, sender, bus, topic, headers, message):
"""
Callback triggered by the device_topic setup using the topic from the agent's config file
"""
_log.debug(f"DATA Received from {sender}")
_log.debug(f'DATA Received from {sender}')
points = AllPoints.frombus(message)

current_timestamp: datetime = utils.parse_timestamp_string(headers.get('TimeStamp'))
Expand Down Expand Up @@ -764,25 +769,25 @@ def _data_published(self, peer, sender, bus, topic, headers, message):
mp.reset_changed()

for index, pt in enumerate(self._mirror_usage_point_list):
if pt["device_point"] in points.points:
reading_mRID = pt["MirrorMeterReading"]['mRID']
if pt['device_point'] in points.points:
reading_mRID = pt['MirrorMeterReading']['mRID']
reading = self._mup_readings[reading_mRID]
for rs_index, rs in enumerate(reading.MirrorReadingSet):
rs = reading.MirrorReadingSet[rs_index]
rs.Reading.append(
m.Reading(timePeriod=m.DateTimeInterval(
start=int(current_timestamp.timestamp())),
value=points.points[pt["device_point"]]))
value=points.points[pt['device_point']]))
start = rs.timePeriod.start
if start + self._mup_pollRate < self._client.server_time:
self._times_published[reading_mRID] = self._times_published.get(
reading_mRID, 0) + 1
rs.mRID = "_".join(
rs.mRID = '_'.join(
[reading_mRID, str(self._times_published[reading_mRID])])

new_reading_href = self._client.post_mirror_reading(reading)
_log.info(
f"New readings({len(rs.Reading)}) posted available at: {new_reading_href}"
f'New readings({len(rs.Reading)}) posted available at: {new_reading_href}'
)
rs.Reading.clear()
rs.timePeriod.start = self._client.server_time
Expand Down
Loading

0 comments on commit 2547a92

Please sign in to comment.