-
Notifications
You must be signed in to change notification settings - Fork 49
/
Copy pathregistry.go
130 lines (114 loc) · 2.53 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package limiters
import (
"container/heap"
"sync"
"time"
)
// pqItem is an item in the priority queue.
type pqItem struct {
value interface{}
exp time.Time
index int
key string
}
// gcPq is a priority queue.
type gcPq []*pqItem
func (pq gcPq) Len() int { return len(pq) }
func (pq gcPq) Less(i, j int) bool {
return pq[i].exp.Before(pq[j].exp)
}
func (pq gcPq) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *gcPq) Push(x interface{}) {
n := len(*pq)
item := x.(*pqItem)
item.index = n
*pq = append(*pq, item)
}
func (pq *gcPq) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
// Registry is a thread-safe garbage-collectable registry of values.
type Registry struct {
// Guards all the fields below it.
mx sync.Mutex
pq *gcPq
m map[string]*pqItem
}
// NewRegistry creates a new instance of Registry.
func NewRegistry() *Registry {
pq := make(gcPq, 0)
return &Registry{pq: &pq, m: make(map[string]*pqItem)}
}
// GetOrCreate gets an existing value by key and updates its expiration time.
// If the key lookup fails it creates a new value by calling the provided value closure and puts it on the queue.
func (r *Registry) GetOrCreate(key string, value func() interface{}, ttl time.Duration, now time.Time) interface{} {
r.mx.Lock()
defer r.mx.Unlock()
item, ok := r.m[key]
if ok {
// Update the expiration time.
item.exp = now.Add(ttl)
heap.Fix(r.pq, item.index)
} else {
item = &pqItem{
value: value(),
exp: now.Add(ttl),
key: key,
}
heap.Push(r.pq, item)
r.m[key] = item
}
return item.value
}
// DeleteExpired deletes expired items from the registry and returns the number of deleted items.
func (r *Registry) DeleteExpired(now time.Time) int {
r.mx.Lock()
defer r.mx.Unlock()
c := 0
for {
if len(*r.pq) == 0 {
break
}
item := (*r.pq)[0]
if now.Before(item.exp) {
break
}
delete(r.m, item.key)
heap.Pop(r.pq)
c++
}
return c
}
// Delete deletes an item from the registry.
func (r *Registry) Delete(key string) {
r.mx.Lock()
defer r.mx.Unlock()
item, ok := r.m[key]
if !ok {
return
}
delete(r.m, key)
heap.Remove(r.pq, item.index)
}
// Exists returns true if an item with the given key exists in the registry.
func (r *Registry) Exists(key string) bool {
r.mx.Lock()
defer r.mx.Unlock()
_, ok := r.m[key]
return ok
}
// Len returns the number of items in the registry.
func (r *Registry) Len() int {
r.mx.Lock()
defer r.mx.Unlock()
return len(*r.pq)
}