From 41539f917b67f7701accdae9d981b8cb00cab260 Mon Sep 17 00:00:00 2001 From: zhyass Date: Wed, 16 Oct 2024 16:00:28 +0800 Subject: [PATCH] fix review commend --- src/query/service/src/locks/lock_holder.rs | 119 ++++++++++++++++- src/query/service/src/locks/lock_manager.rs | 134 +++----------------- 2 files changed, 133 insertions(+), 120 deletions(-) diff --git a/src/query/service/src/locks/lock_holder.rs b/src/query/service/src/locks/lock_holder.rs index d8d011586a62..475f936b0d9c 100644 --- a/src/query/service/src/locks/lock_holder.rs +++ b/src/query/service/src/locks/lock_holder.rs @@ -21,18 +21,28 @@ use std::time::Instant; use backoff::backoff::Backoff; use databend_common_base::base::tokio::sync::Notify; use databend_common_base::base::tokio::time::sleep; +use databend_common_base::base::tokio::time::timeout; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; use databend_common_catalog::catalog::Catalog; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_meta_api::kv_pb_api::KVPbApi; use databend_common_meta_app::schema::CreateLockRevReq; use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::ExtendLockRevReq; +use databend_common_meta_app::schema::ListLockRevReq; +use databend_common_meta_app::schema::TableLockIdent; +use databend_common_meta_kvapi::kvapi::Key; +use databend_common_meta_types::protobuf::watch_request::FilterType; +use databend_common_meta_types::protobuf::WatchRequest; +use databend_common_metrics::lock::record_acquired_lock_nums; use databend_common_metrics::lock::record_created_lock_nums; use databend_common_storages_fuse::operations::set_backoff; +use databend_common_users::UserApiProvider; use futures::future::select; use futures::future::Either; +use futures_util::StreamExt; use rand::thread_rng; use rand::Rng; @@ -46,13 +56,118 @@ pub struct LockHolder { impl LockHolder { #[async_backtrace::framed] - pub async fn start( + pub(crate) async fn try_acquire_lock( + self: &Arc, + catalog: Arc, + req: CreateLockRevReq, + should_retry: bool, + acquire_timeout: Duration, + ) -> Result { + let start = Instant::now(); + + let ttl = req.ttl; + + let lock_key = req.lock_key.clone(); + let lock_type = lock_key.lock_type().to_string(); + let table_id = lock_key.get_table_id(); + let tenant = lock_key.get_tenant(); + + let revision = self.start(catalog.clone(), req).await?; + + let meta_api = UserApiProvider::instance().get_meta_store_client(); + let list_table_lock_req = ListLockRevReq::new(lock_key.clone()); + + loop { + // List all revisions and check if the current is the minimum. + let mut rev_list = catalog + .list_lock_revisions(list_table_lock_req.clone()) + .await? + .into_iter() + .map(|(x, _)| x) + .collect::>(); + // list_lock_revisions are returned in big-endian order, + // we need to sort them in ascending numeric order. + rev_list.sort(); + let position = rev_list.iter().position(|x| *x == revision).ok_or_else(|| + // If the current is not found in list, it means that the current has been expired. + ErrorCode::TableLockExpired(format!( + "the acquired table lock with revision '{}' is not in {:?}, maybe expired(elapsed: {:?})", + revision, + rev_list, + start.elapsed(), + )))?; + + if position == 0 { + // The lock is acquired by current session. + let extend_table_lock_req = + ExtendLockRevReq::new(lock_key.clone(), revision, ttl, true); + + catalog.extend_lock_revision(extend_table_lock_req).await?; + // metrics. + record_acquired_lock_nums(lock_type, table_id, 1); + break; + } + + let elapsed = start.elapsed(); + // if no need retry, return error directly. + if !should_retry || elapsed >= acquire_timeout { + return Err(ErrorCode::TableAlreadyLocked(format!( + "table is locked by other session, please retry later(revision: {}, elapsed: {:?})", + revision, + start.elapsed() + ))); + } + + let watch_delete_ident = TableLockIdent::new(tenant, table_id, rev_list[position - 1]); + + // Get the previous revision, watch the delete event. + let req = WatchRequest { + key: watch_delete_ident.to_string_key(), + key_end: None, + filter_type: FilterType::Delete.into(), + }; + let mut watch_stream = meta_api.watch(req).await?; + + let lock_meta = meta_api.get_pb(&watch_delete_ident).await?; + if lock_meta.is_none() { + log::warn!( + "Lock revision '{}' already does not exist, skipping", + rev_list[position - 1] + ); + continue; + } + + // Add a timeout period for watch. + if let Err(_cause) = timeout(acquire_timeout.abs_diff(elapsed), async move { + while let Some(Ok(resp)) = watch_stream.next().await { + if let Some(event) = resp.event { + if event.current.is_none() { + break; + } + } + } + }) + .await + { + return Err(ErrorCode::TableAlreadyLocked(format!( + "table is locked by other session, please retry later(revision: {}, elapsed: {:?})", + revision, + start.elapsed() + ))); + } + } + + Ok(revision) + } + + #[async_backtrace::framed] + async fn start( self: &Arc, - query_id: String, catalog: Arc, req: CreateLockRevReq, ) -> Result { let lock_key = req.lock_key.clone(); + let query_id = req.query_id.clone(); let ttl = req.ttl; let sleep_range = (ttl / 3)..=(ttl * 2 / 3); diff --git a/src/query/service/src/locks/lock_manager.rs b/src/query/service/src/locks/lock_manager.rs index 81c520c81264..7bd00139c58d 100644 --- a/src/query/service/src/locks/lock_manager.rs +++ b/src/query/service/src/locks/lock_manager.rs @@ -15,34 +15,21 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use std::time::Instant; use databend_common_base::base::tokio::sync::mpsc; -use databend_common_base::base::tokio::time::timeout; use databend_common_base::base::GlobalInstance; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; use databend_common_catalog::lock::Lock; use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use databend_common_meta_api::kv_pb_api::KVPbApi; use databend_common_meta_app::schema::CreateLockRevReq; -use databend_common_meta_app::schema::ExtendLockRevReq; -use databend_common_meta_app::schema::ListLockRevReq; use databend_common_meta_app::schema::LockKey; use databend_common_meta_app::schema::TableInfo; -use databend_common_meta_app::schema::TableLockIdent; -use databend_common_meta_kvapi::kvapi::Key; -use databend_common_meta_types::protobuf::watch_request::FilterType; -use databend_common_meta_types::protobuf::WatchRequest; use databend_common_metrics::lock::metrics_inc_shutdown_lock_holder_nums; use databend_common_metrics::lock::metrics_inc_start_lock_holder_nums; -use databend_common_metrics::lock::record_acquired_lock_nums; use databend_common_pipeline_core::LockGuard; use databend_common_pipeline_core::UnlockApi; -use databend_common_users::UserApiProvider; -use futures_util::StreamExt; use parking_lot::RwLock; use crate::locks::lock_holder::LockHolder; @@ -96,122 +83,33 @@ impl LockManager { catalog_name: &str, should_retry: bool, ) -> Result>> { - let start = Instant::now(); + let acquire_timeout = Duration::from_secs(ctx.get_settings().get_acquire_lock_timeout()?); - let lock_type = lock_key.lock_type().to_string(); - let table_id = lock_key.get_table_id(); - let tenant = lock_key.get_tenant(); - let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?; - let query_id = ctx.get_id(); + let ttl = Duration::from_secs(ctx.get_settings().get_table_lock_expire_secs()?); let req = CreateLockRevReq::new( - lock_key.clone(), + lock_key, ctx.get_current_user()?.name, // user ctx.get_cluster().local_id.clone(), // node - query_id.clone(), // query_id - Duration::from_secs(expire_secs), + ctx.get_id(), // query_id + ttl, ); let catalog = ctx.get_catalog(catalog_name).await?; let lock_holder = Arc::new(LockHolder::default()); - let revision = lock_holder.start(query_id, catalog.clone(), req).await?; - - self.insert_lock(revision, lock_holder); - let guard = LockGuard::new(self.clone(), revision); - - let acquire_lock_timeout = ctx.get_settings().get_acquire_lock_timeout()?; - let duration = Duration::from_secs(acquire_lock_timeout); - let meta_api = UserApiProvider::instance().get_meta_store_client(); - let list_table_lock_req = ListLockRevReq::new(lock_key.clone()); - - let mut is_locked = false; - loop { - // List all revisions and check if the current is the minimum. - let mut rev_list = catalog - .list_lock_revisions(list_table_lock_req.clone()) - .await? - .into_iter() - .map(|(x, _)| x) - .collect::>(); - // list_lock_revisions are returned in big-endian order, - // we need to sort them in ascending numeric order. - rev_list.sort(); - let position = rev_list.iter().position(|x| *x == revision).ok_or_else(|| - // If the current is not found in list, it means that the current has been expired. - ErrorCode::TableLockExpired(format!( - "the acquired table lock with revision '{}' is not in {:?}, maybe expired(elapsed: {:?})", - revision, - rev_list, - start.elapsed(), - )))?; - - if position == 0 { - // The lock is acquired by current session. - let extend_table_lock_req = ExtendLockRevReq::new( - lock_key.clone(), - revision, - Duration::from_secs(expire_secs), - true, - ); - - catalog.extend_lock_revision(extend_table_lock_req).await?; - // metrics. - record_acquired_lock_nums(lock_type, table_id, 1); - break; - } - - let elapsed = start.elapsed(); - // if no need retry, return error directly. - if !should_retry || elapsed >= duration { - is_locked = true; - break; - } - - let watch_delete_ident = TableLockIdent::new(tenant, table_id, rev_list[position - 1]); - - // Get the previous revision, watch the delete event. - let req = WatchRequest { - key: watch_delete_ident.to_string_key(), - key_end: None, - filter_type: FilterType::Delete.into(), - }; - let mut watch_stream = meta_api.watch(req).await?; - - let lock_meta = meta_api.get_pb(&watch_delete_ident).await?; - if lock_meta.is_none() { - log::warn!( - "Lock revision '{}' already does not exist, skipping", - rev_list[position - 1] - ); - continue; - } - - // Add a timeout period for watch. - if let Err(_cause) = timeout(duration.abs_diff(elapsed), async move { - while let Some(Ok(resp)) = watch_stream.next().await { - if let Some(event) = resp.event { - if event.current.is_none() { - break; - } - } - } - }) + match lock_holder + .try_acquire_lock(catalog, req, should_retry, acquire_timeout) .await - { - is_locked = true; - break; + { + Ok(revision) => { + self.insert_lock(revision, lock_holder); + let guard = LockGuard::new(self.clone(), revision); + Ok(Some(Arc::new(guard))) + } + Err(err) => { + lock_holder.shutdown(); + Err(err) } - } - - if is_locked { - drop(guard); - Err(ErrorCode::TableAlreadyLocked(format!( - "table is locked by other session, please retry later(revision: {}, elapsed: {:?})", - revision, - start.elapsed() - ))) - } else { - Ok(Some(Arc::new(guard))) } }