-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathintMutex.go
158 lines (131 loc) · 4.07 KB
/
intMutex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package goconcurrentcounter
import (
"sync"
"github.com/enriquebris/goconcurrentqueue"
)
// IntMutex is an int counter protected by a mutex, with named trigger
// functions that are run when Update moves the counter onto a given value.
type IntMutex struct {
	// value is the current counter value; GetValue and Update access it
	// while holding mutex.
	value int
	// mutex guards value. Update keeps it locked while the trigger functions run.
	//mutex deadlock.Mutex
	mutex sync.Mutex
	// triggerFuncMutex is held while trigger functions are looked up,
	// registered, removed by name, or executed.
	//triggerFuncMutex deadlock.Mutex
	triggerFuncMutex sync.Mutex
	// mpFuncs maps a counter value to the named trigger functions registered for it.
	mpFuncs map[int][]concurrentIntFuncEntry
	// runAfterTriggerFunctions holds functions to be executed right after the
	// trigger functions of the current Update have finished.
	runAfterTriggerFunctions goconcurrentqueue.Queue
}
// NewIntMutex allocates an IntMutex whose counter starts at value and returns
// a pointer to it. The returned instance is fully initialized and ready to use.
func NewIntMutex(value int) *IntMutex {
	counter := new(IntMutex)
	counter.initialize(value)

	return counter
}
// initialize sets the starting counter value and creates the empty trigger
// map and the FIFO queue of run-after-trigger functions. It does not take any
// lock.
func (st *IntMutex) initialize(value int) {
	st.mpFuncs = map[int][]concurrentIntFuncEntry{}
	st.runAfterTriggerFunctions = goconcurrentqueue.NewFIFO()
	st.value = value
}
// GetValue returns the current counter value, read while holding the value
// mutex.
func (st *IntMutex) GetValue() int {
	st.mutex.Lock()
	current := st.value
	st.mutex.Unlock()

	return current
}
// Update adds upd (which may be negative) to the counter and then runs the
// trigger functions registered for the resulting value, if any.
//
// The trigger functions, and the functions queued to run after them, are
// executed synchronously while the value mutex is still held. They must
// therefore not call GetValue or Update on the same IntMutex: sync.Mutex is
// not reentrant and the call would block forever.
func (st *IntMutex) Update(upd int) {
	st.mutex.Lock()
	// Deferred so the mutex is released even if a trigger function panics.
	defer st.mutex.Unlock()

	before := st.value
	st.value += upd

	st.executeTriggerFunctions(st.value, before)
}
// executeTriggerFunctions runs, in registration order, every trigger function
// registered for currentValue and then drains runAfterTriggerFunctions.
// Every function is called with (currentValue, previousValue).
//
// Trigger functions run while triggerFuncMutex is held, so they must not call
// GetTriggerOnValue, SetTriggerOnValue or UnsetTriggerOnValue directly. The
// queued functions run after that mutex has been released and may do so.
// Because the drain lives in a deferred function, it also happens when a
// trigger function panics.
func (st *IntMutex) executeTriggerFunctions(currentValue int, previousValue int) {
	st.triggerFuncMutex.Lock()
	defer func() {
		// Release the trigger mutex first so that queued functions are free
		// to register or remove triggers.
		st.triggerFuncMutex.Unlock()

		// Keep dequeuing until Dequeue reports an error (presumably "queue
		// is empty" — any error ends the drain).
		for {
			rawFunc, err := st.runAfterTriggerFunctions.Dequeue()
			// Only call entries that really are non-nil functions; anything
			// else is skipped instead of panicking on a nil call.
			if fn, ok := rawFunc.(concurrentIntFunc); ok && fn != nil {
				fn(currentValue, previousValue)
			}
			if err != nil {
				return
			}
		}
	}()

	// Ranging over a missing key yields a nil slice, i.e. no iterations.
	for _, entry := range st.mpFuncs[currentValue] {
		if entry.Func == nil {
			// A trigger registered with a nil function has nothing to run.
			continue
		}
		entry.Func(currentValue, previousValue)
	}
}
// GetTriggerOnValue looks up the trigger function registered for value under
// the given name. When several entries share that name the first one
// registered is returned. It returns nil when there is no such entry.
//
// The lookup takes the trigger mutex, which is also held while trigger
// functions execute, so calling it from inside a trigger function blocks
// forever.
func (st *IntMutex) GetTriggerOnValue(value int, name string) concurrentIntFunc {
	st.triggerFuncMutex.Lock()
	defer st.triggerFuncMutex.Unlock()

	// A missing key yields a nil slice, so the loop simply does not run.
	entries := st.mpFuncs[value]
	for i := range entries {
		if entries[i].Name == name {
			return entries[i].Func
		}
	}

	return nil
}
// SetTriggerOnValue registers fn, under the given name, to be executed every
// time Update brings the counter to value. The entry is appended to the ones
// already registered for that value; names are not checked for uniqueness.
//
// Note: do not call UnsetTriggerOnValue() from inside fn (it would deadlock on
// the trigger mutex). Instead, enqueue a function that calls it by using
// EnqueueToRunAfterCurrentTriggerFunctions().
func (st *IntMutex) SetTriggerOnValue(value int, name string, fn concurrentIntFunc) {
	st.triggerFuncMutex.Lock()
	defer st.triggerFuncMutex.Unlock()

	// append copes with the nil slice returned for a value without triggers.
	st.mpFuncs[value] = append(st.mpFuncs[value], concurrentIntFuncEntry{Name: name, Func: fn})
}
// UnsetTriggerOnValue removes every trigger function registered for value
// under the given name. Triggers registered for the same value under other
// names are kept, in their original order. When no trigger is left for value
// its map entry is deleted. It is a no-op when nothing matches.
//
// Note: it takes the trigger mutex, which is held while trigger functions
// execute, so do not call it from inside a trigger function; enqueue the call
// with EnqueueToRunAfterCurrentTriggerFunctions() instead.
func (st *IntMutex) UnsetTriggerOnValue(value int, name string) {
	st.triggerFuncMutex.Lock()
	defer st.triggerFuncMutex.Unlock()

	entries, ok := st.mpFuncs[value]
	if !ok {
		return
	}

	// Collect the survivors with append instead of writing into a slice of a
	// guessed length: the number of entries matching name may be zero, one or
	// many.
	remaining := make([]concurrentIntFuncEntry, 0, len(entries))
	for _, entry := range entries {
		if entry.Name != name {
			remaining = append(remaining, entry)
		}
	}

	if len(remaining) == 0 {
		delete(st.mpFuncs, value)
		return
	}
	st.mpFuncs[value] = remaining
}
// UnsetTriggersOnValue removes all the trigger functions registered for the
// given value. It is a no-op when the value has no triggers.
//
// The trigger map is modified while holding the trigger mutex, the same lock
// the other trigger accessors use. That mutex is also held while trigger
// functions execute, so do not call this method from inside a trigger
// function (it would deadlock); enqueue the call with
// EnqueueToRunAfterCurrentTriggerFunctions() instead.
func (st *IntMutex) UnsetTriggersOnValue(value int) {
	st.triggerFuncMutex.Lock()
	defer st.triggerFuncMutex.Unlock()

	delete(st.mpFuncs, value)
}
// EnqueueToRunAfterCurrentTriggerFunctions queues fn to be executed right
// after the trigger functions of an Update have finished. fn is called with
// the same (current, previous) values the trigger functions received.
func (st *IntMutex) EnqueueToRunAfterCurrentTriggerFunctions(fn concurrentIntFunc) {
	// The queue's result is deliberately discarded: this method has no error
	// return through which it could be reported.
	_ = st.runAfterTriggerFunctions.Enqueue(fn)
}