Skip to content

Commit

Permalink
fw: remove heap allocations in incoming interest pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Dec 25, 2024
1 parent 5d6847a commit 021049e
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 25 deletions.
8 changes: 8 additions & 0 deletions fw/core/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,11 @@ func LogTrace(module interface{}, components ...interface{}) {
log.Debug(generateLogMessage(module, components...))
}
}

func HasTraceLogs() bool {
return shouldPrintTraceLogs
}

func HasDebugLogs() bool {
return logLevel <= log.DebugLevel
}
47 changes: 38 additions & 9 deletions fw/fw/bestroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package fw

import (
"sort"
"time"

"github.com/named-data/ndnd/fw/core"
Expand Down Expand Up @@ -63,10 +62,13 @@ func (s *BestRoute) AfterReceiveInterest(
packet *defn.Pkt,
pitEntry table.PitEntry,
inFace uint64,
nexthops []*table.FibNextHopEntry,
nexthops [MaxNextHops]*table.FibNextHopEntry,
nexthopsCount int,
) {
if len(nexthops) == 0 {
core.LogDebug(s, "AfterReceiveInterest: No nexthop for Interest=", packet.Name, " - DROP")
if nexthopsCount == 0 {
if core.HasDebugLogs() {
core.LogDebug(s, "AfterReceiveInterest: No nexthop for Interest=", packet.Name, " - DROP")
}
return
}

Expand All @@ -75,21 +77,48 @@ func (s *BestRoute) AfterReceiveInterest(
for _, outRecord := range pitEntry.OutRecords() {
if outRecord.LatestNonce != *packet.L3.Interest.Nonce() &&
outRecord.LatestTimestamp.Add(BestRouteSuppressionTime).After(time.Now()) {
core.LogDebug(s, "AfterReceiveInterest: Suppressed Interest=", packet.Name, " - DROP")
if core.HasDebugLogs() {
core.LogDebug(s, "AfterReceiveInterest: Suppressed Interest=", packet.Name, " - DROP")
}
return
}
}

// Sort nexthops by cost and send to best-possible nexthop
sort.Slice(nexthops, func(i, j int) bool { return nexthops[i].Cost < nexthops[j].Cost })
for _, nh := range nexthops {
core.LogTrace(s, "AfterReceiveInterest: Forwarding Interest=", packet.Name, " to FaceID=", nh.Nexthop)
lowesthops := nexthops
getLowest := func() *table.FibNextHopEntry {
cost := uint64(1 << 63)
index := -1
for i := 0; i < nexthopsCount; i++ {
nh := nexthops[i]
if nh == nil {
continue
}
if nh.Cost < cost {
cost = nh.Cost
index = i
}
}
if index == -1 {
return nil
}
hop := nexthops[index]
lowesthops[index] = nil
return hop
}

for nh := getLowest(); nh != nil; nh = getLowest() {
if core.HasTraceLogs() {
core.LogTrace(s, "AfterReceiveInterest: Forwarding Interest=", packet.Name, " to FaceID=", nh.Nexthop)
}
if sent := s.SendInterest(packet, pitEntry, nh.Nexthop, inFace); sent {
return
}
}

core.LogDebug(s, "AfterReceiveInterest: No usable nexthop for Interest=", packet.Name, " - DROP")
if core.HasDebugLogs() {
core.LogDebug(s, "AfterReceiveInterest: No usable nexthop for Interest=", packet.Name, " - DROP")
}
}

func (s *BestRoute) BeforeSatisfyInterest(pitEntry table.PitEntry, inFace uint64) {
Expand Down
23 changes: 17 additions & 6 deletions fw/fw/multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,13 @@ func (s *Multicast) AfterReceiveInterest(
packet *defn.Pkt,
pitEntry table.PitEntry,
inFace uint64,
nexthops []*table.FibNextHopEntry,
nexthops [MaxNextHops]*table.FibNextHopEntry,
nexthopsCount int,
) {
if len(nexthops) == 0 {
core.LogDebug(s, "AfterReceiveInterest: No nexthop for Interest=", packet.Name, " - DROP")
if nexthopsCount == 0 {
if core.HasDebugLogs() {
core.LogDebug(s, "AfterReceiveInterest: No nexthop for Interest=", packet.Name, " - DROP")
}
return
}

Expand All @@ -73,14 +76,22 @@ func (s *Multicast) AfterReceiveInterest(
for _, outRecord := range pitEntry.OutRecords() {
if outRecord.LatestNonce != *packet.L3.Interest.Nonce() &&
outRecord.LatestTimestamp.Add(MulticastSuppressionTime).After(time.Now()) {
core.LogDebug(s, "AfterReceiveInterest: Suppressed Interest=", packet.Name, " - DROP")
if core.HasDebugLogs() {
core.LogDebug(s, "AfterReceiveInterest: Suppressed Interest=", packet.Name, " - DROP")
}
return
}
}

// Send interest to all nexthops
for _, nexthop := range nexthops {
core.LogTrace(s, "AfterReceiveInterest: Forwarding Interest=", packet.Name, " to FaceID=", nexthop.Nexthop)
for i := 0; i < nexthopsCount; i++ {
nexthop := nexthops[i]
if nexthop == nil {
continue
}
if core.HasTraceLogs() {
core.LogTrace(s, "AfterReceiveInterest: Forwarding Interest=", packet.Name, " to FaceID=", nexthop.Nexthop)
}
s.SendInterest(packet, pitEntry, nexthop.Nexthop, inFace)
}
}
Expand Down
7 changes: 6 additions & 1 deletion fw/fw/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
// StrategyPrefix is the prefix of all strategy names for YaNFD
const StrategyPrefix = "/localhost/nfd/strategy"

// Maximum number of possible next hops (i.e. router links) for optimization
const MaxNextHops = 64

// Strategy represents a forwarding strategy.
type Strategy interface {
Instantiate(fwThread *Thread)
Expand All @@ -36,7 +39,9 @@ type Strategy interface {
packet *defn.Pkt,
pitEntry table.PitEntry,
inFace uint64,
nexthops []*table.FibNextHopEntry)
nexthops [MaxNextHops]*table.FibNextHopEntry,
nexthopsCount int,
)
BeforeSatisfyInterest(
pitEntry table.PitEntry,
inFace uint64)
Expand Down
35 changes: 26 additions & 9 deletions fw/fw/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) {
}

// Log PIT token (if any)
core.LogTrace(t, "OnIncomingInterest: ", packet.Name, ", FaceID=", incomingFace.FaceID(), ", PitTokenL=", len(packet.PitToken))
if core.HasTraceLogs() {
core.LogTrace(t, "OnIncomingInterest: ", packet.Name, ", FaceID=", incomingFace.FaceID(), ", PitTokenL=", len(packet.PitToken))
}

// Check if violates /localhost
if incomingFace.Scope() == defn.NonLocal &&
Expand Down Expand Up @@ -223,13 +225,17 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) {

// Drop packet if no nonce is found
if interest.Nonce() == nil {
core.LogDebug(t, "Interest ", packet.Name, " is missing Nonce - DROP")
if core.HasDebugLogs() {
core.LogDebug(t, "Interest ", packet.Name, " is missing Nonce - DROP")
}
return
}

// Check if packet is in dead nonce list
if exists := t.deadNonceList.Find(interest.Name(), *interest.Nonce()); exists {
core.LogDebug(t, "Interest ", packet.Name, " is dropped by DeadNonce: ", *interest.Nonce())
if core.HasDebugLogs() {
core.LogDebug(t, "Interest ", packet.Name, " is dropped by DeadNonce: ", *interest.Nonce())
}
return
}

Expand All @@ -238,7 +244,9 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) {
pitEntry, isDuplicate := t.pitCS.InsertInterest(interest, fhName, incomingFace.FaceID())
if isDuplicate {
// Interest loop - since we don't use Nacks, just drop
core.LogDebug(t, "Interest ", packet.Name, " is looping - DROP")
if core.HasDebugLogs() {
core.LogDebug(t, "Interest ", packet.Name, " is looping - DROP")
}
return
}

Expand Down Expand Up @@ -278,7 +286,9 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) {
}
}
} else {
core.LogTrace(t, "Interest ", packet.Name, " is already pending")
if core.HasTraceLogs() {
core.LogTrace(t, "Interest ", packet.Name, " is already pending")
}

// Add the previous nonce to the dead nonce list to prevent further looping
// TODO: review this design, not specified in NFD dev guide
Expand All @@ -291,7 +301,10 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) {
// If NextHopFaceId set, forward to that face (if it exists) or drop
if packet.NextHopFaceID != nil {
if dispatch.GetFace(*packet.NextHopFaceID) != nil {
core.LogTrace(t, "NextHopFaceId is set for Interest ", packet.Name, " - dispatching directly to face")
if core.HasTraceLogs() {
core.LogTrace(t, "NextHopFaceId is set for Interest ", packet.Name, " - dispatching directly to face")
}

dispatch.GetFace(*packet.NextHopFaceID).SendPacket(dispatch.OutPkt{
Pkt: packet,
PitToken: packet.PitToken, // TODO: ??
Expand All @@ -314,16 +327,20 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) {

// Exclude faces that have an in-record for this interest
// TODO: unclear where NFD dev guide specifies such behavior (if any)
allowedNexthops := make([]*table.FibNextHopEntry, 0, len(nexthops))
allowedNexthops := [MaxNextHops]*table.FibNextHopEntry{}
allowedNexthopsCount := 0
for _, nexthop := range nexthops {
record := pitEntry.InRecords()[nexthop.Nexthop]
if record == nil || nexthop.Nexthop == incomingFace.FaceID() {
allowedNexthops = append(allowedNexthops, nexthop)
allowedNexthops[allowedNexthopsCount] = nexthop
allowedNexthopsCount++
}
}

// Pass to strategy AfterReceiveInterest pipeline
strategy.AfterReceiveInterest(packet, pitEntry, incomingFace.FaceID(), allowedNexthops)
strategy.AfterReceiveInterest(
packet, pitEntry, incomingFace.FaceID(),
allowedNexthops, allowedNexthopsCount)
}

func (t *Thread) processOutgoingInterest(
Expand Down

0 comments on commit 021049e

Please sign in to comment.