Skip to content

Commit

Permalink
feat(channel): Add Change type to make tower internal dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Sep 6, 2024
1 parent ef7f0bb commit 871ccfc
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 9 deletions.
3 changes: 1 addition & 2 deletions examples/src/dynamic_load_balance/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ pub mod pb {
}

use pb::{echo_client::EchoClient, EchoRequest};
use tonic::transport::channel::Change;
use tonic::transport::Channel;

use tonic::transport::Endpoint;

use std::sync::Arc;

use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use tokio::time::timeout;
use tower::discover::Change;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down
1 change: 0 additions & 1 deletion tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ allowed_external_types = [
"futures_core::stream::Stream",
"h2::error::Error",
"http_body_util::combinators::box_body::UnsyncBoxBody",
"tower::discover::Change",
"tower_service::Service",
"tower_layer::Layer",
"tower_layer::stack::Stack",
Expand Down
3 changes: 2 additions & 1 deletion tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub(crate) mod service;
#[cfg(feature = "tls")]
mod tls;

pub use self::service::Change;
pub use endpoint::Endpoint;
#[cfg(feature = "tls")]
pub use tls::ClientTlsConfig;
Expand All @@ -30,7 +31,7 @@ use hyper::rt;
use tower::balance::p2c::Balance;
use tower::{
buffer::{self, Buffer},
discover::{Change, Discover},
discover::Discover,
util::BoxService,
Service,
};
Expand Down
18 changes: 13 additions & 5 deletions tonic/src/transport/channel/service/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,19 @@ use std::{
task::{Context, Poll},
};
use tokio::sync::mpsc::Receiver;

use tokio_stream::Stream;
use tower::discover::Change;
use tower::discover::Change as TowerChange;

type DiscoverResult<K, S, E> = Result<TowerChange<K, S>, E>;

type DiscoverResult<K, S, E> = Result<Change<K, S>, E>;
/// A change in the service set.
#[derive(Debug, Clone)]
pub enum Change<K, V> {
/// A new service identified by key `K` was identified.
Insert(K, V),
/// The service identified by key `K` disappeared.
Remove(K),
}

pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> {
changes: Receiver<Change<K, Endpoint>>,
Expand Down Expand Up @@ -39,10 +47,10 @@ impl<K: Hash + Eq + Clone> Stream for DynamicServiceStream<K> {
http.enforce_http(false);

let connection = Connection::lazy(endpoint.connector(http), endpoint);
let change = Ok(Change::Insert(k, connection));
let change = Ok(TowerChange::Insert(k, connection));
Poll::Ready(Some(change))
}
Change::Remove(k) => Poll::Ready(Some(Ok(Change::Remove(k)))),
Change::Remove(k) => Poll::Ready(Some(Ok(TowerChange::Remove(k)))),
},
}
}
Expand Down
1 change: 1 addition & 0 deletions tonic/src/transport/channel/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod connection;
pub(super) use self::connection::Connection;

mod discover;
pub use self::discover::Change;
pub(super) use self::discover::DynamicServiceStream;

mod io;
Expand Down

0 comments on commit 871ccfc

Please sign in to comment.