Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start add typing to python lib #1167

Merged
merged 1 commit into from
Apr 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 27 additions & 31 deletions openc3/python/openc3/models/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import json
import time
from typing import Optional

from openc3.utilities.store import Store, EphemeralStore
from openc3.utilities.store_queued import StoreQueued, EphemeralStoreQueued

Expand All @@ -33,43 +35,40 @@ def store_queued(cls):
# without primary_key to support other class methods.

@classmethod
def get(cls, primary_key, name):
def get(cls, primary_key: str, name: str):
"""@return [Hash|nil] Hash of this model or nil if name not found under primary_key"""
json_data = cls.store().hget(primary_key, name)
if json_data:
return json.loads(json_data)
else:
return None
return json.loads(json_data) if json_data else None

@classmethod
def names(cls, primary_key):
def names(cls, primary_key: str):
"""@return [Array<String>] All the names stored under the primary key"""
keys = cls.store().hkeys(primary_key)
keys.sort()
return [key.decode() for key in keys]

@classmethod
def all(cls, primary_key):
def all(cls, primary_key: str):
"""@return [Array<Hash>] All the models (as Hash objects) stored under the primary key"""
base = cls.store().hgetall(primary_key)
# decode the binary string keys to strings
hash = {k.decode(): v for (k, v) in base.items()}
for key, value in hash.items():
hash[key] = json.loads(value)
return hash
hash_ = {k.decode(): v for (k, v) in base.items()}
for key, value in hash_.items():
hash_[key] = json.loads(value)
return hash_
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the underscore: hash_

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hash is a built-in function in python, https://docs.python.org/3/library/functions.html#hash


# END NOTE

# Sets (updates) the redis hash of this model
@classmethod
def set(cls, json, scope, queued=False):
json["scope"] = scope
cls(**json).create(force=True, queued=queued)
def set(cls, json_data: dict, scope: str, queued: bool = False):
json_data["scope"] = scope
cls(**json_data).create(force=True, queued=queued)

# @return [Model] Model generated from the passed JSON
@classmethod
def from_json(cls, json_data, scope):
if type(json_data) == str:
def from_json(cls, json_data: str | dict, scope: str):
if type(json_data) is str:
json_data = json.loads(json_data)
if json_data is None:
raise RuntimeError("json data is nil")
Expand All @@ -79,12 +78,9 @@ def from_json(cls, json_data, scope):
# Calls self.get and then from_json to turn the Hash configuration into a Ruby Model object.
# @return [Object|nil] Model object or nil if name not found under primary_key
@classmethod
def get_model(cls, name, scope):
json = cls.get(name, scope)
if json:
return cls.from_json(json, scope)
else:
return None
def get_model(cls, name: str, scope: str):
json_data = cls.get(name, scope)
return cls.from_json(json_data, scope) if json_data else None

# NOTE: get_all_models not implemented as it is currently
# unused by any python models
Expand All @@ -93,13 +89,13 @@ def get_model(cls, name, scope):
# only needed by plugin_model which is Ruby only

# Store the primary key and keyword arguments
def __init__(self, primary_key, **kw_args):
self.primary_key = primary_key
self.name = kw_args.get("name")
self.updated_at = kw_args.get("updated_at")
self.plugin = kw_args.get("plugin")
self.scope = kw_args.get("scope")
self.destroyed = False
def __init__(self, primary_key: str, **kw_args):
self.primary_key: str = primary_key
self.name: Optional[str] = kw_args.get("name")
self.updated_at: Optional[float] = kw_args.get("updated_at")
self.plugin: Optional[str] = kw_args.get("plugin")
self.scope: Optional[str] = kw_args.get("scope")
self.destroyed: bool = False

# Update the Redis hash at primary_key and set the field "name"
# to the JSON generated via calling as_json
Expand Down Expand Up @@ -129,8 +125,8 @@ def update(self, force=False, queued=True):

# Deploy the model into the OpenC3 system. Subclasses must implement this
# and typically create MicroserviceModels to implement.
def deploy(self, gem_path, variables):
raise RuntimeError("must be implemented by subclass")
def deploy(self, gem_path: str, variables: str):
raise NotImplementedError("must be implemented by subclass")

# Undo the actions of deploy and remove the model from OpenC3.
# Subclasses must implement this as by default it is a noop.
Expand Down
31 changes: 19 additions & 12 deletions openc3/python/test/models/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,22 @@
# if purchased from OpenC3, Inc.

import time
import json
from typing import Optional
import unittest
from unittest.mock import *

from test.test_helper import *

from openc3.models.model import Model


class MyModel(Model):
def __init__(self, name, scope, plugin=None, updated_at=None):
def __init__(
self,
name: str,
scope: Optional[str] = None,
plugin: Optional[str] = None,
updated_at: Optional[float] = None,
):
super().__init__(
f"{scope}__TEST",
name=name,
Expand All @@ -33,15 +40,15 @@ def __init__(self, name, scope, plugin=None, updated_at=None):
)

@classmethod
def get(cls, name, scope=None):
def get(cls, name: str, scope: Optional[str] = None):
return super().get(f"{scope}__TEST", name=name)

@classmethod
def names(cls, scope=None):
def names(cls, scope: Optional[str] = None):
return super().names(f"{scope}__TEST")

@classmethod
def all(cls, scope=None):
def all(cls, scope: Optional[str] = None):
return super().all(f"{scope}__TEST")


Expand Down Expand Up @@ -75,7 +82,7 @@ def test_complains_if_it_already_exists(self):
with self.assertRaisesRegex(RuntimeError, "model already exists"):
model.create()

def test_complains_if_updating_non_existant(self):
def test_complains_if_updating_non_existent(self):
model = Model("primary_key", name="model")
with self.assertRaisesRegex(RuntimeError, "model doesn't exist"):
model.create(update=True)
Expand Down Expand Up @@ -104,8 +111,8 @@ def test_update_updates_existing(self):

def test_deploy_must_be_implemented_by_subclass(self):
model = Model("primary_key", name="model")
with self.assertRaisesRegex(RuntimeError, "must be implemented by subclass"):
model.deploy(None, None)
with self.assertRaisesRegex(NotImplementedError, "must be implemented by subclass"):
model.deploy("", "")

def test_removes_the_model(self):
model = Model("primary_key", name="model")
Expand All @@ -130,10 +137,10 @@ def test_round_trips_the_model_with_json(self):
now = time.time()
model = MyModel(name="TEST1", scope="DEFAULT", plugin="ONE", updated_at=now)
model.create()
hash = model.as_json()
json_data = json.dumps(hash)
hash_ = model.as_json()
json_data = json.dumps(hash_)
model2 = MyModel.from_json(json_data, scope="DEFAULT")
self.assertEqual(hash, (model2.as_json()))
self.assertEqual(hash_, (model2.as_json()))

def test_returns_none_if_the_name_cant_be_found(self):
self.assertIsNone(MyModel.get(name="BLAH", scope="DEFAULT"))
Expand Down
Loading