-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRingNode.py
203 lines (155 loc) · 7.76 KB
/
RingNode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# coding: utf-8
import time
import socket
import threading
import logging
import pickle
import queue
import copy
from utils import NODE_JOIN, REQUEST_INFO, ENTITIES_NAMES, NODE_DISCOVERY, ORDER, PICKUP, \
TOKEN, PICK, GIVE_FOOD, print_out
from adaptor import Adaptor
from encapsulation_utils import nodes_message_create, token_message_create, \
pre_ring_message_create, discovery_message_create, entities_message_create
class RingNode(threading.Thread):
    """Thread that runs one node of a UDP token ring.

    While the node is not inside the ring it keeps broadcasting REQUEST_INFO
    so that peers learn about it.  Its successor is the known node with the
    next higher id; the node with the highest id wraps around to the lowest
    one.  The coordinator (the node whose id is not greater than any id it
    knows about) circulates NODE_DISCOVERY messages to map entity names to
    node ids and, once every entity is known, injects the token that carries
    messages between nodes.
    """

    # recvfrom() silently drops whatever does not fit in the buffer, which
    # would hand pickle a truncated payload; size it for any UDP datagram.
    RECV_BUFFER_SIZE = 65536
    # Addresses probed by broadcast(): <prefix>1 .. <prefix>254 on this port.
    BROADCAST_PREFIX = '127.0.0.'
    BROADCAST_PORT = 5000

    def __init__(self, address, self_id, name, max_nodes=4, ring_address=None, timeout=3):
        """Create the node and its UDP socket (the socket is bound in run()).

        Args:
            address: socket address this node binds to and advertises.
            self_id: id of this node; compared numerically with peer ids.
            name: entity name this node answers for during discovery
                (assigning it also sets the thread name).
            max_nodes: number of nodes expected in the complete ring.
            ring_address: stored as-is; not used by this class.
            timeout: socket timeout in seconds used by recv().
        """
        threading.Thread.__init__(self)
        self.id = self_id
        self.addr = address
        self.ring_address = ring_address
        self.max_nodes = max_nodes
        self.inside_ring = False
        # Sentinel successor: assumes real ids are smaller than max_nodes * 2.
        self.successor_id = self.max_nodes * 2
        self.successor_addr = self.addr
        self.nodes_com = []  # ids of every node heard from so far
        self.name = name
        # entity name -> node id, None until discovered
        self.entities = dict.fromkeys(ENTITIES_NAMES)
        self.coordinator = False
        self.inside_ring_order = 0

        # queues
        self.in_queue = queue.Queue()  # messages received from the token
        self.out_queue = queue.Queue()  # messages to send to the token

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.socket.settimeout(timeout)
        self.logger = logging.getLogger("Node {}".format(self.id))

        # adaptor for professor's client
        self.adaptor = Adaptor()

    def send(self, address, o):
        """Pickle ``o`` and send it to ``address`` as a single datagram."""
        p = pickle.dumps(o)
        self.socket.sendto(p, address)

    def recv(self):
        """Receive one datagram.

        Returns:
            ``(payload, sender)``; ``(None, sender)`` for an empty datagram
            and ``(None, None)`` when the socket timeout expires.
        """
        try:
            p, addr = self.socket.recvfrom(self.RECV_BUFFER_SIZE)
        except socket.timeout:
            return None, None
        if len(p) == 0:
            return None, addr
        return p, addr

    def broadcast(self, message_to_send):
        """Send ``message_to_send`` to hosts 1..254 of the broadcast prefix."""
        for host in range(1, 255):
            address_send = (self.BROADCAST_PREFIX + str(host), self.BROADCAST_PORT)
            self.send(address_send, message_to_send)

    def requestInfo(self):
        """Broadcast a REQUEST_INFO message carrying this node's address and id.

        Other nodes may already be in a ring; asking them for their info
        speeds up the process of entering it.
        """
        message_pre_ring = pre_ring_message_create(self.addr, self.id)
        message_to_send = nodes_message_create(REQUEST_INFO, message_pre_ring)
        self.broadcast(message_to_send)

    def discoveryReply(self, args):
        """Handle a NODE_DISCOVERY payload and pass it on to the successor.

        Args:
            args: mapping with the entity ``'name'`` being looked up and its
                ``'id'`` (``None`` while still unanswered).
        """
        message_to_send = nodes_message_create(NODE_DISCOVERY, args.copy())

        if self.name == args['name'] and args['id'] is None:
            # The question is about this node: answer in the forwarded copy.
            message_to_send['args']['id'] = self.id
        elif args['id'] is not None:
            # Already answered by its owner: remember the mapping.
            self.entities[args['name']] = args['id']
            # self.logger.debug('My table of entities: ' + str(self.entities))

        # An answer that comes back to the node that gave it has completed a
        # full lap, so it is dropped instead of being forwarded again.
        if args['id'] != self.id:
            self.send(self.successor_addr, message_to_send)

    def allNodesDiscovered(self):
        """Return True when exactly ``max_nodes`` entities have a known id."""
        known = sum(1 for entity_id in self.entities.values() if entity_id is not None)
        return known == self.max_nodes

    def sendMessageToToken(self, id_to_send, order):
        """Queue ``order`` for node ``id_to_send``; it leaves on a free token."""
        token_to_send = token_message_create(id_to_send, order)
        message_to_send = nodes_message_create(TOKEN, token_to_send)
        self.out_queue.put(message_to_send)

    def sendToClient(self, addr, method, args):
        """Send a ``method``/``args`` message straight to ``addr``."""
        message_to_send = nodes_message_create(method, args)
        self.send(addr, message_to_send)

    def _on_membership(self, args):
        """Update known nodes, coordinator role and successor from a peer's info."""
        if args['id'] not in self.nodes_com:
            self.nodes_com.append(args['id'])
            self.logger.debug("Nodes that i know about: %s", self.nodes_com)

        if self.coordinator and args['id'] < self.id:
            self.coordinator = False
            self.logger.debug("I'm not the coordinator!")

        if not self.coordinator and self.id <= min(self.nodes_com):
            self.coordinator = True
            self.logger.debug("I'm the coordinator!")

        # The current successor has a lower id than ours (a wrap-around link)
        # but a node above it has shown up and we know more than id + 1
        # nodes: forget the successor so it gets chosen again.
        if (args['id'] > self.successor_id and self.successor_id < self.id
                and len(self.nodes_com) > self.id + 1):
            self.inside_ring = False
            self.successor_id = self.max_nodes * 2
            self.successor_addr = self.addr

        # Highest known id links back to the lowest known id ...
        wraps_around = (len(self.nodes_com) > 1 and self.id == max(self.nodes_com)
                        and args['id'] == min(self.nodes_com))
        # ... everyone else takes the closest id above its own.
        is_closer = self.successor_id > args['id'] > self.id
        if wraps_around or is_closer:
            self.inside_ring = True
            self.successor_id = args['id']
            self.successor_addr = args['addr']
            # self.logger.debug("Me: " + str(self.addr) + "\nSuccessor:" + str(self.successor_addr) + "\n")

    def _on_token(self, message_received):
        """Consume a token addressed to this node, load a queued message, forward."""
        id_destination = message_received['args']['id']
        message_to_send = message_received

        if id_destination == self.id:
            self.in_queue.put(message_received['args']['order'])
            # The payload was delivered here, so the token travels on empty.
            message_to_send = nodes_message_create(TOKEN, token_message_create(None, None))

        # Only a token that is (now) empty may pick up a waiting message.
        token_is_free = id_destination == self.id or id_destination is None
        if token_is_free and not self.out_queue.empty():
            message_to_send = self.out_queue.get()

        self.send(self.successor_addr, message_to_send)

    def _on_message(self, message_received):
        """Dispatch one adapted message according to its ``'method'``."""
        method = message_received['method']

        if method == REQUEST_INFO:
            # Introduce this node to whoever asked.
            message_pre_ring = pre_ring_message_create(self.addr, self.id)
            message_to_send = nodes_message_create(NODE_JOIN, message_pre_ring)
            self.send(message_received['args']['addr'], message_to_send)

        if method in (NODE_JOIN, REQUEST_INFO):
            self._on_membership(message_received['args'])
        elif method == NODE_DISCOVERY:
            self.discoveryReply(message_received['args'])
        elif method == ORDER:
            # Only the logged copy has its food field passed through
            # print_out; the original args are what gets queued.
            message_received_copy = copy.deepcopy(message_received)
            message_received_copy['args']['food'] = print_out(message_received_copy['args']['food'])
            self.logger.debug("Message received from client: %s", message_received_copy)
            self.sendMessageToToken(self.entities['Waiter'], message_received['args'])
        elif method == PICKUP:
            self.logger.debug("Message received from client: %s", message_received)
            message_to_send = entities_message_create(PICK, message_received['args'])
            self.sendMessageToToken(self.entities['Clerk'], message_to_send)
        elif method == TOKEN:
            self._on_token(message_received)
        else:
            # NOTE(review): nesting reconstructed from a copy without
            # indentation; this is taken to be the fallback for any other
            # method, which is relayed unchanged -- confirm.
            self.send(self.successor_addr, message_received)

    def run(self):
        """Bind the socket and serve ring traffic forever (never returns)."""
        self.socket.bind(self.addr)
        delta_time = time.time()
        token_sent = False

        while True:
            if not self.inside_ring:
                self.requestInfo()

            p, addr = self.recv()
            if p is not None:
                # NOTE(security): pickle.loads on data read from the network
                # can execute arbitrary code; only run on a trusted network.
                message_received = self.adaptor.adapt(pickle.loads(p), addr)
                self._on_message(message_received)

            # NOTE(review): nesting reconstructed from a copy without
            # indentation; placed at loop level so it also runs after a
            # receive timeout -- confirm.
            if self.coordinator and self.inside_ring and len(self.nodes_com) == self.max_nodes:
                if not self.allNodesDiscovered():
                    # Ask the ring who owns each entity that is still unknown.
                    for entity, entity_id in self.entities.items():
                        if entity_id is None:
                            message_to_discover = discovery_message_create(entity, None)
                            message_to_send = nodes_message_create(NODE_DISCOVERY, message_to_discover)
                            self.send(self.successor_addr, message_to_send)
                elif not token_sent:
                    # Everything is discovered: inject the single empty token.
                    token_to_send = token_message_create(None, None)
                    message_to_send = nodes_message_create(TOKEN, token_to_send)
                    self.send(self.successor_addr, message_to_send)
                    token_sent = True
                    self.logger.debug("TOKEN SENT BEFORE %s SECONDS!", str(time.time() - delta_time))