Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: bump tonic to v0.12 #17889

Merged
merged 34 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ef2deb2
try try tonic 0.12
BugenZhao Jul 31, 2024
9988967
bump more
BugenZhao Aug 1, 2024
5fc57af
manually downgrade array
BugenZhao Aug 1, 2024
34062d7
bump more more
BugenZhao Aug 1, 2024
dba2100
downgrade arrow again, generate patch
BugenZhao Aug 1, 2024
aa20289
cleanups
BugenZhao Aug 1, 2024
9830565
bump console-api
BugenZhao Aug 1, 2024
975e8e2
bump google cloud pubsub & downgrade tonic in ginepro
BugenZhao Aug 1, 2024
1c5d456
add oltp embedded back
BugenZhao Aug 1, 2024
8e89f84
fix monitor connection impl
BugenZhao Aug 1, 2024
7f6f955
fix compatibility of monitor connection
BugenZhao Aug 1, 2024
8504598
remove
BugenZhao Aug 1, 2024
4267c37
use crates.io madsim
BugenZhao Aug 1, 2024
5e6ae4a
patch tokio-stream
BugenZhao Aug 2, 2024
d0446b2
fix clippy
BugenZhao Aug 2, 2024
ad83a3f
fix more clippy
BugenZhao Aug 2, 2024
44249b8
do not patch tokio-stream
BugenZhao Aug 2, 2024
0e033b9
try try fix madsim
BugenZhao Aug 2, 2024
263d38c
fix check
BugenZhao Aug 2, 2024
4c65d11
use madsim-tonic 0.5.1
BugenZhao Aug 5, 2024
96b346b
Merge remote-tracking branch 'origin/main' into bz/tonic-0-12
BugenZhao Aug 5, 2024
6f26695
Merge branch 'main' into bz/tonic-0-12
wenym1 Aug 8, 2024
e4b87be
Merge branch 'main' into bz/tonic-0-12
BugenZhao Aug 12, 2024
9a1e018
Merge remote-tracking branch 'origin/main' into bz/tonic-0-12
BugenZhao Aug 12, 2024
fcec07c
also tolerate `"get error from control stream"`
BugenZhao Aug 12, 2024
b9b987b
tolerate more
BugenZhao Aug 12, 2024
5c36d92
Merge branch 'main' into bz/tonic-0-12
BugenZhao Aug 13, 2024
fa856f1
Merge branch 'main' into bz/tonic-0-12
BugenZhao Aug 13, 2024
82c4d34
try correctly release all resources in kafka sink protobuf test
BugenZhao Aug 13, 2024
8d8c136
enable span log to try try investigate stuck
BugenZhao Aug 13, 2024
7d41c78
Revert "enable span log to try try investigate stuck"
BugenZhao Aug 14, 2024
96de157
disable `SCHEMA_REGISTRY_DEBUG`
BugenZhao Aug 14, 2024
f7af5e4
Merge remote-tracking branch 'origin/main' into bz/tonic-0-12
BugenZhao Aug 14, 2024
e83f83f
fix protobuf.slt
BugenZhao Aug 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
360 changes: 238 additions & 122 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 11 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,19 +121,19 @@ aws-smithy-types = { version = "1", default-features = false, features = [
aws-endpoint = "0.60"
aws-types = "1"
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
etcd-client = { package = "madsim-etcd-client", version = "0.4" }
etcd-client = { package = "madsim-etcd-client", version = "0.6" }
futures-async-stream = "0.2.9"
hytra = "0.1"
rdkafka = { package = "madsim-rdkafka", version = "0.4.1", features = [
"cmake-build",
] }
hashbrown = { version = "0.14", features = ["ahash", "inline-more", "nightly"] }
criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "492c244e0be91feb659c0cd48a624bbd96045a33" }
prost = { version = "0.12" }
prost-build = { version = "0.12" }
tonic = { package = "madsim-tonic", version = "0.5.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.5" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "e6cd165b9bc85783b42c106e99186b86b73e3507" }
prost = { version = "0.13" }
prost-build = { version = "0.13" }
icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "1860eb315183a5f3f72b4097c1e40d49407f8373", features = [
"prometheus",
] }
Expand Down Expand Up @@ -180,6 +180,7 @@ tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git"
"profiling",
"stats",
], rev = "64a2d9" }
# TODO(http-bump): bump to use tonic 0.12 once minitrace-opentelemetry is updated
opentelemetry = "0.23"
opentelemetry-otlp = "0.16"
opentelemetry_sdk = { version = "0.23", default-features = false }
Expand All @@ -195,6 +196,7 @@ sea-orm = { version = "0.12.14", features = [
"runtime-tokio-native-tls",
] }
sqlx = "0.7"
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055", features = ["net", "fs"] }
tokio-util = "0.7"
tracing-opentelemetry = "0.24"
rand = { version = "0.8", features = ["small_rng"] }
Expand Down Expand Up @@ -335,7 +337,9 @@ opt-level = 2
# Patch third-party crates for deterministic simulation.
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }
getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" }
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e" }
# Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies.
# Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`.
# tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055" }
tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88" }
futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" }
Expand Down
1 change: 0 additions & 1 deletion ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ services:
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_LISTENERS: http://schemaregistry:8082
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: message_queue:29092
SCHEMA_REGISTRY_DEBUG: 'true'

