Skip to content

Commit

Permalink
session: instrument non-iter methods with RequestSpan
Browse files Browse the repository at this point in the history
Now, non-iterator methods create and fill a RequestSpan.
  • Loading branch information
piodul committed Mar 22, 2023
1 parent cb504ed commit 4c67e66
Showing 1 changed file with 48 additions and 13 deletions.
61 changes: 48 additions & 13 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ impl Session {
let query: Query = query.into();
let serialized_values = values.serialized()?;

let span = trace_span!("Request", query = %query.contents);
let span = RequestSpan::new_query(&query.contents, serialized_values.size());
let run_query_result = self
.run_query(
RoutingInfo::default(),
Expand Down Expand Up @@ -606,8 +606,9 @@ impl Session {
.and_then(QueryResponse::into_non_error_query_response)
}
},
&span,
)
.instrument(span)
.instrument(span.span().clone())
.await?;

let response = match run_query_result {
Expand All @@ -623,7 +624,9 @@ impl Session {
self.handle_auto_await_schema_agreement(&query.contents, &response)
.await?;

response.into_query_result()
let result = response.into_query_result()?;
span.record_result_fields(&result);
Ok(result)
}

async fn handle_set_keyspace_response(
Expand Down Expand Up @@ -887,7 +890,10 @@ impl Session {
let values_ref = &serialized_values;
let paging_state_ref = &paging_state;

let token = self.calculate_token(prepared, &serialized_values)?;
let partition_key = self.calculate_partition_key(prepared, &serialized_values)?;
let token = partition_key
.as_ref()
.map(|pk| prepared.get_partitioner_name().hash(pk));

let statement_info = RoutingInfo {
consistency: prepared
Expand All @@ -899,10 +905,19 @@ impl Session {
is_confirmed_lwt: prepared.is_confirmed_lwt(),
};

let span = trace_span!(
"Request",
prepared_id = %format_args!("{:X}", prepared.get_id())
);
let span =
RequestSpan::new_prepared(partition_key.as_ref(), token, serialized_values.size());

if !span.span().is_disabled() {
if let (Some(keyspace), Some(token)) = (statement_info.keyspace.as_ref(), token) {
let cluster_data = self.get_cluster_data();
let replicas: smallvec::SmallVec<[_; 8]> = cluster_data
.get_token_endpoints_iter(keyspace, token)
.collect();
span.record_replicas(&replicas)
}
}

let run_query_result: RunQueryResult<NonErrorQueryResponse> = self
.run_query(
statement_info,
Expand Down Expand Up @@ -933,8 +948,9 @@ impl Session {
.and_then(QueryResponse::into_non_error_query_response)
}
},
&span,
)
.instrument(span)
.instrument(span.span().clone())
.await?;

let response = match run_query_result {
Expand All @@ -950,7 +966,9 @@ impl Session {
self.handle_auto_await_schema_agreement(prepared.get_statement(), &response)
.await?;

response.into_query_result()
let result = response.into_query_result()?;
span.record_result_fields(&result);
Ok(result)
}

/// Run a prepared query with paging\
Expand Down Expand Up @@ -1097,6 +1115,8 @@ impl Session {
let values = BatchValuesFirstSerialized::new(&values, first_serialized_value);
let values_ref = &values;

let span = RequestSpan::new_batch();

let run_query_result = self
.run_query(
statement_info,
Expand Down Expand Up @@ -1127,14 +1147,17 @@ impl Session {
.await
}
},
&span,
)
.instrument(trace_span!("Batch"))
.instrument(span.span().clone())
.await?;

Ok(match run_query_result {
let result = match run_query_result {
RunQueryResult::IgnoredWriteError => QueryResult::default(),
RunQueryResult::Completed(response) => response,
})
};
span.record_result_fields(&result);
Ok(result)
}

/// Prepares all statements within the batch and returns a new batch where every
Expand Down Expand Up @@ -1413,6 +1436,7 @@ impl Session {
statement_config: &'a StatementConfig,
choose_connection: impl Fn(Arc<Node>) -> ConnFut,
do_query: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
request_span: &'a RequestSpan,
) -> Result<RunQueryResult<ResT>, QueryError>
where
ConnFut: Future<Output = Result<Arc<Connection>, QueryError>>,
Expand Down Expand Up @@ -1486,6 +1510,10 @@ impl Session {
}
});

if is_speculative {
request_span.inc_speculative_executions();
}

self.execute_query(
&shared_query_plan,
&choose_connection,
Expand All @@ -1497,6 +1525,7 @@ impl Session {
retry_session: retry_policy.new_session(),
history_data,
query_info: &statement_info,
request_span,
},
)
};
Expand Down Expand Up @@ -1532,6 +1561,7 @@ impl Session {
retry_session: retry_policy.new_session(),
history_data,
query_info: &statement_info,
request_span,
},
)
.await
Expand Down Expand Up @@ -1605,6 +1635,7 @@ impl Session {
continue 'nodes_in_plan;
}
};
context.request_span.record_shard_id(&connection);

self.metrics.inc_total_nonpaged_queries();
let query_start = std::time::Instant::now();
Expand Down Expand Up @@ -1721,12 +1752,15 @@ impl Session {
..Default::default()
};

let span = RequestSpan::new_none();

match self
.run_query(
info,
&config,
|node: Arc<Node>| async move { node.random_connection().await },
do_query,
&span,
)
.await?
{
Expand Down Expand Up @@ -1848,6 +1882,7 @@ struct ExecuteQueryContext<'a> {
retry_session: Box<dyn RetrySession>,
history_data: Option<HistoryData<'a>>,
query_info: &'a load_balancing::RoutingInfo<'a>,
request_span: &'a RequestSpan,
}

struct HistoryData<'a> {
Expand Down

0 comments on commit 4c67e66

Please sign in to comment.