Skip to content

Commit

Permalink
feat: implement updateManyAndReturn
Browse files Browse the repository at this point in the history
  • Loading branch information
jacek-prisma committed Dec 20, 2024
1 parent cc0167b commit 4318f68
Show file tree
Hide file tree
Showing 16 changed files with 244 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ mod insert_null_in_required_field;
mod non_embedded_upsert;
mod update;
mod update_many;
mod update_many_and_return;
mod upsert;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
use query_engine_tests::*;

#[test_suite(capabilities(UpdateMany, UpdateReturning))]
mod update_many_and_return {}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ impl WriteOperations for MongoDbConnection {
.await
}

async fn update_records_returning(
&mut self,
_model: &Model,
_record_filter: connector_interface::RecordFilter,
_args: WriteArgs,
_selected_fields: FieldSelection,
_traceparent: Option<TraceParent>,
) -> connector_interface::Result<ManyRecords> {
unimplemented!()
}

async fn update_record(
&mut self,
model: &Model,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,17 @@ impl WriteOperations for MongoDbTransaction<'_> {
.await
}

async fn update_records_returning(
&mut self,
_model: &Model,
_record_filter: connector_interface::RecordFilter,
_args: connector_interface::WriteArgs,
_selected_fields: FieldSelection,
_traceparent: Option<TraceParent>,
) -> connector_interface::Result<ManyRecords> {
unimplemented!()
}

