update the tls part for data service. (#8632)
larry-aptos authored and banool committed Jul 7, 2023
1 parent 9b3b72a commit 003997a
Showing 6 changed files with 96 additions and 29 deletions.
13 changes: 7 additions & 6 deletions ecosystem/indexer-grpc/indexer-grpc-cache-worker/README.md
````diff
@@ -12,10 +12,11 @@ Cache worker fetches data from fullnode GRPC and pushes data to the cache.
 
 * Yaml Example
 ```yaml
-fullnode_grpc_address: 127.0.0.1:50051
-redis_address: 127.0.0.1:6379
-health_check_port: 8081
-file_store:
-  file_store_type: GcsFileStore
-  gcs_file_store_bucket_name: indexer-grpc-file-store-bucketname
+health_check_port: 8083
+server_config:
+  fullnode_grpc_address: 0.0.0.0:50052
+  file_store_config:
+    file_store_type: GcsFileStore
+    gcs_file_store_bucket_name: indexer-grpc-file-store-bucketname
+  redis_main_instance_address: 127.0.0.1:6379
 ```
````
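The new layout nests all worker-specific settings under `server_config` while transport-level settings such as `health_check_port` stay at the top level. As a rough illustration of how a file like this deserializes, here is a minimal sketch; the struct shapes are assumptions mirroring the YAML keys, not the crate's actual config types:

```rust
// Minimal sketch of deserializing the YAML above. Struct names and shapes are
// assumptions mirroring the config keys, not the crate's actual types.
// Assumes serde (with the "derive" feature) and serde_yaml as dependencies.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct CacheWorkerConfig {
    health_check_port: u16,
    server_config: ServerConfig,
}

#[derive(Debug, Deserialize)]
struct ServerConfig {
    fullnode_grpc_address: String,
    file_store_config: FileStoreConfig,
    redis_main_instance_address: String,
}

#[derive(Debug, Deserialize)]
struct FileStoreConfig {
    file_store_type: String,
    gcs_file_store_bucket_name: String,
}

fn main() -> Result<(), serde_yaml::Error> {
    let yaml = r#"
health_check_port: 8083
server_config:
  fullnode_grpc_address: 0.0.0.0:50052
  file_store_config:
    file_store_type: GcsFileStore
    gcs_file_store_bucket_name: indexer-grpc-file-store-bucketname
  redis_main_instance_address: 127.0.0.1:6379
"#;
    let config: CacheWorkerConfig = serde_yaml::from_str(yaml)?;
    println!("cache worker config: {config:?}");
    Ok(())
}
```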
3 changes: 3 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/README.md
````diff
@@ -22,6 +22,9 @@ server_config:
   file_store_config:
     file_store_type: GcsFileStore
     gcs_file_store_bucket_name: indexer-grpc-file-store-bucketname
+  data_service_grpc_tls_config:
+    cert_path: /path/to/cert.cert
+    key_path: /path/to/key.pem
   redis_read_replica_address: 127.0.0.1:6379
 ```
````
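With `data_service_grpc_tls_config` set, the data service presents the certificate at `cert_path` to clients. A client then needs a TLS-configured channel; here is a hedged sketch using tonic's client TLS API (assumes tonic is built with its `tls` feature; the URL, domain name, and certificate path are placeholders, not values from this repository):

```rust
// Sketch of a client connecting to the TLS endpoint. The endpoint URL, domain
// name, and certificate path are placeholders.
use tonic::transport::{Certificate, Channel, ClientTlsConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Trust the server's certificate (or a CA that signed it).
    let pem = tokio::fs::read("/path/to/cert.cert").await?;
    let tls = ClientTlsConfig::new()
        .ca_certificate(Certificate::from_pem(pem))
        .domain_name("data-service.example.com"); // must match the cert's SAN/CN
    let _channel = Channel::from_static("https://data-service.example.com:50052")
        .tls_config(tls)?
        .connect()
        .await?;
    println!("connected over TLS");
    Ok(())
}
```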
79 changes: 69 additions & 10 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/main.rs
```diff
@@ -23,8 +23,24 @@ use tonic::{
 
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(deny_unknown_fields)]
-pub struct IndexerGrpcDataServiceConfig {
+pub struct TlsConfig {
+    // TLS config.
+    pub data_service_grpc_listen_address: String,
+    pub cert_path: String,
+    pub key_path: String,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct NonTlsConfig {
+    pub data_service_grpc_listen_address: String,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct IndexerGrpcDataServiceConfig {
+    pub data_service_grpc_tls_config: Option<TlsConfig>,
+    pub data_service_grpc_non_tls_config: Option<NonTlsConfig>,
     pub whitelisted_auth_tokens: Vec<String>,
     pub file_store_config: IndexerGrpcFileStoreConfig,
     pub redis_read_replica_address: String,
@@ -33,8 +49,6 @@ pub struct IndexerGrpcDataServiceConfig {
 #[async_trait::async_trait]
 impl RunnableConfig for IndexerGrpcDataServiceConfig {
     async fn run(&self) -> Result<()> {
-        let grpc_address = self.data_service_grpc_listen_address.clone();
-
         let token_set = build_auth_token_set(self.whitelisted_auth_tokens.clone());
         let authentication_inceptor =
             move |req: Request<()>| -> std::result::Result<Request<()>, Status> {
@@ -61,7 +75,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
             .register_encoded_file_descriptor_set(TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET)
             .register_encoded_file_descriptor_set(UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET)
             .build()
-            .expect("Failed to build reflection service");
+            .map_err(|e| anyhow::anyhow!("Failed to build reflection service: {}", e))?;
 
         // Add authentication interceptor.
         let server = RawDataServerWrapper::new(
@@ -72,12 +86,57 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
             .send_compressed(CompressionEncoding::Gzip)
             .accept_compressed(CompressionEncoding::Gzip);
         let svc_with_interceptor = InterceptedService::new(svc, authentication_inceptor);
-        Server::builder()
-            .add_service(reflection_service)
-            .add_service(svc_with_interceptor)
-            .serve(grpc_address.to_socket_addrs().unwrap().next().unwrap())
-            .await
-            .map_err(|e| anyhow::anyhow!("Failed to serve: {}", e))
+
+        let svc_with_interceptor_clone = svc_with_interceptor.clone();
+        let reflection_service_clone = reflection_service.clone();
+
+        let mut tasks = vec![];
+        if self.data_service_grpc_non_tls_config.is_some() {
+            let config = self.data_service_grpc_non_tls_config.clone().unwrap();
+            let grpc_address = config
+                .data_service_grpc_listen_address
+                .to_socket_addrs()
+                .map_err(|e| anyhow::anyhow!(e))?
+                .next()
+                .ok_or_else(|| anyhow::anyhow!("Failed to parse grpc address"))?;
+            tasks.push(tokio::spawn(async move {
+                Server::builder()
+                    .add_service(svc_with_interceptor_clone)
+                    .add_service(reflection_service_clone)
+                    .serve(grpc_address)
+                    .await
+                    .map_err(|e| anyhow::anyhow!(e))
+            }));
+        }
+        if self.data_service_grpc_tls_config.is_some() {
+            let config = self.data_service_grpc_tls_config.clone().unwrap();
+            let grpc_address = config
+                .data_service_grpc_listen_address
+                .to_socket_addrs()
+                .map_err(|e| anyhow::anyhow!(e))?
+                .next()
+                .ok_or_else(|| anyhow::anyhow!("Failed to parse grpc address"))?;
+
+            let cert = tokio::fs::read(config.cert_path.clone()).await?;
+            let key = tokio::fs::read(config.key_path.clone()).await?;
+            let identity = tonic::transport::Identity::from_pem(cert, key);
+            tasks.push(tokio::spawn(async move {
+                Server::builder()
+                    .tls_config(tonic::transport::ServerTlsConfig::new().identity(identity))?
+                    .add_service(svc_with_interceptor)
+                    .add_service(reflection_service)
+                    .serve(grpc_address)
+                    .await
+                    .map_err(|e| anyhow::anyhow!(e))
+            }));
+        }
+
+        if tasks.is_empty() {
+            return Err(anyhow::anyhow!("No grpc config provided"));
+        }
+
+        futures::future::try_join_all(tasks).await?;
+        Ok(())
     }
 
     fn get_server_name(&self) -> String {
```
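The new `run` body spawns one tonic server per configured transport and joins them with `futures::future::try_join_all`, so a failure on either listener brings the service down. The authentication interceptor those servers share is only partially visible in the hunk above; the sketch below shows the general shape of such an interceptor, with an assumed header name and a simplified whitelist check, not this crate's exact code:

```rust
// Simplified sketch of a tonic auth interceptor. The header name and the
// whitelist check are assumptions, not this crate's exact code.
use std::collections::HashSet;
use tonic::{Request, Status};

const GRPC_AUTH_TOKEN_HEADER: &str = "x-aptos-data-authorization"; // assumed value

fn make_auth_interceptor(
    token_set: HashSet<String>,
) -> impl Fn(Request<()>) -> Result<Request<()>, Status> + Clone {
    move |req: Request<()>| {
        // Pull the auth token out of the request metadata, defaulting to "".
        let token = req
            .metadata()
            .get(GRPC_AUTH_TOKEN_HEADER)
            .and_then(|value| value.to_str().ok())
            .unwrap_or_default();
        if token_set.contains(token) {
            Ok(req)
        } else {
            Err(Status::unauthenticated("invalid auth token"))
        }
    }
}

fn main() {
    let tokens: HashSet<String> = ["token1".to_string()].into_iter().collect();
    let interceptor = make_auth_interceptor(tokens);
    // No auth header attached, so this request is rejected.
    assert!(interceptor(Request::new(())).is_err());
}
```

In the commit itself, the closure is wrapped via `InterceptedService::new(svc, authentication_inceptor)` and cloned so the TLS and non-TLS listeners can each serve it.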
4 changes: 2 additions & 2 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs
```diff
@@ -50,7 +50,7 @@ pub static ERROR_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
 /// Data latency for data service based on the latest processed transaction for the selected processor.
 pub static PROCESSED_LATENCY_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
     register_gauge_vec!(
-        "indexer_grpc_data_service_data_latency_in_secs",
+        "indexer_grpc_data_service_latest_data_latency_in_secs",
         "Latency of data service based on latest processed transaction",
         &["request_token", "processor_name"],
     )
@@ -60,7 +60,7 @@ pub static PROCESSED_LATENCY_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| {
 /// Data latency for data service based on the latest processed transaction for all processors.
 pub static PROCESSED_LATENCY_IN_SECS_ALL: Lazy<HistogramVec> = Lazy::new(|| {
     register_histogram_vec!(
-        "indexer_grpc_data_service_data_latency_in_secs_all",
+        "indexer_grpc_data_service_latest_data_latency_in_secs_all",
         "Latency of data service based on latest processed transaction",
         &["request_token"]
     )
```
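Both renamed metrics gain a `latest_` prefix, which better matches what they track. Note the split in metric kinds: the per-processor metric is a gauge (only the most recent value per label set), while the aggregate is a histogram (a distribution over all observations). A small sketch of that distinction, with illustrative metric names rather than the ones the data service registers:

```rust
// Sketch contrasting the two metric kinds used above; the metric names here
// are illustrative, not the ones registered by the data service.
use prometheus::{register_gauge_vec, register_histogram_vec};

fn main() {
    let latest = register_gauge_vec!(
        "example_latency_in_secs",
        "Latest latency per request token",
        &["request_token"]
    )
    .unwrap();
    let all = register_histogram_vec!(
        "example_latency_in_secs_all",
        "Latency distribution across all observations",
        &["request_token"]
    )
    .unwrap();

    // A gauge keeps only the most recent value for each label set...
    latest.with_label_values(&["token-a"]).set(0.25);
    latest.with_label_values(&["token-a"]).set(0.10); // overwrites 0.25
    // ...while a histogram accumulates every observation into buckets.
    all.with_label_values(&["token-a"]).observe(0.25);
    all.with_label_values(&["token-a"]).observe(0.10);
}
```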
24 changes: 14 additions & 10 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs
```diff
@@ -9,7 +9,7 @@ use aptos_indexer_grpc_utils::{
     build_protobuf_encoded_transaction_wrappers,
     cache_operator::{CacheBatchGetStatus, CacheOperator},
     config::IndexerGrpcFileStoreConfig,
-    constants::{GRPC_AUTH_TOKEN_HEADER, GRPC_REQUEST_NAME_HEADER},
+    constants::{BLOB_STORAGE_SIZE, GRPC_AUTH_TOKEN_HEADER, GRPC_REQUEST_NAME_HEADER},
     file_store_operator::{FileStoreOperator, GcsFileStoreOperator, LocalFileStoreOperator},
     time_diff_since_pb_timestamp_in_secs, EncodedTransactionWithVersion,
 };
@@ -261,15 +261,19 @@ impl RawData for RawDataServerWrapper {
                     ])
                     .inc_by(current_batch_size as u64);
                 if let Some(data_latency_in_secs) = data_latency_in_secs {
-                    PROCESSED_LATENCY_IN_SECS
-                        .with_label_values(&[
-                            request_metadata.request_token.as_str(),
-                            request_metadata.request_name.as_str(),
-                        ])
-                        .set(data_latency_in_secs);
-                    PROCESSED_LATENCY_IN_SECS_ALL
-                        .with_label_values(&[request_metadata.request_source.as_str()])
-                        .observe(data_latency_in_secs);
+                    // If it's a partial batch, we record the latency because it usually means
+                    // the data is the latest.
+                    if current_batch_size % BLOB_STORAGE_SIZE != 0 {
+                        PROCESSED_LATENCY_IN_SECS
+                            .with_label_values(&[
+                                request_metadata.request_token.as_str(),
+                                request_metadata.request_name.as_str(),
+                            ])
+                            .set(data_latency_in_secs);
+                        PROCESSED_LATENCY_IN_SECS_ALL
+                            .with_label_values(&[request_metadata.request_source.as_str()])
+                            .observe(data_latency_in_secs);
+                    }
                 }
             },
             Err(SendTimeoutError::Timeout(_)) => {
```
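The new guard skips the latency metrics whenever `current_batch_size` is a whole multiple of `BLOB_STORAGE_SIZE`: full batches typically come from replaying stored blobs, while a partial batch usually means the stream has caught up to the head of the chain, where latency is meaningful. A standalone sketch of the predicate; the constant's value here is an assumption, the real one is imported from `aptos-indexer-grpc-utils`:

```rust
// Standalone sketch of the partial-batch check; 1000 is an assumed value for
// the blob size, whose real definition lives in aptos-indexer-grpc-utils.
const BLOB_STORAGE_SIZE: u64 = 1000;

/// A full replay batch is always a whole number of blobs; a partial batch
/// usually means the server returned whatever was available at the head,
/// so the latency measurement is meaningful.
fn is_partial_batch(current_batch_size: u64) -> bool {
    current_batch_size % BLOB_STORAGE_SIZE != 0
}

fn main() {
    assert!(!is_partial_batch(2000)); // historical replay: skip latency metrics
    assert!(is_partial_batch(137)); // caught up to head: record latency
}
```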
2 changes: 1 addition & 1 deletion ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs
```diff
@@ -16,7 +16,7 @@ const CACHE_SIZE_ESTIMATION: u64 = 3_000_000_u64;
 // lower than the latest version - CACHE_SIZE_EVICTION_LOWER_BOUND.
 // The gap between CACHE_SIZE_ESTIMATION and this is to give a buffer, since
 // reading the latest version and the actual data is not atomic (two operations).
-const CACHE_SIZE_EVICTION_LOWER_BOUND: u64 = 12_000_000_u64;
+const CACHE_SIZE_EVICTION_LOWER_BOUND: u64 = 4_000_000_u64;
 
 // Keys for cache.
 const CACHE_KEY_LATEST_VERSION: &str = "latest_version";
```
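The lower bound drops from 12M to 4M versions, which shrinks how far behind the latest version an entry may trail before eviction while still keeping a buffer above the ~3M `CACHE_SIZE_ESTIMATION` for the non-atomic read described in the comment. A hypothetical helper (not the crate's actual eviction code) makes the relationship concrete:

```rust
// Hypothetical helper mirroring the comment in the diff above; this is not
// the crate's actual eviction logic.
const CACHE_SIZE_EVICTION_LOWER_BOUND: u64 = 4_000_000;

/// An entry is evictable once it trails the latest version by more than the
/// lower bound. Keeping the bound above the ~3M cache size estimation buffers
/// the fact that reading the latest version and the data are two operations.
fn is_evictable(version: u64, latest_version: u64) -> bool {
    latest_version.saturating_sub(version) > CACHE_SIZE_EVICTION_LOWER_BOUND
}

fn main() {
    let latest_version = 10_000_000;
    assert!(is_evictable(5_000_000, latest_version)); // 5M behind: evict
    assert!(!is_evictable(7_000_000, latest_version)); // 3M behind: keep
}
```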
