Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modified FIB for TCP. #24

Merged
merged 3 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions examples/fattree_tcp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from functools import partial
from random import expovariate, sample

import numpy as np
import simpy

from ns.flow.cubic import TCPCubic
from ns.packet.tcp_generator import TCPPacketGenerator
from ns.packet.tcp_sink import TCPSink
from ns.switch.switch import SimplePacketSwitch
from ns.switch.switch import FairPacketSwitch
from ns.topos.fattree import build as build_fattree
from ns.topos.utils import generate_fib, generate_flows

env = simpy.Environment()

n_flows = 20
finish_time = 10
k = 4
pir = 40960
buffer_size = 1000


def size_dist():
return 512


def arrival_dist():
return 1.0


ft = build_fattree(k)

hosts = set()
for n in ft.nodes():
if ft.nodes[n]["type"] == "host":
hosts.add(n)

all_flows = generate_flows(
ft,
hosts,
n_flows,
finish_time=finish_time,
arrival_dist=arrival_dist,
size_dist=size_dist,
)

for fid in all_flows:
pg = TCPPacketGenerator(
env, all_flows[fid], cc=TCPCubic(), element_id=fid, rtt_estimate=0.5, debug=True
)
ps = TCPSink(env)

all_flows[fid].pkt_gen = pg
all_flows[fid].pkt_sink = ps

ft = generate_fib(ft, all_flows, tcp=True)

n_classes_per_port = 4
weights = {c: 1 for c in range(n_classes_per_port)}


def flow_to_classes(packet, n_id=0, fib=None):
return (packet.flow_id + n_id + fib[packet.flow_id]) % n_classes_per_port


for node_id in ft.nodes():
node = ft.nodes[node_id]
flow_classes = partial(flow_to_classes, n_id=node_id, fib=node["flow_to_port"])
# node["device"] = FairPacketSwitch(
# env, k, pir, buffer_size, weights, "DRR", flow_classes, element_id=f"{node_id}"
# )
node["device"] = SimplePacketSwitch(
env, k, pir, buffer_size, element_id=f"{node_id}"
)
node["device"].demux.fib = node["flow_to_port"]

for n in ft.nodes():
node = ft.nodes[n]
for port_number, next_hop in node["port_to_nexthop"].items():
node["device"].ports[port_number].out = ft.nodes[next_hop]["device"]


for flow_id, flow in all_flows.items():
flow.pkt_gen.out = ft.nodes[flow.src]["device"]
ft.nodes[flow.dst]["device"].demux.ends[flow_id] = flow.pkt_sink

flow.pkt_sink.out = ft.nodes[flow.dst]["device"]
ft.nodes[flow.src]["device"].demux.ends[flow_id + 10000] = flow.pkt_gen


env.run(until=100)

for flow_id in sample(sorted(all_flows.keys()), 5):
print(f"Flow {flow_id}")
# print(all_flows[flow_id].pkt_sink.waits)
# print(all_flows[flow_id].pkt_sink.arrivals)
# print(all_flows[flow_id].pkt_sink.perhop_times)
60 changes: 40 additions & 20 deletions ns/packet/tcp_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,13 @@ def run(self):

if self.debug:
print(
"Sent packet {:d} with size {:d}, "
"TCPPacketGenerator {:d} sent packet {:d} with size {:d}, "
"flow_id {:d} at time {:.4f}.".format(
packet.packet_id, packet.size, packet.flow_id, self.env.now
self.element_id,
packet.packet_id,
packet.size,
packet.flow_id,
self.env.now,
)
)

Expand All @@ -122,8 +126,10 @@ def run(self):

