forked from unrealsync/unrealsync
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
256 lines (223 loc) · 8.17 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
package main
import (
"errors"
"fmt"
"io"
"os"
"os/exec"
"runtime"
"strings"
"time"
)
type Client struct {
settings Settings
stopCh chan bool
errorCh chan error
}
func MakeClient(settings Settings) *Client {
return &Client{settings: settings}
}
func (r *Client) initialServerSync() (err error) {
progressLn("Initial file sync using rsync at " + r.settings.host + "...")
args := []string{"-e", "ssh " + strings.Join(sshOptions(r.settings), " ")}
for dir := range r.settings.excludes {
args = append(args, "--exclude="+dir)
}
err = openOutLogForRead(r.settings.host, true)
if err != nil {
return
}
if r.settings.sudouser != "" {
args = append(args, "--rsync-path", "sudo -u "+r.settings.sudouser+" rsync")
}
//"--delete-excluded",
args = append(args, "-a", "--delete", sourceDir+"/", r.settings.host+":"+r.settings.dir+"/")
command := exec.Command("rsync", args...)
go killOnStop(command, r.stopCh)
output, err := command.Output()
if err != nil {
escapedArgs := make([]string, len(args))
for i, arg := range args {
// since we'll use '' to escape arguments, we need to escape single quotes differently
arg = strings.Replace(arg, "'", "'\"'\"'", -1)
escapedArgs[i] = "'" + arg + "'"
}
stringCommand := "rsync " + strings.Join(escapedArgs, " ")
progressLn("Cannot perform initial sync. Please ensure that you can execute the following command:\n", stringCommand)
if exitErr, ok := err.(*exec.ExitError); ok {
debugLn("rsync output:\n", string(output), "\nstderr:\n", string(exitErr.Stderr))
} else {
// actually, this is mostly impossible to have something different than exec.ExitError here
debugLn("rsync output:\n", string(output), "\nCannot get stderr!")
}
panic("Cannot perform initial sync")
}
return
}
func (r *Client) copyUnrealsyncBinaries(unrealsyncBinaryPathForHost string) {
progressLn("Copying unrealsync binary " + unrealsyncBinaryPathForHost + " to " + r.settings.host)
args := sshOptions(r.settings)
destination := r.settings.host + ":" + r.settings.dir + "/.unrealsync/unrealsync"
args = append(args, unrealsyncBinaryPathForHost, destination)
execOrPanic("scp", args, r.stopCh)
}
func (r *Client) startServer() {
r.stopCh = make(chan bool)
r.errorCh = make(chan error)
var cmd *exec.Cmd
var stdin io.WriteCloser
var stdout io.ReadCloser
defer func() {
if err := recover(); err != nil {
close(r.stopCh)
trace := make([]byte, 10000)
runtime.Stack(trace, false)
progressWithPrefix("ERROR", "Stopped for server ", r.settings.host, ": ", err, "\n")
debugLn("Trace for ", r.settings.host, ":\n", string(trace))
if cmd != nil {
err := cmd.Process.Kill()
if err != nil {
progressLn("Could not kill ssh process for " + r.settings.host + ": " + err.Error())
// no action
}
err = cmd.Wait()
if err != nil {
// we will have ExitError if we killed process or if it failed to start
// We can't provide any additional information here if process failed to start
// since we already linked command's stderr to the os.Stderr and captured command's output
if _, ok := err.(*exec.ExitError); !ok {
progressLn("Could not wait ssh process for " + r.settings.host + ":" + err.Error())
}
}
}
go func() {
time.Sleep(retryInterval)
progressLn("Reconnecting to " + r.settings.host)
r.startServer()
}()
}
}()
r.initialServerSync()
ostype, osarch, unrealsyncBinaryPath, unrealsyncVersion := r.createDirectoriesAt()
progressLn("Discovered ostype:" + ostype + " osarch:" + osarch + " binary:" + unrealsyncBinaryPath + " version:" + unrealsyncVersion + " at " + r.settings.host)
if r.settings.remoteBinPath != "" {
unrealsyncBinaryPath = r.settings.remoteBinPath
} else if unrealsyncBinaryPath == "" || !isCompatibleVersions(unrealsyncVersion, version) {
unrealsyncBinaryPathForHost := unrealsyncDir + "/unrealsync-" + ostype + "-" + osarch
if _, err := os.Stat(unrealsyncBinaryPathForHost); os.IsNotExist(err) {
progressLn(unrealsyncBinaryPathForHost, " doesn't exists. Cannot find compatible unrealsync on remote and local hosts")
panic("cannot find unrealsync binary for remote host (" + unrealsyncBinaryPathForHost + ")")
}
r.copyUnrealsyncBinaries(unrealsyncBinaryPathForHost)
unrealsyncBinaryPath = r.settings.dir + "/.unrealsync/unrealsync"
}
cmd, stdin, stdout = r.launchUnrealsyncAt(unrealsyncBinaryPath)
stream := make(chan BufBlocker)
// receive from singlestdinwriter (stream) and send into ssh stdin
go singleStdinWriter(stream, stdin, r.errorCh, r.stopCh)
// read log and send into ssh stdin via singlestdinwriter (stream)
// stops if stopChan closes and closes stream
go doSendChanges(stream, r.settings.host, r.stopCh, r.errorCh)
// read ssh stdout and send into ssh stdin via singlestdinwriter (stream)
go pingReplyThread(stdout, r.settings.host, stream, r.errorCh)
err := <-r.errorCh
panic(err)
}
func (r *Client) launchUnrealsyncAt(unrealsyncBinaryPath string) (*exec.Cmd, io.WriteCloser, io.ReadCloser) {
progressLn("Launching unrealsync at " + r.settings.host + "...")
args := sshOptions(r.settings)
// TODO: escaping
flags := "--server --hostname=" + r.settings.host
if isDebug {
flags += " --debug"
}
for dir := range r.settings.excludes {
flags += " --exclude " + dir
}
unrealsyncLaunchCmd := unrealsyncBinaryPath + " " + flags + " " + r.settings.dir
if r.settings.sudouser != "" {
unrealsyncLaunchCmd = "sudo -u " + r.settings.sudouser + " " + unrealsyncLaunchCmd
}
args = append(args, r.settings.host, unrealsyncLaunchCmd)
debugLn("ssh", args)
cmd := exec.Command("ssh", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
fatalLn("Cannot get stdout pipe: ", err.Error())
}
stdin, err := cmd.StdinPipe()
if err != nil {
fatalLn("Cannot get stdin pipe: ", err.Error())
}
cmd.Stderr = os.Stderr
if err = cmd.Start(); err != nil {
panic("Cannot start command ssh " + strings.Join(args, " ") + ": " + err.Error())
}
return cmd, stdin, stdout
}
func (r *Client) createDirectoriesAt() (ostype, osarch, unrealsyncBinaryPath, unrealsyncVersion string) {
progressLn("Creating directories at " + r.settings.host + "...")
args := sshOptions(r.settings)
// TODO: escaping
dir := r.settings.dir + "/.unrealsync"
args = append(args, r.settings.host, "if [ ! -d "+dir+" ]; then mkdir -m a=rwx -p "+dir+"; fi;"+
"rm -f "+dir+"/unrealsync &&"+
"uname && uname -m && if ! which unrealsync 2>/dev/null ; then echo 'no-binary'; echo 'no-version';"+
"else unrealsync --version 2>/dev/null ; echo 'no-version' ; fi")
output := execOrPanic("ssh", args, r.stopCh)
uname := strings.Split(strings.TrimSpace(output), "\n")
return strings.ToLower(uname[0]), uname[1], uname[2], uname[3]
}
func singleStdinWriter(stream chan BufBlocker, stdin io.WriteCloser, errorCh chan error, stopCh chan bool) {
var bufBlocker BufBlocker
for {
select {
case bufBlocker = <-stream:
case <-stopCh:
break
}
_, err := stdin.Write(bufBlocker.buf)
if err != nil {
sendErrorNonBlocking(errorCh, err)
break
}
select {
case bufBlocker.sent <- true:
case <-stopCh:
break
}
}
}
func pingReplyThread(stdout io.ReadCloser, hostname string, stream chan BufBlocker, errorCh chan error) {
bufBlocker := BufBlocker{buf: make([]byte, 20), sent: make(chan bool)}
bufBlocker.buf = []byte(actionPong + fmt.Sprintf("%10d", 0))
buf := make([]byte, 10)
for {
readBytes, err := io.ReadFull(stdout, buf)
if err != nil {
sendErrorNonBlocking(errorCh, errors.New("Could not read from server: "+hostname+" err:"+err.Error()))
break
}
actionStr := string(buf)
debugLn("Read ", readBytes, " from ", hostname, " ", buf)
if actionStr == actionPing {
stream <- bufBlocker
<-bufBlocker.sent
} else if actionStr == actionStopServer {
currentProcess, err := os.FindProcess(os.Getpid())
if err != nil {
panic("Cannot find current process")
}
progressLn("Got StopServer command from the remote client")
currentProcess.Kill()
}
}
}
func (r *Client) notifySendQueueSize(sendQueueSize int64) (err error) {
if r.settings.sendQueueSizeLimit != 0 && sendQueueSize > r.settings.sendQueueSizeLimit {
err = errors.New("SendQueueSize limit exceeded for " + r.settings.host)
progressLn(err)
sendErrorNonBlocking(r.errorCh, err)
}
return
}