Skip to content
This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

Commit

Permalink
feature: stream timeouts
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
  • Loading branch information
rustatian committed Aug 22, 2023
1 parent d4175c2 commit 9bdcfb7
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 96 deletions.
18 changes: 0 additions & 18 deletions .vscode/settings.json

This file was deleted.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/google/uuid v1.3.0
github.com/prometheus/client_golang v1.16.0
github.com/roadrunner-server/errors v1.3.0
github.com/roadrunner-server/goridge/v3 v3.7.0
github.com/roadrunner-server/goridge/v3 v3.8.0
github.com/roadrunner-server/tcplisten v1.4.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/stretchr/testify v1.8.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/roadrunner-server/errors v1.3.0 h1:kLVXpXne0jMReN7pj8KIhyYyjqKjsPC5DRGqMsd4/Fo=
github.com/roadrunner-server/errors v1.3.0/go.mod h1:XYVuhXvxi3yQaP/zCLB6QRZ0JvQIRaBa0SKFHL4WLKg=
github.com/roadrunner-server/goridge/v3 v3.7.0 h1:+Z8pezA4vvZ+/LpF7tjmwOYHa9jKrjbGtBn7RpRAswI=
github.com/roadrunner-server/goridge/v3 v3.7.0/go.mod h1:xgheswRjWvQBHRf3AEkFgLnYOSzYg13ZH0OCuDIcJpg=
github.com/roadrunner-server/goridge/v3 v3.8.0 h1:V4EmDs6KfvV+F9ilh4LhmqZy76JGozdDH/S/1v2G2AA=
github.com/roadrunner-server/goridge/v3 v3.8.0/go.mod h1:L5UkNzD8aKLz6TzpqmmiHOJ6EnsadsWEYNoqK/4qoK0=
github.com/roadrunner-server/tcplisten v1.4.0 h1:yWo09zktv/CSV6VywLfw4pwNcUchgTiIrW4uIICtO5M=
github.com/roadrunner-server/tcplisten v1.4.0/go.mod h1:A6+VSnW2ETGnN/e/CMdP63ZXqQDaC0UDMU6QmyuB0yM=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
Expand Down
14 changes: 9 additions & 5 deletions payload/payload.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package payload

import (
"github.com/roadrunner-server/sdk/v4/utils"
"unsafe"
)

// Payload carries binary header and body to stack and
Expand All @@ -11,13 +11,17 @@ type Payload struct {
Context []byte
// body contains binary payload to be processed by WorkerProcess.
Body []byte
// Type of codec used to decode/encode payload
// Type of codec used to decode/encode payload.
Codec byte
// IsStream indicates that payload is stream
IsStream bool
// Flags
Flags byte
}

