
#749 bug(supervisor, ttl): worker gets into the inconsistent state after TTL was reached
rustatian authored Jul 14, 2021
2 parents 9d018f2 + cb28ad0 commit cea3f6a
Showing 11 changed files with 198 additions and 60 deletions.
54 changes: 41 additions & 13 deletions CHANGELOG.md
@@ -1,18 +1,37 @@
CHANGELOG
=========

v2.3.2 (14.07.2021)
-------------------

## 🩹 Fixes:

- 🐛 Fix: Bug with TTL incorrectly handled by the worker [PR](https://github.com/spiral/roadrunner/pull/749)
- 🐛 Fix: Add `RR_BROADCAST_PATH` to the `websockets` plugin [PR](https://github.com/spiral/roadrunner/pull/749)

## 📈 Summary:

- RR Milestone [2.3.2](https://github.com/spiral/roadrunner/milestone/31?closed=1)

---

v2.3.1 (30.06.2021)
-------------------

## 👀 New:

- ✏️ Rework `broadcast` plugin. Add architecture diagrams to the `doc` folder. [PR](https://github.com/spiral/roadrunner/pull/732)
- ✏️ Add `Clear` method to the KV plugin RPC. [PR](https://github.com/spiral/roadrunner/pull/736)

## 🩹 Fixes:

- 🐛 Fix: Bug with channel deadlock when `exec_ttl` was used and the TTL limit was reached [PR](https://github.com/spiral/roadrunner/pull/738)
- 🐛 Fix: Bug with the healthcheck endpoint where workers were marked as invalid and stayed in that state until the next request [PR](https://github.com/spiral/roadrunner/pull/738)
- 🐛 Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717), [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719)
- 🐛 Fix: Bug with incorrect Redis initialization and usage [Bug](https://github.com/spiral/roadrunner/issues/720)
- 🐛 Fix: Bug with duplicated Goridge error messages [Bug](https://github.com/spiral/goridge/issues/128)
- 🐛 Fix: Bug with an incorrect request `origin` check [Bug](https://github.com/spiral/roadrunner/issues/727)
@@ -38,20 +57,29 @@ v2.3.0 (08.06.2021)
- ✏️ Brand new `broadcast` plugin, now named `websockets`, with broadcast capabilities. It can handle hundreds of
thousands of websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus
on 2 CPU cores and 1GB of RAM) [Issue](https://github.com/spiral/roadrunner/issues/513)
- ✏️ Protobuf binary messages for the `websockets` and `kv` RPC calls under the hood. [Issue](https://github.com/spiral/roadrunner/issues/711)
- ✏️ JSON schemas for the config file v1.0 (also registered in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614))
- ✏️ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead)
- ✏️ Add a new option to the `http` config section: `internal_error_code` to override the default (500) internal error code. [Issue](https://github.com/spiral/roadrunner/issues/659)
- ✏️ Expose HTTP plugin metrics (workers memory, requests count, requests duration). [Issue](https://github.com/spiral/roadrunner/issues/489)
- ✏️ Scan `server.command` and report errors related to a wrong path to a `PHP` file, or to `.ph`/`.sh` scripts. [Issue](https://github.com/spiral/roadrunner/issues/658)
- ✏️ Support file logger with log rotation [Wiki](https://en.wikipedia.org/wiki/Log_rotation), [Issue](https://github.com/spiral/roadrunner/issues/545)

## 🩹 Fixes:

- 🐛 Fix: Bug with `informer.Workers` working incorrectly: [Bug](https://github.com/spiral/roadrunner/issues/686)
- 🐛 Fix: Internal error messages are no longer shown to the user (except the HTTP status code); the error message goes to the logs: [Bug](https://github.com/spiral/roadrunner/issues/659)
- 🐛 Fix: Error message is properly shown in the log in case of a `SoftJob` error: [Bug](https://github.com/spiral/roadrunner/issues/691)
- 🐛 Fix: Wrongly applied middlewares for the `fcgi` server leading to an NPE: [Bug](https://github.com/spiral/roadrunner/issues/701)

## 📦 Packages:

24 changes: 24 additions & 0 deletions pkg/pool/interface.go
@@ -27,3 +27,27 @@ type Pool interface {
// execWithTTL executes a task with a context that is used to enforce the execution timeout
execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
}

// Watcher is an interface for the Sync workers lifecycle
type Watcher interface {
// Watch adds workers to the container
Watch(workers []worker.BaseProcess) error

// Get provides the first free worker
Get(ctx context.Context) (worker.BaseProcess, error)

// Push enqueues the worker back
Push(w worker.BaseProcess)

// Allocate allocates a new worker and puts it into the WorkerWatcher
Allocate() error

// Destroy destroys the underlying container
Destroy(ctx context.Context)

// List returns all workers from the container w/o removing them from the internal storage
List() []worker.BaseProcess

// Remove removes the worker from the container
Remove(wb worker.BaseProcess)
}
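
For orientation, a minimal sketch of how a caller holding a Watcher might drive this lifecycle; the helper name `serveOnce` and the surrounding glue are hypothetical, only the interface methods above come from the code:

// serveOnce is an illustrative sketch, not part of the commit.
func serveOnce(ctx context.Context, ww Watcher, initial []worker.BaseProcess) error {
	// Register the initially allocated workers with the container.
	if err := ww.Watch(initial); err != nil {
		return err
	}

	// Borrow a free worker for the duration of one task.
	w, err := ww.Get(ctx)
	if err != nil {
		return err
	}

	// ... execute the payload on w ...

	// Return the worker. Push re-enqueues it only while it is still StateReady;
	// a worker marked StateInvalid by the supervisor is killed instead (see the
	// worker_watcher.Push change below), after which Allocate can spawn a replacement.
	ww.Push(w)
	return nil
}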
2 changes: 1 addition & 1 deletion pkg/pool/static_pool.go
@@ -41,7 +41,7 @@ type StaticPool struct {
listeners []events.Listener

// manages worker states and TTLs
ww workerWatcher.Watcher
ww Watcher

// allocate new worker
allocator worker.Allocator
39 changes: 33 additions & 6 deletions pkg/pool/supervisor_pool.go
@@ -121,7 +121,8 @@ func (sp *supervised) control() { //nolint:gocognit
worker.StateDestroyed,
worker.StateInactive,
worker.StateStopped,
worker.StateStopping:
worker.StateStopping,
worker.StateKilling:
continue
}

@@ -132,23 +133,40 @@
}

if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
// SOFT termination. DO NOT STOP active workers
/*
worker at this point might be in the middle of request execution:
---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
^
TTL Reached, state - invalid |
-----> Worker Stopped here
*/

if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
continue
}

if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
// SOFT termination. DO NOT STOP active workers
/*
worker at this point might be in the middle of request execution:
---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
^
TTL Reached, state - invalid |
-----> Worker Stopped here
*/

if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}

// mark it as invalid; the worker is likely in StateWorking, so it will be killed after the work is done
// just to double check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
continue
@@ -190,11 +208,20 @@ func (sp *supervised) control() { //nolint:gocognit
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
/*
worker at this point might be in the middle of request execution:
---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
^
TTL Reached, state - invalid |
-----> Worker Stopped here
*/

if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}

// just to double check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
54 changes: 54 additions & 0 deletions pkg/pool/supervisor_test.go
@@ -9,7 +9,9 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var cfgSupervised = Config{
@@ -122,6 +124,58 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
assert.NotEqual(t, pid, p.Workers()[0].Pid())
}

func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
var cfgExecTTL = Config{
NumWorkers: uint64(1),
Supervisor: &SupervisorConfig{
WatchTick: 1 * time.Second,
TTL: 5 * time.Second,
},
}
ctx := context.Background()
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") },
pipe.NewPipeFactory(),
cfgExecTTL,
)

assert.NoError(t, err)
assert.NotNil(t, p)

pid := p.Workers()[0].Pid()

resp, err := p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})

assert.NoError(t, err)
assert.Equal(t, string(resp.Body), "hello world")
assert.Empty(t, resp.Context)

time.Sleep(time.Second)
assert.NotEqual(t, pid, p.Workers()[0].Pid())
require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
pid = p.Workers()[0].Pid()

resp, err = p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})

assert.NoError(t, err)
assert.Equal(t, string(resp.Body), "hello world")
assert.Empty(t, resp.Context)

time.Sleep(time.Second)
// should be new worker with new pid
assert.NotEqual(t, pid, p.Workers()[0].Pid())
require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)

p.Destroy(context.Background())
}
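
The test above only sets WatchTick and TTL, but control() also honours IdleTTL and MaxWorkerMemory. A hypothetical configuration exercising every limit (the field names are taken from the diff; the values and the MB unit for MaxWorkerMemory are illustrative assumptions) could look like:

// Illustrative only: a config that triggers every check in supervised.control().
var cfgAllLimits = Config{
	NumWorkers: uint64(2),
	Supervisor: &SupervisorConfig{
		WatchTick:       1 * time.Second,  // how often control() wakes up
		TTL:             30 * time.Second, // maximum total lifetime of a worker
		IdleTTL:         10 * time.Second, // maximum time since the last executed request
		MaxWorkerMemory: 128,              // soft memory limit, assumed to be in MB
	},
}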

func TestSupervisedPool_Idle(t *testing.T) {
var cfgExecTTL = Config{
NumWorkers: uint64(1),
7 changes: 7 additions & 0 deletions pkg/worker/sync_worker.go
@@ -60,6 +60,13 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, err)
}

// the supervisor may change the worker state while the request is in flight;
// in that case we must not overwrite that state with StateReady
if tw.process.State().Value() != StateWorking {
tw.process.State().RegisterExec()
return rsp, nil
}

tw.process.State().Set(StateReady)
tw.process.State().RegisterExec()

2 changes: 1 addition & 1 deletion pkg/worker_watcher/container/vec.go
@@ -13,7 +13,7 @@ type Vec struct {
workers chan worker.BaseProcess
}

func NewVector(initialNumOfWorkers uint64) Vector {
func NewVector(initialNumOfWorkers uint64) *Vec {
vec := &Vec{
destroy: 0,
workers: make(chan worker.BaseProcess, initialNumOfWorkers),
31 changes: 0 additions & 31 deletions pkg/worker_watcher/interface.go

This file was deleted.

21 changes: 16 additions & 5 deletions pkg/worker_watcher/worker_watcher.go
@@ -11,8 +11,18 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)

// Vector interface represents the vector container
type Vector interface {
// Enqueue puts a worker into the vector
Enqueue(worker.BaseProcess)
// Dequeue takes a worker from the vector
Dequeue(ctx context.Context) (worker.BaseProcess, error)
// Destroy is used to stop releasing the workers
Destroy()
}

// NewSyncWorkerWatcher is a constructor for the Watcher
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
ww := &workerWatcher{
container: container.NewVector(numWorkers),
numWorkers: numWorkers,
@@ -26,7 +36,7 @@ func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events

type workerWatcher struct {
sync.RWMutex
container container.Vector
container Vector
// used to control the Destroy stage (that all workers are in the container)
numWorkers uint64
workers []worker.BaseProcess
@@ -150,11 +160,12 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {

// Push O(1) operation
func (ww *workerWatcher) Push(w worker.BaseProcess) {
if w.State().Value() != worker.StateReady {
switch w.State().Value() {
case worker.StateReady:
ww.container.Enqueue(w)
default:
_ = w.Kill()
return
}
ww.container.Enqueue(w)
}

// Destroy the underlying container and its workers (but let them complete their current task)
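
The watcher-side changes above replace the concrete container.Vec dependency with the local Vector interface, while NewVector and NewSyncWorkerWatcher now return concrete types ("accept interfaces, return structs"). Purely to illustrate that decoupling — this type does not exist in the repository — any container satisfying the three Vector methods could be plugged in:

// sliceVector is a hypothetical, non-blocking Vector implementation; the real
// channel-based Vec blocks in Dequeue until a worker becomes available.
type sliceVector struct {
	mu      sync.Mutex
	workers []worker.BaseProcess
}

func (s *sliceVector) Enqueue(w worker.BaseProcess) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.workers = append(s.workers, w)
}

func (s *sliceVector) Dequeue(_ context.Context) (worker.BaseProcess, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.workers) == 0 {
		return nil, errors.New("no free workers")
	}
	w := s.workers[0]
	s.workers = s.workers[1:]
	return w, nil
}

func (s *sliceVector) Destroy() {}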