From 9bdcfb7dbe18756c41659365ea6a22ae0f2f6dbb Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 22 Aug 2023 22:46:59 +0200 Subject: [PATCH] feature: stream timeouts Signed-off-by: Valery Piashchynski --- .vscode/settings.json | 18 -------- go.mod | 2 +- go.sum | 4 +- payload/payload.go | 14 +++--- pool/config.go | 14 +++--- pool/static_pool/debug.go | 11 +++-- pool/static_pool/workers_pool.go | 15 ++++--- pool/static_pool/workers_pool_test.go | 4 +- utils/convert.go | 38 ---------------- worker/worker.go | 64 ++++++++++++++++++++++----- 10 files changed, 88 insertions(+), 96 deletions(-) delete mode 100644 .vscode/settings.json delete mode 100644 utils/convert.go diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index f1c4357..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "cSpell.words": [ - "allocs", - "errgroup", - "Failboot", - "goridge", - "mapstructure", - "memleak", - "nolint", - "pexec", - "pldd", - "serv", - "stretchr", - "stylecheck", - "wexec", - "wrks" - ] -} diff --git a/go.mod b/go.mod index a807f11..c81e069 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.3.0 github.com/prometheus/client_golang v1.16.0 github.com/roadrunner-server/errors v1.3.0 - github.com/roadrunner-server/goridge/v3 v3.7.0 + github.com/roadrunner-server/goridge/v3 v3.8.0 github.com/roadrunner-server/tcplisten v1.4.0 github.com/shirou/gopsutil v3.21.11+incompatible github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index f7836f6..2744565 100644 --- a/go.sum +++ b/go.sum @@ -38,8 +38,8 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= github.com/roadrunner-server/errors v1.3.0 h1:kLVXpXne0jMReN7pj8KIhyYyjqKjsPC5DRGqMsd4/Fo= github.com/roadrunner-server/errors v1.3.0/go.mod h1:XYVuhXvxi3yQaP/zCLB6QRZ0JvQIRaBa0SKFHL4WLKg= -github.com/roadrunner-server/goridge/v3 v3.7.0 h1:+Z8pezA4vvZ+/LpF7tjmwOYHa9jKrjbGtBn7RpRAswI= -github.com/roadrunner-server/goridge/v3 v3.7.0/go.mod h1:xgheswRjWvQBHRf3AEkFgLnYOSzYg13ZH0OCuDIcJpg= +github.com/roadrunner-server/goridge/v3 v3.8.0 h1:V4EmDs6KfvV+F9ilh4LhmqZy76JGozdDH/S/1v2G2AA= +github.com/roadrunner-server/goridge/v3 v3.8.0/go.mod h1:L5UkNzD8aKLz6TzpqmmiHOJ6EnsadsWEYNoqK/4qoK0= github.com/roadrunner-server/tcplisten v1.4.0 h1:yWo09zktv/CSV6VywLfw4pwNcUchgTiIrW4uIICtO5M= github.com/roadrunner-server/tcplisten v1.4.0/go.mod h1:A6+VSnW2ETGnN/e/CMdP63ZXqQDaC0UDMU6QmyuB0yM= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= diff --git a/payload/payload.go b/payload/payload.go index 9a35fb0..4953e21 100644 --- a/payload/payload.go +++ b/payload/payload.go @@ -1,7 +1,7 @@ package payload import ( - "github.com/roadrunner-server/sdk/v4/utils" + "unsafe" ) // Payload carries binary header and body to stack and @@ -11,13 +11,17 @@ type Payload struct { Context []byte // body contains binary payload to be processed by WorkerProcess. Body []byte - // Type of codec used to decode/encode payload + // Type of codec used to decode/encode payload. Codec byte - // IsStream indicates that payload is stream - IsStream bool + // Flags + Flags byte } // String returns payload body as string func (p *Payload) String() string { - return utils.AsString(p.Body) + if len(p.Body) == 0 { + return "" + } + + return unsafe.String(unsafe.SliceData(p.Body), len(p.Body)) } diff --git a/pool/config.go b/pool/config.go index af6f3d1..3db7750 100644 --- a/pool/config.go +++ b/pool/config.go @@ -9,33 +9,27 @@ import ( type Config struct { // Debug flag creates new fresh worker before every request. Debug bool - // Command used to override the server command with the custom one Command []string `mapstructure:"command"` - // AfterInitCommand command used to override the server's AfterInitCommand AfterInitCommand []string `mapstructure:"after_init"` - // NumWorkers defines how many sub-processes can be run at once. This value // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. NumWorkers uint64 `mapstructure:"num_workers"` - // MaxJobs defines how many executions is allowed for the worker until // its destruction. set 1 to create new process for each new task, 0 to let // worker handle as many tasks as it can. MaxJobs uint64 `mapstructure:"max_jobs"` - // AllocateTimeout defines for how long pool will be waiting for a worker to // be freed to handle the task. Defaults to 60s. AllocateTimeout time.Duration `mapstructure:"allocate_timeout"` - // DestroyTimeout defines for how long pool should be waiting for worker to // properly destroy, if timeout reached worker will be killed. Defaults to 60s. DestroyTimeout time.Duration `mapstructure:"destroy_timeout"` - // ResetTimeout defines how long pool should wait before start killing workers ResetTimeout time.Duration `mapstructure:"reset_timeout"` - + // Stream read operation timeout + StreamTimeout time.Duration `mapstructure:"stream_timeout"` // Supervision config to limit worker and pool memory usage. Supervisor *SupervisorConfig `mapstructure:"supervisor"` } @@ -50,6 +44,10 @@ func (cfg *Config) InitDefaults() { cfg.AllocateTimeout = time.Minute } + if cfg.StreamTimeout == 0 { + cfg.StreamTimeout = time.Minute + } + if cfg.DestroyTimeout == 0 { cfg.DestroyTimeout = time.Minute } diff --git a/pool/static_pool/debug.go b/pool/static_pool/debug.go index 518612e..ffae0c1 100644 --- a/pool/static_pool/debug.go +++ b/pool/static_pool/debug.go @@ -4,6 +4,7 @@ import ( "context" "runtime" + "github.com/roadrunner-server/goridge/v3/pkg/frame" "github.com/roadrunner-server/sdk/v4/events" "github.com/roadrunner-server/sdk/v4/payload" "go.uber.org/zap" @@ -30,8 +31,8 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s // create channel for the stream (only if there are no errors) resp := make(chan *PExec, 1) - switch rsp.IsStream { - case true: + switch { + case rsp.Flags&frame.STREAM != 0: // in case of stream we should not return worker back immediately go func() { // would be called on Goexit @@ -55,7 +56,7 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s select { // we received stop signal case <-stopCh: - err = w.StreamCancel() + err = w.StreamCancel(ctx) if err != nil { sp.log.Warn("stream cancel error", zap.Error(err)) } @@ -76,14 +77,12 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s }() return resp, nil - case false: + default: resp <- newPExec(rsp, nil) // return worker back sp.ww.Release(w) // close the channel close(resp) return resp, nil - default: - panic("workers_pool unreachable!") } } diff --git a/pool/static_pool/workers_pool.go b/pool/static_pool/workers_pool.go index 0811ffd..498d661 100644 --- a/pool/static_pool/workers_pool.go +++ b/pool/static_pool/workers_pool.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "github.com/roadrunner-server/errors" + "github.com/roadrunner-server/goridge/v3/pkg/frame" "github.com/roadrunner-server/sdk/v4/events" "github.com/roadrunner-server/sdk/v4/fsm" "github.com/roadrunner-server/sdk/v4/payload" @@ -219,8 +220,8 @@ begin: // create channel for the stream (only if there are no errors) resp := make(chan *PExec, 1) - switch rsp.IsStream { - case true: + switch { + case rsp.Flags&frame.STREAM != 0: sp.log.Debug("stream mode", zap.Int64("pid", w.Pid())) // in case of stream we should not return worker back immediately go func() { @@ -238,10 +239,14 @@ begin: select { // we received stop signal case <-stopCh: - err = w.StreamCancel() + ctxT, cancelT := context.WithTimeout(ctx, sp.cfg.StreamTimeout) + err = w.StreamCancel(ctxT) if err != nil { sp.log.Warn("stream cancel error", zap.Error(err)) + w.State().Transition(fsm.StateInvalid) } + + cancelT() runtime.Goexit() default: pld, next, errI := w.StreamIter() @@ -260,7 +265,7 @@ begin: }() return resp, nil - case false: + default: sp.log.Debug("req-resp mode", zap.Int64("pid", w.Pid())) resp <- newPExec(rsp, nil) // return worker back @@ -268,8 +273,6 @@ begin: // close the channel close(resp) return resp, nil - default: - panic("workers_pool unreachable!") } } diff --git a/pool/static_pool/workers_pool_test.go b/pool/static_pool/workers_pool_test.go index 49ee0db..27375d8 100644 --- a/pool/static_pool/workers_pool_test.go +++ b/pool/static_pool/workers_pool_test.go @@ -10,13 +10,13 @@ import ( "sync" "testing" "time" + "unsafe" "github.com/roadrunner-server/errors" "github.com/roadrunner-server/sdk/v4/fsm" "github.com/roadrunner-server/sdk/v4/ipc/pipe" "github.com/roadrunner-server/sdk/v4/payload" "github.com/roadrunner-server/sdk/v4/pool" - "github.com/roadrunner-server/sdk/v4/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -950,7 +950,7 @@ func BenchmarkToStringUnsafe(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - res := utils.AsString(testPayload) + res := unsafe.String(unsafe.SliceData(testPayload), len(testPayload)) _ = res } } diff --git a/utils/convert.go b/utils/convert.go deleted file mode 100644 index d96acfb..0000000 --- a/utils/convert.go +++ /dev/null @@ -1,38 +0,0 @@ -package utils - -import ( - "reflect" - "unsafe" -) - -// AsBytes returns a slice that refers to the data backing the string s. -func AsBytes(s string) []byte { - // get the pointer to the data of the string - p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data) - - var b []byte - hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - hdr.Data = uintptr(p) - // we need to set the cap and len for the string to byte convert - // because string is shorter than []bytes - hdr.Cap = len(s) - hdr.Len = len(s) - - // checker to check mutable access to the data - SetChecker(b) - return b -} - -// AsString returns a string that refers to the data backing the slice s. -func AsString(b []byte) string { - p := unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&b)).Data) - - var s string - hdr := (*reflect.StringHeader)(unsafe.Pointer(&s)) - hdr.Data = uintptr(p) - hdr.Len = len(b) - - // checker to check mutable access to the data - SetChecker(b) - return s -} diff --git a/worker/worker.go b/worker/worker.go index 4f246e9..c5cc854 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -221,12 +221,34 @@ func (w *Process) StreamIter() (*payload.Payload, bool, error) { return nil, false, err } - return pld, pld.IsStream, nil + // PING, we should respond with PONG + if pld.Flags&frame.PING != 0 { + // get a frame + fr := w.getFrame() + + fr.WriteVersion(fr.Header(), frame.Version1) + + fr.SetPongBit(fr.Header()) + fr.WriteCRC(fr.Header()) + + err := w.Relay().Send(fr) + w.State().RegisterExec() + if err != nil { + w.putFrame(fr) + w.State().Transition(fsm.StateErrored) + return nil, false, errors.E(errors.Network, err) + } + + w.putFrame(fr) + } + + return pld, pld.Flags&frame.STREAM == 0, nil } // StreamCancel sends stop bit to the worker -func (w *Process) StreamCancel() error { +func (w *Process) StreamCancel(ctx context.Context) error { const op = errors.Op("sync_worker_send_frame") + // get a frame fr := w.getFrame() @@ -244,7 +266,27 @@ func (w *Process) StreamCancel() error { } w.putFrame(fr) - return nil + + for { + select { + case <-ctx.Done(): + return errors.E(op, errors.TimeOut, ctx.Err()) + default: + rsp, err := w.receiveFrame() + if err != nil { + return err + } + + // stream has ended + if rsp.Flags&frame.STREAM == 0 { + w.State().Transition(fsm.StateReady) + return nil + } + + // trash + rsp = nil + } + } } type wexec struct { @@ -470,9 +512,9 @@ func (w *Process) receiveFrame() (*payload.Payload, error) { return nil, errors.E(op, errors.Network, errors.Str("nil frame received")) } - flags := frameR.ReadFlags() + codec := frameR.ReadFlags() - if flags&frame.ERROR != byte(0) { + if codec&frame.ERROR != byte(0) { // we need to copy the payload because we will put the frame back to the pool cp := make([]byte, len(frameR.Payload())) copy(cp, frameR.Payload()) @@ -497,12 +539,14 @@ func (w *Process) receiveFrame() (*payload.Payload, error) { return nil, errors.E(errors.Network, errors.Errorf("bad payload %s", cp)) } - isStream := frameR.IsStream(frameR.Header()) + // stream + stop -> waste + // stream + ping -> response + flags := frameR.Header()[10] pld := &payload.Payload{ - IsStream: isStream, - Codec: flags, - Body: make([]byte, len(frameR.Payload()[options[0]:])), - Context: make([]byte, len(frameR.Payload()[:options[0]])), + Flags: flags, + Codec: codec, + Body: make([]byte, len(frameR.Payload()[options[0]:])), + Context: make([]byte, len(frameR.Payload()[:options[0]])), } // by copying we free frame's payload slice