Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(ci): flaky test #16207

Merged
merged 2 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 8 additions & 21 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3814,11 +3814,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
);

if succ {
break;
return Ok(CreateLockRevReply { revision });
}
}

Ok(CreateLockRevReply { revision })
}

#[logcall::logcall]
Expand All @@ -3842,7 +3840,12 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let (tb_meta_seq, _) = get_table_by_id_or_err(self, &tbid, ctx).await?;

let (lock_seq, lock_meta_opt): (_, Option<LockMeta>) = get_pb_value(self, &key).await?;
table_lock_has_to_exist(lock_seq, table_id, ctx)?;
if lock_seq == 0 || lock_meta_opt.is_none() {
return Err(KVAppError::AppError(AppError::TableLockExpired(
TableLockExpired::new(table_id, ctx),
)));
}

let mut lock_meta = lock_meta_opt.unwrap();
// Set `acquire_lock = true` to initialize `acquired_on` when the
// first time this lock is acquired. Before the lock is
Expand Down Expand Up @@ -3879,10 +3882,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
);

if succ {
break;
return Ok(());
}
}
Ok(())
}

#[logcall::logcall]
Expand Down Expand Up @@ -5904,21 +5906,6 @@ async fn update_mask_policy(
Ok(())
}

/// Return OK if a table lock exists by checking the seq.
///
/// Otherwise returns TableLockExpired error
fn table_lock_has_to_exist(seq: u64, table_id: u64, msg: impl Display) -> Result<(), KVAppError> {
if seq == 0 {
debug!(seq = seq, table_id = table_id; "table lock does not exist");

Err(KVAppError::AppError(AppError::TableLockExpired(
TableLockExpired::new(table_id, format!("{}: {}", msg, table_id)),
)))
} else {
Ok(())
}
}

#[tonic::async_trait]
pub(crate) trait UndropTableStrategy {
fn table_name_ident(&self) -> &TableNameIdent;
Expand Down
6 changes: 6 additions & 0 deletions src/meta/app/src/schema/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ impl LockKey {
}
}

pub fn get_tenant(&self) -> &Tenant {
match self {
LockKey::Table { tenant, .. } => tenant,
}
}

