Skip to content

Commit

Permalink
Merge pull request #1167 from GerhardOfRivia/py-typing-model
Browse files Browse the repository at this point in the history
Start add typing to python lib
  • Loading branch information
jmthomas authored Apr 6, 2024
2 parents 89c5d5d + c595605 commit 05f76f2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 43 deletions.
58 changes: 27 additions & 31 deletions openc3/python/openc3/models/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import json
import time
from typing import Optional

from openc3.utilities.store import Store, EphemeralStore
from openc3.utilities.store_queued import StoreQueued, EphemeralStoreQueued

Expand All @@ -33,43 +35,40 @@ def store_queued(cls):
# without primary_key to support other class methods.

@classmethod
def get(cls, primary_key, name):
def get(cls, primary_key: str, name: str):
"""@return [Hash|nil] Hash of this model or nil if name not found under primary_key"""
json_data = cls.store().hget(primary_key, name)
if json_data:
return json.loads(json_data)
else:
return None
return json.loads(json_data) if json_data else None

@classmethod
def names(cls, primary_key):
def names(cls, primary_key: str):
"""@return [Array<String>] All the names stored under the primary key"""
keys = cls.store().hkeys(primary_key)
keys.sort()
return [key.decode() for key in keys]

@classmethod
def all(cls, primary_key):
def all(cls, primary_key: str):
"""@return [Array<Hash>] All the models (as Hash objects) stored under the primary key"""
base = cls.store().hgetall(primary_key)
# decode the binary string keys to strings
hash = {k.decode(): v for (k, v) in base.items()}
for key, value in hash.items():
hash[key] = json.loads(value)
return hash
hash_ = {k.decode(): v for (k, v) in base.items()}
for key, value in hash_.items():
hash_[key] = json.loads(value)
return hash_

# END NOTE

# Sets (updates) the redis hash of this model
@classmethod
def set(cls, json, scope, queued=False):
json["scope"] = scope
cls(**json).create(force=True, queued=queued)
def set(cls, json_data: dict, scope: str, queued: bool = False):
json_data["scope"] = scope
cls(**json_data).create(force=True, queued=queued)

# @return [Model] Model generated from the passed JSON
@classmethod
def from_json(cls, json_data, scope):
if type(json_data) == str:
def from_json(cls, json_data: str | dict, scope: str):
if type(json_data) is str:
json_data = json.loads(json_data)
if json_data is None:
raise RuntimeError("json data is nil")
Expand All @@ -79,12 +78,9 @@ def from_json(cls, json_data, scope):
# Calls self.get and then from_json to turn the Hash configuration into a Ruby Model object.
# @return [Object|nil] Model object or nil if name not found under primary_key
@classmethod
def get_model(cls, name, scope):
json = cls.get(name, scope)
if json:
return cls.from_json(json, scope)
else:
return None
def get_model(cls, name: str, scope: str):
json_data = cls.get(name, scope)
return cls.from_json(json_data, scope) if json_data else None

# NOTE: get_all_models not implemented as it is currently
# unused by any python models
Expand All @@ -93,13 +89,13 @@ def get_model(cls, name, scope):
# only needed by plugin_model which is Ruby only

# Store the primary key and keyword arguments
def __init__(self, primary_key, **kw_args):
self.primary_key = primary_key
self.name = kw_args.get("name")
self.updated_at = kw_args.get("updated_at")
self.plugin = kw_args.get("plugin")
self.scope = kw_args.get("scope")
self.destroyed = False
def __init__(self, primary_key: str, **kw_args):
self.primary_key: str = primary_key
self.name: Optional[str] = kw_args.get("name")
self.updated_at: Optional[float] = kw_args.get("updated_at")
self.plugin: Optional[str] = kw_args.get("plugin")
self.scope: Optional[str] = kw_args.get("scope")
self.destroyed: bool = False

# Update the Redis hash at primary_key and set the field "name"
# to the JSON generated via calling as_json
Expand Down Expand Up @@ -129,8 +125,8 @@ def update(self, force=False, queued=True):

# Deploy the model into the OpenC3 system. Subclasses must implement this
# and typically create MicroserviceModels to implement.
def deploy(self, gem_path, variables):
raise RuntimeError("must be implemented by subclass")
def deploy(self, gem_path: str, variables: str):
raise NotImplementedError("must be implemented by subclass")

# Undo the actions of deploy and remove the model from OpenC3.
# Subclasses must implement this as by default it is a noop.
Expand Down
31 changes: 19 additions & 12 deletions openc3/python/test/models/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,22 @@
# if purchased from OpenC3, Inc.

import time
import json
from typing import Optional
import unittest
from unittest.mock import *

from test.test_helper import *

from openc3.models.model import Model


class MyModel(Model):
def __init__(self, name, scope, plugin=None, updated_at=None):
def __init__(
self,
name: str,
scope: Optional[str] = None,
plugin: Optional[str] = None,
updated_at: Optional[float] = None,
):
super().__init__(
f"{scope}__TEST",
name=name,
Expand All @@ -33,15 +40,15 @@ def __init__(self, name, scope, plugin=None, updated_at=None):
)

@classmethod
def get(cls, name, scope=None):
def get(cls, name: str, scope: Optional[str] = None):
return super().get(f"{scope}__TEST", name=name)

@classmethod
def names(cls, scope=None):
def names(cls, scope: Optional[str] = None):
return super().names(f"{scope}__TEST")

@classmethod
def all(cls, scope=None):
def all(cls, scope: Optional[str] = None):
return super().all(f"{scope}__TEST")


Expand Down Expand Up @@ -75,7 +82,7 @@ def test_complains_if_it_already_exists(self):
with self.assertRaisesRegex(RuntimeError, "model already exists"):
model.create()

def test_complains_if_updating_non_existant(self):
def test_complains_if_updating_non_existent(self):
model = Model("primary_key", name="model")
with self.assertRaisesRegex(RuntimeError, "model doesn't exist"):
model.create(update=True)
Expand Down Expand Up @@ -104,8 +111,8 @@ def test_update_updates_existing(self):

def test_deploy_must_be_implemented_by_subclass(self):
model = Model("primary_key", name="model")
with self.assertRaisesRegex(RuntimeError, "must be implemented by subclass"):
model.deploy(None, None)
with self.assertRaisesRegex(NotImplementedError, "must be implemented by subclass"):
model.deploy("", "")

def test_removes_the_model(self):
model = Model("primary_key", name="model")
Expand All @@ -130,10 +137,10 @@ def test_round_trips_the_model_with_json(self):
now = time.time()
model = MyModel(name="TEST1", scope="DEFAULT", plugin="ONE", updated_at=now)
model.create()
hash = model.as_json()
json_data = json.dumps(hash)
hash_ = model.as_json()
json_data = json.dumps(hash_)
model2 = MyModel.from_json(json_data, scope="DEFAULT")
self.assertEqual(hash, (model2.as_json()))
self.assertEqual(hash_, (model2.as_json()))

def test_returns_none_if_the_name_cant_be_found(self):
self.assertIsNone(MyModel.get(name="BLAH", scope="DEFAULT"))
Expand Down

0 comments on commit 05f76f2

Please sign in to comment.