Skip to content

Commit

Permalink
Add MongoDb database class with unit tests
Browse files Browse the repository at this point in the history
Implement unit test automation with config from GHA secrets
  • Loading branch information
NeonDaniel committed Nov 5, 2024
1 parent 0420d2d commit a30cbd3
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ jobs:
- name: Unit Tests
run: |
pytest tests --doctest-modules --junitxml=tests/unit-test-results.xml
env:
MONGO_TEST_CONFIG: ${{secrets.MONGO_TEST_CONFIG}}
- name: Upload test results
uses: actions/upload-artifact@v4
with:
Expand Down
45 changes: 45 additions & 0 deletions neon_users_service/databases/mongodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from pymongo import MongoClient
from neon_users_service.databases import UserDatabase
from neon_data_models.models.user.database import User
from neon_users_service.exceptions import UserNotFoundError


class MongoDbUserDatabase(UserDatabase):
def __init__(self, db_host: str, db_port: int, db_user: str, db_pass: str,
db_name: str = "neon-users", collection_name: str = "users"):
connection_string = f"mongodb://{db_user}:{db_pass}@{db_host}:{db_port}"
self.client = MongoClient(connection_string)
db = self.client[db_name]
self.collection = db[collection_name]

def _db_create_user(self, user: User) -> User:
self.collection.insert_one({**user.model_dump(),
"_id": user.user_id})
return self.read_user_by_id(user.user_id)

def read_user_by_id(self, user_id: str) -> User:
result = self.collection.find_one({"user_id": user_id})
if not result:
raise UserNotFoundError(user_id)
return User(**result)

def read_user_by_username(self, username: str) -> User:
result = self.collection.find_one({"username": username})
if not result:
raise UserNotFoundError(username)
return User(**result)

def _db_update_user(self, user: User) -> User:
update = user.model_dump()
update.pop("user_id")
update.pop("created_timestamp")
self.collection.update_one({"user_id": user.user_id},
{"$set": update})
return self.read_user_by_id(user.user_id)

def _db_delete_user(self, user: User) -> User:
self.collection.delete_one({"user_id": user.user_id})
return user

def shutdown(self):
self.client.close()
74 changes: 73 additions & 1 deletion tests/test_databases_sqlite.py → tests/test_databases.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from os import remove
import json

from os import remove, environ
from os.path import join, dirname, isfile
from time import time
from typing import Optional
from unittest import TestCase
from uuid import uuid4

from neon_users_service.databases.sqlite import SQLiteUserDatabase
from neon_users_service.exceptions import UserExistsError, UserNotFoundError
Expand Down Expand Up @@ -110,3 +113,72 @@ def test_delete_user(self):
self.database.read_user_by_id(user.user_id)
with self.assertRaises(UserNotFoundError):
self.database.read_user_by_username(user.username)


class TestMongoDb(TestCase):
test_config = json.loads(environ.get("MONGO_TEST_CONFIG"))
from neon_users_service.databases.mongodb import MongoDbUserDatabase
database = MongoDbUserDatabase(**test_config)
test_user = User(username="test_user", password_hash="password")

def tearDown(self):
try:
self.database.delete_user(self.test_user.user_id)
except UserNotFoundError:
pass

def test_create_user(self):
# Create User
user = self.database.create_user(self.test_user)
self.assertEqual(user, self.test_user)

# Existing user fails
with self.assertRaises(UserExistsError):
self.database.create_user(self.test_user)

def test_read_user(self):
user = self.database.create_user(self.test_user)

by_id = self.database.read_user_by_id(user.user_id)
by_name = self.database.read_user_by_username(user.username)
self.assertEqual(by_id, user)
self.assertEqual(by_name, user)

# Invalid inputs
with self.assertRaises(UserNotFoundError):
self.database.read_user_by_id(user.username)
with self.assertRaises(UserNotFoundError):
self.database.read_user_by_username(user.user_id)

def test_update_user(self):
user = self.database.create_user(self.test_user)

user.password_hash = "new_password"
user.permissions.node = AccessRoles.OWNER

# Invalid change request
fake_time = round(time()) + 10
user.created_timestamp = fake_time

updated = self.database.update_user(user)
self.assertNotEqual(user.created_timestamp,
updated.created_timestamp, f"fake={fake_time}")
self.assertEqual(self.test_user.created_timestamp,
updated.created_timestamp)

self.assertEqual(updated.password_hash, user.password_hash)
self.assertEqual(updated.permissions.node, AccessRoles.OWNER)

# Invalid update request (user not exists
user.user_id = str(uuid4())
with self.assertRaises(UserNotFoundError):
self.database.update_user(user)

def test_delete_user(self):
user = self.database.create_user(self.test_user)

# Delete valid user
self.assertEqual(user, self.database.delete_user(user.user_id))

with self.assertRaises(UserNotFoundError):
self.database.delete_user(user.user_id)

0 comments on commit a30cbd3

Please sign in to comment.