Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: (wip) reproduce panic #17920

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
388 changes: 257 additions & 131 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 10 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,18 @@ aws-smithy-types = { version = "1", default-features = false, features = [
aws-endpoint = "0.60"
aws-types = "1"
axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
etcd-client = { package = "madsim-etcd-client", version = "0.4" }
etcd-client = { package = "madsim-etcd-client", version = "0.6" }
futures-async-stream = "0.2.9"
hytra = "0.1"
rdkafka = { package = "madsim-rdkafka", version = "0.4.1", features = [
"cmake-build",
] }
hashbrown = { version = "0.14", features = ["ahash", "inline-more", "nightly"] }
criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "492c244e0be91feb659c0cd48a624bbd96045a33" }
prost = { version = "0.12" }
tonic = { package = "madsim-tonic", version = "0.5.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.5" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "e6cd165b9bc85783b42c106e99186b86b73e3507" }
prost = { version = "0.13" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "07d53893d7788b4e41fc11efad8a6be828405c31", features = [
"prometheus",
] }
Expand Down Expand Up @@ -179,6 +179,7 @@ tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git"
"profiling",
"stats",
], rev = "64a2d9" }
# TODO(http-bump): bump to use tonic 0.12 once minitrace-opentelemetry is updated
opentelemetry = "0.23"
opentelemetry-otlp = "0.16"
opentelemetry_sdk = { version = "0.23", default-features = false }
Expand All @@ -194,6 +195,7 @@ sea-orm = { version = "0.12.14", features = [
"runtime-tokio-native-tls",
] }
sqlx = "0.7"
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055", features = ["net", "fs"] }
tokio-util = "0.7"
tracing-opentelemetry = "0.24"
rand = { version = "0.8", features = ["small_rng"] }
Expand Down Expand Up @@ -334,7 +336,9 @@ opt-level = 2
# Patch third-party crates for deterministic simulation.
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }
getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" }
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e" }
# Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies.
# Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`.
# tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055" }
tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88" }
futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" }
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/deterministic-recovery-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ echo "--- Download artifacts"
download-and-decompress-artifact risingwave_simulation .
chmod +x ./risingwave_simulation

export RUST_LOG="risingwave_meta::barrier::recovery=debug,\
export RUST_LOG="info,risingwave_meta::barrier::recovery=debug,\
risingwave_meta::manager::catalog=debug,\
risingwave_meta::rpc::ddl_controller=debug,\
risingwave_meta::barrier::mod=debug,\
Expand Down
3 changes: 1 addition & 2 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ parking_lot = { workspace = true }
parquet = { workspace = true }
paste = "1"
prometheus = { version = "0.13", features = ["process"] }
prost = "0.12"
rand = { workspace = true }
risingwave_common = { workspace = true }
risingwave_common_estimate_size = { workspace = true }
Expand All @@ -63,7 +62,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"fs",
] }
tokio-metrics = "0.3.0"
tokio-stream = "0.1"
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tonic = { workspace = true }
tracing = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use bytes::Bytes;
use futures_async_stream::try_stream;
use hashbrown::hash_map::Entry;
use itertools::Itertools;
use prost::Message;
use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
Expand All @@ -35,6 +34,7 @@ use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::HashAggNode;
use risingwave_pb::data::DataChunk as PbDataChunk;
use risingwave_pb::Message;

use crate::error::{BatchError, Result};
use crate::executor::aggregation::build as build_agg;
Expand Down
7 changes: 2 additions & 5 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
let pk_prefix = OwnedRow::new(scan_range.eq_conds);

if self.lookup_prefix_len == self.table.pk_indices().len() {
let row = self
.table
.get_row(&pk_prefix, self.epoch.clone().into())
.await?;
let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;

if let Some(row) = row {
self.row_list.push(row);
Expand All @@ -366,7 +363,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
let iter = self
.table
.batch_iter_with_pk_bounds(
self.epoch.clone().into(),
self.epoch.into(),
&pk_prefix,
..,
false,
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::sync::Arc;
use bytes::Bytes;
use futures_async_stream::try_stream;
use itertools::Itertools;
use prost::Message;
use risingwave_common::array::{Array, DataChunk, RowRef};
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::Schema;
Expand All @@ -34,6 +33,7 @@ use risingwave_common_estimate_size::EstimateSize;
use risingwave_expr::expr::{build_from_prost, BoxedExpression, Expression};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::data::DataChunk as PbDataChunk;
use risingwave_pb::Message;

use super::{ChunkedData, JoinType, RowId};
use crate::error::{BatchError, Result};
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
..Default::default()
}),
}),
epoch: Some(self.epoch.clone()),
epoch: Some(self.epoch),
tracing_context: TracingContext::from_current_span().to_protobuf(),
};

Expand Down Expand Up @@ -237,7 +237,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
&plan_node,
&task_id,
self.context.clone(),
self.epoch.clone(),
self.epoch,
self.shutdown_rx.clone(),
);

Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> {
plan_node,
self.task_id,
self.context.clone(),
self.epoch.clone(),
self.epoch,
self.shutdown_rx.clone(),
)
}
Expand All @@ -188,7 +188,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> {
}

