Skip to content

Commit

Permalink
iterator: move page fetching logic outside loop
Browse files Browse the repository at this point in the history
Refactors the query_pages method so that the contents of its inner loop
are moved to query_one_page, and query_pages just calls query_one_page.
This will be necessary to be able to instrument a single page fetch.
  • Loading branch information
piodul committed Mar 22, 2023
1 parent 4c67e66 commit 97239ee
Showing 1 changed file with 85 additions and 70 deletions.
155 changes: 85 additions & 70 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::future::Future;
use std::mem;
use std::net::SocketAddr;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -553,82 +554,96 @@ where
node: NodeRef<'_>,
) -> Result<PageSendAttemptedProof, QueryError> {
loop {
self.metrics.inc_total_paged_queries();
let query_start = std::time::Instant::now();

trace!(
connection = %connection.get_connect_address(),
"Sending"
);
self.log_attempt_start(connection.get_connect_address());
match self.query_one_page(connection, consistency, node).await? {
ControlFlow::Break(proof) => return Ok(proof),
ControlFlow::Continue(_) => {}
}
}
}

let query_response =
(self.page_query)(connection.clone(), consistency, self.paging_state.clone())
.await
.and_then(QueryResponse::into_non_error_query_response);

let elapsed = query_start.elapsed();

match query_response {
Ok(NonErrorQueryResponse {
response: NonErrorResponse::Result(result::Result::Rows(mut rows)),
tracing_id,
..
}) => {
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
self.log_attempt_success();
self.log_query_success();
self.execution_profile
.load_balancing_policy
.on_query_success(&self.statement_info, elapsed, node);

self.paging_state = rows.metadata.paging_state.take();

let received_page = ReceivedPage { rows, tracing_id };

// Send next page to RowIterator
let (proof, res) = self.sender.send(Ok(received_page)).await;
if res.is_err() {
// channel was closed, RowIterator was dropped - should shutdown
return Ok(proof);
}
async fn query_one_page(
&mut self,
connection: &Arc<Connection>,
consistency: Consistency,
node: NodeRef<'_>,
) -> Result<ControlFlow<PageSendAttemptedProof, ()>, QueryError> {
self.metrics.inc_total_paged_queries();
let query_start = std::time::Instant::now();

trace!(
connection = %connection.get_connect_address(),
"Sending"
);
self.log_attempt_start(connection.get_connect_address());

let query_response =
(self.page_query)(connection.clone(), consistency, self.paging_state.clone())
.await
.and_then(QueryResponse::into_non_error_query_response);

if self.paging_state.is_none() {
// Reached the last query, shutdown
return Ok(proof);
}
let elapsed = query_start.elapsed();

// Query succeeded, reset retry policy for future retries
self.retry_session.reset();
self.log_query_start();
}
Err(err) => {
self.metrics.inc_failed_paged_queries();
self.execution_profile
.load_balancing_policy
.on_query_failure(&self.statement_info, elapsed, node, &err);
return Err(err);
match query_response {
Ok(NonErrorQueryResponse {
response: NonErrorResponse::Result(result::Result::Rows(mut rows)),
tracing_id,
..
}) => {
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
self.log_attempt_success();
self.log_query_success();
self.execution_profile
.load_balancing_policy
.on_query_success(&self.statement_info, elapsed, node);

self.paging_state = rows.metadata.paging_state.take();

let received_page = ReceivedPage { rows, tracing_id };

// Send next page to RowIterator
let (proof, res) = self.sender.send(Ok(received_page)).await;
if res.is_err() {
// channel was closed, RowIterator was dropped - should shutdown
return Ok(ControlFlow::Break(proof));
}
Ok(NonErrorQueryResponse {
response: NonErrorResponse::Result(_),
tracing_id,
..
}) => {
// We have most probably sent a modification statement (e.g. INSERT or UPDATE),
// so let's return an empty iterator as suggested in #631.

// We must attempt to send something because the iterator expects it.
let (proof, _) = self.sender.send_empty_page(tracing_id).await;
return Ok(proof);
}
Ok(_) => {
self.metrics.inc_failed_paged_queries();
let err = QueryError::ProtocolError("Unexpected response to next page query");
self.execution_profile
.load_balancing_policy
.on_query_failure(&self.statement_info, elapsed, node, &err);
return Err(err);
if self.paging_state.is_none() {
// Reached the last query, shutdown
return Ok(ControlFlow::Break(proof));
}

// Query succeeded, reset retry policy for future retries
self.retry_session.reset();
self.log_query_start();

Ok(ControlFlow::Continue(()))
}
Err(err) => {
self.metrics.inc_failed_paged_queries();
self.execution_profile
.load_balancing_policy
.on_query_failure(&self.statement_info, elapsed, node, &err);
Err(err)
}
Ok(NonErrorQueryResponse {
response: NonErrorResponse::Result(_),
tracing_id,
..
}) => {
// We have most probably sent a modification statement (e.g. INSERT or UPDATE),
// so let's return an empty iterator as suggested in #631.

// We must attempt to send something because the iterator expects it.
let (proof, _) = self.sender.send_empty_page(tracing_id).await;
Ok(ControlFlow::Break(proof))
}
Ok(_) => {
self.metrics.inc_failed_paged_queries();
let err = QueryError::ProtocolError("Unexpected response to next page query");
self.execution_profile
.load_balancing_policy
.on_query_failure(&self.statement_info, elapsed, node, &err);
Err(err)
}
}
}
Expand Down

0 comments on commit 97239ee

Please sign in to comment.