-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathDeductive_synthesis_modified(SQL).py
238 lines (201 loc) · 7.73 KB
/
Deductive_synthesis_modified(SQL).py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import csv
import numpy as np
import time
from datetime import datetime, date
import psycopg2
#Level number for deductive synthesis
level_num = 0
#Open DB connection
con = psycopg2.connect(
database="Synthesis_mod_AI",
user="postgres",
password="admin",
host="127.0.0.1",
port="5432"
)
#Set up working directory
work_dir = "C:\\1\\"
#Clear DB tables
cur = con.cursor()
cur.execute('''TRUNCATE TABLE public."Facts";''')
cur.execute('''TRUNCATE TABLE public."Req_model";''')
cur.execute('''TRUNCATE TABLE public."Facts_processed";''')
con.commit()
class src_node:
def start(self, model_type, node_type, id, name, parent_id):
self.model_type = model_type
self.node_type = node_type
self.id = id
self.name = name
self.parent_id = parent_id
class src_model:
def start(self, file_name, model1):
model = []
with open(file_name) as f_obj:
csv_dict_reader_models(f_obj, model)
self.model1 = model
class req_model:
def start(self, file_name, model1):
model = []
with open(file_name) as f_obj:
csv_dict_reader_models(f_obj, model)
self.model1 = model
class facts_model:
def start(self, file_name, model1):
model = []
with open(file_name) as f_obj:
csv_dict_reader_facts_and_req_model(f_obj, model)
self.model1 = model
class base_node:
def start(self, base_id, model_type, node_type, id, name, parent_id, level_num):
self.base_id = base_id
self.model_type = model_type
self.node_type = node_type
self.id = id
self.name = name
self.parent_id = parent_id
self.base_parent_id = base_parent_id
self.level_num = level_num
class base_link:
def start(self, id, src_link, dist_link, rule, notes):
self.id = id
self.src_link = src_link
self.dist_link = dist_link
self.rule = rule
self.notes = notes
class base_rules:
def start(self, src_node_type, dist_node_type, rule):
self.src_node_type = src_node_type
self.dist_node_type = dist_node_type
self.rule = rule
class base_requirements:
def start(self, model_type, node_type, id, name):
self.model_type = model_type
self.node_type = node_type
self.id = id
self.name = name
self.base_id = base_id
class base_model:
def start(self):
self.model = []
self.model_req = []
self.links = []
self.links_req = []
self.requirements = []
class goal:
def start(self, goal_id, goal_desc, level_num, node_id, goal_state):
self.goal_id = goal_id
self.goal_desc = goal_desc
self.level_num = level_num
self.node_id = node_id
self.goal_state = goal_state
class fact:
def start(self, fact_state, model_type, node_type, id, name, parent_id):
self.fact_state = fact_state
self.model_type = model_type
self.node_type = node_type
self.id = id
self.name = name
self.parent_id = parent_id
def csv_dict_reader_models(file_obj, model):
"""
Read a CSV file using csv.DictReader
"""
reader = csv.DictReader(file_obj, delimiter=',')
i=0
for line in reader:
model.append(src_node())
model[i].model_type = line["MODEL_TYPE"]
model[i].node_type = line["NODE_TYPE"]
model[i].id = line["ID"]
model[i].name = line["NAME"]
model[i].parent_id = line["PARENT_ID"]
i = i+1
return model
def csv_dict_reader_facts_and_req_model(file_obj_rules, model):
"""
Read a CSV file using csv.DictReader
"""
reader = csv.DictReader(file_obj_rules, delimiter=',')
i=0
for line in reader:
model.append(fact())
model[i].model_type = line["MODEL_TYPE"]
model[i].node_type = line["NODE_TYPE"]
model[i].id = line["FACT_ID"]
model[i].name = line["NAME"]
model[i].parent_id = line["PARENT_ID"]
model[i].level_num = line["LEVEL_NUM"]
i = i+1
t0 = float(time.time() * 1000)
# Single fact importing
sql = '''COPY "Facts" ("MODEL_TYPE", "NODE_TYPE", "FACT_ID", "NAME", "PARENT_ID", "LEVEL_NUM") FROM \'''' + work_dir + 'Test_facts.csv' + '''\' WITH DELIMITER ',' CSV HEADER;'''
cur.execute(sql)
con.commit()
print(datetime.now(), " - Set of facts has been imported")
t1 = float(time.time() * 1000)
print(datetime.now(), " - Facts import time: " + str(t1 - t0) + "ms.")
t02 = float(time.time() * 1000)
sql = '''COPY "Req_model" ("ID", "MODEL_TYPE", "NODE_TYPE", "SRC_ID", "NAME", "PARENT_ID", "LEVEL_NUM") FROM \'''' + work_dir + 'Req_model_1.csv' + '''\' WITH DELIMITER ',' CSV HEADER;'''
cur.execute(sql)
con.commit()
print(datetime.now(), ' - The rules readed')
t12 = float(time.time() * 1000)
print(datetime.now(), " - Rules reading time: " + str(t12 - t02) + "ms.")
#Process the set of facts
t03 = float(time.time() * 1000)
sql = '''INSERT INTO public."Facts_processed" ("FACT_ID", "LEVEL_NUM","isAchieved","CONTROL_DATE")
SELECT d."FACT_ID", s."LEVEL_NUM", 1, current_timestamp FROM public."Req_model" as s JOIN public."Facts" as d
ON s."MODEL_TYPE" = d."MODEL_TYPE"
AND s."NODE_TYPE" = d."NODE_TYPE"
AND s."NAME" = d."NAME"
AND s."SRC_ID" = d."FACT_ID"
WHERE s."LEVEL_NUM" = ''' + str(level_num) + ''';'''
cur.execute(sql)
con.commit()
print(datetime.now(), ' - The current factes have been processed')
t13 = float(time.time() * 1000)
print(datetime.now(), " - Facts processing time: " + str(t13 - t03) + "ms.")
return model
def deductive_synthesis(model, facts, level_num):
num = 0
max_level = 0
for i in range(len(model)):
if max_level < int(model[i].level_num):
max_level = int(model[i].level_num)
print(datetime.now(), ' - The model Max_level= ', max_level)
sql = '''SELECT count(s."isAchieved") FROM public."Facts_processed" as s
WHERE "LEVEL_NUM" = ''' + str(level_num) + ''' AND "isAchieved" = 1;'''
cur.execute(sql)
con.commit()
level_elements_fact_achieved = cur.fetchall()
for row in level_elements_fact_achieved:
level_elements_fact_achieved_int = row[0]
sql = '''SELECT count(s."LEVEL_NUM") FROM public."Req_model" as s
WHERE s."LEVEL_NUM" = ''' + str(level_num) + ''';'''
cur.execute(sql)
level_elements_req = cur.fetchall()
if (level_elements_req == level_elements_fact_achieved):
resolution = str("The model is proved. Complexity: " + str(num) + " Level number: " + str(level_num) + " Len model:" + str (len(model)))
else:
resolution = str("The model is not proved. Complexity: " +str(num) + " Elements/Len model:" + str(level_elements_fact_achieved_int) + " / " + str (len(model)))
return resolution
if __name__ == "__main__":
print(datetime.now(), " - Deductive synthesis is Started")
#Import base model from file
base_obj = base_model()
file_obj = np.load(work_dir + 'Model_base_req.npz', allow_pickle=True)
base_obj.model = file_obj['arr_0']
print(datetime.now(), " - Reference model has been imported")
t0 = float(time.time() * 1000)
# Read facts
facts = facts_model()
#Positive scenario - the One fact
facts.start(work_dir + 'Test_one_fact.csv', model1=[])
#Deductive analysis
t2 = float(time.time() * 1000)
resolution = deductive_synthesis(base_obj.model, facts.model1,level_num)
t3 = float(time.time() * 1000)
print(datetime.now(), ' -', resolution)
print(datetime.now(), " - Exec. time for Level: " + str(level_num) + " - " + str(t3-t2) + "ms.")
con.close()