Skip to content

Commit

Permalink
fix(chirp): add bypass for recursive messages (#708)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Apr 23, 2024
1 parent 1f4b169 commit 566088f
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 99 deletions.
2 changes: 2 additions & 0 deletions lib/chirp/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ impl Client {
parameters: parameters.clone(),
ts,
trace: (*self.trace).clone(),
allow_recursive: opts.allow_recursive,
body: body_buf,
};
let mut message_buf = Vec::with_capacity(prost::Message::encoded_len(&message));
Expand Down Expand Up @@ -1797,6 +1798,7 @@ impl TailAllConfig {
#[derive(Default, Debug, Clone)]
pub struct MessageOptions {
pub dont_log_body: bool,
pub allow_recursive: bool,
}

#[derive(Debug, Clone, PartialEq)]
Expand Down
17 changes: 17 additions & 0 deletions lib/chirp/client/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,23 @@ macro_rules! msg {
$mod1$(::$mod2)*::Message $body,
::chirp_client::MessageOptions {
dont_log_body: true,
..Default::default()
},
)
};

(
[$container:expr] @recursive $mod1:ident$(::$mod2:ident)* ($($param:expr),*)
$body:tt
) => {
$container
.chirp()
.message::<$mod1$(::$mod2)*::Message>(
$mod1$(::$mod2)*::build_params($($param),*),
$mod1$(::$mod2)*::Message $body,
::chirp_client::MessageOptions {
allow_recursive: true,
..Default::default()
},
)
};
Expand Down
206 changes: 110 additions & 96 deletions lib/chirp/worker/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,117 +548,129 @@ where
);

