-
Notifications
You must be signed in to change notification settings - Fork 3
/
main.go
210 lines (186 loc) · 4.98 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package main
import (
"flag"
"fmt"
"github.com/coreos/etcd/client"
"github.com/rjeczalik/notify"
"io/ioutil"
"log"
"os"
"path/filepath"
"time"
"github.com/coreos/etcd/clientv3"
)
// Tunables shared by the file-system and etcd sides of the synchronisation.
const (
	// MARK_FILE_NAME is the name of the marker file that must exist in the
	// sync directory. It doubles as the lock file, which protects against
	// pointing the tool at the wrong directory or running two instances.
	MARK_FILE_NAME = ".ETCDIR_MARK_FILE_HUGSDBDND"

	// DEFAULT_DIRMODE is the permission mode intended for created directories.
	DEFAULT_DIRMODE = 0777

	// DEFAULT_FILEMODE is the permission mode passed when files are written.
	DEFAULT_FILEMODE = 0777

	// EVENT_CHANNEL_LEN is the buffer size of the change-event channels.
	EVENT_CHANNEL_LEN = 1000

	// LOCK_INTERVAL is how long the marker file has to stay untouched before
	// the directory may be locked again.
	LOCK_INTERVAL = time.Second
)
// Command-line flags and build metadata.
var (
// serverAddr is the client URL of one etcd cluster member (flag -server).
serverAddr = flag.String("server", "http://localhost:2379", "Client url for one of cluster servers")
// serverPrefix is the etcd root directory that is mapped onto the sync
// directory (flag -prefix); empty by default.
serverPrefix = flag.String("prefix", "", "server root dir for map")
// apiVersion selects the etcd API to talk to: 2 or 3 (flag -api, default 3).
apiVersion = flag.Int("api", 3, "Api version 2 or 3")
// printVersion makes the program print VERSION and exit (flag -version).
printVersion = flag.Bool("version", false, "Print version and exit.")
// VERSION is the string printed for -version.
// NOTE(review): presumably overridden at build time (e.g. -ldflags "-X") — confirm.
VERSION = "unversioned"
)
// fileChangeEvent describes one change to a file or directory. Values of this
// type are passed over the event channels created in main.
type fileChangeEvent struct {
// Path is the path of the changed entry; fileMon fills it with the path
// reported by the file-system watcher.
Path string
// IsDir is true when the entry is a directory.
IsDir bool
// IsRemoved is true when the entry no longer exists.
IsRemoved bool
// Content holds the file's bytes; fileMon leaves it nil for directories and
// removed entries.
Content []byte
}
/*
Monitoring changes in the file system.

fileMon watches path recursively and translates every notification into a
fileChangeEvent sent on bus:
  - the path no longer exists -> IsRemoved is set;
  - the path is a directory   -> IsDir is set;
  - otherwise                 -> Content carries the file's bytes.

It is designed to run in a separate goroutine: it loops forever and terminates
the whole process with log.Fatal if the watch cannot be established.
*/
func fileMon(path string, bus chan fileChangeEvent) {
	// Notify will drop an event if the receiver is not able to keep up with the
	// sending pace, so the channel must be buffered.
	// NOTE(review): a buffer of 1 only absorbs a single pending event; bursts
	// can still be dropped while a file is being read below — consider a
	// larger buffer.
	c := make(chan notify.EventInfo, 1)

	// Watch everything below path (the "/..." suffix makes the watch
	// recursive) and deliver all event types to c.
	if err := notify.Watch(path+"/...", c, notify.All); err != nil {
		log.Fatal(err)
	}
	defer notify.Stop(c)

	// Block until an event is received, then classify it by the current state
	// of the path on disk rather than by the event type.
	for event := range c {
		changed := event.Path()

		// Lstat: do not follow symlinks when deciding what the entry is.
		fstat, err := os.Lstat(changed)
		if os.IsNotExist(err) {
			bus <- fileChangeEvent{Path: changed, IsRemoved: true}
			continue
		}
		if err != nil {
			log.Println(err)
			continue
		}
		if fstat.IsDir() {
			bus <- fileChangeEvent{Path: changed, IsDir: true}
			continue
		}

		content, err := ioutil.ReadFile(changed)
		if err != nil {
			// Do not publish an event with nil content: the receiver could not
			// tell it apart from a genuinely empty file.
			log.Println(err)
			continue
		}
		bus <- fileChangeEvent{Path: changed, Content: content}
	}
}
// cleanDir removes every entry of dir except the mark file and then rewrites
// the mark file as an empty file.
//
// It panics when dir cannot be opened or listed, or when an entry cannot be
// removed. A failure to write the mark file is only logged.
func cleanDir(dir string) {
	dirFile, err := os.Open(dir)
	if err != nil {
		panic(err)
	}
	dirNames, err := dirFile.Readdirnames(-1)
	// The handle is only needed for the listing; release it right away so it
	// is not leaked. A close error on a read-only directory handle is not
	// actionable, so it is deliberately ignored.
	_ = dirFile.Close()
	if err != nil {
		panic(err)
	}

	for _, item := range dirNames {
		if item == MARK_FILE_NAME {
			continue
		}
		itemPath := filepath.Join(dir, item)
		if err := os.RemoveAll(itemPath); err != nil {
			log.Println("I can't remove: ", itemPath)
			panic(err)
		}
	}

	// Truncate (or create) the mark file so the directory stays usable as a
	// sync directory.
	markPath := filepath.Join(dir, MARK_FILE_NAME)
	if err := ioutil.WriteFile(markPath, []byte{}, DEFAULT_FILEMODE); err != nil {
		log.Println("I can't create touchfile: ", markPath)
		return
	}
}
/*
Check if dir can be locked and lock it.
Return true if locked successfully.

The lock is the mark file inside dir: its modification time acts as a
heartbeat. The directory counts as locked by somebody else while the mark file
was modified within the last LOCK_INTERVAL. On success the mark file is
rewritten immediately and then refreshed every LOCK_INTERVAL/3 by a background
goroutine that runs for the rest of the process lifetime.

Return false when the mark file cannot be examined, when it was touched too
recently, or when the first heartbeat cannot be written.
*/
func lock(dir string) bool {
	lockFile := filepath.Join(dir, MARK_FILE_NAME)
	stat, err := os.Stat(lockFile)
	if err != nil {
		log.Println("Can't stat lock file: ", lockFile, err)
		return false
	}
	// A fresh modification time means another instance is still heartbeating.
	if time.Since(stat.ModTime()) <= LOCK_INTERVAL {
		return false
	}

	pid := os.Getpid()
	touch := func() error {
		mess := fmt.Sprint("PID: ", pid, "\nLAST TIME: ", time.Now().String())
		return ioutil.WriteFile(lockFile, []byte(mess), DEFAULT_FILEMODE)
	}

	// Write the first heartbeat before reporting success: otherwise the mark
	// file still looks stale to a concurrently starting instance, and an
	// unwritable mark file would go unnoticed.
	if err := touch(); err != nil {
		log.Println("Can't write lock file: ", lockFile, err)
		return false
	}

	go func() {
		for {
			time.Sleep(LOCK_INTERVAL / 3)
			if err := touch(); err != nil {
				log.Println("Can't refresh lock file: ", lockFile, err)
			}
		}
	}()
	return true
}
// main parses the command line, validates the sync directory (it must exist,
// be a directory and contain the mark file), takes the directory lock and then
// starts the synchronisation for the selected etcd API version: an initial
// sync, one goroutine watching the file system, one goroutine watching etcd,
// and the sync process itself in the calling goroutine.
func main() {
	flag.Usage = printUsage
	flag.Parse()

	switch {
	case *printVersion:
		fmt.Println(VERSION)
		return
	case flag.NArg() != 1:
		// Exactly one positional argument (the sync directory) is required.
		printUsage()
		return
	}

	syncDir, err := filepath.Abs(flag.Arg(0))
	if err != nil {
		panic(err)
	}

	info, err := os.Stat(syncDir)
	if err != nil {
		panic(err)
	}
	if !info.IsDir() {
		fmt.Printf("'%v' is not dir.\n", syncDir)
		return
	}

	// Refuse to work on a directory the user has not explicitly marked.
	markPath := filepath.Join(syncDir, MARK_FILE_NAME)
	if _, statErr := os.Stat(markPath); os.IsNotExist(statErr) {
		fmt.Printf(`You have to create file '%[1]v' before usage dir as syncdir. You can do it by command:
echo > %[1]v
`, markPath)
		return
	}

	if locked := lock(syncDir); !locked {
		log.Println("Can't get lock. May be another instance work with the dir")
		return
	}

	fmt.Println(os.Args)

	// One buffered channel per direction of change.
	fromEtcd := make(chan fileChangeEvent, EVENT_CHANNEL_LEN)
	fromFS := make(chan fileChangeEvent, EVENT_CHANNEL_LEN)

	version := *apiVersion
	if version == 2 {
		cfg := client.Config{Endpoints: []string{*serverAddr}}
		fmt.Printf("%#v\n", cfg)

		startFrom := firstSyncEtcDir_v2(*serverPrefix, cfg, syncDir)
		go fileMon(syncDir, fromFS)
		go etcdMon_v2(*serverPrefix, cfg, fromEtcd, startFrom)
		syncProcess_v2(syncDir, *serverPrefix, cfg, fromEtcd, fromFS)
	} else if version == 3 {
		cfg := clientv3.Config{
			Endpoints: []string{*serverAddr},
		}
		fmt.Printf("%#v\n", cfg)

		etcd, err := clientv3.New(cfg)
		if err != nil {
			panic(err)
		}

		startRev := firstSyncEtcDir_v3(*serverPrefix, etcd, syncDir)
		go fileMon(syncDir, fromFS)
		go etcdMon_v3(*serverPrefix, etcd, fromEtcd, startRev)
		syncProcess_v3(syncDir, *serverPrefix, etcd, fromEtcd, fromFS)
	} else {
		panic("Unsupported API version")
	}
}
// printUsage prints the command synopsis followed by the defaults of all
// registered flags. It is installed as flag.Usage in main.
//
// Both parts are written to the flag set's output (standard error unless
// configured otherwise) so the usage text is not split across two streams.
func printUsage() {
	out := flag.CommandLine.Output()
	fmt.Fprintf(out, `%v [options] <syncdir>
syncdir - directory for show etcd content. ALL CURRENT CONTENT WILL BE LOST.
you have to create file '%v' in syncdir before can use it.
`, os.Args[0], MARK_FILE_NAME)
	flag.PrintDefaults()
}