bug(supervisor, ttl): worker gets into the inconsistent state after TTL was reached #749

Merged (3 commits), Jul 14, 2021
54 changes: 41 additions & 13 deletions CHANGELOG.md
@@ -1,18 +1,37 @@
CHANGELOG
=========

v2.3.2 (14.07.2021)
-------------------

## 🩹 Fixes:

- 🐛 Fix: Bug with ttl incorrectly handled by the worker [PR](https://github.com/spiral/roadrunner/pull/749)
- 🐛 Fix: Add `RR_BROADCAST_PATH` to the `websockets` plugin [PR](https://github.com/spiral/roadrunner/pull/749)

## 📈 Summary:

- RR Milestone [2.3.2](https://github.com/spiral/roadrunner/milestone/31?closed=1)

---

v2.3.1 (30.06.2021)
-------------------

## 👀 New:

- ✏️ Rework `broadcast` plugin. Add architecture diagrams to the `doc` folder. [PR](https://github.com/spiral/roadrunner/pull/732)
- ✏️ Rework `broadcast` plugin. Add architecture diagrams to the `doc`
folder. [PR](https://github.com/spiral/roadrunner/pull/732)
- ✏️ Add `Clear` method to the KV plugin RPC. [PR](https://github.com/spiral/roadrunner/pull/736)

## 🩹 Fixes:

- 🐛 Fix: Bug with channel deadlock when `exec_ttl` was used and TTL limit reached [PR](https://github.com/spiral/roadrunner/pull/738)
- 🐛 Fix: Bug with healthcheck endpoint when workers were marked as invalid and stay in that state until the next request [PR](https://github.com/spiral/roadrunner/pull/738)
- 🐛 Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717), [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719)
- 🐛 Fix: Bug with channel deadlock when `exec_ttl` was used and TTL limit
reached [PR](https://github.com/spiral/roadrunner/pull/738)
- 🐛 Fix: Bug with healthcheck endpoint when workers were marked as invalid and stay in that state until the next
request [PR](https://github.com/spiral/roadrunner/pull/738)
- 🐛 Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717)
, [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719)
- 🐛 Fix: Bug with incorrect redis initialization and usage [Bug](https://github.com/spiral/roadrunner/issues/720)
- 🐛 Fix: Bug, Goridge duplicate error messages [Bug](https://github.com/spiral/goridge/issues/128)
- 🐛 Fix: Bug, incorrect request `origin` check [Bug](https://github.com/spiral/roadrunner/issues/727)
@@ -38,20 +57,29 @@ v2.3.0 (08.06.2021)
- ✏️ Brand new `broadcast` plugin now has the name - `websockets` with broadcast capabilities. It can handle hundreds of
thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus
on 2CPU cores and 1GB of RAM) [Issue](https://github.com/spiral/roadrunner/issues/513)
- ✏️ Protobuf binary messages for the `websockets` and `kv` RPC calls under the hood. [Issue](https://github.com/spiral/roadrunner/issues/711)
- ✏️ Json-schemas for the config file v1.0 (it also registered in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614))
- ✏️ Protobuf binary messages for the `websockets` and `kv` RPC calls under the
hood. [Issue](https://github.com/spiral/roadrunner/issues/711)
- ✏️ Json-schemas for the config file v1.0 (it also registered
in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614))
- ✏️ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead)
- ✏️ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error code. [Issue](https://github.com/spiral/roadrunner/issues/659)
- ✏️ Expose HTTP plugin metrics (workers memory, requests count, requests duration). [Issue](https://github.com/spiral/roadrunner/issues/489)
- ✏️ Scan `server.command` and find errors related to the wrong path to a `PHP` file, or `.ph`, `.sh` scripts. [Issue](https://github.com/spiral/roadrunner/issues/658)
- ✏️ Support file logger with log rotation [Wiki](https://en.wikipedia.org/wiki/Log_rotation), [Issue](https://github.com/spiral/roadrunner/issues/545)
- ✏️ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error
code. [Issue](https://github.com/spiral/roadrunner/issues/659)
- ✏️ Expose HTTP plugin metrics (workers memory, requests count, requests duration)
. [Issue](https://github.com/spiral/roadrunner/issues/489)
- ✏️ Scan `server.command` and find errors related to the wrong path to a `PHP` file, or `.ph`, `.sh`
scripts. [Issue](https://github.com/spiral/roadrunner/issues/658)
- ✏️ Support file logger with log rotation [Wiki](https://en.wikipedia.org/wiki/Log_rotation)
, [Issue](https://github.com/spiral/roadrunner/issues/545)

## 🩹 Fixes:

- 🐛 Fix: Bug with `informer.Workers` worked incorrectly: [Bug](https://github.com/spiral/roadrunner/issues/686)
- 🐛 Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in logs: [Bug](https://github.com/spiral/roadrunner/issues/659)
- 🐛 Fix: Error message will be properly shown in the log in case of `SoftJob` error: [Bug](https://github.com/spiral/roadrunner/issues/691)
- 🐛 Fix: Wrong applied middlewares for the `fcgi` server leads to the NPE: [Bug](https://github.com/spiral/roadrunner/issues/701)
- 🐛 Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in
logs: [Bug](https://github.com/spiral/roadrunner/issues/659)
- 🐛 Fix: Error message will be properly shown in the log in case of `SoftJob`
error: [Bug](https://github.com/spiral/roadrunner/issues/691)
- 🐛 Fix: Wrong applied middlewares for the `fcgi` server leads to the
NPE: [Bug](https://github.com/spiral/roadrunner/issues/701)

## 📦 Packages:

24 changes: 24 additions & 0 deletions pkg/pool/interface.go
@@ -27,3 +27,27 @@ type Pool interface {
// ExecWithContext executes task with context which is used with timeout
execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
}

// Watcher is an interface for the Sync workers lifecycle
type Watcher interface {
// Watch adds workers to the container
Watch(workers []worker.BaseProcess) error

// Get provides the first free worker
Get(ctx context.Context) (worker.BaseProcess, error)

// Push enqueues the worker back
Push(w worker.BaseProcess)

// Allocate allocates a new worker and puts it into the WorkerWatcher
Allocate() error

// Destroy destroys the underlying container
Destroy(ctx context.Context)

// List returns all workers without removing them from the internal storage
List() []worker.BaseProcess

// Remove removes the worker from the container
Remove(wb worker.BaseProcess)
}
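The `Watcher` contract now lives in the `pool` package itself (the standalone `pkg/worker_watcher/interface.go` is deleted further down), so the pool depends on an abstraction it owns rather than on the concrete watcher package. Below is a minimal sketch of how a pool might drive the worker lifecycle through it; `runOnce` and its `use` callback are illustrative names, not code from this PR:

```go
package pool

import (
	"context"

	"github.com/spiral/roadrunner/v2/pkg/worker"
)

// runOnce borrows a worker from the watcher, hands it to the caller and
// always pushes it back; Push decides whether the worker is re-enqueued
// (StateReady) or killed (any other state, e.g. StateInvalid set by the
// supervisor once TTL is reached).
func runOnce(ctx context.Context, ww Watcher, use func(worker.BaseProcess) error) error {
	w, err := ww.Get(ctx) // blocks until a free worker is available
	if err != nil {
		return err
	}
	defer ww.Push(w)

	return use(w)
}
```

The detail that matters for this bug is the unconditional `Push`: the watcher, not the caller, decides whether the worker survives.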
2 changes: 1 addition & 1 deletion pkg/pool/static_pool.go
@@ -41,7 +41,7 @@ type StaticPool struct {
listeners []events.Listener

// manages worker states and TTLs
ww workerWatcher.Watcher
ww Watcher

// allocate new worker
allocator worker.Allocator
39 changes: 33 additions & 6 deletions pkg/pool/supervisor_pool.go
@@ -121,7 +121,8 @@ func (sp *supervised) control() { //nolint:gocognit
worker.StateDestroyed,
worker.StateInactive,
worker.StateStopped,
worker.StateStopping:
worker.StateStopping,
worker.StateKilling:
continue
}

@@ -132,23 +133,40 @@ }
}

if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
// SOFT termination. DO NOT STOP active workers
/*
worker at this point might be in the middle of request execution:

---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
^
TTL Reached, state - invalid |
-----> Worker Stopped here
*/

if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
continue
}

if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
// SOFT termination. DO NOT STOP active workers
/*
worker at this point might be in the middle of request execution:

---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
^
TTL Reached, state - invalid |
-----> Worker Stopped here
*/

if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}

// mark it as invalid, worker likely in the StateWorking, so, it will be killed after work will be done
// just to double check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
continue
@@ -190,11 +208,20 @@ func (sp *supervised) control() { //nolint:gocognit
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
/*
worker at this point might be in the middle of request execution:

---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
^
TTL Reached, state - invalid |
-----> Worker Stopped here
*/

if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}

// just to double check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
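All three checks above (TTL, `MaxWorkerMemory`, `IdleTTL`) now apply the same soft-termination rule: a worker that is idle is stopped on the spot, while a worker that is mid-request is only marked `StateInvalid` and is killed later, when it comes back through `Push`. A sketch of that shared rule as a standalone helper, assuming the same `worker` package APIs used in the diff (the PR itself keeps the logic inline in `control()`):

```go
package pool

import "github.com/spiral/roadrunner/v2/pkg/worker"

// softStop is a hypothetical helper mirroring the supervisor's branches above.
func softStop(w worker.BaseProcess) {
	if w.State().Value() != worker.StateWorking {
		// Idle worker: safe to stop immediately.
		w.State().Set(worker.StateInvalid)
		_ = w.Stop()
		return
	}
	// Busy worker: only mark it. SyncWorkerImpl.Exec preserves this state once
	// the response is written, and workerWatcher.Push kills anything that is
	// not StateReady.
	w.State().Set(worker.StateInvalid)
}
```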
54 changes: 54 additions & 0 deletions pkg/pool/supervisor_test.go
@@ -9,7 +9,9 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var cfgSupervised = Config{
@@ -122,6 +124,58 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
assert.NotEqual(t, pid, p.Workers()[0].Pid())
}

func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
var cfgExecTTL = Config{
NumWorkers: uint64(1),
Supervisor: &SupervisorConfig{
WatchTick: 1 * time.Second,
TTL: 5 * time.Second,
},
}
ctx := context.Background()
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") },
pipe.NewPipeFactory(),
cfgExecTTL,
)

assert.NoError(t, err)
assert.NotNil(t, p)

pid := p.Workers()[0].Pid()

resp, err := p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})

assert.NoError(t, err)
assert.Equal(t, string(resp.Body), "hello world")
assert.Empty(t, resp.Context)

time.Sleep(time.Second)
assert.NotEqual(t, pid, p.Workers()[0].Pid())
require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
pid = p.Workers()[0].Pid()

resp, err = p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})

assert.NoError(t, err)
assert.Equal(t, string(resp.Body), "hello world")
assert.Empty(t, resp.Context)

time.Sleep(time.Second)
// should be new worker with new pid
assert.NotEqual(t, pid, p.Workers()[0].Pid())
require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)

p.Destroy(context.Background())
}
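The `time.Sleep(time.Second)` calls above assume the 1-second `WatchTick` has fired and the replacement worker is already up. A less timing-sensitive way to express the same wait, as a hypothetical helper that is not part of the PR (it relies only on `Pool.Workers()` and `Pid()`, which the test already uses, plus the test file's imports):

```go
// waitForRespawn polls until the pool's single worker has a new PID and is
// back in StateReady, or fails the test after the deadline.
func waitForRespawn(t *testing.T, p Pool, oldPid int64, deadline time.Duration) {
	end := time.Now().Add(deadline)
	for time.Now().Before(end) {
		w := p.Workers()[0]
		if w.Pid() != oldPid && w.State().Value() == worker.StateReady {
			return
		}
		time.Sleep(50 * time.Millisecond)
	}
	t.Fatalf("worker was not respawned within %s", deadline)
}
```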

func TestSupervisedPool_Idle(t *testing.T) {
var cfgExecTTL = Config{
NumWorkers: uint64(1),
7 changes: 7 additions & 0 deletions pkg/worker/sync_worker.go
@@ -60,6 +60,13 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, err)
}

// the supervisor may set the worker state while the request is being executed;
// in that case we should not overwrite that state
if tw.process.State().Value() != StateWorking {
tw.process.State().RegisterExec()
return rsp, nil
}

tw.process.State().Set(StateReady)
tw.process.State().RegisterExec()
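The guard added above is the worker-side half of the fix: `Exec` promotes the worker back to `StateReady` only if nothing else changed its state while the request was running, so a `StateInvalid` set by the supervisor survives and is later acted on by `Push`. The same rule in isolation, as a sketch; `execState` and `finishExec` are hypothetical names, and the interface lists only the methods used here:

```go
package worker

// execState is a hypothetical narrowing of the state API used by Exec.
type execState interface {
	Value() int64
	Set(value int64)
	RegisterExec()
}

// finishExec applies the post-exec transition from the hunk above: keep any
// state set concurrently (e.g. StateInvalid from the supervisor), otherwise
// mark the worker ready for the next request.
func finishExec(st execState) {
	if st.Value() == StateWorking {
		st.Set(StateReady)
	}
	st.RegisterExec()
}
```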

2 changes: 1 addition & 1 deletion pkg/worker_watcher/container/vec.go
@@ -13,7 +13,7 @@ type Vec struct {
workers chan worker.BaseProcess
}

func NewVector(initialNumOfWorkers uint64) Vector {
func NewVector(initialNumOfWorkers uint64) *Vec {
vec := &Vec{
destroy: 0,
workers: make(chan worker.BaseProcess, initialNumOfWorkers),
31 changes: 0 additions & 31 deletions pkg/worker_watcher/interface.go

This file was deleted.

21 changes: 16 additions & 5 deletions pkg/worker_watcher/worker_watcher.go
@@ -11,8 +11,18 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)

// Vector interface represents vector container
type Vector interface {
// Enqueue puts a worker into the vector
Enqueue(worker.BaseProcess)
// Dequeue gets a worker from the vector
Dequeue(ctx context.Context) (worker.BaseProcess, error)
// Destroy used to stop releasing the workers
Destroy()
}

// NewSyncWorkerWatcher is a constructor for the Watcher
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
ww := &workerWatcher{
container: container.NewVector(numWorkers),
numWorkers: numWorkers,
@@ -26,7 +36,7 @@ NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events

type workerWatcher struct {
sync.RWMutex
container container.Vector
container Vector
// used to control the Destroy stage (that all workers are in the container)
numWorkers uint64
workers []worker.BaseProcess
@@ -150,11 +160,12 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {

// Push O(1) operation
func (ww *workerWatcher) Push(w worker.BaseProcess) {
if w.State().Value() != worker.StateReady {
switch w.State().Value() {
case worker.StateReady:
ww.container.Enqueue(w)
default:
_ = w.Kill()
return
}
ww.container.Enqueue(w)
}

// Destroy all underlying container (but let them to complete the task)
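The watcher now consumes its container through the local `Vector` interface above, and the rewritten `Push` makes the kill decision explicit: only a `StateReady` worker is re-enqueued, anything else (for example `StateInvalid` set by the supervisor) is killed, which is what finally retires a TTL-ed worker. For illustration, a toy channel-backed type that would satisfy `Vector`, written as if it sat in this same file (so it reuses the existing `context` and `worker` imports); the real implementation is `container.Vec`, which additionally tracks a destroy flag:

```go
// chanVec is a toy Vector implementation backed by a buffered channel.
type chanVec struct {
	workers chan worker.BaseProcess
}

func newChanVec(capacity uint64) *chanVec {
	return &chanVec{workers: make(chan worker.BaseProcess, capacity)}
}

// Enqueue puts a worker back into the container.
func (v *chanVec) Enqueue(w worker.BaseProcess) { v.workers <- w }

// Dequeue hands out a free worker, or fails once the context is canceled.
func (v *chanVec) Dequeue(ctx context.Context) (worker.BaseProcess, error) {
	select {
	case w := <-v.workers:
		return w, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// Destroy stops handing out workers; the toy version simply drains and kills
// whatever is still queued.
func (v *chanVec) Destroy() {
	for {
		select {
		case w := <-v.workers:
			_ = w.Kill()
		default:
			return
		}
	}
}
```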