Skip to content

Commit

Permalink
Handle deadlocks (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
jessarcher authored Jan 18, 2024
1 parent d0913db commit 22aca19
Showing 1 changed file with 68 additions and 67 deletions.
135 changes: 68 additions & 67 deletions src/Storage/DatabaseStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,76 +45,77 @@ public function store(Collection $items): void
return;
}

$this->connection()->transaction(function () use ($items) {
[$entries, $values] = $items->partition(fn (Entry|Value $entry) => $entry instanceof Entry);

$entries
->reject->isOnlyBuckets()
->chunk($this->config->get('pulse.storage.database.chunk'))
->each(fn ($chunk) => $this->connection()
->table('pulse_entries')
->insert(
$this->requiresManualKeyHash()
? $chunk->map(fn ($entry) => [
...($attributes = $entry->attributes()),
'key_hash' => md5($attributes['key']),
])->all()
: $chunk->map->attributes()->all()
)
);
[$entries, $values] = $items->partition(fn (Entry|Value $entry) => $entry instanceof Entry);

[$counts, $minimums, $maximums, $sums, $averages] = array_values($entries
->reduce(function ($carry, $entry) {
foreach ($entry->aggregations() as $aggregation) {
$carry[$aggregation][] = $entry;
}
$entryChunks = $entries
->reject->isOnlyBuckets()
->when(
$this->requiresManualKeyHash(),
fn ($entries) => $entries->map(fn ($entry) => [
...($attributes = $entry->attributes()),
'key_hash' => md5($attributes['key']),
]),
fn ($entries) => $entries->map->attributes()
)
->chunk($this->config->get('pulse.storage.database.chunk'));

return $carry;
}, ['count' => [], 'min' => [], 'max' => [], 'sum' => [], 'avg' => []])
);
[$counts, $minimums, $maximums, $sums, $averages] = array_values($entries
->reduce(function ($carry, $entry) {
foreach ($entry->aggregations() as $aggregation) {
$carry[$aggregation][] = $entry;
}

$this
->preaggregateCounts(collect($counts)) // @phpstan-ignore argument.templateType argument.templateType
->chunk($this->config->get('pulse.storage.database.chunk'))
->each(fn ($chunk) => $this->upsertCount($chunk->all()));

$this
->preaggregateMinimums(collect($minimums)) // @phpstan-ignore argument.templateType argument.templateType
->chunk($this->config->get('pulse.storage.database.chunk'))
->each(fn ($chunk) => $this->upsertMin($chunk->all()));

$this
->preaggregateMaximums(collect($maximums)) // @phpstan-ignore argument.templateType argument.templateType
->chunk($this->config->get('pulse.storage.database.chunk'))
->each(fn ($chunk) => $this->upsertMax($chunk->all()));

$this
->preaggregateSums(collect($sums)) // @phpstan-ignore argument.templateType argument.templateType
->chunk($this->config->get('pulse.storage.database.chunk'))
->each(fn ($chunk) => $this->upsertSum($chunk->all()));

$this
->preaggregateAverages(collect($averages)) // @phpstan-ignore argument.templateType argument.templateType
->chunk($this->config->get('pulse.storage.database.chunk'))
->each(fn ($chunk) => $this->upsertAvg($chunk->all()));

$this
->collapseValues($values)
->chunk($this->config->get('pulse.storage.database.chunk'))
->each(fn ($chunk) => $this->connection()
->table('pulse_values')
->upsert(
$this->requiresManualKeyHash()
? $chunk->map(fn ($entry) => [
...($attributes = $entry->attributes()),
'key_hash' => md5($attributes['key']),
])->all()
: $chunk->map->attributes()->all(), // @phpstan-ignore method.notFound
['type', 'key_hash'],
['timestamp', 'value']
)
);
});
return $carry;
}, ['count' => [], 'min' => [], 'max' => [], 'sum' => [], 'avg' => []])
);

$countChunks = $this->preaggregateCounts(collect($counts)) // @phpstan-ignore argument.templateType argument.templateType
->chunk($this->config->get('pulse.storage.database.chunk'));

$minimumChunks = $this->preaggregateMinimums(collect($minimums)) // @phpstan-ignore argument.templateType argument.templateType
->chunk($this->config->get('pulse.storage.database.chunk'));

$maximumChunks = $this->preaggregateMaximums(collect($maximums)) // @phpstan-ignore argument.templateType argument.templateType
->chunk($this->config->get('pulse.storage.database.chunk'));

$sumChunks = $this->preaggregateSums(collect($sums)) // @phpstan-ignore argument.templateType argument.templateType
->chunk($this->config->get('pulse.storage.database.chunk'));

$averageChunks = $this->preaggregateAverages(collect($averages)) // @phpstan-ignore argument.templateType argument.templateType
->chunk($this->config->get('pulse.storage.database.chunk'));

$valueChunks = $this
->collapseValues($values)
->when(
$this->requiresManualKeyHash(),
fn ($values) => $values->map(fn ($value) => [
...($attributes = $value->attributes()),
'key_hash' => md5($attributes['key']),
]),
fn ($values) => $values->map->attributes() // @phpstan-ignore method.notFound
)
->chunk($this->config->get('pulse.storage.database.chunk'));

$this->connection()->transaction(function () use ($entryChunks, $countChunks, $minimumChunks, $maximumChunks, $sumChunks, $averageChunks, $valueChunks) {
$entryChunks->each(fn ($chunk) => $this->connection()
->table('pulse_entries')
->insert($chunk->all()));

$countChunks->each(fn ($chunk) => $this->upsertCount($chunk->all()));

$minimumChunks->each(fn ($chunk) => $this->upsertMin($chunk->all()));

$maximumChunks->each(fn ($chunk) => $this->upsertMax($chunk->all()));

$sumChunks->each(fn ($chunk) => $this->upsertSum($chunk->all()));

$averageChunks->each(fn ($chunk) => $this->upsertAvg($chunk->all()));

$valueChunks->each(fn ($chunk) => $this->connection()
->table('pulse_values')
->upsert($chunk->all(), ['type', 'key_hash'], ['timestamp', 'value'])
);
}, 3);
}

/**
Expand Down

0 comments on commit 22aca19

Please sign in to comment.