forked from maxvonhippel/AttackerSynthesis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Construct.py
214 lines (190 loc) · 5.88 KB
/
Construct.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# ==============================================================================
# File : Construct.py
# Author : Max von Hippel and Cole Vick
# Authored : 30 November 2019 - 13 March 2020
# Purpose : Constructs and interprets Promela models for Korg.
# How to run: This file is used by Korg.py.
# ==============================================================================
import os
import subprocess
def makeDaisy(events, N, with_recovery=False, daisyName="daisy.pml"):
    """
    Read the body of the network model N and pick a name for the
    recovery bit-flag that does not collide with a bit already
    declared inside that body.
    @param events: the IO interface of the network (must be non-empty)
    @param N: path to the model of the network
    @param with_recovery: do we want with_recovery attacks?
    @param daisyName: not used by this function
    @return: None when with_recovery is set and the network body is
             missing or blank; otherwise the pair
             (network body, recovery bit-flag name)
    """
    assert(len(events) != 0)
    network = innerContents(fileRead(N))
    if with_recovery:
        unusable = network in (False, None) or len(network.strip()) == 0
        if unusable:
            return None
    # Try "b" first, then "b0", "b1", ... for as long as the text
    # "bit <name>" still occurs somewhere in the network body.
    flag, suffix = "b", 0
    while ("bit " + flag) in network:
        flag = "b" + str(suffix)
        suffix += 1
    return network, flag
def makeDaisyWithEvents(events, with_recovery, network, b):
    """
    Build the Promela text of the daisy process: a do-loop with one
    option per ! / ? event.
    @param events: iterable of event strings, one loop option each
    @param with_recovery: when true, declare the bit b, add a break
                          option, set b to 1 after the loop and append
                          the network body
    @param network: body of the network model (only used with recovery)
    @param b: name of the recovery bit-flag
    @return: the daisy as a single string
    """
    pieces = []
    if with_recovery:
        # Global declaration of the recovery flag goes before the process.
        pieces.append("bit " + b + "= 0;\n")
    pieces.append("active proctype daisy () {\n\tdo")
    pieces.extend("\n\t:: " + event for event in events)
    if with_recovery:
        pieces.append("\n\t:: break; // recovery to N ... \n\tod")
        pieces.append("\n\t" + b + " = 1;\n\t")
        # Add recovery to N
        pieces.append("// N begins here ... \n" + network + "\n}")
    else:
        pieces.append("\n\tod\n}")
    return "".join(pieces)
def makeDaisyPhiFinite(label, phi):
    """
    Wrap the LTL property stored in the file phi so that it only has
    to hold once the given label has eventually become 1.
    @param label: name compared against 1 in the "eventually" guard
    @param phi: path to the file holding the specification
    @return: text of a new ltl block named newPhi
    """
    body = innerContents(fileRead(phi))
    guard = "(eventually ( " + label + " == 1 ) )"
    return "ltl newPhi {\n\t" + guard + " implies (\n\t\t" + body + " )\n}"
def makeAttacker(events, prov, net, DIR=None, with_recovery=True, k=0):
    """
    Create the attacker process text from the given events and write
    it to a file, unless a file with that name already exists.
    @param events: pair (acyclic events, cyclic events); an empty
                   acyclic part is replaced by a single "skip"
    @param prov: how spin was executed to return this process; the
                 items are joined with spaces into a leading comment
    @param net: body of the network model, appended after the acyclic
                events when with_recovery is set
    @param DIR: directory to write into (created when missing), or
                None to write into the current working directory
    @param with_recovery: append the network body instead of the
                          acceptance cycle
    @param k: index used in the file name
    @return: path of the attacker file
    """
    acyclicEvents, cyclicEvents = events[0], events[1]
    if len(acyclicEvents) == 0:
        acyclicEvents = ["skip"]
    parts = ["active proctype attacker() {\n\t"]
    parts.extend("\n\t" + ae + ";" for ae in acyclicEvents)
    if with_recovery:
        # With recovery the cyclic events are not emitted; the attacker
        # hands over to the network body instead.
        parts.append(
            "\n// recovery to N\n// N begins here ... \n" + net + "\n}")
    else:
        parts.append("\n\t// Acceptance Cycle part of attack")
        if len(cyclicEvents) > 0:
            parts.append("\n\tdo\n\t::")
            parts.extend("\n\t " + ce + ";" for ce in cyclicEvents)
            parts.append("\n\tod")
        parts.append("\n}")
    proc = "".join(parts)
    attackerName = "attacker_" \
        + str(k) \
        + ("_WITH_RECOVERY" if with_recovery else "") \
        + ".pml"
    if DIR is None:
        # No directory given: the bare file name is the path.  (The
        # previous expression evaluated DIR + "/" first and raised
        # TypeError for the default DIR=None.)
        name = attackerName
    else:
        name = DIR + "/" + attackerName
        if not os.path.exists(DIR):
            # makedirs also handles a DIR whose parents do not exist yet.
            os.makedirs(DIR)
    # Never overwrite: an existing file with this name is kept as-is.
    if not os.path.exists(name):
        with open(name, "w") as fw:
            fw.write("/* " + " ".join(prov) + " */\n")
            fw.write(proc)
    return name
