From 7423d1499640054768dc873dfe6142f9f3fbd9d4 Mon Sep 17 00:00:00 2001 From: matyhtf Date: Tue, 18 May 2021 14:15:08 +0800 Subject: [PATCH] add Process\Pool::detach() --- examples/process_pool/detach.php | 30 ++++++++++++++++++++ examples/process_pool/send.php | 5 ++++ ext-src/swoole_process_pool.cc | 13 +++++++++ include/swoole_process_pool.h | 12 ++++++++ src/os/process_pool.cc | 48 ++++++++++++++++++++++++++++++-- src/reactor/base.cc | 4 +-- 6 files changed, 107 insertions(+), 5 deletions(-) create mode 100644 examples/process_pool/detach.php create mode 100644 examples/process_pool/send.php diff --git a/examples/process_pool/detach.php b/examples/process_pool/detach.php new file mode 100644 index 00000000000..7a613698fa2 --- /dev/null +++ b/examples/process_pool/detach.php @@ -0,0 +1,30 @@ +on('WorkerStart', function (Process\Pool $pool, $workerId) { + echo("[Worker #{$workerId}] WorkerStart\n"); + if ($workerId == 1) { + + } +}); + +$pool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) { + echo("[Worker #{$workerId}] WorkerStop\n"); +}); + +$pool->on('Message', function ($pool, $msg) { + var_dump($msg); + $pool->detach(); + + while(1) { + sleep(1); + echo "pid=".posix_getpid()."\n"; + }; +}); + +$pool->listen('127.0.0.1', 8089); + +$pool->start(); diff --git a/examples/process_pool/send.php b/examples/process_pool/send.php new file mode 100644 index 00000000000..b9ae046e911 --- /dev/null +++ b/examples/process_pool/send.php @@ -0,0 +1,5 @@ + 'hello', 'uid' => 1991]); +fwrite($fp, pack('N', strlen($msg)) . $msg); +sleep(1); diff --git a/ext-src/swoole_process_pool.cc b/ext-src/swoole_process_pool.cc index c2132f6a5d9..da440a60160 100644 --- a/ext-src/swoole_process_pool.cc +++ b/ext-src/swoole_process_pool.cc @@ -125,6 +125,7 @@ static PHP_METHOD(swoole_process_pool, set); static PHP_METHOD(swoole_process_pool, on); static PHP_METHOD(swoole_process_pool, listen); static PHP_METHOD(swoole_process_pool, write); +static PHP_METHOD(swoole_process_pool, detach); static PHP_METHOD(swoole_process_pool, getProcess); static PHP_METHOD(swoole_process_pool, start); static PHP_METHOD(swoole_process_pool, shutdown); @@ -174,6 +175,7 @@ static const zend_function_entry swoole_process_pool_methods[] = PHP_ME(swoole_process_pool, getProcess, arginfo_swoole_process_pool_getProcess, ZEND_ACC_PUBLIC) PHP_ME(swoole_process_pool, listen, arginfo_swoole_process_pool_listen, ZEND_ACC_PUBLIC) PHP_ME(swoole_process_pool, write, arginfo_swoole_process_pool_write, ZEND_ACC_PUBLIC) + PHP_ME(swoole_process_pool, detach, arginfo_swoole_process_pool_void, ZEND_ACC_PUBLIC) PHP_ME(swoole_process_pool, start, arginfo_swoole_process_pool_void, ZEND_ACC_PUBLIC) PHP_ME(swoole_process_pool, shutdown, arginfo_swoole_process_pool_void, ZEND_ACC_PUBLIC) PHP_FE_END @@ -266,6 +268,9 @@ static void pool_signal_handler(int sig) { current_pool->reloading = true; current_pool->reload_init = false; break; + case SIGIO: + current_pool->read_message = true; + break; default: break; } @@ -497,6 +502,7 @@ static PHP_METHOD(swoole_process_pool, start) { ori_handlers[SIGTERM] = swSignal_set(SIGTERM, pool_signal_handler); ori_handlers[SIGUSR1] = swSignal_set(SIGUSR1, pool_signal_handler); ori_handlers[SIGUSR2] = swSignal_set(SIGUSR2, pool_signal_handler); + ori_handlers[SIGIO] = swSignal_set(SIGIO, pool_signal_handler); if (pool->ipc_mode == SW_IPC_NONE || pp->enable_coroutine) { if (pp->onWorkerStart == nullptr) { @@ -542,6 +548,13 @@ static PHP_METHOD(swoole_process_pool, start) { extern void php_swoole_process_set_worker(zval *zobject, Worker *worker); +static PHP_METHOD(swoole_process_pool, detach) { + if (current_pool == nullptr) { + RETURN_FALSE; + } + RETURN_BOOL(current_pool->detach()); +} + static PHP_METHOD(swoole_process_pool, getProcess) { long worker_id = -1; diff --git a/include/swoole_process_pool.h b/include/swoole_process_pool.h index 0d869ce1d0d..0fd2c1c6972 100644 --- a/include/swoole_process_pool.h +++ b/include/swoole_process_pool.h @@ -24,6 +24,7 @@ #include "swoole_lock.h" #include "swoole_pipe.h" +#include "swoole_channel.h" #include "swoole_msg_queue.h" enum swWorker_status { @@ -173,6 +174,7 @@ struct ProcessPool { bool reloading; bool running; bool reload_init; + bool read_message; bool started; uint8_t dispatch_mode; uint8_t ipc_mode; @@ -234,6 +236,7 @@ struct ProcessPool { Reactor *reactor; MsgQueue *queue; StreamInfo *stream_info_; + Channel *message_box = nullptr; void *ptr; @@ -257,9 +260,18 @@ struct ProcessPool { return &(workers[worker_id - start_id]); } + Worker *get_worker_by_pid(pid_t pid) { + auto iter = map_->find(pid); + if (iter == map_->end()) { + return nullptr; + } + return iter->second; + } + void set_max_request(uint32_t _max_request, uint32_t _max_request_grace); int get_max_request(); int set_protocol(int task_protocol, uint32_t max_packet_size); + bool detach(); int wait(); int start(); void shutdown(); diff --git a/src/os/process_pool.cc b/src/os/process_pool.cc index 56d05158e98..8634ab4c073 100644 --- a/src/os/process_pool.cc +++ b/src/os/process_pool.cc @@ -76,6 +76,11 @@ int ProcessPool::create(uint32_t _worker_num, key_t _msgqueue_key, swIPC_type _i return SW_ERR; } + message_box = Channel::make(65536, sizeof(WorkerStopMessage), SW_CHAN_LOCK | SW_CHAN_SHM); + if (message_box == nullptr) { + return SW_ERR; + } + if (_ipc_mode == SW_IPC_MSGQUEUE) { use_msgqueue = 1; msgqueue_key = _msgqueue_key; @@ -605,6 +610,21 @@ int ProcessPool_add_worker(ProcessPool *pool, Worker *worker) { return SW_OK; } +bool ProcessPool::detach() { + WorkerStopMessage msg; + msg.pid = getpid(); + msg.worker_id = SwooleG.process_id; + + if (message_box && message_box->push(&msg, sizeof(msg)) < 0) { + return false; + } + if (swoole_kill(master_pid, SIGIO) < 0) { + return false; + } + running = false; + return true; +} + int ProcessPool::wait() { pid_t new_pid, reload_worker_pid = 0; int ret; @@ -622,6 +642,25 @@ int ProcessPool::wait() { SwooleG.signal_alarm = false; SwooleTG.timer->select(); } + if (read_message) { + WorkerStopMessage msg; + while (message_box->pop(&msg, sizeof(msg)) > 0) { + if (!running) { + continue; + } + Worker *exit_worker = get_worker_by_pid(msg.pid); + if (exit_worker == nullptr) { + continue; + } + pid_t new_pid = spawn(exit_worker); + if (new_pid < 0) { + swSysWarn("Fork worker process failed"); + return SW_ERR; + } + map_->erase(msg.pid); + } + read_message = false; + } if (exit_status.get_pid() < 0) { if (!running) { break; @@ -645,8 +684,8 @@ int ProcessPool::wait() { } if (running) { - auto iter = map_->find(exit_status.get_pid()); - if (iter == map_->end()) { + Worker *exit_worker = get_worker_by_pid(exit_status.get_pid()); + if (exit_worker == nullptr) { if (onWorkerNotFound) { onWorkerNotFound(this, exit_status); } else { @@ -655,7 +694,6 @@ int ProcessPool::wait() { continue; } - Worker *exit_worker = iter->second; if (!exit_status.is_normal_exit()) { swWarn("worker#%d abnormal exit, status=%d, signal=%d" "%s", @@ -732,6 +770,10 @@ void ProcessPool::destroy() { delete map_; } + if (message_box) { + message_box->destroy(); + } + sw_mem_pool()->free(workers); } diff --git a/src/reactor/base.cc b/src/reactor/base.cc index 4e899670e83..03bb5719716 100644 --- a/src/reactor/base.cc +++ b/src/reactor/base.cc @@ -301,7 +301,7 @@ int Reactor::_write(Reactor *reactor, Socket *socket, const void *buf, size_t n) send_bytes = socket->send(buf, n, 0); return send_bytes; }; - auto append_fn = [&send_bytes, socket, buf, n](Buffer *buffer) { + auto append_fn = [&send_bytes, buf, n](Buffer *buffer) { ssize_t offset = send_bytes > 0 ? send_bytes : 0; buffer->append((const char *) buf + offset, n - offset); }; @@ -325,7 +325,7 @@ int Reactor::_writev(Reactor *reactor, network::Socket *socket, const iovec *iov send_bytes = socket->writev(iov, iovcnt); return send_bytes; }; - auto append_fn = [&send_bytes, socket, iov, iovcnt](Buffer *buffer) { + auto append_fn = [&send_bytes, iov, iovcnt](Buffer *buffer) { ssize_t offset = send_bytes > 0 ? send_bytes : 0; buffer->append(iov, iovcnt, offset); };