diff --git a/src/query/service/src/locks/lock_holder.rs b/src/query/service/src/locks/lock_holder.rs index 9f77de14a76d..5b292a63c274 100644 --- a/src/query/service/src/locks/lock_holder.rs +++ b/src/query/service/src/locks/lock_holder.rs @@ -20,19 +20,29 @@ use std::time::Instant; use backoff::backoff::Backoff; use databend_common_base::base::tokio::time::sleep; +use databend_common_base::base::tokio::time::timeout; use databend_common_base::base::WatchNotify; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; use databend_common_catalog::catalog::Catalog; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_meta_api::kv_pb_api::KVPbApi; use databend_common_meta_app::schema::CreateLockRevReq; use databend_common_meta_app::schema::DeleteLockRevReq; use databend_common_meta_app::schema::ExtendLockRevReq; +use databend_common_meta_app::schema::ListLockRevReq; +use databend_common_meta_app::schema::TableLockIdent; +use databend_common_meta_kvapi::kvapi::Key; +use databend_common_meta_types::protobuf::watch_request::FilterType; +use databend_common_meta_types::protobuf::WatchRequest; +use databend_common_metrics::lock::record_acquired_lock_nums; use databend_common_metrics::lock::record_created_lock_nums; use databend_common_storages_fuse::operations::set_backoff; +use databend_common_users::UserApiProvider; use futures::future::select; use futures::future::Either; +use futures_util::StreamExt; use rand::thread_rng; use rand::Rng; @@ -46,13 +56,120 @@ pub struct LockHolder { impl LockHolder { #[async_backtrace::framed] - pub async fn start( + pub(crate) async fn try_acquire_lock( + self: &Arc, + catalog: Arc, + req: CreateLockRevReq, + should_retry: bool, + acquire_timeout: Duration, + ) -> Result { + let start = Instant::now(); + + let ttl = req.ttl; + + let lock_key = req.lock_key.clone(); + let lock_type = lock_key.lock_type().to_string(); + let table_id = lock_key.get_table_id(); + let tenant = lock_key.get_tenant(); + + let revision = self.start(catalog.clone(), req).await?; + + let meta_api = UserApiProvider::instance().get_meta_store_client(); + let list_table_lock_req = ListLockRevReq::new(lock_key.clone()); + + loop { + // List all revisions and check if the current is the minimum. + let mut rev_list = catalog + .list_lock_revisions(list_table_lock_req.clone()) + .await? + .into_iter() + .map(|(x, _)| x) + .collect::>(); + // list_lock_revisions are returned in big-endian order, + // we need to sort them in ascending numeric order. + rev_list.sort(); + let position = rev_list.iter().position(|x| *x == revision).ok_or_else(|| + // If the current is not found in list, it means that the current has been expired. + ErrorCode::TableLockExpired(format!( + "The acquired table lock with revision '{}' maybe expired(elapsed: {:?})", + revision, + start.elapsed(), + )))?; + + if position == 0 { + // The lock is acquired by current session. + let extend_table_lock_req = + ExtendLockRevReq::new(lock_key.clone(), revision, ttl, true); + + catalog.extend_lock_revision(extend_table_lock_req).await?; + // metrics. + record_acquired_lock_nums(lock_type, table_id, 1); + break; + } + + let prev_revision = rev_list[position - 1]; + let elapsed = start.elapsed(); + // if no need retry, return error directly. + if !should_retry || elapsed >= acquire_timeout { + return Err(ErrorCode::TableAlreadyLocked(format!( + "Table is locked by other session(rev: {}, prev: {}, elapsed: {:?})", + revision, + prev_revision, + start.elapsed() + ))); + } + + let watch_delete_ident = TableLockIdent::new(tenant, table_id, prev_revision); + + // Get the previous revision, watch the delete event. + let req = WatchRequest { + key: watch_delete_ident.to_string_key(), + key_end: None, + filter_type: FilterType::Delete.into(), + }; + let mut watch_stream = meta_api.watch(req).await?; + + let lock_meta = meta_api.get_pb(&watch_delete_ident).await?; + if lock_meta.is_none() { + log::warn!( + "Lock revision '{}' already does not exist, skipping", + prev_revision + ); + continue; + } + + // Add a timeout period for watch. + if let Err(_cause) = timeout(acquire_timeout.abs_diff(elapsed), async move { + while let Some(Ok(resp)) = watch_stream.next().await { + if let Some(event) = resp.event { + if event.current.is_none() { + break; + } + } + } + }) + .await + { + return Err(ErrorCode::TableAlreadyLocked(format!( + "Table is locked by other session(rev: {}, prev: {}, elapsed: {:?})", + revision, + prev_revision, + start.elapsed() + ))); + } + } + + Ok(revision) + } + + #[async_backtrace::framed] + async fn start( self: &Arc, - query_id: String, catalog: Arc, req: CreateLockRevReq, ) -> Result { let lock_key = req.lock_key.clone(); + let query_id = req.query_id.clone(); let ttl = req.ttl; let sleep_range = (ttl / 3)..=(ttl * 2 / 3); @@ -61,6 +178,7 @@ impl LockHolder { let revision = res.revision; // metrics. record_created_lock_nums(lock_key.lock_type().to_string(), lock_key.get_table_id(), 1); + log::debug!("create table lock success, revision={}", revision); let delete_table_lock_req = DeleteLockRevReq::new(lock_key.clone(), revision); let extend_table_lock_req = ExtendLockRevReq::new(lock_key.clone(), revision, ttl, false); @@ -179,7 +297,10 @@ impl LockHolder { let mut backoff = set_backoff(Some(Duration::from_millis(2)), None, max_retry_elapsed); loop { match catalog.delete_lock_revision(req.clone()).await { - Ok(_) => break, + Ok(_) => { + log::debug!("delete table lock success, revision={}", req.revision); + break; + } Err(e) => match backoff.next_backoff() { Some(duration) => { log::debug!( diff --git a/src/query/service/src/locks/lock_manager.rs b/src/query/service/src/locks/lock_manager.rs index e1b86aa0f1c2..7bd00139c58d 100644 --- a/src/query/service/src/locks/lock_manager.rs +++ b/src/query/service/src/locks/lock_manager.rs @@ -15,35 +15,21 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use std::time::Instant; use databend_common_base::base::tokio::sync::mpsc; -use databend_common_base::base::tokio::time::timeout; use databend_common_base::base::GlobalInstance; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; use databend_common_catalog::lock::Lock; use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use databend_common_meta_api::kv_pb_api::KVPbApi; use databend_common_meta_app::schema::CreateLockRevReq; -use databend_common_meta_app::schema::DeleteLockRevReq; -use databend_common_meta_app::schema::ExtendLockRevReq; -use databend_common_meta_app::schema::ListLockRevReq; use databend_common_meta_app::schema::LockKey; use databend_common_meta_app::schema::TableInfo; -use databend_common_meta_app::schema::TableLockIdent; -use databend_common_meta_kvapi::kvapi::Key; -use databend_common_meta_types::protobuf::watch_request::FilterType; -use databend_common_meta_types::protobuf::WatchRequest; use databend_common_metrics::lock::metrics_inc_shutdown_lock_holder_nums; use databend_common_metrics::lock::metrics_inc_start_lock_holder_nums; -use databend_common_metrics::lock::record_acquired_lock_nums; use databend_common_pipeline_core::LockGuard; use databend_common_pipeline_core::UnlockApi; -use databend_common_users::UserApiProvider; -use futures_util::StreamExt; use parking_lot::RwLock; use crate::locks::lock_holder::LockHolder; @@ -97,129 +83,34 @@ impl LockManager { catalog_name: &str, should_retry: bool, ) -> Result>> { - let start = Instant::now(); + let acquire_timeout = Duration::from_secs(ctx.get_settings().get_acquire_lock_timeout()?); - let lock_type = lock_key.lock_type().to_string(); - let table_id = lock_key.get_table_id(); - let tenant = lock_key.get_tenant(); - let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?; - let query_id = ctx.get_id(); + let ttl = Duration::from_secs(ctx.get_settings().get_table_lock_expire_secs()?); let req = CreateLockRevReq::new( - lock_key.clone(), + lock_key, ctx.get_current_user()?.name, // user ctx.get_cluster().local_id.clone(), // node - query_id.clone(), // query_id - Duration::from_secs(expire_secs), + ctx.get_id(), // query_id + ttl, ); let catalog = ctx.get_catalog(catalog_name).await?; let lock_holder = Arc::new(LockHolder::default()); - let revision = lock_holder.start(query_id, catalog.clone(), req).await?; - - self.insert_lock(revision, lock_holder); - let guard = LockGuard::new(self.clone(), revision); - - let acquire_lock_timeout = ctx.get_settings().get_acquire_lock_timeout()?; - let duration = Duration::from_secs(acquire_lock_timeout); - let meta_api = UserApiProvider::instance().get_meta_store_client(); - - let list_table_lock_req = ListLockRevReq::new(lock_key.clone()); - - let delete_table_lock_req = DeleteLockRevReq::new(lock_key.clone(), revision); - - loop { - // List all revisions and check if the current is the minimum. - let mut rev_list = catalog - .list_lock_revisions(list_table_lock_req.clone()) - .await? - .into_iter() - .map(|(x, _)| x) - .collect::>(); - // list_lock_revisions are returned in big-endian order, - // we need to sort them in ascending numeric order. - rev_list.sort(); - let position = rev_list.iter().position(|x| *x == revision).ok_or_else(|| - // If the current is not found in list, it means that the current has been expired. - ErrorCode::TableLockExpired(format!( - "the acquired table lock with revision '{}' is not in {:?}, maybe expired(elapsed: {:?})", - revision, - rev_list, - start.elapsed(), - )))?; - - if position == 0 { - // The lock is acquired by current session. - let extend_table_lock_req = ExtendLockRevReq::new( - lock_key.clone(), - revision, - Duration::from_secs(expire_secs), - true, - ); - - catalog.extend_lock_revision(extend_table_lock_req).await?; - // metrics. - record_acquired_lock_nums(lock_type, table_id, 1); - break; + match lock_holder + .try_acquire_lock(catalog, req, should_retry, acquire_timeout) + .await + { + Ok(revision) => { + self.insert_lock(revision, lock_holder); + let guard = LockGuard::new(self.clone(), revision); + Ok(Some(Arc::new(guard))) } - - let elapsed = start.elapsed(); - // if no need retry, return error directly. - if !should_retry || elapsed >= duration { - catalog - .delete_lock_revision(delete_table_lock_req.clone()) - .await?; - return Err(ErrorCode::TableAlreadyLocked(format!( - "table is locked by other session, please retry later(elapsed: {:?})", - elapsed - ))); + Err(err) => { + lock_holder.shutdown(); + Err(err) } - - let watch_delete_ident = TableLockIdent::new(tenant, table_id, rev_list[position - 1]); - - // Get the previous revision, watch the delete event. - let req = WatchRequest { - key: watch_delete_ident.to_string_key(), - key_end: None, - filter_type: FilterType::Delete.into(), - }; - let mut watch_stream = meta_api.watch(req).await?; - - let lock_meta = meta_api.get_pb(&watch_delete_ident).await?; - if lock_meta.is_none() { - log::warn!( - "Lock revision '{}' already does not exist, skipping", - rev_list[position - 1] - ); - continue; - } - - // Add a timeout period for watch. - match timeout(duration.abs_diff(elapsed), async move { - while let Some(Ok(resp)) = watch_stream.next().await { - if let Some(event) = resp.event { - if event.current.is_none() { - break; - } - } - } - }) - .await - { - Ok(_) => Ok(()), - Err(_) => { - catalog - .delete_lock_revision(delete_table_lock_req.clone()) - .await?; - Err(ErrorCode::TableAlreadyLocked(format!( - "table is locked by other session, please retry later(elapsed: {:?})", - start.elapsed() - ))) - } - }?; } - - Ok(Some(Arc::new(guard))) } fn insert_lock(&self, revision: u64, lock_holder: Arc) { diff --git a/tests/sqllogictests/scripts/prepare_iceberg_tpch_data.py b/tests/sqllogictests/scripts/prepare_iceberg_tpch_data.py index 8b9fc86d6b41..4f4462b76178 100644 --- a/tests/sqllogictests/scripts/prepare_iceberg_tpch_data.py +++ b/tests/sqllogictests/scripts/prepare_iceberg_tpch_data.py @@ -1,132 +1,159 @@ from pyspark.sql import SparkSession -from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType, DecimalType +from pyspark.sql.types import ( + StructType, + StructField, + IntegerType, + DoubleType, + StringType, + DateType, + DecimalType, +) data_path = "tests/sqllogictests/data/tests/suites/0_stateless/13_tpch/data" -spark = SparkSession.builder \ - .appName("CSV to Iceberg REST Catalog") \ - .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \ - .config("spark.sql.catalog.iceberg.type", "rest") \ - .config("spark.sql.catalog.iceberg.uri", "http://127.0.0.1:8181") \ - .config("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ - .config("spark.sql.catalog.iceberg.warehouse", "s3://iceberg-tpch/") \ - .config("spark.sql.catalog.iceberg.s3.access-key-id", "admin") \ - .config("spark.sql.catalog.iceberg.s3.secret-access-key", "password") \ - .config("spark.sql.catalog.iceberg.s3.path-style-access", "true") \ - .config("spark.sql.catalog.iceberg.s3.endpoint", "http://127.0.0.1:9000") \ - .config("spark.sql.catalog.iceberg.client.region", "us-east-1") \ - .config("spark.jars.packages", - "org.apache.iceberg:iceberg-aws-bundle:1.6.1,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1") \ +spark = ( + SparkSession.builder.appName("CSV to Iceberg REST Catalog") + .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.iceberg.type", "rest") + .config("spark.sql.catalog.iceberg.uri", "http://127.0.0.1:8181") + .config("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") + .config("spark.sql.catalog.iceberg.warehouse", "s3://iceberg-tpch/") + .config("spark.sql.catalog.iceberg.s3.access-key-id", "admin") + .config("spark.sql.catalog.iceberg.s3.secret-access-key", "password") + .config("spark.sql.catalog.iceberg.s3.path-style-access", "true") + .config("spark.sql.catalog.iceberg.s3.endpoint", "http://127.0.0.1:9000") + .config("spark.sql.catalog.iceberg.client.region", "us-east-1") + .config( + "spark.jars.packages", + "org.apache.iceberg:iceberg-aws-bundle:1.6.1,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1", + ) .getOrCreate() +) tables = { "lineitem": ( - StructType([ - StructField("l_orderkey", IntegerType(), True), - StructField("l_partkey", IntegerType(), True), - StructField("l_suppkey", IntegerType(), True), - StructField("l_linenumber", IntegerType(), True), - StructField("l_quantity", DecimalType(15, 2), True), - StructField("l_extendedprice", DecimalType(15, 2), True), - StructField("l_discount", DecimalType(15, 2), True), - StructField("l_tax", DecimalType(15, 2), True), - StructField("l_returnflag", StringType(), True), - StructField("l_linestatus", StringType(), True), - StructField("l_shipdate", DateType(), True), - StructField("l_commitdate", DateType(), True), - StructField("l_receiptdate", DateType(), True), - StructField("l_shipinstruct", StringType(), True), - StructField("l_shipmode", StringType(), True), - StructField("l_comment", StringType(), True) - ]), - f"{data_path}/lineitem.tbl" + StructType( + [ + StructField("l_orderkey", IntegerType(), True), + StructField("l_partkey", IntegerType(), True), + StructField("l_suppkey", IntegerType(), True), + StructField("l_linenumber", IntegerType(), True), + StructField("l_quantity", DecimalType(15, 2), True), + StructField("l_extendedprice", DecimalType(15, 2), True), + StructField("l_discount", DecimalType(15, 2), True), + StructField("l_tax", DecimalType(15, 2), True), + StructField("l_returnflag", StringType(), True), + StructField("l_linestatus", StringType(), True), + StructField("l_shipdate", DateType(), True), + StructField("l_commitdate", DateType(), True), + StructField("l_receiptdate", DateType(), True), + StructField("l_shipinstruct", StringType(), True), + StructField("l_shipmode", StringType(), True), + StructField("l_comment", StringType(), True), + ] + ), + f"{data_path}/lineitem.tbl", ), "orders": ( - StructType([ - StructField("o_orderkey", IntegerType(), True), - StructField("o_custkey", IntegerType(), True), - StructField("o_orderstatus", StringType(), True), - StructField("o_totalprice", DecimalType(15, 2), True), - StructField("o_orderdate", DateType(), True), - StructField("o_orderpriority", StringType(), True), - StructField("o_clerk", StringType(), True), - StructField("o_shippriority", IntegerType(), True), - StructField("o_comment", StringType(), True) - ]), - f"{data_path}/orders.tbl" + StructType( + [ + StructField("o_orderkey", IntegerType(), True), + StructField("o_custkey", IntegerType(), True), + StructField("o_orderstatus", StringType(), True), + StructField("o_totalprice", DecimalType(15, 2), True), + StructField("o_orderdate", DateType(), True), + StructField("o_orderpriority", StringType(), True), + StructField("o_clerk", StringType(), True), + StructField("o_shippriority", IntegerType(), True), + StructField("o_comment", StringType(), True), + ] + ), + f"{data_path}/orders.tbl", ), "customer": ( - StructType([ - StructField("c_custkey", IntegerType(), True), - StructField("c_name", StringType(), True), - StructField("c_address", StringType(), True), - StructField("c_nationkey", IntegerType(), True), - StructField("c_phone", StringType(), True), - StructField("c_acctbal", DecimalType(15, 2), True), - StructField("c_mktsegment", StringType(), True), - StructField("c_comment", StringType(), True) - ]), - f"{data_path}/customer.tbl" + StructType( + [ + StructField("c_custkey", IntegerType(), True), + StructField("c_name", StringType(), True), + StructField("c_address", StringType(), True), + StructField("c_nationkey", IntegerType(), True), + StructField("c_phone", StringType(), True), + StructField("c_acctbal", DecimalType(15, 2), True), + StructField("c_mktsegment", StringType(), True), + StructField("c_comment", StringType(), True), + ] + ), + f"{data_path}/customer.tbl", ), "nation": ( - StructType([ - StructField("n_nationkey", IntegerType(), True), - StructField("n_name", StringType(), True), - StructField("n_regionkey", IntegerType(), True), - StructField("n_comment", StringType(), True) - ]), - f"{data_path}/nation.tbl" + StructType( + [ + StructField("n_nationkey", IntegerType(), True), + StructField("n_name", StringType(), True), + StructField("n_regionkey", IntegerType(), True), + StructField("n_comment", StringType(), True), + ] + ), + f"{data_path}/nation.tbl", ), "region": ( - StructType([ - StructField("r_regionkey", IntegerType(), True), - StructField("r_name", StringType(), True), - StructField("r_comment", StringType(), True) - ]), - f"{data_path}/region.tbl" + StructType( + [ + StructField("r_regionkey", IntegerType(), True), + StructField("r_name", StringType(), True), + StructField("r_comment", StringType(), True), + ] + ), + f"{data_path}/region.tbl", ), "part": ( - StructType([ - StructField("p_partkey", IntegerType(), True), - StructField("p_name", StringType(), True), - StructField("p_mfgr", StringType(), True), - StructField("p_brand", StringType(), True), - StructField("p_type", StringType(), True), - StructField("p_size", IntegerType(), True), - StructField("p_container", StringType(), True), - StructField("p_retailprice", DecimalType(15, 2), True), - StructField("p_comment", StringType(), True) - ]), - f"{data_path}/part.tbl" + StructType( + [ + StructField("p_partkey", IntegerType(), True), + StructField("p_name", StringType(), True), + StructField("p_mfgr", StringType(), True), + StructField("p_brand", StringType(), True), + StructField("p_type", StringType(), True), + StructField("p_size", IntegerType(), True), + StructField("p_container", StringType(), True), + StructField("p_retailprice", DecimalType(15, 2), True), + StructField("p_comment", StringType(), True), + ] + ), + f"{data_path}/part.tbl", ), "supplier": ( - StructType([ - StructField("s_suppkey", IntegerType(), True), - StructField("s_name", StringType(), True), - StructField("s_address", StringType(), True), - StructField("s_nationkey", IntegerType(), True), - StructField("s_phone", StringType(), True), - StructField("s_acctbal", DecimalType(15, 2), True), - StructField("s_comment", StringType(), True) - ]), - f"{data_path}/supplier.tbl" + StructType( + [ + StructField("s_suppkey", IntegerType(), True), + StructField("s_name", StringType(), True), + StructField("s_address", StringType(), True), + StructField("s_nationkey", IntegerType(), True), + StructField("s_phone", StringType(), True), + StructField("s_acctbal", DecimalType(15, 2), True), + StructField("s_comment", StringType(), True), + ] + ), + f"{data_path}/supplier.tbl", ), "partsupp": ( - StructType([ - StructField("ps_partkey", IntegerType(), True), - StructField("ps_suppkey", IntegerType(), True), - StructField("ps_availqty", IntegerType(), True), - StructField("ps_supplycost", DecimalType(15, 2), True), - StructField("ps_comment", StringType(), True) - ]), - f"{data_path}/partsupp.tbl" - ) + StructType( + [ + StructField("ps_partkey", IntegerType(), True), + StructField("ps_suppkey", IntegerType(), True), + StructField("ps_availqty", IntegerType(), True), + StructField("ps_supplycost", DecimalType(15, 2), True), + StructField("ps_comment", StringType(), True), + ] + ), + f"{data_path}/partsupp.tbl", + ), } for table_name, (schema, file_path) in tables.items(): full_table_name = f"iceberg.tpch.{table_name}" - #spark.sql(f"DROP TABLE IF EXISTS {full_table_name}") + # spark.sql(f"DROP TABLE IF EXISTS {full_table_name}") create_table = f""" CREATE OR REPLACE TABLE {full_table_name} (