Skip to content

Commit

Permalink
fw: make incoming face required
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Dec 25, 2024
1 parent e370916 commit 5d6847a
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 28 deletions.
2 changes: 1 addition & 1 deletion fw/defn/pkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Pkt struct {

PitToken []byte
CongestionMark *uint64
IncomingFaceID *uint64
IncomingFaceID uint64
NextHopFaceID *uint64
CachePolicy *uint64
}
Expand Down
2 changes: 1 addition & 1 deletion fw/dispatch/face.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Face interface {
type OutPkt struct {
Pkt *defn.Pkt
PitToken []byte
InFace *uint64
InFace uint64
}

// FaceDispatch is used to allow forwarding to interact with faces without a circular dependency issue.
Expand Down
6 changes: 3 additions & 3 deletions fw/face/ndnlp-link-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ func sendPacket(l *NDNLPLinkService, out dispatch.OutPkt) {
}

// Incoming face indication
if l.options.IsIncomingFaceIndicationEnabled && out.InFace != nil {
fragment.IncomingFaceId = out.InFace
if l.options.IsIncomingFaceIndicationEnabled {
fragment.IncomingFaceId = utils.IdPtr(out.InFace)
}

// Congestion marking
Expand Down Expand Up @@ -299,7 +299,7 @@ func (l *NDNLPLinkService) handleIncomingFrame(frame []byte) {
// All incoming frames come through a link service
// Attempt to decode buffer into LpPacket
pkt := &defn.Pkt{
IncomingFaceID: utils.IdPtr(l.faceID),
IncomingFaceID: l.faceID,
}

L2r := enc.NewBufferReader(wire)
Expand Down
35 changes: 12 additions & 23 deletions fw/fw/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,11 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) {
panic("processIncomingInterest called with non-Interest packet")
}

// Ensure incoming face is indicated
if packet.IncomingFaceID == nil {
core.LogError(t, "Interest missing IncomingFaceId - DROP")
return
}
// Already asserted that this is an Interest in link service
// Get incoming face
incomingFace := dispatch.GetFace(*packet.IncomingFaceID)
incomingFace := dispatch.GetFace(packet.IncomingFaceID)
if incomingFace == nil {
core.LogError(t, "Non-existent incoming FaceID=", *packet.IncomingFaceID,
core.LogError(t, "Non-existent incoming FaceID=", packet.IncomingFaceID,
" for Interest=", packet.Name, " - DROP")
return
}
Expand Down Expand Up @@ -376,7 +371,7 @@ func (t *Thread) processOutgoingInterest(
outgoingFace.SendPacket(dispatch.OutPkt{
Pkt: packet,
PitToken: pitToken,
InFace: utils.IdPtr(inFace),
InFace: inFace,
})

return true
Expand All @@ -400,12 +395,6 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) {
panic("processIncomingData called with non-Data packet")
}

// Ensure incoming face is indicated
if packet.IncomingFaceID == nil {
core.LogError(t, "Data missing IncomingFaceId - DROP")
return
}

// Get PIT if present
var pitToken *uint32
//lint:ignore S1009 removing the nil check causes a segfault ¯\_(ツ)_/¯
Expand All @@ -414,9 +403,9 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) {
}

// Get incoming face
incomingFace := dispatch.GetFace(*packet.IncomingFaceID)
incomingFace := dispatch.GetFace(packet.IncomingFaceID)
if incomingFace == nil {
core.LogError(t, "Non-existent nexthop FaceID=", *packet.IncomingFaceID, " for Data=", packet.Name, " DROP")
core.LogError(t, "Non-existent nexthop FaceID=", packet.IncomingFaceID, " for Data=", packet.Name, " DROP")
return
}

Expand All @@ -425,7 +414,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) {
// Check if violates /localhost
if incomingFace.Scope() == defn.NonLocal && len(packet.Name) > 0 &&
bytes.Equal(data.Name()[0].Val, LOCALHOST) {
core.LogWarn(t, "Data ", packet.Name, " from non-local FaceID=", *packet.IncomingFaceID, " violates /localhost scope - DROP")
core.LogWarn(t, "Data ", packet.Name, " from non-local FaceID=", packet.IncomingFaceID, " violates /localhost scope - DROP")
return
}

Expand All @@ -438,7 +427,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) {
pitEntries := t.pitCS.FindInterestPrefixMatchByDataEnc(data, pitToken)
if len(pitEntries) == 0 {
// Unsolicited Data - nothing more to do
core.LogDebug(t, "Unsolicited data ", packet.Name, " FaceID=", *packet.IncomingFaceID, " - DROP")
core.LogDebug(t, "Unsolicited data ", packet.Name, " FaceID=", packet.IncomingFaceID, " - DROP")
return
}

Expand All @@ -456,7 +445,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) {

// Invoke strategy's AfterReceiveData
core.LogTrace(t, "Sending Data=", packet.Name, " to strategy=", strategyName)
strategy.AfterReceiveData(packet, pitEntry, *packet.IncomingFaceID)
strategy.AfterReceiveData(packet, pitEntry, packet.IncomingFaceID)

// Mark PIT entry as satisfied
pitEntry.SetSatisfied(true)
Expand All @@ -478,7 +467,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) {
// Store all pending downstreams (except face Data packet arrived on) and PIT tokens
downstreams := make(map[uint64][]byte)
for face, record := range pitEntry.InRecords() {
if face != *packet.IncomingFaceID {
if face != packet.IncomingFaceID {
// TODO: Ad-hoc faces
downstreams[face] = make([]byte, len(record.PitToken))
copy(downstreams[face], record.PitToken)
Expand All @@ -489,7 +478,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) {
table.SetExpirationTimerToNow(pitEntry)

// Invoke strategy's BeforeSatisfyInterest
strategy.BeforeSatisfyInterest(pitEntry, *packet.IncomingFaceID)
strategy.BeforeSatisfyInterest(pitEntry, packet.IncomingFaceID)

// Mark PIT entry as satisfied
pitEntry.SetSatisfied(true)
Expand All @@ -506,7 +495,7 @@ func (t *Thread) processIncomingData(packet *defn.Pkt) {
// Call outgoing Data pipeline for each pending downstream
for face, token := range downstreams {
core.LogTrace(t, "Multiple PIT entries Data=", packet.Name)
t.processOutgoingData(packet, face, token, *packet.IncomingFaceID)
t.processOutgoingData(packet, face, token, packet.IncomingFaceID)
}
}
}
Expand Down Expand Up @@ -545,6 +534,6 @@ func (t *Thread) processOutgoingData(
outgoingFace.SendPacket(dispatch.OutPkt{
Pkt: packet,
PitToken: pitToken,
InFace: utils.IdPtr(inFace),
InFace: inFace,
})
}

0 comments on commit 5d6847a

Please sign in to comment.