diff --git a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/README.md b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/README.md index c8478b53b2978..0299c39364a72 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/README.md +++ b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/README.md @@ -12,10 +12,11 @@ Cache worker fetches data from fullnode GRPC and push data to Cache. * Yaml Example ```yaml -fullnode_grpc_address: 127.0.0.1:50051 -redis_address: 127.0.0.1:6379 -health_check_port: 8081 -file_store: - file_store_type: GcsFileStore - gcs_file_store_bucket_name: indexer-grpc-file-store-bucketname +health_check_port: 8083 +server_config: + fullnode_grpc_address: 0.0.0.0:50052 + file_store_config: + file_store_type: GcsFileStore + gcs_file_store_bucket_name: indexer-grpc-file-store-bucketname + redis_main_instance_address: 127.0.0.1:6379 ``` diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/README.md b/ecosystem/indexer-grpc/indexer-grpc-data-service/README.md index de6b6fe025496..667452f932fc4 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/README.md +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/README.md @@ -22,6 +22,9 @@ server_config: file_store_config: file_store_type: GcsFileStore gcs_file_store_bucket_name: indexer-grpc-file-store-bucketname + data_service_grpc_tls_config: + cert_path: /path/to/cert.cert + key_path: /path/to/key.pem redis_read_replica_address: 127.0.0.1:6379 ``` diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/main.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/main.rs index 646aa1611cd0d..f86b31ba33ff4 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/main.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/main.rs @@ -23,8 +23,24 @@ use tonic::{ #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] -pub struct IndexerGrpcDataServiceConfig { +pub struct TlsConfig { + // TLS config. 
pub data_service_grpc_listen_address: String, + pub cert_path: String, + pub key_path: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct NonTlsConfig { + pub data_service_grpc_listen_address: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct IndexerGrpcDataServiceConfig { + pub data_service_grpc_tls_config: Option<TlsConfig>, + pub data_service_grpc_non_tls_config: Option<NonTlsConfig>, pub whitelisted_auth_tokens: Vec<String>, pub file_store_config: IndexerGrpcFileStoreConfig, pub redis_read_replica_address: String, @@ -33,8 +49,6 @@ pub struct IndexerGrpcDataServiceConfig { #[async_trait::async_trait] impl RunnableConfig for IndexerGrpcDataServiceConfig { async fn run(&self) -> Result<()> { - let grpc_address = self.data_service_grpc_listen_address.clone(); - let token_set = build_auth_token_set(self.whitelisted_auth_tokens.clone()); let authentication_inceptor = move |req: Request<()>| -> std::result::Result<Request<()>, Status> { @@ -61,7 +75,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig { .register_encoded_file_descriptor_set(TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET) .register_encoded_file_descriptor_set(UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET) .build() - .expect("Failed to build reflection service"); + .map_err(|e| anyhow::anyhow!("Failed to build reflection service: {}", e))?; // Add authentication interceptor. 
let server = RawDataServerWrapper::new( @@ -72,12 +86,57 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig { .send_compressed(CompressionEncoding::Gzip) .accept_compressed(CompressionEncoding::Gzip); let svc_with_interceptor = InterceptedService::new(svc, authentication_inceptor); - Server::builder() - .add_service(reflection_service) - .add_service(svc_with_interceptor) - .serve(grpc_address.to_socket_addrs().unwrap().next().unwrap()) - .await - .map_err(|e| anyhow::anyhow!("Failed to serve: {}", e)) + + let svc_with_interceptor_clone = svc_with_interceptor.clone(); + let reflection_service_clone = reflection_service.clone(); + + let mut tasks = vec![]; + if self.data_service_grpc_non_tls_config.is_some() { + let config = self.data_service_grpc_non_tls_config.clone().unwrap(); + let grpc_address = config + .data_service_grpc_listen_address + .to_socket_addrs() + .map_err(|e| anyhow::anyhow!(e))? + .next() + .ok_or_else(|| anyhow::anyhow!("Failed to parse grpc address"))?; + tasks.push(tokio::spawn(async move { + Server::builder() + .add_service(svc_with_interceptor_clone) + .add_service(reflection_service_clone) + .serve(grpc_address) + .await + .map_err(|e| anyhow::anyhow!(e)) + })); + } + if self.data_service_grpc_tls_config.is_some() { + let config = self.data_service_grpc_tls_config.clone().unwrap(); + let grpc_address = config + .data_service_grpc_listen_address + .to_socket_addrs() + .map_err(|e| anyhow::anyhow!(e))? + .next() + .ok_or_else(|| anyhow::anyhow!("Failed to parse grpc address"))?; + + let cert = tokio::fs::read(config.cert_path.clone()).await?; + let key = tokio::fs::read(config.key_path.clone()).await?; + let identity = tonic::transport::Identity::from_pem(cert, key); + tasks.push(tokio::spawn(async move { + Server::builder() + .tls_config(tonic::transport::ServerTlsConfig::new().identity(identity))? 
+ .add_service(svc_with_interceptor) + .add_service(reflection_service) + .serve(grpc_address) + .await + .map_err(|e| anyhow::anyhow!(e)) + })); + } + + if tasks.is_empty() { + return Err(anyhow::anyhow!("No grpc config provided")); + } + + futures::future::try_join_all(tasks).await?; + Ok(()) } fn get_server_name(&self) -> String { diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs index 9968929d5e490..d5461a6bb570b 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs @@ -50,7 +50,7 @@ pub static ERROR_COUNT: Lazy<IntCounterVec> = Lazy::new(|| { /// Data latency for data service based on latest processed transaction based on selected processor. pub static PROCESSED_LATENCY_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| { register_gauge_vec!( - "indexer_grpc_data_service_data_latency_in_secs", + "indexer_grpc_data_service_latest_data_latency_in_secs", "Latency of data service based on latest processed transaction", &["request_token", "processor_name"], ) @@ -60,7 +60,7 @@ /// Data latency for data service based on latest processed transaction for all processors. 
pub static PROCESSED_LATENCY_IN_SECS_ALL: Lazy<HistogramVec> = Lazy::new(|| { register_histogram_vec!( - "indexer_grpc_data_service_data_latency_in_secs_all", + "indexer_grpc_data_service_latest_data_latency_in_secs_all", "Latency of data service based on latest processed transaction", &["request_token"] ) diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs index 22aa91a53fc46..db7e3e6cb2457 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs @@ -9,7 +9,7 @@ use aptos_indexer_grpc_utils::{ build_protobuf_encoded_transaction_wrappers, cache_operator::{CacheBatchGetStatus, CacheOperator}, config::IndexerGrpcFileStoreConfig, - constants::{GRPC_AUTH_TOKEN_HEADER, GRPC_REQUEST_NAME_HEADER}, + constants::{BLOB_STORAGE_SIZE, GRPC_AUTH_TOKEN_HEADER, GRPC_REQUEST_NAME_HEADER}, file_store_operator::{FileStoreOperator, GcsFileStoreOperator, LocalFileStoreOperator}, time_diff_since_pb_timestamp_in_secs, EncodedTransactionWithVersion, }; @@ -261,15 +261,19 @@ ]) .inc_by(current_batch_size as u64); if let Some(data_latency_in_secs) = data_latency_in_secs { - PROCESSED_LATENCY_IN_SECS - .with_label_values(&[ - request_metadata.request_token.as_str(), - request_metadata.request_name.as_str(), - ]) - .set(data_latency_in_secs); - PROCESSED_LATENCY_IN_SECS_ALL - .with_label_values(&[request_metadata.request_source.as_str()]) - .observe(data_latency_in_secs); + // If it's a partial batch, we record the latency because it usually means + // the data is the latest. 
+ if current_batch_size % BLOB_STORAGE_SIZE != 0 { + PROCESSED_LATENCY_IN_SECS + .with_label_values(&[ + request_metadata.request_token.as_str(), + request_metadata.request_name.as_str(), + ]) + .set(data_latency_in_secs); + PROCESSED_LATENCY_IN_SECS_ALL + .with_label_values(&[request_metadata.request_source.as_str()]) + .observe(data_latency_in_secs); + } } }, Err(SendTimeoutError::Timeout(_)) => { diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs index 604d30b4fa7a3..46a78b76cc134 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs @@ -16,7 +16,7 @@ const CACHE_SIZE_ESTIMATION: u64 = 3_000_000_u64; // lower than the latest version - CACHE_SIZE_EVICTION_LOWER_BOUND. // The gap between CACHE_SIZE_ESTIMATION and this is to give buffer since // reading latest version and actual data not atomic(two operations). -const CACHE_SIZE_EVICTION_LOWER_BOUND: u64 = 12_000_000_u64; +const CACHE_SIZE_EVICTION_LOWER_BOUND: u64 = 4_000_000_u64; // Keys for cache. const CACHE_KEY_LATEST_VERSION: &str = "latest_version";