diff --git a/go/tools/bzltestutil/wrap.go b/go/tools/bzltestutil/wrap.go index 19b9f0825c..6aca56932b 100644 --- a/go/tools/bzltestutil/wrap.go +++ b/go/tools/bzltestutil/wrap.go @@ -15,6 +15,7 @@ package bzltestutil import ( + "bufio" "bytes" "fmt" "io" @@ -59,10 +60,57 @@ func shouldAddTestV() bool { return false } +// streamMerger intelligently merges an input stdout and stderr stream and dumps +// the output to the writer `inner`. Additional synchronization is applied to +// ensure that one line at a time is written to the inner writer. +type streamMerger struct { + OutW, ErrW *io.PipeWriter + mutex sync.Mutex + inner io.Writer + wg sync.WaitGroup + outR, errR *bufio.Reader +} + +func NewStreamMerger(w io.Writer) *streamMerger { + outR, outW := io.Pipe() + errR, errW := io.Pipe() + return &streamMerger{ + inner: w, + OutW: outW, + ErrW: errW, + outR: bufio.NewReader(outR), + errR: bufio.NewReader(errR), + } +} + +func (m *streamMerger) Start() { + m.wg.Add(2) + process := func(r *bufio.Reader) { + for { + s, err := r.ReadString('\n') + if len(s) > 0 { + m.mutex.Lock() + io.WriteString(m.inner, s) + m.mutex.Unlock() + } + if err == io.EOF { + break + } + } + m.wg.Done() + } + go process(m.outR) + go process(m.errR) +} + +func (m *streamMerger) Wait() { + m.wg.Wait() +} + func Wrap(pkg string) error { var jsonBuffer bytes.Buffer jsonConverter := NewConverter(&jsonBuffer, pkg, Timestamp) - pipeRead, pipeWrite := io.Pipe() + streamMerger := NewStreamMerger(jsonConverter) args := os.Args[1:] if shouldAddTestV() { @@ -74,20 +122,13 @@ func Wrap(pkg string) error { } cmd := exec.Command(exePath, args...) cmd.Env = append(os.Environ(), "GO_TEST_WRAP=0") - cmd.Stderr = io.MultiWriter(os.Stderr, pipeWrite) - cmd.Stdout = io.MultiWriter(os.Stdout, pipeWrite) - var wg sync.WaitGroup - wg.Add(1) - go func() { - _, err := io.Copy(jsonConverter, pipeRead) - if err != nil { - panic(err) - } - wg.Done() - }() + cmd.Stderr = io.MultiWriter(os.Stderr, streamMerger.ErrW) + cmd.Stdout = io.MultiWriter(os.Stdout, streamMerger.OutW) + streamMerger.Start() err := cmd.Run() - pipeWrite.Close() - wg.Wait() + streamMerger.ErrW.Close() + streamMerger.OutW.Close() + streamMerger.Wait() jsonConverter.Close() if out, ok := os.LookupEnv("XML_OUTPUT_FILE"); ok { werr := writeReport(jsonBuffer, pkg, out)