Skip to content

Commit

Permalink
Merge pull request #35 from zjkmxy/hackathon-pr
Browse files Browse the repository at this point in the history
Hackathon performance improvements (part 2)
  • Loading branch information
zjkmxy authored Apr 20, 2022
2 parents c50f36a + 3111add commit 23e417f
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 23 deletions.
3 changes: 0 additions & 3 deletions fw/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,6 @@ func (t *Thread) finalizeInterest(pitEntry table.PitEntry) {
if !pitEntry.Satisfied() {
t.NUnsatisfiedInterests += uint64(len(pitEntry.InRecords()))
}

// Remove from PIT
t.pitCS.RemoveInterest(pitEntry)
}

func (t *Thread) processIncomingData(pendingPacket *ndn.PendingPacket) {
Expand Down
51 changes: 47 additions & 4 deletions table/pit-cs-tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"github.com/cespare/xxhash"
"github.com/named-data/YaNFD/core"
"github.com/named-data/YaNFD/ndn"
"github.com/named-data/YaNFD/utils/priority_queue"
)

const expiredPitTickerInterval = 100 * time.Millisecond

// PitCsTree represents a PIT-CS implementation that uses a name tree
type PitCsTree struct {
basePitCsTable
Expand All @@ -22,12 +25,15 @@ type PitCsTree struct {
nCsEntries int
csReplacement CsReplacementPolicy
csMap map[uint64]*nameTreeCsEntry

pitExpiryQueue priority_queue.Queue[*nameTreePitEntry, int64]
}

type nameTreePitEntry struct {
basePitEntry // compose with BasePitEntry
pitCsTable *PitCsTree // pointer to tree
node *pitCsTreeNode // the tree node associated with this entry
queueIndex int // index of entry in the expiring queue
}

type nameTreeCsEntry struct {
Expand Down Expand Up @@ -56,6 +62,7 @@ func NewPitCS() *PitCsTree {
pitCs.root.pitEntries = make([]*nameTreePitEntry, 0)
pitCs.expiringPitEntries = make(chan PitEntry, tableQueueSize)
pitCs.pitTokenMap = make(map[uint32]*nameTreePitEntry)
pitCs.pitExpiryQueue = priority_queue.New[*nameTreePitEntry, int64]()

// This value has already been validated from loading the configuration, so we know it will be one of the following (or else fatal)
switch csReplacementPolicy {
Expand All @@ -66,9 +73,43 @@ func NewPitCS() *PitCsTree {
}
pitCs.csMap = make(map[uint64]*nameTreeCsEntry)

// Set up the expired PIT entries goroutine
go pitCs.expirationPitLoop()

return pitCs
}

func (p *PitCsTree) expirationPitLoop() {
for !core.ShouldQuit {
if p.pitExpiryQueue.Len() > 0 {
sleepTime := time.Duration(p.pitExpiryQueue.PeekPriority()-time.Now().UnixNano()) * time.Nanosecond
if sleepTime > 0 {
if sleepTime > expiredPitTickerInterval {
sleepTime = expiredPitTickerInterval
}
time.Sleep(sleepTime)
}
} else {
time.Sleep(expiredPitTickerInterval)
}
for p.pitExpiryQueue.Len() > 0 && p.pitExpiryQueue.PeekPriority() <= time.Now().UnixNano() {
entry := p.pitExpiryQueue.Pop()
entry.queueIndex = -1
p.expiringPitEntries <- entry
p.RemoveInterest(entry)
}
}
}

func (p *PitCsTree) updatePitExpiry(pitEntry PitEntry) {
e := pitEntry.(*nameTreePitEntry)
if e.queueIndex < 0 {
e.queueIndex = p.pitExpiryQueue.Push(e, e.expirationTime.UnixNano())
} else {
p.pitExpiryQueue.Update(e.queueIndex, e, e.expirationTime.UnixNano())
}
}

func (e *nameTreePitEntry) PitCs() PitCsTable {
return e.pitCsTable
}
Expand Down Expand Up @@ -100,6 +141,7 @@ func (p *PitCsTree) InsertInterest(interest *ndn.Interest, hint *ndn.Name, inFac
entry.satisfied = false
node.pitEntries = append(node.pitEntries, entry)
entry.token = p.generateNewPitToken()
entry.queueIndex = -1
p.pitTokenMap[entry.token] = entry
}

Expand All @@ -122,14 +164,15 @@ func (p *PitCsTree) RemoveInterest(pitEntry PitEntry) bool {
e := pitEntry.(*nameTreePitEntry) // No error check needed because PitCsTree always uses nameTreePitEntry
for i, entry := range e.node.pitEntries {
if entry == pitEntry {
if i < len(e.node.pitEntries)-1 {
copy(e.node.pitEntries[i:], e.node.pitEntries[i+1:])
if len(e.node.pitEntries) > 1 {
e.node.pitEntries[i] = e.node.pitEntries[len(e.node.pitEntries)-1]
}
e.node.pitEntries = e.node.pitEntries[:len(e.node.pitEntries)-1]
if len(e.node.pitEntries) == 0 {
entry.node.pruneIfEmpty()
}
p.nPitEntries--
delete(p.pitTokenMap, e.token)
return true
}
}
Expand Down Expand Up @@ -273,8 +316,8 @@ func (p *pitCsTreeNode) pruneIfEmpty() {
// Remove from parent's children
for i, child := range curNode.parent.children {
if child == p {
if i < len(curNode.parent.children)-1 {
copy(curNode.parent.children[i:], curNode.parent.children[i+1:])
if len(curNode.parent.children) > 1 {
curNode.parent.children[i] = curNode.parent.children[len(curNode.parent.children)-1]
}
curNode.parent.children = curNode.parent.children[:len(curNode.parent.children)-1]
break
Expand Down
20 changes: 4 additions & 16 deletions table/pit-cs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type PitCsTable interface {
IsCsServing() bool

eraseCsDataFromReplacementStrategy(index uint64)
updatePitExpiry(pitEntry PitEntry)

ExpiringPitEntries() chan PitEntry
}
Expand Down Expand Up @@ -139,16 +140,13 @@ func (bpe *basePitEntry) InsertInRecord(interest *ndn.Interest, face uint64, inc
// SetExpirationTimerToNow updates the expiration timer to the current time.
func SetExpirationTimerToNow(e PitEntry) {
e.SetExpirationTime(time.Now())
// ExpiringPitEntries() accesses a channel that is part of a particular
// Pit-CS table. The function sends `e` to this channel so that the
// Pit-CS table knows it is about to expire.
e.PitCs().ExpiringPitEntries() <- e
e.PitCs().updatePitExpiry(e)
}

// UpdateExpirationTimer updates the expiration timer to the latest expiration
// time of any in or out record in the entry.
func UpdateExpirationTimer(e PitEntry) {
e.SetExpirationTime((time.Unix(0, 0)))
e.SetExpirationTime(time.Now())

for _, record := range e.InRecords() {
if record.ExpirationTime.After(e.ExpirationTime()) {
Expand All @@ -162,17 +160,7 @@ func UpdateExpirationTimer(e PitEntry) {
}
}

go waitForPitExpiry(e)
}

func waitForPitExpiry(e PitEntry) {
if !e.ExpirationTime().IsZero() {
time.Sleep(e.ExpirationTime().Sub(time.Now().Add(time.Millisecond * 1))) // Add 1 millisecond to ensure *after* expiration
if e.ExpirationTime().Before(time.Now()) {
// Otherwise, has been updated by another PIT entry
e.PitCs().ExpiringPitEntries() <- e
}
}
e.PitCs().updatePitExpiry(e)
}

///// Setters and Getters /////
Expand Down

0 comments on commit 23e417f

Please sign in to comment.