-
Notifications
You must be signed in to change notification settings - Fork 16
/
piping.go
202 lines (186 loc) · 4.56 KB
/
piping.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package someutils
import (
"errors"
"fmt"
"io"
"os"
"time"
)
func handleSignals(i *Invocation) {
for true {
select {
case signal, ok := <-i.SignalReceiver:
if !ok {
return
}
quit := handleSignal(i, signal)
if quit {
return
}
}
}
}
func handleSignal(i *Invocation, signal Signal) bool {
switch signal.Status() {
case 9:
i.Close()
return true
case 3:
i.Close()
return true
default:
fmt.Fprintln(os.Stderr, "Unhandled signal:", signal)
return false
}
}
func invoke(ps PipableSimple, i *Invocation) (error, int) {
// automatically handle the errMainPipe.In
i.ErrPipe.Drain()
i.AutoHandleSignals()
//go autoPipe(i.ErrPipe.Out, i.ErrPipe.In)
// automatically handle signals
//go autoHandleSignals(i.signalChan, i.MainPipe.In, i.MainPipe.Out, i.ErrPipe.In, i.ErrPipe.Out)
err, exitCode := ps.Exec(i.MainPipe.In, i.MainPipe.Out, i.ErrPipe.Out)
//go i.Close()
return err, exitCode
}
/*
type CodeError interface {
Err() error
Code() int
}
type ExitError struct {
err error
ExitCode int
}
func (exitError *ExitError) Err() error {
return exitError.err
}
func (exitError *ExitError) Code() int {
return exitError.ExitCode
}
*/
func autoPipe(out io.Writer, in io.Reader) {
j, err := io.Copy(out, in)
if err == io.EOF || err == io.ErrClosedPipe {
//ok
//fmt.Fprintln(os.Stderr, "expected error copying invocation", err)
} else if err != nil {
fmt.Fprintln(os.Stderr, "Unexpected error while copying errMainPipe.In to errMainPipe.Out", err)
} else {
if j > 0 {
//fmt.Fprintln(os.Stderr, "Finished copying errMainPipe.In to errMainPipe.Out", j)
}
//TODO close ErrInOutinvocation here?
}
}
// Run a Pipable asynchronously (using a goroutine)
func execAsync(pipable Pipable, invocation *Invocation, e chan *Invocation) {
go func() {
exitCode := execSynchronous(pipable, invocation)
invocation.ExitCode = &exitCode
e <- invocation
}()
}
// run a Pipable inline
func execSynchronous(pipable Pipable, invocation *Invocation) int {
//fmt.Fprintln(os.Stderr, "pipable starting")
err, code := invocation.Pipe(pipable)
//fmt.Fprintln(os.Stderr, "pipable finished")
if err == io.EOF || err == io.ErrClosedPipe {
} else if err != nil {
//return &ExitError{err, code}, signalChan
invocation.Err = err
}
go invocation.Close()
return code
}
const EXIT_OK = 0
// Await completion, or first error
func Wait(e chan *Invocation, count int) *Invocation {
var lastInvocation *Invocation
if count < 1 {
return NewErrorState(errors.New("No invocations to wait for!"))
}
for i := 0; i < count; i++ {
select {
case thisInvocation, ok := <-e:
if !ok {
fmt.Fprintln(os.Stderr, "Channel was closed!")
break
}
lastInvocation = thisInvocation
if lastInvocation.Err != nil {
if lastInvocation.ExitCode != nil && *lastInvocation.ExitCode != EXIT_OK { //if it exited with an exitCode of OK then continue
return lastInvocation
}
}
}
}
return lastInvocation
}
func NewErrorState(err error) *Invocation {
st := NewInvocation(nil, nil, nil)
st.Err = err
exitCode := 1
st.ExitCode = &exitCode
return st
}
// Await completion or error, for a duration
func WaitFor(e chan *Invocation, count int, timeout time.Duration) *Invocation {
var lastInvocation *Invocation
if count < 1 {
return NewErrorState(errors.New("No invocations to wait for!"))
}
for i := 0; i < count; i++ {
select {
//todo offset timeout against time already spent in previous iterations
case <-time.After(timeout):
return NewErrorState(errors.New("Timeout waiting for exit codes"))
case thisInvocation, ok := <-e:
if !ok {
break
}
lastInvocation = thisInvocation
if lastInvocation.Err != nil {
if lastInvocation.ExitCode != nil && *lastInvocation.ExitCode != EXIT_OK { //if it exited with an exitCode of OK then continue
return lastInvocation
}
}
}
}
return lastInvocation
}
// Await all errors forever
func AwaitAllErrors(e chan *Invocation, count int) (bool, []*Invocation) {
errs := []*Invocation{}
ok := true
for i := 0; i < count; i++ {
select {
case err := <-e:
if err != nil {
ok = false
}
errs = append(errs, err)
}
}
return ok, errs
}
// Await Errors for a duration
func AwaitAllErrorsFor(e chan *Invocation, count int, timeout time.Duration) (bool, []*Invocation) {
states := []*Invocation{}
ok := true
for i := 0; i < count; i++ {
select {
case <-time.After(timeout):
states = append(states, NewErrorState(errors.New("Timeout!")))
return false, states
case state := <-e:
if state.Err != nil {
ok = false
}
states = append(states, state)
}
}
return ok, states
}