Fix machine_type unique constraint to support a machine_type for multiple sites #220

Merged · 5 commits · Dec 2, 2021
2 changes: 1 addition & 1 deletion CONTRIBUTORS
@@ -4,9 +4,9 @@ Manuel Giffels <giffels@gmail.com>
Stefan Kroboth <stefan.kroboth@gmail.com>
Eileen Kuehn <eileen.kuehn@kit.edu>
matthias.schnepf <matthias.schnepf@kit.edu>
Max Fischer <maxfischer2781@gmail.com>
ubdsv <ubdsv@student.kit.edu>
Rene Caspart <rene.caspart@cern.ch>
Max Fischer <maxfischer2781@gmail.com>
Leon Schuhmacher <ji7635@partner.kit.edu>
R. Florian von Cube <florian.voncube@gmail.com>
mschnepf <matthias.schnepf@kit.edu>
17 changes: 15 additions & 2 deletions docs/source/changelog.rst
@@ -1,11 +1,24 @@
.. Created by changelog.py at 2021-10-06, command
'/Users/giffler/.cache/pre-commit/repor6pnmwlm/py_env-python3.9/bin/changelog docs/source/changes compile --output=docs/source/changelog.rst'
.. Created by changelog.py at 2021-11-30, command
'/Users/giffler/.cache/pre-commit/repor6pnmwlm/py_env-default/bin/changelog docs/source/changes compile --output=docs/source/changelog.rst'
based on the format of 'https://keepachangelog.com/'

#########
CHANGELOG
#########

[Unreleased] - 2021-11-30
=========================

Changed
-------

* SSHExecutor respects the remote MaxSessions via queueing

Fixed
-----

* Unique constraints in the database schema have been fixed to allow the same machine_type and remote_resource_uuid on multiple sites

[0.6.0] - 2021-08-09
====================

9 changes: 9 additions & 0 deletions docs/source/changes/220.fix_unique_constraints_db_schema.yaml
@@ -0,0 +1,9 @@
category: fixed
summary: "Unique constraints in the database schema have been fixed to allow the same machine_type and remote_resource_uuid on multiple sites"
description: |
The unique constraints in the database schema have been relaxed to allow the same machine_type and the same
remote_resource_uuid to be used on multiple sites. In addition, the unit tests of the SqliteRegistry have been improved.
pull_requests:
- 220
issues:
- 219
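
The effect of the relaxed constraint can be illustrated with a short standalone sqlite3 session. This is a minimal sketch, not code from the test suite: the Sites definition is simplified, and the MachineTypes columns follow the schema further down in tardis/plugins/sqliteregistry.py. The analogous composite constraint on Resources permits the same remote_resource_uuid on different sites in the same way.

import sqlite3

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE Sites ("
    "site_id INTEGER PRIMARY KEY AUTOINCREMENT, site_name VARCHAR(255) UNIQUE)"
)
con.execute(
    "CREATE TABLE MachineTypes ("
    "machine_type_id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "machine_type VARCHAR(255), "
    "site_id INTEGER, "
    "FOREIGN KEY(site_id) REFERENCES Sites(site_id), "
    "CONSTRAINT unique_machine_type_per_site UNIQUE (machine_type, site_id))"
)
con.executemany("INSERT INTO Sites(site_name) VALUES (?)", [("SiteA",), ("SiteB",)])

# The same machine_type can now be registered once per site ...
con.execute("INSERT INTO MachineTypes(machine_type, site_id) VALUES ('m1.large', 1)")
con.execute("INSERT INTO MachineTypes(machine_type, site_id) VALUES ('m1.large', 2)")

# ... but registering it twice for one site still violates the composite constraint.
try:
    con.execute("INSERT INTO MachineTypes(machine_type, site_id) VALUES ('m1.large', 1)")
except sqlite3.IntegrityError as err:
    print(err)  # UNIQUE constraint failed: MachineTypes.machine_type, MachineTypes.site_id
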
72 changes: 50 additions & 22 deletions tardis/plugins/sqliteregistry.py
@@ -5,6 +5,7 @@

from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import List, Dict, Generator
import asyncio
import logging
import sqlite3
@@ -23,35 +24,60 @@ def __init__(self):
configuration = Configuration()
self._db_file = configuration.Plugins.SqliteRegistry.db_file
self._deploy_db_schema()
self._dispatch_on_state = dict(
BootingState=self.insert_resource, DownState=self.delete_resource
)
self._dispatch_on_state = {
"BootingState": self.insert_resource,
"DownState": self.delete_resource,
}

self.thread_pool_executor = ThreadPoolExecutor(max_workers=1)

for site in configuration.Sites:
self.add_site(site.name)
for machine_type in getattr(configuration, site.name).MachineTypes:
self.add_machine_types(site.name, machine_type)

def add_machine_types(self, site_name: str, machine_type: str):
def add_machine_types(self, site_name: str, machine_type: str) -> None:
if self._get_machine_type(site_name, machine_type):
logger.debug(
f"{machine_type} is already present for {site_name} in database! Skipping insertion!" # noqa B950
)
return
sql_query = """
INSERT OR IGNORE INTO MachineTypes(machine_type, site_id)
INSERT OR ROLLBACK INTO MachineTypes(machine_type, site_id)
SELECT :machine_type, Sites.site_id FROM Sites
WHERE Sites.site_name = :site_name"""
self.execute(sql_query, dict(site_name=site_name, machine_type=machine_type))
self.execute(sql_query, {"site_name": site_name, "machine_type": machine_type})

def _get_machine_type(self, site_name: str, machine_type: str) -> List[Dict]:
sql_query = """
SELECT * FROM MachineTypes MT
JOIN Sites S ON MT.site_id = S.site_id
WHERE MT.machine_type = :machine_type AND S.site_name = :site_name"""
return self.execute(
sql_query, {"site_name": site_name, "machine_type": machine_type}
)
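
Two details are worth noting here: the explicit _get_machine_type lookup makes repeated start-ups idempotent, and the switch from INSERT OR IGNORE to INSERT OR ROLLBACK means that an unexpected constraint violation now surfaces as an error instead of being swallowed silently. A hedged sketch of the difference between the two conflict clauses (the Demo table is illustrative, not part of the plugin):

import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE Demo (name VARCHAR(255) UNIQUE)")
con.execute("INSERT INTO Demo(name) VALUES ('duplicate')")

# OR IGNORE: the conflicting row is silently skipped, no error is raised.
con.execute("INSERT OR IGNORE INTO Demo(name) VALUES ('duplicate')")

# OR ROLLBACK: the conflict aborts the statement, rolls back the open
# transaction and is raised as sqlite3.IntegrityError.
try:
    con.execute("INSERT OR ROLLBACK INTO Demo(name) VALUES ('duplicate')")
except sqlite3.IntegrityError:
    print("duplicate rejected")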

def add_site(self, site_name: str) -> None:
if self._get_site(site_name):
logger.debug(
f"{site_name} already present in database! Skipping insertion!"
)
return
sql_query = "INSERT OR ROLLBACK INTO Sites(site_name) VALUES (:site_name)"
self.execute(sql_query, {"site_name": site_name})

def add_site(self, site_name: str):
sql_query = "INSERT OR IGNORE INTO Sites(site_name) VALUES (:site_name)"
self.execute(sql_query, dict(site_name=site_name))
def _get_site(self, site_name: str) -> List[Dict]:
sql_query = "SELECT * FROM Sites WHERE site_name = :site_name"
return self.execute(sql_query, {"site_name": site_name})

async def async_execute(self, sql_query: str, bind_parameters: dict):
async def async_execute(self, sql_query: str, bind_parameters: Dict) -> List[Dict]:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.thread_pool_executor, self.execute, sql_query, bind_parameters
)
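
async_execute keeps the event loop responsive by pushing the blocking sqlite3 call onto the single-worker ThreadPoolExecutor created in __init__, which also serializes all database access. A minimal sketch of the same pattern in isolation (names are illustrative, not plugin code):

import asyncio
import sqlite3
from concurrent.futures import ThreadPoolExecutor

# One worker thread serializes all database access, mirroring the
# max_workers=1 executor set up in __init__ above.
pool = ThreadPoolExecutor(max_workers=1)

def blocking_query(db_file: str) -> list:
    con = sqlite3.connect(db_file)
    try:
        return con.execute("SELECT 1").fetchall()
    finally:
        con.close()

async def main() -> None:
    loop = asyncio.get_event_loop()
    # The blocking call runs on the pool thread; the event loop keeps running.
    rows = await loop.run_in_executor(pool, blocking_query, ":memory:")
    print(rows)  # [(1,)]

asyncio.run(main())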

@contextmanager
def connect(self):
def connect(self) -> Generator[sqlite3.Connection, None, None]:
con = sqlite3.connect(
self._db_file, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
)
@@ -61,17 +87,18 @@ def connect(self):
finally:
con.close()
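
connect() is a generator-based context manager: the collapsed body is assumed to yield the open connection, and the finally clause guarantees it is closed even if the enclosed query raises. A stand-alone sketch of the same pattern:

import sqlite3
from contextlib import contextmanager

@contextmanager
def connect(db_file: str):
    # Yield the live connection; always close it, even on error.
    con = sqlite3.connect(db_file)
    try:
        yield con
    finally:
        con.close()

with connect(":memory:") as con:
    print(con.execute("SELECT 42").fetchone())  # (42,)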

def _deploy_db_schema(self):
def _deploy_db_schema(self) -> None:
tables = {
"MachineTypes": [
"machine_type_id INTEGER PRIMARY KEY AUTOINCREMENT",
"machine_type VARCHAR(255) UNIQUE",
"machine_type VARCHAR(255)",
"site_id INTEGER",
"FOREIGN KEY(site_id) REFERENCES Sites(site_id)",
"CONSTRAINT unique_machine_type_per_site UNIQUE (machine_type, site_id)", # noqa B950
],
"Resources": [
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
"remote_resource_uuid VARCHAR(255) UNIQUE",
"remote_resource_uuid VARCHAR(255)",
"drone_uuid VARCHAR(255) UNIQUE",
"state_id INTEGER",
"site_id INTEGER",
@@ -81,6 +108,7 @@ def _deploy_db_schema(self):
"FOREIGN KEY(state_id) REFERENCES ResourceState(state_id)",
"FOREIGN KEY(site_id) REFERENCES Sites(site_id)",
"FOREIGN KEY(machine_type_id) REFERENCES MachineTypes(machine_type_id)",
"CONSTRAINT unique_remote_resource_uuid_per_site UNIQUE (site_id, remote_resource_uuid)", # noqa B950
],
"ResourceStates": [
"state_id INTEGER PRIMARY KEY AUTOINCREMENT",
@@ -108,13 +136,13 @@ def _deploy_db_schema(self):
(state,),
)

async def delete_resource(self, bind_parameters: dict):
async def delete_resource(self, bind_parameters: Dict) -> None:
sql_query = """DELETE FROM Resources
WHERE drone_uuid = :drone_uuid
AND site_id = (SELECT site_id from Sites WHERE site_name = :site_name)"""
await self.async_execute(sql_query, bind_parameters)

def execute(self, sql_query: str, bind_parameters: dict):
def execute(self, sql_query: str, bind_parameters: Dict) -> List[Dict]:
with self.connect() as connection:
connection.row_factory = lambda cur, row: {
col[0]: row[idx] for idx, col in enumerate(cur.description)
@@ -124,7 +152,7 @@ def execute(self, sql_query: str, bind_parameters: dict):
logger.debug(f"{sql_query},{bind_parameters} executed")
return cursor.fetchall()
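
execute() installs a row_factory that maps column names to values, so every query in this plugin returns a List[Dict] rather than tuples; the truthiness checks in _get_site and _get_machine_type rely on exactly that. A small stand-alone sketch of the same row_factory:

import sqlite3

con = sqlite3.connect(":memory:")
# Same row_factory as in execute() above: one dict per row, keyed by column name.
con.row_factory = lambda cur, row: {
    col[0]: row[idx] for idx, col in enumerate(cur.description)
}
con.execute("CREATE TABLE Sites (site_id INTEGER PRIMARY KEY, site_name VARCHAR(255))")
con.execute("INSERT INTO Sites VALUES (1, 'SiteA')")
print(con.execute("SELECT * FROM Sites").fetchall())
# [{'site_id': 1, 'site_name': 'SiteA'}]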

def get_resources(self, site_name: str, machine_type: str):
def get_resources(self, site_name: str, machine_type: str) -> List[Dict]:
sql_query = """
SELECT R.remote_resource_uuid, R.drone_uuid, RS.state, R.created, R.updated
FROM Resources R
@@ -133,12 +161,12 @@ def get_resources(self, site_name: str, machine_type: str):
JOIN MachineTypes MT ON R.machine_type_id = MT.machine_type_id
WHERE S.site_name = :site_name AND MT.machine_type = :machine_type"""
return self.execute(
sql_query, dict(site_name=site_name, machine_type=machine_type)
sql_query, {"site_name": site_name, "machine_type": machine_type}
)

async def insert_resource(self, bind_parameters: dict):
async def insert_resource(self, bind_parameters: Dict) -> None:
sql_query = """
INSERT OR IGNORE INTO
INSERT OR ROLLBACK INTO
Resources(remote_resource_uuid, drone_uuid, state_id, site_id, machine_type_id,
created, updated)
SELECT :remote_resource_uuid, :drone_uuid, RS.state_id, S.site_id,
@@ -153,11 +181,11 @@ async def insert_resource(self, bind_parameters: dict):
async def notify(self, state: State, resource_attributes: AttributeDict) -> None:
state = str(state)
logger.debug(f"Drone: {str(resource_attributes)} has changed state to {state}")
bind_parameters = dict(state=state)
bind_parameters = {"state": state}
bind_parameters.update(resource_attributes)
await self._dispatch_on_state.get(state, self.update_resource)(bind_parameters)
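
notify() is the single entry point invoked on every drone state change: the state name is looked up in _dispatch_on_state, and any state other than BootingState or DownState falls through to update_resource. A minimal stand-alone sketch of that dispatch (the handlers and attribute values are stand-ins, not the plugin methods):

import asyncio

async def insert_resource(params: dict) -> None:
    print("insert", params)

async def update_resource(params: dict) -> None:
    print("update", params)

async def delete_resource(params: dict) -> None:
    print("delete", params)

dispatch_on_state = {
    "BootingState": insert_resource,
    "DownState": delete_resource,
}

async def notify(state: str, attributes: dict) -> None:
    # Exactly one handler runs; unknown states fall back to update_resource.
    await dispatch_on_state.get(state, update_resource)({"state": state, **attributes})

asyncio.run(notify("BootingState", {"drone_uuid": "site-abc123"}))  # insert
asyncio.run(notify("CleanupState", {"drone_uuid": "site-abc123"}))  # update (fallback)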

async def update_resource(self, bind_parameters: dict):
async def update_resource(self, bind_parameters: Dict) -> None:
sql_query = """UPDATE Resources SET updated = :updated,
state_id = (SELECT state_id FROM ResourceStates WHERE state = :state)
WHERE drone_uuid = :drone_uuid