forked from StackExchange/wmi
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathnotification_query.go
257 lines (230 loc) · 6.83 KB
/
notification_query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
// +build windows
package wmi
import (
"errors"
"fmt"
"reflect"
"sync"
"time"
"github.com/bi-zone/go-ole"
"github.com/bi-zone/go-ole/oleutil"
"github.com/hashicorp/go-multierror"
"github.com/scjalliance/comshim"
)
// ErrAlreadyRunning is the error StartNotifications returns when it is called
// on a NotificationQuery that is already running.
var ErrAlreadyRunning = errors.New("already running")
// wbemErrTimedOut is the status code isTimeoutError looks for in a failed
// `NextEvent` call to tell "no event arrived in time" from a real failure.
const wbemErrTimedOut = 0x80043001

// defaultNotificationTimeout is the notification timeout NewNotificationQuery
// applies to every freshly created query.
const defaultNotificationTimeout = time.Second
// NotificationQuery represents subscription to the WMI events.
// For more info see https://docs.microsoft.com/en-us/windows/desktop/wmisdk/swbemservices-execnotificationquery
//
// The embedded Mutex is taken by `StartNotifications` (for the initial state
// transition), by `Stop` and by `SetNotificationTimeout`.
type NotificationQuery struct {
// Decoder unmarshals received WMI events into the user structures.
Decoder
sync.Mutex
// query is the WQL text passed to `ExecNotificationQuery`.
query string
// state is the current lifecycle phase of the query.
state state
// doneCh is created by `StartNotifications`; the notification loop sends on
// it to hand the stop signal over to `Stop` and it is closed when
// `StartNotifications` returns.
doneCh chan struct{}
// eventCh is the user-supplied `chan T` or `chan *T` events are sent to.
eventCh interface{}
// connectServerArgs are forwarded to `ConnectSWbemServices` on start.
connectServerArgs []interface{}
// queryTimeoutMs is the `NextEvent` timeout in milliseconds; -1 is stored
// for a negative (infinite) notification timeout.
queryTimeoutMs int64
}
// NewNotificationQuery builds a NotificationQuery for the given WQL @query
// string. Nothing is sent to WMI at this point, so every WMI-related error
// (query syntax, connection, etc.) is reported only when the query is started.
//
// @eventCh must be a channel of structures or of structure pointers. The
// structure type should satisfy limitations described in `Decoder.Unmarshal`.
//
// An error is returned if @eventCh is neither `chan T` nor `chan *T`. The
// returned query uses the default notification timeout.
func NewNotificationQuery(eventCh interface{}, query string) (*NotificationQuery, error) {
	if ok := isChannelTypeOK(eventCh); !ok {
		return nil, errors.New("eventCh has incorrect type; should be `chan T` or `chan *T`")
	}

	q := &NotificationQuery{
		query:   query,
		eventCh: eventCh,
		state:   stateNotStarted,
	}
	q.SetNotificationTimeout(defaultNotificationTimeout)
	return q, nil
}
// SetNotificationTimeout specifies the longest time the query may spend
// waiting for the next event. Waiting for an event blocks the notification
// thread, so in other words @t is the worst-case time the notification thread
// needs to react to the `Stop()` command.
//
// Default NotificationTimeout is 1s. It could be changed after the query
// `Start()`.
//
// A negative Duration makes that interval infinite. Otherwise @t is truncated
// to whole milliseconds.
func (q *NotificationQuery) SetNotificationTimeout(t time.Duration) {
	// -1 is the stored marker for "wait without a time limit".
	timeoutMs := int64(-1)
	if t >= 0 {
		timeoutMs = int64(t / time.Millisecond)
	}

	q.Lock()
	q.queryTimeoutMs = timeoutMs
	q.Unlock()
}
// SetConnectServerArgs sets the `SWbemLocator.ConnectServer` args. Args are
// passed directly to the `ole` call and support most of the primitive types.
// It should be called before the query is started: the args are stored as-is
// (the slice is not copied) and the assignment is done without taking the
// query lock.
//
// Args reference: https://docs.microsoft.com/en-us/windows/desktop/wmisdk/swbemlocator-connectserver
// Passing details: https://github.com/go-ole/go-ole/blob/master/idispatch_windows.go#L60
func (q *NotificationQuery) SetConnectServerArgs(args ...interface{}) {
q.connectServerArgs = args
}
// StartNotifications connects to the WMI service and starts receiving
// notifications generated by the query. The call blocks: it returns only when
// the query is stopped (nil error) or when an error occurs.
//
// Errors usually happen on the initialization phase (connecting to WMI, query
// execution, first result unmarshalling), so you could assume that "it either
// starts and is going to give me notifications or fails fast enough".
//
// Returns ErrAlreadyRunning if the query is already running, and nil without
// doing anything if the query has already been stopped. A panic raised while
// the query runs is recovered and reported as an error.
func (q *NotificationQuery) StartNotifications() (err error) {
	q.Lock()
	switch q.state {
	case stateStarted:
		q.Unlock()
		return ErrAlreadyRunning
	case stateStopped:
		q.Unlock()
		return nil
	}
	q.doneCh = make(chan struct{})
	q.state = stateStarted
	q.Unlock()

	// Mark as stopped on any return. Registered first, so it runs after every
	// other deferred cleanup below. Closing doneCh also unblocks a `Stop()`
	// that started waiting while we were failing.
	// NOTE(review): state is written here without the lock; taking it would
	// deadlock with `Stop()`, which holds the lock while waiting on doneCh.
	defer func() {
		q.state = stateStopped
		close(q.doneCh)
	}()

	// Be aware of reflections and COM usage: turn a panic into an error.
	defer func() {
		if r := recover(); r != nil {
			// Report the recovered value itself; formatting `err` here would
			// lose the actual panic reason.
			err = multierror.Append(err, fmt.Errorf("runtime panic; %v", r))
		}
	}()

	// Notify that we are going to use COM.
	comshim.Add(1)
	defer comshim.Done()

	// Connect to WMI service.
	service, err := ConnectSWbemServices(q.connectServerArgs...)
	if err != nil {
		return fmt.Errorf("failed to connect WMI service; %s", err)
	}
	defer func() {
		if clErr := service.Close(); clErr != nil {
			err = multierror.Append(err, clErr)
		}
	}()
	q.Dereferencer = service

	// Subscribe to the events. ExecNotificationQuery call must have these
	// flags and no other.
	sWbemEventSource, err := oleutil.CallMethod(
		service.sWbemServices,
		"ExecNotificationQuery",
		q.query,
		"WQL",
		0x00000010|0x00000020, // WBEM_FLAG_RETURN_IMMEDIATELY | WBEM_FLAG_FORWARD_ONLY
	)
	if err != nil {
		return fmt.Errorf("ExecNotificationQuery failed; %s", err)
	}
	eventSource := sWbemEventSource.ToIDispatch()
	defer eventSource.Release()

	reflectedDoneChan := reflect.ValueOf(q.doneCh)
	reflectedResChan := reflect.ValueOf(q.eventCh)
	eventType := reflectedResChan.Type().Elem()
	for {
		// If it is time to stop, somebody will be listening on doneCh.
		select {
		case q.doneCh <- struct{}{}:
			return nil
		default:
		}

		// Or try to query new events waiting no longer than queryTimeoutMs.
		eventIUnknown, err := eventSource.CallMethod("NextEvent", q.queryTimeoutMs)
		if err != nil {
			if isTimeoutError(err) {
				// No event within the timeout: loop to re-check the stop signal.
				continue
			}
			return fmt.Errorf("unexpected NextEvent error; %s", err)
		}
		event := eventIUnknown.ToIDispatch()

		// Unmarshal event into a freshly allocated value of the channel
		// element type.
		e := reflect.New(eventType)
		unmarshalErr := q.Unmarshal(event, e.Interface())
		// Release the event whether or not unmarshalling succeeded, so the
		// error path does not leak it. The Clear error is ignored: we can't
		// handle it anyway.
		_ = eventIUnknown.Clear()
		if unmarshalErr != nil {
			return fmt.Errorf("failed to unmarshal event; %s", unmarshalErr)
		}

		// Send to the user, giving up if a stop is requested meanwhile.
		sent := trySend(reflectedResChan, reflectedDoneChan, e.Elem())
		if !sent {
			return nil // Query stopped
		}
	}
}
// Stop asks the running query to stop and marks the query as stopped. When the
// query is running, the call blocks until the notification loop picks up the
// stop signal (or until `StartNotifications` returns on its own), which may
// take some time. See `SetNotificationTimeout` for more info.
//
// Calling Stop on a query that is not running only marks it as stopped.
func (q *NotificationQuery) Stop() {
	q.Lock()
	defer q.Unlock()

	switch q.state {
	case stateStarted:
		// Completes on the loop's send to doneCh or when doneCh is closed.
		<-q.doneCh
	}
	q.state = stateStopped
}
// state is the lifecycle phase of a NotificationQuery.
type state int