// Parse request structure
let (req_id_proto, ray_id_proto, req_ts, trace, body_buf, dont_log_body, req_debug) =
match &self.config.worker_kind {
WorkerKind::Rpc { .. } => {
match chirp::Request::decode(raw_msg_buf.as_slice()) {
Ok(req) => {
let reply = if let Some(x) =
nats_message.as_ref().and_then(|x| x.reply.clone())
{
let (
req_id_proto,
ray_id_proto,
req_ts,
trace,
body_buf,
dont_log_body,
req_debug,
allow_recursive,
) = match &self.config.worker_kind {
WorkerKind::Rpc { .. } => {
match chirp::Request::decode(raw_msg_buf.as_slice()) {
Ok(req) => {
let reply =
if let Some(x) = nats_message.as_ref().and_then(|x| x.reply.clone()) {
x
} else {
tracing::error!("handling rpc without provided nats reply");
return;
};

// Ack the request
{
// Build response
let res = chirp::Response {
kind: Some(chirp::response::Kind::Ack(chirp::response::Ack {})),
};
let mut res_buf =
Vec::with_capacity(prost::Message::encoded_len(&res));
match prost::Message::encode(&res, &mut res_buf) {
Ok(_) => {}
Err(err) => {
tracing::error!(
?err,
"failed to encode ack message, skipping request"
);
return;
}
// Ack the request
{
// Build response
let res = chirp::Response {
kind: Some(chirp::response::Kind::Ack(chirp::response::Ack {})),
};
let mut res_buf = Vec::with_capacity(prost::Message::encoded_len(&res));
match prost::Message::encode(&res, &mut res_buf) {
Ok(_) => {}
Err(err) => {
tracing::error!(
?err,
"failed to encode ack message, skipping request"
);
return;
}
}

// Send ack response.
//
// We do this in the background (which will race
// with the main response) since we want to handle
// the response as fast as possible without waiting.
// This means that the ack and the response may be
// out of order, which is expected.
tracing::trace!("sending ack message");
match self.nats.publish(reply, res_buf.into()).await {
Ok(_) => {}
Err(err) => {
tracing::error!(?err, "failed to send ack response");
}
// Send ack response.
//
// We do this in the background (which will race
// with the main response) since we want to handle
// the response as fast as possible without waiting.
// This means that the ack and the response may be
// out of order, which is expected.
tracing::trace!("sending ack message");
match self.nats.publish(reply, res_buf.into()).await {
Ok(_) => {}
Err(err) => {
tracing::error!(?err, "failed to send ack response");
}
}

// Parse the request
tracing::info!(
req_id = ?req.req_id,
ray_id = ?req.ray_id,
ts = ?req.ts,
trace = ?req.trace,
body_bytes = ?req.body.len(),
"received request"
);
(
req.req_id,
req.ray_id,
req.ts,
req.trace,
req.body,
req.dont_log_body,
req.debug,
)
}
Err(err) => {
tracing::error!(?err, "failed to decode chirp request");
return;
}
}
}
WorkerKind::Consumer { .. } => match chirp::Message::decode(raw_msg_buf.as_slice())
{
Ok(msg) => {
// Calculate recv lag
let recv_lag =
(rivet_util::timestamp::now() as f64 - msg.ts as f64) / 1000.;
metrics::CHIRP_MESSAGE_RECV_LAG
.with_label_values(&[&worker_name])
.observe(recv_lag);

// Parse the request
tracing::info!(
// TODO: Add back once we can decode UUIDs in Chirp
// req_id = ?msg.req_id,
// ray_id = ?msg.ray_id,
parameters = ?msg.parameters,
ts = ?msg.ts,
trace = ?msg.trace,
body_bytes = ?msg.body.len(),
?recv_lag,
"received message"
req_id = ?req.req_id,
ray_id = ?req.ray_id,
ts = ?req.ts,
trace = ?req.trace,
body_bytes = ?req.body.len(),
"received request"
);

// Enrich Redis metadata for debugging
if let Some(x) = &mut redis_message_meta {
x.parameters = Some(msg.parameters.clone());
}

(
msg.req_id, msg.ray_id, msg.ts, msg.trace, msg.body, false, None,
req.req_id,
req.ray_id,
req.ts,
req.trace,
req.body,
req.dont_log_body,
req.debug,
false,
)
}
Err(err) => {
tracing::error!(?err, "failed to decode chirp message");
tracing::error!(?err, "failed to decode chirp request");
return;
}
},
};
}
}
WorkerKind::Consumer { .. } => match chirp::Message::decode(raw_msg_buf.as_slice()) {
Ok(msg) => {
// Calculate recv lag
let recv_lag = (rivet_util::timestamp::now() as f64 - msg.ts as f64) / 1000.;
metrics::CHIRP_MESSAGE_RECV_LAG
.with_label_values(&[&worker_name])
.observe(recv_lag);

tracing::info!(
// TODO: Add back once we can decode UUIDs in Chirp
// req_id = ?msg.req_id,
// ray_id = ?msg.ray_id,
parameters = ?msg.parameters,
ts = ?msg.ts,
trace = ?msg.trace,
body_bytes = ?msg.body.len(),
?recv_lag,
"received message"
);

// Enrich Redis metadata for debugging
if let Some(x) = &mut redis_message_meta {
x.parameters = Some(msg.parameters.clone());
}

(
msg.req_id,
msg.ray_id,
msg.ts,
msg.trace,
msg.body,
false,
None,
msg.allow_recursive,
)
}
Err(err) => {
tracing::error!(?err, "failed to decode chirp message");
return;
}
},
};
let (req_id, ray_id) = if let (Some(req_id), Some(ray_id)) = (req_id_proto, ray_id_proto) {
(req_id.as_uuid(), ray_id.as_uuid())
} else {
Expand Down Expand Up @@ -733,6 +745,7 @@ where
trace,
),
dont_log_body,
allow_recursive,
}
};

Expand Down Expand Up @@ -819,11 +832,12 @@ where
tracing::info!(request = ?req, body = ?req.op_ctx.body(), "handling req");
}

let is_recursive = req
.op_ctx
.trace()
.iter()
.any(|x| x.context_name == req.op_ctx.name());
let is_recursive = !req.allow_recursive
&& req
.op_ctx
.trace()
.iter()
.any(|x| x.context_name == req.op_ctx.name());
let handle_res = if is_recursive {
Ok(Err(err_code!(
CHIRP_RECURSIVE_REQUEST,
Expand Down
1 change: 1 addition & 0 deletions lib/chirp/worker/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ where
pub(crate) req_ts: i64,
pub(crate) op_ctx: OperationContext<B>,
pub(crate) dont_log_body: bool,
pub(crate) allow_recursive: bool,
}

impl<B> Request<B>
Expand Down
5 changes: 4 additions & 1 deletion lib/chirp/worker/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ impl TestCtx {
Vec::new(),
);

Ok(TestCtx { name: service_name, op_ctx })
Ok(TestCtx {
name: service_name,
op_ctx,
})
}
}

Expand Down
8 changes: 7 additions & 1 deletion proto/chirp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ message Message {
repeated TraceEntry trace = 6;

bytes body = 3;

// Allows a message with a topic that is already in the trace to be published again.
//
// For example: `cluster-server-install` which re-triggers `cluster-server-scale`. The logic within the two scripts
// ensures it is not cyclical.
bool allow_recursive = 8;
}

message Request {
Expand All @@ -23,7 +29,7 @@ message Request {
bytes body = 3;
optional RequestDebug debug = 4;

// Suppresses both the request and reply of this requeset in the logs.
// Suppresses both the request and reply of this request in the logs.
//
// This is useful for very verbose requests and for requests with sensitive data.
bool dont_log_body = 5;
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/cluster/worker/src/workers/server_install/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ async fn worker(ctx: &OperationContext<cluster::msg::server_install::Message>) -

// Scale to get rid of tainted servers
let datacenter_id = unwrap_ref!(ctx.datacenter_id).as_uuid();
msg!([ctx] cluster::msg::datacenter_scale(datacenter_id) {
msg!([ctx] @recursive cluster::msg::datacenter_scale(datacenter_id) {
datacenter_id: ctx.datacenter_id,
})
.await?;
Expand Down

0 comments on commit 566088f

Please sign in to comment.