Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(grpc): send x-tenant-id and x-request-id in grpc headers #6904

Merged
merged 5 commits into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/common_utils/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,6 @@ pub const CONNECTOR_TRANSACTION_ID_HASH_BYTES: usize = 25;
/// Apple Pay validation url
pub const APPLEPAY_VALIDATION_URL: &str =
"https://apple-pay-gateway-cert.apple.com/paymentservices/startSession";

/// Request ID
pub const X_REQUEST_ID: &str = "x-request-id";
52 changes: 52 additions & 0 deletions crates/external_services/src/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub mod dynamic_routing;
pub mod health_check_client;
use std::{fmt::Debug, sync::Arc};

#[cfg(feature = "dynamic_routing")]
use common_utils::consts;
#[cfg(feature = "dynamic_routing")]
use dynamic_routing::{DynamicRoutingClientConfig, RoutingStrategy};
#[cfg(feature = "dynamic_routing")]
Expand All @@ -16,6 +18,8 @@ use http_body_util::combinators::UnsyncBoxBody;
use hyper::body::Bytes;
#[cfg(feature = "dynamic_routing")]
use hyper_util::client::legacy::connect::HttpConnector;
#[cfg(feature = "dynamic_routing")]
use router_env::logger;
use serde;
#[cfg(feature = "dynamic_routing")]
use tonic::Status;
Expand Down Expand Up @@ -76,3 +80,51 @@ impl GrpcClientSettings {
})
}
}

/// Contains grpc headers
#[derive(Debug)]
pub struct GrpcHeaders {
/// Tenant id
pub tenant_id: String,
/// Request id
pub request_id: Option<String>,
}

#[cfg(feature = "dynamic_routing")]
/// Trait to add necessary headers to the tonic Request
pub(crate) trait AddHeaders {
/// Add necessary header fields to the tonic Request
fn add_headers_to_grpc_request(&mut self, headers: GrpcHeaders);
}

