-
Notifications
You must be signed in to change notification settings - Fork 11
/
asynccib.go
147 lines (133 loc) · 3.32 KB
/
asynccib.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package cib
import (
"fmt"
"os"
"sync"
"time"
pacemaker "github.com/ClusterLabs/go-pacemaker"
log "github.com/sirupsen/logrus"
)
// AsyncCib wraps the CIB retrieval from go-pacemaker in an
// asynchronous interface, so that other parts of the server have a
// single copy of the CIB available at any time.
//
// Also provides a subscription interface for the long polling request
// end point, via Wait().
type AsyncCib struct {
	// xmldoc is the text of the most recently received CIB document;
	// it is written by notifyNewCib and returned by Get.
	xmldoc string
	// version is the version of the most recently received CIB
	// document; it is written by notifyNewCib and returned by Version.
	version *pacemaker.CibVersion
	// lock guards xmldoc and version.
	lock sync.Mutex
	// notifier carries one reply channel per Wait call; notifyNewCib
	// receives those reply channels and sends the new version string
	// on each of them.
	notifier chan chan string
}
// LogRecord records the last warning and error messages, to avoid
// spamming the log with duplicate messages.
type LogRecord struct {
	// warning is the text of the last warning that was logged.
	warning string
	// error is the text of the last error that was logged.
	error string
}
// Start launches two goroutines, one which runs the go-pacemaker
// mainloop and one which listens for CIB events (the CIB fetcher
// goroutine).
//
// The fetcher opens the CIB (from the file named by the CIB_file
// environment variable when it is set, otherwise via the default
// connection), queries it once, publishes the result through
// notifyNewCib and then subscribes to further events. Failures are
// retried every 5 seconds. Identical consecutive warning/error messages
// are logged only once.
//
// Start creates the notifier channel if it has not been created yet.
func (acib *AsyncCib) Start() {
	if acib.notifier == nil {
		acib.notifier = make(chan chan string)
	}

	// NOTE(review): msg and lastLog are shared between the fetcher
	// goroutine and the subscription callback without synchronization;
	// confirm which goroutine go-pacemaker invokes the callback on.
	msg := ""
	lastLog := LogRecord{warning: "", error: ""}
	cibFile := os.Getenv("CIB_file")

	cibFetcher := func() {
		for {
			var cib *pacemaker.Cib
			var err error
			if cibFile != "" {
				cib, err = pacemaker.OpenCib(pacemaker.FromFile(cibFile))
			} else {
				cib, err = pacemaker.OpenCib()
			}
			if err != nil {
				msg = fmt.Sprintf("Failed to connect to Pacemaker: %v", err)
				if msg != lastLog.warning {
					// Log the text verbatim: it embeds an arbitrary error
					// string and must not be used as a format string.
					log.Warn(msg)
					lastLog.warning = msg
				}
				time.Sleep(5 * time.Second)
			}

			// NOTE(review): cib is never reset inside this loop, so once
			// a connection has been opened the same handle is re-queried
			// and re-subscribed after every disconnect event — confirm
			// that this is the intended reconnect behaviour.
			for cib != nil {
				if cibxml, queryErr := cib.Query(); queryErr != nil {
					msg = fmt.Sprintf("Failed to query CIB: %v", queryErr)
					if msg != lastLog.error {
						log.Error(msg)
						lastLog.error = msg
					}
				} else {
					acib.notifyNewCib(cibxml)
				}

				// waiter is signalled by the callback on any event other
				// than an update, which is reported as a lost connection.
				waiter := make(chan int)
				_, err = cib.Subscribe(func(event pacemaker.CibEvent, doc *pacemaker.CibDocument) {
					if event == pacemaker.UpdateEvent {
						acib.notifyNewCib(doc)
					} else {
						msg = fmt.Sprintf("lost connection: %v", event)
						if msg != lastLog.warning {
							log.Warn(msg)
							lastLog.warning = msg
						}
						waiter <- 1
					}
				})
				if err != nil {
					log.Infof("Failed to subscribe, rechecking every 5 seconds")
					time.Sleep(5 * time.Second)
				} else {
					<-waiter
				}
			}
		}
	}

	go cibFetcher()
	go pacemaker.Mainloop()
}
// Wait registers for the next CIB change and returns the version
// string sent by notifyNewCib. If the registration is not picked up
// within `timeout` seconds, defval is returned instead.
func (acib *AsyncCib) Wait(timeout int, defval string) string {
	reply := make(chan string)
	expired := time.After(time.Duration(timeout) * time.Second)

	select {
	case <-expired:
		return defval
	case acib.notifier <- reply:
		// The notifier accepted our reply channel; the version string
		// follows on it.
		return <-reply
	}
}
// Get returns the text of the current CIB XML document as stored by
// the last notifyNewCib call (the empty string if none has been stored
// yet).
func (acib *AsyncCib) Get() string {
	acib.lock.Lock()
	doc := acib.xmldoc
	acib.lock.Unlock()
	return doc
}
// Version returns the version of the current CIB as stored by the last
// notifyNewCib call, or nil if none has been stored yet.
func (acib *AsyncCib) Version() *pacemaker.CibVersion {
	acib.lock.Lock()
	current := acib.version
	acib.lock.Unlock()
	return current
}
// notifyNewCib stores the given CIB document as the current one and
// then sends its version string to every Wait call that is currently
// registered on the notifier channel.
func (acib *AsyncCib) notifyNewCib(cibxml *pacemaker.CibDocument) {
	text := cibxml.ToString()
	version := cibxml.Version()
	log.Infof("[CIB]: %v", version)

	// Publish the new document and version under the lock.
	acib.lock.Lock()
	acib.xmldoc, acib.version = text, version
	acib.lock.Unlock()

	// Drain the pending waiters without blocking: stop as soon as no
	// reply channel is ready to be received.
	for {
		select {
		case reply := <-acib.notifier:
			reply <- version.String()
		default:
			return
		}
	}
}