chore: drop guard when acquire lock failed #16616

Merged: 5 commits, Oct 17, 2024
127 changes: 124 additions & 3 deletions src/query/service/src/locks/lock_holder.rs
@@ -20,19 +20,29 @@ use std::time::Instant;

use backoff::backoff::Backoff;
use databend_common_base::base::tokio::time::sleep;
use databend_common_base::base::tokio::time::timeout;
use databend_common_base::base::WatchNotify;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::catalog::Catalog;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_api::kv_pb_api::KVPbApi;
use databend_common_meta_app::schema::CreateLockRevReq;
use databend_common_meta_app::schema::DeleteLockRevReq;
use databend_common_meta_app::schema::ExtendLockRevReq;
use databend_common_meta_app::schema::ListLockRevReq;
use databend_common_meta_app::schema::TableLockIdent;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::protobuf::watch_request::FilterType;
use databend_common_meta_types::protobuf::WatchRequest;
use databend_common_metrics::lock::record_acquired_lock_nums;
use databend_common_metrics::lock::record_created_lock_nums;
use databend_common_storages_fuse::operations::set_backoff;
use databend_common_users::UserApiProvider;
use futures::future::select;
use futures::future::Either;
use futures_util::StreamExt;
use rand::thread_rng;
use rand::Rng;

@@ -46,13 +56,120 @@ pub struct LockHolder {

impl LockHolder {
#[async_backtrace::framed]
pub async fn start(
pub(crate) async fn try_acquire_lock(
self: &Arc<Self>,
catalog: Arc<dyn Catalog>,
req: CreateLockRevReq,
should_retry: bool,
acquire_timeout: Duration,
) -> Result<u64> {
let start = Instant::now();

let ttl = req.ttl;

let lock_key = req.lock_key.clone();
let lock_type = lock_key.lock_type().to_string();
let table_id = lock_key.get_table_id();
let tenant = lock_key.get_tenant();

let revision = self.start(catalog.clone(), req).await?;

let meta_api = UserApiProvider::instance().get_meta_store_client();
let list_table_lock_req = ListLockRevReq::new(lock_key.clone());

loop {
// List all revisions and check if the current is the minimum.
let mut rev_list = catalog
.list_lock_revisions(list_table_lock_req.clone())
.await?
.into_iter()
.map(|(x, _)| x)
.collect::<Vec<_>>();
// list_lock_revisions are returned in big-endian order,
// we need to sort them in ascending numeric order.
rev_list.sort();
let position = rev_list.iter().position(|x| *x == revision).ok_or_else(||
// If the current revision is not found in the list, it has already expired.
ErrorCode::TableLockExpired(format!(
"The acquired table lock with revision '{}' maybe expired(elapsed: {:?})",
revision,
start.elapsed(),
)))?;

if position == 0 {
// The lock is acquired by current session.
let extend_table_lock_req =
ExtendLockRevReq::new(lock_key.clone(), revision, ttl, true);

catalog.extend_lock_revision(extend_table_lock_req).await?;
// metrics.
record_acquired_lock_nums(lock_type, table_id, 1);
break;
}

let prev_revision = rev_list[position - 1];
let elapsed = start.elapsed();
// If retry is not needed, return the error directly.
if !should_retry || elapsed >= acquire_timeout {
return Err(ErrorCode::TableAlreadyLocked(format!(
"Table is locked by other session(rev: {}, prev: {}, elapsed: {:?})",
revision,
prev_revision,
start.elapsed()
)));
}

let watch_delete_ident = TableLockIdent::new(tenant, table_id, prev_revision);

// Get the previous revision, watch the delete event.
let req = WatchRequest {
key: watch_delete_ident.to_string_key(),
key_end: None,
filter_type: FilterType::Delete.into(),
};
let mut watch_stream = meta_api.watch(req).await?;

let lock_meta = meta_api.get_pb(&watch_delete_ident).await?;
if lock_meta.is_none() {
log::warn!(
"Lock revision '{}' already does not exist, skipping",
prev_revision
);
continue;
}

// Add a timeout period for watch.
if let Err(_cause) = timeout(acquire_timeout.abs_diff(elapsed), async move {
while let Some(Ok(resp)) = watch_stream.next().await {
if let Some(event) = resp.event {
if event.current.is_none() {
break;
}
}
}
})
.await
{
return Err(ErrorCode::TableAlreadyLocked(format!(
"Table is locked by other session(rev: {}, prev: {}, elapsed: {:?})",
revision,
prev_revision,
start.elapsed()
)));
}
}

Ok(revision)
}
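
The loop above builds a fair wait queue out of meta-service revisions: the holder creates its own revision, lists every revision registered for the lock key, and owns the lock only when its revision is the smallest; otherwise it watches the revision just ahead of it and re-checks once that revision is deleted or the remaining timeout runs out. Below is a self-contained sketch of those two steps with made-up revision numbers and a stubbed event stream standing in for `list_lock_revisions` and the meta-service watch; only the names taken from the diff are real, everything else is illustrative.

```rust
use std::time::{Duration, Instant};

use futures_util::StreamExt;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let start = Instant::now();
    let acquire_timeout = Duration::from_secs(5);

    // Step 1: queue-position check. In the real code the revisions come from
    // catalog.list_lock_revisions and are sorted into ascending numeric order.
    let my_revision: u64 = 103;
    let mut rev_list: Vec<u64> = vec![105, 103, 101];
    rev_list.sort();
    let position = rev_list
        .iter()
        .position(|x| *x == my_revision)
        .expect("revision missing from the list means it has expired");
    if position == 0 {
        println!("lock acquired: revision {my_revision} is the smallest");
        return;
    }
    let prev_revision = rev_list[position - 1];
    println!("waiting on previous revision {prev_revision}");

    // Step 2: wait for the previous holder's delete event, but only for the
    // part of the acquire timeout that is still left. `None` stands in for a
    // delete notification on the watch stream.
    let mut events = futures_util::stream::iter(vec![Some(prev_revision), None]);
    let remaining = acquire_timeout.saturating_sub(start.elapsed());
    match timeout(remaining, async {
        while let Some(current) = events.next().await {
            if current.is_none() {
                break; // the previous revision was deleted; go back and re-list
            }
        }
    })
    .await
    {
        Ok(()) => println!("previous revision released; retry the acquisition loop"),
        Err(_elapsed) => println!("timed out: table is locked by another session"),
    }
}
```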

#[async_backtrace::framed]
async fn start(
self: &Arc<Self>,
query_id: String,
catalog: Arc<dyn Catalog>,
req: CreateLockRevReq,
) -> Result<u64> {
let lock_key = req.lock_key.clone();
let query_id = req.query_id.clone();
let ttl = req.ttl;
let sleep_range = (ttl / 3)..=(ttl * 2 / 3);

@@ -61,6 +178,7 @@ impl LockHolder {
let revision = res.revision;
// metrics.
record_created_lock_nums(lock_key.lock_type().to_string(), lock_key.get_table_id(), 1);
log::debug!("create table lock success, revision={}", revision);

let delete_table_lock_req = DeleteLockRevReq::new(lock_key.clone(), revision);
let extend_table_lock_req = ExtendLockRevReq::new(lock_key.clone(), revision, ttl, false);
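
For context, the `sleep_range` computed above presumably paces the holder's background TTL-extension task (that part of the function is collapsed in this view): each round sleeps a random duration between one third and two thirds of the TTL so holders do not all refresh at the same instant. A minimal sketch of picking such a duration, using the `rand` items this file already imports:

```rust
use std::time::Duration;

use rand::thread_rng;
use rand::Rng;

fn main() {
    let ttl = Duration::from_secs(30);
    // Sleep somewhere between ttl/3 and 2*ttl/3 before the next extension.
    let sleep_range = (ttl / 3)..=(ttl * 2 / 3);
    let sleep_for = thread_rng().gen_range(sleep_range);
    println!("next TTL extension in {sleep_for:?}");
}
```
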
@@ -179,7 +297,10 @@ impl LockHolder {
let mut backoff = set_backoff(Some(Duration::from_millis(2)), None, max_retry_elapsed);
loop {
match catalog.delete_lock_revision(req.clone()).await {
Ok(_) => break,
Ok(_) => {
log::debug!("delete table lock success, revision={}", req.revision);
break;
}
Err(e) => match backoff.next_backoff() {
Some(duration) => {
log::debug!(
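
The truncated hunk above retries `delete_lock_revision` under an exponential backoff built by the Databend helper `set_backoff`. The sketch below reproduces the same retry shape with the plain `backoff` and `tokio` crates; the `delete_revision` closure and the 10-second cap are assumptions for illustration, while the 2 ms initial interval matches the diff.

```rust
use std::time::Duration;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use tokio::time::sleep;

/// Keep calling `delete_revision` until it succeeds or the backoff policy
/// gives up, mirroring the retry loop in the hunk above.
async fn delete_with_retry<F, Fut>(mut delete_revision: F) -> Result<(), String>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), String>>,
{
    // 2 ms initial interval as in the diff; the 10 s cap is an assumption here,
    // the real code derives it from `max_retry_elapsed`.
    let mut backoff = ExponentialBackoff {
        initial_interval: Duration::from_millis(2),
        max_elapsed_time: Some(Duration::from_secs(10)),
        ..ExponentialBackoff::default()
    };
    loop {
        match delete_revision().await {
            Ok(()) => return Ok(()),
            Err(e) => match backoff.next_backoff() {
                Some(wait) => sleep(wait).await, // retry after the backoff interval
                None => return Err(e),           // retries exhausted, surface the error
            },
        }
    }
}

#[tokio::main]
async fn main() {
    // Hypothetical stand-in for catalog.delete_lock_revision(req).
    let result = delete_with_retry(|| async { Ok::<(), String>(()) }).await;
    println!("delete result: {result:?}");
}
```
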
141 changes: 16 additions & 125 deletions src/query/service/src/locks/lock_manager.rs
@@ -15,35 +15,21 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use databend_common_base::base::tokio::sync::mpsc;
use databend_common_base::base::tokio::time::timeout;
use databend_common_base::base::GlobalInstance;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::lock::Lock;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_api::kv_pb_api::KVPbApi;
use databend_common_meta_app::schema::CreateLockRevReq;
use databend_common_meta_app::schema::DeleteLockRevReq;
use databend_common_meta_app::schema::ExtendLockRevReq;
use databend_common_meta_app::schema::ListLockRevReq;
use databend_common_meta_app::schema::LockKey;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableLockIdent;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::protobuf::watch_request::FilterType;
use databend_common_meta_types::protobuf::WatchRequest;
use databend_common_metrics::lock::metrics_inc_shutdown_lock_holder_nums;
use databend_common_metrics::lock::metrics_inc_start_lock_holder_nums;
use databend_common_metrics::lock::record_acquired_lock_nums;
use databend_common_pipeline_core::LockGuard;
use databend_common_pipeline_core::UnlockApi;
use databend_common_users::UserApiProvider;
use futures_util::StreamExt;
use parking_lot::RwLock;

use crate::locks::lock_holder::LockHolder;
@@ -97,129 +83,34 @@ impl LockManager {
catalog_name: &str,
should_retry: bool,
) -> Result<Option<Arc<LockGuard>>> {
let start = Instant::now();
let acquire_timeout = Duration::from_secs(ctx.get_settings().get_acquire_lock_timeout()?);

let lock_type = lock_key.lock_type().to_string();
let table_id = lock_key.get_table_id();
let tenant = lock_key.get_tenant();
let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?;
let query_id = ctx.get_id();
let ttl = Duration::from_secs(ctx.get_settings().get_table_lock_expire_secs()?);
let req = CreateLockRevReq::new(
lock_key.clone(),
lock_key,
ctx.get_current_user()?.name, // user
ctx.get_cluster().local_id.clone(), // node
query_id.clone(), // query_id
Duration::from_secs(expire_secs),
ctx.get_id(), // query_id
ttl,
);

let catalog = ctx.get_catalog(catalog_name).await?;

let lock_holder = Arc::new(LockHolder::default());
let revision = lock_holder.start(query_id, catalog.clone(), req).await?;

self.insert_lock(revision, lock_holder);
let guard = LockGuard::new(self.clone(), revision);

let acquire_lock_timeout = ctx.get_settings().get_acquire_lock_timeout()?;
let duration = Duration::from_secs(acquire_lock_timeout);
let meta_api = UserApiProvider::instance().get_meta_store_client();

let list_table_lock_req = ListLockRevReq::new(lock_key.clone());

let delete_table_lock_req = DeleteLockRevReq::new(lock_key.clone(), revision);

loop {
// List all revisions and check if the current is the minimum.
let mut rev_list = catalog
.list_lock_revisions(list_table_lock_req.clone())
.await?
.into_iter()
.map(|(x, _)| x)
.collect::<Vec<_>>();
// list_lock_revisions are returned in big-endian order,
// we need to sort them in ascending numeric order.
rev_list.sort();
let position = rev_list.iter().position(|x| *x == revision).ok_or_else(||
// If the current is not found in list, it means that the current has been expired.
ErrorCode::TableLockExpired(format!(
"the acquired table lock with revision '{}' is not in {:?}, maybe expired(elapsed: {:?})",
revision,
rev_list,
start.elapsed(),
)))?;

if position == 0 {
// The lock is acquired by current session.
let extend_table_lock_req = ExtendLockRevReq::new(
lock_key.clone(),
revision,
Duration::from_secs(expire_secs),
true,
);

catalog.extend_lock_revision(extend_table_lock_req).await?;
// metrics.
record_acquired_lock_nums(lock_type, table_id, 1);
break;
match lock_holder
.try_acquire_lock(catalog, req, should_retry, acquire_timeout)
.await
{
Ok(revision) => {
self.insert_lock(revision, lock_holder);
let guard = LockGuard::new(self.clone(), revision);
Ok(Some(Arc::new(guard)))
}

let elapsed = start.elapsed();
// if no need retry, return error directly.
if !should_retry || elapsed >= duration {
catalog
.delete_lock_revision(delete_table_lock_req.clone())
.await?;
return Err(ErrorCode::TableAlreadyLocked(format!(
"table is locked by other session, please retry later(elapsed: {:?})",
elapsed
)));
Err(err) => {
lock_holder.shutdown();
Err(err)
}

let watch_delete_ident = TableLockIdent::new(tenant, table_id, rev_list[position - 1]);

// Get the previous revision, watch the delete event.
let req = WatchRequest {
key: watch_delete_ident.to_string_key(),
key_end: None,
filter_type: FilterType::Delete.into(),
};
let mut watch_stream = meta_api.watch(req).await?;

let lock_meta = meta_api.get_pb(&watch_delete_ident).await?;
if lock_meta.is_none() {
log::warn!(
"Lock revision '{}' already does not exist, skipping",
rev_list[position - 1]
);
continue;
}

// Add a timeout period for watch.
match timeout(duration.abs_diff(elapsed), async move {
while let Some(Ok(resp)) = watch_stream.next().await {
if let Some(event) = resp.event {
if event.current.is_none() {
break;
}
}
}
})
.await
{
Ok(_) => Ok(()),
Err(_) => {
catalog
.delete_lock_revision(delete_table_lock_req.clone())
.await?;
Err(ErrorCode::TableAlreadyLocked(format!(
"table is locked by other session, please retry later(elapsed: {:?})",
start.elapsed()
)))
}
}?;
}

Ok(Some(Arc::new(guard)))
}

fn insert_lock(&self, revision: u64, lock_holder: Arc<LockHolder>) {
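
A closing note on why the holder is shut down on failure: the `LockGuard` returned on success presumably follows the usual RAII pattern, releasing the lock revision through `UnlockApi` when it is dropped, so only a successfully acquired revision should ever be wrapped in one. The sketch below shows that pattern generically; it is not the actual `LockGuard` from `databend_common_pipeline_core`.

```rust
/// Generic sketch of a drop-to-unlock guard; the real LockGuard delegates to
/// UnlockApi rather than a closure.
struct Guard<F: FnMut(u64)> {
    revision: u64,
    unlock: F,
}

impl<F: FnMut(u64)> Drop for Guard<F> {
    fn drop(&mut self) {
        // Release the lock revision when the guard goes out of scope.
        (self.unlock)(self.revision);
    }
}

fn main() {
    let guard = Guard {
        revision: 42,
        unlock: |rev| println!("unlock revision {rev}"),
    };
    println!("holding revision {}", guard.revision);
    // `guard` drops here and the unlock callback runs.
}
```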