diff --git a/Cargo.lock b/Cargo.lock index e91d4d9ca32f4..61c501affe8ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4979,6 +4979,7 @@ dependencies = [ "temp-env", "tempfile", "test-harness", + "tokio", "tokio-stream", "tonic", "tonic-reflection", diff --git a/src/meta/raft-store/src/state_machine_api_ext.rs b/src/meta/raft-store/src/state_machine_api_ext.rs index 359af9a21681a..5da2041106025 100644 --- a/src/meta/raft-store/src/state_machine_api_ext.rs +++ b/src/meta/raft-store/src/state_machine_api_ext.rs @@ -107,11 +107,6 @@ pub trait StateMachineApiExt: StateMachineApi { future::ready(Ok(res)) }); - // Make it static - - let vs = strm.collect::>().await; - let strm = futures::stream::iter(vs); - Ok(strm.boxed()) } diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml index 2ea2d38a0f466..f10704003d81f 100644 --- a/src/meta/service/Cargo.toml +++ b/src/meta/service/Cargo.toml @@ -59,6 +59,7 @@ semver = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serfig = { workspace = true } +tokio = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } tonic-reflection = { workspace = true } diff --git a/src/meta/service/src/meta_service/meta_leader.rs b/src/meta/service/src/meta_service/meta_leader.rs index 9dfce821c98cc..719401559a436 100644 --- a/src/meta/service/src/meta_service/meta_leader.rs +++ b/src/meta/service/src/meta_service/meta_leader.rs @@ -36,11 +36,13 @@ use databend_common_meta_types::MetaOperationError; use databend_common_meta_types::Node; use databend_common_metrics::count::Count; use futures::StreamExt; +use futures::TryStreamExt; use log::debug; use log::info; use maplit::btreemap; use maplit::btreeset; use tonic::codegen::BoxStream; +use tonic::Status; use crate::message::ForwardRequest; use crate::message::ForwardRequestBody; @@ -147,12 +149,14 @@ impl<'a> Handler for MetaLeader<'a> { } MetaGrpcReadReq::ListKV(req) => { - // safe unwrap(): Infallible - let kvs = kv_api.prefix_list_kv(&req.prefix).await.unwrap(); - - let kv_iter = kvs.into_iter().map(|kv| Ok(StreamItem::from(kv))); - - let strm = futures::stream::iter(kv_iter); + let strm = + kv_api.list_kv(&req.prefix).await.map_err(|e| { + MetaOperationError::DataError(MetaDataError::ReadError( + MetaDataReadError::new("list_kv", &req.prefix, &e), + )) + })?; + + let strm = strm.map_err(|e| Status::internal(e.to_string())); Ok(strm.boxed()) }