From abcb7eb0d0606f793cbfd86a00a24d5f4a063d15 Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 26 Dec 2023 18:45:50 -0500 Subject: [PATCH] Revised the scheduler examples. --- examples/static_priority.py | 20 ++---- examples/virtual_clock.py | 61 +++++------------ examples/wfq.py | 51 +++++--------- ns/packet/dist_generator.py | 79 ++++++++++++---------- ns/packet/sink.py | 40 ++++++----- ns/scheduler/virtual_clock.py | 123 ++++++++++++++++++---------------- ns/scheduler/wfq.py | 9 ++- 7 files changed, 180 insertions(+), 203 deletions(-) diff --git a/examples/static_priority.py b/examples/static_priority.py index c429dc1..a3372ad 100644 --- a/examples/static_priority.py +++ b/examples/static_priority.py @@ -11,36 +11,28 @@ def arrival_1(): - """ Packets arrive with a constant interval of 1.5 seconds. """ + """Packets arrive with a constant interval of 1.5 seconds.""" return 1.5 def arrival_2(): - """ Packets arrive with a constant interval of 2.0 seconds. """ + """Packets arrive with a constant interval of 2.0 seconds.""" return 2.0 def packet_size(): - return 100 + return 1000 env = simpy.Environment() -sp1 = SPServer(env, - 100, { - 0: 1, - 1: 10 - }, - zero_downstream_buffer=True, - debug=True) -sp2 = SPServer(env, 100, {0: 50, 1: 100}, zero_buffer=True, debug=True) -ps = PacketSink(env, rec_flow_ids=False, debug=True) +sp1 = SPServer(env, 8000, {0: 1, 1: 2}, debug=True) +ps = PacketSink(env, debug=True) pg1 = DistPacketGenerator(env, "flow_1", arrival_1, packet_size, flow_id=0) pg2 = DistPacketGenerator(env, "flow_2", arrival_2, packet_size, flow_id=1) pg1.out = sp1 pg2.out = sp1 -sp1.out = sp2 -sp2.out = ps +sp1.out = ps env.run(until=20) diff --git a/examples/virtual_clock.py b/examples/virtual_clock.py index da24314..17f2824 100644 --- a/examples/virtual_clock.py +++ b/examples/virtual_clock.py @@ -22,30 +22,21 @@ def const_size(): env = simpy.Environment() -pg1 = DistPacketGenerator(env, - "flow_0", - packet_arrival, - const_size, - initial_delay=0.0, - finish=50, - flow_id=0) -pg2 = DistPacketGenerator(env, - "flow_1", - packet_arrival, - const_size, - initial_delay=10.0, - finish=50, - flow_id=1) +pg1 = DistPacketGenerator( + env, "flow_0", packet_arrival, const_size, initial_delay=0.0, finish=50, flow_id=0 +) +pg2 = DistPacketGenerator( + env, "flow_1", packet_arrival, const_size, initial_delay=10.0, finish=50, flow_id=1 +) ps = PacketSink(env) sink_1 = PacketSink(env) sink_2 = PacketSink(env) -source_rate = 8.0 * const_size() / packet_arrival() +source_rate = 4600.0 vc_server = VirtualClockServer(env, source_rate, [2, 1], debug=True) -monitor = ServerMonitor(env, - vc_server, - partial(expovariate, 0.1), - pkt_in_service_included=True) +monitor = ServerMonitor( + env, vc_server, partial(expovariate, 0.1), pkt_in_service_included=True +) splitter_1 = Splitter() splitter_2 = Splitter() @@ -61,19 +52,13 @@ def const_size(): env.run(until=1000) -print( - "At the Virtual Clock server, the queue lengths in # packets for flow 0 are:" -) +print("At the Virtual Clock server, the queue lengths in # packets for flow 0 are:") print(monitor.sizes[0]) -print( - "At the Virtual Clock server, the queue lengths in # packets for flow 1 are:" -) +print("At the Virtual Clock server, the queue lengths in # packets for flow 1 are:") print(monitor.sizes[1]) -print( - "At the Virtual Clock server, the queue lengths in bytes for flow 0 are:") +print("At the Virtual Clock server, the queue lengths in bytes for flow 0 are:") print(monitor.byte_sizes[0]) -print( - "At the Virtual Clock server, the 
queue lengths in bytes for flow 1 are:") +print("At the Virtual Clock server, the queue lengths in bytes for flow 1 are:") print(monitor.byte_sizes[1]) print("At the packet sink, packet arrival times for flow 0 are:") @@ -83,26 +68,16 @@ def const_size(): print(ps.arrivals[1]) fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, sharex=True) -ax1.vlines(sink_1.arrivals[0], - 0.0, - 1.0, - colors="g", - linewidth=2.0, - label='Flow 0') -ax1.vlines(sink_2.arrivals[1], - 0.0, - 0.7, - colors="r", - linewidth=2.0, - label='Flow 1') +ax1.vlines(sink_1.arrivals[0], 0.0, 1.0, colors="g", linewidth=2.0, label="Flow 0") +ax1.vlines(sink_2.arrivals[1], 0.0, 0.7, colors="r", linewidth=2.0, label="Flow 1") ax1.set_title("Arrival times at the VC server") ax1.set_ylim([0, 1.5]) ax1.set_xlim([0, max(sink_1.arrivals[0]) + 10]) ax1.grid(True) ax1.legend() -ax2.vlines(ps.arrivals[0], 0.0, 1.0, colors="g", linewidth=2.0, label='Flow 0') -ax2.vlines(ps.arrivals[1], 0.0, 0.7, colors="r", linewidth=2.0, label='Flow 1') +ax2.vlines(ps.arrivals[0], 0.0, 1.0, colors="g", linewidth=2.0, label="Flow 0") +ax2.vlines(ps.arrivals[1], 0.0, 0.7, colors="r", linewidth=2.0, label="Flow 1") ax2.set_title("Departure times from the VC server") ax2.set_xlabel("time") ax2.set_ylim([0, 1.5]) diff --git a/examples/wfq.py b/examples/wfq.py index 9193bef..1df8683 100644 --- a/examples/wfq.py +++ b/examples/wfq.py @@ -22,30 +22,21 @@ def const_size(): env = simpy.Environment() -pg1 = DistPacketGenerator(env, - "flow_0", - packet_arrival, - const_size, - initial_delay=0.0, - finish=50, - flow_id=0) -pg2 = DistPacketGenerator(env, - "flow_1", - packet_arrival, - const_size, - initial_delay=10.0, - finish=50, - flow_id=1) +pg1 = DistPacketGenerator( + env, "flow_0", packet_arrival, const_size, initial_delay=0.0, finish=50, flow_id=0 +) +pg2 = DistPacketGenerator( + env, "flow_1", packet_arrival, const_size, initial_delay=10.0, finish=50, flow_id=1 +) ps = PacketSink(env) sink_1 = PacketSink(env) sink_2 = PacketSink(env) -source_rate = 8.0 * const_size() / packet_arrival() -wfq_server = WFQServer(env, source_rate, [1, 2]) -monitor = ServerMonitor(env, - wfq_server, - partial(expovariate, 0.1), - pkt_in_service_included=True) +source_rate = 4600 +wfq_server = WFQServer(env, source_rate, [1, 2], debug=True) +monitor = ServerMonitor( + env, wfq_server, partial(expovariate, 0.1), pkt_in_service_included=True +) splitter_1 = Splitter() splitter_2 = Splitter() @@ -70,6 +61,8 @@ def const_size(): print("At the WFQ server, the queue lengths in bytes for flow 1 are:") print(monitor.byte_sizes[1]) +print(sink_1.arrivals[0]) +print(sink_2.arrivals[1]) print("At the packet sink, packet arrival times for flow 0 are:") print(ps.arrivals[0]) @@ -77,26 +70,16 @@ def const_size(): print(ps.arrivals[1]) fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, sharex=True) -ax1.vlines(sink_1.arrivals[0], - 0.0, - 1.0, - colors="g", - linewidth=2.0, - label='Flow 0') -ax1.vlines(sink_2.arrivals[1], - 0.0, - 0.7, - colors="r", - linewidth=2.0, - label='Flow 1') +ax1.vlines(sink_1.arrivals[0], 0.0, 1.0, colors="g", linewidth=2.0, label="Flow 0") +ax1.vlines(sink_2.arrivals[1], 0.0, 0.7, colors="r", linewidth=2.0, label="Flow 1") ax1.set_title("Arrival times at the WFQ server") ax1.set_ylim([0, 1.5]) ax1.set_xlim([0, max(sink_1.arrivals[0]) + 10]) ax1.grid(True) ax1.legend() -ax2.vlines(ps.arrivals[0], 0.0, 1.0, colors="g", linewidth=2.0, label='Flow 0') -ax2.vlines(ps.arrivals[1], 0.0, 0.7, colors="r", linewidth=2.0, label='Flow 1') +ax2.vlines(ps.arrivals[0], 0.0, 1.0, 
colors="g", linewidth=2.0, label="Flow 0") +ax2.vlines(ps.arrivals[1], 0.0, 0.7, colors="r", linewidth=2.0, label="Flow 1") ax2.set_title("Departure times from the WFQ server") ax2.set_xlabel("time") ax2.set_ylim([0, 1.5]) diff --git a/ns/packet/dist_generator.py b/ns/packet/dist_generator.py index be1e399..1bc7e35 100644 --- a/ns/packet/dist_generator.py +++ b/ns/packet/dist_generator.py @@ -9,36 +9,39 @@ class DistPacketGenerator: - """ Generates packets with a given inter-arrival time distribution. + """Generates packets with a given inter-arrival time distribution. - Parameters - ---------- - env: simpy.Environment - The simulation environment. - element_id: str - the ID of this element. - arrival_dist: function - A no-parameter function that returns the successive inter-arrival times of - the packets. - size_dist: function - A no-parameter function that returns the successive sizes of the packets. - initial_delay: number - Starts generation after an initial delay. Defaults to 0. - finish: number - Stops generation at the finish time. Defaults to infinite. - rec_flow: bool - Are we recording the statistics of packets generated? + Parameters + ---------- + env: simpy.Environment + The simulation environment. + element_id: str + the ID of this element. + arrival_dist: function + A no-parameter function that returns the successive inter-arrival times of + the packets. + size_dist: function + A no-parameter function that returns the successive sizes of the packets. + initial_delay: number + Starts generation after an initial delay. Defaults to 0. + finish: number + Stops generation at the finish time. Defaults to infinite. + rec_flow: bool + Are we recording the statistics of packets generated? """ - def __init__(self, - env, - element_id, - arrival_dist, - size_dist, - initial_delay=0, - finish=float("inf"), - flow_id=0, - rec_flow=False, - debug=False): + + def __init__( + self, + env, + element_id, + arrival_dist, + size_dist, + initial_delay=0, + finish=float("inf"), + flow_id=0, + rec_flow=False, + debug=False, + ): self.element_id = element_id self.env = env self.arrival_dist = arrival_dist @@ -59,15 +62,20 @@ def run(self): """The generator function used in simulations.""" yield self.env.timeout(self.initial_delay) while self.env.now < self.finish: + packet = Packet( + self.env.now, + self.size_dist(), + self.packets_sent, + src=self.element_id, + flow_id=self.flow_id, + ) + # wait for next transmission yield self.env.timeout(self.arrival_dist()) + self.out.put(packet) self.packets_sent += 1 - packet = Packet(self.env.now, - self.size_dist(), - self.packets_sent, - src=self.element_id, - flow_id=self.flow_id) + if self.rec_flow: self.time_rec.append(packet.time) self.size_rec.append(packet.size) @@ -75,6 +83,5 @@ def run(self): if self.debug: print( f"Sent packet {packet.packet_id} with flow_id {packet.flow_id} at " - f"time {self.env.now}.") - - self.out.put(packet) + f"time {self.env.now}." + ) diff --git a/ns/packet/sink.py b/ns/packet/sink.py index 6eed97e..e0f6851 100644 --- a/ns/packet/sink.py +++ b/ns/packet/sink.py @@ -11,7 +11,7 @@ class PacketSink: - """ A PacketSink is designed to record both arrival times and waiting times from the incoming + """A PacketSink is designed to record both arrival times and waiting times from the incoming packets. By default, it records absolute arrival times, but it can also be initialized to record inter-arrival times. @@ -33,13 +33,15 @@ class PacketSink: If True, prints more verbose debug information. 
""" - def __init__(self, - env, - rec_arrivals: bool = True, - absolute_arrivals: bool = True, - rec_waits: bool = True, - rec_flow_ids: bool = True, - debug: bool = False): + def __init__( + self, + env, + rec_arrivals: bool = True, + absolute_arrivals: bool = True, + rec_waits: bool = True, + rec_flow_ids: bool = True, + debug: bool = False, + ): self.store = simpy.Store(env) self.env = env self.rec_waits = rec_waits @@ -61,7 +63,7 @@ def __init__(self, self.debug = debug def put(self, packet): - """ Sends a packet to this element. """ + """Sends a packet to this element.""" now = self.env.now if self.rec_flow_ids: @@ -82,22 +84,26 @@ def put(self, packet): self.first_arrival[rec_index] = now if not self.absolute_arrivals: - self.arrivals[rec_index][ - -1] = now - self.last_arrival[rec_index] + self.arrivals[rec_index][-1] = now - self.last_arrival[rec_index] self.last_arrival[rec_index] = now if self.debug: - print("At time {:.2f}, packet {:d} {:d} arrived.".format( - now, packet.packet_id, packet.flow_id)) + print( + "At time {:.2f}, packet {:d} in flow {:d} arrived.".format( + now, packet.packet_id, packet.flow_id + ) + ) if self.rec_waits and len(self.packet_sizes[rec_index]) >= 10: bytes_received = sum(self.packet_sizes[rec_index][-9:]) time_elapsed = self.env.now - ( - self.packet_times[rec_index][-10] + - self.waits[rec_index][-10]) + self.packet_times[rec_index][-10] + self.waits[rec_index][-10] + ) print( - "Average throughput (last 10 packets): {:.2f} bytes/second." - .format(float(bytes_received) / time_elapsed)) + "Average throughput (last 10 packets): {:.2f} bytes/second.".format( + float(bytes_received) / time_elapsed + ) + ) self.packets_received[rec_index] += 1 self.bytes_received[rec_index] += packet.size diff --git a/ns/scheduler/virtual_clock.py b/ns/scheduler/virtual_clock.py index bceb795..70d13e2 100644 --- a/ns/scheduler/virtual_clock.py +++ b/ns/scheduler/virtual_clock.py @@ -14,45 +14,47 @@ class VirtualClockServer: - """ Implements a virtual clock server. - - Parameters - ---------- - env: simpy.Environment - The simulation environment. - rate: float - The bit rate of the port. - vticks: list or dict - This can be either a list or a dictionary. If it is a list, it uses the flow_id --- - or class_id, if class-based fair queueing is activated using the `flow_classes' - parameter below --- as its index to look for the flow (or class)'s corresponding - 'vtick'. If it is a dictionary, it contains (flow_id or class_id -> vtick) pairs - for each possible flow_id or class_id. We assume that the vticks are the inverse of - the desired rates for the corresponding flows, in bits per second. - flow_classes: function - This is a function that matches a packet's flow_ids to class_ids, used to implement - class-based Deficit Round Robin. The default is a lambda function that uses a packet's - flow_id as its class_id, which is equivalent to flow-based Virtual Clock. - zero_buffer: bool - Does this server have a zero-length buffer? This is useful when multiple - basic elements need to be put together to construct a more complex element - with a unified buffer. - zero_downstream_buffer: bool - Does this server's downstream element has a zero-length buffer? If so, packets - may queue up in this element's own buffer rather than be forwarded to the - next-hop element. - debug: bool - If True, prints more verbose debug information. + """Implements a virtual clock server. + + Parameters + ---------- + env: simpy.Environment + The simulation environment. 
+    rate: float
+        The bit rate of the port.
+    vticks: list or dict
+        This can be either a list or a dictionary. If it is a list, it uses the flow_id ---
+        or class_id, if class-based fair queueing is activated using the `flow_classes'
+        parameter below --- as its index to look for the flow (or class)'s corresponding
+        'vtick'. If it is a dictionary, it contains (flow_id or class_id -> vtick) pairs
+        for each possible flow_id or class_id. We assume that the vticks are the inverse of
+        the desired rates for the corresponding flows, in bits per second.
+    flow_classes: function
+        This is a function that matches a packet's flow_ids to class_ids, used to implement
+        class-based Virtual Clock. The default is a lambda function that uses a packet's
+        flow_id as its class_id, which is equivalent to flow-based Virtual Clock.
+    zero_buffer: bool
+        Does this server have a zero-length buffer? This is useful when multiple
+        basic elements need to be put together to construct a more complex element
+        with a unified buffer.
+    zero_downstream_buffer: bool
+        Does this server's downstream element have a zero-length buffer? If so, packets
+        may queue up in this element's own buffer rather than be forwarded to the
+        next-hop element.
+    debug: bool
+        If True, prints more verbose debug information.
     """
 
-    def __init__(self,
-                 env,
-                 rate,
-                 vticks,
-                 flow_classes: Callable = lambda p: p.flow_id,
-                 zero_buffer=False,
-                 zero_downstream_buffer=False,
-                 debug: bool = False):
+    def __init__(
+        self,
+        env,
+        rate,
+        vticks,
+        flow_classes: Callable = lambda p: p.flow_id,
+        zero_buffer=False,
+        zero_downstream_buffer=False,
+        debug: bool = False,
+    ):
         self.env = env
         self.rate = rate
         self.vticks = vticks
@@ -70,12 +72,12 @@ def __init__(self,
                 self.flow_queue_count[queue_id] = 0
 
         elif isinstance(vticks, dict):
-            for (queue_id, __) in vticks.items():
+            for queue_id, __ in vticks.items():
                 self.aux_vc[queue_id] = 0.0
                 self.v_clocks[queue_id] = 0.0
                 self.flow_queue_count[queue_id] = 0
         else:
-            raise ValueError('vticks must be either a list or a dictionary.')
+            raise ValueError("vticks must be either a list or a dictionary.")
 
         self.out = None
         self.packets_received = 0
@@ -108,8 +110,10 @@ def update_stats(self, packet):
             raise ValueError("Error: the packet is from an unrecorded flow.")
 
         if self.debug:
-            print(f"Sent Packet {packet.packet_id} from flow {packet.flow_id} "
-                  f"belonging to class {self.flow_classes(packet)} at time {self.env.now}")
+            print(
+                f"Sent Packet {packet.packet_id} from flow {packet.flow_id} "
+                f"belonging to class {self.flow_classes(packet)} at time {self.env.now}"
+            )
 
     def update(self, packet):
         """
@@ -165,23 +169,23 @@ def run(self):
 
                 yield self.env.timeout(packet.size * 8.0 / self.rate)
                 self.update_stats(packet)
-                self.out.put(packet,
-                             upstream_update=self.update,
-                             upstream_store=self.store)
+                self.out.put(
+                    packet, upstream_update=self.update, upstream_store=self.store
+                )
                 self.current_packet = None
             else:
                 packet = yield self.store.get()
-                self.update(packet)
 
                 self.current_packet = packet
                 yield self.env.timeout(packet.size * 8.0 / self.rate)
 
                 self.update_stats(packet)
+                self.update(packet)
                 self.out.put(packet)
                 self.current_packet = None
 
     def put(self, packet, upstream_update=None, upstream_store=None):
-        """ Sends a packet to this element. 
""" + """Sends a packet to this element.""" self.packets_received += 1 self.byte_sizes[self.flow_classes(packet)] += packet.size now = self.env.now @@ -197,12 +201,13 @@ def put(self, packet, upstream_update=None, upstream_store=None): # desired bits per second data rate. Hence, we multiply this # value by the size of the packet in bits. self.aux_vc[self.flow_classes(packet)] = max( - now, self.aux_vc[self.flow_classes(packet)]) - self.v_clocks[self.flow_classes( - packet)] = self.v_clocks[self.flow_classes(packet)] + self.vticks[ - self.flow_classes(packet)] * packet.size * 8.0 - self.aux_vc[self.flow_classes(packet)] += self.vticks[ - self.flow_classes(packet)] + now, self.aux_vc[self.flow_classes(packet)] + ) + self.v_clocks[self.flow_classes(packet)] = ( + self.v_clocks[self.flow_classes(packet)] + + self.vticks[self.flow_classes(packet)] * packet.size * 8.0 + ) + self.aux_vc[self.flow_classes(packet)] += self.vticks[self.flow_classes(packet)] # Lots of work to do here to implement the queueing discipline @@ -211,14 +216,18 @@ def put(self, packet, upstream_update=None, upstream_store=None): f"Packet arrived at {self.env.now}, with flow_id {packet.flow_id}, " f"belong to class {self.flow_classes(packet)}, " f"packet_id {packet.packet_id}, virtual clocks {self.v_clocks[self.flow_classes(packet)]}, " - f"aux_vc {self.aux_vc[self.flow_classes(packet)]}") - - if self.zero_buffer and upstream_update is not None and upstream_store is not None: + f"aux_vc {self.aux_vc[self.flow_classes(packet)]}" + ) + + if ( + self.zero_buffer + and upstream_update is not None + and upstream_store is not None + ): self.upstream_stores[packet] = upstream_store self.upstream_updates[packet] = upstream_update if self.zero_downstream_buffer: - self.downstream_store.put( - (self.aux_vc[self.flow_classes(packet)], packet)) + self.downstream_store.put((self.aux_vc[self.flow_classes(packet)], packet)) return self.store.put((self.aux_vc[self.flow_classes(packet)], packet)) diff --git a/ns/scheduler/wfq.py b/ns/scheduler/wfq.py index fe4eaf4..d9aabf6 100644 --- a/ns/scheduler/wfq.py +++ b/ns/scheduler/wfq.py @@ -115,7 +115,13 @@ def update_stats(self, packet): # Computing the new set of active flow classes self.flow_queue_count[self.flow_classes(packet)] -= 1 + print( + f"Before removing {self.flow_classes(packet)} from the active set at time {now}." + ) + print(f"flow_queue_count = {self.flow_queue_count}") + if self.flow_queue_count[self.flow_classes(packet)] == 0: + print(f"Removing {self.flow_classes(packet)} from the active set.") self.active_set.remove(self.flow_classes(packet)) if len(self.active_set) == 0: @@ -133,7 +139,7 @@ def update_stats(self, packet): if self.debug: print( f"Sent Packet {packet.packet_id} from flow {packet.flow_id} " - f"belonging to class {self.flow_classes(packet)}" + f"belonging to class {self.flow_classes(packet)} at time {now}." ) def update(self, packet): @@ -223,7 +229,6 @@ def put(self, packet, upstream_update=None, upstream_store=None): for i in self.active_set: weight_sum += self.weights[i] - self.vtime += (now - self.last_update) / weight_sum self.finish_times[self.flow_classes(packet)] = max( self.finish_times[self.flow_classes(packet)], self.vtime ) + packet.size * 8.0 / (