diff --git a/src/Commands/CheckCommand.php b/src/Commands/CheckCommand.php index 1eb5702e..e2d821d3 100644 --- a/src/Commands/CheckCommand.php +++ b/src/Commands/CheckCommand.php @@ -3,11 +3,12 @@ namespace Laravel\Pulse\Commands; use Carbon\CarbonImmutable; -use Carbon\CarbonInterval; use Illuminate\Console\Command; use Illuminate\Contracts\Cache\LockProvider; use Illuminate\Events\Dispatcher; +use Illuminate\Support\Env; use Illuminate\Support\Sleep; +use Illuminate\Support\Str; use Laravel\Pulse\Events\IsolatedBeat; use Laravel\Pulse\Events\SharedBeat; use Laravel\Pulse\Pulse; @@ -25,7 +26,7 @@ class CheckCommand extends Command * * @var string */ - public $signature = 'pulse:check'; + public $signature = 'pulse:check {--once : Take a single snapshot}'; /** * The command's description. @@ -42,38 +43,36 @@ public function handle( CacheStoreResolver $cache, Dispatcher $event, ): int { - $lastRestart = $cache->store()->get('laravel:pulse:restart'); + $isVapor = (bool) Env::get('VAPOR_SSM_PATH'); - $interval = CarbonInterval::seconds(5); + $instance = $isVapor ? 'vapor' : Str::random(); - $lastSnapshotAt = CarbonImmutable::now()->floorSeconds((int) $interval->totalSeconds); + $lastRestart = $cache->store()->get('laravel:pulse:restart'); $lock = ($store = $cache->store()->getStore()) instanceof LockProvider - ? $store->lock('laravel:pulse:check', (int) $interval->totalSeconds) + ? $store->lock('laravel:pulse:check', 1) : null; while (true) { - $now = CarbonImmutable::now(); - - if ($now->subSeconds((int) $interval->totalSeconds)->lessThan($lastSnapshotAt)) { - Sleep::for(500)->milliseconds(); - - continue; - } - if ($lastRestart !== $cache->store()->get('laravel:pulse:restart')) { return self::SUCCESS; } - $lastSnapshotAt = $now->floorSeconds((int) $interval->totalSeconds); + $now = CarbonImmutable::now(); if ($lock?->get()) { - $event->dispatch(new IsolatedBeat($lastSnapshotAt, $interval)); + $event->dispatch(new IsolatedBeat($now)); } - $event->dispatch(new SharedBeat($lastSnapshotAt, $interval)); + $event->dispatch(new SharedBeat($now, $instance)); $pulse->ingest(); + + if ($isVapor || $this->option('once')) { + return self::SUCCESS; + } + + Sleep::until($now->addSecond()); } } } diff --git a/src/Events/IsolatedBeat.php b/src/Events/IsolatedBeat.php index 32d88b2e..d2cca9e4 100644 --- a/src/Events/IsolatedBeat.php +++ b/src/Events/IsolatedBeat.php @@ -3,7 +3,6 @@ namespace Laravel\Pulse\Events; use Carbon\CarbonImmutable; -use Carbon\CarbonInterval; class IsolatedBeat { @@ -12,7 +11,6 @@ class IsolatedBeat */ public function __construct( public CarbonImmutable $time, - public CarbonInterval $interval, ) { // } diff --git a/src/Events/SharedBeat.php b/src/Events/SharedBeat.php index ffec6fcf..322197a1 100644 --- a/src/Events/SharedBeat.php +++ b/src/Events/SharedBeat.php @@ -3,7 +3,6 @@ namespace Laravel\Pulse\Events; use Carbon\CarbonImmutable; -use Carbon\CarbonInterval; class SharedBeat { @@ -12,7 +11,7 @@ class SharedBeat */ public function __construct( public CarbonImmutable $time, - public CarbonInterval $interval, + public string $instance, ) { // } diff --git a/src/Recorders/Concerns/Throttling.php b/src/Recorders/Concerns/Throttling.php new file mode 100644 index 00000000..d4ff8eb8 --- /dev/null +++ b/src/Recorders/Concerns/Throttling.php @@ -0,0 +1,42 @@ +instance.":{$key}"; + } + + $cache = App::make(CacheStoreResolver::class); + + $key = 'laravel:pulse:throttle:'.$key; + + $lastRunAt = $cache->store()->get($key); + + if ($lastRunAt !== null && CarbonImmutable::createFromTimestamp($lastRunAt)->addSeconds($this->secondsUntil($interval))->isFuture()) { + return; + } + + $callback($event); + + $cache->store()->put($key, $event->time->getTimestamp(), $interval); + } +} diff --git a/src/Recorders/Servers.php b/src/Recorders/Servers.php index e8e82d10..b001a8b2 100644 --- a/src/Recorders/Servers.php +++ b/src/Recorders/Servers.php @@ -13,6 +13,8 @@ */ class Servers { + use Concerns\Throttling; + /** * The events to listen for. * @@ -35,51 +37,49 @@ public function __construct( */ public function record(SharedBeat $event): void { - if ($event->time->second % 15 !== 0) { - return; - } - - $server = $this->config->get('pulse.recorders.'.self::class.'.server_name'); - $slug = Str::slug($server); + $this->throttle(15, $event, function ($event) { + $server = $this->config->get('pulse.recorders.'.self::class.'.server_name'); + $slug = Str::slug($server); - $memoryTotal = match (PHP_OS_FAMILY) { - 'Darwin' => intval(`sysctl hw.memsize | grep -Eo '[0-9]+'` / 1024 / 1024), - 'Linux' => intval(`cat /proc/meminfo | grep MemTotal | grep -E -o '[0-9]+'` / 1024), - 'Windows' => intval(((int) trim(`wmic ComputerSystem get TotalPhysicalMemory | more +1`)) / 1024 / 1024), - 'BSD' => intval(`sysctl hw.physmem | grep -Eo '[0-9]+'` / 1024 / 1024), - default => throw new RuntimeException('The pulse:check command does not currently support '.PHP_OS_FAMILY), - }; + $memoryTotal = match (PHP_OS_FAMILY) { + 'Darwin' => intval(`sysctl hw.memsize | grep -Eo '[0-9]+'` / 1024 / 1024), + 'Linux' => intval(`cat /proc/meminfo | grep MemTotal | grep -E -o '[0-9]+'` / 1024), + 'Windows' => intval(((int) trim(`wmic ComputerSystem get TotalPhysicalMemory | more +1`)) / 1024 / 1024), + 'BSD' => intval(`sysctl hw.physmem | grep -Eo '[0-9]+'` / 1024 / 1024), + default => throw new RuntimeException('The pulse:check command does not currently support '.PHP_OS_FAMILY), + }; - $memoryUsed = match (PHP_OS_FAMILY) { - 'Darwin' => $memoryTotal - intval(intval(`vm_stat | grep 'Pages free' | grep -Eo '[0-9]+'`) * intval(`pagesize`) / 1024 / 1024), // MB - 'Linux' => $memoryTotal - intval(`cat /proc/meminfo | grep MemAvailable | grep -E -o '[0-9]+'` / 1024), // MB - 'Windows' => $memoryTotal - intval(((int) trim(`wmic OS get FreePhysicalMemory | more +1`)) / 1024), // MB - 'BSD' => intval(intval(`( sysctl vm.stats.vm.v_cache_count | grep -Eo '[0-9]+' ; sysctl vm.stats.vm.v_inactive_count | grep -Eo '[0-9]+' ; sysctl vm.stats.vm.v_active_count | grep -Eo '[0-9]+' ) | awk '{s+=$1} END {print s}'`) * intval(`pagesize`) / 1024 / 1024), // MB - default => throw new RuntimeException('The pulse:check command does not currently support '.PHP_OS_FAMILY), - }; + $memoryUsed = match (PHP_OS_FAMILY) { + 'Darwin' => $memoryTotal - intval(intval(`vm_stat | grep 'Pages free' | grep -Eo '[0-9]+'`) * intval(`pagesize`) / 1024 / 1024), // MB + 'Linux' => $memoryTotal - intval(`cat /proc/meminfo | grep MemAvailable | grep -E -o '[0-9]+'` / 1024), // MB + 'Windows' => $memoryTotal - intval(((int) trim(`wmic OS get FreePhysicalMemory | more +1`)) / 1024), // MB + 'BSD' => intval(intval(`( sysctl vm.stats.vm.v_cache_count | grep -Eo '[0-9]+' ; sysctl vm.stats.vm.v_inactive_count | grep -Eo '[0-9]+' ; sysctl vm.stats.vm.v_active_count | grep -Eo '[0-9]+' ) | awk '{s+=$1} END {print s}'`) * intval(`pagesize`) / 1024 / 1024), // MB + default => throw new RuntimeException('The pulse:check command does not currently support '.PHP_OS_FAMILY), + }; - $cpu = match (PHP_OS_FAMILY) { - 'Darwin' => (int) `top -l 1 | grep -E "^CPU" | tail -1 | awk '{ print $3 + $5 }'`, - 'Linux' => (int) `top -bn1 | grep -E '^(%Cpu|CPU)' | awk '{ print $2 + $4 }'`, - 'Windows' => (int) trim(`wmic cpu get loadpercentage | more +1`), - 'BSD' => (int) `top -b -d 2| grep 'CPU: ' | tail -1 | awk '{print$10}' | grep -Eo '[0-9]+\.[0-9]+' | awk '{ print 100 - $1 }'`, - default => throw new RuntimeException('The pulse:check command does not currently support '.PHP_OS_FAMILY), - }; + $cpu = match (PHP_OS_FAMILY) { + 'Darwin' => (int) `top -l 1 | grep -E "^CPU" | tail -1 | awk '{ print $3 + $5 }'`, + 'Linux' => (int) `top -bn1 | grep -E '^(%Cpu|CPU)' | awk '{ print $2 + $4 }'`, + 'Windows' => (int) trim(`wmic cpu get loadpercentage | more +1`), + 'BSD' => (int) `top -b -d 2| grep 'CPU: ' | tail -1 | awk '{print$10}' | grep -Eo '[0-9]+\.[0-9]+' | awk '{ print 100 - $1 }'`, + default => throw new RuntimeException('The pulse:check command does not currently support '.PHP_OS_FAMILY), + }; - $this->pulse->record('cpu', $slug, $cpu, $event->time)->avg()->onlyBuckets(); - $this->pulse->record('memory', $slug, $memoryUsed, $event->time)->avg()->onlyBuckets(); - $this->pulse->set('system', $slug, json_encode([ - 'name' => $server, - 'cpu' => $cpu, - 'memory_used' => $memoryUsed, - 'memory_total' => $memoryTotal, - 'storage' => collect($this->config->get('pulse.recorders.'.self::class.'.directories')) // @phpstan-ignore argument.templateType argument.templateType - ->map(fn (string $directory) => [ - 'directory' => $directory, - 'total' => $total = intval(round(disk_total_space($directory) / 1024 / 1024)), // MB - 'used' => intval(round($total - (disk_free_space($directory) / 1024 / 1024))), // MB - ]) - ->all(), - ], flags: JSON_THROW_ON_ERROR), $event->time); + $this->pulse->record('cpu', $slug, $cpu, $event->time)->avg()->onlyBuckets(); + $this->pulse->record('memory', $slug, $memoryUsed, $event->time)->avg()->onlyBuckets(); + $this->pulse->set('system', $slug, json_encode([ + 'name' => $server, + 'cpu' => $cpu, + 'memory_used' => $memoryUsed, + 'memory_total' => $memoryTotal, + 'storage' => collect($this->config->get('pulse.recorders.'.self::class.'.directories')) // @phpstan-ignore argument.templateType argument.templateType + ->map(fn (string $directory) => [ + 'directory' => $directory, + 'total' => $total = intval(round(disk_total_space($directory) / 1024 / 1024)), // MB + 'used' => intval(round($total - (disk_free_space($directory) / 1024 / 1024))), // MB + ]) + ->all(), + ], flags: JSON_THROW_ON_ERROR), $event->time); + }); } } diff --git a/tests/Feature/Commands/CheckCommandTest.php b/tests/Feature/Commands/CheckCommandTest.php new file mode 100644 index 00000000..1f10d909 --- /dev/null +++ b/tests/Feature/Commands/CheckCommandTest.php @@ -0,0 +1,202 @@ +startOfDay()); +}); + +it('loops when not on vapor', function () { + $called = 0; + Sleep::fake(); + Sleep::whenFakingSleep(function () use (&$called) { + if ($called > 5) { + throw new RuntimeException('bail'); + } + }); + Event::listen(function (SharedBeat $beat) use (&$called) { + $called++; + }); + + try { + Artisan::call('pulse:check'); + } catch (RuntimeException $e) { + if ($e->getMessage() !== 'bail') { + throw $e; + } + } + + expect($called)->toBe(6); + Sleep::assertSequence([ + Sleep::for(1)->second(), + Sleep::for(1)->second(), + Sleep::for(1)->second(), + Sleep::for(1)->second(), + Sleep::for(1)->second(), + Sleep::for(1)->second(), + ]); +}); + +it('can run the check command once', function () { + Sleep::fake(); + $called = 0; + Event::listen(function (SharedBeat $beat) use (&$called) { + $called++; + }); + + Artisan::call('pulse:check', ['--once' => true]); + + expect($called)->toBe(1); + Sleep::assertNeverSlept(); +}); + +it('exists instead of looping when on vapor', function () { + Env::getRepository()->set('VAPOR_SSM_PATH', 1); + Sleep::fake(); + $called = 0; + Event::listen(function (SharedBeat $beat) use (&$called) { + $called++; + }); + + Artisan::call('pulse:check'); + + expect($called)->toBe(1); + Sleep::assertNeverSlept(); + + Env::getRepository()->clear('VAPOR_SSM_PATH'); +}); + +it('can throttle shared beat listeners', function () { + $iteration = 1; + Event::listen(SharedBeat::class, $listener = new ThrottledBeatListener(CarbonInterval::seconds(3))); + Sleep::fake(); + Sleep::whenFakingSleep(function ($duration) use (&$iteration, $listener) { + Carbon::setTestNow(now()->add($duration)); + + expect($listener->runs)->toBe(match ($iteration) { + 1, 2, 3 => 1, + 4, 5, 6 => 2, + 7, 8, 9 => 3, + 10, 11, 12 => 4, + }); + + if ($iteration === 12) { + throw new RuntimeException('bail'); + } + + $iteration++; + }); + + try { + Artisan::call('pulse:check'); + } catch (RuntimeException $e) { + if ($e->getMessage() !== 'bail') { + throw $e; + } + } +}); + +it('can throttle isolated beat listeners', function () { + $iteration = 1; + Event::listen(IsolatedBeat::class, $listener = new ThrottledBeatListener(interval: 3)); + Sleep::fake(); + Sleep::whenFakingSleep(function ($duration) use (&$iteration, $listener) { + Carbon::setTestNow(now()->add($duration)); + + expect($listener->runs)->toBe(match ($iteration) { + 1, 2, 3 => 1, + 4, 5, 6 => 2, + 7, 8, 9 => 3, + 10, 11, 12 => 4, + }); + + if ($iteration === 12) { + throw new RuntimeException('bail'); + } + + $iteration++; + }); + + try { + Artisan::call('pulse:check'); + } catch (RuntimeException $e) { + if ($e->getMessage() !== 'bail') { + throw $e; + } + } + + expect($listener->runs)->toBe(4); +}); + +it('does not share throttle locks across check command instances for shared beats', function () { + Event::listen(SharedBeat::class, $listener = new ThrottledBeatListener(3)); + + Artisan::call('pulse:check', ['--once' => true]); + expect($listener->runs)->toBe(1); + Artisan::call('pulse:check', ['--once' => true]); + expect($listener->runs)->toBe(2); +}); + +it('does share throttle locks across check command instances for shared beats when on vapor', function () { + Env::getRepository()->set('VAPOR_SSM_PATH', 1); + Event::listen(SharedBeat::class, $listener = new ThrottledBeatListener(3)); + + Artisan::call('pulse:check', ['--once' => true]); + expect($listener->runs)->toBe(1); + Artisan::call('pulse:check', ['--once' => true]); + expect($listener->runs)->toBe(1); + + Env::getRepository()->clear('VAPOR_SSM_PATH'); +}); + +it('does share throttle locks across instances for IsolatedBeats', function () { + Event::listen(IsolatedBeat::class, $listener = new ThrottledBeatListener(3)); + + Artisan::call('pulse:check', ['--once' => true]); + expect($listener->runs)->toBe(1); + Artisan::call('pulse:check', ['--once' => true]); + expect($listener->runs)->toBe(1); +}); + +it('only fires isolated beats once per second across check command instances', function () { + $called = 0; + Event::listen(IsolatedBeat::class, function () use (&$called) { + $called++; + }); + + Artisan::call('pulse:check', ['--once' => true]); + expect($called)->toBe(1); + Carbon::setTestNow(now()->endOfSecond()); + Artisan::call('pulse:check', ['--once' => true]); + expect($called)->toBe(1); + Carbon::setTestNow(now()->addMillisecond(1)); + Artisan::call('pulse:check', ['--once' => true]); + expect($called)->toBe(2); +}); + +class ThrottledBeatListener +{ + use Throttling; + + public $runs = 0; + + public function __construct(protected $interval) + { + // + } + + public function __invoke($event) + { + $this->throttle($this->interval, $event, function () { + $this->runs++; + }); + } +} diff --git a/tests/Feature/Recorders/ServersTest.php b/tests/Feature/Recorders/ServersTest.php index 90406c65..8490df98 100644 --- a/tests/Feature/Recorders/ServersTest.php +++ b/tests/Feature/Recorders/ServersTest.php @@ -1,5 +1,6 @@ startOfMinute()); - event(app(SharedBeat::class)); + event(new SharedBeat(CarbonImmutable::now(), 'instance-id')); Pulse::ingest(); expect(Pulse::ignore(fn () => DB::table('pulse_entries')->count()))->toBe(0); diff --git a/tests/Pest.php b/tests/Pest.php index a1940f9b..38144a35 100644 --- a/tests/Pest.php +++ b/tests/Pest.php @@ -37,6 +37,7 @@ }) ->afterEach(function () { Str::createUuidsNormally(); + Sleep::fake(false); if (Pulse::wantsIngesting()) { throw new RuntimeException('There are pending entries.');