Skip to content

Commit

Permalink
Merge pull request #346 from umccr/refactor/filemanager-ingest
Browse files Browse the repository at this point in the history
refactor: filemanager ingest
  • Loading branch information
mmalenic authored Jun 12, 2024
2 parents a1c134f + e91a4f2 commit b50ee96
Show file tree
Hide file tree
Showing 24 changed files with 2,775 additions and 1,224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ create type storage_class as enum (
'StandardIa'
);

-- The type of event for this s3_object record.
create type event_type as enum (
'Created',
'Deleted',
-- If using 'paired' mode, 'Other' will be the event type.
'Other'
);

-- An object contained in AWS S3; maps as a one-to-one relationship with the object table.
create table s3_object (
-- The s3 object id.
Expand All @@ -23,18 +31,18 @@ create table s3_object (
public_id uuid not null,

-- General fields
-- The kind of event of this s3_object.
event_type event_type not null,
-- The bucket of the object.
bucket text not null,
-- The key of the object.
key text not null,
-- The version id of the object. A 'null' string is used to indicate no version id. This matches logic in AWS which
-- also returns 'null' strings. See https://docs.aws.amazon.com/AmazonS3/latest/userguide/versioning-workflows.html
version_id text not null default 'null',
-- When this object was created. A null value here means that a deleted event has occurred before a created event.
created_date timestamptz default null,
-- When this object was deleted, a null value means that the object has not yet been deleted.
deleted_date timestamptz default null,
-- provenance - history of all objects and how they move?
-- When this object was created/deleted. For created objects, this is the date the object was created. For deleted
-- objects, this is the date the object was deleted.
date timestamptz default null,
-- The size of the object.
size bigint default null,
-- A base64 encoded SHA256 checksum of the object.
Expand All @@ -47,19 +55,26 @@ create table s3_object (
e_tag text default null,
-- The S3 storage class of the object.
storage_class storage_class default null,
-- A sequencer value for when the object was created. Used to synchronise out of order and duplicate events.
created_sequencer text default null,
-- A sequencer value for when the object was deleted. Used to synchronise out of order and duplicate events.
deleted_sequencer text default null,
-- Record the number of times this event has been considered out of order, useful for debugging.
number_reordered bigint not null default 0,
-- A sequencer value for the event on this object. Used to synchronise out of order and duplicate events.
sequencer text default null,
-- A delete marker is a special object that is created when a versioned object is deleted.
is_delete_marker boolean not null default false,
-- Record the number of duplicate events received for this object, useful for debugging.
number_duplicate_events bigint not null default 0,

-- Attributes on a single s3_object.
attributes jsonb default null,

-- The sequencers should be unique with the bucket, key, and its version, otherwise this is a duplicate event.
constraint created_sequencer_unique unique (bucket, key, version_id, created_sequencer),
constraint deleted_sequencer_unique unique (bucket, key, version_id, deleted_sequencer)
-- The sequencers should be unique with the bucket, key, version_id and event_type,
-- otherwise this is a duplicate event.
constraint sequencer_unique unique (bucket, key, version_id, event_type, sequencer),

-- The following columns are only used for `paired` mode ingestion.
-- When this object was deleted, a null value means that the object has not yet been deleted.
deleted_date timestamptz default null,
-- A sequencer value for when the object was deleted. Used to synchronise out of order and duplicate events.
deleted_sequencer text default null,
-- Record the number of times this event has been considered out of order, useful for debugging.
number_reordered bigint not null default 0,
constraint deleted_sequencer_unique unique (bucket, key, version_id, event_type, deleted_sequencer)
);

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- Select the existing and most recent s3_objects (those that haven't yet been deleted)
-- based on the input bucket, key and version_id values into FlatS3EventMessage structs.
-- This query effectively fetches the current objects in S3 for a set of buckets, keys
-- Select the most recent s3_objects based on the input bucket, key and version_id values
-- into FlatS3EventMessage structs. This query effectively fetches the current state of the
-- database objects (for both created and deleted records) in S3 for a set of buckets, keys
-- and version_ids.
-- TODO, potentially replace this with sea-orm codegen and query builder.

Expand All @@ -20,33 +20,36 @@ with input as (
)
-- Select objects into a FlatS3EventMessage struct.
select
object_id,
s3_object_id,
public_id,
s3_object.bucket,
s3_object.key,
created_date as event_time,
date as event_time,
last_modified_date,
e_tag,
sha256,
storage_class as "storage_class?: StorageClass",
s3_object.version_id as "version_id!",
created_sequencer as sequencer,
number_reordered,
sequencer,
number_duplicate_events,
size,
is_delete_marker,
'Created' as "event_type!: EventType"
event_type as "event_type!: EventType",
0 as "number_reordered!"
from input
-- Grab the most recent object in each input group.
cross join lateral (
-- Cross join the input with one s3_object based on the most recent created_date.
-- Cross join the input with one s3_object based on the most recent event.
select
*
from s3_object
where
input.bucket = s3_object.bucket and
input.key = s3_object.key and
input.version_id = s3_object.version_id
order by s3_object.created_date desc
input.version_id = s3_object.version_id and
s3_object.event_type = 'Created'
order by s3_object.sequencer desc
limit 1
)
as s3_object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ insert into s3_object (
public_id,
bucket,
key,
created_date,
date,
size,
sha256,
last_modified_date,
e_tag,
storage_class,
version_id,
created_sequencer,
is_delete_marker
sequencer,
is_delete_marker,
event_type
)
values (
unnest($1::uuid[]),
Expand All @@ -29,7 +30,8 @@ values (
unnest($11::storage_class[]),
unnest($12::text[]),
unnest($13::text[]),
unnest($14::boolean[])
) on conflict on constraint created_sequencer_unique do update
unnest($14::boolean[]),
unnest($15::event_type[])
) on conflict on constraint sequencer_unique do update
set number_duplicate_events = s3_object.number_duplicate_events + 1
returning object_id, number_duplicate_events;
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ insert into s3_object (
version_id,
deleted_sequencer,
number_reordered,
is_delete_marker
is_delete_marker,
event_type
)
values (
unnest($1::uuid[]),
Expand All @@ -31,7 +32,8 @@ values (
unnest($12::text[]),
unnest($13::text[]),
unnest($14::bigint[]),
unnest($15::boolean[])
unnest($15::boolean[]),
unnest($16::event_type[])
) on conflict on constraint deleted_sequencer_unique do update
set number_duplicate_events = s3_object.number_duplicate_events + 1
returning object_id, number_duplicate_events;
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-- Bulk insert of s3 objects.
-- Each bind parameter is an array of equal length; unnest() zips them so that
-- element i of every array forms one inserted row. The column list must stay
-- positionally aligned with the $1..$15 parameters below.
insert into s3_object (
object_id,
s3_object_id,
public_id,
bucket,
key,
-- Event timestamp: when the object was created for 'Created' events, or
-- deleted for 'Deleted' events.
date,
size,
sha256,
last_modified_date,
e_tag,
storage_class,
-- A 'null' string indicates no version id, matching AWS versioning behaviour.
version_id,
-- Used to synchronise out of order and duplicate events.
sequencer,
is_delete_marker,
event_type
)
values (
unnest($1::uuid[]),
unnest($2::uuid[]),
unnest($3::uuid[]),
unnest($4::text[]),
unnest($5::text[]),
unnest($6::timestamptz[]),
unnest($7::bigint[]),
unnest($8::text[]),
unnest($9::timestamptz[]),
unnest($10::text[]),
unnest($11::storage_class[]),
unnest($12::text[]),
unnest($13::text[]),
unnest($14::boolean[]),
unnest($15::event_type[])
-- A conflict on sequencer_unique (bucket, key, version_id, event_type, sequencer)
-- means this event is a duplicate: count it rather than inserting a new row.
) on conflict on constraint sequencer_unique do update
set number_duplicate_events = s3_object.number_duplicate_events + 1
-- Return ids and duplicate counts so the caller can identify duplicate events.
returning object_id, number_duplicate_events;
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ with input as (
$9::storage_class[],
$10::text[],
$11::text[],
$12::boolean[]
$12::boolean[],
$13::event_type[]
) as input (
s3_object_id,
bucket,
Expand All @@ -30,7 +31,8 @@ with input as (
storage_class,
version_id,
created_sequencer,
is_delete_marker
is_delete_marker,
event_type
)
),
-- Then, select the objects that need to be updated.
Expand All @@ -48,7 +50,8 @@ current_objects as (
input.last_modified_date as input_last_modified_date,
input.e_tag as input_e_tag,
input.storage_class as input_storage_class,
input.is_delete_marker as input_is_delete_marker
input.is_delete_marker as input_is_delete_marker,
input.event_type as input_event_type
from s3_object
-- Grab the relevant values to update with.
join input on
Expand All @@ -69,15 +72,15 @@ objects_to_update as (
current_objects.deleted_sequencer > current_objects.input_created_sequencer and
(
-- Updating a null sequencer doesn't cause the event to be reprocessed.
current_objects.created_sequencer is null or
current_objects.sequencer is null or
-- If a sequencer already exists this event should be reprocessed because this
-- sequencer could belong to another object.
current_objects.created_sequencer < current_objects.input_created_sequencer
current_objects.sequencer < current_objects.input_created_sequencer
) and
-- And there should not be any objects with a created sequencer that is the same as the input created
-- sequencer because this is a duplicate event that would cause a constraint error in the update.
current_objects.input_created_sequencer not in (
select created_sequencer from current_objects where created_sequencer is not null
select sequencer from current_objects where sequencer is not null
)
-- Only one event entry should be updated, and that entry must be the one with the
-- deleted sequencer that is minimum, i.e. closest to the created sequencer which
Expand All @@ -88,17 +91,18 @@ objects_to_update as (
-- Finally, update the required objects.
update as (
update s3_object
set created_sequencer = objects_to_update.input_created_sequencer,
created_date = objects_to_update.input_created_date,
set sequencer = objects_to_update.input_created_sequencer,
date = objects_to_update.input_created_date,
size = objects_to_update.input_size,
sha256 = objects_to_update.input_sha256,
last_modified_date = objects_to_update.input_last_modified_date,
e_tag = objects_to_update.input_e_tag,
is_delete_marker = objects_to_update.input_is_delete_marker,
storage_class = objects_to_update.input_storage_class,
event_type = objects_to_update.input_event_type,
number_reordered = s3_object.number_reordered +
-- Note the asymmetry between this and the reorder for deleted query.
case when objects_to_update.deleted_sequencer is not null or objects_to_update.created_sequencer is not null then
case when objects_to_update.deleted_sequencer is not null or objects_to_update.sequencer is not null then
1
else
0
Expand All @@ -110,15 +114,17 @@ update as (
select
-- Note, this is the passed through value from the input in order to identify this event later.
input_id as "s3_object_id!",
object_id,
public_id,
bucket,
key,
created_date as event_time,
date as event_time,
last_modified_date,
e_tag,
sha256,
storage_class as "storage_class?: StorageClass",
version_id as "version_id!",
created_sequencer as sequencer,
sequencer,
number_reordered,
number_duplicate_events,
size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ with input as (
$3::text[],
$4::timestamptz[],
$5::text[],
$6::text[]
$6::text[],
$7::event_type[]
) as input (
s3_object_id,
bucket,
key,
deleted_date,
version_id,
deleted_sequencer
deleted_sequencer,
event_type
)
),
-- Then, select the objects that match the bucket, key and version_id
Expand All @@ -30,7 +32,8 @@ current_objects as (
input.key as input_key,
input.version_id as input_version_id,
input.deleted_sequencer as input_deleted_sequencer,
input.deleted_date as input_deleted_date
input.deleted_date as input_deleted_date,
input.event_type as input_event_type
from s3_object
-- Grab the relevant values to update with.
join input on
Expand All @@ -48,7 +51,7 @@ objects_to_update as (
where
-- Check the sequencer condition. We only update if there is a deleted
-- sequencer that is closer to the created sequencer.
current_objects.created_sequencer < current_objects.input_deleted_sequencer and
current_objects.sequencer < current_objects.input_deleted_sequencer and
(
-- Updating a null sequencer doesn't cause the event to be reprocessed.
current_objects.deleted_sequencer is null or
Expand All @@ -64,14 +67,15 @@ objects_to_update as (
-- Only one event entry should be updated, and that entry must be the one with the
-- created sequencer that is maximum, i.e. closest to the deleted sequencer which
-- is going to be inserted.
order by current_objects.created_sequencer desc
order by current_objects.sequencer desc
limit 1
),
-- Finally, update the required objects.
update as (
update s3_object
set deleted_sequencer = objects_to_update.input_deleted_sequencer,
deleted_date = objects_to_update.input_deleted_date,
event_type = objects_to_update.input_event_type,
number_reordered = s3_object.number_reordered +
case when objects_to_update.deleted_sequencer is null then 0 else 1 end
from objects_to_update
Expand All @@ -81,6 +85,8 @@ update as (
select
-- Note, this is the passed through value from the input in order to identify this event later.
input_id as "s3_object_id!",
object_id,
public_id,
bucket,
key,
deleted_date as event_time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ At the database level, duplicate events are removed using a unique constraint on

Within the application code, out of order events are removed within the [events] module by comparing sequencer values.

By default, filemanager makes no assumption about the ordering of events, and ingests events in the order that they arrive.
The sequencer value is stored on the `s3_object` table, which allows ordering entries when querying.

#### Paired ingest mode
Ordering events on ingestion can be turned on by setting `PAIRED_INGEST_MODE=true` as an environment variable. This has
a performance cost on ingestion, but it removes the requirement to order events when querying the database.

At the database level, events are processed as they arrive. For each object in the database, the sequencer value is
recorded. When an event is inserted, it is first checked to see if it belongs to an already existing object, i.e. whether
there are any objects with sequencer values that are greater (for created events) or lower (for deleted events) than the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ Generally code that belongs to a particular cloud service should be put in its o
[database]: src/database
[events]: src/events
[handlers]: src/handlers
[env]: src/env.rs
[error]: src/error.rs
[mockall]: https://github.com/asomers/mockall
[s3-events]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html
Loading

0 comments on commit b50ee96

Please sign in to comment.