-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathspreader.py
executable file
·112 lines (90 loc) · 2.7 KB
/
spreader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import cmfm
import sys
import time
import threading
from threading import Thread
from multiprocessing import Queue
import time
# Positional command-line arguments, converted in order:
#   argv[1] -> k       threshold used for the superspreader test
#   argv[2] -> h_size  size of each hash table
#   argv[3] -> h_num   number of hash tables
k, h_size, h_num = int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3])
def cmfmFunction(threadName, fileName, q):
    """Build a CM-FM sketch from one input file and deliver it through ``q``.

    Every line of ``fileName`` is stripped and split on commas. The first
    field is taken as the source IP and the second as the destination IP,
    and the pair is added to a new ``cmfm.CountMinSketch(h_size, h_num)``.
    Lines with fewer than two comma-separated fields (for example a blank
    trailing line) are skipped.

    Args:
        threadName: Label supplied by the caller; not used by this function.
        fileName: Path of the comma-separated input file to read.
        q: Queue-like object; the finished sketch is handed over via
            ``q.put(sketch)`` once the whole file has been read.
    """
    sketch = cmfm.CountMinSketch(h_size, h_num)
    with open(fileName, 'r') as fin:
        for line in fin:
            fields = line.strip().split(',')  # [src, dst, ...]
            # Indexing fields[1] on a short line would raise IndexError and
            # end this worker before q.put() runs, leaving any consumer that
            # waits on q.get() blocked. Skip such lines instead.
            if len(fields) < 2:
                continue
            sketch.add(fields[0], fields[1])
    q.put(sketch)
# Echo the run configuration exactly as it was given on the command line.
print("Starting CM-FM")
print("k = %s" % sys.argv[1])
print("hash table size = %s" % sys.argv[2])
print("hash table number = %s" % sys.argv[3])
print("files: %s" % (sys.argv[4:],))
# Start one worker thread per input file (argv[4:]). Each worker gets its own
# queue, through which it reports its finished sketch; threads and queues are
# kept in parallel lists in launch order.
threadList = []
results = []
cm_fm_start_time = time.time()
for i, fileName in enumerate(sys.argv[4:], 1):
    try:
        threadName = "Thread" + str(i)
        print(threadName)
        q = Queue()
        # Pass the worker its own name so the label it receives matches the
        # name of the thread running it.
        t = Thread(target=cmfmFunction, name=threadName,
                   args=(threadName, fileName, q))
        t.start()
        # Only workers that actually started are recorded.
        threadList.append(t)
        results.append(q)
    except Exception as e:
        print("Error: unable to start thread! " + str(e))

# Wait for every started worker before timing the build phase.
for thread in threadList:
    thread.join()
    print(thread.name + " ended")
print("--- Building sketches took %s seconds ---" % (time.time() - cm_fm_start_time))
# Fold the per-file sketches into a single one: the first sketch received
# becomes the primary, and every later sketch is combined into it.
combine_start_time = time.time()
print("Combining sketches")
primarySketch = None
for position, pending in enumerate(results, 1):
    print("Combining sketch " + str(position))
    # get() blocks until the worker that owns this queue has delivered.
    partial = pending.get()
    if primarySketch is not None:
        primarySketch.combine(partial)
    else:
        primarySketch = partial
print("--- Combining sketches took %s seconds ---" % (time.time() - combine_start_time))
# Final pass: re-read every input file, query the combined sketch for each
# line's source field, and write the source to the output file whenever the
# returned count is at least k.
print("Final processing")
final_start_time = time.time()
fwrite = "dist-%d-superspreader-%d-%d.txt" % (k, h_size, h_num)
# The output file is only ever written, so plain write mode is sufficient.
with open(fwrite, 'w') as fout:
    for fileName in sys.argv[4:]:
        with open(fileName, 'r') as fin:
            # The first line of each file is discarded before scanning.
            # NOTE(review): presumably a header row, but cmfmFunction does
            # not skip it when building the sketch -- confirm which is intended.
            fin.readline()
            for line in fin:
                src = line.strip().split(',')[0]  # src ip
                # NOTE(review): a source is written once per qualifying input
                # line, so the output can contain repeats -- confirm whether
                # downstream consumers rely on that.
                if primarySketch.query(src) >= k:
                    fout.write(src + '\n')
print("--- Final step took %s seconds ---" % (time.time() - final_start_time))
print("Done processing!!")
# No explicit sketch cleanup is performed here. The result queues were already
# drained by the combine step, so calling get() on them again would block.
print("Done!!")