pub fn get_extra_info(&self) -> BTreeMap<String, String> {
match self {
LockKey::Table { .. } => BTreeMap::new(),
Expand Down
6 changes: 0 additions & 6 deletions src/query/catalog/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ pub enum LockTableOption {
pub trait Lock: Sync + Send {
fn lock_type(&self) -> LockType;

fn get_catalog(&self) -> &str;

fn get_table_id(&self) -> u64;

fn tenant_name(&self) -> &str;

async fn try_lock(
&self,
ctx: Arc<dyn TableContext>,
Expand Down
68 changes: 0 additions & 68 deletions src/query/service/src/locks/lock_ext.rs

This file was deleted.

36 changes: 13 additions & 23 deletions src/query/service/src/locks/lock_holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ use databend_common_base::base::tokio::time::sleep;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::catalog::Catalog;
use databend_common_catalog::lock::Lock;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::CreateLockRevReq;
use databend_common_meta_app::schema::DeleteLockRevReq;
use databend_common_meta_app::schema::ExtendLockRevReq;
use databend_common_meta_app::schema::LockKey;
use databend_common_meta_app::tenant::Tenant;
use databend_common_metrics::lock::record_created_lock_nums;
use databend_common_storages_fuse::operations::set_backoff;
use fastrace::func_name;
use futures::future::select;
use futures::future::Either;
use rand::thread_rng;
Expand All @@ -48,34 +46,26 @@ pub struct LockHolder {

impl LockHolder {
#[async_backtrace::framed]
pub async fn start<T: Lock + ?Sized>(
pub async fn start(
self: &Arc<Self>,
query_id: String,
catalog: Arc<dyn Catalog>,
lock: &T,
revision: u64,
expire_secs: u64,
) -> Result<()> {
req: CreateLockRevReq,
) -> Result<u64> {
let lock_key = req.lock_key.clone();
let expire_secs = req.expire_secs;
let sleep_range = (expire_secs * 1000 / 3)..=(expire_secs * 1000 * 2 / 3);

let tenant_name = lock.tenant_name();
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
let lock_key = LockKey::Table {
tenant: tenant.clone(),
table_id: lock.get_table_id(),
};
// get a new table lock revision.
let res = catalog.create_lock_revision(req).await?;
let revision = res.revision;
// metrics.
record_created_lock_nums(lock_key.lock_type().to_string(), lock_key.get_table_id(), 1);

let delete_table_lock_req = DeleteLockRevReq::new(lock_key.clone(), revision);
let extend_table_lock_req =
ExtendLockRevReq::new(lock_key.clone(), revision, expire_secs, false);

self.try_extend_lock(
catalog.clone(),
extend_table_lock_req.clone(),
Some(Duration::from_millis(expire_secs * 1000)),
)
.await?;

GlobalIORuntime::instance().spawn({
let self_clone = self.clone();
async move {
Expand Down Expand Up @@ -122,7 +112,7 @@ impl LockHolder {
}
});

Ok(())
Ok(revision)
}

pub fn shutdown(&self) {
Expand Down
76 changes: 37 additions & 39 deletions src/query/service/src/locks/lock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use databend_common_base::base::tokio::sync::mpsc;
use databend_common_base::base::tokio::time::timeout;
Expand All @@ -32,18 +33,15 @@ use databend_common_meta_app::schema::ListLockRevReq;
use databend_common_meta_app::schema::LockKey;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableLockIdent;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::protobuf::watch_request::FilterType;
use databend_common_meta_types::protobuf::WatchRequest;
use databend_common_metrics::lock::metrics_inc_shutdown_lock_holder_nums;
use databend_common_metrics::lock::metrics_inc_start_lock_holder_nums;
use databend_common_metrics::lock::record_acquired_lock_nums;
use databend_common_metrics::lock::record_created_lock_nums;
use databend_common_pipeline_core::LockGuard;
use databend_common_pipeline_core::UnlockApi;
use databend_common_users::UserApiProvider;
use fastrace::func_name;
use futures_util::StreamExt;
use parking_lot::RwLock;

Expand Down Expand Up @@ -91,39 +89,32 @@ impl LockManager {
/// NOTICE: the lock holder is not 100% reliable.
/// E.g., there is a very small probability of failure in extending or deleting the lock.
#[async_backtrace::framed]
pub async fn try_lock<T: Lock + ?Sized>(
pub async fn try_lock(
self: &Arc<Self>,
ctx: Arc<dyn TableContext>,
lock: &T,
lock_key: LockKey,
catalog_name: &str,
should_retry: bool,
) -> Result<Option<Arc<LockGuard>>> {
let user = ctx.get_current_user()?.name;
let node = ctx.get_cluster().local_id.clone();
let query_id = ctx.get_current_session_id();
let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?;

let catalog = ctx.get_catalog(lock.get_catalog()).await?;

let tenant_name = lock.tenant_name();
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
let table_id = lock.get_table_id();
let lock_key = LockKey::Table {
tenant: tenant.clone(),
table_id,
};
let start = Instant::now();

let req = CreateLockRevReq::new(lock_key.clone(), user, node, query_id, expire_secs);
let lock_type = lock_key.lock_type().to_string();
let table_id = lock_key.get_table_id();
let tenant = lock_key.get_tenant();
let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?;
let query_id = ctx.get_id();
let req = CreateLockRevReq::new(
lock_key.clone(),
ctx.get_current_user()?.name, // user
ctx.get_cluster().local_id.clone(), // node
query_id.clone(), // query_id
expire_secs,
);

// get a new table lock revision.
let res = catalog.create_lock_revision(req).await?;
let revision = res.revision;
// metrics.
record_created_lock_nums(lock.lock_type().to_string(), table_id, 1);
let catalog = ctx.get_catalog(catalog_name).await?;

let lock_holder = Arc::new(LockHolder::default());
lock_holder
.start(ctx.get_id(), catalog.clone(), lock, revision, expire_secs)
.await?;
let revision = lock_holder.start(query_id, catalog.clone(), req).await?;

self.insert_lock(revision, lock_holder);
let guard = LockGuard::new(self.clone(), revision);
Expand All @@ -141,10 +132,15 @@ impl LockManager {
let reply = catalog
.list_lock_revisions(list_table_lock_req.clone())
.await?;
let position = reply.iter().position(|(x, _)| *x == revision).ok_or_else(||
let rev_list = reply.into_iter().map(|(x, _)| x).collect::<Vec<_>>();
let position = rev_list.iter().position(|x| *x == revision).ok_or_else(||
// If the current is not found in list, it means that the current has been expired.
ErrorCode::TableLockExpired("the acquired table lock has been expired".to_string()),
)?;
ErrorCode::TableLockExpired(format!(
"the acquired table lock with revision '{}' is not in {:?}, maybe expired(elapsed: {:?})",
revision,
rev_list,
start.elapsed(),
)))?;

if position == 0 {
// The lock is acquired by current session.
Expand All @@ -153,7 +149,7 @@ impl LockManager {

catalog.extend_lock_revision(extend_table_lock_req).await?;
// metrics.
record_acquired_lock_nums(lock.lock_type().to_string(), table_id, 1);
record_acquired_lock_nums(lock_type, table_id, 1);
break;
}

Expand All @@ -162,12 +158,13 @@ impl LockManager {
catalog
.delete_lock_revision(delete_table_lock_req.clone())
.await?;
return Err(ErrorCode::TableAlreadyLocked(
"table is locked by other session, please retry later".to_string(),
));
return Err(ErrorCode::TableAlreadyLocked(format!(
"table is locked by other session, please retry later(elapsed: {:?})",
start.elapsed()
)));
}

let watch_delete_ident = TableLockIdent::new(&tenant, table_id, reply[position - 1].0);
let watch_delete_ident = TableLockIdent::new(tenant, table_id, rev_list[position - 1]);

// Get the previous revision, watch the delete event.
let req = WatchRequest {
Expand All @@ -193,9 +190,10 @@ impl LockManager {
catalog
.delete_lock_revision(delete_table_lock_req.clone())
.await?;
Err(ErrorCode::TableAlreadyLocked(
"table is locked by other session, please retry later".to_string(),
))
Err(ErrorCode::TableAlreadyLocked(format!(
"table is locked by other session, please retry later(elapsed: {:?})",
start.elapsed()
)))
}
}?;
}
Expand Down
2 changes: 0 additions & 2 deletions src/query/service/src/locks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod lock_ext;
mod lock_holder;
mod lock_manager;
mod table_lock;

pub use lock_ext::LockExt;
pub use lock_manager::LockManager;
Loading
Loading