Skip to content

Commit

Permalink
fix review commend
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Oct 16, 2024
1 parent ccd95fa commit 41539f9
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 120 deletions.
119 changes: 117 additions & 2 deletions src/query/service/src/locks/lock_holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,28 @@ use std::time::Instant;
use backoff::backoff::Backoff;
use databend_common_base::base::tokio::sync::Notify;
use databend_common_base::base::tokio::time::sleep;
use databend_common_base::base::tokio::time::timeout;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::catalog::Catalog;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_api::kv_pb_api::KVPbApi;
use databend_common_meta_app::schema::CreateLockRevReq;
use databend_common_meta_app::schema::DeleteLockRevReq;
use databend_common_meta_app::schema::ExtendLockRevReq;
use databend_common_meta_app::schema::ListLockRevReq;
use databend_common_meta_app::schema::TableLockIdent;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::protobuf::watch_request::FilterType;
use databend_common_meta_types::protobuf::WatchRequest;
use databend_common_metrics::lock::record_acquired_lock_nums;
use databend_common_metrics::lock::record_created_lock_nums;
use databend_common_storages_fuse::operations::set_backoff;
use databend_common_users::UserApiProvider;
use futures::future::select;
use futures::future::Either;
use futures_util::StreamExt;
use rand::thread_rng;
use rand::Rng;