pub fn epoch(&self) -> BatchQueryEpoch {
self.epoch.clone()
self.epoch
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::Arc;
use bytes::Bytes;
use futures_async_stream::try_stream;
use itertools::Itertools;
use prost::Message;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::memory::MemoryContext;
Expand All @@ -28,6 +27,7 @@ use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::data::DataChunk as PbDataChunk;
use risingwave_pb::Message;

use super::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
Expand Down
7 changes: 3 additions & 4 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {

let ordered = seq_scan_node.ordered;

let epoch = source.epoch.clone();
let epoch = source.epoch;
let limit = seq_scan_node.limit;
let as_of = seq_scan_node
.as_of
Expand Down Expand Up @@ -341,8 +341,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
for point_get in point_gets {
let table = table.clone();
if let Some(row) =
Self::execute_point_get(table, point_get, query_epoch.clone(), histogram.clone())
.await?
Self::execute_point_get(table, point_get, query_epoch, histogram.clone()).await?
{
if let Some(chunk) = data_chunk_builder.append_one_row(row) {
returned += chunk.cardinality() as u64;
Expand Down Expand Up @@ -373,7 +372,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
table.clone(),
range,
ordered,
query_epoch.clone(),
query_epoch,
chunk_size,
limit,
histogram.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/spill/spill_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use futures_util::AsyncReadExt;
use opendal::layers::RetryLayer;
use opendal::services::{Fs, Memory};
use opendal::Operator;
use prost::Message;
use risingwave_common::array::DataChunk;
use risingwave_pb::data::DataChunk as PbDataChunk;
use risingwave_pb::Message;
use thiserror_ext::AsReport;
use tokio::sync::Mutex;
use twox_hash::XxHash64;
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/task/broadcast_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub fn new_broadcast_channel(
output_channel_size: usize,
) -> (ChanSenderImpl, Vec<ChanReceiverImpl>) {
let broadcast_info = match shuffle.distribution {
Some(exchange_info::Distribution::BroadcastInfo(ref v)) => v.clone(),
Some(exchange_info::Distribution::BroadcastInfo(ref v)) => *v,
_ => BroadcastInfo::default(),
};

Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
self.plan.root.as_ref().unwrap(),
&self.task_id,
self.context.clone(),
self.epoch.clone(),
self.epoch,
self.shutdown_rx.clone(),
)
.build(),
Expand Down
2 changes: 1 addition & 1 deletion src/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"time",
"signal",
] }
tokio-stream = "0.1"
tokio-stream = { workspace = true }
toml = "0.8"
tracing = "0.1"
tracing-subscriber = "0.3.17"
Expand Down
2 changes: 1 addition & 1 deletion src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
governor = { version = "0.6", default-features = false, features = ["std"] }
hashbrown = "0.14"
hex = "0.4.3"
http = "0.2"
http = "1"
humantime = "2.1"
hytra = { workspace = true }
itertools = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/common/common_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ normal = ["workspace-hack"]
async-trait = "0.1"
axum = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14" # required by tonic
http = "1"
prometheus = { version = "0.13" }
risingwave_common = { workspace = true }
risingwave_pb = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions src/common/common_service/src/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
use std::task::{Context, Poll};

use futures::Future;
use hyper::Body;
use risingwave_common::util::tracing::TracingContext;
use tonic::body::BoxBody;
use tower::{Layer, Service};
use tracing::Instrument;

Expand Down Expand Up @@ -49,9 +49,9 @@ pub struct TracingExtract<S> {
inner: S,
}

impl<S> Service<hyper::Request<Body>> for TracingExtract<S>
impl<S> Service<http::Request<BoxBody>> for TracingExtract<S>
where
S: Service<hyper::Request<Body>> + Clone + Send + 'static,
S: Service<http::Request<BoxBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
{
type Error = S::Error;
Expand All @@ -63,7 +63,7 @@ where
self.inner.poll_ready(cx)
}

fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
// This is necessary because tonic internally uses `tower::buffer::Buffer`.
// See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
// for details on why this is necessary
Expand Down
14 changes: 9 additions & 5 deletions src/common/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
auto_impl = "1"
bytes = "1"
clap = { workspace = true }
easy-ext = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
http = "0.2"
hyper = { version = "0.14", features = ["client"] } # used by tonic
http = "1"
http-02 = { package = "http", version = "0.2" }
hyper = { version = "1" }
hyper-014 = { package = "hyper", version = "0.14" }
hyper-util = { version = "0.1", features = ["client-legacy"] }
hytra = { workspace = true }
itertools = { workspace = true }
parking_lot = { workspace = true }
Expand All @@ -32,13 +36,13 @@ serde = { version = "1", features = ["derive"] }
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio" }
tonic = { workspace = true }
tower-layer = "0.3.2"
tower-service = "0.3.2"
tracing = "0.1"
tracing-subscriber = "0.3.17"

[target.'cfg(not(madsim))'.dependencies]
http-body = "0.4.5"
tower-layer = "0.3.2"
tower-service = "0.3.2"
http-body = "1"
[target.'cfg(target_os = "linux")'.dependencies]
procfs = { version = "0.16", default-features = false }
libc = "0.2"
Expand Down
Loading
Loading