Skip to content

Commit

Permalink
fixup! executors: implement streaming exec
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahmood Ali committed May 9, 2019
1 parent dfc2559 commit 6592b7f
Showing 1 changed file with 38 additions and 28 deletions.
66 changes: 38 additions & 28 deletions drivers/shared/executor/exec_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,22 +178,25 @@ func handleStdin(logger hclog.Logger, stdin io.WriteCloser, stream drivers.ExecT
return
}

if m.Stdin != nil && len(m.Stdin.Data) != 0 {
_, err := stdin.Write(m.Stdin.Data)
if err != nil {
errCh <- err
return
if m.Stdin != nil {
if len(m.Stdin.Data) != 0 {
_, err := stdin.Write(m.Stdin.Data)
if err != nil {
errCh <- err
return
}
}
if m.Stdin.Close {
stdin.Close()
}
} else if m.Stdin != nil && m.Stdin.Close {
stdin.Close()
} else if m.TtySize != nil {
err := setTTYSize(stdin, m.TtySize.Height, m.TtySize.Width)
if err != nil {
errCh <- fmt.Errorf("attempted to resize a non-tty session")
errCh <- fmt.Errorf("failed to resize tty: %v", err)
return
}
} else {
// ignore heartbeats or unexpected tty events
// ignore heartbeats
}
}
}()
Expand All @@ -208,6 +211,19 @@ func handleStdout(logger hclog.Logger, reader io.Reader, wg *sync.WaitGroup, sen
for {
buf := make([]byte, 4096)
n, err := reader.Read(buf)
// always send output first if we read something
if n > 0 {
if err := send(&drivers.ExecTaskStreamingResponseMsg{
Stdout: &dproto.ExecTaskStreamingIOOperation{
Data: buf[:n],
},
}); err != nil {
errCh <- err
return
}
}

// then process error
if isClosedError(err) {
if err := send(&drivers.ExecTaskStreamingResponseMsg{
Stdout: &dproto.ExecTaskStreamingIOOperation{
Expand All @@ -223,16 +239,6 @@ func handleStdout(logger hclog.Logger, reader io.Reader, wg *sync.WaitGroup, sen
return
}

// tty only reportsstdout
if err := send(&drivers.ExecTaskStreamingResponseMsg{
Stdout: &dproto.ExecTaskStreamingIOOperation{
Data: buf[:n],
},
}); err != nil {
errCh <- err
return
}

}
}()
}
Expand All @@ -246,6 +252,19 @@ func handleStderr(logger hclog.Logger, reader io.Reader, wg *sync.WaitGroup, sen
for {
buf := make([]byte, 4096)
n, err := reader.Read(buf)
// always send output first if we read something
if n > 0 {
if err := send(&drivers.ExecTaskStreamingResponseMsg{
Stderr: &dproto.ExecTaskStreamingIOOperation{
Data: buf[:n],
},
}); err != nil {
errCh <- err
return
}
}

// then process error
if isClosedError(err) {
if err := send(&drivers.ExecTaskStreamingResponseMsg{
Stderr: &dproto.ExecTaskStreamingIOOperation{
Expand All @@ -261,15 +280,6 @@ func handleStderr(logger hclog.Logger, reader io.Reader, wg *sync.WaitGroup, sen
return
}

if err := send(&drivers.ExecTaskStreamingResponseMsg{
Stderr: &dproto.ExecTaskStreamingIOOperation{
Data: buf[:n],
},
}); err != nil {
errCh <- err
return
}

}
}()
}
Expand Down

0 comments on commit 6592b7f

Please sign in to comment.