Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watchdog workers/childs, fix systemd-restart on crash by exception #44

Merged
merged 7 commits into from
Sep 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cakephp/app/Config/core.php
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@
// Configure CakePHP's "default" log stream to use the Syslog engine with the
// "statusengine" prefix, for the log levels listed under 'types'.
CakeLog::config('default', array(
'engine' => 'Syslog',
'prefix' => 'statusengine',
// LOG_ODELAY: delay opening the syslog connection until the first message is logged.
// LOG_PID: include the PID with each message.
// NOTE(review): presumably handed to openlog() by the Syslog engine - confirm.
'flag' => LOG_ODELAY | LOG_PID,
'types' => array('emergency', 'alert', 'critical', 'error', 'warning', 'notice', 'info'),
));

Expand Down
242 changes: 173 additions & 69 deletions cakephp/app/Console/Command/StatusengineLegacyShell.php
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ class StatusengineLegacyShell extends AppShell{
**/
protected $ObjectsRepository = [];

/**
* Child worker processes to watch; each entry holds the worker config ('worker') and its PID ('pid', null while not running)
*
* @var array
**/
protected $childs = [];

/**
* CakePHP's option parser
*
Expand Down Expand Up @@ -177,12 +184,13 @@ public function main(){
$NebTypes = new NebTypes();
$NebTypes->defineNebTypesAsGlobals();

$this->childPids = [];
$this->childs = [];
$this->_constants();
$this->clearQ = false;

//the Gearman worker
$this->worker = null;
$this->work = false;
$this->createParentHosts = [];
$this->createParentServices = [];

Expand Down Expand Up @@ -219,6 +227,7 @@ public function main(){
$this->bulkLastCheck = time();

$this->lastDatasourcePing = time();
$this->lastChildPing = time();

$emptyMethods = ['truncate', 'delete'];
$emptyMethod = strtolower(Configure::read('empty_method'));
Expand Down Expand Up @@ -453,6 +462,7 @@ public function dumpObjects($job){
case START_OBJECT_DUMP:
if($this->workerMode === true){
$this->sendSignal(SIGUSR2);
$this->work = false;
}

$this->dumpObjects = true;
Expand Down Expand Up @@ -570,6 +580,7 @@ public function dumpObjects($job){

if($this->workerMode === true){
$this->sendSignal(SIGUSR1);
$this->work = true;
}

if($this->workerMode === false && $this->processPerfdata === true){
Expand Down Expand Up @@ -2262,7 +2273,7 @@ public function processDowntimes($job){

if($payload->type == NEBTYPE_DOWNTIME_ADD || $payload->type == NEBTYPE_DOWNTIME_LOAD){
//Add a new downtime

//Only update downtime history table on ADD, not on LOAD events, to avoid was_started=0 and wrong timestamps
if($payload->type == NEBTYPE_DOWNTIME_ADD){
$downtime = $this->Downtimehistory->find('first', [
Expand Down Expand Up @@ -2822,6 +2833,7 @@ public function gearmanConnect(){
* which is bad because if GearmanWorker::work() gets stuck, PHP cannot execute the signal handler
*/
$this->worker->addOptions(GEARMAN_WORKER_NON_BLOCKING);
$this->worker->setTimeout(1000);

$this->worker->addServer(Configure::read('server'), Configure::read('port'));

Expand Down Expand Up @@ -3140,6 +3152,44 @@ public function parseCheckCommand($checkCommand){
return $return;
}

/**
 * Fork one worker child process for the given worker configuration.
 *
 * In the child this method never returns: the child resets its inherited
 * state, binds the worker signal handlers, reconnects the datasource and
 * enters waitForInstructions() before exiting.
 * In the parent it returns the PID of the new child.
 *
 * @author Daniel Hoffend <dh@dotlan.net>
 * @param array $worker worker configuration; $worker['queues'] becomes the child's queue list
 * @return int|null PID of the forked child, or null if the fork failed
 **/
protected function createChild(array $worker)
{
    CakeLog::info('Forking a new worker child');

    // don't duplicate the worker object on fork
    $oldworker = false;
    if ($this->worker) {
        $oldworker = $this->worker;
        $this->worker = false;
    }

    $pid = pcntl_fork();

    if ($pid === -1) {
        // Fork failed, no child exists. Never hand -1 back as a PID: it would be
        // stored in $this->childs and later passed to posix_kill(), where -1
        // addresses every process the user may signal. Returning null marks the
        // slot as "not running", the same marker the SIGCHLD handler uses.
        $this->worker = $oldworker;
        CakeLog::error('Could not fork a new worker child');
        return null;
    }

    if ($pid === 0) {
        //We are the child
        // drop state inherited from the parent
        $this->childs = [];
        $this->ObjectsRepository = [];
        $this->BulkRepository = [];
        CakeLog::info('Hey, my queues are: '.implode(',', array_keys($worker['queues'])));
        $this->bindQueues = true;
        $this->queues = $worker['queues'];
        $this->worker = false;
        $this->bindWorkerSignalHandler();
        // the forked process must not share the parent's database connection
        $this->Objects->getDatasource()->reconnect();
        $this->waitForInstructions();
        exit;
    }

    // We are the parent: restore the worker object and report the child's PID
    $this->worker = $oldworker;
    return $pid;
}

/**
* This function will fork the child processes (worker) if you run with -w
*
Expand All @@ -3150,26 +3200,25 @@ public function parseCheckCommand($checkCommand){
*/
public function forkWorker(){
$workers = Configure::read('workers');

declare(ticks = 1);

// prepare childs
foreach($workers as $worker){
declare(ticks = 1);
CakeLog::info('Forking a new worker child');
$pid = pcntl_fork();
if(!$pid){
//We are the child
CakeLog::info('Hey, my queues are: '.implode(',', array_keys($worker['queues'])));
$this->bindQueues = true;
$this->queues = $worker['queues'];
$this->work = false;
$this->bindChildSignalHandler();
$this->waitForInstructions();
}else{
//we are the parrent
$this->childPids[] = $pid;
$this->childs[] = [
'worker' => $worker,
'pid' => null
];
};

}
// create all workers
foreach($this->childs as $id => $child){
$this->childs[$id]['pid'] = $this->createChild($child['worker']);
}

pcntl_signal(SIGTERM, [$this, 'signalHandler']);
pcntl_signal(SIGINT, [$this, 'signalHandler']);
pcntl_signal(SIGCHLD, [$this, 'childSignalHandler']);

//Every worker is created now, so lets rock!

Expand All @@ -3187,20 +3236,37 @@ public function forkWorker(){
$this->gearmanConnect();
CakeLog::info('Lets rock!');
$this->sendSignal(SIGUSR1);
$this->worker->setTimeout(1000);
$this->work = true;

while(true){
pcntl_signal_dispatch();
$this->worker->work();

if($this->worker->returnCode() == GEARMAN_SUCCESS){
continue;
// check for dead childs every 10s and recreate if needed
if ($this->lastChildPing + 10 <= time()) {
foreach ($this->childs AS $id => $child) {
// send ping to child
if (!$child['pid']) {
CakeLog::info('Found missing child');
$this->Objects->getDatasource()->disconnect();
$this->childs[$id]['pid'] = $this->createChild($child['worker']);
$this->Objects->getDatasource()->reconnect();
}
}
$this->lastChildPing = time();
}

if(!@$this->worker->wait()){
if($this->worker->returnCode() == GEARMAN_NO_ACTIVE_FDS){
//Lost connection - lets wait a bit
sleep(1);
// do the work
if (@$this->worker->work() ||
$this->worker->returnCode() == GEARMAN_IO_WAIT ||
$this->worker->returnCode() == GEARMAN_NO_JOBS) {
if ($this->worker->returnCode() == GEARMAN_SUCCESS) continue;

// wait for new jobs
if (!@$this->worker->wait()){
if ($this->worker->returnCode() == GEARMAN_NO_ACTIVE_FDS){
//Lost connection - lets wait a bit
sleep(1);
}
}
}
}
Expand Down Expand Up @@ -3284,10 +3350,9 @@ public function waitForInstructions(){
}

CakeLog::info('I will continue my work');
$this->childWork();
$this->workerLoop();
}


pcntl_signal_dispatch();
//Check if the parent process still exists
if($this->parentPid != posix_getppid()){
Expand All @@ -3298,54 +3363,59 @@ public function waitForInstructions(){
}
}

public function bindChildSignalHandler(){
pcntl_signal(SIGTERM, [$this, 'childSignalHandler']);
pcntl_signal(SIGUSR1, [$this, 'childSignalHandler']);
pcntl_signal(SIGUSR2, [$this, 'childSignalHandler']);
/**
 * Register workerSignalHandler() for the signals a worker child reacts to
 * (SIGTERM, SIGUSR1 and SIGUSR2).
 *
 * @return void
 */
public function bindWorkerSignalHandler(){
    $handler = [$this, 'workerSignalHandler'];
    foreach ([SIGTERM, SIGUSR1, SIGUSR2] as $handledSignal) {
        pcntl_signal($handledSignal, $handler);
    }
}

public function childWork(){
while($this->work === true){
public function workerLoop(){
while ($this->work === true){
pcntl_signal_dispatch();
$this->worker->work();
if($this->worker->returnCode() == GEARMAN_SUCCESS){
continue;
}

if(!@$this->worker->wait()){
if($this->worker->returnCode() == GEARMAN_NO_ACTIVE_FDS){
sleep(1);
// check every second if there's something left to push
if ($this->useBulkQueries && $this->bulkLastCheck < time()) {
foreach ($this->BulkRepository as $name => $repo) {
$repo->pushIfRequired();
}
$this->bulkLastCheck = time();
}

//Check if the parent process still exists
if($this->parentPid != posix_getppid()){
CakeLog::error('My parent process is gone I guess I am orphaned and will exit now!');
exit(3);
// ping datasource every now and then to keep the pdo connection alive
// simulate mysql_ping()
if ($this->lastDatasourcePing + 120 < time()) {
try {
$this->Objects->getDatasource()->execute('SELECT 1');
} catch(PDOException $e) {
$this->Objects->getDatasource()->reconnect();
}
$this->lastDatasourcePing = time();
}

// check every second if there's something left to push
if($this->useBulkQueries && $this->bulkLastCheck < time()) {
foreach ($this->BulkRepository as $name => $repo) {
$repo->pushIfRequired();
// do the work
if (@$this->worker->work() ||
$this->worker->returnCode() == GEARMAN_IO_WAIT ||
$this->worker->returnCode() == GEARMAN_NO_JOBS) {
if ($this->worker->returnCode() == GEARMAN_SUCCESS) continue;

// wait for new jobs
if (!@$this->worker->wait()){
//Lost connection - lets wait a bit
if($this->worker->returnCode() == GEARMAN_NO_ACTIVE_FDS){
sleep(1);
}
$this->bulkLastCheck = time();
}

// ping datasource every now and then to keep the pdo connection alive
// simulate mysql_ping()
if($this->lastDatasourcePing + 120 < time()) {
try {
$this->Objects->getDatasource()->execute('SELECT 1');
} catch(PDOException $e) {
$this->Objects->getDatasource()->reconnect();
//Check if the parent process still exists
if ($this->parentPid != posix_getppid()){
CakeLog::error('My parent process is gone I guess I am orphaned and will exit now!');
exit(3);
}
$this->lastDatasourcePing = time();
}
}
}
}

public function childSignalHandler($signo){
public function workerSignalHandler($signo){
CakeLog::info('Recived signal: '.$signo);
switch($signo){
case SIGTERM:
Expand All @@ -3372,7 +3442,6 @@ public function childSignalHandler($signo){
$this->work = false;
break;
}
$this->bindChildSignalHandler();
}

/**
Expand All @@ -3396,6 +3465,41 @@ public function signalHandler($signo){

}

/**
 * Reap terminated child processes and mark their slot in $this->childs as
 * not running (pid = null).
 *
 * Registered as the SIGCHLD handler. All children that have terminated are
 * collected with non-blocking pcntl_waitpid() calls until none are left.
 *
 * @author Daniel Hoffend <dh@dotlan.net>
 * @param int $signo signal number that triggered the handler
 * @param null|int|array $pid PID of an already reaped child, if the caller has one.
 *                            When called by pcntl (PHP >= 7.1) this is the siginfo
 *                            array instead and is ignored.
 * @param null|int $status wait status belonging to $pid, if known
 * @return bool always true
 **/
public function childSignalHandler($signo, $pid=null, $status=null)
{
    // pcntl passes the siginfo array as second argument, not a reaped PID.
    // Anything that is not a positive integer PID is discarded and the
    // children are collected with waitpid instead, so no zombie is left behind.
    if (!is_int($pid) || $pid <= 0) {
        $status = null;
        $pid = pcntl_waitpid(-1, $status, WNOHANG);
    }

    while ($pid > 0) {
        CakeLog::info("Received signal $signo by child $pid (status $status)");

        // search for the child
        $id = null;
        foreach ($this->childs as $k => $child) {
            if ($child['pid'] == $pid) {
                $id = $k;
                break;
            }
        }

        // check how the child ended, log it and free its slot
        if ($id !== null) {
            if (!is_int($status)) {
                $reason = 'unknown';
            } elseif (pcntl_wifexited($status)) {
                // the exit code is only meaningful for a normal exit
                $reason = 'exit code '.pcntl_wexitstatus($status);
            } elseif (pcntl_wifsignaled($status)) {
                $reason = 'killed by signal '.pcntl_wtermsig($status);
            } else {
                $reason = 'raw status '.$status;
            }
            CakeLog::info('child '.$pid.' exited with status: '.$reason);
            $this->childs[$id]['pid'] = null;
        }

        // collect the next terminated child, if any
        $pid = pcntl_waitpid(-1, $status, WNOHANG);
    }
    return true;
}


/**
* This function sends a signal to every child process
*
Expand All @@ -3408,20 +3512,20 @@ public function sendSignal($signal){
$gmanClient = new GearmanClient();
$gmanClient->addServer(Configure::read('server'), Configure::read('port'));
if($signal !== SIGTERM){
foreach($this->childPids as $cpid){
CakeLog::info('Send signal to child pid: '.$cpid);
posix_kill($cpid, $signal);
foreach($this->childs as $child){
CakeLog::info('Send signal to child pid: '.$child['pid']);
posix_kill($child['pid'], $signal);
}
}

if($signal == SIGTERM){
foreach($this->childPids as $cpid){
CakeLog::info('Will kill pid: '.$cpid);
posix_kill($cpid, SIGTERM);
foreach($this->childs as $child){
CakeLog::info('Will kill pid: '.$child['pid']);
posix_kill($child['pid'], SIGTERM);
}
foreach($this->childPids as $cpid){
pcntl_waitpid($cpid, $status);
CakeLog::info('Child ['.$cpid.'] killed successfully');
foreach($this->childs as $child){
pcntl_waitpid($child['pid'], $status);
CakeLog::info('Child ['.$child['pid'].'] killed successfully');
}
}
}
Expand Down
Loading