#[cfg(feature = "dynamic_routing")]
impl<T> AddHeaders for tonic::Request<T> {
#[track_caller]
fn add_headers_to_grpc_request(&mut self, headers: GrpcHeaders) {
headers.tenant_id
.parse()
.map(|tenant_id| {
self
.metadata_mut()
.append(consts::TENANT_HEADER, tenant_id)
})
.inspect_err(
|err| logger::warn!(header_parse_error=?err,"invalid {} received",consts::TENANT_HEADER),
)
.ok();

headers.request_id.map(|request_id| {
request_id
.parse()
.map(|request_id| {
self
.metadata_mut()
.append(consts::X_REQUEST_ID, request_id)
})
.inspect_err(
|err| logger::warn!(header_parse_error=?err,"invalid {} received",consts::X_REQUEST_ID),
)
.ok();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod elimination_rate {
}

use super::{Client, DynamicRoutingError, DynamicRoutingResult};
use crate::grpc_client::{AddHeaders, GrpcHeaders};

/// The trait Elimination Based Routing would have the functions required to support performance, calculation and invalidation bucket
#[async_trait::async_trait]
Expand All @@ -32,6 +33,7 @@ pub trait EliminationBasedRouting: dyn_clone::DynClone + Send + Sync {
params: String,
labels: Vec<RoutableConnectorChoice>,
configs: Option<EliminationConfig>,
headers: GrpcHeaders,
) -> DynamicRoutingResult<EliminationResponse>;
/// To update the bucket size and ttl for list of connectors with its respective bucket name
async fn update_elimination_bucket_config(
Expand All @@ -40,11 +42,13 @@ pub trait EliminationBasedRouting: dyn_clone::DynClone + Send + Sync {
params: String,
report: Vec<RoutableConnectorChoiceWithBucketName>,
config: Option<EliminationConfig>,
headers: GrpcHeaders,
) -> DynamicRoutingResult<UpdateEliminationBucketResponse>;
/// To invalidate the previous id's bucket
async fn invalidate_elimination_bucket(
&self,
id: String,
headers: GrpcHeaders,
) -> DynamicRoutingResult<InvalidateBucketResponse>;
}

Expand All @@ -56,6 +60,7 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
params: String,
label_input: Vec<RoutableConnectorChoice>,
configs: Option<EliminationConfig>,
headers: GrpcHeaders,
) -> DynamicRoutingResult<EliminationResponse> {
let labels = label_input
.into_iter()
Expand All @@ -64,13 +69,15 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {

let config = configs.map(ForeignTryFrom::foreign_try_from).transpose()?;

let request = tonic::Request::new(EliminationRequest {
let mut request = tonic::Request::new(EliminationRequest {
id,
params,
labels,
config,
});

request.add_headers_to_grpc_request(headers);

let response = self
.clone()
.get_elimination_status(request)
Expand All @@ -89,6 +96,7 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
params: String,
report: Vec<RoutableConnectorChoiceWithBucketName>,
configs: Option<EliminationConfig>,
headers: GrpcHeaders,
) -> DynamicRoutingResult<UpdateEliminationBucketResponse> {
let config = configs.map(ForeignTryFrom::foreign_try_from).transpose()?;

Expand All @@ -102,13 +110,15 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
})
.collect::<Vec<_>>();

let request = tonic::Request::new(UpdateEliminationBucketRequest {
let mut request = tonic::Request::new(UpdateEliminationBucketRequest {
id,
params,
labels_with_bucket_name,
config,
});

request.add_headers_to_grpc_request(headers);

let response = self
.clone()
.update_elimination_bucket(request)
Expand All @@ -122,8 +132,11 @@ impl EliminationBasedRouting for EliminationAnalyserClient<Client> {
async fn invalidate_elimination_bucket(
&self,
id: String,
headers: GrpcHeaders,
) -> DynamicRoutingResult<InvalidateBucketResponse> {
let request = tonic::Request::new(InvalidateBucketRequest { id });
let mut request = tonic::Request::new(InvalidateBucketRequest { id });

request.add_headers_to_grpc_request(headers);

let response = self
.clone()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod success_rate {
tonic::include_proto!("success_rate");
}
use super::{Client, DynamicRoutingError, DynamicRoutingResult};
use crate::grpc_client::{AddHeaders, GrpcHeaders};
/// The trait Success Based Dynamic Routing would have the functions required to support the calculation and updation window
#[async_trait::async_trait]
pub trait SuccessBasedDynamicRouting: dyn_clone::DynClone + Send + Sync {
Expand All @@ -31,6 +32,7 @@ pub trait SuccessBasedDynamicRouting: dyn_clone::DynClone + Send + Sync {
success_rate_based_config: SuccessBasedRoutingConfig,
params: String,
label_input: Vec<RoutableConnectorChoice>,
headers: GrpcHeaders,
) -> DynamicRoutingResult<CalSuccessRateResponse>;
/// To update the success rate with the given label
async fn update_success_rate(
Expand All @@ -39,11 +41,13 @@ pub trait SuccessBasedDynamicRouting: dyn_clone::DynClone + Send + Sync {
success_rate_based_config: SuccessBasedRoutingConfig,
params: String,
response: Vec<RoutableConnectorChoiceWithStatus>,
headers: GrpcHeaders,
) -> DynamicRoutingResult<UpdateSuccessRateWindowResponse>;
/// To invalidates the success rate routing keys
async fn invalidate_success_rate_routing_keys(
&self,
id: String,
headers: GrpcHeaders,
) -> DynamicRoutingResult<InvalidateWindowsResponse>;
}

Expand All @@ -55,6 +59,7 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
success_rate_based_config: SuccessBasedRoutingConfig,
params: String,
label_input: Vec<RoutableConnectorChoice>,
headers: GrpcHeaders,
) -> DynamicRoutingResult<CalSuccessRateResponse> {
let labels = label_input
.into_iter()
Expand All @@ -66,13 +71,15 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
.map(ForeignTryFrom::foreign_try_from)
.transpose()?;

let request = tonic::Request::new(CalSuccessRateRequest {
let mut request = tonic::Request::new(CalSuccessRateRequest {
id,
params,
labels,
config,
});

request.add_headers_to_grpc_request(headers);

let response = self
.clone()
.fetch_success_rate(request)
Expand All @@ -91,6 +98,7 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
success_rate_based_config: SuccessBasedRoutingConfig,
params: String,
label_input: Vec<RoutableConnectorChoiceWithStatus>,
headers: GrpcHeaders,
) -> DynamicRoutingResult<UpdateSuccessRateWindowResponse> {
let config = success_rate_based_config
.config
Expand All @@ -105,13 +113,15 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
})
.collect();

let request = tonic::Request::new(UpdateSuccessRateWindowRequest {
let mut request = tonic::Request::new(UpdateSuccessRateWindowRequest {
id,
params,
labels_with_status,
config,
});

request.add_headers_to_grpc_request(headers);

let response = self
.clone()
.update_success_rate_window(request)
Expand All @@ -126,8 +136,11 @@ impl SuccessBasedDynamicRouting for SuccessRateCalculatorClient<Client> {
async fn invalidate_success_rate_routing_keys(
&self,
id: String,
headers: GrpcHeaders,
) -> DynamicRoutingResult<InvalidateWindowsResponse> {
let request = tonic::Request::new(InvalidateWindowsRequest { id });
let mut request = tonic::Request::new(InvalidateWindowsRequest { id });

request.add_headers_to_grpc_request(headers);

let response = self
.clone()
Expand Down
1 change: 1 addition & 0 deletions crates/router/src/core/payments/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,7 @@ pub async fn perform_success_based_routing(
success_based_routing_configs,
success_based_routing_config_params,
routable_connectors,
state.get_grpc_headers(),
)
.await
.change_context(errors::RoutingError::SuccessRateCalculationError)
Expand Down
5 changes: 4 additions & 1 deletion crates/router/src/core/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1408,7 +1408,10 @@ pub async fn success_based_routing_update_configs(
.as_ref()
.async_map(|sr_client| async {
sr_client
.invalidate_success_rate_routing_keys(prefix_of_dynamic_routing_keys)
.invalidate_success_rate_routing_keys(
prefix_of_dynamic_routing_keys,
state.get_grpc_headers(),
)
.await
.change_context(errors::ApiErrorResponse::GenericNotFoundError {
message: "Failed to invalidate the routing keys".to_string(),
Expand Down
2 changes: 2 additions & 0 deletions crates/router/src/core/routing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ pub async fn push_metrics_with_update_window_for_success_based_routing(
success_based_routing_configs.clone(),
success_based_routing_config_params.clone(),
routable_connectors.clone(),
state.get_grpc_headers(),
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
Expand Down Expand Up @@ -854,6 +855,7 @@ pub async fn push_metrics_with_update_window_for_success_based_routing(
},
payment_status_attribute == common_enums::AttemptStatus::Charged,
)],
state.get_grpc_headers(),
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
Expand Down
11 changes: 10 additions & 1 deletion crates/router/src/routes/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use common_utils::id_type;
use external_services::email::{
no_email::NoEmailClient, ses::AwsSes, smtp::SmtpServer, EmailClientConfigs, EmailService,
};
use external_services::{file_storage::FileStorageInterface, grpc_client::GrpcClients};
use external_services::{
file_storage::FileStorageInterface,
grpc_client::{GrpcClients, GrpcHeaders},
};
use hyperswitch_interfaces::{
encryption_interface::EncryptionManagementInterface,
secrets_interface::secret_state::{RawSecret, SecuredSecret},
Expand Down Expand Up @@ -123,6 +126,12 @@ impl SessionState {
event_context: events::EventContext::new(self.event_handler.clone()),
}
}
pub fn get_grpc_headers(&self) -> GrpcHeaders {
GrpcHeaders {
tenant_id: self.tenant.tenant_id.get_string_repr().to_string(),
request_id: self.request_id.map(|req_id| (*req_id).to_string()),
}
}
}

pub trait SessionStateInfo {
Expand Down
Loading