Skip to content

Commit

Permalink
Merge pull request #74 from named-data/patch-4
Browse files Browse the repository at this point in the history
Another round of various fixes
  • Loading branch information
pulsejet authored Dec 18, 2024
2 parents 6349e6b + f11fbeb commit 82be97a
Show file tree
Hide file tree
Showing 15 changed files with 259 additions and 292 deletions.
8 changes: 7 additions & 1 deletion fw/dispatch/face.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ type Face interface {

State() defn.State

SendPacket(packet *defn.Pkt)
SendPacket(out OutPkt)
}

type OutPkt struct {
Pkt *defn.Pkt
PitToken []byte
InFace *uint64
}

// FaceDispatch is used to allow forwarding to interact with faces without a circular dependency issue.
Expand Down
12 changes: 6 additions & 6 deletions fw/face/link-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type LinkService interface {
Run(initial []byte)

// Add a packet to the send queue for this link service
SendPacket(packet *defn.Pkt)
SendPacket(out dispatch.OutPkt)
// Synchronously handle an incoming frame and dispatch to fw
handleIncomingFrame(frame []byte)

Expand All @@ -63,7 +63,7 @@ type linkServiceBase struct {
faceID uint64
transport transport
stopped chan bool
sendQueue chan *defn.Pkt
sendQueue chan dispatch.OutPkt

// Counters
nInInterests uint64
Expand Down Expand Up @@ -93,7 +93,7 @@ func (l *linkServiceBase) SetFaceID(faceID uint64) {

func (l *linkServiceBase) makeLinkServiceBase() {
l.stopped = make(chan bool)
l.sendQueue = make(chan *defn.Pkt, faceQueueSize)
l.sendQueue = make(chan dispatch.OutPkt, faceQueueSize)
}

//
Expand Down Expand Up @@ -207,14 +207,14 @@ func (l *linkServiceBase) Close() {
//

// SendPacket adds a packet to the send queue for this link service
func (l *linkServiceBase) SendPacket(packet *defn.Pkt) {
func (l *linkServiceBase) SendPacket(out dispatch.OutPkt) {
select {
case l.sendQueue <- packet:
case l.sendQueue <- out:
// Packet queued successfully
core.LogTrace(l, "Queued packet for Link Service")
default:
// Drop packet due to congestion
core.LogWarn(l, "Dropped packet due to congestion")
core.LogDebug(l, "Dropped packet due to congestion")

// TODO: Signal congestion
}
Expand Down
66 changes: 29 additions & 37 deletions fw/face/ndnlp-link-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/named-data/YaNFD/core"
defn "github.com/named-data/YaNFD/defn"
"github.com/named-data/YaNFD/dispatch"
enc "github.com/zjkmxy/go-ndn/pkg/encoding"
spec "github.com/zjkmxy/go-ndn/pkg/ndn/spec_2022"
"github.com/zjkmxy/go-ndn/pkg/utils"
Expand Down Expand Up @@ -171,7 +172,8 @@ func (l *NDNLPLinkService) runSend() {
}
}

func sendPacket(l *NDNLPLinkService, pkt *defn.Pkt) {
func sendPacket(l *NDNLPLinkService, out dispatch.OutPkt) {
pkt := out.Pkt
wire := pkt.Raw

// Counters
Expand Down Expand Up @@ -201,33 +203,22 @@ func sendPacket(l *NDNLPLinkService, pkt *defn.Pkt) {

// Split up fragment
nFragments := int((len(wire) + effectiveMtu - 1) / effectiveMtu)
lastFragSize := len(wire) - effectiveMtu*(nFragments-1)
fragments = make([]*spec.LpPacket, nFragments)
reader := enc.NewBufferReader(wire)
for i := 0; i < nFragments; i++ {
if i < nFragments-1 {
frag, err := reader.ReadWire(effectiveMtu)
if err != nil {
core.LogFatal(l, "Unexpected Wire reading error")
}
fragments[i] = &spec.LpPacket{
Fragment: frag,
}
} else {
frag, err := reader.ReadWire(lastFragSize)
if err != nil {
core.LogFatal(l, "Unexpected Wire reading error")
}
fragments[i] = &spec.LpPacket{
Fragment: frag,
}
readSize := effectiveMtu
if i == nFragments-1 {
readSize = len(wire) - effectiveMtu*(nFragments-1)
}

frag, err := reader.ReadWire(readSize)
if err != nil {
core.LogFatal(l, "Unexpected Wire reading error")
}
fragments[i] = &spec.LpPacket{Fragment: frag}
}
} else {
fragments = make([]*spec.LpPacket, 1)
fragments[0] = &spec.LpPacket{
Fragment: enc.Wire{wire},
}
fragments = []*spec.LpPacket{{Fragment: enc.Wire{wire}}}
}

// Sequence
Expand All @@ -239,14 +230,15 @@ func sendPacket(l *NDNLPLinkService, pkt *defn.Pkt) {
}

// Congestion marking
congestionMark := pkt.CongestionMark // from upstream
if congestionMarking {
// GetSendQueueSize is expensive, so only check every 1/2 of the threshold
// and only if we can mark congestion for this particular packet
if l.congestionCheck > l.options.DefaultCongestionThresholdBytes {
if now.After(l.lastTimeCongestionMarked.Add(l.options.BaseCongestionMarkingInterval)) &&
l.transport.GetSendQueueSize() > l.options.DefaultCongestionThresholdBytes {
core.LogWarn(l, "Marking congestion")
fragments[0].CongestionMark = utils.IdPtr[uint64](1)
congestionMark = utils.IdPtr[uint64](1) // ours
l.lastTimeCongestionMarked = now
}

Expand All @@ -256,23 +248,23 @@ func sendPacket(l *NDNLPLinkService, pkt *defn.Pkt) {
l.congestionCheck += uint64(len(wire)) // approx
}

// PIT tokens
if len(pkt.PitToken) > 0 {
fragments[0].PitToken = pkt.PitToken
}
// Send fragment(s)
for _, fragment := range fragments {
// PIT tokens
if len(out.PitToken) > 0 {
fragment.PitToken = out.PitToken
}

// Incoming face indication
if l.options.IsIncomingFaceIndicationEnabled && pkt.IncomingFaceID != nil {
fragments[0].IncomingFaceId = pkt.IncomingFaceID
}
// Incoming face indication
if l.options.IsIncomingFaceIndicationEnabled && out.InFace != nil {
fragment.IncomingFaceId = out.InFace
}

// Congestion marking
if pkt.CongestionMark != nil {
fragments[0].CongestionMark = pkt.CongestionMark
}
// Congestion marking
if congestionMark != nil {
fragment.CongestionMark = congestionMark
}

// Send fragment(s)
for _, fragment := range fragments {
pkt := &spec.Packet{
LpPacket: fragment,
}
Expand Down
19 changes: 19 additions & 0 deletions fw/fw/bestroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ package fw
import (
"reflect"
"sort"
"time"

"github.com/named-data/YaNFD/core"
"github.com/named-data/YaNFD/defn"
"github.com/named-data/YaNFD/table"
enc "github.com/zjkmxy/go-ndn/pkg/encoding"
)

const BestRouteSuppressionTime = 500 * time.Millisecond

// BestRoute is a forwarding strategy that forwards Interests
// to the nexthop with the lowest cost.
type BestRoute struct {
Expand Down Expand Up @@ -61,6 +64,22 @@ func (s *BestRoute) AfterReceiveInterest(
inFace uint64,
nexthops []*table.FibNextHopEntry,
) {
if len(nexthops) == 0 {
core.LogDebug(s, "AfterReceiveInterest: No nexthop for Interest=", packet.Name, " - DROP")
return
}

// If there is an out record less than suppression interval ago, drop the
// retransmission to suppress it (only if the nonce is different)
for _, outRecord := range pitEntry.OutRecords() {
if outRecord.LatestNonce != *packet.L3.Interest.NonceV &&
outRecord.LatestTimestamp.Add(BestRouteSuppressionTime).After(time.Now()) {
core.LogDebug(s, "AfterReceiveInterest: Suppressed Interest=", packet.Name, " - DROP")
return
}
}

// Sort nexthops by cost and send to best-possible nexthop
sort.Slice(nexthops, func(i, j int) bool { return nexthops[i].Cost < nexthops[j].Cost })
for _, nh := range nexthops {
core.LogTrace(s, "AfterReceiveInterest: Forwarding Interest=", packet.Name, " to FaceID=", nh.Nexthop)
Expand Down
14 changes: 14 additions & 0 deletions fw/fw/multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ package fw

import (
"reflect"
"time"

"github.com/named-data/YaNFD/core"
"github.com/named-data/YaNFD/defn"
"github.com/named-data/YaNFD/table"
enc "github.com/zjkmxy/go-ndn/pkg/encoding"
)

const MulticastSuppressionTime = 500 * time.Millisecond

// Multicast is a forwarding strategy that forwards Interests to all nexthop faces.
type Multicast struct {
StrategyBase
Expand Down Expand Up @@ -64,6 +67,17 @@ func (s *Multicast) AfterReceiveInterest(
return
}

// If there is an out record less than suppression interval ago, drop the
// retransmission to suppress it (only if the nonce is different)
for _, outRecord := range pitEntry.OutRecords() {
if outRecord.LatestNonce != *packet.L3.Interest.NonceV &&
outRecord.LatestTimestamp.Add(MulticastSuppressionTime).After(time.Now()) {
core.LogDebug(s, "AfterReceiveInterest: Suppressed Interest=", packet.Name, " - DROP")
return
}
}

// Send interest to all nexthops
for _, nexthop := range nexthops {
core.LogTrace(s, "AfterReceiveInterest: Forwarding Interest=", packet.Name, " to FaceID=", nexthop.Nexthop)
s.SendInterest(packet, pitEntry, nexthop.Nexthop, inFace)
Expand Down
Loading

0 comments on commit 82be97a

Please sign in to comment.