Skip to content

Commit

Permalink
Merge pull request #44 from dhoffend/watchdog
Browse files Browse the repository at this point in the history
Watchdog workers/childs, fix systemd-restart on crash by exception
  • Loading branch information
nook24 authored Sep 6, 2018
2 parents e234866 + 0f82db2 commit c88df89
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 71 deletions.
1 change: 1 addition & 0 deletions cakephp/app/Config/core.php
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@
CakeLog::config('default', array(
'engine' => 'Syslog',
'prefix' => 'statusengine',
'flag' => LOG_ODELAY | LOG_PID,
'types' => array('emergency', 'alert', 'critical', 'error', 'warning', 'notice', 'info'),
));

Expand Down
242 changes: 173 additions & 69 deletions cakephp/app/Console/Command/StatusengineLegacyShell.php
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ class StatusengineLegacyShell extends AppShell{
**/
protected $ObjectsRepository = [];

/**
* Array for childs to watch for
*
* @var array
**/
protected $childs = [];

/**
* CakePHP's option parser
*
Expand Down Expand Up @@ -177,12 +184,13 @@ public function main(){
$NebTypes = new NebTypes();
$NebTypes->defineNebTypesAsGlobals();

$this->childPids = [];
$this->childs = [];
$this->_constants();
$this->clearQ = false;

//the Gearman worker
$this->worker = null;
$this->work = false;
$this->createParentHosts = [];
$this->createParentServices = [];

Expand Down Expand Up @@ -219,6 +227,7 @@ public function main(){
$this->bulkLastCheck = time();

$this->lastDatasourcePing = time();
$this->lastChildPing = time();

$emptyMethods = ['truncate', 'delete'];
$emptyMethod = strtolower(Configure::read('empty_method'));
Expand Down Expand Up @@ -453,6 +462,7 @@ public function dumpObjects($job){
case START_OBJECT_DUMP:
if($this->workerMode === true){
$this->sendSignal(SIGUSR2);
$this->work = false;
}

$this->dumpObjects = true;
Expand Down Expand Up @@ -570,6 +580,7 @@ public function dumpObjects($job){

if($this->workerMode === true){
$this->sendSignal(SIGUSR1);
$this->work = true;
}

if($this->workerMode === false && $this->processPerfdata === true){
Expand Down Expand Up @@ -2262,7 +2273,7 @@ public function processDowntimes($job){

if($payload->type == NEBTYPE_DOWNTIME_ADD || $payload->type == NEBTYPE_DOWNTIME_LOAD){
//Add a new downtime

//Only update downtime history table on ADD, not on LOAD events, to avoid was_started=0 and wrong timestamps
if($payload->type == NEBTYPE_DOWNTIME_ADD){
$downtime = $this->Downtimehistory->find('first', [
Expand Down Expand Up @@ -2822,6 +2833,7 @@ public function gearmanConnect(){
* witch is bad because if GearmanWorker::work() stuck, PHP can not execute the signal handler
*/
$this->worker->addOptions(GEARMAN_WORKER_NON_BLOCKING);
$this->worker->setTimeout(1000);

$this->worker->addServer(Configure::read('server'), Configure::read('port'));

Expand Down Expand Up @@ -3140,6 +3152,44 @@ public function parseCheckCommand($checkCommand){
return $return;
}

/**
* create a forks a client with given queues
*
* @author Daniel Hoffend <dh@dotlan.net>
* @return void
**/
protected function createChild(array $worker)
{

CakeLog::info('Forking a new worker child');
// don't duplicate the worker object on fork
$oldworker = false;
if ($this->worker) {
$oldworker = $this->worker;
$this->worker = false;
}
// disconnect dbo if connected
$pid = pcntl_fork();
if(!$pid){
//We are the child
$this->childs = [];
$this->ObjectsRepository = [];
$this->BulkRepository = [];
CakeLog::info('Hey, my queues are: '.implode(',', array_keys($worker['queues'])));
$this->bindQueues = true;
$this->queues = $worker['queues'];
$this->worker = false;
$this->bindWorkerSignalHandler();
$this->Objects->getDatasource()->reconnect();
$this->waitForInstructions();
exit;
} else {
// restore
$this->worker = $oldworker;
return $pid;
}
}

/**
* This function will fork the child processes (worker) if you run with -w
*
Expand All @@ -3150,26 +3200,25 @@ public function parseCheckCommand($checkCommand){
*/
public function forkWorker(){
$workers = Configure::read('workers');

declare(ticks = 1);

// prepare childs
foreach($workers as $worker){
declare(ticks = 1);
CakeLog::info('Forking a new worker child');
$pid = pcntl_fork();
if(!$pid){
//We are the child
CakeLog::info('Hey, my queues are: '.implode(',', array_keys($worker['queues'])));
$this->bindQueues = true;
$this->queues = $worker['queues'];
$this->work = false;
$this->bindChildSignalHandler();
$this->waitForInstructions();
}else{
//we are the parrent
$this->childPids[] = $pid;
$this->childs[] = [
'worker' => $worker,
'pid' => null
];
};

}
// create all workers
foreach($this->childs as $id => $child){
$this->childs[$id]['pid'] = $this->createChild($child['worker']);
}

pcntl_signal(SIGTERM, [$this, 'signalHandler']);
pcntl_signal(SIGINT, [$this, 'signalHandler']);
pcntl_signal(SIGCHLD, [$this, 'childSignalHandler']);

//Every worker is created now, so lets rock!

Expand All @@ -3187,20 +3236,37 @@ public function forkWorker(){
$this->gearmanConnect();
CakeLog::info('Lets rock!');
$this->sendSignal(SIGUSR1);
$this->worker->setTimeout(1000);
$this->work = true;

while(true){
pcntl_signal_dispatch();
$this->worker->work();

if($this->worker->returnCode() == GEARMAN_SUCCESS){
continue;
// check for dead childs every 10s and recreate if needed
if ($this->lastChildPing + 10 <= time()) {
foreach ($this->childs AS $id => $child) {
// send ping to child
if (!$child['pid']) {
CakeLog::info('Found missing child');
$this->Objects->getDatasource()->disconnect();
$this->childs[$id]['pid'] = $this->createChild($child['worker']);
$this->Objects->getDatasource()->reconnect();
}
}
$this->lastChildPing = time();
}

if(!@$this->worker->wait()){
if($this->worker->returnCode() == GEARMAN_NO_ACTIVE_FDS){
//Lost connection - lets wait a bit
sleep(1);
// do the work
if (@$this->worker->work() ||
$this->worker->returnCode() == GEARMAN_IO_WAIT ||
$this->worker->returnCode() == GEARMAN_NO_JOBS) {
if ($this->worker->returnCode() == GEARMAN_SUCCESS) continue;

// wait for new jobs
if (!@$this->worker->wait()){
if ($this->worker->returnCode() == GEARMAN_NO_ACTIVE_FDS){
//Lost connection - lets wait a bit
sleep(1);
}
}
}
}
Expand Down Expand Up @@ -3284,10 +3350,9 @@ public function waitForInstructions(){
}

CakeLog::info('I will continue my work');
$this->childWork();
$this->workerLoop();
}


pcntl_signal_dispatch();
//Check if the parent process still exists
if($this->parentPid != posix_getppid()){
Expand All @@ -3298,54 +3363,59 @@ public function waitForInstructions(){
}
}

public function bindChildSignalHandler(){
pcntl_signal(SIGTERM, [$this, 'childSignalHandler']);
pcntl_signal(SIGUSR1, [$this, 'childSignalHandler']);
pcntl_signal(SIGUSR2, [$this, 'childSignalHandler']);
public function bindWorkerSignalHandler(){
pcntl_signal(SIGTERM, [$this, 'workerSignalHandler']);
pcntl_signal(SIGUSR1, [$this, 'workerSignalHandler']);
pcntl_signal(SIGUSR2, [$this, 'workerSignalHandler']);
}

public function childWork(){
while($this->work === true){
public function workerLoop(){
while ($this->work === true){
pcntl_signal_dispatch();
$this->worker->work();
if($this->worker->returnCode() == GEARMAN_SUCCESS){
continue;
}

if(!@$this->worker->wait()){
if($this->worker->returnCode() == GEARMAN_NO_ACTIVE_FDS){
sleep(1);
// check every second if there's something left to push
if ($this->useBulkQueries && $this->bulkLastCheck < time()) {
foreach ($this->BulkRepository as $name => $repo) {
$repo->pushIfRequired();
}
$this->bulkLastCheck = time();
}

//Check if the parent process still exists
if($this->parentPid != posix_getppid()){
CakeLog::error('My parent process is gone I guess I am orphaned and will exit now!');
exit(3);
// ping datasource every now and then to keep the pdo connection alive
// simulate mysql_ping()
if ($this->lastDatasourcePing + 120 < time()) {
try {
$this->Objects->getDatasource()->execute('SELECT 1');
} catch(PDOException $e) {
$this->Objects->getDatasource()->reconnect();
}
$this->lastDatasourcePing = time();
}

// check every second if there's something left to push
if($this->useBulkQueries && $this->bulkLastCheck < time()) {
foreach ($this->BulkRepository as $name => $repo) {
$repo->pushIfRequired();
// do the work
if (@$this->worker->work() ||
$this->worker->returnCode() == GEARMAN_IO_WAIT ||
$this->worker->returnCode() == GEARMAN_NO_JOBS) {
if ($this->worker->returnCode() == GEARMAN_SUCCESS) continue;

// wait for new jobs
if (!@$this->worker->wait()){
//Lost connection - lets wait a bit
if($this->worker->returnCode() == GEARMAN_NO_ACTIVE_FDS){
sleep(1);
}
$this->bulkLastCheck = time();
}

// ping datasource every now and then to keep the pdo connection alive
// simulate mysql_ping()
if($this->lastDatasourcePing + 120 < time()) {
try {
$this->Objects->getDatasource()->execute('SELECT 1');
} catch(PDOException $e) {
$this->Objects->getDatasource()->reconnect();
//Check if the parent process still exists
if ($this->parentPid != posix_getppid()){
CakeLog::error('My parent process is gone I guess I am orphaned and will exit now!');
exit(3);
}
$this->lastDatasourcePing = time();
}
}
}
}

public function childSignalHandler($signo){
public function workerSignalHandler($signo){
CakeLog::info('Recived signal: '.$signo);
switch($signo){
case SIGTERM:
Expand All @@ -3372,7 +3442,6 @@ public function childSignalHandler($signo){
$this->work = false;
break;
}
$this->bindChildSignalHandler();
}

/**
Expand All @@ -3396,6 +3465,41 @@ public function signalHandler($signo){

}

/**
* check the process signals returned by child processes (like killed, exit, etc)
*
* @author Daniel Hoffend <dh@dotlan.net>
* @param int $signo
* @param null|int $pid
* @param $status
* @return bool
**/
public function childSignalHandler($signo, $pid=null, $status=null)
{
CakeLog::info("Received signal $signo by child $pid (status $status)");
if (!$pid)
$pid = pcntl_waitpid(-1, $status, WNOHANG);

while($pid > 0) {
// search for the child
$id = null;
foreach ($this->childs AS $k => $child) {
if ($child['pid'] == $pid)
$id = $k;
}
// check exit status end log it
if ($id !== null && isset($this->childs[$id])) {
$exitCode = pcntl_wexitstatus($status);
CakeLog::info('child '.$pid.' exited with status: '.$status);
$this->childs[$id]['pid'] = null;
}
// free pid
$pid = pcntl_waitpid(-1, $status, WNOHANG);
}
return true;
}


/**
* This function sends a singal to every child process
*
Expand All @@ -3408,20 +3512,20 @@ public function sendSignal($signal){
$gmanClient = new GearmanClient();
$gmanClient->addServer(Configure::read('server'), Configure::read('port'));
if($signal !== SIGTERM){
foreach($this->childPids as $cpid){
CakeLog::info('Send signal to child pid: '.$cpid);
posix_kill($cpid, $signal);
foreach($this->childs as $child){
CakeLog::info('Send signal to child pid: '.$child['pid']);
posix_kill($child['pid'], $signal);
}
}

if($signal == SIGTERM){
foreach($this->childPids as $cpid){
CakeLog::info('Will kill pid: '.$cpid);
posix_kill($cpid, SIGTERM);
foreach($this->childs as $child){
CakeLog::info('Will kill pid: '.$child['pid']);
posix_kill($child['pid'], SIGTERM);
}
foreach($this->childPids as $cpid){
pcntl_waitpid($cpid, $status);
CakeLog::info('Child ['.$cpid.'] killed successfully');
foreach($this->childs as $child){
pcntl_waitpid($child['pid'], $status);
CakeLog::info('Child ['.$child['pid'].'] killed successfully');
}
}
}
Expand Down
Loading

0 comments on commit c88df89

Please sign in to comment.