Skip to content

Commit

Permalink
chore: drop guard when acquire lock failed (#16616)
Browse files Browse the repository at this point in the history
* chore: drop guard when acquire lock failed

* add log

* fix review comment

* fix
  • Loading branch information
zhyass authored Oct 17, 2024
1 parent a37384b commit 6f70937
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 230 deletions.
127 changes: 124 additions & 3 deletions src/query/service/src/locks/lock_holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,29 @@ use std::time::Instant;

use backoff::backoff::Backoff;
use databend_common_base::base::tokio::time::sleep;
use databend_common_base::base::tokio::time::timeout;
use databend_common_base::base::WatchNotify;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::catalog::Catalog;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_api::kv_pb_api::KVPbApi;
use databend_common_meta_app::schema::CreateLockRevReq;
use databend_common_meta_app::schema::DeleteLockRevReq;
use databend_common_meta_app::schema::ExtendLockRevReq;
use databend_common_meta_app::schema::ListLockRevReq;
use databend_common_meta_app::schema::TableLockIdent;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::protobuf::watch_request::FilterType;
use databend_common_meta_types::protobuf::WatchRequest;
use databend_common_metrics::lock::record_acquired_lock_nums;
use databend_common_metrics::lock::record_created_lock_nums;
use databend_common_storages_fuse::operations::set_backoff;
use databend_common_users::UserApiProvider;
use futures::future::select;
use futures::future::Either;
use futures_util::StreamExt;
use rand::thread_rng;
use rand::Rng;

Expand All @@ -46,13 +56,120 @@ pub struct LockHolder {

impl LockHolder {
#[async_backtrace::framed]
pub async fn start(
pub(crate) async fn try_acquire_lock(
self: &Arc<Self>,
catalog: Arc<dyn Catalog>,
req: CreateLockRevReq,
should_retry: bool,
acquire_timeout: Duration,
) -> Result<u64> {
let start = Instant::now();

let ttl = req.ttl;

let lock_key = req.lock_key.clone();
let lock_type = lock_key.lock_type().to_string();
let table_id = lock_key.get_table_id();
let tenant = lock_key.get_tenant();

let revision = self.start(catalog.clone(), req).await?;

let meta_api = UserApiProvider::instance().get_meta_store_client();
let list_table_lock_req = ListLockRevReq::new(lock_key.clone());

loop {
// List all revisions and check if the current is the minimum.
let mut rev_list = catalog
.list_lock_revisions(list_table_lock_req.clone())
.await?
.into_iter()
.map(|(x, _)| x)
.collect::<Vec<_>>();
// list_lock_revisions are returned in big-endian order,
// we need to sort them in ascending numeric order.
rev_list.sort();
let position = rev_list.iter().position(|x| *x == revision).ok_or_else(||
// If the current is not found in list, it means that the current has been expired.
ErrorCode::TableLockExpired(format!(
"The acquired table lock with revision '{}' maybe expired(elapsed: {:?})",
revision,
start.elapsed(),
)))?;

if position == 0 {
// The lock is acquired by current session.
let extend_table_lock_req =
ExtendLockRevReq::new(lock_key.clone(), revision, ttl, true);

catalog.extend_lock_revision(extend_table_lock_req).await?;
// metrics.
record_acquired_lock_nums(lock_type, table_id, 1);
break;
}

let prev_revision = rev_list[position - 1];
let elapsed = start.elapsed();
// if no need retry, return error directly.
if !should_retry || elapsed >= acquire_timeout {
return Err(ErrorCode::TableAlreadyLocked(format!(
"Table is locked by other session(rev: {}, prev: {}, elapsed: {:?})",
revision,
prev_revision,
start.elapsed()
)));
}

let watch_delete_ident = TableLockIdent::new(tenant, table_id, prev_revision);

// Get the previous revision, watch the delete event.
let req = WatchRequest {
key: watch_delete_ident.to_string_key(),
key_end: None,
filter_type: FilterType::Delete.into(),
};
let mut watch_stream = meta_api.watch(req).await?;

let lock_meta = meta_api.get_pb(&watch_delete_ident).await?;
if lock_meta.is_none() {
log::warn!(
"Lock revision '{}' already does not exist, skipping",
prev_revision
);
continue;
}

// Add a timeout period for watch.
if let Err(_cause) = timeout(acquire_timeout.abs_diff(elapsed), async move {
while let Some(Ok(resp)) = watch_stream.next().await {
if let Some(event) = resp.event {
if event.current.is_none() {
break;
}
}
}
})
.await
{
return Err(ErrorCode::TableAlreadyLocked(format!(
"Table is locked by other session(rev: {}, prev: {}, elapsed: {:?})",
revision,
prev_revision,
start.elapsed()
)));
}
}

Ok(revision)
}

#[async_backtrace::framed]
async fn start(
self: &Arc<Self>,
query_id: String,
catalog: Arc<dyn Catalog>,
req: CreateLockRevReq,
) -> Result<u64> {
let lock_key = req.lock_key.clone();
let query_id = req.query_id.clone();
let ttl = req.ttl;
let sleep_range = (ttl / 3)..=(ttl * 2 / 3);

Expand All @@ -61,6 +178,7 @@ impl LockHolder {
let revision = res.revision;
// metrics.
record_created_lock_nums(lock_key.lock_type().to_string(), lock_key.get_table_id(), 1);
log::debug!("create table lock success, revision={}", revision);

let delete_table_lock_req = DeleteLockRevReq::new(lock_key.clone(), revision);
let extend_table_lock_req = ExtendLockRevReq::new(lock_key.clone(), revision, ttl, false);
Expand Down Expand Up @@ -179,7 +297,10 @@ impl LockHolder {
let mut backoff = set_backoff(Some(Duration::from_millis(2)), None, max_retry_elapsed);
loop {
match catalog.delete_lock_revision(req.clone()).await {
Ok(_) => break,
Ok(_) => {
log::debug!("delete table lock success, revision={}", req.revision);
break;
}
Err(e) => match backoff.next_backoff() {
Some(duration) => {
log::debug!(
Expand Down
141 changes: 16 additions & 125 deletions src/query/service/src/locks/lock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,21 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use databend_common_base::base::tokio::sync::mpsc;
use databend_common_base::base::tokio::time::timeout;
use databend_common_base::base::GlobalInstance;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::lock::Lock;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_api::kv_pb_api::KVPbApi;
use databend_common_meta_app::schema::CreateLockRevReq;
use databend_common_meta_app::schema::DeleteLockRevReq;
use databend_common_meta_app::schema::ExtendLockRevReq;
use databend_common_meta_app::schema::ListLockRevReq;
use databend_common_meta_app::schema::LockKey;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableLockIdent;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::protobuf::watch_request::FilterType;
use databend_common_meta_types::protobuf::WatchRequest;
use databend_common_metrics::lock::metrics_inc_shutdown_lock_holder_nums;
use databend_common_metrics::lock::metrics_inc_start_lock_holder_nums;
use databend_common_metrics::lock::record_acquired_lock_nums;
use databend_common_pipeline_core::LockGuard;
use databend_common_pipeline_core::UnlockApi;
use databend_common_users::UserApiProvider;
use futures_util::StreamExt;
use parking_lot::RwLock;

use crate::locks::lock_holder::LockHolder;
Expand Down Expand Up @@ -97,129 +83,34 @@ impl LockManager {
catalog_name: &str,
should_retry: bool,
) -> Result<Option<Arc<LockGuard>>> {
let start = Instant::now();
let acquire_timeout = Duration::from_secs(ctx.get_settings().get_acquire_lock_timeout()?);

let lock_type = lock_key.lock_type().to_string();
let table_id = lock_key.get_table_id();
let tenant = lock_key.get_tenant();
let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?;
let query_id = ctx.get_id();
let ttl = Duration::from_secs(ctx.get_settings().get_table_lock_expire_secs()?);
let req = CreateLockRevReq::new(
lock_key.clone(),
lock_key,
ctx.get_current_user()?.name, // user
ctx.get_cluster().local_id.clone(), // node
query_id.clone(), // query_id
Duration::from_secs(expire_secs),
ctx.get_id(), // query_id
ttl,
);

let catalog = ctx.get_catalog(catalog_name).await?;

let lock_holder = Arc::new(LockHolder::default());
let revision = lock_holder.start(query_id, catalog.clone(), req).await?;

self.insert_lock(revision, lock_holder);
let guard = LockGuard::new(self.clone(), revision);

let acquire_lock_timeout = ctx.get_settings().get_acquire_lock_timeout()?;
let duration = Duration::from_secs(acquire_lock_timeout);
let meta_api = UserApiProvider::instance().get_meta_store_client();

let list_table_lock_req = ListLockRevReq::new(lock_key.clone());

let delete_table_lock_req = DeleteLockRevReq::new(lock_key.clone(), revision);

loop {
// List all revisions and check if the current is the minimum.
let mut rev_list = catalog
.list_lock_revisions(list_table_lock_req.clone())
.await?
.into_iter()
.map(|(x, _)| x)
.collect::<Vec<_>>();
// list_lock_revisions are returned in big-endian order,
// we need to sort them in ascending numeric order.
rev_list.sort();
let position = rev_list.iter().position(|x| *x == revision).ok_or_else(||
// If the current is not found in list, it means that the current has been expired.
ErrorCode::TableLockExpired(format!(
"the acquired table lock with revision '{}' is not in {:?}, maybe expired(elapsed: {:?})",
revision,
rev_list,
start.elapsed(),
)))?;

if position == 0 {
// The lock is acquired by current session.
let extend_table_lock_req = ExtendLockRevReq::new(
lock_key.clone(),
revision,
Duration::from_secs(expire_secs),
true,
);

catalog.extend_lock_revision(extend_table_lock_req).await?;
// metrics.
record_acquired_lock_nums(lock_type, table_id, 1);
break;
match lock_holder
.try_acquire_lock(catalog, req, should_retry, acquire_timeout)
.await
{
Ok(revision) => {
self.insert_lock(revision, lock_holder);
let guard = LockGuard::new(self.clone(), revision);
Ok(Some(Arc::new(guard)))
}

let elapsed = start.elapsed();
// if no need retry, return error directly.
if !should_retry || elapsed >= duration {
catalog
.delete_lock_revision(delete_table_lock_req.clone())
.await?;
return Err(ErrorCode::TableAlreadyLocked(format!(
"table is locked by other session, please retry later(elapsed: {:?})",
elapsed
)));
Err(err) => {
lock_holder.shutdown();
Err(err)
}

let watch_delete_ident = TableLockIdent::new(tenant, table_id, rev_list[position - 1]);

// Get the previous revision, watch the delete event.
let req = WatchRequest {
key: watch_delete_ident.to_string_key(),
key_end: None,
filter_type: FilterType::Delete.into(),
};
let mut watch_stream = meta_api.watch(req).await?;

let lock_meta = meta_api.get_pb(&watch_delete_ident).await?;
if lock_meta.is_none() {
log::warn!(
"Lock revision '{}' already does not exist, skipping",
rev_list[position - 1]
);
continue;
}

// Add a timeout period for watch.
match timeout(duration.abs_diff(elapsed), async move {
while let Some(Ok(resp)) = watch_stream.next().await {
if let Some(event) = resp.event {
if event.current.is_none() {
break;
}
}
}
})
.await
{
Ok(_) => Ok(()),
Err(_) => {
catalog
.delete_lock_revision(delete_table_lock_req.clone())
.await?;
Err(ErrorCode::TableAlreadyLocked(format!(
"table is locked by other session, please retry later(elapsed: {:?})",
start.elapsed()
)))
}
}?;
}

Ok(Some(Arc::new(guard)))
}

fn insert_lock(&self, revision: u64, lock_holder: Arc<LockHolder>) {
Expand Down
Loading

0 comments on commit 6f70937

Please sign in to comment.