diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index bd12ed0614..d57612dec3 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -657,6 +657,20 @@ impl Connection { ) -> Result { let serialized_values = values.serialized()?; + let values_size = serialized_values.size(); + if values_size != 0 { + let prepared = self.prepare(query).await?; + return self + .execute_with_consistency( + &prepared, + values, + consistency, + serial_consistency, + paging_state, + ) + .await; + } + let query_frame = query::Query { contents: Cow::Borrowed(&query.contents), parameters: query::QueryParameters { diff --git a/scylla/tests/integration/main.rs b/scylla/tests/integration/main.rs index ab64db0c5e..4b8920309a 100644 --- a/scylla/tests/integration/main.rs +++ b/scylla/tests/integration/main.rs @@ -5,4 +5,5 @@ mod lwt_optimisation; mod new_session; mod retries; mod shards; +mod silent_prepare_query; pub(crate) mod utils; diff --git a/scylla/tests/integration/retries.rs b/scylla/tests/integration/retries.rs index 04724ab2c7..bafe20180c 100644 --- a/scylla/tests/integration/retries.rs +++ b/scylla/tests/integration/retries.rs @@ -46,7 +46,7 @@ async fn speculative_execution_is_fired() { q.set_is_idempotent(true); // this is to allow speculative execution to fire let drop_frame_rule = RequestRule( - Condition::RequestOpcode(RequestOpcode::Query) + Condition::RequestOpcode(RequestOpcode::Prepare) .and(Condition::BodyContainsCaseSensitive(Box::new(*b"t"))), RequestReaction::drop_frame(), ); @@ -121,7 +121,7 @@ async fn retries_occur() { q.set_is_idempotent(true); // this is to allow retry to fire let forge_error_rule = RequestRule( - Condition::RequestOpcode(RequestOpcode::Query) + Condition::RequestOpcode(RequestOpcode::Prepare) .and(Condition::BodyContainsCaseSensitive(Box::new(*b"INTO t"))), RequestReaction::forge().server_error(), ); diff --git a/scylla/tests/integration/silent_prepare_query.rs b/scylla/tests/integration/silent_prepare_query.rs new file mode 100644 index 0000000000..59198affb0 --- /dev/null +++ b/scylla/tests/integration/silent_prepare_query.rs @@ -0,0 +1,110 @@ +use crate::utils::test_with_3_node_cluster; +use scylla::transport::session::Session; +use scylla::SessionBuilder; +use scylla::{query::Query, test_utils::unique_keyspace_name}; +use scylla_proxy::{ + Condition, ProxyError, Reaction, RequestOpcode, RequestReaction, RequestRule, ShardAwareness, + WorkerError, +}; +use std::sync::Arc; +use std::time::Duration; + +#[tokio::test] +#[ntest::timeout(30000)] +#[cfg(not(scylla_cloud_tests))] +async fn test_prepare_query_with_values() { + // unprepared query with non empty values should be prepared + const TIMEOUT_PER_REQUEST: Duration = Duration::from_millis(1000); + + let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move { + // DB preparation phase + let session: Session = SessionBuilder::new() + .known_node(proxy_uris[0].as_str()) + .address_translator(Arc::new(translation_map)) + .build() + .await + .unwrap(); + + let ks = unique_keyspace_name(); + session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap(); + session.use_keyspace(ks, false).await.unwrap(); + session + .query("CREATE TABLE t (a int primary key)", &[]) + .await + .unwrap(); + + let q = Query::from("INSERT INTO t (a) VALUES (?)"); + + let drop_unprepared_frame_rule = RequestRule( + Condition::RequestOpcode(RequestOpcode::Query) + .and(Condition::BodyContainsCaseSensitive(Box::new(*b"t"))), + RequestReaction::drop_frame(), + ); + + running_proxy.running_nodes[2] + .change_request_rules(Some(vec![drop_unprepared_frame_rule])); + + tokio::select! { + _res = session.query(q, (0,)) => (), + _ = tokio::time::sleep(TIMEOUT_PER_REQUEST) => panic!("Rules did not work: no received response"), + }; + + running_proxy + }).await; + + match res { + Ok(()) => (), + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), + Err(err) => panic!("{}", err), + } +} + +#[tokio::test] +#[ntest::timeout(30000)] +#[cfg(not(scylla_cloud_tests))] +async fn test_query_with_no_values() { + // unprepared query with empty values should not be prepared + const TIMEOUT_PER_REQUEST: Duration = Duration::from_millis(1000); + + let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move { + // DB preparation phase + let session: Session = SessionBuilder::new() + .known_node(proxy_uris[0].as_str()) + .address_translator(Arc::new(translation_map)) + .build() + .await + .unwrap(); + + let ks = unique_keyspace_name(); + session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap(); + session.use_keyspace(ks, false).await.unwrap(); + session + .query("CREATE TABLE t (a int primary key)", &[]) + .await + .unwrap(); + + let q = Query::from("INSERT INTO t (a) VALUES (1)"); + + let drop_prepared_frame_rule = RequestRule( + Condition::RequestOpcode(RequestOpcode::Prepare) + .and(Condition::BodyContainsCaseSensitive(Box::new(*b"t"))), + RequestReaction::drop_frame(), + ); + + running_proxy.running_nodes[2] + .change_request_rules(Some(vec![drop_prepared_frame_rule])); + + tokio::select! { + _res = session.query(q, ()) => (), + _ = tokio::time::sleep(TIMEOUT_PER_REQUEST) => panic!("Rules did not work: no received response"), + }; + + running_proxy + }).await; + + match res { + Ok(()) => (), + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), + Err(err) => panic!("{}", err), + } +}