Skip to content

Commit

Permalink
Update SQLite to allow threaded access for MQ compat.
Browse files Browse the repository at this point in the history
Update `TokenConfig` to match/extend existing config used in neon-hana
Update MQ error handling to return HTTP codes for HANA integration
  • Loading branch information
NeonDaniel committed Oct 30, 2024
1 parent 59a7ca1 commit 64f08af
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 35 deletions.
2 changes: 1 addition & 1 deletion neon_users_service/databases/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class SQLiteUserDatabase(UserDatabase):
def __init__(self, db_path: Optional[str] = None):
db_path = expanduser(db_path or "~/.local/share/neon/user-db.sqlite")
makedirs(dirname(db_path), exist_ok=True)
self.connection = connect(db_path)
self.connection = connect(db_path, check_same_thread=False)
self._db_lock = Lock()
self.connection.execute(
'''CREATE TABLE IF NOT EXISTS users
Expand Down
11 changes: 8 additions & 3 deletions neon_users_service/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,16 @@ class PermissionsConfig(BaseModel):


class TokenConfig(BaseModel):
description: str
username: str
client_id: str
expiration_timestamp: int
permissions: dict
refresh_token: str
last_used_timestamp: int
expiration: int
refresh_expiration: int
token_name: str
creation_timestamp: int
last_refresh_timestamp: int
access_token: Optional[str] = None


class User(BaseModel):
Expand Down
66 changes: 35 additions & 31 deletions neon_users_service/mq_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from ovos_config.config import Configuration
from neon_mq_connector.connector import MQConnector
from neon_mq_connector.utils.network_utils import b64_to_dict, dict_to_b64
from neon_users_service.exceptions import UserNotFoundError, AuthenticationError, UserNotMatchedError, UserExistsError
from neon_users_service.models import MQRequest, User

from neon_users_service.service import NeonUsersService
Expand All @@ -27,45 +28,46 @@ def parse_mq_request(self, mq_req: dict) -> dict:
"error": f"Supplied username ({mq_req.username}) "
f"Does not match user object "
f"({mq_req.user.username})"}

if mq_req.operaion == "create":
if not mq_req.password:
return {"success": False,
"error": "Empty password provided"}
if not mq_req.user:
mq_req.user = User(username=mq_req.username,
password_hash=mq_req.password)
mq_req.user.password_hash = mq_req.password
try:
try:
if mq_req.operation == "create":
if not mq_req.password:
return {"success": False,
"error": "Empty password provided"}
if not mq_req.user:
mq_req.user = User(username=mq_req.username,
password_hash=mq_req.password)
mq_req.user.password_hash = mq_req.password
user = self.service.create_user(mq_req.user)
except Exception as e:
return {"success": False, "error": repr(e)}
elif mq_req.operation == "read":
try:
elif mq_req.operation == "read":
if mq_req.password:
user = self.service.read_authenticated_user(mq_req.username,
mq_req.password)
else:
user = self.service.read_unauthenticated_user(
mq_req.username)
except Exception as e:
return {"success": False, "error": repr(e)}
elif mq_req.operation == "update":
try:
elif mq_req.operation == "update":
if mq_req.password:
mq_req.user.password_hash = mq_req.password
user = self.service.update_user(mq_req.user)
except Exception as e:
return {"success": False, "error": repr(e)}
elif mq_req.operation == "delete":
try:
elif mq_req.operation == "delete":
user = self.service.delete_user(mq_req.user)
except Exception as e:
return {"success": False, "error": repr(e)}
else:
raise RuntimeError(f"Invalid operation requested: "
f"{mq_req.operation}")
return {"success": True, "user": user.model_dump()}
else:
raise RuntimeError(f"Invalid operation requested: "
f"{mq_req.operation}")
return {"success": True, "user": user.model_dump()}
except UserExistsError:
return {"success": False, "error": "User already exists",
"code": 409}
except UserNotFoundError:
return {"success": False, "error": "User does not exist",
"code": 404}
except UserNotMatchedError:
return {"success": False, "error": "Invalid user", "code": 401}
except AuthenticationError:
return {"success": False, "error": "Invalid username or password",
"code": 401}
except Exception as e:
return {"success": False, "error": repr(e), "code": 500}

def handle_request(self,
channel: pika.channel.Channel,
Expand All @@ -87,18 +89,20 @@ def handle_request(self,
request = b64_to_dict(body)
message_id = request.get("message_id")
response = self.parse_mq_request(request)
response["message_id"] = message_id
data = dict_to_b64(response)

routing_key = request.get('routing_key', 'neon_users_output')
# queue declare is idempotent, just making sure queue exists
channel.queue_declare(queue='neon_users_output')
channel.queue_declare(queue=routing_key)

channel.basic_publish(
exchange='',
routing_key=request.get('routing_key',
'neon_users_output'),
routing_key=routing_key,
body=data,
properties=pika.BasicProperties(expiration='1000')
)
LOG.info(f"Sent response to queue {routing_key}: {response}")
channel.basic_ack(method.delivery_tag)
except Exception as e:
LOG.exception(f"message_id={message_id}: {e}")
Expand Down

0 comments on commit 64f08af

Please sign in to comment.