Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ add order_by and descending options to scan and fetch_all queries #291

Merged
merged 15 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions askar-storage/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,12 @@ impl<B: Backend> Backend for WrapBackend<B> {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<String>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
self.0
.scan(profile, kind, category, tag_filter, offset, limit)
self.0.scan(
profile, kind, category, tag_filter, offset, limit, order_by, descending,
)
}

#[inline]
Expand Down Expand Up @@ -142,9 +145,12 @@ impl Backend for AnyBackend {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<String>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
self.0
.scan(profile, kind, category, tag_filter, offset, limit)
self.0.scan(
profile, kind, category, tag_filter, offset, limit, order_by, descending,
)
}

#[inline]
Expand Down
14 changes: 14 additions & 0 deletions askar-storage/src/backend/db_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,17 @@ pub trait QueryPrepare {
}
query
}

fn order_by_query<'q>(mut query: String, order_by: Option<String>, descending: bool) -> String {
if let Some(order_by) = order_by {
query.push_str(" ORDER BY ");
query.push_str(&order_by);
ff137 marked this conversation as resolved.
Show resolved Hide resolved
if descending {
query.push_str(" DESC");
}
}
query
}
}

pub fn replace_arg_placeholders<Q: QueryPrepare + ?Sized>(
Expand Down Expand Up @@ -625,6 +636,8 @@ pub fn extend_query<'q, Q: QueryPrepare>(
tag_filter: Option<(String, Vec<Vec<u8>>)>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<String>,
descending: bool,
) -> Result<String, Error>
where
i64: for<'e> Encode<'e, Q::DB> + Type<Q::DB>,
Expand All @@ -636,6 +649,7 @@ where
query.push_str(" AND "); // assumes WHERE already occurs
query.push_str(&filter_clause);
};
query = Q::order_by_query(query, order_by, descending);
if offset.is_some() || limit.is_some() {
query = Q::limit_query(query, args, offset, limit);
};
Expand Down
13 changes: 12 additions & 1 deletion askar-storage/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub trait Backend: Debug + Send + Sync {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<String>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>>;

/// Create a new session against the store
Expand Down Expand Up @@ -185,7 +187,16 @@ pub async fn copy_profile<A: Backend, B: Backend>(
to_profile: &str,
) -> Result<(), Error> {
let scan = from_backend
.scan(Some(from_profile.into()), None, None, None, None, None)
.scan(
Some(from_profile.into()),
None,
None,
None,
None,
None,
None,
false,
)
.await?;
if let Err(e) = to_backend.create_profile(Some(to_profile.into())).await {
if e.kind() != ErrorKind::Duplicate {
Expand Down
23 changes: 20 additions & 3 deletions askar-storage/src/backend/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ impl Backend for PostgresBackend {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<String>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
Box::pin(async move {
let session = self.session(profile, false)?;
Expand All @@ -282,6 +284,8 @@ impl Backend for PostgresBackend {
tag_filter,
offset,
limit,
order_by,
descending,
false,
);
let stream = scan.then(move |enc_rows| {
Expand Down Expand Up @@ -347,8 +351,15 @@ impl BackendSession for DbSession<Postgres> {
})
.await?;
params.push(enc_category);
let query =
extend_query::<PostgresBackend>(COUNT_QUERY, &mut params, tag_filter, None, None)?;
let query = extend_query::<PostgresBackend>(
COUNT_QUERY,
&mut params,
tag_filter,
None,
None,
None,
false,
)?;
let mut active = acquire_session(&mut *self).await?;
let count = sqlx::query_scalar_with(query.as_str(), params)
.fetch_one(active.connection_mut())
Expand Down Expand Up @@ -440,6 +451,8 @@ impl BackendSession for DbSession<Postgres> {
tag_filter,
None,
limit,
None,
false,
for_update,
);
pin!(scan);
Expand Down Expand Up @@ -483,6 +496,8 @@ impl BackendSession for DbSession<Postgres> {
tag_filter,
None,
None,
None,
false,
)?;

let mut active = acquire_session(&mut *self).await?;
Expand Down Expand Up @@ -752,6 +767,8 @@ fn perform_scan(
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<String>,
descending: bool,
for_update: bool,
) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
try_stream! {
Expand All @@ -772,7 +789,7 @@ fn perform_scan(
}
}).await?;
params.push(enc_category);
let mut query = extend_query::<PostgresBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit)?;
let mut query = extend_query::<PostgresBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;
if for_update {
query.push_str(" FOR NO KEY UPDATE");
}
Expand Down
23 changes: 20 additions & 3 deletions askar-storage/src/backend/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ impl Backend for SqliteBackend {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<String>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
Box::pin(async move {
let session = self.session(profile, false)?;
Expand All @@ -276,6 +278,8 @@ impl Backend for SqliteBackend {
tag_filter,
offset,
limit,
order_by,
descending,
);
let stream = scan.then(move |enc_rows| {
let category = category.clone();
Expand Down Expand Up @@ -330,8 +334,15 @@ impl BackendSession for DbSession<Sqlite> {
})
.await?;
params.push(enc_category);
let query =
extend_query::<SqliteBackend>(COUNT_QUERY, &mut params, tag_filter, None, None)?;
let query = extend_query::<SqliteBackend>(
COUNT_QUERY,
&mut params,
tag_filter,
None,
None,
None,
false,
)?;
let mut active = acquire_session(&mut *self).await?;
let count = sqlx::query_scalar_with(query.as_str(), params)
.fetch_one(active.connection_mut())
Expand Down Expand Up @@ -413,6 +424,8 @@ impl BackendSession for DbSession<Sqlite> {
tag_filter,
None,
limit,
None,
false,
);
pin!(scan);
let mut enc_rows = vec![];
Expand Down Expand Up @@ -455,6 +468,8 @@ impl BackendSession for DbSession<Sqlite> {
tag_filter,
None,
None,
None,
false,
)?;

let mut active = acquire_session(&mut *self).await?;
Expand Down Expand Up @@ -703,6 +718,8 @@ fn perform_scan(
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<String>,
descending: bool,
) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
try_stream! {
let mut params = QueryParams::new();
Expand All @@ -720,7 +737,7 @@ fn perform_scan(
}
}).await?;
params.push(enc_category);
let query = extend_query::<SqliteBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit)?;
let query = extend_query::<SqliteBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;

let mut batch = Vec::with_capacity(PAGE_SIZE);

Expand Down
26 changes: 24 additions & 2 deletions askar-storage/tests/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,8 @@ pub async fn db_scan(db: AnyBackend) {
tag_filter,
offset,
limit,
None,
false,
)
.await
.expect(ERR_SCAN);
Expand All @@ -506,6 +508,8 @@ pub async fn db_scan(db: AnyBackend) {
tag_filter,
offset,
limit,
None,
false,
)
.await
.expect(ERR_SCAN);
Expand Down Expand Up @@ -879,7 +883,16 @@ pub async fn db_import_scan(db: AnyBackend) {
let copy = db.create_profile(None).await.expect(ERR_PROFILE);
let mut copy_conn = db.session(Some(copy.clone()), true).expect(ERR_SESSION);
let records = db
.scan(None, Some(EntryKind::Item), None, None, None, None)
.scan(
None,
Some(EntryKind::Item),
None,
None,
None,
None,
None,
false,
)
.await
.expect(ERR_SCAN);
copy_conn
Expand All @@ -889,7 +902,16 @@ pub async fn db_import_scan(db: AnyBackend) {
copy_conn.close(true).await.expect(ERR_COMMIT);

let mut scan = db
.scan(Some(copy), Some(EntryKind::Item), None, None, None, None)
.scan(
Some(copy),
Some(EntryKind::Item),
None,
None,
None,
None,
None,
false,
)
.await
.expect(ERR_SCAN);

Expand Down
7 changes: 6 additions & 1 deletion src/ffi/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,14 @@ pub extern "C" fn askar_scan_start(
tag_filter: FfiStr<'_>,
offset: i64,
limit: i64,
order_by: FfiStr<'_>,
descending: i8,
cb: Option<extern "C" fn(cb_id: CallbackId, err: ErrorCode, handle: ScanHandle)>,
cb_id: CallbackId,
) -> ErrorCode {
let order_by = order_by.into_opt_string();
let descending = descending != 0; // Convert to bool

catch_err! {
trace!("Scan store start");
let cb = cb.ok_or_else(|| err_msg!("No callback provided"))?;
Expand All @@ -568,7 +573,7 @@ pub extern "C" fn askar_scan_start(
spawn_ok(async move {
let result = async {
let store = handle.load().await?;
let scan = store.scan(profile, category, tag_filter, Some(offset), if limit < 0 { None }else {Some(limit)}).await?;
let scan = store.scan(profile, category, tag_filter, Some(offset), if limit < 0 { None }else {Some(limit)}, order_by, descending).await?;
Ok(FFI_SCANS.insert(handle, scan).await)
}.await;
cb.resolve(result);
Expand Down
4 changes: 4 additions & 0 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ impl Store {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<String>,
descending: bool,
) -> Result<Scan<'static, Entry>, Error> {
Ok(self
.0
Expand All @@ -135,6 +137,8 @@ impl Store {
tag_filter,
offset,
limit,
order_by,
descending,
)
.await?)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,8 @@ export class NodeJSAriesAskar implements AriesAskar {
}

public async scanStart(options: ScanStartOptions): Promise<ScanHandle> {
const { category, limit, offset, profile, storeHandle, tagFilter } = serializeArguments(options)
const { category, descending, limit, offset, orderBy, profile, storeHandle, tagFilter } =
serializeArguments(options)
const handle = await this.promisifyWithResponse<number>(
(cb, cbId) =>
this.nativeAriesAskar.askar_scan_start(
Expand All @@ -797,6 +798,8 @@ export class NodeJSAriesAskar implements AriesAskar {
tagFilter,
+offset || 0,
+limit || -1,
orderBy,
descending,
cb,
cbId,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,18 @@ export const nativeBindings = {
askar_scan_next: [FFI_ERROR_CODE, [FFI_SCAN_HANDLE, FFI_CALLBACK_PTR, FFI_CALLBACK_ID]],
askar_scan_start: [
FFI_ERROR_CODE,
[FFI_STORE_HANDLE, FFI_STRING, FFI_STRING, FFI_STRING, FFI_INT64, FFI_INT64, FFI_CALLBACK_PTR, FFI_CALLBACK_ID],
[
FFI_STORE_HANDLE,
FFI_STRING,
FFI_STRING,
FFI_STRING,
FFI_INT64,
FFI_INT64,
FFI_STRING,
FFI_INT8,
FFI_CALLBACK_PTR,
FFI_CALLBACK_ID,
],
],

askar_session_close: [FFI_ERROR_CODE, [FFI_SESSION_HANDLE, FFI_INT8, FFI_CALLBACK_PTR, FFI_CALLBACK_ID]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,8 @@ jsi::Value scanStart(jsi::Runtime &rt, jsi::Object options) {
auto profile = jsiToValue<std::string>(rt, options, "profile", true);
auto offset = jsiToValue<int64_t>(rt, options, "offset", true);
auto limit = jsiToValue<int64_t>(rt, options, "limit", true);
auto orderBy = jsiToValue<std::string>(rt, options, "orderBy", true);
auto descending = jsiToValue<int8_t>(rt, options, "descending");

jsi::Function cb = options.getPropertyAsFunction(rt, "cb");
State *state = new State(&cb);
Expand All @@ -510,7 +512,7 @@ jsi::Value scanStart(jsi::Runtime &rt, jsi::Object options) {
ErrorCode code = askar_scan_start(
storeHandle, profile.length() ? profile.c_str() : nullptr,
category.c_str(), tagFilter.length() ? tagFilter.c_str() : nullptr,
offset, limit, callbackWithResponse, CallbackId(state));
offset, limit, orderBy, descending, callbackWithResponse, CallbackId(state));

return createReturnValue(rt, code, nullptr);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,8 @@ ErrorCode askar_scan_start(StoreHandle handle,
FfiStr tag_filter,
int64_t offset,
int64_t limit,
FfiStr order_by,
int8_t descending,
void (*cb)(CallbackId cb_id, ErrorCode err, ScanHandle handle),
CallbackId cb_id);

Expand Down
Loading
Loading