-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathevents.go
159 lines (134 loc) · 3.04 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package fswatcher
import (
log "github.com/sirupsen/logrus"
"io/ioutil"
"os"
"path/filepath"
"time"
"sync"
)
// DeepWatch listens for changes to files or directories.
// When the target is a directory, the directory and all of its subdirectories are
// monitored recursively (files that already exist at initialization are not processed).
// When the target is a file, only changes to that file are listened for, and listening
// stops when the file is deleted.
type DeepWatch struct {
// callable is copied into every Watcher created by watchPath; the handler
// returned by onCreateFunc also calls its doOnCreate method.
callable Callable
// watcherList holds the Watchers appended by watchPath; Stop iterates over it.
watcherList []Watcher
}
// Stop calls Stop on every watcher recorded in this DeepWatch's watcher list.
func (dw DeepWatch) Stop() {
	for _, watcher := range dw.watcherList {
		watcher.Stop()
	}
}
// Watch starts watching path and hands callable to every watcher it creates.
//
// If path is a directory, the directory and all of its subdirectories are
// watched recursively; otherwise a single watcher is created for path.
//
// An error is returned when path cannot be stat'ed or when the directory at
// path cannot be listed. In the latter case the returned DeepWatch is still
// populated with the watcher that was already started for path, so the
// caller can Stop it.
func Watch(path string, callable Callable) (dw DeepWatch, err error) {
	dw = DeepWatch{
		callable:    callable,
		watcherList: make([]Watcher, 0),
	}

	fileInfo, err := os.Stat(path)
	if err != nil {
		return dw, err
	}

	if fileInfo.IsDir() {
		// Propagate the failure instead of discarding it: an unreadable root
		// directory means none of its subdirectories are being watched.
		return dw, dw.watchFolder(path)
	}

	dw.watchPath(path)
	return dw, nil
}
// watchPath builds a Watcher for path that carries this DeepWatch's callable,
// appends it to watcherList while holding lock (declared elsewhere in the
// package), and then calls Watch on it.
func (dw *DeepWatch) watchPath(path string) {
	watcher := Watcher{Path: path, Callable: dw.callable}

	lock.Lock()
	dw.watcherList = append(dw.watcherList, watcher)
	lock.Unlock()

	// NOTE(review): watcherList stores a copy taken before Watch runs; if
	// Watch mutates the Watcher through a pointer receiver, the stored copy
	// will not see it — confirm against Watcher's definition.
	watcher.Watch()
}
// watchFolder watches path itself and then recurses into each of its
// subdirectories. Regular files already present in the directory are not
// given their own watcher; they are only mentioned in a debug log line.
//
// The returned error is non-nil only when path itself cannot be listed.
// Failures while descending into a subdirectory are logged and do not stop
// the remaining entries from being processed.
func (dw *DeepWatch) watchFolder(path string) (err error) {
	dw.watchPath(path)

	files, err := ioutil.ReadDir(path)
	if err != nil {
		return err
	}

	for _, file := range files {
		sub := filepath.Join(path, file.Name())
		if !file.IsDir() {
			log.Debugf("Ignore initial file: %s", sub)
			continue
		}
		if subWatchErr := dw.watchFolder(sub); subWatchErr != nil {
			// Log the sub-folder's own error; the outer err is always nil
			// at this point because ReadDir has already succeeded.
			log.Errorf("Error: watch sub folder exception: %s", subWatchErr)
		}
	}
	return nil
}
// onCreateFunc returns a closure that handles a path passed to it as follows:
// the path is stat'ed and, when it is a directory, it is watched recursively
// via watchFolder; afterwards the callable's doOnCreate is invoked with the
// path regardless of whether the stat or the watch succeeded.
func (dw *DeepWatch) onCreateFunc() func(path string) {
	return func(path string) {
		fileInfo, err := os.Stat(path)
		switch {
		case err != nil:
			log.Warnf("Get file stat failed: %s", path)
		case fileInfo.IsDir():
			// Surface the failure instead of dropping it: without this the
			// new directory would silently go unwatched.
			if watchErr := dw.watchFolder(path); watchErr != nil {
				log.Warnf("Watch new folder failed: %s: %s", path, watchErr)
			}
		}
		dw.callable.doOnCreate(path)
	}
}
// DelayTrigger runs a callback after a delay once a notification arrives;
// the pending run can be interrupted before it fires.
type DelayTrigger struct {
// filePath is the argument passed to callback.
filePath string
// callback is invoked by the goroutine AsyncDo starts, once timeout elapses
// without an interrupt; a nil callback is skipped.
callback func(filePath string)
// interrupt carries the cancel signal sent by Interrupt to that goroutine.
interrupt chan int
// timeout is how long the goroutine waits before invoking callback.
timeout time.Duration
// started is checked and set by AsyncDo.
// NOTE(review): AsyncDo has a value receiver, so its assignment only changes
// the receiver's copy — confirm whether the flag is meant to persist.
started bool
// mutex is a pointer, so copies of the trigger share it; AsyncDo holds it
// while starting the goroutine and again while callback runs.
mutex *sync.Mutex
}
// defaultDelay is the delay NewDelayTrigger uses when it is called with a delay of 0.
const defaultDelay = 3 // seconds
// NewDelayTrigger builds a DelayTrigger that passes filePath to callback
// after delay seconds. A delay of 0 selects defaultDelay. The returned
// trigger owns a fresh mutex and an unbuffered interrupt channel.
func NewDelayTrigger(filePath string, delay int, callback func(filePath string)) DelayTrigger {
	seconds := delay
	if seconds == 0 {
		seconds = defaultDelay
	}

	trigger := DelayTrigger{
		filePath: filePath,
		callback: callback,
		mutex:    new(sync.Mutex),
	}
	trigger.interrupt = make(chan int)
	trigger.timeout = time.Duration(seconds) * time.Second
	return trigger
}
// Interrupt cancels the action that a pending AsyncDo is about to run.
//
// It offers a signal on the interrupt channel and returns as soon as a
// waiting goroutine receives it. If nothing receives the signal within one
// second, Interrupt gives up and returns.
func (trigger DelayTrigger) Interrupt() {
	// Use an explicit timer instead of time.After so it is released as soon
	// as the send succeeds rather than lingering for the full second.
	giveUp := time.NewTimer(time.Second)
	defer giveUp.Stop()

	select {
	case trigger.interrupt <- 1:
	case <-giveUp.C:
	}
}
// AsyncDo schedules the callback to run asynchronously.
//
// A goroutine is started that waits for either an interrupt signal or the
// trigger's timeout. On interrupt it exits without doing anything. On
// timeout it invokes callback with filePath while holding the shared mutex;
// a nil callback is skipped.
func (trigger DelayTrigger) AsyncDo() {
	trigger.mutex.Lock()
	defer trigger.mutex.Unlock()

	// NOTE(review): the receiver is a value, so the flag set below lives only
	// on this call's copy and does not stop a later AsyncDo from starting
	// another goroutine — confirm the intended semantics before changing it.
	if trigger.started {
		return
	}
	trigger.started = true

	go func() {
		// Use an explicit timer instead of time.After so that an interrupted
		// wait releases it immediately instead of after the full delay.
		timer := time.NewTimer(trigger.timeout)
		defer timer.Stop()

		select {
		case <-trigger.interrupt:
			// Any value on the channel cancels the pending callback.
			return
		case <-timer.C:
			if trigger.callback == nil {
				return
			}
			log.Debugf("callback: %s", trigger.filePath)
			trigger.mutex.Lock()
			defer trigger.mutex.Unlock()
			trigger.callback(trigger.filePath)
		}
	}()
}