-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathopenlcb_nodes_db.py
128 lines (113 loc) · 4.68 KB
/
openlcb_nodes_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import json,openlcb_cpnodes,openlcb_RR_duino_nodes,openlcb_susic
from openlcb_debug import *
import time
class Nodes_db_node:
    """
    Base class handling the loading/saving of all or part of the nodes config.

    This includes at least all the event ids configured in the CDI.
    This may also include other parameters depending on the type of nodes.
    Each subclass is responsible for the file format (JSON): it must provide
    load_node(desc), which turns one decoded description into a node object
    (or None when the description must be skipped).
    """
    def __init__(self,file_name):
        """Create an empty DB bound to file_name; nothing is read from disk here."""
        self.file_name = file_name
        self.db={}             #dict holding all nodes description, key is full ID
        self.synced = False    #set to True after a successful load or store
        self.last_sync = 0     #time.time() of the last load/store, 0 if none yet
        self.sync_period = 10  #10 s minimum between syncs to file

    def load_node(self,desc):
        """
        Build one node from its decoded JSON description.

        Virtual function to be overloaded by subclasses; it should return the
        node, or None if the description cannot be used.
        """
        raise NotImplementedError("load_node must be implemented by subclasses")

    def store_all_nodes(self):
        """
        Write all the nodes of the DB to the file, as a JSON list.

        Each node is serialized through its to_json() method. The whole JSON
        text is built BEFORE the file is opened for writing: opening with 'w'
        truncates the file, so serializing first guarantees that a failing
        node leaves the previous file content untouched.
        Serialization and I/O errors propagate to the caller; the DB is only
        flagged as synced after a successful write.
        """
        debug("storing nodes DB")
        db_list = [node.to_json() for node in self.db.values()]
        payload = json.dumps(db_list)
        #fixme: error handling
        with open(self.file_name,'w') as file:
            file.write(payload)
        self.synced = True
        self.last_sync = time.time()

    def load_all_nodes(self):
        """
        Read the file and add every node it describes to the DB.

        Descriptions for which load_node() returns None are skipped, and a
        node whose full ID is already in the DB is reported and ignored.
        Loading is best effort: any error (missing file, malformed JSON,
        failure while building a node) is logged and not re-raised; nodes
        added before the error stay in the DB and the DB is not flagged as
        synced.
        """
        try:
            with open(self.file_name,'r') as file:
                #fixme: error handling
                db_list = json.load(file)
            debug("db loading:",str(db_list))
            for desc in db_list:
                n = self.load_node(desc)
                if n is None:
                    continue
                if n.ID in self.db:
                    debug("2 nodes with same full ID in the DB!")
                else:
                    debug("adding node ",str(n))
                    self.db[n.ID]=n
            self.synced = True
            self.last_sync = time.time()
        except Exception as err:
            # Exception (not a bare except) so that KeyboardInterrupt and
            # SystemExit are not swallowed; the cause is appended to the log.
            debug("Error loading nodes DB, missing or malformed file:",
                  "%s (%s)" % (self.file_name,err))

    def sync(self):
        """
        Store the DB to the file if it is not synced and more than
        sync_period seconds have elapsed since the last load/store.
        """
        if not self.synced and self.last_sync+self.sync_period<time.time():
            self.store_all_nodes()
"""
Base class for all CMRI nodes types:CPNodes and SUSICs for now
Format for a CP node: a (json) list each element of which is a node description
"fullID" (integer)
"type" (character) "C"
"cmri_node_add" (integer)
"version" (integer)
"name" (string)
"description" (string)
"IO_config" (integer)
"IOX_config" (list of pairs of integers (-1,0 or 1), only the needed number <=8)
"basic_events" 16 pairs of events (8 bytes, hex noted, separated by '.')
"IOX_events" pairs of events (only the needed number <=128)
Format for a SUSIC node: a (json) list each element of which is a node description
Format for a node description: json format with the following fields
"fullID" (integer)
"type" (character) "X" or "N"
"cmri_node_add" (integer)
"version" (integer)
"name" (string)
"description" (string)
"type" (character)
"cards_sets" (list of integers)
"events" pairs of events (8 bytes, hex noted, separated by '.')
"""
class Nodes_db_CMRI(Nodes_db_node):
    """
    Nodes DB for CMRI nodes: builds CP nodes (type "C") and SUSIC nodes
    (type "N" or "X") from their JSON descriptions.
    """
    def __init__(self,filename):
        """Bind the DB to the file named filename."""
        super().__init__(filename)

    def load_node(self,js):
        """
        Build the node described by js (one decoded description).

        Returns None when one of the mandatory fields "fullID",
        "cmri_node_add" or "type" is missing, or when the type is not one of
        "C", "N", "X"; otherwise returns whatever the from_json() of the
        matching node class returns.
        """
        debug("cmri db node",js)
        mandatory = ("fullID","cmri_node_add","type")
        if not all(field in js for field in mandatory):
            debug("missing fields in the node description",js)
            return None
        debug("all info OK")
        node_type = js["type"]
        if node_type=="C":
            return openlcb_cpnodes.Node_cpnode.from_json(js)
        if node_type in ("N","X"):
            debug("SUSIC!!")
            return openlcb_susic.Node_SUSIC.from_json(js)
        # unknown node type: nothing to build
        return None
"""
Format for RR_duino nodes: a (json) list, each element of which is a node description
"fullID" (integer) : full node ID as an openlcb node
"bus" (string) : name of the bus it is on
"address" (integer) : address on the bus (1-62)
"sensors_ev_dict" (dict) : dictionary, subaddress is the key, value is a list of events (one for on, one for off)
"turnouts_ev_dict" (dict) : dictionary, subaddress is the key, value is a list of events (one to close, one to throw, one when closed, one when thrown)
"version" (integer)
"name" (string)
"description" (string)
"""
class Nodes_db_RR_duino_node(Nodes_db_node):
    """
    Nodes DB for RR_duino nodes, with lookup of a node by bus name and
    address on that bus.
    """
    def __init__(self,filename):
        """Bind the DB to the file named filename."""
        super().__init__(filename)

    def load_node(self,js):
        """
        Build the node description object for js (one decoded description).

        Returns None when one of the mandatory fields "fullID", "address" or
        "bus" is missing; otherwise returns an RR_duino_node_desc built
        from js.
        """
        mandatory = ("fullID","address","bus")
        if any(field not in js for field in mandatory):
            debug("missing fields in the node description",js)
            return None
        debug("load node from db",js)
        return openlcb_RR_duino_nodes.RR_duino_node_desc(js)

    def get_db_node_from_add(self,bus_name,add):
        """
        Return the first node of the DB whose description has "bus" equal to
        bus_name and "address" equal to add, or None if there is none.
        """
        for node in self.db.values():
            desc = node.desc_dict
            if desc["bus"]==bus_name and desc["address"]==add:
                return node
        return None