// Lifecycle phases of a NotificationQuery. The zero value is stateNotStarted.
const (
	stateNotStarted = state(iota) // created, never started
	stateStarted                  // StartNotifications is running
	stateStopped                  // stopped or finished
)
// isTimeoutError reports whether err is an OLE error whose sub-error is an
// exception info carrying the wbemErrTimedOut status code. Any other error
// shape yields false.
func isTimeoutError(err error) bool {
	if oleErr, isOle := err.(*ole.OleError); isOle {
		if excep, isExcep := oleErr.SubError().(ole.EXCEPINFO); isExcep {
			return excep.SCODE() == wbemErrTimedOut
		}
	}
	return false
}
// isChannelTypeOK reports whether eventCh is a channel events can be sent to:
// it must be a channel that allows sends (`chan T`, `chan *T` or their
// send-only forms) whose element type is a struct or a pointer to a struct.
//
// A nil interface value, a non-channel value and a receive-only channel are
// all rejected.
func isChannelTypeOK(eventCh interface{}) bool {
	chT := reflect.TypeOf(eventCh)
	// reflect.TypeOf returns nil for a nil interface value; calling Kind on
	// it would panic.
	if chT == nil || chT.Kind() != reflect.Chan {
		return false
	}
	// Events are delivered with a reflective send, which panics on a
	// receive-only channel.
	if chT.ChanDir()&reflect.SendDir == 0 {
		return false
	}
	switch elemT := chT.Elem(); elemT.Kind() {
	case reflect.Struct:
		return true
	case reflect.Ptr:
		return elemT.Elem().Kind() == reflect.Struct
	}
	return false
}
// trySend blocks until either resEl is sent to resCh or an empty struct is
// sent to doneCh, whichever becomes possible first. It is the reflective
// equivalent of:
//
//	select {
//	case resCh <- resEl:
//		return true
//	case doneCh <- struct{}{}:
//		return false
//	}
//
// The result is true only when the element went to resCh.
func trySend(resCh, doneCh, resEl reflect.Value) (sendSuccessful bool) {
	// Position of the result case in the slice below.
	const resIdx = 0

	cases := []reflect.SelectCase{
		{Dir: reflect.SelectSend, Chan: resCh, Send: resEl},
		{Dir: reflect.SelectSend, Chan: doneCh, Send: reflect.ValueOf(struct{}{})},
	}
	chosen, _, _ := reflect.Select(cases)
	return chosen == resIdx
}