diff --git a/src/Storage/DatabaseStorage.php b/src/Storage/DatabaseStorage.php index 46133f40..966e141f 100644 --- a/src/Storage/DatabaseStorage.php +++ b/src/Storage/DatabaseStorage.php @@ -45,76 +45,77 @@ public function store(Collection $items): void return; } - $this->connection()->transaction(function () use ($items) { - [$entries, $values] = $items->partition(fn (Entry|Value $entry) => $entry instanceof Entry); - - $entries - ->reject->isOnlyBuckets() - ->chunk($this->config->get('pulse.storage.database.chunk')) - ->each(fn ($chunk) => $this->connection() - ->table('pulse_entries') - ->insert( - $this->requiresManualKeyHash() - ? $chunk->map(fn ($entry) => [ - ...($attributes = $entry->attributes()), - 'key_hash' => md5($attributes['key']), - ])->all() - : $chunk->map->attributes()->all() - ) - ); + [$entries, $values] = $items->partition(fn (Entry|Value $entry) => $entry instanceof Entry); - [$counts, $minimums, $maximums, $sums, $averages] = array_values($entries - ->reduce(function ($carry, $entry) { - foreach ($entry->aggregations() as $aggregation) { - $carry[$aggregation][] = $entry; - } + $entryChunks = $entries + ->reject->isOnlyBuckets() + ->when( + $this->requiresManualKeyHash(), + fn ($entries) => $entries->map(fn ($entry) => [ + ...($attributes = $entry->attributes()), + 'key_hash' => md5($attributes['key']), + ]), + fn ($entries) => $entries->map->attributes() + ) + ->chunk($this->config->get('pulse.storage.database.chunk')); - return $carry; - }, ['count' => [], 'min' => [], 'max' => [], 'sum' => [], 'avg' => []]) - ); + [$counts, $minimums, $maximums, $sums, $averages] = array_values($entries + ->reduce(function ($carry, $entry) { + foreach ($entry->aggregations() as $aggregation) { + $carry[$aggregation][] = $entry; + } - $this - ->preaggregateCounts(collect($counts)) // @phpstan-ignore argument.templateType argument.templateType - ->chunk($this->config->get('pulse.storage.database.chunk')) - ->each(fn ($chunk) => $this->upsertCount($chunk->all())); - - $this - ->preaggregateMinimums(collect($minimums)) // @phpstan-ignore argument.templateType argument.templateType - ->chunk($this->config->get('pulse.storage.database.chunk')) - ->each(fn ($chunk) => $this->upsertMin($chunk->all())); - - $this - ->preaggregateMaximums(collect($maximums)) // @phpstan-ignore argument.templateType argument.templateType - ->chunk($this->config->get('pulse.storage.database.chunk')) - ->each(fn ($chunk) => $this->upsertMax($chunk->all())); - - $this - ->preaggregateSums(collect($sums)) // @phpstan-ignore argument.templateType argument.templateType - ->chunk($this->config->get('pulse.storage.database.chunk')) - ->each(fn ($chunk) => $this->upsertSum($chunk->all())); - - $this - ->preaggregateAverages(collect($averages)) // @phpstan-ignore argument.templateType argument.templateType - ->chunk($this->config->get('pulse.storage.database.chunk')) - ->each(fn ($chunk) => $this->upsertAvg($chunk->all())); - - $this - ->collapseValues($values) - ->chunk($this->config->get('pulse.storage.database.chunk')) - ->each(fn ($chunk) => $this->connection() - ->table('pulse_values') - ->upsert( - $this->requiresManualKeyHash() - ? $chunk->map(fn ($entry) => [ - ...($attributes = $entry->attributes()), - 'key_hash' => md5($attributes['key']), - ])->all() - : $chunk->map->attributes()->all(), // @phpstan-ignore method.notFound - ['type', 'key_hash'], - ['timestamp', 'value'] - ) - ); - }); + return $carry; + }, ['count' => [], 'min' => [], 'max' => [], 'sum' => [], 'avg' => []]) + ); + + $countChunks = $this->preaggregateCounts(collect($counts)) // @phpstan-ignore argument.templateType argument.templateType + ->chunk($this->config->get('pulse.storage.database.chunk')); + + $minimumChunks = $this->preaggregateMinimums(collect($minimums)) // @phpstan-ignore argument.templateType argument.templateType + ->chunk($this->config->get('pulse.storage.database.chunk')); + + $maximumChunks = $this->preaggregateMaximums(collect($maximums)) // @phpstan-ignore argument.templateType argument.templateType + ->chunk($this->config->get('pulse.storage.database.chunk')); + + $sumChunks = $this->preaggregateSums(collect($sums)) // @phpstan-ignore argument.templateType argument.templateType + ->chunk($this->config->get('pulse.storage.database.chunk')); + + $averageChunks = $this->preaggregateAverages(collect($averages)) // @phpstan-ignore argument.templateType argument.templateType + ->chunk($this->config->get('pulse.storage.database.chunk')); + + $valueChunks = $this + ->collapseValues($values) + ->when( + $this->requiresManualKeyHash(), + fn ($values) => $values->map(fn ($value) => [ + ...($attributes = $value->attributes()), + 'key_hash' => md5($attributes['key']), + ]), + fn ($values) => $values->map->attributes() // @phpstan-ignore method.notFound + ) + ->chunk($this->config->get('pulse.storage.database.chunk')); + + $this->connection()->transaction(function () use ($entryChunks, $countChunks, $minimumChunks, $maximumChunks, $sumChunks, $averageChunks, $valueChunks) { + $entryChunks->each(fn ($chunk) => $this->connection() + ->table('pulse_entries') + ->insert($chunk->all())); + + $countChunks->each(fn ($chunk) => $this->upsertCount($chunk->all())); + + $minimumChunks->each(fn ($chunk) => $this->upsertMin($chunk->all())); + + $maximumChunks->each(fn ($chunk) => $this->upsertMax($chunk->all())); + + $sumChunks->each(fn ($chunk) => $this->upsertSum($chunk->all())); + + $averageChunks->each(fn ($chunk) => $this->upsertAvg($chunk->all())); + + $valueChunks->each(fn ($chunk) => $this->connection() + ->table('pulse_values') + ->upsert($chunk->all(), ['type', 'key_hash'], ['timestamp', 'value']) + ); + }, 3); } /**