async fn update_record(
&mut self,
model: &Model,
Expand Down
13 changes: 13 additions & 0 deletions query-engine/connectors/query-connector/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,19 @@ pub trait WriteOperations {
traceparent: Option<TraceParent>,
) -> crate::Result<usize>;

/// Updates many records at once into the database and returns their
/// selected fields.
/// This method should not be used if the connector does not support
/// returning updated rows.
async fn update_records_returning(
&mut self,
model: &Model,
record_filter: RecordFilter,
args: WriteArgs,
selected_fields: FieldSelection,
traceparent: Option<TraceParent>,
) -> crate::Result<ManyRecords>;

/// Update record in the `Model` with the given `WriteArgs` filtered by the
/// `Filter`.
async fn update_record(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,22 @@ where
.await
}

async fn update_records_returning(
&mut self,
model: &Model,
record_filter: RecordFilter,
args: WriteArgs,
selected_fields: FieldSelection,
traceparent: Option<TraceParent>,
) -> connector::Result<ManyRecords> {
let ctx = Context::new(&self.connection_info, traceparent);
catch(
&self.connection_info,
write::update_records_returning(&self.inner, model, record_filter, args, selected_fields, &ctx),
)
.await
}

async fn update_record(
&mut self,
model: &Model,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use super::read::get_single_record;

use crate::column_metadata::{self, ColumnMetadata};
use crate::filter::FilterBuilder;
use crate::model_extensions::AsColumns;
use crate::query_builder::write::{build_update_and_set_query, chunk_update_with_ids};
use crate::row::ToSqlRow;
use crate::{Context, QueryExt, Queryable};

use connector_interface::*;
use itertools::Itertools;
use quaint::ast::Query;
use query_structure::*;

/// Performs an update with an explicit selection set.
Expand Down Expand Up @@ -77,7 +79,7 @@ pub(crate) async fn update_one_without_selection(
let id_args = pick_args(&model.primary_identifier().into(), &args);
// Perform the update and return the ids on which we've applied the update.
// Note: We are _not_ getting back the ids from the update. Either we got some ids passed from the parent operation or we perform a read _before_ doing the update.
let (_, ids) = update_many_from_ids_and_filter(conn, model, record_filter, args, ctx).await?;
let (_, ids) = update_many_from_ids_and_filter(conn, model, record_filter, args, None, ctx).await?;
// Since we could not get the ids back from the update, we need to apply in-memory transformation to the ids in case they were part of the update.
// This is critical to ensure the following operations can operate on the updated ids.
let merged_ids = merge_write_args(ids, id_args);
Expand All @@ -92,53 +94,50 @@ pub(crate) async fn update_one_without_selection(

// Generates a query like this:
// UPDATE "public"."User" SET "name" = $1 WHERE "public"."User"."age" > $1
pub(crate) async fn update_many_from_filter(
conn: &dyn Queryable,
pub(super) async fn update_many_from_filter(
model: &Model,
record_filter: RecordFilter,
args: WriteArgs,
selected_fields: Option<&ModelProjection>,
ctx: &Context<'_>,
) -> crate::Result<usize> {
) -> crate::Result<Query<'static>> {
let update = build_update_and_set_query(model, args, None, ctx);
let filter_condition = FilterBuilder::without_top_level_joins().visit_filter(record_filter.filter, ctx);
let update = update.so_that(filter_condition);
let count = conn.execute(update.into()).await?;

Ok(count as usize)
if let Some(selected_fields) = selected_fields {
Ok(update
.returning(selected_fields.as_columns(ctx).map(|c| c.set_is_selected(true)))
.into())
} else {
Ok(update.into())
}
}

// Generates a query like this:
// UPDATE "public"."User" SET "name" = $1 WHERE "public"."User"."id" IN ($2,$3,$4,$5,$6,$7,$8,$9,$10,$11) AND "public"."User"."age" > $1
pub(crate) async fn update_many_from_ids_and_filter(
pub(super) async fn update_many_from_ids_and_filter(
conn: &dyn Queryable,
model: &Model,
record_filter: RecordFilter,
args: WriteArgs,
selected_fields: Option<&ModelProjection>,
ctx: &Context<'_>,
) -> crate::Result<(usize, Vec<SelectionResult>)> {
) -> crate::Result<(Vec<Query<'static>>, Vec<SelectionResult>)> {
let filter_condition = FilterBuilder::without_top_level_joins().visit_filter(record_filter.filter.clone(), ctx);
let ids: Vec<SelectionResult> = conn.filter_selectors(model, record_filter, ctx).await?;

if ids.is_empty() {
return Ok((0, Vec::new()));
return Ok((vec![], Vec::new()));
}

let updates = {
let update = build_update_and_set_query(model, args, None, ctx);
let update = build_update_and_set_query(model, args, selected_fields, ctx);
let ids: Vec<&SelectionResult> = ids.iter().collect();

chunk_update_with_ids(update, model, &ids, filter_condition, ctx)?
};

let mut count = 0;

for update in updates {
let update_count = conn.execute(update).await?;

count += update_count;
}

Ok((count as usize, ids))
Ok((updates, ids))
}

fn process_result_row(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
};
use connector_interface::*;
use itertools::Itertools;
use quaint::ast::Insert;
use quaint::ast::{Insert, Query};
use quaint::{
error::ErrorKind,
prelude::{native_uuid, uuid_to_bin, uuid_to_bin_swapped, Aliasable, Select, SqlFamily},
Expand Down Expand Up @@ -370,6 +370,25 @@ pub(crate) async fn update_record(
}
}

async fn generate_updates(
conn: &dyn Queryable,
model: &Model,
record_filter: RecordFilter,
args: WriteArgs,
selected_fields: Option<&ModelProjection>,
ctx: &Context<'_>,
) -> crate::Result<Vec<Query<'static>>> {
if record_filter.has_selectors() {
let (updates, _) =
update_many_from_ids_and_filter(conn, model, record_filter, args, selected_fields, ctx).await?;
Ok(updates)
} else {
Ok(vec![
update_many_from_filter(model, record_filter, args, selected_fields, ctx).await?,
])
}
}

/// Update multiple records in a database defined in `conn` and the records
/// defined in `args`, and returning the number of updates
/// This works via two ways, when there are ids in record_filter.selectors, it uses that to update
Expand All @@ -385,15 +404,42 @@ pub(crate) async fn update_records(
return Ok(0);
}

if record_filter.has_selectors() {
let (count, _) = update_many_from_ids_and_filter(conn, model, record_filter, args, ctx).await?;
let mut count = 0;
for update in generate_updates(conn, model, record_filter, args, None, ctx).await? {
count += conn.execute(update).await?;
}
Ok(count as usize)
}

Ok(count)
} else {
let count = update_many_from_filter(conn, model, record_filter, args, ctx).await?;
/// Update records according to `WriteArgs`. Returns values of fields specified in
/// `selected_fields` for all updated rows.
pub(crate) async fn update_records_returning(
conn: &dyn Queryable,
model: &Model,
record_filter: RecordFilter,
args: WriteArgs,
selected_fields: FieldSelection,
ctx: &Context<'_>,
) -> crate::Result<ManyRecords> {
let field_names: Vec<String> = selected_fields.db_names().collect();
let idents = selected_fields.type_identifiers_with_arities();
let meta = column_metadata::create(&field_names, &idents);
let mut records = ManyRecords::new(field_names.clone());

let updates = generate_updates(conn, model, record_filter, args, Some(&selected_fields.into()), ctx).await?;

Ok(count)
for update in updates {
let result_set = conn.query(update).await?;

for result_row in result_set {
let sql_row = result_row.to_sql_row(&meta)?;
let record = Record::from(sql_row);

records.push(record);
}
}

Ok(records)
}

/// Delete multiple records in `conn`, defined in the `Filter`. Result is the number of items deleted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,29 @@ impl WriteOperations for SqlConnectorTransaction<'_> {
.await
}

async fn update_records_returning(
&mut self,
model: &Model,
record_filter: RecordFilter,
args: WriteArgs,
selected_fields: FieldSelection,
traceparent: Option<TraceParent>,
) -> connector::Result<ManyRecords> {
let ctx = Context::new(&self.connection_info, traceparent);
catch(
&self.connection_info,
write::update_records_returning(
self.inner.as_queryable(),
model,
record_filter,
args,
selected_fields,
&ctx,
),
)
.await
}

async fn update_record(
&mut self,
model: &Model,
Expand Down
28 changes: 24 additions & 4 deletions query-engine/core/src/interpreter/query_interpreters/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,31 @@ async fn update_many(
q: UpdateManyRecords,
traceparent: Option<TraceParent>,
) -> InterpretationResult<QueryResult> {
let res = tx
.update_records(&q.model, q.record_filter, q.args, traceparent)
.await?;
if let Some(selected_fields) = q.selected_fields {
let records = tx
.update_records_returning(&q.model, q.record_filter, q.args, selected_fields.fields, traceparent)
.await?;

Ok(QueryResult::Count(res))
let nested: Vec<QueryResult> =
super::read::process_nested(tx, selected_fields.nested, Some(&records), traceparent).await?;

let selection = RecordSelection {
name: q.name,
fields: selected_fields.order,
records,
nested,
model: q.model,
virtual_fields: vec![],
};

Ok(QueryResult::RecordSelection(Some(Box::new(selection))))
} else {
let affected_records = tx
.update_records(&q.model, q.record_filter, q.args, traceparent)
.await?;

Ok(QueryResult::Count(affected_records))
}
}

async fn delete_many(
Expand Down
11 changes: 11 additions & 0 deletions query-engine/core/src/query_ast/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,20 @@ pub struct UpdateRecordWithoutSelection {

#[derive(Debug, Clone)]
pub struct UpdateManyRecords {
pub name: String,
pub model: Model,
pub record_filter: RecordFilter,
pub args: WriteArgs,
/// Fields of updated records that client has requested to return.
/// `None` if the connector does not support returning the updated rows.
pub selected_fields: Option<UpdateManyRecordsFields>,
}

#[derive(Debug, Clone)]
pub struct UpdateManyRecordsFields {
pub fields: FieldSelection,
pub order: Vec<String>,
pub nested: Vec<ReadQuery>,
}

#[derive(Debug, Clone)]
Expand Down
3 changes: 2 additions & 1 deletion query-engine/core/src/query_graph_builder/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ impl<'a> QueryGraphBuilder<'a> {
(QueryTag::CreateMany, Some(m)) => QueryGraph::root(|g| write::create_many_records(g, query_schema, m, false, parsed_field)),
(QueryTag::CreateManyAndReturn, Some(m)) => QueryGraph::root(|g| write::create_many_records(g, query_schema, m, true, parsed_field)),
(QueryTag::UpdateOne, Some(m)) => QueryGraph::root(|g| write::update_record(g, query_schema, m, parsed_field)),
(QueryTag::UpdateMany, Some(m)) => QueryGraph::root(|g| write::update_many_records(g, query_schema, m, parsed_field)),
(QueryTag::UpdateMany, Some(m)) => QueryGraph::root(|g| write::update_many_records(g, query_schema, m, false, parsed_field)),
(QueryTag::UpdateManyAndReturn, Some(m)) => QueryGraph::root(|g| write::update_many_records(g, query_schema, m, true, parsed_field)),
(QueryTag::UpsertOne, Some(m)) => QueryGraph::root(|g| write::upsert_record(g, query_schema, m, parsed_field)),
(QueryTag::DeleteOne, Some(m)) => QueryGraph::root(|g| write::delete_record(g, query_schema, m, parsed_field)),
(QueryTag::DeleteMany, Some(m)) => QueryGraph::root(|g| write::delete_many_records(g, query_schema, m, parsed_field)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,15 @@ pub fn nested_update_many(
let find_child_records_node =
utils::insert_find_children_by_parent_node(graph, parent, parent_relation_field, filter)?;

let update_many_node =
update::update_many_record_node(graph, query_schema, Filter::empty(), child_model.clone(), data_map)?;
let update_many_node = update::update_many_record_node(
graph,
query_schema,
Filter::empty(),
child_model.clone(),
None,
None,
data_map,
)?;

graph.create_edge(
&find_child_records_node,
Expand Down
Loading

0 comments on commit 4318f68

Please sign in to comment.