Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: deprecate: Operation::AsIs and will be removed #16913

Merged
merged 5 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/meta/api/src/background_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Instant;

use chrono::Utc;
use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::background::background_job_id_ident::BackgroundJobId;
Expand Down Expand Up @@ -40,14 +42,14 @@ use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::seq_value::SeqValue;
use databend_common_meta_types::MatchSeq::Any;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::UpsertKV;
use fastrace::func_name;
use futures::TryStreamExt;
use log::debug;
Expand Down Expand Up @@ -222,18 +224,18 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
let meta = req.task_info.clone();

let resp = self
.upsert_kv(UpsertKVReq::new(
.upsert_kv(UpsertKV::new(
name_key.to_string_key().as_str(),
Any,
Operation::Update(serialize_struct(&meta)?),
Some(MetaSpec::new_expire(req.expire_at)),
Some(MetaSpec::new_ttl(req.ttl)),
))
.await?;
// confirm a successful update
assert!(resp.is_changed());
Ok(UpdateBackgroundTaskReply {
last_updated: Utc::now(),
expire_at: req.expire_at,
expire_at: Instant::now() + req.ttl,
})
}

Expand Down
8 changes: 4 additions & 4 deletions src/meta/api/src/background_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use chrono::DateTime;
use chrono::Utc;
use databend_common_meta_app::background::BackgroundJobIdent;
Expand Down Expand Up @@ -127,13 +129,11 @@ impl BackgroundApiTestSuite {

info!("--- create a background task");
let create_on = Utc::now();
// expire after 5 secs
let expire_at = create_on + chrono::Duration::seconds(5);
{
let req = UpdateBackgroundTaskReq {
task_name: task_ident.clone(),
task_info: new_background_task(BackgroundTaskState::STARTED, create_on),
expire_at: expire_at.timestamp() as u64,
ttl: Duration::from_secs(5),
};

let res = mt.update_background_task(req).await;
Expand All @@ -155,7 +155,7 @@ impl BackgroundApiTestSuite {
let req = UpdateBackgroundTaskReq {
task_name: task_ident.clone(),
task_info: new_background_task(BackgroundTaskState::DONE, create_on),
expire_at: expire_at.timestamp() as u64,
ttl: Duration::from_secs(5),
};

let res = mt.update_background_task(req).await;
Expand Down
4 changes: 3 additions & 1 deletion src/meta/api/src/kv_pb_api/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ where T: FromToProto {
Ok(Operation::Update(buf))
}
Operation::Delete => Ok(Operation::Delete),
Operation::AsIs => Ok(Operation::AsIs),
_ => {
unreachable!("Operation::AsIs is not supported")
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/meta/api/src/kv_pb_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,13 +516,13 @@ mod tests {
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::seq_value::SeqValue;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::TxnReply;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::UpsertKV;
use databend_common_proto_conv::FromToProto;
use futures::StreamExt;
use futures::TryStreamExt;
Expand All @@ -541,7 +541,7 @@ mod tests {
impl KVApi for FooKV {
type Error = MetaError;

async fn upsert_kv(&self, _req: UpsertKVReq) -> Result<UpsertKVReply, Self::Error> {
async fn upsert_kv(&self, _req: UpsertKV) -> Result<UpsertKVReply, Self::Error> {
unimplemented!()
}

Expand Down
4 changes: 0 additions & 4 deletions src/meta/api/src/kv_pb_api/upsert_pb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ impl<K: kvapi::Key> UpsertPB<K> {
}
}

pub fn with_expire_sec(self, expire_at_sec: u64) -> Self {
self.with(MetaSpec::new_expire(expire_at_sec))
}

/// Set the time to last for the value.
/// When the ttl is passed, the value is deleted.
pub fn with_ttl(self, ttl: Duration) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/api/src/name_id_value_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,12 +388,12 @@ mod tests {
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::TxnReply;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::UpsertKV;
use databend_common_proto_conv::FromToProto;
use futures::StreamExt;
use prost::Message;
Expand All @@ -408,7 +408,7 @@ mod tests {
impl KVApi for Foo {
type Error = MetaError;

async fn upsert_kv(&self, _req: UpsertKVReq) -> Result<UpsertKVReply, Self::Error> {
async fn upsert_kv(&self, _req: UpsertKV) -> Result<UpsertKVReply, Self::Error> {
unimplemented!()
}

Expand Down
5 changes: 2 additions & 3 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ use databend_common_meta_app::tenant::ToTenant;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::Operation;
Expand Down Expand Up @@ -238,7 +237,7 @@ async fn upsert_test_data(
value: Vec<u8>,
) -> Result<u64, KVAppError> {
let res = kv_api
.upsert_kv(UpsertKVReq {
.upsert_kv(UpsertKV {
key: key.to_string_key(),
seq: MatchSeq::GE(0),
value: Operation::Update(value),
Expand All @@ -255,7 +254,7 @@ async fn delete_test_data(
key: &impl kvapi::Key,
) -> Result<(), KVAppError> {
let _res = kv_api
.upsert_kv(UpsertKVReq {
.upsert_kv(UpsertKV {
key: key.to_string_key(),
seq: MatchSeq::GE(0),
value: Operation::Delete,
Expand Down
4 changes: 2 additions & 2 deletions src/meta/api/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use databend_common_meta_app::primitive::Id;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::TableNameIdent;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::txn_condition::Target;
use databend_common_meta_types::ConditionResult;
Expand All @@ -40,6 +39,7 @@ use databend_common_meta_types::TxnGetResponse;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::TxnOpResponse;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::UpsertKV;
use databend_common_proto_conv::FromToProto;
use log::debug;

Expand Down Expand Up @@ -200,7 +200,7 @@ pub async fn fetch_id<T: kvapi::Key>(
generator: T,
) -> Result<u64, MetaError> {
let res = kv_api
.upsert_kv(UpsertKVReq {
.upsert_kv(UpsertKV {
key: generator.to_string_key(),
seq: MatchSeq::GE(0),
value: Operation::Update(b"".to_vec()),
Expand Down
5 changes: 3 additions & 2 deletions src/meta/app/src/background/background_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::fmt;
use std::fmt::Display;
use std::fmt::Formatter;
use std::time::Duration;
use std::time::Instant;

use chrono::DateTime;
use chrono::Utc;
Expand Down Expand Up @@ -156,7 +157,7 @@ impl BackgroundTaskInfo {
pub struct UpdateBackgroundTaskReq {
pub task_name: BackgroundTaskIdent,
pub task_info: BackgroundTaskInfo,
pub expire_at: u64,
pub ttl: Duration,
}

impl Display for UpdateBackgroundTaskReq {
Expand All @@ -176,7 +177,7 @@ impl Display for UpdateBackgroundTaskReq {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UpdateBackgroundTaskReply {
pub last_updated: DateTime<Utc>,
pub expire_at: u64,
pub expire_at: Instant,
}

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down
8 changes: 4 additions & 4 deletions src/meta/binaries/meta/kvapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
use std::sync::Arc;

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::UpsertKV;
use databend_common_meta_types::With;
use databend_meta::configs::Config;

pub enum KvApiCommand {
Get(String),
Upsert(UpsertKVReq),
Upsert(UpsertKV),
MGet(Vec<String>),
List(String),
}
Expand All @@ -36,7 +36,7 @@ impl KvApiCommand {
return Err("The number of keys must be 1".to_string());
}

let req = UpsertKVReq::update(config.key[0].as_str(), config.value.as_bytes());
let req = UpsertKV::update(config.key[0].as_str(), config.value.as_bytes());

let req = if let Some(expire_after) = config.expire_after {
req.with(MetaSpec::new_ttl(std::time::Duration::from_secs(
Expand All @@ -52,7 +52,7 @@ impl KvApiCommand {
if config.key.len() != 1 {
return Err("The number of keys must be 1".to_string());
}
Self::Upsert(UpsertKVReq::delete(&config.key[0]))
Self::Upsert(UpsertKV::delete(&config.key[0]))
}
"get" => {
if config.key.len() != 1 {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/binaries/metabench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_client::ClientHandle;
use databend_common_meta_client::MetaGrpcClient;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::Operation;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::UpsertKV;
use databend_common_tracing::init_logging;
use databend_common_tracing::FileConfig;
use databend_common_tracing::StderrConfig;
Expand Down Expand Up @@ -171,7 +171,7 @@ async fn benchmark_upsert(client: &Arc<ClientHandle>, prefix: u64, client_num: u
let value = Operation::Update(node_key().as_bytes().to_vec());

let res = client
.upsert_kv(UpsertKVReq::new(node_key(), seq, value, None))
.upsert_kv(UpsertKV::new(node_key(), seq, value, None))
.await;

print_res(i, "upsert_kv", &res);
Expand Down
6 changes: 3 additions & 3 deletions src/meta/binaries/metaverifier/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use databend_common_exception::Result;
use databend_common_meta_client::ClientHandle;
use databend_common_meta_client::MetaGrpcClient;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::Operation;
use databend_common_meta_types::UpsertKV;
use databend_common_tracing::init_logging;
use databend_common_tracing::FileConfig;
use databend_common_tracing::StderrConfig;
Expand Down Expand Up @@ -229,7 +229,7 @@ async fn verifier(
let value = Operation::Update(node_key.as_bytes().to_vec());

let _res = client
.upsert_kv(UpsertKVReq::new(&node_key, seq, value, None))
.upsert_kv(UpsertKV::new(&node_key, seq, value, None))
.await?;

let n: u64 = rng.gen_range(0..=100);
Expand All @@ -238,7 +238,7 @@ async fn verifier(
let value = Operation::Delete;

let _res = client
.upsert_kv(UpsertKVReq::new(&node_key, seq, value, None))
.upsert_kv(UpsertKV::new(&node_key, seq, value, None))
.await?;
} else {
kv.insert(node_key);
Expand Down
6 changes: 3 additions & 3 deletions src/meta/client/src/grpc_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use databend_common_meta_kvapi::kvapi::ListKVReq;
use databend_common_meta_kvapi::kvapi::MGetKVReply;
use databend_common_meta_kvapi::kvapi::MGetKVReq;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf::ClientInfo;
use databend_common_meta_types::protobuf::ClusterStatus;
use databend_common_meta_types::protobuf::RaftRequest;
Expand All @@ -33,6 +32,7 @@ use databend_common_meta_types::protobuf::WatchResponse;
use databend_common_meta_types::InvalidArgument;
use databend_common_meta_types::TxnReply;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::UpsertKV;
use log::debug;
use tonic::codegen::BoxStream;
use tonic::Request;
Expand All @@ -52,7 +52,7 @@ pub trait RequestFor: Clone + fmt::Debug {

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, derive_more::From)]
pub enum MetaGrpcReq {
UpsertKV(UpsertKVReq),
UpsertKV(UpsertKV),
}

impl TryInto<MetaGrpcReq> for Request<RaftRequest> {
Expand Down Expand Up @@ -177,7 +177,7 @@ impl RequestFor for Streamed<ListKVReq> {
type Reply = BoxStream<StreamItem>;
}

impl RequestFor for UpsertKVReq {
impl RequestFor for UpsertKV {
type Reply = UpsertKVReply;
}

Expand Down
4 changes: 2 additions & 2 deletions src/meta/client/src/kv_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::ListKVReq;
use databend_common_meta_kvapi::kvapi::MGetKVReq;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::TxnReply;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::UpsertKV;
use futures::StreamExt;
use futures::TryStreamExt;

Expand All @@ -32,7 +32,7 @@ impl kvapi::KVApi for ClientHandle {
type Error = MetaError;

#[fastrace::trace]
async fn upsert_kv(&self, act: UpsertKVReq) -> Result<UpsertKVReply, Self::Error> {
async fn upsert_kv(&self, act: UpsertKV) -> Result<UpsertKVReply, Self::Error> {
let reply = self.request(act).await?;
Ok(reply)
}
Expand Down
3 changes: 3 additions & 0 deletions src/meta/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
/// require the client to call kv_read_v1 for get/mget/list,
/// which is added `2024-01-07: since 1.2.287`
///
/// - 2024-11-2*: since 1.2.6**
/// 👥 client: remove use of `Operation::AsIs`
///
/// Server feature set:
/// ```yaml
/// server_features:
Expand Down
Loading
Loading