Skip to content

Commit

Permalink
fix(query): use custom connector for udf client (#16697)
Browse files Browse the repository at this point in the history
* fix(query): use custom connector for udf client

* z

* z

* z

* z

* z

* z

* z
  • Loading branch information
everpcpc authored Oct 30, 2024
1 parent ee8fb25 commit e3a3d70
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 7 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions src/common/grpc/src/dns_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl DNSResolver {
}

#[derive(Clone)]
struct DNSService;
pub struct DNSService;

impl tower_service::Service<Name> for DNSService {
type Response = DNSServiceAddrs;
Expand All @@ -109,11 +109,11 @@ impl tower_service::Service<Name> for DNSService {
}
}

struct DNSServiceFuture {
pub struct DNSServiceFuture {
inner: JoinHandle<Result<DNSServiceAddrs>>,
}

struct DNSServiceAddrs {
pub struct DNSServiceAddrs {
inner: std::vec::IntoIter<IpAddr>,
}

Expand Down
1 change: 1 addition & 0 deletions src/common/grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use client_conf::RpcClientConf;
pub use client_conf::RpcClientTlsConfig;
pub use dns_resolver::ConnectionFactory;
pub use dns_resolver::DNSResolver;
pub use dns_resolver::DNSService;
pub use dns_resolver::GrpcConnectionError;
pub use grpc_token::GrpcClaim;
pub use grpc_token::GrpcToken;
Expand Down
2 changes: 2 additions & 0 deletions src/query/expression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ databend-common-ast = { workspace = true }
databend-common-base = { workspace = true }
databend-common-datavalues = { workspace = true }
databend-common-exception = { workspace = true }
databend-common-grpc = { workspace = true }
databend-common-hashtable = { workspace = true }
databend-common-io = { workspace = true }
educe = { workspace = true }
Expand All @@ -38,6 +39,7 @@ geo = { workspace = true }
geos = { workspace = true }
geozero = { workspace = true }
hex = { workspace = true }
hyper-util = { workspace = true }
itertools = { workspace = true }
jsonb = { workspace = true }
lexical-core = { workspace = true }
Expand Down
18 changes: 14 additions & 4 deletions src/query/expression/src/utils/udf_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ use databend_common_base::headers::HEADER_TENANT;
use databend_common_base::version::DATABEND_SEMVER;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_grpc::DNSService;
use futures::stream;
use futures::StreamExt;
use futures::TryStreamExt;
use hyper_util::client::legacy::connect::HttpConnector;
use tonic::metadata::KeyAndValueRef;
use tonic::metadata::MetadataKey;
use tonic::metadata::MetadataMap;
Expand Down Expand Up @@ -78,16 +80,24 @@ impl UDFFlightClient {
.keep_alive_timeout(Duration::from_secs(UDF_KEEP_ALIVE_TIMEOUT_SEC))
.keep_alive_while_idle(true);

let inner = FlightServiceClient::connect(endpoint)
let mut connector = HttpConnector::new_with_resolver(DNSService);
connector.enforce_http(false);
connector.set_nodelay(true);
connector.set_keepalive(Some(Duration::from_secs(UDF_TCP_KEEP_ALIVE_SEC)));
connector.set_connect_timeout(Some(Duration::from_secs(conn_timeout)));
connector.set_reuse_address(true);

let channel = endpoint
.connect_with_connector(connector)
.await
.map_err(|err| {
ErrorCode::UDFServerConnectError(format!(
"Cannot connect to UDF Server {}: {:?}",
addr, err
))
})?
.max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE);

})?;
let inner =
FlightServiceClient::new(channel).max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE);
Ok(UDFFlightClient {
inner,
batch_rows,
Expand Down

0 comments on commit e3a3d70

Please sign in to comment.