def innerContents(singleModelBody):
    """
    Return the body of the given model as a string: everything
    between the first opening curly brace and the last closing
    curly brace.
    We assume no comments in the model at the moment ...
    same with in the properties.
    @param singleModelBody: text of the model, or False / None when
                            the model could not be read
    @return: False or None when that is what was passed in; None when
             the text has no "{" followed later by a "}"; otherwise
             the text strictly between the two braces
    """
    if singleModelBody is False or singleModelBody is None:
        return singleModelBody
    # find / rfind return -1 instead of running off the end of the
    # string, so empty text and text without braces yield None.
    opening = singleModelBody.find("{")
    closing = singleModelBody.rfind("}")
    if opening == -1 or closing == -1 or opening >= closing:
        return None
    return singleModelBody[opening + 1:closing]
def writeAttacks(attacks, provenance, net, with_recovery=True, name="run"):
    """
    Write every attack into the directory out/<name>, one attacker
    file per attack. The directory out/<name> must not exist yet;
    out itself is created when missing.
    @param attacks: sequence of event pairs, one per attacker
    @param provenance: sequence indexed like attacks, giving the
                       provenance of each attack
    @param net: network body passed on to makeAttacker
    @param with_recovery: passed on to makeAttacker
    @param name: name of the sub-directory of out to create
    """
    outRoot = "out"
    runDir = outRoot + "/" + name
    if not os.path.isdir(outRoot):
        os.mkdir(outRoot)
    assert(not os.path.isdir(runDir))
    os.mkdir(runDir)
    for index, attack in enumerate(attacks):
        makeAttacker(
            events=attack,
            prov=provenance[index],
            net=net,
            DIR=runDir,
            with_recovery=with_recovery,
            k=index)
def negateClaim(phi):
    """
    Given a path to an LTL specification, phi, negate phi by
    inserting "!" right after its first opening curly brace, and
    write the result to a file called "negated.pml".
    @param phi: path to the specification file
    @return: True when a "{" was found and the "!" was inserted;
             False when the file could not be read or has no "{"
    """
    contents = fileRead(phi)
    if contents in (None, False):
        return False
    brace = contents.find("{")
    negated = brace != -1
    if negated:
        contents = contents[:brace + 1] + "!" + contents[brace + 1:]
    # The output file is written even when nothing was negated.
    with open("negated.pml", "w") as fw:
        fw.write(contents)
    return negated
def getIO(IOfile):
    '''
    Given an IO specification for a network, Q, return
    the set of the input and output events.
    format is like ...
    [chanName]:
    I: [inputs]
    O: [outputs]
    Inputs become "chan?msg;" and outputs become "chan!msg;".
    Blank lines and empty message names are ignored.
    @param IOfile: path to the IO specification file
    @return: set of event strings, or None when the file cannot be
             opened or read
    '''
    # Only the file access is guarded, so that a parsing problem is
    # never mistaken for an unreadable file.
    try:
        with open(IOfile, "r") as fr:
            lines = fr.readlines()
    except (OSError, TypeError, ValueError):
        return None
    events = set()
    chan = None
    for line in lines:
        parts = [p for p in (a.strip() for a in line.split(":")) if p]
        if not parts:
            # Blank line: previously this raised IndexError, which was
            # swallowed and made the whole file look unreadable.
            continue
        if "O:" in line:
            symbol = "!"
        elif "I:" in line:
            symbol = "?"
        else:
            # Neither an I: nor an O: line, so it names the channel.
            chan = parts[0]
            continue
        if chan is None or len(parts) < 2:
            continue
        for msg in parts[1].split(","):
            msg = msg.strip()
            if msg:
                events.add(chan + symbol + msg + ";")
    return events
def fileRead(fileName):
    """
    Return the whole text of the named file.
    @param fileName: path of the file to read
    @return: the file contents as a string, or False when opening or
             reading the file raises an exception
    """
    try:
        with open(fileName, 'r') as handle:
            return handle.read()
    except Exception:
        return False
def writeDaisyToFile(daisy_string, file_name):
    """
    Write the daisy text to the named file, replacing any previous
    contents of that file.
    @param daisy_string: text to write
    @param file_name: path of the file to write to
    """
    with open(file_name, "w") as sink:
        sink.write(daisy_string)