// String returns payload body as string
func (p *Payload) String() string {
return utils.AsString(p.Body)
if len(p.Body) == 0 {
return ""
}

Check warning on line 24 in payload/payload.go

View check run for this annotation

Codecov / codecov/patch

payload/payload.go#L23-L24

Added lines #L23 - L24 were not covered by tests

return unsafe.String(unsafe.SliceData(p.Body), len(p.Body))
}
14 changes: 6 additions & 8 deletions pool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,27 @@ import (
type Config struct {
// Debug flag creates new fresh worker before every request.
Debug bool

// Command used to override the server command with the custom one
Command []string `mapstructure:"command"`

// AfterInitCommand command used to override the server's AfterInitCommand
AfterInitCommand []string `mapstructure:"after_init"`

// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
NumWorkers uint64 `mapstructure:"num_workers"`

// MaxJobs defines how many executions is allowed for the worker until
// its destruction. set 1 to create new process for each new task, 0 to let
// worker handle as many tasks as it can.
MaxJobs uint64 `mapstructure:"max_jobs"`

// AllocateTimeout defines for how long pool will be waiting for a worker to
// be freed to handle the task. Defaults to 60s.
AllocateTimeout time.Duration `mapstructure:"allocate_timeout"`

// DestroyTimeout defines for how long pool should be waiting for worker to
// properly destroy, if timeout reached worker will be killed. Defaults to 60s.
DestroyTimeout time.Duration `mapstructure:"destroy_timeout"`

// ResetTimeout defines how long pool should wait before start killing workers
ResetTimeout time.Duration `mapstructure:"reset_timeout"`

// Stream read operation timeout
StreamTimeout time.Duration `mapstructure:"stream_timeout"`
// Supervision config to limit worker and pool memory usage.
Supervisor *SupervisorConfig `mapstructure:"supervisor"`
}
Expand All @@ -50,6 +44,10 @@ func (cfg *Config) InitDefaults() {
cfg.AllocateTimeout = time.Minute
}

if cfg.StreamTimeout == 0 {
cfg.StreamTimeout = time.Minute
}

if cfg.DestroyTimeout == 0 {
cfg.DestroyTimeout = time.Minute
}
Expand Down
11 changes: 5 additions & 6 deletions pool/static_pool/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"runtime"

"github.com/roadrunner-server/goridge/v3/pkg/frame"
"github.com/roadrunner-server/sdk/v4/events"
"github.com/roadrunner-server/sdk/v4/payload"
"go.uber.org/zap"
Expand All @@ -30,8 +31,8 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s
// create channel for the stream (only if there are no errors)
resp := make(chan *PExec, 1)

switch rsp.IsStream {
case true:
switch {
case rsp.Flags&frame.STREAM != 0:

Check warning on line 35 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L35

Added line #L35 was not covered by tests
// in case of stream we should not return worker back immediately
go func() {
// would be called on Goexit
Expand All @@ -55,7 +56,7 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s
select {
// we received stop signal
case <-stopCh:
err = w.StreamCancel()
err = w.StreamCancel(ctx)

Check warning on line 59 in pool/static_pool/debug.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/debug.go#L59

Added line #L59 was not covered by tests
if err != nil {
sp.log.Warn("stream cancel error", zap.Error(err))
}
Expand All @@ -76,14 +77,12 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s
}()

return resp, nil
case false:
default:
resp <- newPExec(rsp, nil)
// return worker back
sp.ww.Release(w)
// close the channel
close(resp)
return resp, nil
default:
panic("workers_pool unreachable!")
}
}
15 changes: 9 additions & 6 deletions pool/static_pool/workers_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync/atomic"

"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/goridge/v3/pkg/frame"
"github.com/roadrunner-server/sdk/v4/events"
"github.com/roadrunner-server/sdk/v4/fsm"
"github.com/roadrunner-server/sdk/v4/payload"
Expand Down Expand Up @@ -219,8 +220,8 @@ begin:
// create channel for the stream (only if there are no errors)
resp := make(chan *PExec, 1)

switch rsp.IsStream {
case true:
switch {
case rsp.Flags&frame.STREAM != 0:

Check warning on line 224 in pool/static_pool/workers_pool.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/workers_pool.go#L224

Added line #L224 was not covered by tests
sp.log.Debug("stream mode", zap.Int64("pid", w.Pid()))
// in case of stream we should not return worker back immediately
go func() {
Expand All @@ -238,10 +239,14 @@ begin:
select {
// we received stop signal
case <-stopCh:
err = w.StreamCancel()
ctxT, cancelT := context.WithTimeout(ctx, sp.cfg.StreamTimeout)
err = w.StreamCancel(ctxT)

Check warning on line 243 in pool/static_pool/workers_pool.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/workers_pool.go#L242-L243

Added lines #L242 - L243 were not covered by tests
if err != nil {
sp.log.Warn("stream cancel error", zap.Error(err))
w.State().Transition(fsm.StateInvalid)

Check warning on line 246 in pool/static_pool/workers_pool.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/workers_pool.go#L246

Added line #L246 was not covered by tests
}

cancelT()

Check warning on line 249 in pool/static_pool/workers_pool.go

View check run for this annotation

Codecov / codecov/patch

pool/static_pool/workers_pool.go#L249

Added line #L249 was not covered by tests
runtime.Goexit()
default:
pld, next, errI := w.StreamIter()
Expand All @@ -260,16 +265,14 @@ begin:
}()

return resp, nil
case false:
default:
sp.log.Debug("req-resp mode", zap.Int64("pid", w.Pid()))
resp <- newPExec(rsp, nil)
// return worker back
sp.ww.Release(w)
// close the channel
close(resp)
return resp, nil
default:
panic("workers_pool unreachable!")
}
}

Expand Down
4 changes: 2 additions & 2 deletions pool/static_pool/workers_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
"sync"
"testing"
"time"
"unsafe"

"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v4/fsm"
"github.com/roadrunner-server/sdk/v4/ipc/pipe"
"github.com/roadrunner-server/sdk/v4/payload"
"github.com/roadrunner-server/sdk/v4/pool"
"github.com/roadrunner-server/sdk/v4/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -950,7 +950,7 @@ func BenchmarkToStringUnsafe(b *testing.B) {
b.ReportAllocs()

for i := 0; i < b.N; i++ {
res := utils.AsString(testPayload)
res := unsafe.String(unsafe.SliceData(testPayload), len(testPayload))
_ = res
}
}
Expand Down
38 changes: 0 additions & 38 deletions utils/convert.go

This file was deleted.

64 changes: 54 additions & 10 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,34 @@ func (w *Process) StreamIter() (*payload.Payload, bool, error) {
return nil, false, err
}

return pld, pld.IsStream, nil
// PING, we should respond with PONG
if pld.Flags&frame.PING != 0 {
// get a frame
fr := w.getFrame()

fr.WriteVersion(fr.Header(), frame.Version1)

fr.SetPongBit(fr.Header())
fr.WriteCRC(fr.Header())

err := w.Relay().Send(fr)
w.State().RegisterExec()
if err != nil {
w.putFrame(fr)
w.State().Transition(fsm.StateErrored)
return nil, false, errors.E(errors.Network, err)
}

Check warning on line 240 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L225-L240

Added lines #L225 - L240 were not covered by tests

w.putFrame(fr)

Check warning on line 242 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L242

Added line #L242 was not covered by tests
}

return pld, pld.Flags&frame.STREAM == 0, nil

Check warning on line 245 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L245

Added line #L245 was not covered by tests
}

// StreamCancel sends stop bit to the worker
func (w *Process) StreamCancel() error {
func (w *Process) StreamCancel(ctx context.Context) error {

Check warning on line 249 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L249

Added line #L249 was not covered by tests
const op = errors.Op("sync_worker_send_frame")

Check warning on line 251 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L251

Added line #L251 was not covered by tests
// get a frame
fr := w.getFrame()

Expand All @@ -244,7 +266,27 @@ func (w *Process) StreamCancel() error {
}

w.putFrame(fr)
return nil

for {
select {
case <-ctx.Done():
return errors.E(op, errors.TimeOut, ctx.Err())
default:
rsp, err := w.receiveFrame()
if err != nil {
return err
}

Check warning on line 278 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L269-L278

Added lines #L269 - L278 were not covered by tests

// stream has ended
if rsp.Flags&frame.STREAM == 0 {
w.State().Transition(fsm.StateReady)
return nil
}

Check warning on line 284 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L281-L284

Added lines #L281 - L284 were not covered by tests

// trash
rsp = nil

Check warning on line 287 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L287

Added line #L287 was not covered by tests
}
}
}

type wexec struct {
Expand Down Expand Up @@ -470,9 +512,9 @@ func (w *Process) receiveFrame() (*payload.Payload, error) {
return nil, errors.E(op, errors.Network, errors.Str("nil frame received"))
}

flags := frameR.ReadFlags()
codec := frameR.ReadFlags()

if flags&frame.ERROR != byte(0) {
if codec&frame.ERROR != byte(0) {
// we need to copy the payload because we will put the frame back to the pool
cp := make([]byte, len(frameR.Payload()))
copy(cp, frameR.Payload())
Expand All @@ -497,12 +539,14 @@ func (w *Process) receiveFrame() (*payload.Payload, error) {
return nil, errors.E(errors.Network, errors.Errorf("bad payload %s", cp))
}

isStream := frameR.IsStream(frameR.Header())
// stream + stop -> waste
// stream + ping -> response
flags := frameR.Header()[10]
pld := &payload.Payload{
IsStream: isStream,
Codec: flags,
Body: make([]byte, len(frameR.Payload()[options[0]:])),
Context: make([]byte, len(frameR.Payload()[:options[0]])),
Flags: flags,
Codec: codec,
Body: make([]byte, len(frameR.Payload()[options[0]:])),
Context: make([]byte, len(frameR.Payload()[:options[0]])),
}

// by copying we free frame's payload slice
Expand Down

0 comments on commit 9bdcfb7

Please sign in to comment.