-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_base.py
172 lines (127 loc) · 4.42 KB
/
data_base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Simple RAM database implementation with nested transactions."""
from __future__ import print_function
from builtins import input, dict
HELP = """
Enter method name, key or value(or both if required) separated by whitespace
to interact with the storage.
Methods:
SET - Store key and it's value. (Example: SET A 10)
GET - Get value by key. (Example: GET A)
UNSET - Remove given key. (Example: UNSET A)
COUNTS - Count occurance of value in storage. (Example: COUNTS 10)
FIND - Find all keys for value. (Example: FIND 10)
END - Exit database.
Transactions:
BEGIN - Start new transaction.
ROLLBACK - Rollback curret transaction.
COMMIT - Commit all changes for all transactions.
"""
class DataBase(object):
"""Dictionary storage with DML commands."""
def __init__(self):
"""Initialiaze storage."""
self._storage = dict()
self._rolling_back = False
self._transaction_number = 0
self._rollback_cache = dict()
def SET(self, key, value):
"""Set new key-value pair.
:param str key:
:param str value:
"""
if self._transaction_number and not self._rolling_back:
if key not in self._storage:
self._rollback_cache[self._transaction_number].append(
('UNSET', key)
)
else:
self._rollback_cache[self._transaction_number].append(
('SET', key, self._storage[key])
)
self._storage[key] = value
def GET(self, key):
"""Extract value by given key.
If not found prints NULL.
:param str key:
"""
print(self._storage.get(key, 'NULL'))
def UNSET(self, key):
"""Remove given key.
If not found does nothing.
:param str key:
"""
if key in self._storage:
if self._transaction_number and not self._rolling_back:
self._rollback_cache[self._transaction_number].append(
('SET', key, self._storage[key])
)
del self._storage[key]
else:
pass
def COUNTS(self, value):
"""Count occurance of value in storage.
:param str value:
"""
print(len([occurance for occurance in self._storage.values()
if value == occurance]))
def FIND(self, value):
"""Find all keys for value.
:param str value:
"""
print(' '.join([key for key, occurance in self._storage.items()
if value == occurance]))
def BEGIN(self):
"""Start transaction."""
self._transaction_number += 1
self._rollback_cache[self._transaction_number] = []
def ROLLBACK(self):
"""Rollback current transaction."""
if not self._transaction_number:
pass
self._rolling_back = True
for cache in reversed(self._rollback_cache[self._transaction_number]):
method = cache[0]
args = cache[1:]
getattr(self, method)(*args)
del self._rollback_cache[self._transaction_number]
if self._transaction_number != 0:
self._transaction_number -= 1
self._rolling_back = False
def COMMIT(self):
"""Clear rollback_cache and set transaction number to 0."""
self._rollback_cache = dict()
self._transaction_number = 0
def call_method(instance, method_and_args):
"""Call method with given args.
:param instance: class instance.
:param list[str] method_and_args:
:rtype: bool
"""
try:
method = method_and_args[0]
args = method_and_args[1:]
getattr(instance, method)(*args)
except AttributeError:
print(' -> Invalid method name. To get methods names type HELP.')
return False
except TypeError:
print(' -> Invalid number of argumets for', method)
return False
return True
def main():
"""Start database."""
print('Welcome to simple database! Type HELP for help.')
database = DataBase()
while True:
user_input = input(' -> ').split(' ')
if user_input[0] == 'HELP':
print(HELP)
continue
elif user_input[0] == 'END':
print('Exiting database...')
break
call_method(database, user_input)
if __name__ == '__main__':
main()