Skip to content

Commit

Permalink
fix(metrics): handle the case where the worker is already assigned to…
Browse files Browse the repository at this point in the history
… a thread (#1171)
  • Loading branch information
withinboredom authored Nov 21, 2024
1 parent 2d6a299 commit 08e99fc
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 29 deletions.
8 changes: 4 additions & 4 deletions caddy/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestWorkerWithInactiveWatcher(t *testing.T) {
{
skip_install_trust
admin localhost:2999
http_port 9080
http_port `+testPort+`
frankenphp {
worker {
Expand All @@ -26,13 +26,13 @@ func TestWorkerWithInactiveWatcher(t *testing.T) {
}
}
localhost:9080 {
localhost:`+testPort+` {
root ../testdata
rewrite worker-with-watcher.php
php
}
`, "caddyfile")

tester.AssertGetResponse("http://localhost:9080", http.StatusOK, "requests:1")
tester.AssertGetResponse("http://localhost:9080", http.StatusOK, "requests:2")
tester.AssertGetResponse("http://localhost:"+testPort, http.StatusOK, "requests:1")
tester.AssertGetResponse("http://localhost:"+testPort, http.StatusOK, "requests:2")
}
6 changes: 4 additions & 2 deletions php_thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import "C"
import (
"net/http"
"runtime"
"sync"
"unsafe"
)

Expand All @@ -19,6 +20,7 @@ type phpThread struct {
worker *worker
requestChan chan *http.Request
knownVariableKeys map[string]*C.zend_string
readiedOnce sync.Once
}

func initPHPThreads(numThreads int) {
Expand All @@ -28,7 +30,7 @@ func initPHPThreads(numThreads int) {
}
}

func (thread phpThread) getActiveRequest() *http.Request {
func (thread *phpThread) getActiveRequest() *http.Request {
if thread.workerRequest != nil {
return thread.workerRequest
}
Expand All @@ -46,5 +48,5 @@ func (thread *phpThread) pinString(s string) *C.char {

// C strings must be null-terminated
func (thread *phpThread) pinCString(s string) *C.char {
return thread.pinString(s+"\x00")
return thread.pinString(s + "\x00")
}
65 changes: 42 additions & 23 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/http"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/dunglas/frankenphp/internal/watcher"
Expand All @@ -24,6 +23,7 @@ type worker struct {
requestChan chan *http.Request
threads []*phpThread
threadMutex sync.RWMutex
ready chan struct{}
}

const maxWorkerErrorBackoff = 1 * time.Second
Expand All @@ -32,33 +32,35 @@ const maxWorkerConsecutiveFailures = 6

var (
watcherIsEnabled bool
workersReadyWG sync.WaitGroup
workerShutdownWG sync.WaitGroup
workersAreReady atomic.Bool
workersAreDone atomic.Bool
workersDone chan interface{}
workers = make(map[string]*worker)
)

func initWorkers(opt []workerOpt) error {
workersDone = make(chan interface{})
workersAreReady.Store(false)
workersAreDone.Store(false)

ready := sync.WaitGroup{}

for _, o := range opt {
worker, err := newWorker(o)
worker.threads = make([]*phpThread, 0, o.num)
if err != nil {
return err
}
workersReadyWG.Add(worker.num)
for i := 0; i < worker.num; i++ {
go worker.startNewWorkerThread()
}
ready.Add(1)
go func() {
for i := 0; i < worker.num; i++ {
<-worker.ready
}
ready.Done()
}()
}

workersReadyWG.Wait()
workersAreReady.Store(true)
ready.Wait()

return nil
}
Expand All @@ -80,7 +82,13 @@ func newWorker(o workerOpt) (*worker, error) {
}

o.env["FRANKENPHP_WORKER\x00"] = "1"
w := &worker{fileName: absFileName, num: o.num, env: o.env, requestChan: make(chan *http.Request)}
w := &worker{
fileName: absFileName,
num: o.num,
env: o.env,
requestChan: make(chan *http.Request),
ready: make(chan struct{}, o.num),
}
workers[absFileName] = w

return w, nil
Expand Down Expand Up @@ -145,8 +153,20 @@ func (worker *worker) startNewWorkerThread() {
fc := r.Context().Value(contextKey).(*FrankenPHPContext)

// if we are done, exit the loop that restarts the worker script
if workersAreDone.Load() {
break
select {
case _, ok := <-workersDone:
if !ok {
metrics.StopWorker(worker.fileName, StopReasonShutdown)

if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
c.Write(zap.String("worker", worker.fileName))
}

return
}
// continue on since the channel is still open
default:
// continue on since the channel is still open
}

// on exit status 0 we just run the worker script again
Expand Down Expand Up @@ -184,12 +204,7 @@ func (worker *worker) startNewWorkerThread() {
metrics.StopWorker(worker.fileName, StopReasonCrash)
}

metrics.StopWorker(worker.fileName, StopReasonShutdown)

// TODO: check if the termination is expected
if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
c.Write(zap.String("worker", worker.fileName))
}
// unreachable
}

func (worker *worker) handleRequest(r *http.Request) {
Expand All @@ -210,7 +225,6 @@ func (worker *worker) handleRequest(r *http.Request) {
}

func stopWorkers() {
workersAreDone.Store(true)
close(workersDone)
}

Expand Down Expand Up @@ -253,15 +267,11 @@ func restartWorkers(workerOpts []workerOpt) {

func assignThreadToWorker(thread *phpThread) {
fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
metrics.ReadyWorker(fc.scriptFilename)
worker, ok := workers[fc.scriptFilename]
if !ok {
panic("worker not found for script: " + fc.scriptFilename)
}
thread.worker = worker
if !workersAreReady.Load() {
workersReadyWG.Done()
}
thread.requestChan = make(chan *http.Request)
worker.threadMutex.Lock()
worker.threads = append(worker.threads, thread)
Expand All @@ -276,6 +286,15 @@ func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
if thread.worker == nil {
assignThreadToWorker(thread)
}
thread.readiedOnce.Do(func() {
// inform metrics that the worker is ready
metrics.ReadyWorker(thread.worker.fileName)
})

select {
case thread.worker.ready <- struct{}{}:
default:
}

if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName))
Expand Down

0 comments on commit 08e99fc

Please sign in to comment.