diff --git a/examples/fattree.py b/examples/fattree.py index 979b3e4..396390b 100644 --- a/examples/fattree.py +++ b/examples/fattree.py @@ -13,30 +13,32 @@ env = simpy.Environment() -n_flows = 100 -k = 4 +n_flows = 128 +k = 8 pir = 100000 buffer_size = 1000 -mean_pkt_size = 100.0 +mean_pkt_size = 1000.0 + + +def size_dist(): + return 1000 + + +def arrival_dist(): + return 1.0 + ft = build_fattree(k) hosts = set() for n in ft.nodes(): - if ft.nodes[n]['type'] == 'host': + if ft.nodes[n]["type"] == "host": hosts.add(n) all_flows = generate_flows(ft, hosts, n_flows) -size_dist = partial(expovariate, 1.0 / mean_pkt_size) for fid in all_flows: - arr_dist = partial(expovariate, 1 + np.random.rand()) - - pg = DistPacketGenerator(env, - f"Flow_{fid}", - arr_dist, - size_dist, - flow_id=fid) + pg = DistPacketGenerator(env, f"Flow_{fid}", arrival_dist, size_dist, flow_id=fid) ps = PacketSink(env) all_flows[fid].pkt_gen = pg @@ -55,31 +57,24 @@ def flow_to_classes(packet, n_id=0, fib=None): for node_id in ft.nodes(): node = ft.nodes[node_id] # node['device'] = SimplePacketSwitch(env, k, pir, buffer_size, element_id=f"{node_id}") - flow_classes = partial(flow_to_classes, - n_id=node_id, - fib=node['flow_to_port']) - node['device'] = FairPacketSwitch(env, - k, - pir, - buffer_size, - weights, - 'DRR', - flow_classes, - element_id=f"{node_id}") - node['device'].demux.fib = node['flow_to_port'] + flow_classes = partial(flow_to_classes, n_id=node_id, fib=node["flow_to_port"]) + node["device"] = FairPacketSwitch( + env, k, pir, buffer_size, weights, "DRR", flow_classes, element_id=f"{node_id}" + ) + node["device"].demux.fib = node["flow_to_port"] for n in ft.nodes(): node = ft.nodes[n] - for port_number, next_hop in node['port_to_nexthop'].items(): - node['device'].ports[port_number].out = ft.nodes[next_hop]['device'] + for port_number, next_hop in node["port_to_nexthop"].items(): + node["device"].ports[port_number].out = ft.nodes[next_hop]["device"] for flow_id, flow in all_flows.items(): - flow.pkt_gen.out = ft.nodes[flow.src]['device'] - ft.nodes[flow.dst]['device'].demux.ends[flow_id] = flow.pkt_sink + flow.pkt_gen.out = ft.nodes[flow.src]["device"] + ft.nodes[flow.dst]["device"].demux.ends[flow_id] = flow.pkt_sink -env.run(until=100) +env.run(until=1500) -for flow_id in sample(all_flows.keys(), 5): +for flow_id in sample(sorted(all_flows.keys()), 5): print(f"Flow {flow_id}") print(all_flows[flow_id].pkt_sink.waits) print(all_flows[flow_id].pkt_sink.arrivals) diff --git a/ns/port/port.py b/ns/port/port.py index 79d1d7d..d7c56dd 100644 --- a/ns/port/port.py +++ b/ns/port/port.py @@ -6,39 +6,41 @@ class Port: - """ Models an output port on a switch with a given rate and buffer size (in either bytes - or the number of packets), using the simple tail-drop mechanism to drop packets. - - Parameters - ---------- - env: simpy.Environment - the simulation environment. - rate: float - the bit rate of the port (0 for unlimited). - element_id: int - the element id of this port. - qlimit: integer (or None) - a queue limit in bytes or packets (including the packet in service), beyond - which all packets will be dropped. - limit_bytes: bool - if True, the queue limit will be based on bytes; if False, the queue limit - will be based on packets. - zero_downstream_buffer: bool - if True, assume that the downstream element does not have any buffers, - and backpressure is in effect so that all waiting packets queue up in this - element's buffer. - debug: bool - If True, prints more verbose debug information. 
+ """Models an output port on a switch with a given rate and buffer size (in either bytes + or the number of packets), using the simple tail-drop mechanism to drop packets. + + Parameters + ---------- + env: simpy.Environment + the simulation environment. + rate: float + the bit rate of the port (0 for unlimited). + element_id: int + the element id of this port. + qlimit: integer (or None) + a queue limit in bytes or packets (including the packet in service), beyond + which all packets will be dropped. + limit_bytes: bool + if True, the queue limit will be based on bytes; if False, the queue limit + will be based on packets. + zero_downstream_buffer: bool + if True, assume that the downstream element does not have any buffers, + and backpressure is in effect so that all waiting packets queue up in this + element's buffer. + debug: bool + If True, prints more verbose debug information. """ - def __init__(self, - env, - rate: float, - qlimit: int = None, - limit_bytes: bool = False, - zero_downstream_buffer: bool = False, - element_id: int = None, - debug: bool = False): + def __init__( + self, + env, + rate: float, + qlimit: int = None, + limit_bytes: bool = False, + zero_downstream_buffer: bool = False, + element_id: int = None, + debug: bool = False, + ): self.store = simpy.Store(env) self.rate = rate self.env = env @@ -67,9 +69,7 @@ def update(self, packet): """ # There is nothing that needs to be done, just print a debug message if self.debug: - print( - f"Retrieved Packet {packet.packet_id} from flow {packet.flow_id}." - ) + print(f"Retrieved Packet {packet.packet_id} from flow {packet.flow_id}.") def run(self): """The generator function used in simulations.""" @@ -87,9 +87,9 @@ def run(self): self.byte_size -= packet.size if self.zero_downstream_buffer: - self.out.put(packet, - upstream_update=self.update, - upstream_store=self.store) + self.out.put( + packet, upstream_update=self.update, upstream_store=self.store + ) else: self.out.put(packet) @@ -97,7 +97,7 @@ def run(self): self.busy_packet_size = 0 def put(self, packet): - """ Sends a packet to this element. 
""" + """Sends a packet to this element.""" self.packets_received += 1 if self.zero_downstream_buffer: @@ -132,8 +132,7 @@ def put(self, packet): else: # If the packet has not been dropped, record the queue length at this port if self.debug: - print( - f"Queue length at port: {len(self.store.items)} packets.") + print(f"Queue length at port: {len(self.store.items)} packets.") self.byte_size = byte_count diff --git a/ns/topos/utils.py b/ns/topos/utils.py index 2bd927f..19bddfd 100644 --- a/ns/topos/utils.py +++ b/ns/topos/utils.py @@ -15,11 +15,11 @@ def read_topo(fname): def generate_flows(G, hosts, nflows): all_flows = dict() for flow_id in range(nflows): - src, dst = sample(hosts, 2) + src, dst = sample(sorted(hosts), 2) all_flows[flow_id] = Flow(flow_id, src, dst) all_flows[flow_id].path = sample( - list(nx.all_simple_paths(G, src, dst, cutoff=nx.diameter(G))), - 1)[0] + list(nx.all_simple_paths(G, src, dst, cutoff=nx.diameter(G))), 1 + )[0] return all_flows @@ -27,23 +27,22 @@ def generate_fib(G, all_flows): for n in G.nodes(): node = G.nodes[n] - node['port_to_nexthop'] = dict() - node['nexthop_to_port'] = dict() + node["port_to_nexthop"] = dict() + node["nexthop_to_port"] = dict() for port, nh in enumerate(nx.neighbors(G, n)): - node['nexthop_to_port'][nh] = port - node['port_to_nexthop'][port] = nh + node["nexthop_to_port"][nh] = port + node["port_to_nexthop"][port] = nh - node['flow_to_port'] = dict() - node['flow_to_nexthop'] = dict() + node["flow_to_port"] = dict() + node["flow_to_nexthop"] = dict() for f in all_flows: flow = all_flows[f] path = list(zip(flow.path, flow.path[1:])) for seg in path: a, z = seg - G.nodes[a]['flow_to_port'][ - flow.fid] = G.nodes[a]['nexthop_to_port'][z] - G.nodes[a]['flow_to_nexthop'][flow.fid] = z + G.nodes[a]["flow_to_port"][flow.fid] = G.nodes[a]["nexthop_to_port"][z] + G.nodes[a]["flow_to_nexthop"][flow.fid] = z return G