Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: make grpc service able to be added dynamically #3160

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ bitflags = "2.4.1"
bytemuck = "1.12"
bytes = { version = "1.5", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.4", features = ["derive"] }
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license.workspace = true
[dependencies]
arrow.workspace = true
chrono.workspace = true
clap = { version = "4.0", features = ["derive"] }
clap.workspace = true
client.workspace = true
futures-util.workspace = true
indicatif = "0.17.1"
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async-trait.workspace = true
auth.workspace = true
catalog.workspace = true
chrono.workspace = true
clap = { version = "4.4", features = ["derive"] }
clap.workspace = true
client.workspace = true
common-base.workspace = true
common-catalog.workspace = true
Expand Down
16 changes: 13 additions & 3 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_config::WalConfig;
use common_telemetry::{info, logging};
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::service::DatanodeServiceBuilder;
use meta_client::MetaClientOptions;
use servers::Mode;
use snafu::{OptionExt, ResultExt};
Expand All @@ -38,6 +39,10 @@ impl Instance {
fn new(datanode: Datanode) -> Self {
Self { datanode }
}

pub fn datanode_mut(&mut self) -> &mut Datanode {
&mut self.datanode
}
}

#[async_trait]
Expand Down Expand Up @@ -219,15 +224,20 @@ impl StartCommand {
client: Arc::new(meta_client.clone()),
});

let datanode = DatanodeBuilder::new(opts, plugins)
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins)
.with_meta_client(meta_client)
.with_kv_backend(meta_backend)
.enable_region_server_service()
.enable_http_service()
.build()
.await
.context(StartDatanodeSnafu)?;

let services = DatanodeServiceBuilder::new(&opts)
.with_default_grpc_server(&datanode.region_server())
.enable_http_service()
.build()
.context(StartDatanodeSnafu)?;
datanode.setup_services(services);

