-
Notifications
You must be signed in to change notification settings - Fork 42
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Process entity events in exclusively and in parallel (#1654)
* feat: aggregate events before executing them This adds yet another component to mediator called the EEA, which stands for the Event Execution Aggregator. As the name suggests, it aggregates events so we should process less of them in the end. We'll process an initial triggering event, with successive events being batched into one until the executor is ready. This was done by using a pessimistic soft-locking mechanism based on time which locks execution for a single entity. If an entity execution is locked, we add the event to a queue which effectively aggregates them into one. These events will be triggered once a flush happens. * go executor go! This limits the executor event handling code to just parse the message. It subsequently spawns a goroutine to do the actual evaluation. The intent is to not block the message handler when we're receiving events so we'd have faster executions. Add mechanism for executor to indicate when it's done. Add graceful termination to executor This makes sure that the executor cancels any profile runs based on a per execution timeout or the server itself shutting down. * logging * Skip flushing if entity no longer exists
- Loading branch information
Showing
26 changed files
with
1,666 additions
and
78 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
-- Copyright 2023 Stacklok, Inc | ||
-- | ||
-- Licensed under the Apache License, Version 2.0 (the "License"); | ||
-- you may not use this file except in compliance with the License. | ||
-- You may obtain a copy of the License at | ||
-- | ||
-- http://www.apache.org/licenses/LICENSE-2.0 | ||
-- | ||
-- Unless required by applicable law or agreed to in writing, software | ||
-- distributed under the License is distributed on an "AS IS" BASIS, | ||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
-- See the License for the specific language governing permissions and | ||
-- limitations under the License. | ||
|
||
DROP INDEX IF EXISTS flush_cache_idx; | ||
DROP INDEX IF EXISTS entity_execution_lock_idx; | ||
|
||
DROP TABLE IF EXISTS flush_cache; | ||
DROP TABLE IF EXISTS entity_execution_lock; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
-- Copyright 2023 Stacklok, Inc | ||
-- | ||
-- Licensed under the Apache License, Version 2.0 (the "License"); | ||
-- you may not use this file except in compliance with the License. | ||
-- You may obtain a copy of the License at | ||
-- | ||
-- http://www.apache.org/licenses/LICENSE-2.0 | ||
-- | ||
-- Unless required by applicable law or agreed to in writing, software | ||
-- distributed under the License is distributed on an "AS IS" BASIS, | ||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
-- See the License for the specific language governing permissions and | ||
-- limitations under the License. | ||
|
||
|
||
--- This implements two tables: | ||
--- * The entity execution lock table, which is used to prevent multiple | ||
--- instances of the same entity from running at the same time. | ||
--- * The flush cache table, which is used to cache entities to be executed | ||
--- once the lock is released. | ||
|
||
CREATE TABLE IF NOT EXISTS entity_execution_lock ( | ||
id UUID NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY, | ||
entity entities NOT NULL, | ||
locked_by UUID NOT NULL, | ||
last_lock_time TIMESTAMP NOT NULL, | ||
repository_id UUID NOT NULL REFERENCES repositories(id) ON DELETE CASCADE, | ||
artifact_id UUID REFERENCES artifacts(id) ON DELETE CASCADE, | ||
pull_request_id UUID REFERENCES pull_requests(id) ON DELETE CASCADE | ||
); | ||
|
||
CREATE UNIQUE INDEX IF NOT EXISTS entity_execution_lock_idx ON entity_execution_lock( | ||
entity, | ||
repository_id, | ||
COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID), | ||
COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID)); | ||
|
||
CREATE TABLE IF NOT EXISTS flush_cache ( | ||
id UUID NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY, | ||
entity entities NOT NULL, | ||
repository_id UUID NOT NULL REFERENCES repositories(id) ON DELETE CASCADE, | ||
artifact_id UUID REFERENCES artifacts(id) ON DELETE CASCADE, | ||
pull_request_id UUID REFERENCES pull_requests(id) ON DELETE CASCADE, | ||
queued_at TIMESTAMP NOT NULL DEFAULT NOW() | ||
); | ||
|
||
CREATE UNIQUE INDEX IF NOT EXISTS flush_cache_idx ON flush_cache( | ||
entity, | ||
repository_id, | ||
COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID), | ||
COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID)); |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
-- LockIfThresholdNotExceeded is used to lock an entity for execution. It will | ||
-- attempt to insert or update the entity_execution_lock table only if the | ||
-- last_lock_time is older than the threshold. If the lock is successful, it | ||
-- will return the lock record. If the lock is unsuccessful, it will return | ||
-- NULL. | ||
|
||
-- name: LockIfThresholdNotExceeded :one | ||
INSERT INTO entity_execution_lock( | ||
entity, | ||
locked_by, | ||
last_lock_time, | ||
repository_id, | ||
artifact_id, | ||
pull_request_id | ||
) VALUES( | ||
sqlc.arg(entity)::entities, | ||
gen_random_uuid(), | ||
NOW(), | ||
sqlc.arg(repository_id)::UUID, | ||
sqlc.narg(artifact_id)::UUID, | ||
sqlc.narg(pull_request_id)::UUID | ||
) ON CONFLICT(entity, repository_id, COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID), COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID)) | ||
DO UPDATE SET | ||
locked_by = gen_random_uuid(), | ||
last_lock_time = NOW() | ||
WHERE entity_execution_lock.last_lock_time < (NOW() - (@interval::TEXT || ' seconds')::interval) | ||
RETURNING *; | ||
|
||
-- ReleaseLock is used to release a lock on an entity. It will delete the | ||
-- entity_execution_lock record if the lock is held by the given locked_by | ||
-- value. | ||
|
||
-- name: ReleaseLock :exec | ||
DELETE FROM entity_execution_lock | ||
WHERE entity = sqlc.arg(entity)::entities AND repository_id = sqlc.arg(repository_id)::UUID AND | ||
COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(artifact_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND | ||
COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(pull_request_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND | ||
locked_by = sqlc.arg(locked_by)::UUID; | ||
|
||
-- name: UpdateLease :exec | ||
UPDATE entity_execution_lock SET last_lock_time = NOW() | ||
WHERE entity = $1 AND repository_id = $2 AND | ||
COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(artifact_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND | ||
COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(pull_request_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND | ||
locked_by = sqlc.arg(locked_by)::UUID; | ||
|
||
-- name: EnqueueFlush :one | ||
INSERT INTO flush_cache( | ||
entity, | ||
repository_id, | ||
artifact_id, | ||
pull_request_id | ||
) VALUES( | ||
sqlc.arg(entity)::entities, | ||
sqlc.arg(repository_id)::UUID, | ||
sqlc.narg(artifact_id)::UUID, | ||
sqlc.narg(pull_request_id)::UUID | ||
) ON CONFLICT(entity, repository_id, COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID), COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID)) | ||
DO NOTHING | ||
RETURNING *; | ||
|
||
-- name: FlushCache :one | ||
DELETE FROM flush_cache | ||
WHERE entity = $1 AND repository_id = $2 AND | ||
COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(artifact_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND | ||
COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(pull_request_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) | ||
RETURNING *; | ||
|
||
-- name: ListFlushCache :many | ||
SELECT * FROM flush_cache; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.