if self.debug:
print(
"Setting a timer for packet {:d} with an RTO"
" of {:.4f}.".format(packet.packet_id, self.rto)
" TCPPacketGenerator {:d} is setting a timer for packet {:d} with an RTO"
" of {:.4f}.".format(
self.element_id, packet.packet_id, self.rto
)
)
else:
# No further space in the congestion window to transmit packets
Expand All @@ -134,8 +140,8 @@ def timeout_callback(self, packet_id=0):
"""To be called when a timer expired for a packet with 'packet_id'."""
if self.debug:
print(
"Timer expired for packet {:d} at time {:.4f}.".format(
packet_id, self.env.now
"TCPPacketGenerator {:d}'s Timer expired for packet {:d} at time {:.4f}.".format(
self.element_id, packet_id, self.env.now
)
)

Expand All @@ -147,8 +153,11 @@ def timeout_callback(self, packet_id=0):

if self.debug:
print(
"Resending packet {:d} with flow_id {:d} at time {:.4f}.".format(
resent_pkt.packet_id, resent_pkt.flow_id, self.env.now
"TCPPacketGenerator {:d} is resending packet {:d} with flow_id {:d} at time {:.4f}.".format(
self.element_id,
resent_pkt.packet_id,
resent_pkt.flow_id,
self.env.now,
)
)

Expand All @@ -171,12 +180,17 @@ def put(self, ack):
if self.dupack == 3:
self.congestion_control.consecutive_dupacks_received()

print("!!!", self.element_id, self.sent_packets)

resent_pkt = self.sent_packets[ack.ack]
resent_pkt.time = self.env.now
if self.debug:
print(
"Resending packet {:d} with flow_id {:d} at time {:.4f}.".format(
resent_pkt.packet_id, resent_pkt.flow_id, self.env.now
"TCPPacketGenerator {:d} is resending packet {:d} with flow_id {:d} at time {:.4f}.".format(
self.element_id,
resent_pkt.packet_id,
resent_pkt.flow_id,
self.env.now,
)
)

Expand All @@ -199,9 +213,13 @@ def put(self, ack):

if self.debug:
print(
"Sent packet {:d} with size {:d}, "
"TCPPacketGenerator {:d} sent packet {:d} with size {:d}, "
"flow_id {:d} at time {:.4f} as dupack > 3.".format(
packet.packet_id, packet.size, packet.flow_id, self.env.now
self.element_id,
packet.packet_id,
packet.size,
packet.flow_id,
self.env.now,
)
)

Expand All @@ -217,8 +235,10 @@ def put(self, ack):

if self.debug:
print(
"Setting a timer for packet {:d} with an RTO"
" of {:.4f}.".format(packet.packet_id, self.rto)
"TCPPacketGenerator {:d} is setting a timer for packet {:d} with an RTO"
" of {:.4f}.".format(
self.element_id, packet.packet_id, self.rto
)
)

return
Expand All @@ -238,13 +258,13 @@ def put(self, ack):

if self.debug:
print(
"Ack received till sequence number {:d} at time {:.4f}.".format(
ack.ack, self.env.now
"TCPPacketGenerator {:d} received Ack till sequence number {:d} at time {:.4f}.".format(
self.element_id, ack.ack, self.env.now
)
)
print(
"Congestion window size = {:.1f}, last ack = {:d}.".format(
self.congestion_control.cwnd, self.last_ack
"TCPPacketGenerator {:d} congestion window size = {:.1f}, last ack = {:d}.".format(
self.element_id, self.congestion_control.cwnd, self.last_ack
)
)

Expand All @@ -259,8 +279,8 @@ def put(self, ack):
for packet_id in acked_packets:
if self.debug:
print(
"Stopped timer {:d} at time {:.4f}.".format(
packet_id, self.env.now
"TCPPacketGenerator {:d} stopped timer {:d} at time {:.4f}.".format(
self.element_id, packet_id, self.env.now
)
)
self.timers[packet_id].stop()
Expand Down
31 changes: 28 additions & 3 deletions ns/topos/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,36 @@ def read_topo(fname):
print(f"{fname} is not GraphML")


def generate_flows(G, hosts, nflows):
def generate_flows(
G,
hosts,
nflows,
size=None,
start_time=None,
finish_time=None,
arrival_dist=None,
size_dist=None,
):
all_flows = dict()
for flow_id in range(nflows):
src, dst = sample(sorted(hosts), 2)
all_flows[flow_id] = Flow(flow_id, src, dst)
all_flows[flow_id] = Flow(
flow_id,
src,
dst,
size=size,
start_time=start_time,
finish_time=finish_time,
arrival_dist=arrival_dist,
size_dist=size_dist,
)
# all_flows[flow_id].path = sample(
# list(nx.all_simple_paths(G, src, dst, cutoff=nx.diameter(G))), 1
all_flows[flow_id].path = sample(list(nx.all_shortest_paths(G, src, dst)), 1)[0]
return all_flows


def generate_fib(G, all_flows):
def generate_fib(G, all_flows, tcp=False):
for n in G.nodes():
node = G.nodes[n]

Expand All @@ -45,4 +63,11 @@ def generate_fib(G, all_flows):
G.nodes[a]["flow_to_port"][flow.fid] = G.nodes[a]["nexthop_to_port"][z]
G.nodes[a]["flow_to_nexthop"][flow.fid] = z

# generates reverse fib for TCPSink sending Ack to TCPSource
if tcp:
G.nodes[z]["flow_to_port"][flow.fid + 10000] = G.nodes[z][
"nexthop_to_port"
][a]
G.nodes[z]["flow_to_nexthop"][flow.fid + 10000] = a

return G
Loading