diff --git a/fw/dispatch/face.go b/fw/dispatch/face.go index bffbd7dc..8eac0fb7 100644 --- a/fw/dispatch/face.go +++ b/fw/dispatch/face.go @@ -27,7 +27,13 @@ type Face interface { State() defn.State - SendPacket(packet *defn.Pkt) + SendPacket(out OutPkt) +} + +type OutPkt struct { + Pkt *defn.Pkt + PitToken []byte + InFace *uint64 } // FaceDispatch is used to allow forwarding to interact with faces without a circular dependency issue. diff --git a/fw/face/link-service.go b/fw/face/link-service.go index 53b96b63..29622b14 100644 --- a/fw/face/link-service.go +++ b/fw/face/link-service.go @@ -42,7 +42,7 @@ type LinkService interface { Run(initial []byte) // Add a packet to the send queue for this link service - SendPacket(packet *defn.Pkt) + SendPacket(out dispatch.OutPkt) // Synchronously handle an incoming frame and dispatch to fw handleIncomingFrame(frame []byte) @@ -63,7 +63,7 @@ type linkServiceBase struct { faceID uint64 transport transport stopped chan bool - sendQueue chan *defn.Pkt + sendQueue chan dispatch.OutPkt // Counters nInInterests uint64 @@ -93,7 +93,7 @@ func (l *linkServiceBase) SetFaceID(faceID uint64) { func (l *linkServiceBase) makeLinkServiceBase() { l.stopped = make(chan bool) - l.sendQueue = make(chan *defn.Pkt, faceQueueSize) + l.sendQueue = make(chan dispatch.OutPkt, faceQueueSize) } // @@ -207,14 +207,14 @@ func (l *linkServiceBase) Close() { // // SendPacket adds a packet to the send queue for this link service -func (l *linkServiceBase) SendPacket(packet *defn.Pkt) { +func (l *linkServiceBase) SendPacket(out dispatch.OutPkt) { select { - case l.sendQueue <- packet: + case l.sendQueue <- out: // Packet queued successfully core.LogTrace(l, "Queued packet for Link Service") default: // Drop packet due to congestion - core.LogWarn(l, "Dropped packet due to congestion") + core.LogDebug(l, "Dropped packet due to congestion") // TODO: Signal congestion } diff --git a/fw/face/ndnlp-link-service.go b/fw/face/ndnlp-link-service.go index f53aa242..991d3959 100644 --- a/fw/face/ndnlp-link-service.go +++ b/fw/face/ndnlp-link-service.go @@ -15,6 +15,7 @@ import ( "github.com/named-data/YaNFD/core" defn "github.com/named-data/YaNFD/defn" + "github.com/named-data/YaNFD/dispatch" enc "github.com/zjkmxy/go-ndn/pkg/encoding" spec "github.com/zjkmxy/go-ndn/pkg/ndn/spec_2022" "github.com/zjkmxy/go-ndn/pkg/utils" @@ -171,7 +172,8 @@ func (l *NDNLPLinkService) runSend() { } } -func sendPacket(l *NDNLPLinkService, pkt *defn.Pkt) { +func sendPacket(l *NDNLPLinkService, out dispatch.OutPkt) { + pkt := out.Pkt wire := pkt.Raw // Counters @@ -201,33 +203,22 @@ func sendPacket(l *NDNLPLinkService, pkt *defn.Pkt) { // Split up fragment nFragments := int((len(wire) + effectiveMtu - 1) / effectiveMtu) - lastFragSize := len(wire) - effectiveMtu*(nFragments-1) fragments = make([]*spec.LpPacket, nFragments) reader := enc.NewBufferReader(wire) for i := 0; i < nFragments; i++ { - if i < nFragments-1 { - frag, err := reader.ReadWire(effectiveMtu) - if err != nil { - core.LogFatal(l, "Unexpected Wire reading error") - } - fragments[i] = &spec.LpPacket{ - Fragment: frag, - } - } else { - frag, err := reader.ReadWire(lastFragSize) - if err != nil { - core.LogFatal(l, "Unexpected Wire reading error") - } - fragments[i] = &spec.LpPacket{ - Fragment: frag, - } + readSize := effectiveMtu + if i == nFragments-1 { + readSize = len(wire) - effectiveMtu*(nFragments-1) } + + frag, err := reader.ReadWire(readSize) + if err != nil { + core.LogFatal(l, "Unexpected Wire reading error") + } + fragments[i] = &spec.LpPacket{Fragment: frag} } } else { - fragments = make([]*spec.LpPacket, 1) - fragments[0] = &spec.LpPacket{ - Fragment: enc.Wire{wire}, - } + fragments = []*spec.LpPacket{{Fragment: enc.Wire{wire}}} } // Sequence @@ -239,6 +230,7 @@ func sendPacket(l *NDNLPLinkService, pkt *defn.Pkt) { } // Congestion marking + congestionMark := pkt.CongestionMark // from upstream if congestionMarking { // GetSendQueueSize is expensive, so only check every 1/2 of the threshold // and only if we can mark congestion for this particular packet @@ -246,7 +238,7 @@ func sendPacket(l *NDNLPLinkService, pkt *defn.Pkt) { if now.After(l.lastTimeCongestionMarked.Add(l.options.BaseCongestionMarkingInterval)) && l.transport.GetSendQueueSize() > l.options.DefaultCongestionThresholdBytes { core.LogWarn(l, "Marking congestion") - fragments[0].CongestionMark = utils.IdPtr[uint64](1) + congestionMark = utils.IdPtr[uint64](1) // ours l.lastTimeCongestionMarked = now } @@ -256,23 +248,23 @@ func sendPacket(l *NDNLPLinkService, pkt *defn.Pkt) { l.congestionCheck += uint64(len(wire)) // approx } - // PIT tokens - if len(pkt.PitToken) > 0 { - fragments[0].PitToken = pkt.PitToken - } + // Send fragment(s) + for _, fragment := range fragments { + // PIT tokens + if len(out.PitToken) > 0 { + fragment.PitToken = out.PitToken + } - // Incoming face indication - if l.options.IsIncomingFaceIndicationEnabled && pkt.IncomingFaceID != nil { - fragments[0].IncomingFaceId = pkt.IncomingFaceID - } + // Incoming face indication + if l.options.IsIncomingFaceIndicationEnabled && out.InFace != nil { + fragment.IncomingFaceId = out.InFace + } - // Congestion marking - if pkt.CongestionMark != nil { - fragments[0].CongestionMark = pkt.CongestionMark - } + // Congestion marking + if congestionMark != nil { + fragment.CongestionMark = congestionMark + } - // Send fragment(s) - for _, fragment := range fragments { pkt := &spec.Packet{ LpPacket: fragment, } diff --git a/fw/fw/bestroute.go b/fw/fw/bestroute.go index 80bdca72..bb6c659d 100644 --- a/fw/fw/bestroute.go +++ b/fw/fw/bestroute.go @@ -10,6 +10,7 @@ package fw import ( "reflect" "sort" + "time" "github.com/named-data/YaNFD/core" "github.com/named-data/YaNFD/defn" @@ -17,6 +18,8 @@ import ( enc "github.com/zjkmxy/go-ndn/pkg/encoding" ) +const BestRouteSuppressionTime = 500 * time.Millisecond + // BestRoute is a forwarding strategy that forwards Interests // to the nexthop with the lowest cost. type BestRoute struct { @@ -61,6 +64,22 @@ func (s *BestRoute) AfterReceiveInterest( inFace uint64, nexthops []*table.FibNextHopEntry, ) { + if len(nexthops) == 0 { + core.LogDebug(s, "AfterReceiveInterest: No nexthop for Interest=", packet.Name, " - DROP") + return + } + + // If there is an out record less than suppression interval ago, drop the + // retransmission to suppress it (only if the nonce is different) + for _, outRecord := range pitEntry.OutRecords() { + if outRecord.LatestNonce != *packet.L3.Interest.NonceV && + outRecord.LatestTimestamp.Add(BestRouteSuppressionTime).After(time.Now()) { + core.LogDebug(s, "AfterReceiveInterest: Suppressed Interest=", packet.Name, " - DROP") + return + } + } + + // Sort nexthops by cost and send to best-possible nexthop sort.Slice(nexthops, func(i, j int) bool { return nexthops[i].Cost < nexthops[j].Cost }) for _, nh := range nexthops { core.LogTrace(s, "AfterReceiveInterest: Forwarding Interest=", packet.Name, " to FaceID=", nh.Nexthop) diff --git a/fw/fw/multicast.go b/fw/fw/multicast.go index 73bb2799..b1a1d7d1 100644 --- a/fw/fw/multicast.go +++ b/fw/fw/multicast.go @@ -9,6 +9,7 @@ package fw import ( "reflect" + "time" "github.com/named-data/YaNFD/core" "github.com/named-data/YaNFD/defn" @@ -16,6 +17,8 @@ import ( enc "github.com/zjkmxy/go-ndn/pkg/encoding" ) +const MulticastSuppressionTime = 500 * time.Millisecond + // Multicast is a forwarding strategy that forwards Interests to all nexthop faces. type Multicast struct { StrategyBase @@ -64,6 +67,17 @@ func (s *Multicast) AfterReceiveInterest( return } + // If there is an out record less than suppression interval ago, drop the + // retransmission to suppress it (only if the nonce is different) + for _, outRecord := range pitEntry.OutRecords() { + if outRecord.LatestNonce != *packet.L3.Interest.NonceV && + outRecord.LatestTimestamp.Add(MulticastSuppressionTime).After(time.Now()) { + core.LogDebug(s, "AfterReceiveInterest: Suppressed Interest=", packet.Name, " - DROP") + return + } + } + + // Send interest to all nexthops for _, nexthop := range nexthops { core.LogTrace(s, "AfterReceiveInterest: Forwarding Interest=", packet.Name, " to FaceID=", nexthop.Nexthop) s.SendInterest(packet, pitEntry, nexthop.Nexthop, inFace) diff --git a/fw/fw/thread.go b/fw/fw/thread.go index 96a936db..6c40a72f 100644 --- a/fw/fw/thread.go +++ b/fw/fw/thread.go @@ -186,21 +186,15 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) { } if interest.HopLimitV != nil { + core.LogTrace(t, "Interest ", packet.Name, " has HopLimit=", *interest.HopLimitV) if *interest.HopLimitV == 0 { return } *interest.HopLimitV -= 1 } - // Get PIT token (if any) - incomingPitToken := make([]byte, 0) - if len(packet.PitToken) > 0 { - incomingPitToken = make([]byte, len(packet.PitToken)) - copy(incomingPitToken, packet.PitToken) - core.LogTrace(t, "OnIncomingInterest: ", packet.Name, ", FaceID=", incomingFace.FaceID(), ", Has PitToken") - } else { - core.LogTrace(t, "OnIncomingInterest: ", packet.Name, ", FaceID=", incomingFace.FaceID()) - } + // Log PIT token (if any) + core.LogTrace(t, "OnIncomingInterest: ", packet.Name, ", FaceID=", incomingFace.FaceID(), ", PitTokenL=", len(packet.PitToken)) // Check if violates /localhost if incomingFace.Scope() == defn.NonLocal && @@ -236,20 +230,22 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) { // Drop packet if no nonce is found if interest.NonceV == nil { - core.LogInfo(t, "Interest ", packet.Name, " is missing Nonce - DROP") + core.LogDebug(t, "Interest ", packet.Name, " is missing Nonce - DROP") return } + // Check if packet is in dead nonce list if exists := t.deadNonceList.Find(interest.NameV, *interest.NonceV); exists { - core.LogInfo(t, "Interest ", packet.Name, " is dropped by DeadNonce: ", *interest.NonceV) + core.LogDebug(t, "Interest ", packet.Name, " is dropped by DeadNonce: ", *interest.NonceV) return } + // Check if any matching PIT entries (and if duplicate) //read into this, looks like this one will have to be manually changed pitEntry, isDuplicate := t.pitCS.InsertInterest(interest, fhName, incomingFace.FaceID()) if isDuplicate { // Interest loop - since we don't use Nacks, just drop - core.LogInfo(t, "Interest ", packet.Name, " is looping - DROP") + core.LogDebug(t, "Interest ", packet.Name, " is looping - DROP") return } @@ -259,7 +255,8 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) { // Add in-record and determine if already pending // this looks like custom interest again, but again can be changed without much issue? - _, isAlreadyPending := pitEntry.InsertInRecord(interest, incomingFace.FaceID(), incomingPitToken) + _, isAlreadyPending, prevNonce := pitEntry.InsertInRecord( + interest, incomingFace.FaceID(), packet.PitToken) if !isAlreadyPending { core.LogTrace(t, "Interest ", packet.Name, " is not pending") @@ -289,6 +286,10 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) { } } else { core.LogTrace(t, "Interest ", packet.Name, " is already pending") + + // Add the previous nonce to the dead nonce list to prevent further looping + // TODO: review this design, not specified in NFD dev guide + t.deadNonceList.Insert(interest.NameV, prevNonce) } // Update PIT entry expiration timer @@ -298,22 +299,38 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) { if packet.NextHopFaceID != nil { if dispatch.GetFace(*packet.NextHopFaceID) != nil { core.LogTrace(t, "NextHopFaceId is set for Interest ", packet.Name, " - dispatching directly to face") - dispatch.GetFace(*packet.NextHopFaceID).SendPacket(packet) + dispatch.GetFace(*packet.NextHopFaceID).SendPacket(dispatch.OutPkt{ + Pkt: packet, + PitToken: packet.PitToken, // TODO: ?? + InFace: packet.IncomingFaceID, + }) } else { core.LogInfo(t, "Non-existent face specified in NextHopFaceId for Interest ", packet.Name, " - DROP") } return } - // Pass to strategy AfterReceiveInterest pipeline - var nexthops []*table.FibNextHopEntry - if fhName == nil { - nexthops = table.FibStrategyTable.FindNextHopsEnc(interest.NameV) - } else { - nexthops = table.FibStrategyTable.FindNextHopsEnc(fhName) + // Use forwarding hint if present + lookupName := interest.NameV + if fhName != nil { + lookupName = fhName } - strategy.AfterReceiveInterest(packet, pitEntry, incomingFace.FaceID(), nexthops) + // Query the FIB for possible nexthops + nexthops := table.FibStrategyTable.FindNextHopsEnc(lookupName) + + // Exclude faces that have an in-record for this interest + // TODO: unclear where NFD dev guide specifies such behavior (if any) + allowedNexthops := make([]*table.FibNextHopEntry, 0, len(nexthops)) + for _, nexthop := range nexthops { + record := pitEntry.InRecords()[nexthop.Nexthop] + if record == nil || nexthop.Nexthop == incomingFace.FaceID() { + allowedNexthops = append(allowedNexthops, nexthop) + } + } + + // Pass to strategy AfterReceiveInterest pipeline + strategy.AfterReceiveInterest(packet, pitEntry, incomingFace.FaceID(), allowedNexthops) } func (t *Thread) processOutgoingInterest( @@ -327,7 +344,7 @@ func (t *Thread) processOutgoingInterest( panic("processOutgoingInterest called with non-Interest packet") } - core.LogTrace(t, "OnOutgoingInterest: ", ", FaceID=", nexthop) + core.LogTrace(t, "OnOutgoingInterest: ", packet.Name, ", FaceID=", nexthop) // Get outgoing face outgoingFace := dispatch.GetFace(nexthop) @@ -352,16 +369,18 @@ func (t *Thread) processOutgoingInterest( t.NOutInterests++ + // Make new PIT token if needed + pitToken := make([]byte, 6) + binary.BigEndian.PutUint16(pitToken, uint16(t.threadID)) + binary.BigEndian.PutUint32(pitToken[2:], pitEntry.Token()) + // Send on outgoing face - packet.IncomingFaceID = utils.IdPtr(inFace) + outgoingFace.SendPacket(dispatch.OutPkt{ + Pkt: packet, + PitToken: pitToken, + InFace: utils.IdPtr(inFace), + }) - // Make new PIT token if needed - if len(packet.PitToken) != 6 { - packet.PitToken = make([]byte, 6) - } - binary.BigEndian.PutUint16(packet.PitToken, uint16(t.threadID)) - binary.BigEndian.PutUint32(packet.PitToken[2:], pitEntry.Token()) - outgoingFace.SendPacket(packet) return true } @@ -421,7 +440,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) { pitEntries := t.pitCS.FindInterestPrefixMatchByDataEnc(data, pitToken) if len(pitEntries) == 0 { // Unsolicated Data - nothing more to do - core.LogDebug(t, "Unsolicited data ", packet.Name, " - DROP") + core.LogDebug(t, "Unsolicited data ", packet.Name, " FaceID=", *packet.IncomingFaceID, " - DROP") return } @@ -430,32 +449,41 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) { strategy := t.strategies[strategyName.Hash()] if len(pitEntries) == 1 { + // When a single PIT entry matches, we pass the data to the strategy. + // See alternative behavior for multiple matches below. + pitEntry := pitEntries[0] + // Set PIT entry expiration to now - table.SetExpirationTimerToNow(pitEntries[0]) + table.SetExpirationTimerToNow(pitEntry) // Invoke strategy's AfterReceiveData core.LogTrace(t, "Sending Data=", packet.Name, " to strategy=", strategyName) - strategy.AfterReceiveData(packet, pitEntries[0], *packet.IncomingFaceID) + strategy.AfterReceiveData(packet, pitEntry, *packet.IncomingFaceID) // Mark PIT entry as satisfied - pitEntries[0].SetSatisfied(true) + pitEntry.SetSatisfied(true) // Insert into dead nonce list - for _, outRecord := range pitEntries[0].OutRecords() { + for _, outRecord := range pitEntry.OutRecords() { t.deadNonceList.Insert(data.NameV, outRecord.LatestNonce) } // Clear out records from PIT entry - pitEntries[0].ClearOutRecords() + // TODO: NFD dev guide specifies in-records should not be cleared - why? + pitEntry.ClearInRecords() + pitEntry.ClearOutRecords() } else { + // Multiple PIT entries can match when two interest have e.g. different flags + // like CanBePrefix, or different forwarding hints. In this case, we send to all + // downstream faces without consulting strategy (see NFD dev guide) for _, pitEntry := range pitEntries { // Store all pending downstreams (except face Data packet arrived on) and PIT tokens downstreams := make(map[uint64][]byte) - for downstreamFaceID, downstreamFaceRecord := range pitEntry.InRecords() { - if downstreamFaceID != *packet.IncomingFaceID { + for face, record := range pitEntry.InRecords() { + if face != *packet.IncomingFaceID { // TODO: Ad-hoc faces - downstreams[downstreamFaceID] = make([]byte, len(downstreamFaceRecord.PitToken)) - copy(downstreams[downstreamFaceID], downstreamFaceRecord.PitToken) + downstreams[face] = make([]byte, len(record.PitToken)) + copy(downstreams[face], record.PitToken) } } @@ -477,10 +505,10 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) { pitEntry.ClearInRecords() pitEntry.ClearOutRecords() - // Call outoing Data pipeline for each pending downstream - for downstreamFaceID, downstreamPITToken := range downstreams { - core.LogTrace(t, "Multiple matching PIT entries for ", packet.Name, ": sending to OnOutgoingData pipeline") - t.processOutgoingData(packet, downstreamFaceID, downstreamPITToken, *packet.IncomingFaceID) + // Call outgoing Data pipeline for each pending downstream + for face, token := range downstreams { + core.LogTrace(t, "Multiple PIT entries Data=", packet.Name) + t.processOutgoingData(packet, face, token, *packet.IncomingFaceID) } } } @@ -516,10 +544,9 @@ func (t *Thread) processOutgoingData( t.NSatisfiedInterests++ // Send on outgoing face - if len(packet.PitToken) != len(pitToken) { - packet.PitToken = make([]byte, len(pitToken)) - } - copy(packet.PitToken, pitToken) - packet.IncomingFaceID = utils.IdPtr(uint64(inFace)) - outgoingFace.SendPacket(packet) + outgoingFace.SendPacket(dispatch.OutPkt{ + Pkt: packet, + PitToken: pitToken, + InFace: utils.IdPtr(inFace), + }) } diff --git a/fw/table/fib-strategy-hashtable.go b/fw/table/fib-strategy-hashtable.go index 61c8ffde..87461b37 100644 --- a/fw/table/fib-strategy-hashtable.go +++ b/fw/table/fib-strategy-hashtable.go @@ -16,7 +16,6 @@ package table import ( "sync" - "github.com/named-data/YaNFD/utils/comparison" enc "github.com/zjkmxy/go-ndn/pkg/encoding" ) @@ -89,7 +88,7 @@ func (f *FibStrategyHashTable) findLongestPrefixMatchEnc(name enc.Name) *baseFib virtEntry, ok := f.virtTable[virtNameHash] if ok { // Virtual name present, look for longer matches - pfx := comparison.Min(virtEntry.md, len(name)) + pfx := min(virtEntry.md, len(name)) for ; pfx > f.m; pfx-- { if val, ok := f.realTable[prefixHash[pfx]]; ok { return val @@ -137,7 +136,7 @@ func (f *FibStrategyHashTable) insertEntryEnc(name enc.Name) *baseFibStrategyEnt f.virtTableNames[nameHash] = make(map[string]int) } - f.virtTable[nameHash].md = comparison.Max(f.virtTable[nameHash].md, len(name)) + f.virtTable[nameHash].md = max(f.virtTable[nameHash].md, len(name)) // Insert into set of names f.virtTableNames[nameHash][nameBytes] = len(name) @@ -154,7 +153,7 @@ func (f *FibStrategyHashTable) insertEntryEnc(name enc.Name) *baseFibStrategyEnt f.virtTableNames[virtNameHash] = make(map[string]int) } - f.virtTable[virtNameHash].md = comparison.Max(f.virtTable[virtNameHash].md, len(name)) + f.virtTable[virtNameHash].md = max(f.virtTable[virtNameHash].md, len(name)) // Insert into set of names f.virtTableNames[virtNameHash][nameBytes] = len(name) @@ -218,7 +217,7 @@ func (f *FibStrategyHashTable) pruneTables(entry *baseFibStrategyEntry) { // Update with length of next longest real prefix associated // with this virtual prefix for _, l := range f.virtTableNames[virtNameHash] { - virtEntry.md = comparison.Max(virtEntry.md, l) + virtEntry.md = max(virtEntry.md, l) } } } diff --git a/fw/table/pit-cs-tree.go b/fw/table/pit-cs-tree.go index b932dec9..dcdfaec2 100644 --- a/fw/table/pit-cs-tree.go +++ b/fw/table/pit-cs-tree.go @@ -5,7 +5,7 @@ import ( "time" "github.com/named-data/YaNFD/core" - "github.com/named-data/YaNFD/utils/priority_queue" + pq "github.com/named-data/YaNFD/utils/priority_queue" enc "github.com/zjkmxy/go-ndn/pkg/encoding" spec "github.com/zjkmxy/go-ndn/pkg/ndn/spec_2022" ) @@ -27,16 +27,16 @@ type PitCsTree struct { csReplacement CsReplacementPolicy csMap map[uint64]*nameTreeCsEntry - pitExpiryQueue priority_queue.Queue[*nameTreePitEntry, int64] + pitExpiryQueue pq.Queue[*nameTreePitEntry, int64] updateTimer chan struct{} onExpiration OnPitExpiration } type nameTreePitEntry struct { - basePitEntry // compose with BasePitEntry - pitCsTable *PitCsTree // pointer to tree - node *pitCsTreeNode // the tree node associated with this entry - queueIndex int // index of entry in the expiring queue + basePitEntry // compose with BasePitEntry + pitCsTable *PitCsTree // pointer to tree + node *pitCsTreeNode // the tree node associated with this entry + pqItem *pq.Item[*nameTreePitEntry, int64] // entry in the expiring queue } type nameTreeCsEntry struct { @@ -66,7 +66,7 @@ func NewPitCS(onExpiration OnPitExpiration) *PitCsTree { pitCs.root.children = make(map[uint64]*pitCsTreeNode) pitCs.onExpiration = onExpiration pitCs.pitTokenMap = make(map[uint32]*nameTreePitEntry) - pitCs.pitExpiryQueue = priority_queue.New[*nameTreePitEntry, int64]() + pitCs.pitExpiryQueue = pq.New[*nameTreePitEntry, int64]() pitCs.updateTimer = make(chan struct{}) // This value has already been validated from loading the configuration, @@ -94,7 +94,7 @@ func (p *PitCsTree) UpdateTimer() <-chan struct{} { func (p *PitCsTree) Update() { for p.pitExpiryQueue.Len() > 0 && p.pitExpiryQueue.PeekPriority() <= time.Now().UnixNano() { entry := p.pitExpiryQueue.Pop() - entry.queueIndex = -1 + entry.pqItem = nil p.onExpiration(entry) p.RemoveInterest(entry) } @@ -118,10 +118,10 @@ func (p *PitCsTree) Update() { func (p *PitCsTree) updatePitExpiry(pitEntry PitEntry) { e := pitEntry.(*nameTreePitEntry) - if e.queueIndex < 0 { - e.queueIndex = p.pitExpiryQueue.Push(e, e.expirationTime.UnixNano()) + if e.pqItem == nil { + e.pqItem = p.pitExpiryQueue.Push(e, e.expirationTime.UnixNano()) } else { - p.pitExpiryQueue.Update(e.queueIndex, e, e.expirationTime.UnixNano()) + p.pitExpiryQueue.Update(e.pqItem, e, e.expirationTime.UnixNano()) } } @@ -159,12 +159,13 @@ func (p *PitCsTree) InsertInterest(interest *spec.Interest, hint enc.Name, inFac entry.satisfied = false node.pitEntries = append(node.pitEntries, entry) entry.token = p.generateNewPitToken() - entry.queueIndex = -1 + entry.pqItem = nil p.pitTokenMap[entry.token] = entry } + // Only considered a duplicate (loop) if from different face since + // is just retransmission and not loop if same face for face, inRecord := range entry.inRecords { - // Only considered a duplicate (loop) if from different face since is just retransmission and not loop if same face if face != inFace && inRecord.LatestNonce == *interest.NonceV { return entry, true } @@ -267,6 +268,11 @@ func (p *PitCsTree) IsCsServing() bool { // InsertOutRecord inserts an outrecord for the given interest, updating the // preexisting one if it already occcurs. func (e *nameTreePitEntry) InsertOutRecord(interest *spec.Interest, face uint64) *PitOutRecord { + lifetime := time.Millisecond * 4000 + if interest.Lifetime() != nil { + lifetime = *interest.Lifetime() + } + var record *PitOutRecord var ok bool if record, ok = e.outRecords[face]; !ok { @@ -275,7 +281,7 @@ func (e *nameTreePitEntry) InsertOutRecord(interest *spec.Interest, face uint64) record.LatestNonce = *interest.NonceV record.LatestTimestamp = time.Now() record.LatestInterest = interest.NameV.Clone() - record.ExpirationTime = time.Now().Add(time.Millisecond * 4000) + record.ExpirationTime = time.Now().Add(lifetime) e.outRecords[face] = record return record } @@ -284,7 +290,7 @@ func (e *nameTreePitEntry) InsertOutRecord(interest *spec.Interest, face uint64) record.LatestNonce = *interest.NonceV record.LatestTimestamp = time.Now() record.LatestInterest = interest.NameV.Clone() - record.ExpirationTime = time.Now().Add(time.Millisecond * 4000) + record.ExpirationTime = time.Now().Add(lifetime) return record } diff --git a/fw/table/pit-cs.go b/fw/table/pit-cs.go index e1aab94f..a9d8add4 100644 --- a/fw/table/pit-cs.go +++ b/fw/table/pit-cs.go @@ -60,7 +60,7 @@ type PitEntry interface { Token() uint32 - InsertInRecord(interest *spec.Interest, face uint64, incomingPitToken []byte) (*PitInRecord, bool) + InsertInRecord(interest *spec.Interest, face uint64, incomingPitToken []byte) (*PitInRecord, bool, uint32) InsertOutRecord(interest *spec.Interest, face uint64) *PitOutRecord GetOutRecords() []*PitOutRecord @@ -119,11 +119,17 @@ type baseCsEntry struct { // InsertInRecord finds or inserts an InRecord for the face, updating the // metadata and returning whether there was already an in-record in the entry. +// The third return value is the previous nonce if the in-record already existed. func (bpe *basePitEntry) InsertInRecord( interest *spec.Interest, face uint64, incomingPitToken []byte, -) (*PitInRecord, bool) { +) (*PitInRecord, bool, uint32) { + lifetime := time.Millisecond * 4000 + if interest.Lifetime() != nil { + lifetime = *interest.Lifetime() + } + var record *PitInRecord var ok bool if record, ok = bpe.inRecords[face]; !ok { @@ -132,18 +138,19 @@ func (bpe *basePitEntry) InsertInRecord( record.LatestNonce = *interest.NonceV record.LatestTimestamp = time.Now() record.LatestInterest = interest.NameV.Clone() - record.ExpirationTime = time.Now().Add(time.Millisecond * 4000) + record.ExpirationTime = time.Now().Add(lifetime) record.PitToken = append([]byte{}, incomingPitToken...) bpe.inRecords[face] = record - return record, false + return record, false, 0 } // Existing record + previousNonce := record.LatestNonce record.LatestNonce = *interest.NonceV record.LatestTimestamp = time.Now() record.LatestInterest = interest.NameV.Clone() - record.ExpirationTime = time.Now().Add(time.Millisecond * 4000) - return record, true + record.ExpirationTime = time.Now().Add(lifetime) + return record, true, previousNonce } // SetExpirationTimerToNow updates the expiration timer to the current time. diff --git a/fw/table/pit-cs_test.go b/fw/table/pit-cs_test.go index e9829c4d..a359589a 100644 --- a/fw/table/pit-cs_test.go +++ b/fw/table/pit-cs_test.go @@ -99,7 +99,7 @@ func TestInsertInRecord(t *testing.T) { inRecords: make(map[uint64]*PitInRecord), } faceID := uint64(1234) - inRecord, alreadyExists := bpe.InsertInRecord(interest, faceID, pitToken) + inRecord, alreadyExists, _ := bpe.InsertInRecord(interest, faceID, pitToken) assert.False(t, alreadyExists) assert.Equal(t, inRecord.Face, faceID) assert.Equal(t, inRecord.LatestNonce == *interest.NonceV, true) @@ -113,8 +113,9 @@ func TestInsertInRecord(t *testing.T) { // Case 2: interest already exists in basePitEntry.inRecords *interest.NonceV = 2 // get a "new" interest by resetting its nonce - inRecord, alreadyExists = bpe.InsertInRecord(interest, faceID, pitToken) + inRecord, alreadyExists, prevNonce := bpe.InsertInRecord(interest, faceID, pitToken) assert.True(t, alreadyExists) + assert.Equal(t, prevNonce, uint32(1)) assert.Equal(t, inRecord.Face, faceID) assert.Equal(t, inRecord.LatestNonce == *interest.NonceV, true) assert.Equal(t, inRecord.LatestInterest, interest.NameV) @@ -134,7 +135,7 @@ func TestInsertInRecord(t *testing.T) { } pitToken2 := []byte("xyz") faceID2 := uint64(6789) - inRecord, alreadyExists = bpe.InsertInRecord(interest2, faceID2, pitToken2) + inRecord, alreadyExists, _ = bpe.InsertInRecord(interest2, faceID2, pitToken2) assert.False(t, alreadyExists) assert.Equal(t, inRecord.Face, faceID2) assert.Equal(t, inRecord.LatestNonce == *interest2.NonceV, true) diff --git a/fw/table/rib.go b/fw/table/rib.go index bb70baf4..2e9b42cc 100644 --- a/fw/table/rib.go +++ b/fw/table/rib.go @@ -108,12 +108,28 @@ func (r *RibEntry) pruneIfEmpty() { delete(entry.parent.children, entry) } } + func (r *RibEntry) updateNexthopsEnc() { FibStrategyTable.ClearNextHopsEnc(r.Name) + // All routes including parents if needed + routes := append([]*Route{}, r.routes...) + + // Get all possible nexthops for parents that are inherited, + // unless we have the capture flag set + if !r.HasCaptureRoute() { + for entry := r; entry != nil; entry = entry.parent { + for _, route := range entry.routes { + if route.HasChildInheritFlag() { + routes = append(routes, route) + } + } + } + } + // Find minimum cost route per nexthop minCostRoutes := make(map[uint64]uint64) // FaceID -> Cost - for _, route := range r.routes { + for _, route := range routes { cost, ok := minCostRoutes[route.FaceID] if !ok || route.Cost < cost { minCostRoutes[route.FaceID] = route.Cost @@ -124,6 +140,11 @@ func (r *RibEntry) updateNexthopsEnc() { for nexthop, cost := range minCostRoutes { FibStrategyTable.InsertNextHopEnc(r.Name, nexthop, cost) } + + // Trigger update for all children for inheritance + for child := range r.children { + child.updateNexthopsEnc() + } } // AddRoute adds or updates a RIB entry for the specified prefix. @@ -223,3 +244,20 @@ func (r *RibEntry) CleanUpFace(faceId uint64) { r.updateNexthopsEnc() r.pruneIfEmpty() } + +func (r *RibEntry) HasCaptureRoute() bool { + for _, route := range r.routes { + if route.HasCaptureFlag() { + return true + } + } + return false +} + +func (r *Route) HasCaptureFlag() bool { + return r.Flags&RouteFlagCapture != 0 +} + +func (r *Route) HasChildInheritFlag() bool { + return r.Flags&RouteFlagChildInherit != 0 +} diff --git a/fw/utils/comparison/comparison.go b/fw/utils/comparison/comparison.go deleted file mode 100644 index aac58ea2..00000000 --- a/fw/utils/comparison/comparison.go +++ /dev/null @@ -1,19 +0,0 @@ -package comparison - -import "golang.org/x/exp/constraints" - -func Min[V constraints.Ordered](a, b V) V { - if a < b { - return a - } else { - return b - } -} - -func Max[V constraints.Ordered](a, b V) V { - if a > b { - return a - } else { - return b - } -} diff --git a/fw/utils/heap/heap.go b/fw/utils/heap/heap.go deleted file mode 100644 index b0be2579..00000000 --- a/fw/utils/heap/heap.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2009 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package heap provides heap operations for any type that implements -// heap.Interface. A heap is a tree with the property that each node is the -// minimum-valued node in its subtree. -// -// The minimum element in the tree is the root, at index 0. -// -// A heap is a common way to implement a priority queue. To build a priority -// queue, implement the Heap interface with the (negative) priority as the -// ordering for the Less method, so Push adds items while Pop removes the -// highest-priority item from the queue. The Examples include such an -// implementation; the file example_pq_test.go has the complete source. -package heap - -import "sort" - -// The Interface type describes the requirements -// for a type using the routines in this package. -// Any type that implements it may be used as a -// min-heap with the following invariants (established after -// Init has been called or if the data is empty or sorted): -// -// !h.Less(j, i) for 0 <= i < h.Len() and 2*i+1 <= j <= 2*i+2 and j < h.Len() -// -// Note that Push and Pop in this interface are for package heap's -// implementation to call. To add and remove things from the heap, -// use heap.Push and heap.Pop. -type Interface[V any] interface { - sort.Interface - Push(x V) // add x as element Len() - Pop() V // remove and return element Len() - 1. -} - -// Init establishes the heap invariants required by the other routines in this package. -// Init is idempotent with respect to the heap invariants -// and may be called whenever the heap invariants may have been invalidated. -// The complexity is O(n) where n = h.Len(). -func Init[V any](h Interface[V]) { - // heapify - n := h.Len() - for i := n/2 - 1; i >= 0; i-- { - down(h, i, n) - } -} - -// Push pushes the element x onto the heap. -// The complexity is O(log n) where n = h.Len(). -func Push[V any](h Interface[V], x V) { - h.Push(x) - up(h, h.Len()-1) -} - -// Pop removes and returns the minimum element (according to Less) from the heap. -// The complexity is O(log n) where n = h.Len(). -// Pop is equivalent to Remove(h, 0). -func Pop[V any](h Interface[V]) V { - n := h.Len() - 1 - h.Swap(0, n) - down(h, 0, n) - return h.Pop() -} - -// Remove removes and returns the element at index i from the heap. -// The complexity is O(log n) where n = h.Len(). -func Remove[V any](h Interface[V], i int) V { - n := h.Len() - 1 - if n != i { - h.Swap(i, n) - if !down(h, i, n) { - up(h, i) - } - } - return h.Pop() -} - -// Fix re-establishes the heap ordering after the element at index i has changed its value. -// Changing the value of the element at index i and then calling Fix is equivalent to, -// but less expensive than, calling Remove(h, i) followed by a Push of the new value. -// The complexity is O(log n) where n = h.Len(). -func Fix[V any](h Interface[V], i int) { - if !down(h, i, h.Len()) { - up(h, i) - } -} - -func up[V any](h Interface[V], j int) { - for { - i := (j - 1) / 2 // parent - if i == j || !h.Less(j, i) { - break - } - h.Swap(i, j) - j = i - } -} - -func down[V any](h Interface[V], i0, n int) bool { - i := i0 - for { - j1 := 2*i + 1 - if j1 >= n || j1 < 0 { // j1 < 0 after int overflow - break - } - j := j1 // left child - if j2 := j1 + 1; j2 < n && h.Less(j2, j1) { - j = j2 // = 2*i + 2 // right child - } - if !h.Less(j, i) { - break - } - h.Swap(i, j) - i = j - } - return i > i0 -} diff --git a/fw/utils/priority_queue/priority-queue.go b/fw/utils/priority_queue/priority-queue.go index ee275aee..e3715bf0 100644 --- a/fw/utils/priority_queue/priority-queue.go +++ b/fw/utils/priority_queue/priority-queue.go @@ -1,17 +1,18 @@ package priority_queue import ( - "github.com/named-data/YaNFD/utils/heap" + "container/heap" + "golang.org/x/exp/constraints" ) -type item[V any, P constraints.Ordered] struct { +type Item[V any, P constraints.Ordered] struct { object V priority P index int } -type wrapper[V any, P constraints.Ordered] []*item[V, P] +type wrapper[V any, P constraints.Ordered] []*Item[V, P] // Queue represents a priority queue with MINIMUM priority. type Queue[V any, P constraints.Ordered] struct { @@ -32,13 +33,13 @@ func (pq *wrapper[V, P]) Swap(i, j int) { (*pq)[j].index = j } -func (pq *wrapper[V, P]) Push(x *item[V, P]) { - item := x +func (pq *wrapper[V, P]) Push(x any) { + item := x.(*Item[V, P]) item.index = len(*pq) *pq = append(*pq, item) } -func (pq *wrapper[V, P]) Pop() *item[V, P] { +func (pq *wrapper[V, P]) Pop() any { old := *pq n := len(old) item := old[n-1] @@ -50,17 +51,17 @@ func (pq *wrapper[V, P]) Pop() *item[V, P] { // Len returns the length of the priroity queue. func (pq *Queue[V, P]) Len() int { - return len(pq.pq) + return pq.pq.Len() } // Push pushes the 'value' onto the priority queue. -func (pq *Queue[V, P]) Push(value V, priority P) int { - ret := &item[V, P]{ +func (pq *Queue[V, P]) Push(value V, priority P) *Item[V, P] { + ret := &Item[V, P]{ object: value, priority: priority, } - heap.Push[*item[V, P]](&pq.pq, ret) - return ret.index + heap.Push(&pq.pq, ret) + return ret } // Peek returns the minimum element of the priority queue without removing it. @@ -75,20 +76,14 @@ func (pq *Queue[V, P]) PeekPriority() P { // Pop removes and returns the minimum element of the priority queue. func (pq *Queue[V, P]) Pop() V { - return heap.Pop[*item[V, P]](&pq.pq).object + return heap.Pop(&pq.pq).(*Item[V, P]).object } -// Update modifies the priority and value of the item with 'index' in the queue. -// returns the updated index. -func (pq *Queue[V, P]) Update(index int, value V, priority P) int { - if index < 0 || index >= len(pq.pq) { - return -1 - } - it := pq.pq[index] - it.object = value - it.priority = priority - heap.Fix[*item[V, P]](&pq.pq, it.index) - return it.index +// Update modifies the priority and value of the item +func (pq *Queue[V, P]) Update(item *Item[V, P], value V, priority P) { + item.object = value + item.priority = priority + heap.Fix(&pq.pq, item.index) } // New creates a new priority queue. Not required to call. diff --git a/fw/utils/priority_queue/priority-queue_test.go b/fw/utils/priority_queue/priority-queue_test.go index 16fc0e77..91d95cf8 100644 --- a/fw/utils/priority_queue/priority-queue_test.go +++ b/fw/utils/priority_queue/priority-queue_test.go @@ -9,15 +9,15 @@ import ( func TestBasics(t *testing.T) { q := priority_queue.New[int, int]() - assert.Equal(t, q.Len(), 0) + assert.Equal(t, 0, q.Len()) q.Push(1, 1) q.Push(2, 3) q.Push(3, 2) - assert.Equal(t, q.Len(), 3) - assert.Equal(t, q.PeekPriority(), 1) - assert.Equal(t, q.Pop(), 1) - assert.Equal(t, q.PeekPriority(), 2) - assert.Equal(t, q.Pop(), 3) - assert.Equal(t, q.Pop(), 2) - assert.Equal(t, q.Len(), 0) + assert.Equal(t, 3, q.Len()) + assert.Equal(t, 1, q.PeekPriority()) + assert.Equal(t, 1, q.Pop()) + assert.Equal(t, 2, q.PeekPriority()) + assert.Equal(t, 3, q.Pop()) + assert.Equal(t, 2, q.Pop()) + assert.Equal(t, 0, q.Len()) }