Skip to content

Commit

Permalink
fw: fix race in cs
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Jan 15, 2025
1 parent 12b6f40 commit 93aa1c8
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 62 deletions.
7 changes: 3 additions & 4 deletions fw/face/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ type Table struct {
}

func init() {
FaceTable.faces = sync.Map{}
FaceTable.nextFaceID.Store(1)
go FaceTable.ExpirationHandler()
go FaceTable.expirationHandler()
}

func (t *Table) String() string {
Expand Down Expand Up @@ -87,8 +86,8 @@ func (t *Table) Remove(id uint64) {
core.Log.Info(t, "Unregistered face", "faceid", id)
}

// ExpirationHandler stops the faces that have expired
func (t *Table) ExpirationHandler() {
// expirationHandler stops the faces that have expired
func (t *Table) expirationHandler() {
for !core.ShouldQuit {
// Check for expired faces every 10 seconds
time.Sleep(10 * time.Second)
Expand Down
16 changes: 8 additions & 8 deletions fw/mgmt/cs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,25 +80,25 @@ func (c *ContentStoreModule) config(interest *Interest) {

if params.Capacity != nil {
core.Log.Info(c, "Setting CS capacity", "capacity", *params.Capacity)
table.SetCsCapacity(int(*params.Capacity))
table.CsCapacity.Store(int32(*params.Capacity))
}

if params.Mask != nil && params.Flags != nil {
if *params.Mask&mgmt.CsEnableAdmit > 0 {
val := *params.Flags&mgmt.CsEnableAdmit > 0
core.Log.Info(c, "Setting CS admit flag", "value", val)
table.SetCsAdmit(val)
table.CsAdmit.Store(val)
}

if *params.Mask&mgmt.CsEnableServe > 0 {
val := *params.Flags&mgmt.CsEnableServe > 0
core.Log.Info(c, "Setting CS serve flag", "value", val)
table.SetCsServe(val)
table.CsServe.Store(val)
}
}

c.manager.sendCtrlResp(interest, 200, "OK", &mgmt.ControlArgs{
Capacity: utils.IdPtr(uint64(table.CsCapacity())),
Capacity: utils.IdPtr(uint64(table.CsCapacity.Load())),
Flags: utils.IdPtr(c.getFlags()),
})
}
Expand All @@ -112,7 +112,7 @@ func (c *ContentStoreModule) info(interest *Interest) {
// Generate new dataset
status := mgmt.CsInfoMsg{
CsInfo: &mgmt.CsInfo{
Capacity: uint64(table.CsCapacity()),
Capacity: uint64(table.CsCapacity.Load()),
Flags: c.getFlags(),
NCsEntries: 0,
},
Expand All @@ -131,11 +131,11 @@ func (c *ContentStoreModule) info(interest *Interest) {

func (c *ContentStoreModule) getFlags() uint64 {
flags := uint64(0)
if table.CsAdmit() {
if table.CsAdmit.Load() {
flags |= mgmt.CsEnableAdmit
}
if table.CsServe() {
flags |= mgmt.CsEnableAdmit
if table.CsServe.Load() {
flags |= mgmt.CsEnableServe
}
return flags
}
2 changes: 1 addition & 1 deletion fw/table/cs-lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (l *CsLRU) BeforeUse(index uint64, wire []byte) {
// EvictEntries is called to instruct the policy to evict enough entries to reduce the Content Store size
// below its size limit.
func (l *CsLRU) EvictEntries() {
for l.queue.Len() > csCapacity {
for l.queue.Len() > int(CsCapacity.Load()) {
indexToErase := l.queue.Front().Value.(uint64)
l.cs.eraseCsDataFromReplacementStrategy(indexToErase) // TODO: find better name for this method
l.queue.Remove(l.queue.Front())
Expand Down
45 changes: 8 additions & 37 deletions fw/table/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package table

import (
"sync/atomic"
"time"

"github.com/named-data/ndnd/fw/core"
Expand All @@ -18,13 +19,13 @@ import (
var deadNonceListLifetime time.Duration

// csCapacity contains the default capacity of each forwarding thread's Content Store.
var csCapacity int
var CsCapacity atomic.Int32

// csAdmit determines whether contents will be admitted to the Content Store.
var csAdmit bool
var CsAdmit atomic.Bool

// csServe determines whether contents will be served from the Content Store.
var csServe bool
var CsServe atomic.Bool

// csReplacementPolicy contains the replacement policy used by Content Stores in the forwarder.
var csReplacementPolicy string
Expand All @@ -34,10 +35,10 @@ var producerRegions []string

// Configure configures the forwarding system.
func Configure() {
// Content Store
csCapacity = int(core.C.Tables.ContentStore.Capacity)
csAdmit = core.C.Tables.ContentStore.Admit
csServe = core.C.Tables.ContentStore.Serve
// Content Store (mutable config)
CsCapacity.Store(int32(core.C.Tables.ContentStore.Capacity))
CsAdmit.Store(core.C.Tables.ContentStore.Admit)
CsServe.Store(core.C.Tables.ContentStore.Serve)
csReplacementPolicy = core.C.Tables.ContentStore.ReplacementPolicy

// Dead Nonce List
Expand Down Expand Up @@ -68,33 +69,3 @@ func CreateFIBTable() {
core.Log.Fatal(nil, "Unknown FIB table algorithm", "algo", core.C.Tables.Fib.Algorithm)
}
}

// SetCsCapacity sets the CS capacity from management.
func SetCsCapacity(capacity int) {
csCapacity = capacity
}

// CsCapacity returns the CS capacity
func CsCapacity() int {
return csCapacity
}

// SetCsAdmit sets the CS admit flag from management.
func SetCsAdmit(admit bool) {
csAdmit = admit
}

// CsAdmit returns the CS admit flag
func CsAdmit() bool {
return csAdmit
}

// SetCsServe sets the CS serve flag from management.
func SetCsServe(serve bool) {
csServe = serve
}

// CsServe returns the CS serve flag
func CsServe() bool {
return csServe
}
4 changes: 2 additions & 2 deletions fw/table/pit-cs-tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,12 @@ func (p *PitCsTree) CsSize() int {

// IsCsAdmitting returns whether the CS is admitting content.
func (p *PitCsTree) IsCsAdmitting() bool {
return csAdmit
return CsAdmit.Load()
}

// IsCsServing returns whether the CS is serving content.
func (p *PitCsTree) IsCsServing() bool {
return csServe
return CsServe.Load()
}

// InsertOutRecord inserts an outrecord for the given interest, updating the
Expand Down
20 changes: 10 additions & 10 deletions fw/table/pit-cs-tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,27 +84,27 @@ func TestNewPitCSTree(t *testing.T) {
}

func TestIsCsAdmitting(t *testing.T) {
csAdmit = false
CsAdmit.Store(false)
csReplacementPolicy = "lru"

pitCS := NewPitCS(func(PitEntry) {})
assert.Equal(t, pitCS.IsCsAdmitting(), csAdmit)
assert.Equal(t, pitCS.IsCsAdmitting(), CsAdmit.Load())

csAdmit = true
CsAdmit.Store(true)
pitCS = NewPitCS(func(PitEntry) {})
assert.Equal(t, pitCS.IsCsAdmitting(), csAdmit)
assert.Equal(t, pitCS.IsCsAdmitting(), CsAdmit.Load())
}

func TestIsCsServing(t *testing.T) {
csServe = false
CsServe.Store(false)
csReplacementPolicy = "lru"

pitCS := NewPitCS(func(PitEntry) {})
assert.Equal(t, pitCS.IsCsServing(), csServe)
assert.Equal(t, pitCS.IsCsServing(), CsServe.Load())

csServe = true
CsServe.Store(true)
pitCS = NewPitCS(func(PitEntry) {})
assert.Equal(t, pitCS.IsCsServing(), csServe)
assert.Equal(t, pitCS.IsCsServing(), CsServe.Load())
}

func TestInsertInterest(t *testing.T) {
Expand Down Expand Up @@ -477,7 +477,7 @@ func TestGetOutRecords(t *testing.T) {

func FindMatchingDataFromCS(t *testing.T) {
csReplacementPolicy = "lru"
csCapacity = 1024
CsCapacity.Store(1024)
pitCS := NewPitCS(func(PitEntry) {})

// Data does not already exist
Expand Down Expand Up @@ -527,7 +527,7 @@ func FindMatchingDataFromCS(t *testing.T) {
assert.True(t, bytes.Equal(csWire, VALID_DATA_1))

// Reduced CS capacity to check that eviction occurs
csCapacity = 1
CsCapacity.Store(1)
pitCS = NewPitCS(func(PitEntry) {})
pitCS.InsertData(data1, VALID_DATA_1)
pitCS.InsertData(data2, VALID_DATA_2)
Expand Down

0 comments on commit 93aa1c8

Please sign in to comment.