diff --git a/examples/fattree_tcp.py b/examples/fattree_tcp.py new file mode 100644 index 0000000..51713d1 --- /dev/null +++ b/examples/fattree_tcp.py @@ -0,0 +1,98 @@ +from functools import partial +from random import expovariate, sample + +import numpy as np +import simpy + +from ns.flow.cubic import TCPCubic +from ns.packet.tcp_generator import TCPPacketGenerator +from ns.packet.tcp_sink import TCPSink +from ns.switch.switch import SimplePacketSwitch +from ns.switch.switch import FairPacketSwitch +from ns.topos.fattree import build as build_fattree +from ns.topos.utils import generate_fib, generate_flows + +env = simpy.Environment() + +n_flows = 20 +finish_time = 10 +k = 4 +pir = 40960 +buffer_size = 1000 + + +def size_dist(): + return 512 + + +def arrival_dist(): + return 1.0 + + +ft = build_fattree(k) + +hosts = set() +for n in ft.nodes(): + if ft.nodes[n]["type"] == "host": + hosts.add(n) + +all_flows = generate_flows( + ft, + hosts, + n_flows, + finish_time=finish_time, + arrival_dist=arrival_dist, + size_dist=size_dist, +) + +for fid in all_flows: + pg = TCPPacketGenerator( + env, all_flows[fid], cc=TCPCubic(), element_id=fid, rtt_estimate=0.5, debug=True + ) + ps = TCPSink(env) + + all_flows[fid].pkt_gen = pg + all_flows[fid].pkt_sink = ps + +ft = generate_fib(ft, all_flows, tcp=True) + +n_classes_per_port = 4 +weights = {c: 1 for c in range(n_classes_per_port)} + + +def flow_to_classes(packet, n_id=0, fib=None): + return (packet.flow_id + n_id + fib[packet.flow_id]) % n_classes_per_port + + +for node_id in ft.nodes(): + node = ft.nodes[node_id] + flow_classes = partial(flow_to_classes, n_id=node_id, fib=node["flow_to_port"]) + # node["device"] = FairPacketSwitch( + # env, k, pir, buffer_size, weights, "DRR", flow_classes, element_id=f"{node_id}" + # ) + node["device"] = SimplePacketSwitch( + env, k, pir, buffer_size, element_id=f"{node_id}" + ) + node["device"].demux.fib = node["flow_to_port"] + +for n in ft.nodes(): + node = ft.nodes[n] + for port_number, next_hop in node["port_to_nexthop"].items(): + node["device"].ports[port_number].out = ft.nodes[next_hop]["device"] + + +for flow_id, flow in all_flows.items(): + flow.pkt_gen.out = ft.nodes[flow.src]["device"] + ft.nodes[flow.dst]["device"].demux.ends[flow_id] = flow.pkt_sink + + flow.pkt_sink.out = ft.nodes[flow.dst]["device"] + ft.nodes[flow.src]["device"].demux.ends[flow_id + 10000] = flow.pkt_gen + + +env.run(until=100) + +for flow_id in sample(sorted(all_flows.keys()), 5): + print(f"Flow {flow_id}") + # print(all_flows[flow_id].pkt_sink.waits) + # print(all_flows[flow_id].pkt_sink.arrivals) + # print(all_flows[flow_id].pkt_sink.perhop_times) diff --git a/ns/packet/tcp_generator.py b/ns/packet/tcp_generator.py index eed3a0a..84d914e 100644 --- a/ns/packet/tcp_generator.py +++ b/ns/packet/tcp_generator.py @@ -104,9 +104,13 @@ def run(self): if self.debug: print( - "Sent packet {:d} with size {:d}, " + "TCPPacketGenerator {:d} sent packet {:d} with size {:d}, " "flow_id {:d} at time {:.4f}.".format( - packet.packet_id, packet.size, packet.flow_id, self.env.now + self.element_id, + packet.packet_id, + packet.size, + packet.flow_id, + self.env.now, ) ) @@ -122,8 +126,10 @@ def run(self): if self.debug: print( - "Setting a timer for packet {:d} with an RTO" - " of {:.4f}.".format(packet.packet_id, self.rto) + " TCPPacketGenerator {:d} is setting a timer for packet {:d} with an RTO" + " of {:.4f}.".format( + self.element_id, packet.packet_id, self.rto + ) ) else: # No further space in the congestion window to transmit packets @@ -134,8 +140,8 @@ def timeout_callback(self, packet_id=0): """To be called when a timer expired for a packet with 'packet_id'.""" if self.debug: print( - "Timer expired for packet {:d} at time {:.4f}.".format( - packet_id, self.env.now + "TCPPacketGenerator {:d}'s Timer expired for packet {:d} at time {:.4f}.".format( + self.element_id, packet_id, self.env.now ) ) @@ -147,8 +153,11 @@ def timeout_callback(self, packet_id=0): if self.debug: print( - "Resending packet {:d} with flow_id {:d} at time {:.4f}.".format( - resent_pkt.packet_id, resent_pkt.flow_id, self.env.now + "TCPPacketGenerator {:d} is resending packet {:d} with flow_id {:d} at time {:.4f}.".format( + self.element_id, + resent_pkt.packet_id, + resent_pkt.flow_id, + self.env.now, ) ) @@ -171,12 +180,17 @@ def put(self, ack): if self.dupack == 3: self.congestion_control.consecutive_dupacks_received() + print("!!!", self.element_id, self.sent_packets) + resent_pkt = self.sent_packets[ack.ack] resent_pkt.time = self.env.now if self.debug: print( - "Resending packet {:d} with flow_id {:d} at time {:.4f}.".format( - resent_pkt.packet_id, resent_pkt.flow_id, self.env.now + "TCPPacketGenerator {:d} is resending packet {:d} with flow_id {:d} at time {:.4f}.".format( + self.element_id, + resent_pkt.packet_id, + resent_pkt.flow_id, + self.env.now, ) ) @@ -199,9 +213,13 @@ def put(self, ack): if self.debug: print( - "Sent packet {:d} with size {:d}, " + "TCPPacketGenerator {:d} sent packet {:d} with size {:d}, " "flow_id {:d} at time {:.4f} as dupack > 3.".format( - packet.packet_id, packet.size, packet.flow_id, self.env.now + self.element_id, + packet.packet_id, + packet.size, + packet.flow_id, + self.env.now, ) ) @@ -217,8 +235,10 @@ def put(self, ack): if self.debug: print( - "Setting a timer for packet {:d} with an RTO" - " of {:.4f}.".format(packet.packet_id, self.rto) + "TCPPacketGenerator {:d} is setting a timer for packet {:d} with an RTO" + " of {:.4f}.".format( + self.element_id, packet.packet_id, self.rto + ) ) return @@ -238,13 +258,13 @@ def put(self, ack): if self.debug: print( - "Ack received till sequence number {:d} at time {:.4f}.".format( - ack.ack, self.env.now + "TCPPacketGenerator {:d} received Ack till sequence number {:d} at time {:.4f}.".format( + self.element_id, ack.ack, self.env.now ) ) print( - "Congestion window size = {:.1f}, last ack = {:d}.".format( - self.congestion_control.cwnd, self.last_ack + "TCPPacketGenerator {:d} congestion window size = {:.1f}, last ack = {:d}.".format( + self.element_id, self.congestion_control.cwnd, self.last_ack ) ) @@ -259,8 +279,8 @@ def put(self, ack): for packet_id in acked_packets: if self.debug: print( - "Stopped timer {:d} at time {:.4f}.".format( - packet_id, self.env.now + "TCPPacketGenerator {:d} stopped timer {:d} at time {:.4f}.".format( + self.element_id, packet_id, self.env.now ) ) self.timers[packet_id].stop() diff --git a/ns/topos/utils.py b/ns/topos/utils.py index aa7a419..25c11c9 100644 --- a/ns/topos/utils.py +++ b/ns/topos/utils.py @@ -12,18 +12,36 @@ def read_topo(fname): print(f"{fname} is not GraphML") -def generate_flows(G, hosts, nflows): +def generate_flows( + G, + hosts, + nflows, + size=None, + start_time=None, + finish_time=None, + arrival_dist=None, + size_dist=None, +): all_flows = dict() for flow_id in range(nflows): src, dst = sample(sorted(hosts), 2) - all_flows[flow_id] = Flow(flow_id, src, dst) + all_flows[flow_id] = Flow( + flow_id, + src, + dst, + size=size, + start_time=start_time, + finish_time=finish_time, + arrival_dist=arrival_dist, + size_dist=size_dist, + ) # all_flows[flow_id].path = sample( # list(nx.all_simple_paths(G, src, dst, cutoff=nx.diameter(G))), 1 all_flows[flow_id].path = sample(list(nx.all_shortest_paths(G, src, dst)), 1)[0] return all_flows -def generate_fib(G, all_flows): +def generate_fib(G, all_flows, tcp=False): for n in G.nodes(): node = G.nodes[n] @@ -45,4 +63,11 @@ def generate_fib(G, all_flows): G.nodes[a]["flow_to_port"][flow.fid] = G.nodes[a]["nexthop_to_port"][z] G.nodes[a]["flow_to_nexthop"][flow.fid] = z + # generates reverse fib for TCPSink sending Ack to TCPSource + if tcp: + G.nodes[z]["flow_to_port"][flow.fid + 10000] = G.nodes[z][ + "nexthop_to_port" + ][a] + G.nodes[z]["flow_to_nexthop"][flow.fid + 10000] = a + return G