Expand All @@ -46,13 +56,118 @@ pub struct LockHolder {

impl LockHolder {
#[async_backtrace::framed]
pub async fn start(
pub(crate) async fn try_acquire_lock(
self: &Arc<Self>,
catalog: Arc<dyn Catalog>,
req: CreateLockRevReq,
should_retry: bool,
acquire_timeout: Duration,
) -> Result<u64> {
let start = Instant::now();

let ttl = req.ttl;

let lock_key = req.lock_key.clone();
let lock_type = lock_key.lock_type().to_string();
let table_id = lock_key.get_table_id();
let tenant = lock_key.get_tenant();

let revision = self.start(catalog.clone(), req).await?;

let meta_api = UserApiProvider::instance().get_meta_store_client();
let list_table_lock_req = ListLockRevReq::new(lock_key.clone());

loop {
// List all revisions and check if the current is the minimum.
let mut rev_list = catalog
.list_lock_revisions(list_table_lock_req.clone())
.await?
.into_iter()
.map(|(x, _)| x)
.collect::<Vec<_>>();
// list_lock_revisions are returned in big-endian order,
// we need to sort them in ascending numeric order.
rev_list.sort();
let position = rev_list.iter().position(|x| *x == revision).ok_or_else(||
// If the current is not found in list, it means that the current has been expired.
ErrorCode::TableLockExpired(format!(
"the acquired table lock with revision '{}' is not in {:?}, maybe expired(elapsed: {:?})",
revision,
rev_list,
start.elapsed(),
)))?;

if position == 0 {
// The lock is acquired by current session.
let extend_table_lock_req =
ExtendLockRevReq::new(lock_key.clone(), revision, ttl, true);

catalog.extend_lock_revision(extend_table_lock_req).await?;
// metrics.
record_acquired_lock_nums(lock_type, table_id, 1);
break;
}

let elapsed = start.elapsed();
// if no need retry, return error directly.
if !should_retry || elapsed >= acquire_timeout {
return Err(ErrorCode::TableAlreadyLocked(format!(
"table is locked by other session, please retry later(revision: {}, elapsed: {:?})",
revision,
start.elapsed()
)));
}

let watch_delete_ident = TableLockIdent::new(tenant, table_id, rev_list[position - 1]);

// Get the previous revision, watch the delete event.
let req = WatchRequest {
key: watch_delete_ident.to_string_key(),
key_end: None,
filter_type: FilterType::Delete.into(),
};
let mut watch_stream = meta_api.watch(req).await?;

let lock_meta = meta_api.get_pb(&watch_delete_ident).await?;
if lock_meta.is_none() {
log::warn!(
"Lock revision '{}' already does not exist, skipping",
rev_list[position - 1]
);
continue;
}

// Add a timeout period for watch.
if let Err(_cause) = timeout(acquire_timeout.abs_diff(elapsed), async move {
while let Some(Ok(resp)) = watch_stream.next().await {
if let Some(event) = resp.event {
if event.current.is_none() {
break;
}
}
}
})
.await
{
return Err(ErrorCode::TableAlreadyLocked(format!(
"table is locked by other session, please retry later(revision: {}, elapsed: {:?})",
revision,
start.elapsed()
)));
}
}

Ok(revision)
}

#[async_backtrace::framed]
async fn start(
self: &Arc<Self>,
query_id: String,
catalog: Arc<dyn Catalog>,
req: CreateLockRevReq,
) -> Result<u64> {
let lock_key = req.lock_key.clone();
let query_id = req.query_id.clone();
let ttl = req.ttl;
let sleep_range = (ttl / 3)..=(ttl * 2 / 3);

Expand Down
134 changes: 16 additions & 118 deletions src/query/service/src/locks/lock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,21 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use databend_common_base::base::tokio::sync::mpsc;
use databend_common_base::base::tokio::time::timeout;
use databend_common_base::base::GlobalInstance;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::lock::Lock;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_api::kv_pb_api::KVPbApi;
use databend_common_meta_app::schema::CreateLockRevReq;
use databend_common_meta_app::schema::ExtendLockRevReq;
use databend_common_meta_app::schema::ListLockRevReq;
use databend_common_meta_app::schema::LockKey;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableLockIdent;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::protobuf::watch_request::FilterType;
use databend_common_meta_types::protobuf::WatchRequest;
use databend_common_metrics::lock::metrics_inc_shutdown_lock_holder_nums;
use databend_common_metrics::lock::metrics_inc_start_lock_holder_nums;
use databend_common_metrics::lock::record_acquired_lock_nums;
use databend_common_pipeline_core::LockGuard;
use databend_common_pipeline_core::UnlockApi;
use databend_common_users::UserApiProvider;
use futures_util::StreamExt;
use parking_lot::RwLock;

use crate::locks::lock_holder::LockHolder;
Expand Down Expand Up @@ -96,122 +83,33 @@ impl LockManager {
catalog_name: &str,
should_retry: bool,
) -> Result<Option<Arc<LockGuard>>> {
let start = Instant::now();
let acquire_timeout = Duration::from_secs(ctx.get_settings().get_acquire_lock_timeout()?);

let lock_type = lock_key.lock_type().to_string();
let table_id = lock_key.get_table_id();
let tenant = lock_key.get_tenant();
let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?;
let query_id = ctx.get_id();
let ttl = Duration::from_secs(ctx.get_settings().get_table_lock_expire_secs()?);
let req = CreateLockRevReq::new(
lock_key.clone(),
lock_key,
ctx.get_current_user()?.name, // user
ctx.get_cluster().local_id.clone(), // node
query_id.clone(), // query_id
Duration::from_secs(expire_secs),
ctx.get_id(), // query_id
ttl,
);

let catalog = ctx.get_catalog(catalog_name).await?;

let lock_holder = Arc::new(LockHolder::default());
let revision = lock_holder.start(query_id, catalog.clone(), req).await?;

self.insert_lock(revision, lock_holder);
let guard = LockGuard::new(self.clone(), revision);

let acquire_lock_timeout = ctx.get_settings().get_acquire_lock_timeout()?;
let duration = Duration::from_secs(acquire_lock_timeout);
let meta_api = UserApiProvider::instance().get_meta_store_client();
let list_table_lock_req = ListLockRevReq::new(lock_key.clone());

let mut is_locked = false;
loop {
// List all revisions and check if the current is the minimum.
let mut rev_list = catalog
.list_lock_revisions(list_table_lock_req.clone())
.await?
.into_iter()
.map(|(x, _)| x)
.collect::<Vec<_>>();
// list_lock_revisions are returned in big-endian order,
// we need to sort them in ascending numeric order.
rev_list.sort();
let position = rev_list.iter().position(|x| *x == revision).ok_or_else(||
// If the current is not found in list, it means that the current has been expired.
ErrorCode::TableLockExpired(format!(
"the acquired table lock with revision '{}' is not in {:?}, maybe expired(elapsed: {:?})",
revision,
rev_list,
start.elapsed(),
)))?;

if position == 0 {
// The lock is acquired by current session.
let extend_table_lock_req = ExtendLockRevReq::new(
lock_key.clone(),
revision,
Duration::from_secs(expire_secs),
true,
);

catalog.extend_lock_revision(extend_table_lock_req).await?;
// metrics.
record_acquired_lock_nums(lock_type, table_id, 1);
break;
}

let elapsed = start.elapsed();
// if no need retry, return error directly.
if !should_retry || elapsed >= duration {
is_locked = true;
break;
}

let watch_delete_ident = TableLockIdent::new(tenant, table_id, rev_list[position - 1]);

// Get the previous revision, watch the delete event.
let req = WatchRequest {
key: watch_delete_ident.to_string_key(),
key_end: None,
filter_type: FilterType::Delete.into(),
};
let mut watch_stream = meta_api.watch(req).await?;

let lock_meta = meta_api.get_pb(&watch_delete_ident).await?;
if lock_meta.is_none() {
log::warn!(
"Lock revision '{}' already does not exist, skipping",
rev_list[position - 1]
);
continue;
}

// Add a timeout period for watch.
if let Err(_cause) = timeout(duration.abs_diff(elapsed), async move {
while let Some(Ok(resp)) = watch_stream.next().await {
if let Some(event) = resp.event {
if event.current.is_none() {
break;
}
}
}
})
match lock_holder
.try_acquire_lock(catalog, req, should_retry, acquire_timeout)
.await
{
is_locked = true;
break;
{
Ok(revision) => {
self.insert_lock(revision, lock_holder);
let guard = LockGuard::new(self.clone(), revision);
Ok(Some(Arc::new(guard)))
}
Err(err) => {
lock_holder.shutdown();
Err(err)
}
}

if is_locked {
drop(guard);
Err(ErrorCode::TableAlreadyLocked(format!(
"table is locked by other session, please retry later(revision: {}, elapsed: {:?})",
revision,
start.elapsed()
)))
} else {
Ok(Some(Arc::new(guard)))
}
}

Expand Down

0 comments on commit 41539f9

Please sign in to comment.