pulsar-server:
container_name: pulsar-server
Expand Down
16 changes: 5 additions & 11 deletions e2e_test/sink/kafka/protobuf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -201,25 +201,19 @@ format plain encode protobuf (
message = 'recursive.AllTypes');

statement ok
drop sink sink_upsert;
drop table from_kafka cascade;

statement ok
drop sink sink_csr_nested;
drop table from_kafka_csr_trivial cascade;

statement ok
drop sink sink_csr_trivial;
drop table from_kafka_csr_nested cascade;

statement ok
drop sink sink0;
drop table from_kafka_raw cascade;

statement ok
drop table into_kafka;

statement ok
drop table from_kafka_raw;
drop table into_kafka cascade;

system ok
rpk topic delete test-rw-sink-upsert-protobuf

statement ok
drop table from_kafka;
2 changes: 1 addition & 1 deletion src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"fs",
] }
tokio-metrics = "0.3.0"
tokio-stream = "0.1"
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tonic = { workspace = true }
tracing = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use bytes::Bytes;
use futures_async_stream::try_stream;
use hashbrown::hash_map::Entry;
use itertools::Itertools;
use prost::Message;
use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
Expand All @@ -35,6 +34,7 @@ use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::HashAggNode;
use risingwave_pb::data::DataChunk as PbDataChunk;
use risingwave_pb::Message;

use crate::error::{BatchError, Result};
use crate::executor::aggregation::build as build_agg;
Expand Down
7 changes: 2 additions & 5 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
let pk_prefix = OwnedRow::new(scan_range.eq_conds);

if self.lookup_prefix_len == self.table.pk_indices().len() {
let row = self
.table
.get_row(&pk_prefix, self.epoch.clone().into())
.await?;
let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;

if let Some(row) = row {
self.row_list.push(row);
Expand All @@ -366,7 +363,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
let iter = self
.table
.batch_iter_with_pk_bounds(
self.epoch.clone().into(),
self.epoch.into(),
&pk_prefix,
..,
false,
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::sync::Arc;
use bytes::Bytes;
use futures_async_stream::try_stream;
use itertools::Itertools;
use prost::Message;
use risingwave_common::array::{Array, DataChunk, RowRef};
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::Schema;
Expand All @@ -34,6 +33,7 @@ use risingwave_common_estimate_size::EstimateSize;
use risingwave_expr::expr::{build_from_prost, BoxedExpression, Expression};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::data::DataChunk as PbDataChunk;
use risingwave_pb::Message;

use super::{ChunkedData, JoinType, RowId};
use crate::error::{BatchError, Result};
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
..Default::default()
}),
}),
epoch: Some(self.epoch.clone()),
epoch: Some(self.epoch),
tracing_context: TracingContext::from_current_span().to_protobuf(),
};

Expand Down Expand Up @@ -237,7 +237,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
&plan_node,
&task_id,
self.context.clone(),
self.epoch.clone(),
self.epoch,
self.shutdown_rx.clone(),
);

Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> {
plan_node,
self.task_id,
self.context.clone(),
self.epoch.clone(),
self.epoch,
self.shutdown_rx.clone(),
)
}
Expand All @@ -188,7 +188,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> {
}

