Skip to content

Commit

Permalink
feat: enable compression for metasrv client (#5078)
Browse files Browse the repository at this point in the history
* feat: enable compression for metasrv client

* refactor: simplify gRPC service router registration

* chore: fix unit tests
  • Loading branch information
WenyXu authored Dec 2, 2024
1 parent c049ce6 commit 0f116c8
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 15 deletions.
6 changes: 5 additions & 1 deletion src/meta-client/src/client/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use common_meta::rpc::store::{
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use tonic::Status;

Expand Down Expand Up @@ -173,7 +174,10 @@ impl Inner {
fn make_client(&self, addr: impl AsRef<str>) -> Result<ClusterClient<Channel>> {
let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?;

Ok(ClusterClient::new(channel))
Ok(ClusterClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd))
}

#[inline]
Expand Down
6 changes: 5 additions & 1 deletion src/meta-client/src/client/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use tonic::Streaming;

Expand Down Expand Up @@ -249,7 +250,10 @@ impl Inner {
.get(addr)
.context(error::CreateChannelSnafu)?;

Ok(HeartbeatClient::new(channel))
Ok(HeartbeatClient::new(channel)
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd))
}

#[inline]
Expand Down
6 changes: 5 additions & 1 deletion src/meta-client/src/client/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use tonic::Status;

Expand Down Expand Up @@ -141,7 +142,10 @@ impl Inner {
.get(addr)
.context(error::CreateChannelSnafu)?;

Ok(ProcedureServiceClient::new(channel))
Ok(ProcedureServiceClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd))
}

#[inline]
Expand Down
6 changes: 5 additions & 1 deletion src/meta-client/src/client/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;

use crate::client::{load_balance as lb, Id};
Expand Down Expand Up @@ -236,7 +237,10 @@ impl Inner {
.get(addr)
.context(error::CreateChannelSnafu)?;

Ok(StoreClient::new(channel))
Ok(StoreClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd))
}

#[inline]
Expand Down
27 changes: 20 additions & 7 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Receiver, Sender};
#[cfg(feature = "pg_kvbackend")]
use tokio_postgres::NoTls;
use tonic::codec::CompressionEncoding;
use tonic::transport::server::{Router, TcpIncoming};

use crate::election::etcd::EtcdElection;
Expand Down Expand Up @@ -178,14 +179,26 @@ pub async fn bootstrap_metasrv_with_router(
Ok(())
}

#[macro_export]
macro_rules! add_compressed_service {
($builder:expr, $server:expr) => {
$builder.add_service(
$server
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd),
)
};
}

pub fn router(metasrv: Arc<Metasrv>) -> Router {
tonic::transport::Server::builder()
.accept_http1(true) // for admin services
.add_service(HeartbeatServer::from_arc(metasrv.clone()))
.add_service(StoreServer::from_arc(metasrv.clone()))
.add_service(ClusterServer::from_arc(metasrv.clone()))
.add_service(ProcedureServiceServer::from_arc(metasrv.clone()))
.add_service(admin::make_admin_service(metasrv))
let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services
let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
router.add_service(admin::make_admin_service(metasrv))
}

pub async fn metasrv_builder(
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub mod selector;
pub mod service;
pub mod state;
pub mod table_meta_alloc;

pub use crate::error::Result;

mod greptimedb_telemetry;
Expand Down
13 changes: 9 additions & 4 deletions src/meta-srv/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use tonic::codec::CompressionEncoding;
use tower::service_fn;

use crate::add_compressed_service;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};

Expand Down Expand Up @@ -80,11 +82,14 @@ pub async fn mock(
let (client, server) = tokio::io::duplex(1024);
let metasrv = Arc::new(metasrv);
let service = metasrv.clone();

let _handle = tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(HeartbeatServer::from_arc(service.clone()))
.add_service(StoreServer::from_arc(service.clone()))
.add_service(ProcedureServiceServer::from_arc(service.clone()))
let mut router = tonic::transport::Server::builder();
let router = add_compressed_service!(router, HeartbeatServer::from_arc(service.clone()));
let router = add_compressed_service!(router, StoreServer::from_arc(service.clone()));
let router =
add_compressed_service!(router, ProcedureServiceServer::from_arc(service.clone()));
router
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});
Expand Down

0 comments on commit 0f116c8

Please sign in to comment.