Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quietly prepare unprepared query #806

Merged
merged 2 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,20 @@ impl Connection {
) -> Result<QueryResponse, QueryError> {
let serialized_values = values.serialized()?;

let values_size = serialized_values.size();
if values_size != 0 {
let prepared = self.prepare(query).await?;
return self
.execute_with_consistency(
&prepared,
values,
consistency,
serial_consistency,
paging_state,
)
.await;
}

let query_frame = query::Query {
contents: Cow::Borrowed(&query.contents),
parameters: query::QueryParameters {
Expand Down
1 change: 1 addition & 0 deletions scylla/tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ mod lwt_optimisation;
mod new_session;
mod retries;
mod shards;
mod silent_prepare_query;
pub(crate) mod utils;
4 changes: 2 additions & 2 deletions scylla/tests/integration/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn speculative_execution_is_fired() {
q.set_is_idempotent(true); // this is to allow speculative execution to fire

let drop_frame_rule = RequestRule(
Condition::RequestOpcode(RequestOpcode::Query)
Condition::RequestOpcode(RequestOpcode::Prepare)
.and(Condition::BodyContainsCaseSensitive(Box::new(*b"t"))),
RequestReaction::drop_frame(),
);
Expand Down Expand Up @@ -121,7 +121,7 @@ async fn retries_occur() {
q.set_is_idempotent(true); // this is to allow retry to fire

let forge_error_rule = RequestRule(
Condition::RequestOpcode(RequestOpcode::Query)
Condition::RequestOpcode(RequestOpcode::Prepare)
.and(Condition::BodyContainsCaseSensitive(Box::new(*b"INTO t"))),
RequestReaction::forge().server_error(),
);
Expand Down
110 changes: 110 additions & 0 deletions scylla/tests/integration/silent_prepare_query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use crate::utils::test_with_3_node_cluster;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use scylla::{query::Query, test_utils::unique_keyspace_name};
use scylla_proxy::{
Condition, ProxyError, Reaction, RequestOpcode, RequestReaction, RequestRule, ShardAwareness,
WorkerError,
};
use std::sync::Arc;
use std::time::Duration;

#[tokio::test]
#[ntest::timeout(30000)]
#[cfg(not(scylla_cloud_tests))]
async fn test_prepare_query_with_values() {
// unprepared query with non empty values should be prepared
const TIMEOUT_PER_REQUEST: Duration = Duration::from_millis(1000);

let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move {
// DB preparation phase
let session: Session = SessionBuilder::new()
.known_node(proxy_uris[0].as_str())
.address_translator(Arc::new(translation_map))
.build()
.await
.unwrap();

let ks = unique_keyspace_name();
session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap();
session.use_keyspace(ks, false).await.unwrap();
session
.query("CREATE TABLE t (a int primary key)", &[])
.await
.unwrap();

let q = Query::from("INSERT INTO t (a) VALUES (?)");

let drop_unprepared_frame_rule = RequestRule(
Condition::RequestOpcode(RequestOpcode::Query)
.and(Condition::BodyContainsCaseSensitive(Box::new(*b"t"))),
RequestReaction::drop_frame(),
);

running_proxy.running_nodes[2]
.change_request_rules(Some(vec![drop_unprepared_frame_rule]));

tokio::select! {
_res = session.query(q, (0,)) => (),
_ = tokio::time::sleep(TIMEOUT_PER_REQUEST) => panic!("Rules did not work: no received response"),
};

running_proxy
}).await;

match res {
Ok(()) => (),
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
Err(err) => panic!("{}", err),
}
}

#[tokio::test]
#[ntest::timeout(30000)]
#[cfg(not(scylla_cloud_tests))]
async fn test_query_with_no_values() {
// unprepared query with empty values should not be prepared
const TIMEOUT_PER_REQUEST: Duration = Duration::from_millis(1000);

let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move {
// DB preparation phase
let session: Session = SessionBuilder::new()
.known_node(proxy_uris[0].as_str())
.address_translator(Arc::new(translation_map))
.build()
.await
.unwrap();

let ks = unique_keyspace_name();
session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap();
session.use_keyspace(ks, false).await.unwrap();
session
.query("CREATE TABLE t (a int primary key)", &[])
.await
.unwrap();

let q = Query::from("INSERT INTO t (a) VALUES (1)");

let drop_prepared_frame_rule = RequestRule(
Condition::RequestOpcode(RequestOpcode::Prepare)
.and(Condition::BodyContainsCaseSensitive(Box::new(*b"t"))),
RequestReaction::drop_frame(),
);

running_proxy.running_nodes[2]
.change_request_rules(Some(vec![drop_prepared_frame_rule]));

tokio::select! {
_res = session.query(q, ()) => (),
_ = tokio::time::sleep(TIMEOUT_PER_REQUEST) => panic!("Rules did not work: no received response"),
};

running_proxy
}).await;

match res {
Ok(()) => (),
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
Err(err) => panic!("{}", err),
}
}