Skip to content

Commit

Permalink
Add "human" error from exec source
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed May 3, 2024
1 parent fb1e761 commit b3c5ef8
Showing 1 changed file with 36 additions and 14 deletions.
50 changes: 36 additions & 14 deletions internal/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"io"
"net/url"
"os"
"os/exec"
Expand Down Expand Up @@ -102,14 +103,22 @@ func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error
}

func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
stderr := limitBuffer{buf: make([]byte, 512)}

if cmd.Stderr != nil {
cmd.Stderr = io.MultiWriter(cmd.Stderr, &stderr)
} else {
cmd.Stderr = &stderr
}

if log.Trace().Enabled() {
cmd.Stdout = os.Stdout
}

ch := make(chan core.Producer)
waiter := make(chan core.Producer)

waitersMu.Lock()
waiters[path] = ch
waiters[path] = waiter
waitersMu.Unlock()

defer func() {
Expand All @@ -127,26 +136,20 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
return nil, err
}

chErr := make(chan error)

done := make(chan error, 1)
go func() {
err := cmd.Wait()
// unblocking write to channel
select {
case chErr <- err:
default:
log.Trace().Str("url", url).Msg("[exec] close")
}
done <- cmd.Wait()
}()

select {
case <-time.After(time.Second * 60):
_ = cmd.Process.Kill()
log.Error().Str("url", url).Msg("[exec] timeout")
return nil, errors.New("timeout")
case err := <-chErr:
return nil, fmt.Errorf("exec: %s", err)
case prod := <-ch:
case <-done:
// limit message size
return nil, errors.New("exec: " + stderr.String())
case prod := <-waiter:
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run")
return prod, nil
}
Expand All @@ -159,3 +162,22 @@ var (
waiters = map[string]chan core.Producer{}
waitersMu sync.Mutex
)

type limitBuffer struct {
buf []byte
n int
}

func (l *limitBuffer) String() string {
if l.n == len(l.buf) {
return string(l.buf) + "..."
}
return string(l.buf[:l.n])
}

func (l *limitBuffer) Write(p []byte) (int, error) {
if l.n < cap(l.buf) {
l.n += copy(l.buf[l.n:], p)
}
return len(p), nil
}

0 comments on commit b3c5ef8

Please sign in to comment.