Skip to content

Commit

Permalink
Add killsignal and killtimeout to exec/rtsp
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed Jun 16, 2024
1 parent a56d335 commit da5f060
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 77 deletions.
39 changes: 39 additions & 0 deletions internal/exec/closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package exec

import (
"errors"
"net/url"
"os"
"os/exec"
"syscall"
"time"

"github.com/AlexxIT/go2rtc/pkg/core"
)

// closer support custom killsignal with custom killtimeout
type closer struct {
cmd *exec.Cmd
query url.Values
}

func (c *closer) Close() (err error) {
sig := os.Kill
if s := c.query.Get("killsignal"); s != "" {
sig = syscall.Signal(core.Atoi(s))
}

log.Trace().Msgf("[exec] kill with signal=%d", sig)
err = c.cmd.Process.Signal(sig)

if s := c.query.Get("killtimeout"); s != "" {
timeout := time.Duration(core.Atoi(s)) * time.Second
timer := time.AfterFunc(timeout, func() {
log.Trace().Msgf("[exec] kill after timeout=%s", s)
_ = c.cmd.Process.Kill()
})
defer timer.Stop() // stop timer if Wait ends before timeout
}

return errors.Join(err, c.cmd.Wait())
}
51 changes: 30 additions & 21 deletions internal/exec/exec.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package exec

import (
"bufio"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
"net/url"
"os"
"os/exec"
Expand Down Expand Up @@ -49,8 +51,10 @@ func Init() {
}

func execHandle(rawURL string) (core.Producer, error) {
rawURL, rawQuery, _ := strings.Cut(rawURL, "#")
query := streams.ParseQuery(rawQuery)

var path string
var query url.Values

// RTSP flow should have `{output}` inside URL
// pipe flow may have `#{params}` inside URL
Expand All @@ -62,9 +66,6 @@ func execHandle(rawURL string) (core.Producer, error) {
sum := md5.Sum([]byte(rawURL))
path = "/" + hex.EncodeToString(sum[:])
rawURL = rawURL[:i] + "rtsp://127.0.0.1:" + rtsp.Port + path + rawURL[i+8:]
} else if i = strings.IndexByte(rawURL, '#'); i > 0 {
query = streams.ParseQuery(rawURL[i+1:])
rawURL = rawURL[:i]
}

args := shell.QuoteSplit(rawURL[5:]) // remove `exec:`
Expand All @@ -74,23 +75,34 @@ func execHandle(rawURL string) (core.Producer, error) {
debug: log.Debug().Enabled(),
}

if path == "" {
return handlePipe(rawURL, cmd, query)
if query.Get("backchannel") == "1" {
return stdin.NewClient(cmd)
}

return handleRTSP(rawURL, cmd, path)
}
cl := &closer{cmd: cmd, query: query}

func handlePipe(source string, cmd *exec.Cmd, query url.Values) (core.Producer, error) {
if query.Get("backchannel") == "1" {
return stdin.NewClient(cmd)
if path == "" {
return handlePipe(rawURL, cmd, cl)
}

r, err := PipeCloser(cmd, query)
return handleRTSP(rawURL, cmd, cl, path)
}

func handlePipe(source string, cmd *exec.Cmd, cl io.Closer) (core.Producer, error) {
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}

rc := struct {
io.Reader
io.Closer
}{
// add buffer for pipe reader to reduce syscall
bufio.NewReaderSize(stdout, core.BufferSize),
cl,
}

log.Debug().Strs("args", cmd.Args).Msg("[exec] run pipe")

ts := time.Now()
Expand All @@ -99,9 +111,9 @@ func handlePipe(source string, cmd *exec.Cmd, query url.Values) (core.Producer,
return nil, err
}

prod, err := magic.Open(r)
prod, err := magic.Open(rc)
if err != nil {
_ = r.Close()
_ = rc.Close()
return nil, fmt.Errorf("exec/pipe: %w\n%s", err, cmd.Stderr)
}

Expand All @@ -115,7 +127,7 @@ func handlePipe(source string, cmd *exec.Cmd, query url.Values) (core.Producer,
return prod, nil
}

func handleRTSP(source string, cmd *exec.Cmd, path string) (core.Producer, error) {
func handleRTSP(source string, cmd *exec.Cmd, cl io.Closer, path string) (core.Producer, error) {
if log.Trace().Enabled() {
cmd.Stdout = os.Stdout
}
Expand Down Expand Up @@ -147,20 +159,17 @@ func handleRTSP(source string, cmd *exec.Cmd, path string) (core.Producer, error
}()

select {
case <-time.After(time.Second * 60):
_ = cmd.Process.Kill()
case <-time.After(time.Minute):
log.Error().Str("source", source).Msg("[exec] timeout")
_ = cl.Close()
return nil, errors.New("exec: timeout")
case <-done:
// limit message size
return nil, fmt.Errorf("exec/rtsp\n%s", cmd.Stderr)
case prod := <-waiter:
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run rtsp")
setRemoteInfo(prod, source, cmd.Args)
prod.OnClose = func() error {
log.Debug().Msgf("[exec] kill rtsp")
return errors.Join(cmd.Process.Kill(), cmd.Wait())
}
prod.OnClose = cl.Close
return prod, nil
}
}
Expand Down
56 changes: 0 additions & 56 deletions internal/exec/pipe.go

This file was deleted.

0 comments on commit da5f060

Please sign in to comment.