pub fn epoch(&self) -> BatchQueryEpoch {
self.epoch.clone()
self.epoch
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::Arc;
use bytes::Bytes;
use futures_async_stream::try_stream;
use itertools::Itertools;
use prost::Message;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::memory::MemoryContext;
Expand All @@ -28,6 +27,7 @@ use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::data::DataChunk as PbDataChunk;
use risingwave_pb::Message;

use super::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
Expand Down
7 changes: 3 additions & 4 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {

let ordered = seq_scan_node.ordered;

let epoch = source.epoch.clone();
let epoch = source.epoch;
let limit = seq_scan_node.limit;
let as_of = seq_scan_node
.as_of
Expand Down Expand Up @@ -341,8 +341,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
for point_get in point_gets {
let table = table.clone();
if let Some(row) =
Self::execute_point_get(table, point_get, query_epoch.clone(), histogram.clone())
.await?
Self::execute_point_get(table, point_get, query_epoch, histogram.clone()).await?
{
if let Some(chunk) = data_chunk_builder.append_one_row(row) {
returned += chunk.cardinality() as u64;
Expand Down Expand Up @@ -373,7 +372,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
table.clone(),
range,
ordered,
query_epoch.clone(),
query_epoch,
chunk_size,
limit,
histogram.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/spill/spill_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use futures_util::AsyncReadExt;
use opendal::layers::RetryLayer;
use opendal::services::{Fs, Memory};
use opendal::Operator;
use prost::Message;
use risingwave_common::array::DataChunk;
use risingwave_pb::data::DataChunk as PbDataChunk;
use risingwave_pb::Message;
use thiserror_ext::AsReport;
use tokio::sync::Mutex;
use twox_hash::XxHash64;
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/task/broadcast_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub fn new_broadcast_channel(
output_channel_size: usize,
) -> (ChanSenderImpl, Vec<ChanReceiverImpl>) {
let broadcast_info = match shuffle.distribution {
Some(exchange_info::Distribution::BroadcastInfo(ref v)) => v.clone(),
Some(exchange_info::Distribution::BroadcastInfo(ref v)) => *v,
_ => BroadcastInfo::default(),
};

Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
self.plan.root.as_ref().unwrap(),
&self.task_id,
self.context.clone(),
self.epoch.clone(),
self.epoch,
self.shutdown_rx.clone(),
)
.build(),
Expand Down
2 changes: 1 addition & 1 deletion src/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"time",
"signal",
] }
tokio-stream = "0.1"
tokio-stream = { workspace = true }
toml = "0.8"
tracing = "0.1"
tracing-subscriber = "0.3.17"
Expand Down
2 changes: 1 addition & 1 deletion src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
governor = { version = "0.6", default-features = false, features = ["std"] }
hashbrown = "0.14"
hex = "0.4.3"
http = "0.2"
http = "1"
humantime = "2.1"
hytra = { workspace = true }
itertools = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/common/common_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ normal = ["workspace-hack"]
async-trait = "0.1"
axum = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14" # required by tonic
http = "1"
prometheus = { version = "0.13" }
risingwave_common = { workspace = true }
risingwave_pb = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions src/common/common_service/src/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
use std::task::{Context, Poll};

use futures::Future;
use hyper::Body;
use risingwave_common::util::tracing::TracingContext;
use tonic::body::BoxBody;
use tower::{Layer, Service};
use tracing::Instrument;

Expand Down Expand Up @@ -49,9 +49,9 @@ pub struct TracingExtract<S> {
inner: S,
}

impl<S> Service<hyper::Request<Body>> for TracingExtract<S>
impl<S> Service<http::Request<BoxBody>> for TracingExtract<S>
where
S: Service<hyper::Request<Body>> + Clone + Send + 'static,
S: Service<http::Request<BoxBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
{
type Error = S::Error;
Expand All @@ -63,7 +63,7 @@ where
self.inner.poll_ready(cx)
}

fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
// This is necessary because tonic internally uses `tower::buffer::Buffer`.
// See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
// for details on why this is necessary
Expand Down
14 changes: 9 additions & 5 deletions src/common/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
auto_impl = "1"
bytes = "1"
clap = { workspace = true }
easy-ext = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
http = "0.2"
hyper = { version = "0.14", features = ["client"] } # used by tonic
http = "1"
http-02 = { package = "http", version = "0.2" }
hyper = { version = "1" }
hyper-014 = { package = "hyper", version = "0.14" }
hyper-util = { version = "0.1", features = ["client-legacy"] }
hytra = { workspace = true }
itertools = { workspace = true }
parking_lot = { workspace = true }
Expand All @@ -32,13 +36,13 @@ serde = { version = "1", features = ["derive"] }
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio" }
tonic = { workspace = true }
tower-layer = "0.3.2"
tower-service = "0.3.2"
tracing = "0.1"
tracing-subscriber = "0.3.17"

[target.'cfg(not(madsim))'.dependencies]
http-body = "0.4.5"
tower-layer = "0.3.2"
tower-service = "0.3.2"
http-body = "1"
[target.'cfg(target_os = "linux")'.dependencies]
procfs = { version = "0.16", default-features = false }
libc = "0.2"
Expand Down
Loading
Loading