Skip to content

Commit

Permalink
Merge pull request #103 from named-data/slog
Browse files Browse the repository at this point in the history
log: switch to slog
  • Loading branch information
zjkmxy authored Jan 14, 2025
2 parents 814d19d + b40133a commit 5079fca
Show file tree
Hide file tree
Showing 104 changed files with 965 additions and 1,685 deletions.
54 changes: 27 additions & 27 deletions dv/dv/advert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,37 @@ import (
"github.com/named-data/ndnd/std/utils"
)

func (dv *Router) advertGenerateNew() {
dv.mutex.Lock()
defer dv.mutex.Unlock()
func (a *advertModule) generate() {
a.dv.mutex.Lock()
defer a.dv.mutex.Unlock()

// Increment sequence number
dv.advertSyncSeq++
a.seq++

// Produce the advertisement
name, err := dv.client.Produce(object.ProduceArgs{
Name: dv.config.AdvertisementDataPrefix().Append(
enc.NewTimestampComponent(dv.advertBootTime),
name, err := a.dv.client.Produce(object.ProduceArgs{
Name: a.dv.config.AdvertisementDataPrefix().Append(
enc.NewTimestampComponent(a.bootTime),
),
Content: dv.rib.Advert().Encode(),
Version: utils.IdPtr(dv.advertSyncSeq),
Content: a.dv.rib.Advert().Encode(),
Version: utils.IdPtr(a.seq),
FreshnessPeriod: 10 * time.Second,
})
if err != nil {
log.Errorf("advert-data: failed to produce advertisement: %+v", err)
log.Error(a, "Failed to produce advertisement", "err", err)
}
dv.advertDir.Push(name)
dv.advertDir.Evict(dv.client)
a.objDir.Push(name)
a.objDir.Evict(a.dv.client)

// Notify neighbors with sync for new advertisement
go dv.advertSyncSendInterest()
go a.sendSyncInterest()
}

func (dv *Router) advertDataFetch(nName enc.Name, bootTime uint64, seqNo uint64) {
func (a *advertModule) dataFetch(nName enc.Name, bootTime uint64, seqNo uint64) {
// debounce; wait before fetching, then check if this is still the latest
// sequence number known for this neighbor
time.Sleep(10 * time.Millisecond)
if ns := dv.neighbors.Get(nName); ns == nil || ns.AdvertBoot != bootTime || ns.AdvertSeq != seqNo {
if ns := a.dv.neighbors.Get(nName); ns == nil || ns.AdvertBoot != bootTime || ns.AdvertSeq != seqNo {
return
}

Expand All @@ -52,53 +52,53 @@ func (dv *Router) advertDataFetch(nName enc.Name, bootTime uint64, seqNo uint64)
enc.NewVersionComponent(seqNo),
)...)

dv.client.Consume(advName, func(state *object.ConsumeState) bool {
a.dv.client.Consume(advName, func(state *object.ConsumeState) bool {
if !state.IsComplete() {
return true
}

go func() {
fetchErr := state.Error()
if fetchErr != nil {
log.Warnf("advert-data: failed to fetch advertisement %s: %+v", state.Name(), fetchErr)
log.Warn(a, "Failed to fetch advertisement", "name", state.Name(), "err", fetchErr)
time.Sleep(1 * time.Second) // wait on error
dv.advertDataFetch(nName, bootTime, seqNo)
a.dataFetch(nName, bootTime, seqNo)
return
}

// Process the advertisement
dv.advertDataHandler(nName, seqNo, state.Content())
a.dataHandler(nName, seqNo, state.Content())
}()

return true
})
}

// Received advertisement Data
func (dv *Router) advertDataHandler(nName enc.Name, seqNo uint64, data []byte) {
func (a *advertModule) dataHandler(nName enc.Name, seqNo uint64, data []byte) {
// Lock DV state
dv.mutex.Lock()
defer dv.mutex.Unlock()
a.dv.mutex.Lock()
defer a.dv.mutex.Unlock()

// Check if this is the latest advertisement
ns := dv.neighbors.Get(nName)
ns := a.dv.neighbors.Get(nName)
if ns == nil {
log.Warnf("advert-data: unknown advertisement %s", nName)
log.Warn(a, "Unknown advertisement", "name", nName)
return
}
if ns.AdvertSeq != seqNo {
log.Debugf("advert-data: old advertisement for %s (%d != %d)", nName, ns.AdvertSeq, seqNo)
log.Debug(a, "Old advertisement", "name", nName, "want", ns.AdvertSeq, "have", seqNo)
return
}

// Parse the advertisement
advert, err := tlv.ParseAdvertisement(enc.NewBufferReader(data), false)
if err != nil {
log.Errorf("advert-data: failed to parse advertisement: %+v", err)
log.Error(a, "Failed to parse advertisement", "err", err)
return
}

// Update the local advertisement list
ns.Advert = advert
go dv.ribUpdate(ns)
go a.dv.ribUpdate(ns)
}
74 changes: 45 additions & 29 deletions dv/dv/advert_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,54 @@ import (
"github.com/named-data/ndnd/std/ndn"
spec "github.com/named-data/ndnd/std/ndn/spec_2022"
spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3"
"github.com/named-data/ndnd/std/object"
sec "github.com/named-data/ndnd/std/security"
"github.com/named-data/ndnd/std/utils"
)

func (dv *Router) advertSyncSendInterest() (err error) {
type advertModule struct {
// parent router
dv *Router
// advertisement boot time for self
bootTime uint64
// advertisement sequence number for self
seq uint64
// object directory for advertisement data
objDir *object.MemoryFifoDir
}

func (a *advertModule) String() string {
return "dv-advert"
}

func (a *advertModule) sendSyncInterest() (err error) {
// Sync Interests for our outgoing connections
err = dv.advertSyncSendInterestImpl(dv.config.AdvertisementSyncActivePrefix())
err = a.sendSyncInterestImpl(a.dv.config.AdvertisementSyncActivePrefix())
if err != nil {
log.Warnf("advert-sync: failed to send active sync interest: %+v", err)
log.Error(a, "Failed to send active sync interest", "err", err)
}

// Sync Interests for incoming connections
err = dv.advertSyncSendInterestImpl(dv.config.AdvertisementSyncPassivePrefix())
err = a.sendSyncInterestImpl(a.dv.config.AdvertisementSyncPassivePrefix())
if err != nil {
log.Warnf("advert-sync: failed to send passive sync interest: %+v", err)
log.Error(a, "Failed to send passive sync interest", "err", err)
}

return err
}

func (dv *Router) advertSyncSendInterestImpl(prefix enc.Name) (err error) {
func (a *advertModule) sendSyncInterestImpl(prefix enc.Name) (err error) {
// SVS v3 Sync Data
syncName := prefix.Append(enc.NewVersionComponent(3))

// State Vector for our group
sv := &spec_svs.SvsData{
StateVector: &spec_svs.StateVector{
Entries: []*spec_svs.StateVectorEntry{{
Name: dv.config.RouterName(),
Name: a.dv.config.RouterName(),
SeqNoEntries: []*spec_svs.SeqNoEntry{{
BootstrapTime: dv.advertBootTime,
SeqNo: dv.advertSyncSeq,
BootstrapTime: a.bootTime,
SeqNo: a.seq,
}},
}},
},
Expand All @@ -52,53 +68,53 @@ func (dv *Router) advertSyncSendInterestImpl(prefix enc.Name) (err error) {
dataCfg := &ndn.DataConfig{
ContentType: utils.IdPtr(ndn.ContentTypeBlob),
}
data, err := dv.engine.Spec().MakeData(syncName, dataCfg, sv.Encode(), signer)
data, err := a.dv.engine.Spec().MakeData(syncName, dataCfg, sv.Encode(), signer)
if err != nil {
log.Errorf("advert-sync: sendSyncInterest failed make data: %+v", err)
log.Error(nil, "Failed make data", "err", err)
return
}

// Make SVS Sync Interest
intCfg := &ndn.InterestConfig{
Lifetime: utils.IdPtr(1 * time.Second),
Nonce: utils.ConvertNonce(dv.engine.Timer().Nonce()),
Nonce: utils.ConvertNonce(a.dv.engine.Timer().Nonce()),
HopLimit: utils.IdPtr(uint(2)), // use localhop w/ this
}
interest, err := dv.engine.Spec().MakeInterest(syncName, intCfg, data.Wire, nil)
interest, err := a.dv.engine.Spec().MakeInterest(syncName, intCfg, data.Wire, nil)
if err != nil {
return err
}

// Sync Interest has no reply
err = dv.engine.Express(interest, nil)
err = a.dv.engine.Express(interest, nil)
if err != nil {
return err
}

return nil
}

func (dv *Router) advertSyncOnInterest(args ndn.InterestHandlerArgs, active bool) {
func (a *advertModule) OnSyncInterest(args ndn.InterestHandlerArgs, active bool) {
// If there is no incoming face ID, we can't use this
if args.IncomingFaceId == nil {
log.Warn("advert-sync: received Sync Interest with no incoming face ID, ignoring")
log.Warn(a, "Received Sync Interest with no incoming face ID, ignoring")
return
}

// Check if app param is present
if args.Interest.AppParam() == nil {
log.Warn("advert-sync: received Sync Interest with no AppParam, ignoring")
log.Warn(a, "Received Sync Interest with no AppParam, ignoring")
return
}

// Decode Sync Data
pkt, _, err := spec.ReadPacket(enc.NewWireReader(args.Interest.AppParam()))
if err != nil {
log.Warnf("advert-sync: failed to parse Sync Data: %+v", err)
log.Warn(a, "Failed to parse Sync Data", "err", err)
return
}
if pkt.Data == nil {
log.Warnf("advert-sync: no Sync Data, ignoring")
log.Warn(a, "No Sync Data, ignoring")
return
}

Expand All @@ -108,40 +124,40 @@ func (dv *Router) advertSyncOnInterest(args ndn.InterestHandlerArgs, active bool
svWire := pkt.Data.Content()
params, err := spec_svs.ParseSvsData(enc.NewWireReader(svWire), false)
if err != nil || params.StateVector == nil {
log.Warnf("advert-sync: failed to parse StateVec: %+v", err)
log.Warn(a, "Failed to parse StateVec", "err", err)
return
}

// Process each entry in the state vector
dv.mutex.Lock()
defer dv.mutex.Unlock()
a.dv.mutex.Lock()
defer a.dv.mutex.Unlock()

// FIB needs update if face changes for any neighbor
fibDirty := false
markRecvPing := func(ns *table.NeighborState) {
err, faceDirty := ns.RecvPing(*args.IncomingFaceId, active)
if err != nil {
log.Warnf("advert-sync: failed to update neighbor: %+v", err)
log.Warn(a, "Failed to update neighbor", "err", err)
}
fibDirty = fibDirty || faceDirty
}

// There should only be one entry in the StateVector, but check all anyway
for _, node := range params.StateVector.Entries {
if len(node.SeqNoEntries) != 1 {
log.Warnf("advert-sync: unexpected %d SeqNoEntries for %s, ignoring", len(node.SeqNoEntries), node.Name)
log.Warn(a, "Unexpected SeqNoEntries count", "count", len(node.SeqNoEntries), "router", node.Name)
return
}
entry := node.SeqNoEntries[0]

// Parse name from entry
if node.Name == nil {
log.Warnf("advert-sync: failed to parse neighbor name: %+v", err)
log.Warn(a, "Failed to parse neighbor name", "err", err)
continue
}

// Check if the entry is newer than what we know
ns := dv.neighbors.Get(node.Name)
ns := a.dv.neighbors.Get(node.Name)
if ns != nil {
if ns.AdvertBoot >= entry.BootstrapTime && ns.AdvertSeq >= entry.SeqNo {
// Nothing has changed, skip
Expand All @@ -152,18 +168,18 @@ func (dv *Router) advertSyncOnInterest(args ndn.InterestHandlerArgs, active bool
// Create new neighbor entry cause none found
// This is the ONLY place where neighbors are created
// In all other places, quit if not found
ns = dv.neighbors.Add(node.Name)
ns = a.dv.neighbors.Add(node.Name)
}

markRecvPing(ns)
ns.AdvertBoot = entry.BootstrapTime
ns.AdvertSeq = entry.SeqNo

go dv.advertDataFetch(node.Name, entry.BootstrapTime, entry.SeqNo)
go a.dataFetch(node.Name, entry.BootstrapTime, entry.SeqNo)
}

// Update FIB if needed
if fibDirty {
go dv.fibUpdate()
go a.dv.fibUpdate()
}
}
4 changes: 2 additions & 2 deletions dv/dv/prefix_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (dv *Router) prefixDataFetch(nName enc.Name) {
router.Fetching = true

// Fetch the prefix data object
log.Debugf("prefix-table: fetching object for %s [%d => %d]", nName, router.Known, router.Latest)
log.Debug(dv.pfx, "Fetching prefix data", "router", nName, "known", router.Known, "latest", router.Latest)

name := router.GetNextDataName()
dv.client.Consume(name, func(state *object.ConsumeState) bool {
Expand All @@ -68,7 +68,7 @@ func (dv *Router) prefixDataFetch(nName enc.Name) {
go func() {
fetchErr := state.Error()
if fetchErr != nil {
log.Warnf("prefix-table: failed to fetch object %s: %+v", state.Name(), fetchErr)
log.Warn(dv.pfx, "Failed to fetch prefix data", "name", state.Name(), "err", fetchErr)
time.Sleep(1 * time.Second) // wait on error
}

Expand Down
12 changes: 6 additions & 6 deletions dv/dv/readvertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (dv *Router) readvertiseOnInterest(args ndn.InterestHandlerArgs) {
res.Encode(),
signer)
if err != nil {
log.Warnf("readvertise: failed to make response Data: %+v", err)
log.Warn(dv, "Failed to make readvertise response Data", "err", err)
return
}
args.Reply(data.Wire)
Expand All @@ -42,23 +42,23 @@ func (dv *Router) readvertiseOnInterest(args ndn.InterestHandlerArgs) {
// readvertise: /localhost/nlsr/rib/unregister/h%0C%07%07%08%05cathyo%01A/params-sha256=026dd595c75032c5101b321fbc11eeb96277661c66bc0564ac7ea1a281ae8210
iname := args.Interest.Name()
if len(iname) != 6 {
log.Warnf("readvertise: invalid interest %s", iname)
log.Warn(dv, "Invalid readvertise Interest", "name", iname)
return
}

module, cmd, advC := iname[2], iname[3], iname[4]
if module.String() != "rib" {
log.Warnf("readvertise: unknown module %s", iname)
log.Warn(dv, "Unknown readvertise module", "name", iname)
return
}

params, err := mgmt.ParseControlParameters(enc.NewBufferReader(advC.Val), false)
if err != nil || params.Val == nil || params.Val.Name == nil {
log.Warnf("readvertise: failed to parse advertised name (%s)", err)
log.Warn(dv, "Failed to parse readvertised name", "err", err)
return
}

log.Debugf("readvertise: %s %s", cmd, params.Val.Name)
log.Debug(dv, "Received readvertise request", "cmd", cmd, "name", params.Val.Name)
dv.mutex.Lock()
defer dv.mutex.Unlock()

Expand All @@ -68,7 +68,7 @@ func (dv *Router) readvertiseOnInterest(args ndn.InterestHandlerArgs) {
case "unregister":
dv.pfx.Withdraw(params.Val.Name)
default:
log.Warnf("readvertise: unknown cmd %s", cmd)
log.Warn(dv, "Unknown readvertise cmd", "cmd", cmd)
return
}

Expand Down
Loading

0 comments on commit 5079fca

Please sign in to comment.