Ok(Instance::new(datanode))
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/cmd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ lazy_static::lazy_static! {
pub trait App {
fn name(&self) -> &str;

/// A hook for implementor to make something happened before actual startup. Defaults to no-op.
fn pre_start(&mut self) -> error::Result<()> {
Ok(())
}

async fn start(&mut self) -> error::Result<()>;

async fn stop(&self) -> error::Result<()>;
Expand Down
99 changes: 8 additions & 91 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
//! Datanode implementation.

use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;

Expand Down Expand Up @@ -45,11 +44,7 @@ use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::{join_dir, normalize_dir};
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::GrpcServerConfig;
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{start_server, ServerHandler, ServerHandlers};
use servers::server::{start_server, ServerHandlers};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use store_api::path_utils::{region_dir, WAL_DIR};
Expand All @@ -62,8 +57,8 @@ use tokio::sync::Notify;
use crate::config::{DatanodeOptions, RegionEngineConfig};
use crate::error::{
BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu,
MissingNodeIdSnafu, OpenLogStoreSnafu, ParseAddrSnafu, Result, RuntimeResourceSnafu,
ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu,
ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
Expand All @@ -75,8 +70,6 @@ use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store;

const OPEN_REGION_PARALLELISM: usize = 16;
const REGION_SERVER_SERVICE_NAME: &str = "REGION_SERVER_SERVICE";
const DATANODE_HTTP_SERVICE_NAME: &str = "DATANODE_HTTP_SERVICE";

/// Datanode service.
pub struct Datanode {
Expand Down Expand Up @@ -129,6 +122,10 @@ impl Datanode {
}
}

pub fn setup_services(&mut self, services: ServerHandlers) {
self.services = services;
}

/// Start services of datanode. This method call will block until services are shutdown.
pub async fn start_services(&mut self) -> Result<()> {
let _ = future::try_join_all(self.services.values().map(start_server))
Expand Down Expand Up @@ -173,8 +170,6 @@ pub struct DatanodeBuilder {
plugins: Plugins,
meta_client: Option<MetaClient>,
kv_backend: Option<KvBackendRef>,
enable_region_server_service: bool,
enable_http_service: bool,
}

impl DatanodeBuilder {
Expand All @@ -186,8 +181,6 @@ impl DatanodeBuilder {
plugins,
meta_client: None,
kv_backend: None,
enable_region_server_service: false,
enable_http_service: false,
}
}

Expand All @@ -205,20 +198,6 @@ impl DatanodeBuilder {
}
}

pub fn enable_region_server_service(self) -> Self {
Self {
enable_region_server_service: true,
..self
}
}

pub fn enable_http_service(self) -> Self {
Self {
enable_http_service: true,
..self
}
}

pub async fn build(mut self) -> Result<Datanode> {
let mode = &self.opts.mode;
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
Expand Down Expand Up @@ -269,8 +248,6 @@ impl DatanodeBuilder {
None
};

let services = self.create_datanode_services(&region_server)?;

let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
Some(self.opts.storage.data_home.clone()),
mode,
Expand All @@ -290,7 +267,7 @@ impl DatanodeBuilder {
.context(StartServerSnafu)?;

Ok(Datanode {
services,
services: HashMap::new(),
heartbeat_task,
region_server,
greptimedb_telemetry_task,
Expand All @@ -301,66 +278,6 @@ impl DatanodeBuilder {
})
}

fn create_datanode_services(&self, region_server: &RegionServer) -> Result<ServerHandlers> {
let mut services = HashMap::new();

if self.enable_region_server_service {
services.insert(
REGION_SERVER_SERVICE_NAME.to_string(),
self.create_region_server_service(region_server)?,
);
}

if self.enable_http_service {
services.insert(
DATANODE_HTTP_SERVICE_NAME.to_string(),
self.create_http_service()?,
);
}

Ok(services)
}

fn create_region_server_service(&self, region_server: &RegionServer) -> Result<ServerHandler> {
let opts = &self.opts;

let config = GrpcServerConfig {
max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize,
};

let server = Box::new(
GrpcServerBuilder::new(region_server.runtime())
.config(config)
.flight_handler(Arc::new(region_server.clone()))
.region_server_handler(Arc::new(region_server.clone()))
.build(),
);

let addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &opts.rpc_addr,
})?;

Ok((server, addr))
}

fn create_http_service(&self) -> Result<ServerHandler> {
let opts = &self.opts;

let server = Box::new(
HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml_string())
.build(),
);

let addr = opts.http.addr.parse().context(ParseAddrSnafu {
addr: &opts.http.addr,
})?;

Ok((server, addr))
}

#[cfg(test)]
/// Open all regions belong to this datanode.
async fn initialize_region_server(
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod greptimedb_telemetry;
pub mod heartbeat;
pub mod metrics;
pub mod region_server;
pub mod service;
mod store;
#[cfg(any(test, feature = "testing"))]
pub mod tests;
107 changes: 107 additions & 0 deletions src/datanode/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;

use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{ServerHandler, ServerHandlers};
use snafu::ResultExt;

use crate::config::DatanodeOptions;
use crate::error::{ParseAddrSnafu, Result};
use crate::region_server::RegionServer;

const DATANODE_GRPC_SERVICE_NAME: &str = "DATANODE_GRPC_SERVICE";
const DATANODE_HTTP_SERVICE_NAME: &str = "DATANODE_HTTP_SERVICE";

pub struct DatanodeServiceBuilder<'a> {
opts: &'a DatanodeOptions,
grpc_server: Option<GrpcServer>,
enable_http_service: bool,
}

impl<'a> DatanodeServiceBuilder<'a> {
pub fn new(opts: &'a DatanodeOptions) -> Self {
Self {
opts,
grpc_server: None,
enable_http_service: false,
}
}

pub fn with_grpc_server(self, grpc_server: GrpcServer) -> Self {
Self {
grpc_server: Some(grpc_server),
..self
}
}

pub fn with_default_grpc_server(mut self, region_server: &RegionServer) -> Self {
let grpc_server = Self::grpc_server_builder(self.opts, region_server).build();
self.grpc_server = Some(grpc_server);
self
}

pub fn enable_http_service(self) -> Self {
Self {
enable_http_service: true,
..self
}
}

pub fn build(mut self) -> Result<ServerHandlers> {
let mut services = HashMap::new();

if let Some(grpc_server) = self.grpc_server.take() {
let addr: SocketAddr = self.opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &self.opts.rpc_addr,
})?;
let handler: ServerHandler = (Box::new(grpc_server), addr);
services.insert(DATANODE_GRPC_SERVICE_NAME.to_string(), handler);
}

if self.enable_http_service {
let http_server = HttpServerBuilder::new(self.opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(self.opts.to_toml_string())
.build();
let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
addr: &self.opts.http.addr,
})?;
let handler: ServerHandler = (Box::new(http_server), addr);
services.insert(DATANODE_HTTP_SERVICE_NAME.to_string(), handler);
}

Ok(services)
}

pub fn grpc_server_builder(
opts: &DatanodeOptions,
region_server: &RegionServer,
) -> GrpcServerBuilder {
let config = GrpcServerConfig {
max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize,
};

GrpcServerBuilder::new(config, region_server.runtime())
.flight_handler(Arc::new(region_server.clone()))
.region_server_handler(Arc::new(region_server.clone()))
}
}
Loading
Loading