-
Notifications
You must be signed in to change notification settings - Fork 0
/
Database.py
67 lines (56 loc) · 2.57 KB
/
Database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import sqlite3
from queue import Queue
from threading import Thread
class Database:
instance = None
MEASUREMENTS_TABLE = 'measurements'
MEASUREMENTS_SORT_FIELD = 'ts'
def __init__(self, file_path):
if Database.instance is not None:
raise BaseException('Database already exists!')
self.file_path = file_path
self._queue = Queue()
self._outqueue = Queue(maxsize=1)
Thread(target=self.thread_func, daemon=True).start()
Database.instance = self
def thread_func(self):
self.db = sqlite3.connect(self.file_path)
c = self.db.cursor()
c.execute('SELECT count(*) FROM sqlite_master WHERE type = \'table\' AND tbl_name = :name',
{'name': Database.MEASUREMENTS_TABLE})
count = c.fetchone()[0]
if count == 0:
c.execute('CREATE TABLE %s (%s real, type integer, value real)' % (Database.MEASUREMENTS_TABLE, Database.MEASUREMENTS_SORT_FIELD))
while True:
item = self._queue.get()
c = self.db.cursor()
if item['operation'] == 'insert':
if item['table'] == Database.MEASUREMENTS_TABLE:
c.execute('INSERT INTO %s VALUES (?, ?, ?)' % Database.MEASUREMENTS_TABLE, item['data'])
self.db.commit()
else:
raise BaseException('What table? %s' % item['table'])
elif item['operation'] == 'fetch':
if item['table'] == Database.MEASUREMENTS_TABLE:
result = []
for row in c.execute('SELECT * FROM (SELECT * FROM %s WHERE type = ? ORDER BY %s DESC LIMIT %d) ORDER BY %s ASC' %
(Database.MEASUREMENTS_TABLE, Database.MEASUREMENTS_SORT_FIELD, item['count'],
Database.MEASUREMENTS_SORT_FIELD), (item['type_id'],)):
result.append({'time': row[0], 'measurement': row[2]})
self._outqueue.put(result)
else:
raise BaseException('What table? %s' % item['table'])
def insert_measurement(self, timestamp, type_id, value):
self._queue.put({
'operation': 'insert',
'table': Database.MEASUREMENTS_TABLE,
'data': (timestamp, type_id, value)
})
def fetch_latest_measurements(self, type_id, count):
self._queue.put({
'operation': 'fetch',
'table': Database.MEASUREMENTS_TABLE,
'type_id': type_id,
'count': count
})
return self._outqueue.get()