Skip to content

Commit

Permalink
Merge pull request #806 from sylwiaszunejko/prepare_query
Browse files Browse the repository at this point in the history
Quietly prepare unprepared query
  • Loading branch information
piodul authored Sep 7, 2023
2 parents 6f0dcfd + ec03218 commit 5d27034
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 2 deletions.
14 changes: 14 additions & 0 deletions scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,20 @@ impl Connection {
) -> Result<QueryResponse, QueryError> {
let serialized_values = values.serialized()?;

let values_size = serialized_values.size();
if values_size != 0 {
let prepared = self.prepare(query).await?;
return self
.execute_with_consistency(
&prepared,
values,
consistency,
serial_consistency,
paging_state,
)
.await;
}

let query_frame = query::Query {
contents: Cow::Borrowed(&query.contents),
parameters: query::QueryParameters {
Expand Down
1 change: 1 addition & 0 deletions scylla/tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ mod lwt_optimisation;
mod new_session;
mod retries;
mod shards;
mod silent_prepare_query;
pub(crate) mod utils;
4 changes: 2 additions & 2 deletions scylla/tests/integration/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn speculative_execution_is_fired() {
q.set_is_idempotent(true); // this is to allow speculative execution to fire

let drop_frame_rule = RequestRule(
Condition::RequestOpcode(RequestOpcode::Query)
Condition::RequestOpcode(RequestOpcode::Prepare)
.and(Condition::BodyContainsCaseSensitive(Box::new(*b"t"))),
RequestReaction::drop_frame(),
);
Expand Down Expand Up @@ -121,7 +121,7 @@ async fn retries_occur() {
q.set_is_idempotent(true); // this is to allow retry to fire

let forge_error_rule = RequestRule(
Condition::RequestOpcode(RequestOpcode::Query)
Condition::RequestOpcode(RequestOpcode::Prepare)
.and(Condition::BodyContainsCaseSensitive(Box::new(*b"INTO t"))),
RequestReaction::forge().server_error(),
);
Expand Down
110 changes: 110 additions & 0 deletions scylla/tests/integration/silent_prepare_query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use crate::utils::test_with_3_node_cluster;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use scylla::{query::Query, test_utils::unique_keyspace_name};
use scylla_proxy::{
Condition, ProxyError, Reaction, RequestOpcode, RequestReaction, RequestRule, ShardAwareness,
WorkerError,
};
use std::sync::Arc;
use std::time::Duration;

#[tokio::test]
#[ntest::timeout(30000)]
#[cfg(not(scylla_cloud_tests))]
async fn test_prepare_query_with_values() {
// unprepared query with non empty values should be prepared
const TIMEOUT_PER_REQUEST: Duration = Duration::from_millis(1000);

let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move {
// DB preparation phase
let session: Session = SessionBuilder::new()
.known_node(proxy_uris[0].as_str())
.address_translator(Arc::new(translation_map))
.build()
.await
.unwrap();

let ks = unique_keyspace_name();
session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap();
session.use_keyspace(ks, false).await.unwrap();
session
.query("CREATE TABLE t (a int primary key)", &[])
.await
.unwrap();

let q = Query::from("INSERT INTO t (a) VALUES (?)");

let drop_unprepared_frame_rule = RequestRule(
Condition::RequestOpcode(RequestOpcode::Query)
.and(Condition::BodyContainsCaseSensitive(Box::new(*b"t"))),
RequestReaction::drop_frame(),
);

running_proxy.running_nodes[2]
.change_request_rules(Some(vec![drop_unprepared_frame_rule]));

tokio::select! {
_res = session.query(q, (0,)) => (),
_ = tokio::time::sleep(TIMEOUT_PER_REQUEST) => panic!("Rules did not work: no received response"),
};

running_proxy
}).await;

match res {
Ok(()) => (),
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
Err(err) => panic!("{}", err),
}
}

#[tokio::test]
#[ntest::timeout(30000)]
#[cfg(not(scylla_cloud_tests))]
async fn test_query_with_no_values() {
// unprepared query with empty values should not be prepared
const TIMEOUT_PER_REQUEST: Duration = Duration::from_millis(1000);

let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move {
// DB preparation phase
let session: Session = SessionBuilder::new()
.known_node(proxy_uris[0].as_str())
.address_translator(Arc::new(translation_map))
.build()
.await
.unwrap();

let ks = unique_keyspace_name();
session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap();
session.use_keyspace(ks, false).await.unwrap();
session
.query("CREATE TABLE t (a int primary key)", &[])
.await
.unwrap();

let q = Query::from("INSERT INTO t (a) VALUES (1)");

let drop_prepared_frame_rule = RequestRule(
Condition::RequestOpcode(RequestOpcode::Prepare)
.and(Condition::BodyContainsCaseSensitive(Box::new(*b"t"))),
RequestReaction::drop_frame(),
);

running_proxy.running_nodes[2]
.change_request_rules(Some(vec![drop_prepared_frame_rule]));

tokio::select! {
_res = session.query(q, ()) => (),
_ = tokio::time::sleep(TIMEOUT_PER_REQUEST) => panic!("Rules did not work: no received response"),
};

running_proxy
}).await;

match res {
Ok(()) => (),
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
Err(err) => panic!("{}", err),
}
}

0 comments on commit 5d27034

Please sign in to comment.