Skip to content

Commit

Permalink
support run timeout (#7)
Browse files Browse the repository at this point in the history
* support run timeout

* fix tests
  • Loading branch information
motiisr authored Jan 23, 2022
1 parent da3f8a8 commit 47fd9c7
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 48 deletions.
15 changes: 13 additions & 2 deletions process-options.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type ProcessOptions struct {
Debug bool

OutputParser func(fromR io.Reader, bufferSize int) ProduceFn
ErrorParser func(fromR io.Reader, bufferSize int) ProduceFn
ErrorParser func(fromR io.Reader, bufferSize int) ProduceFn

// MaxSpawns is the maximum number of times that a process can be spawned
// Set to -1, for an unlimited amount of times.
Expand Down Expand Up @@ -100,7 +100,7 @@ type ProcessOptions struct {
// IdleTimeout is the duration that the process can remain idle (no output) before we terminate the process.
// Set to -1, for an unlimited idle timeout (not recommended)
// Will use defaultIdleTimeout when set to 0.
IdleTimeout time.Duration
IdleTimeout time.Duration

// TerminationGraceTimeout is the duration of time that the supervisor will wait after sending interrupt/terminate
// signals, before checking if the process is still alive.
Expand All @@ -110,6 +110,11 @@ type ProcessOptions struct {
// EventTimeFormat is the time format used when events are marshaled to string.
// Will use defaultEventTimeFormat when set to "".
EventTimeFormat string

// RunTimeout is the duration that the process can run before we terminate the process.
// Set to <= 0, for an unlimited run timeout
// Will use defaultRunTimeout when set to 0.
RunTimeout time.Duration
}

// init initializes the opts structure with default and required options.
Expand Down Expand Up @@ -154,12 +159,18 @@ func initProcessOptions(opts ProcessOptions) *ProcessOptions {
if opts.IdleTimeout == 0 {
opts.IdleTimeout = defaultIdleTimeout
}
if opts.IdleTimeout < 0 {
opts.IdleTimeout = time.Duration(maxDuration)
}
if opts.TerminationGraceTimeout == 0 {
opts.TerminationGraceTimeout = defaultTerminationGraceTimeout
}
if opts.EventTimeFormat == "" {
opts.EventTimeFormat = defaultEventTimeFormat
}
if opts.RunTimeout <= 0 {
opts.RunTimeout = defaultRunTimeout
}
if opts.In == nil {
opts.In = make(chan []byte)
}
Expand Down
42 changes: 25 additions & 17 deletions supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,29 @@ import (
"time"
)

const maxDuration = 1<<63 - 1
const (
defaultMaxSpawns = 1
defaultMaxSpawnAttempts = 10
defaultMaxSpawnBackOff = 2*time.Minute
defaultMaxRespawnBackOff = 2*time.Minute
defaultMaxSpawnBackOff = 2 * time.Minute
defaultMaxRespawnBackOff = 2 * time.Minute
defaultMaxInterruptAttempts = 5
defaultMaxTerminateAttempts = 5
defaultNotifyEventTimeout = time.Millisecond
defaultParserBufferSize = 4096
defaultIdleTimeout = 10 * time.Second
defaultRunTimeout = time.Duration(maxDuration)
defaultTerminationGraceTimeout = time.Second
defaultEventTimeFormat = time.RFC3339Nano
defaultEventTimeFormat = time.RFC3339Nano
)

var EnsureClosedTimeout = time.Second

type Event struct {
Id string
Code string
Message string
Time time.Time
Id string
Code string
Message string
Time time.Time
TimeFormat string
}

Expand Down Expand Up @@ -208,7 +210,7 @@ func (p *Process) unprotectedStart() error {
go readerToChan(p.opts.OutputParser(outPipe, p.opts.ParserBufferSize), p.opts.Out, isOutClosed, p.stopC, heartbeat)
go readerToChan(p.opts.ErrorParser(errPipe, p.opts.ParserBufferSize), p.opts.Err, isErrClosed, p.stopC, nil)

go monitorHeartBeat(p.opts.IdleTimeout, heartbeat, isMonitorClosed, p.stopC, p.Stop, p.notifyEvent)
go MonitorHeartBeat(p.opts.IdleTimeout, p.opts.RunTimeout, heartbeat, isMonitorClosed, p.stopC, p.Stop, p.notifyEvent)

var ensureOnce sync.Once
p.ensureAllClosed = func() {
Expand All @@ -218,7 +220,9 @@ func (p *Process) unprotectedStart() error {
default:
log.Printf("[%s] ensureAllClosed was called before stopC channel was closed.", p.opts.Id)
}
if p.opts.Debug { log.Printf("[%s] Starting to ensure all pipes have closed.", p.opts.Id) }
if p.opts.Debug {
log.Printf("[%s] Starting to ensure all pipes have closed.", p.opts.Id)
}
if cErr := ensureClosed("stdin", isInClosed, inPipe.Close); cErr != nil {
log.Printf("[%s] Possible memory leak, stdin go-routine not closed. Error: %s", p.opts.Id, cErr)
}
Expand Down Expand Up @@ -304,16 +308,17 @@ func readerToChan(producer ProduceFn, out chan<- *interface{}, closeWhenDone, st
}
}

// monitorHeartBeat monitors the heartbeat channel and stops the process if idleTimeout time is passed without a
// positive heartbeat, or if a negative heartbeat is passed.
// MonitorHeartBeat monitors the heartbeat channel and stops the process if idleTimeout time is passed without a
// positive heartbeat, or if a negative heartbeat is passed, or if the run timeout passed.
//
// isMonitorClosed will be closed when this function exists.
//
// When stopC closes, this function will exit immediately.
func monitorHeartBeat(idleTimeout time.Duration, heartbeat, isMonitorClosed, stopC chan bool, stop func() error, notifyEvent func(string, ...interface{})) {
func MonitorHeartBeat(idleTimeout time.Duration, runTimeout time.Duration, heartbeat, isMonitorClosed, stopC chan bool, stop func() error, notifyEvent func(string, ...interface{})) {
t := time.NewTimer(idleTimeout)
r := time.NewTimer(runTimeout)
defer t.Stop()

defer r.Stop()
for alive := true; alive; {
select {
case <-stopC:
Expand All @@ -334,6 +339,9 @@ func monitorHeartBeat(idleTimeout time.Duration, heartbeat, isMonitorClosed, sto
case <-t.C:
alive = false
notifyEvent("MissingHeartbeat", "Stopping process.")
case <-r.C:
alive = false
notifyEvent("RunTimePassed", "Stopping process.")
}
}

Expand Down Expand Up @@ -584,10 +592,10 @@ func (p *Process) EventNotifier() chan Event {
func (p *Process) notifyEvent(code string, message ...interface{}) {
// Create the event before calling Lock.
ev := Event{
Id: p.opts.Id,
Code: code,
Message: fmt.Sprint(message...),
Time: time.Now(),
Id: p.opts.Id,
Code: code,
Message: fmt.Sprint(message...),
Time: time.Now(),
TimeFormat: p.opts.EventTimeFormat,
}

Expand Down
76 changes: 47 additions & 29 deletions supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ func ensureProcessKilled(tb testing.TB, pid int) {

func TestStderrMemoryLeak(t *testing.T) {
p := su.NewProcess(su.ProcessOptions{
Id: funcName(),
Name: "./endless_errors.sh",
Dir: testDir(t),
OutputParser: su.MakeBytesParser,
ErrorParser: su.MakeBytesParser,
MaxSpawns: 1,
Id: funcName(),
Name: "./endless_errors.sh",
Dir: testDir(t),
OutputParser: su.MakeBytesParser,
ErrorParser: su.MakeBytesParser,
MaxSpawns: 1,
MaxSpawnAttempts: 1,
})

Expand Down Expand Up @@ -169,7 +169,7 @@ func TestJsonParser(t *testing.T) {
fatalIfErr(t, p.Start())
defer p.Stop()

time.AfterFunc(time.Millisecond * 30, func() {
time.AfterFunc(time.Millisecond*30, func() {
fatalIfErr(t, p.Stop())
})

Expand Down Expand Up @@ -198,7 +198,7 @@ invalid character '}'
{"c":"d"}`))
tmp := su.MakeJsonLineParser(out, 4096)
p := func() *interface{} {
a,_ := tmp()
a, _ := tmp()
return a
}

Expand Down Expand Up @@ -264,7 +264,7 @@ func TestMakeLineParser(t *testing.T) {
c := make(chan *interface{})
go func() {
x := su.MakeLineParser(out, 0)
for a,_ := x(); a != nil; a,_ = x() {
for a, _ := x(); a != nil; a, _ = x() {
c <- a
}
close(c)
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestProcess_Signal(t *testing.T) {
pid := p.Pid()

c := make(chan bool)
time.AfterFunc(time.Millisecond * 70, func() {
time.AfterFunc(time.Millisecond*70, func() {
fatalIfErr(t, syscall.Kill(-p.Pid(), syscall.SIGINT))
c <- true
})
Expand All @@ -327,15 +327,15 @@ func TestProcess_Signal(t *testing.T) {

func TestProcess_Close(t *testing.T) {
p := su.NewProcess(su.ProcessOptions{
Id: funcName(),
Name: "./trap.sh",
Args: []string{"endless.sh"},
Dir: testDir(t),
OutputParser: su.MakeLineParser,
ErrorParser: makeErrorParser,
EventNotifier: make(chan su.Event, 10),
MaxInterruptAttempts: 1,
MaxTerminateAttempts: 2,
Id: funcName(),
Name: "./trap.sh",
Args: []string{"endless.sh"},
Dir: testDir(t),
OutputParser: su.MakeLineParser,
ErrorParser: makeErrorParser,
EventNotifier: make(chan su.Event, 10),
MaxInterruptAttempts: 1,
MaxTerminateAttempts: 2,
TerminationGraceTimeout: time.Millisecond,
})

Expand Down Expand Up @@ -379,7 +379,7 @@ func TestProcess_Close(t *testing.T) {
default:
}
}
for code,err := range errs {
for code, err := range errs {
t.Errorf(`expected a %s event - "%s"`, code, err)
}
})
Expand Down Expand Up @@ -470,15 +470,15 @@ func TestProcess_Restart(t *testing.T) {

// initialGoroutines := runtime.NumGoroutine()
p := su.NewProcess(su.ProcessOptions{
Id: funcName(),
Name: "./endless.sh",
Dir: testDir(t),
OutputParser: su.MakeLineParser,
ErrorParser: makeErrorParser,
Out: make(chan *interface{}, 5),
IdleTimeout: time.Millisecond * 30,
MaxSpawns: 2,
MaxRespawnBackOff: time.Microsecond * 100,
Id: funcName(),
Name: "./endless.sh",
Dir: testDir(t),
OutputParser: su.MakeLineParser,
ErrorParser: makeErrorParser,
Out: make(chan *interface{}, 5),
IdleTimeout: time.Millisecond * 30,
MaxSpawns: 2,
MaxRespawnBackOff: time.Microsecond * 100,
TerminationGraceTimeout: time.Millisecond,
})

Expand Down Expand Up @@ -717,3 +717,21 @@ func test_timings(t *testing.T) {

log.Println(prodInNum, prodOutNum, incOutNum)
}

func TestMonitorRunTimeout(t *testing.T) {
heartbeat, isMonitorClosed, stopC := make(chan bool), make(chan bool), make(chan bool)
result := make(chan string)
resEvent := make(chan string)

stopF := func() error {
result <- "Stopped"
return nil
}
eventNotify := func(event string, message ...interface{}) {
resEvent <- event
}
go su.MonitorHeartBeat(20*time.Millisecond, 10*time.Millisecond, heartbeat, isMonitorClosed, stopC, stopF, eventNotify)
time.Sleep(20 * time.Millisecond)
assertExpectedEqualsActual(t, <-resEvent, "RunTimePassed")
assertExpectedEqualsActual(t, <-result, "Stopped")
}

0 comments on commit 47fd9c7

Please sign in to comment.