Merge pull request #415 from umccr/feat/filemanager-current-state
feat: filemanager current state
mmalenic authored Jul 16, 2024
2 parents 3e9439e + 3a39f80 commit d759538
Showing 4 changed files with 353 additions and 47 deletions.
@@ -1,13 +1,6 @@
//! Query builder involving list operations on the database.
//!

use sea_orm::prelude::Expr;
use sea_orm::sea_query::extension::postgres::PgExpr;
use sea_orm::{
ColumnTrait, Condition, EntityTrait, FromQueryResult, PaginatorTrait, QueryFilter, QueryOrder,
QuerySelect, Select,
};

use crate::database::entities::object::Entity as ObjectEntity;
use crate::database::entities::s3_object::Entity as S3ObjectEntity;
use crate::database::entities::s3_object::{Column as S3ObjectColumn, Column as ObjectColumn};
@@ -17,6 +10,15 @@ use crate::error::{Error, Result};
use crate::routes::filtering::{ObjectsFilterAll, S3ObjectsFilterAll};
use crate::routes::list::{ListCount, ListResponse};
use crate::routes::pagination::Pagination;
use sea_orm::prelude::Expr;
use sea_orm::sea_query::extension::postgres::PgExpr;
use sea_orm::sea_query::{Alias, Asterisk, PostgresQueryBuilder, Query};
use sea_orm::Order::{Asc, Desc};
use sea_orm::{
ColumnTrait, Condition, EntityTrait, FromQueryResult, PaginatorTrait, QueryFilter, QueryOrder,
QuerySelect, QueryTrait, Select,
};
use tracing::trace;

/// A query builder for list operations.
#[derive(Debug, Clone)]
@@ -70,6 +72,15 @@ impl<'a> ListQueryBuilder<'a, S3ObjectEntity> {
}

/// Filter records by all fields in the filter variable.
///
/// This creates a query which is similar to:
///
/// ```sql
/// select * from s3_object
/// where event_type = filter.event_type and
/// bucket = filter.bucket and
/// ...;
/// ```
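///
/// A rough usage sketch, mirroring the tests below (`client` is assumed to be an existing `Client`):
///
/// ```ignore
/// let builder = ListQueryBuilder::<S3ObjectEntity>::new(&client)
///     .filter_all(S3ObjectsFilterAll {
///         size: Some(14),
///         ..Default::default()
///     });
/// let result = builder.all().await?;
/// ```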
pub fn filter_all(mut self, filter: S3ObjectsFilterAll) -> Self {
let condition = Condition::all()
.add_option(filter.event_type.map(|v| S3ObjectColumn::EventType.eq(v)))
@@ -102,6 +113,55 @@ impl<'a> ListQueryBuilder<'a, S3ObjectEntity> {
);

self.select = self.select.filter(condition);

self.trace_query("filter_all");

self
}

/// Update this query to find objects that represent the current state of S3 objects. That is,
/// this gets all non-deleted objects. This means that only `Created` events will be returned,
/// and these will be the most up to date at this point, without any `Deleted` events coming
/// after them.
///
/// This creates a query which is roughly equivalent to the following:
///
/// ```sql
/// select * from (
/// select distinct on (bucket, key, version_id) * from s3_object
/// order by bucket, key, version_id, sequencer desc
/// ) as s3_object
/// where event_type = 'Created';
/// ```
///
/// This finds the most recent object within each `(bucket, key, version_id)` grouping,
/// keeping only `Created` events.
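///
/// A rough usage sketch, mirroring the tests below (`client` is assumed to be an existing `Client`):
///
/// ```ignore
/// let current = ListQueryBuilder::<S3ObjectEntity>::new(&client)
///     .current_state()
///     .all()
///     .await?;
/// ```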
pub fn current_state(mut self) -> Self {
let subquery = Query::select()
.column(Asterisk)
.distinct_on([
S3ObjectColumn::Bucket,
S3ObjectColumn::Key,
S3ObjectColumn::VersionId,
])
.from(S3ObjectEntity)
.order_by_columns([
(S3ObjectColumn::Bucket, Asc),
(S3ObjectColumn::Key, Asc),
(S3ObjectColumn::VersionId, Asc),
(S3ObjectColumn::Sequencer, Desc),
])
.take();

// Clear the current `from` clause (which should be `from s3_object`) and
// update it to the `distinct_on` subquery.
QuerySelect::query(&mut self.select)
.from_clear()
.from_subquery(subquery, Alias::new("s3_object"))
.and_where(S3ObjectColumn::EventType.eq("Created"));

self.trace_query("current_state");

self
}
}
@@ -127,12 +187,20 @@ where
}

/// Paginate the query for the given page and page_size.
///
/// This produces a query similar to:
///
/// ```sql
/// select * from s3_object
/// limit page_size
/// offset page * page_size;
/// ```
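///
/// A rough usage sketch (assuming an existing `Client` named `client`):
///
/// ```ignore
/// // Fetch the second page, with one record per page.
/// let builder = ListQueryBuilder::<S3ObjectEntity>::new(&client)
///     .paginate(1, 1)
///     .await?;
/// let result = builder.all().await?;
/// ```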
pub async fn paginate(mut self, page: u64, page_size: u64) -> Result<Self> {
let offset = page.checked_mul(page_size).ok_or_else(|| OverflowError)?;
// Always add one to the limit to see if there are additional pages that can be fetched.
let limit = page_size.checked_add(1).ok_or_else(|| OverflowError)?;
self.select = self.select.offset(offset).limit(page_size);

self.trace_query("paginate");

self.select = self.select.offset(offset).limit(limit);
Ok(self)
}

@@ -143,7 +211,13 @@
) -> Result<ListResponse<M>> {
let page = pagination.page();
let page_size = pagination.page_size();
let query = self.paginate(page, page_size).await?;
let mut query = self.paginate(page, page_size).await?;

// Always add one to the limit to see if there are additional pages that can be fetched.
QuerySelect::query(&mut query.select).reset_limit();
query.select = query
.select
.limit(page_size.checked_add(1).ok_or_else(|| OverflowError)?);

let mut results = query.all().await?;

@@ -165,6 +239,13 @@ where
pub async fn to_list_count(self) -> Result<ListCount> {
Ok(ListCount::new(self.count().await?))
}

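/// Trace the current query as a raw SQL string, prefixed with the given message.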
fn trace_query(&self, message: &str) {
trace!(
"{message}: {}",
self.select.as_query().to_string(PostgresQueryBuilder)
);
}
}

#[cfg(test)]
@@ -176,7 +257,9 @@ mod tests {
use crate::database::entities::object::Model as ObjectModel;
use crate::database::entities::s3_object::Model as S3ObjectModel;
use crate::database::entities::sea_orm_active_enums::EventType;
use crate::queries::tests::{initialize_database, initialize_database_reorder};
use crate::queries::tests::{
initialize_database, initialize_database_ratios_reorder, initialize_database_reorder,
};

use super::*;

@@ -191,6 +274,89 @@
assert_eq!(result, entries);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn test_current_s3_objects_10(pool: PgPool) {
let client = Client::from_pool(pool);

let entries = initialize_database_ratios_reorder(&client, 10, 4, 3)
.await
.s3_objects;
let builder = ListQueryBuilder::<S3ObjectEntity>::new(&client).current_state();
let result = builder.all().await.unwrap();

assert_eq!(result, vec![entries[2].clone(), entries[8].clone()]);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn test_current_s3_objects_30(pool: PgPool) {
let client = Client::from_pool(pool);

let entries = initialize_database_ratios_reorder(&client, 30, 8, 5)
.await
.s3_objects;
let builder = ListQueryBuilder::<S3ObjectEntity>::new(&client).current_state();
let result = builder.all().await.unwrap();

assert_eq!(
result,
vec![entries[6].clone(), entries[17].clone(), entries[24].clone()]
);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn test_current_s3_objects_with_paginate_10(pool: PgPool) {
let client = Client::from_pool(pool);

let entries = initialize_database_ratios_reorder(&client, 10, 4, 3)
.await
.s3_objects;

let builder = ListQueryBuilder::<S3ObjectEntity>::new(&client)
.current_state()
.paginate(0, 1)
.await
.unwrap();
let result = builder.all().await.unwrap();
assert_eq!(result, vec![entries[2].clone()]);

// Order of paginate call shouldn't matter.
let builder = ListQueryBuilder::<S3ObjectEntity>::new(&client)
.paginate(1, 1)
.await
.unwrap()
.current_state();
let result = builder.all().await.unwrap();
assert_eq!(result, vec![entries[8].clone()]);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn test_current_s3_objects_with_filter(pool: PgPool) {
let client = Client::from_pool(pool);

let entries = initialize_database_ratios_reorder(&client, 30, 8, 5)
.await
.s3_objects;

let builder = ListQueryBuilder::<S3ObjectEntity>::new(&client)
.current_state()
.filter_all(S3ObjectsFilterAll {
size: Some(14),
..Default::default()
});
let result = builder.all().await.unwrap();
assert_eq!(result, vec![entries[6].clone()]);

// Order of filter call shouldn't matter.
let builder = ListQueryBuilder::<S3ObjectEntity>::new(&client)
.filter_all(S3ObjectsFilterAll {
size: Some(4),
..Default::default()
})
.current_state();
let result = builder.all().await.unwrap();
assert_eq!(result, vec![entries[24].clone()]);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn test_list_objects_filter_attributes(pool: PgPool) {
let client = Client::from_pool(pool);
@@ -43,29 +43,45 @@ pub(crate) mod tests {
/// Initialize the database state for testing and shuffle entries to simulate
/// out of order events.
pub(crate) async fn initialize_database_reorder(client: &Client, n: usize) -> Entries {
let mut data = initialize_database_with_shuffle(client, n, true).await;

// Return the correct ordering for test purposes
data.sort_by(|(_, a), (_, b)| a.sequencer.cmp(&b.sequencer));

data.into()
initialize_database_ratios_reorder(client, n, 2, 1).await
}

/// Initialize database state for testing.
pub(crate) async fn initialize_database(client: &Client, n: usize) -> Entries {
initialize_database_with_shuffle(client, n, false)
initialize_database_with_shuffle(client, n, false, 2, 1)
.await
.into()
}

/// Initialize database state for testing with custom bucket and key ratios for `Created`/`Deleted`
/// events, and with out of order events.
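/// Each entry at `index` is assigned bucket `index / bucket_ratio` and key `index / key_ratio`,
/// so larger ratios group more consecutive events under the same bucket or key.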
pub(crate) async fn initialize_database_ratios_reorder(
client: &Client,
n: usize,
bucket_ratio: usize,
key_ratio: usize,
) -> Entries {
let mut data =
initialize_database_with_shuffle(client, n, true, bucket_ratio, key_ratio).await;

// Return the correct ordering for test purposes
data.sort_by(|(_, a), (_, b)| a.sequencer.cmp(&b.sequencer));

data.into()
}

async fn initialize_database_with_shuffle(
client: &Client,
n: usize,
shuffle: bool,
bucket_ratio: usize,
key_ratio: usize,
) -> Vec<(Object, S3Object)> {
let mut output = vec![];

let mut entries: Vec<_> = (0..n).map(generate_entry).collect();
let mut entries: Vec<_> = (0..n)
.map(|index| generate_entry(index, bucket_ratio, key_ratio))
.collect();

if shuffle {
entries.shuffle(&mut thread_rng());
@@ -92,7 +108,11 @@
output
}

pub(crate) fn generate_entry(index: usize) -> (ActiveObject, ActiveS3Object) {
pub(crate) fn generate_entry(
index: usize,
bucket_ratio: usize,
key_ratio: usize,
) -> (ActiveObject, ActiveS3Object) {
let object_id = UuidGenerator::generate();
let event = event_type(index);
let date = || Set(Some(DateTime::default().add(Days::new(index as u64))));
@@ -113,10 +133,9 @@
object_id: Set(object_id),
public_id: Set(UuidGenerator::generate()),
event_type: Set(event.clone()),
// Half as many buckets as keys.
bucket: Set((index / 2).to_string()),
key: Set(index.to_string()),
version_id: Set(index.to_string()),
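// Group entries into buckets and keys according to the configured ratios.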
bucket: Set((index / bucket_ratio).to_string()),
key: Set((index / key_ratio).to_string()),
version_id: Set((index / key_ratio).to_string()),
date: date(),
size: Set(Some(index as i64)),
sha256: Set(Some(index.to_string())),
