From 34c11b32be78937ee138873d1637678b26b9944a Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Fri, 16 Jun 2023 17:34:28 +0800 Subject: [PATCH 1/2] fix: avoid any updates after table is closed --- analytic_engine/src/compaction/scheduler.rs | 11 +- analytic_engine/src/instance/close.rs | 12 ++- analytic_engine/src/instance/drop.rs | 11 +- analytic_engine/src/instance/engine.rs | 101 +++++------------- analytic_engine/src/instance/mod.rs | 15 ++- analytic_engine/src/instance/wal_replayer.rs | 34 +++--- analytic_engine/src/instance/write.rs | 8 +- analytic_engine/src/table/data.rs | 103 +++++++++++++------ analytic_engine/src/table/mod.rs | 74 ++++++++----- table_engine/src/table.rs | 3 + 10 files changed, 213 insertions(+), 159 deletions(-) diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index 30cf277521..acccb33fd3 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -656,7 +656,16 @@ impl ScheduleWorker { self.max_unflushed_duration, ); - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec = if let Some(v) = table_data.acquire_serial_exec_ctx().await { + v + } else { + warn!( + "Table is closed, ignore this periodical flush, table:{}", + table_data.name + ); + continue; + }; + let flush_scheduler = serial_exec.flush_scheduler(); // Instance flush the table asynchronously. if let Err(e) = flusher diff --git a/analytic_engine/src/instance/close.rs b/analytic_engine/src/instance/close.rs index f45199c164..a77256d16c 100644 --- a/analytic_engine/src/instance/close.rs +++ b/analytic_engine/src/instance/close.rs @@ -3,12 +3,12 @@ //! Close table logic of instance use log::{info, warn}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table_engine::engine::CloseTableRequest; use crate::{ instance::{ - engine::{DoManifestSnapshot, FlushTable, Result}, + engine::{DoManifestSnapshot, FlushTable, OperateClosedTable, Result}, flush_compaction::{Flusher, TableFlushOptions}, }, manifest::{ManifestRef, SnapshotRequest}, @@ -37,8 +37,11 @@ impl Closer { // Flush table. let opts = TableFlushOptions::default(); - let mut serial_exec = table_data.serial_exec.lock().await; - let flush_scheduler = serial_exec.flush_scheduler(); + let mut serial_exec_ctx = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; + let flush_scheduler = serial_exec_ctx.flush_scheduler(); self.flusher .do_flush(flush_scheduler, &table_data, opts) @@ -67,6 +70,7 @@ impl Closer { let removed_table = self.space.remove_table(&request.table_name); assert!(removed_table.is_some()); + serial_exec_ctx.mark_closed(); info!( "table:{}-{} has been removed from the space_id:{}", table_data.name, table_data.id, self.space.id diff --git a/analytic_engine/src/instance/drop.rs b/analytic_engine/src/instance/drop.rs index ac6d1653fd..08d673f820 100644 --- a/analytic_engine/src/instance/drop.rs +++ b/analytic_engine/src/instance/drop.rs @@ -3,12 +3,12 @@ //! 
Drop table logic of instance use log::{info, warn}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table_engine::engine::DropTableRequest; use crate::{ instance::{ - engine::{FlushTable, Result, WriteManifest}, + engine::{FlushTable, OperateClosedTable, Result, WriteManifest}, flush_compaction::{Flusher, TableFlushOptions}, SpaceStoreRef, }, @@ -36,7 +36,10 @@ impl Dropper { } }; - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec_ctx = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; if table_data.is_dropped() { warn!( @@ -51,7 +54,7 @@ impl Dropper { // be avoided. let opts = TableFlushOptions::default(); - let flush_scheduler = serial_exec.flush_scheduler(); + let flush_scheduler = serial_exec_ctx.flush_scheduler(); self.flusher .do_flush(flush_scheduler, &table_data, opts) .await diff --git a/analytic_engine/src/instance/engine.rs b/analytic_engine/src/instance/engine.rs index e32fa064d5..562c97ff2e 100644 --- a/analytic_engine/src/instance/engine.rs +++ b/analytic_engine/src/instance/engine.rs @@ -23,29 +23,21 @@ use crate::{ #[derive(Debug, Snafu)] #[snafu(visibility(pub(crate)))] pub enum Error { - #[snafu(display( - "The space of the table does not exist, space_id:{}, table:{}.\nBacktrace:\n{}", - space_id, - table, - backtrace, - ))] + #[snafu(display("The space of the table does not exist, space_id:{space_id}, table:{table}.\nBacktrace:\n{backtrace}"))] SpaceNotExist { space_id: SpaceId, table: String, backtrace: Backtrace, }, - #[snafu(display("Failed to read meta update, table_id:{}, err:{}", table_id, source))] + #[snafu(display("Failed to read meta update, table_id:{table_id}, err:{source}"))] ReadMetaUpdate { table_id: TableId, source: GenericError, }, #[snafu(display( - "Failed to recover table data, space_id:{}, table:{}, err:{}", - space_id, - table, - source + "Failed to recover table data, space_id:{space_id}, table:{table}, err:{source}" ))] RecoverTableData { space_id: SpaceId, @@ -53,14 +45,11 @@ pub enum Error { source: crate::table::data::Error, }, - #[snafu(display("Failed to read wal, err:{}", source))] + #[snafu(display("Failed to read wal, err:{source}"))] ReadWal { source: wal::manager::Error }, #[snafu(display( - "Failed to apply log entry to memtable, table:{}, table_id:{}, err:{}", - table, - table_id, - source + "Failed to apply log entry to memtable, table:{table}, table_id:{table_id}, err:{source}", ))] ApplyMemTable { space_id: SpaceId, @@ -70,11 +59,7 @@ pub enum Error { }, #[snafu(display( - "Flush failed, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Flush failed, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] FlushTable { space_id: SpaceId, @@ -84,11 +69,7 @@ pub enum Error { }, #[snafu(display( - "Failed to persist meta update to manifest, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Failed to persist meta update to manifest, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] WriteManifest { space_id: SpaceId, @@ -98,11 +79,7 @@ pub enum Error { }, #[snafu(display( - "Failed to persist meta update to WAL, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Failed to persist meta update to WAL, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] WriteWal { space_id: SpaceId, @@ -112,11 +89,7 @@ pub enum Error { }, #[snafu(display( - "Invalid options, space_id:{}, 
table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Invalid options, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] InvalidOptions { space_id: SpaceId, @@ -126,11 +99,7 @@ pub enum Error { }, #[snafu(display( - "Failed to create table data, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Failed to create table data, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] CreateTableData { space_id: SpaceId, @@ -140,11 +109,8 @@ pub enum Error { }, #[snafu(display( - "Try to update schema to elder version, table:{}, current_version:{}, given_version:{}.\nBacktrace:\n{}", - table, - current_version, - given_version, - backtrace, + "Try to update schema to elder version, table:{table}, current_version:{current_version}, \ + given_version:{given_version}.\nBacktrace:\n{backtrace}", ))] InvalidSchemaVersion { table: String, @@ -154,11 +120,8 @@ pub enum Error { }, #[snafu(display( - "Invalid previous schema version, table:{}, current_version:{}, pre_version:{}.\nBacktrace:\n{}", - table, - current_version, - pre_version, - backtrace, + "Invalid previous schema version, table:{table}, current_version:{current_version}, \ + pre_version:{pre_version}.\nBacktrace:\n{backtrace}", ))] InvalidPreVersion { table: String, @@ -167,21 +130,14 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display( - "Alter schema of a dropped table:{}.\nBacktrace:\n{}", - table, - backtrace - ))] + #[snafu(display("Alter schema of a dropped table:{table}.\nBacktrace:\n{backtrace}"))] AlterDroppedTable { table: String, backtrace: Backtrace }, - #[snafu(display("Failed to store version edit, err:{}", source))] + #[snafu(display("Failed to store version edit, err:{source}"))] StoreVersionEdit { source: GenericError }, #[snafu(display( - "Failed to encode payloads, table:{}, wal_location:{:?}, err:{}", - table, - wal_location, - source + "Failed to encode payloads, table:{table}, wal_location:{wal_location:?}, err:{source}" ))] EncodePayloads { table: String, @@ -190,10 +146,7 @@ pub enum Error { }, #[snafu(display( - "Failed to do manifest snapshot for table, space_id:{}, table:{}, err:{}", - space_id, - table, - source + "Failed to do manifest snapshot for table, space_id:{space_id}, table:{table}, err:{source}", ))] DoManifestSnapshot { space_id: SpaceId, @@ -202,30 +155,31 @@ pub enum Error { }, #[snafu(display( - "Table open failed and can not be created again, table:{}.\nBacktrace:\n{}", - table, - backtrace, + "Table open failed and can not be created again, table:{table}.\nBacktrace:\n{backtrace}", ))] CreateOpenFailedTable { table: String, backtrace: Backtrace }, - #[snafu(display("Failed to open manifest, err:{}", source))] + #[snafu(display("Failed to open manifest, err:{source}"))] OpenManifest { source: crate::manifest::details::Error, }, - #[snafu(display("Failed to find table, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Failed to find table, msg:{msg}.\nBacktrace:\n{backtrace}"))] TableNotExist { msg: String, backtrace: Backtrace }, - #[snafu(display("Failed to open shard, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Failed to open shard, msg:{msg}.\nBacktrace:\n{backtrace}"))] OpenTablesOfShard { msg: String, backtrace: Backtrace }, - #[snafu(display("Failed to replay wal, msg:{:?}, err:{}", msg, source))] + #[snafu(display("Try to operate a closed table.\nBacktrace:\n{backtrace}"))] + OperateClosedTable { backtrace: Backtrace }, + + 
#[snafu(display("Failed to replay wal, msg:{msg:?}, err:{source}"))] ReplayWalWithCause { msg: Option, source: GenericError, }, - #[snafu(display("Failed to replay wal, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Failed to replay wal, msg:{msg:?}.\nBacktrace:\n{backtrace}"))] ReplayWalNoCause { msg: Option, backtrace: Backtrace, @@ -264,6 +218,7 @@ impl From for table_engine::engine::Error { | Error::TableNotExist { .. } | Error::OpenTablesOfShard { .. } | Error::ReplayWalNoCause { .. } + | Error::OperateClosedTable { .. } | Error::ReplayWalWithCause { .. } => Self::Unexpected { source: Box::new(err), }, diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 1faf254f08..492178b41c 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -28,14 +28,14 @@ use common_util::{ }; use log::{error, info}; use mem_collector::MemUsageCollector; -use snafu::{ResultExt, Snafu}; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{engine::EngineRuntimes, table::FlushRequest}; use tokio::sync::oneshot::{self, error::RecvError}; use wal::manager::{WalLocation, WalManagerRef}; -use self::flush_compaction::{Flusher, TableFlushOptions}; use crate::{ compaction::{scheduler::CompactionSchedulerRef, TableCompactionRequest}, + instance::flush_compaction::{Flusher, TableFlushOptions}, manifest::ManifestRef, row_iter::IterOptions, space::{SpaceId, SpaceRef, SpacesRef}, @@ -66,6 +66,9 @@ pub enum Error { source: GenericError, }, + #[snafu(display("Try to operate a closed table, table:{table}.\nBacktrace:\n{backtrace}"))] + OperateClosedTable { table: String, backtrace: Backtrace }, + #[snafu(display("Failed to receive {} result, table:{}, err:{}", op, table, source))] RecvManualOpResult { op: String, @@ -195,7 +198,13 @@ impl Instance { }; let flusher = self.make_flusher(); - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec = + table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable { + table: &table_data.name, + })?; let flush_scheduler = serial_exec.flush_scheduler(); flusher .schedule_flush(flush_scheduler, table_data, flush_opts) diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index 5d494450e4..0144833ec6 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -12,7 +12,7 @@ use async_trait::async_trait; use common_types::{schema::IndexInWriterSchema, table::ShardId}; use common_util::error::BoxError; use log::{debug, error, info, trace}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table_engine::table::TableId; use tokio::sync::MutexGuard; use wal::{ @@ -25,13 +25,13 @@ use wal::{ use crate::{ instance::{ self, - engine::{Error, ReplayWalWithCause, Result}, + engine::{Error, OperateClosedTable, ReplayWalWithCause, Result}, flush_compaction::{Flusher, TableFlushOptions}, serial_executor::TableOpSerialExecutor, write::MemTableWriter, }, payload::{ReadPayload, WalDecoder}, - table::data::TableDataRef, + table::data::{SerialExecContext, TableDataRef}, }; /// Wal replayer supporting both table based and region based @@ -182,7 +182,10 @@ impl TableBasedReplay { .box_err() .context(ReplayWalWithCause { msg: None })?; - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let mut log_entry_buf = 
VecDeque::with_capacity(context.wal_replay_batch_size); loop { // fetch entries to log_entry_buf @@ -264,14 +267,17 @@ impl RegionBasedReplay { let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size); // Lock all related tables. - let mut serial_exec_ctxs = HashMap::with_capacity(table_datas.len()); + let mut replay_table_ctxs = HashMap::with_capacity(table_datas.len()); for table_data in table_datas { - let serial_exec = table_data.serial_exec.lock().await; - let serial_exec_ctx = SerialExecContext { + let serial_exec_ctx = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; + let replay_table_ctx = TableReplayContext { table_data: table_data.clone(), - serial_exec, + serial_exec_ctx, }; - serial_exec_ctxs.insert(table_data.id, serial_exec_ctx); + replay_table_ctxs.insert(table_data.id, replay_table_ctx); } // Split and replay logs. @@ -287,7 +293,7 @@ impl RegionBasedReplay { break; } - Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds) + Self::replay_single_batch(context, &log_entry_buf, &mut replay_table_ctxs, faileds) .await?; } @@ -297,7 +303,7 @@ impl RegionBasedReplay { async fn replay_single_batch( context: &ReplayContext, log_batch: &VecDeque>, - serial_exec_ctxs: &mut HashMap>, + serial_exec_ctxs: &mut HashMap>, faileds: &mut FailedTables, ) -> Result<()> { let mut table_batches = Vec::new(); @@ -317,7 +323,7 @@ impl RegionBasedReplay { let result = replay_table_log_entries( &context.flusher, context.max_retry_flush_limit, - &mut ctx.serial_exec, + &mut ctx.serial_exec_ctx, &ctx.table_data, log_batch.range(table_batch.range), ) @@ -391,9 +397,9 @@ struct TableBatch { range: Range, } -struct SerialExecContext<'a> { +struct TableReplayContext<'a> { table_data: TableDataRef, - serial_exec: MutexGuard<'a, TableOpSerialExecutor>, + serial_exec_ctx: MutexGuard<'a, SerialExecContext>, } /// Replay all log entries into memtable and flush if necessary diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index 219e86102c..6017686b9a 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -608,9 +608,9 @@ impl<'a> Writer<'a> { "Try to trigger flush of other table:{} from the write procedure of table:{}", table_data.name, self.table_data.name ); - match table_data.serial_exec.try_lock() { - Ok(mut serial_exec) => { - let flush_scheduler = serial_exec.flush_scheduler(); + match table_data.try_acquire_serial_exec_ctx() { + Some(mut serial_exec_ctx) => { + let flush_scheduler = serial_exec_ctx.flush_scheduler(); // Set `block_on_write_thread` to false and let flush do in background. 
flusher .schedule_flush(flush_scheduler, table_data, opts) @@ -619,7 +619,7 @@ impl<'a> Writer<'a> { table: &table_data.name, }) } - Err(_) => { + None => { warn!( "Failed to acquire write lock for flush table:{}", table_data.name, diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index 49d9d6cae8..6b21672fe1 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -7,6 +7,7 @@ use std::{ convert::TryInto, fmt, fmt::Formatter, + ops::{Deref, DerefMut}, sync::{ atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, Arc, Mutex, @@ -28,6 +29,7 @@ use log::{debug, info}; use object_store::Path; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table::TableId; +use tokio::sync::MutexGuard; use crate::{ instance::serial_executor::TableOpSerialExecutor, @@ -86,6 +88,32 @@ impl TableShardInfo { } } +/// The context for execution of serial operation on the table. +pub struct SerialExecContext { + closed: bool, + serial_exec: TableOpSerialExecutor, +} + +impl SerialExecContext { + pub fn mark_closed(&mut self) { + self.closed = true; + } +} + +impl Deref for SerialExecContext { + type Target = TableOpSerialExecutor; + + fn deref(&self) -> &Self::Target { + &self.serial_exec + } +} + +impl DerefMut for SerialExecContext { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.serial_exec + } +} + /// Data of a table pub struct TableData { /// Id of this table @@ -143,14 +171,13 @@ pub struct TableData { /// No write/alter is allowed if the table is dropped. dropped: AtomicBool, + serial_exec_ctx: tokio::sync::Mutex, + /// Metrics of this table pub metrics: Metrics, /// Shard info of the table pub shard_info: TableShardInfo, - - /// The table operation serial_exec - pub serial_exec: tokio::sync::Mutex, } impl fmt::Debug for TableData { @@ -217,6 +244,10 @@ impl TableData { preflush_write_buffer_size_ratio, )); + let serial_exec_ctx = tokio::sync::Mutex::new(SerialExecContext { + closed: false, + serial_exec: TableOpSerialExecutor::new(table_id), + }); Ok(Self { id: table_id, name: table_name, @@ -235,7 +266,7 @@ impl TableData { dropped: AtomicBool::new(false), metrics, shard_info: TableShardInfo::new(shard_id), - serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(table_id)), + serial_exec_ctx, }) } @@ -249,35 +280,17 @@ impl TableData { preflush_write_buffer_size_ratio: f32, mem_usage_collector: CollectorRef, ) -> Result { - let memtable_factory = Arc::new(SkiplistMemTableFactory); - let purge_queue = purger.create_purge_queue(add_meta.space_id, add_meta.table_id); - let current_version = TableVersion::new(purge_queue); - let metrics = Metrics::default(); - let mutable_limit = AtomicU32::new(compute_mutable_limit( - add_meta.opts.write_buffer_size, + Self::new( + add_meta.space_id, + add_meta.table_id, + add_meta.table_name, + add_meta.schema, + shard_id, + add_meta.opts, + purger, preflush_write_buffer_size_ratio, - )); - - Ok(Self { - id: add_meta.table_id, - name: add_meta.table_name, - schema: Mutex::new(add_meta.schema), - space_id: add_meta.space_id, - mutable_limit, - mutable_limit_write_buffer_ratio: preflush_write_buffer_size_ratio, - opts: ArcSwap::new(Arc::new(add_meta.opts)), - memtable_factory, mem_usage_collector, - current_version, - last_sequence: AtomicU64::new(0), - last_memtable_id: AtomicU64::new(0), - last_file_id: AtomicU64::new(0), - last_flush_time_ms: AtomicU64::new(0), - dropped: AtomicBool::new(false), - metrics, - shard_info: TableShardInfo::new(shard_id), - 
serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(add_meta.table_id)), - }) + ) } /// Get current schema of the table. @@ -352,6 +365,34 @@ impl TableData { self.dropped.store(true, Ordering::SeqCst); } + /// Acquire the [`SerialExecContext`] if the table is not closed. + pub async fn acquire_serial_exec_ctx(&self) -> Option> { + let v = self.serial_exec_ctx.lock().await; + if v.closed { + None + } else { + Some(v) + } + } + + /// Try to acquire the [SerialExecContext]. + /// + /// [None] will be returned if the serial_exec_ctx has been acquired + /// already, or the table is closed. + pub fn try_acquire_serial_exec_ctx(&self) -> Option> { + let v = self.serial_exec_ctx.try_lock(); + match v { + Ok(ctx) => { + if ctx.closed { + None + } else { + Some(ctx) + } + } + Err(_) => None, + } + } + /// Returns total memtable memory usage in bytes. #[inline] pub fn memtable_memory_usage(&self) -> usize { diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index 82b2b54b9c..780b9cafcc 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -21,9 +21,9 @@ use table_engine::{ stream::{PartitionedStreams, SendableRecordBatchStream}, table::{ AlterOptions, AlterSchema, AlterSchemaRequest, Compact, Flush, FlushRequest, Get, - GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, ReadOptions, ReadOrder, - ReadRequest, Result, Scan, Table, TableId, TableStats, TooManyPendingWrites, - WaitForPendingWrites, Write, WriteRequest, + GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, OperateClosedTable, + ReadOptions, ReadOrder, ReadRequest, Result, Scan, Table, TableId, TableStats, + TooManyPendingWrites, WaitForPendingWrites, Write, WriteRequest, }, ANALYTIC_ENGINE_TYPE, }; @@ -255,7 +255,7 @@ impl TableImpl { // This is the first request in the queue, and we should // take responsibilities for merging and writing the // requests in the queue. - let serial_exec = self.table_data.serial_exec.lock().await; + let serial_exec_ctx = self.table_data.acquire_serial_exec_ctx().await; // The `serial_exec` is acquired, let's merge the pending requests and write // them all. let pending_writes = { @@ -266,9 +266,22 @@ impl TableImpl { !pending_writes.is_empty(), "The pending writes should contain at least the one just pushed." ); - let merged_write_request = - merge_pending_write_requests(pending_writes.writes, pending_writes.num_rows); - (merged_write_request, serial_exec, pending_writes.notifiers) + match serial_exec_ctx { + Some(v) => { + let merged_write_request = merge_pending_write_requests( + pending_writes.writes, + pending_writes.num_rows, + ); + (merged_write_request, v, pending_writes.notifiers) + } + None => { + // The table has been closed, notify all the waiters in + // the queue. + let write_err = OperateClosedTable.fail(); + self.notify_waiters(pending_writes.notifiers, &write_err); + return write_err; + } + } } QueueResult::Waiter(rx) => { // The request is successfully pushed into the queue, and just wait for the @@ -303,12 +316,18 @@ impl TableImpl { .box_err() .context(Write { table: self.name() }); - // There is no waiter for pending writes, return the write result. - if notifiers.is_empty() { - return write_res; - } - // Notify the waiters for the pending writes. 
+ self.notify_waiters(notifiers, &write_res); + + write_res.map(|_| num_rows) + } + + #[inline] + fn should_queue_write_request(&self, request: &WriteRequest) -> bool { + request.row_group.num_rows() < self.instance.max_rows_in_write_queue + } + + fn notify_waiters(&self, notifiers: Vec>>, write_res: &Result) { match write_res { Ok(_) => { for notifier in notifiers { @@ -319,7 +338,6 @@ impl TableImpl { ); } } - Ok(num_rows) } Err(e) => { let err_msg = format!("Failed to do merge write, err:{e}"); @@ -332,15 +350,9 @@ impl TableImpl { ); } } - Err(e) } } } - - #[inline] - fn should_queue_write_request(&self, request: &WriteRequest) -> bool { - request.row_group.num_rows() < self.instance.max_rows_in_write_queue - } } #[async_trait] @@ -384,11 +396,15 @@ impl Table for TableImpl { return self.write_with_pending_queue(request).await; } - let mut serial_exec = self.table_data.serial_exec.lock().await; + let mut serial_exec_ctx = self + .table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let mut writer = Writer::new( self.instance.clone(), self.space_table.clone(), - &mut serial_exec, + &mut serial_exec_ctx, ); writer .write(request) @@ -501,10 +517,14 @@ impl Table for TableImpl { } async fn alter_schema(&self, request: AlterSchemaRequest) -> Result { - let mut serial_exec = self.table_data.serial_exec.lock().await; + let mut serial_exec_ctx = self + .table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let mut alterer = Alterer::new( self.table_data.clone(), - &mut serial_exec, + &mut serial_exec_ctx, self.instance.clone(), ) .await; @@ -518,10 +538,14 @@ impl Table for TableImpl { } async fn alter_options(&self, options: HashMap) -> Result { - let mut serial_exec = self.table_data.serial_exec.lock().await; + let mut serial_exec_ctx = self + .table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let alterer = Alterer::new( self.table_data.clone(), - &mut serial_exec, + &mut serial_exec_ctx, self.instance.clone(), ) .await; diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index 821bdb6195..aea9f64cf5 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -135,6 +135,9 @@ pub enum Error { source: GenericError, }, + #[snafu(display("Try to operate a closed table.\nBacktrace:\n{backtrace}"))] + OperateClosedTable { backtrace: Backtrace }, + #[snafu(display( "Failed to wait for pending writes, table:{table}.\nBacktrace:\n{backtrace}" ))] From fd9ee4fd1610d3abca80173216e3ed2dcdcaf84a Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Fri, 16 Jun 2023 18:25:57 +0800 Subject: [PATCH 2/2] chore: address CR --- analytic_engine/src/instance/close.rs | 6 +++--- analytic_engine/src/table/data.rs | 16 ++++++++++------ 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/analytic_engine/src/instance/close.rs b/analytic_engine/src/instance/close.rs index a77256d16c..d933b01214 100644 --- a/analytic_engine/src/instance/close.rs +++ b/analytic_engine/src/instance/close.rs @@ -70,10 +70,10 @@ impl Closer { let removed_table = self.space.remove_table(&request.table_name); assert!(removed_table.is_some()); - serial_exec_ctx.mark_closed(); + serial_exec_ctx.invalidate(); info!( - "table:{}-{} has been removed from the space_id:{}", - table_data.name, table_data.id, self.space.id + "table:{} has been removed from the space_id:{}, table_id:{}", + table_data.name, self.space.id, table_data.id, ); Ok(()) } diff --git a/analytic_engine/src/table/data.rs 
b/analytic_engine/src/table/data.rs index 6b21672fe1..01ba153fd8 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -90,13 +90,17 @@ impl TableShardInfo { /// The context for execution of serial operation on the table. pub struct SerialExecContext { - closed: bool, + /// Denotes whether `serial_exec` is valid. + /// + /// The `serial_exec` will be invalidated if the table is closed. + invalid: bool, serial_exec: TableOpSerialExecutor, } impl SerialExecContext { - pub fn mark_closed(&mut self) { - self.closed = true; + #[inline] + pub fn invalidate(&mut self) { + self.invalid = true; } } @@ -245,7 +249,7 @@ impl TableData { )); let serial_exec_ctx = tokio::sync::Mutex::new(SerialExecContext { - closed: false, + invalid: false, serial_exec: TableOpSerialExecutor::new(table_id), }); Ok(Self { @@ -368,7 +372,7 @@ impl TableData { /// Acquire the [`SerialExecContext`] if the table is not closed. pub async fn acquire_serial_exec_ctx(&self) -> Option> { let v = self.serial_exec_ctx.lock().await; - if v.closed { + if v.invalid { None } else { Some(v) @@ -383,7 +387,7 @@ impl TableData { let v = self.serial_exec_ctx.try_lock(); match v { Ok(ctx) => { - if ctx.closed { + if ctx.invalid { None } else { Some(ctx)
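
The close-safety pattern this patch applies across the engine can be shown in isolation. Below is a minimal, self-contained sketch (not the engine's actual code) of the idea: every serial table operation must first acquire a context guarded by an async mutex, and closing the table invalidates that context under the lock, so any later acquisition returns None instead of touching a closed table. The names (SerialCtx, DummyExecutor, TableData) and the tokio runtime setup (tokio with the "rt" and "macros" features) are illustrative assumptions, not taken from the repository.

use std::ops::{Deref, DerefMut};

use tokio::sync::{Mutex, MutexGuard};

/// Stand-in for the per-table serial executor.
struct DummyExecutor {
    table_id: u64,
}

/// Pairs the executor with a flag recording whether the table has been closed.
struct SerialCtx {
    invalid: bool,
    exec: DummyExecutor,
}

impl SerialCtx {
    /// Called under the lock during table close; afterwards the context can
    /// no longer be acquired.
    fn invalidate(&mut self) {
        self.invalid = true;
    }
}

// Deref to the inner executor so callers keep using the executor API unchanged.
impl Deref for SerialCtx {
    type Target = DummyExecutor;
    fn deref(&self) -> &DummyExecutor {
        &self.exec
    }
}

impl DerefMut for SerialCtx {
    fn deref_mut(&mut self) -> &mut DummyExecutor {
        &mut self.exec
    }
}

struct TableData {
    serial_ctx: Mutex<SerialCtx>,
}

impl TableData {
    fn new(table_id: u64) -> Self {
        Self {
            serial_ctx: Mutex::new(SerialCtx {
                invalid: false,
                exec: DummyExecutor { table_id },
            }),
        }
    }

    /// Wait for the lock, then refuse to hand out the context if the table is closed.
    async fn acquire(&self) -> Option<MutexGuard<'_, SerialCtx>> {
        let ctx = self.serial_ctx.lock().await;
        if ctx.invalid {
            None
        } else {
            Some(ctx)
        }
    }

    /// Non-blocking variant: None if the lock is held elsewhere or the table is closed.
    fn try_acquire(&self) -> Option<MutexGuard<'_, SerialCtx>> {
        match self.serial_ctx.try_lock() {
            Ok(ctx) if !ctx.invalid => Some(ctx),
            _ => None,
        }
    }
}

#[tokio::main]
async fn main() {
    let table = TableData::new(1);

    // Normal path: the context is handed out and serial work (write/flush/alter) may run.
    if let Some(ctx) = table.acquire().await {
        println!("acquired serial executor for table {}", ctx.table_id);
    }

    // Close path: invalidate the context while holding the lock.
    {
        let mut ctx = table.acquire().await.expect("table is still open");
        ctx.invalidate();
    }

    // Every later acquisition observes the closed state and backs off.
    assert!(table.acquire().await.is_none());
    assert!(table.try_acquire().is_none());
    println!("table closed, no further serial operations are possible");
}

Because the invalidation happens while the guard is held, whoever already owns the context (a write, flush, or alter) finishes first, and no new operation can slip in after the close path runs; this mirrors how the patch replaces direct locking of serial_exec with acquire_serial_exec_ctx / try_acquire_serial_exec_ctx and maps a None result to the new OperateClosedTable error.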