From 277b1c5911053a29574c16034a205d72ba4f579f Mon Sep 17 00:00:00 2001 From: draco Date: Fri, 6 Sep 2024 21:22:55 +0800 Subject: [PATCH 01/18] feat: Implement delete operation for local disk WAL --- src/wal/src/local_storage_impl/segment.rs | 107 ++++++++++++++++------ src/wal/tests/read_write.rs | 1 - 2 files changed, 80 insertions(+), 28 deletions(-) diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 80b3718228..19636786d4 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -16,6 +16,7 @@ // under the License. use std::{ + cmp::{max, min}, collections::{HashMap, VecDeque}, fmt::Debug, fs::{self, File, OpenOptions}, @@ -158,6 +159,10 @@ pub struct Segment { /// The maximum sequence number of records within this segment. max_seq: SequenceNumber, + /// A hashmap storing both min and max sequence numbers of records within + /// this segment for each `TableId`. + seq_ranges: HashMap, + /// The encoding format used for records within this segment. record_encoding: RecordEncoding, @@ -191,6 +196,7 @@ impl Segment { segment_size, min_seq: MAX_SEQUENCE_NUMBER, max_seq: MIN_SEQUENCE_NUMBER, + seq_ranges: HashMap::new(), record_encoding: RecordEncoding::newest(), mmap: None, record_position: Vec::new(), @@ -249,10 +255,9 @@ impl Segment { let mut pos = VERSION_SIZE + header_len; let mut record_position = Vec::new(); - // Update min and max sequence number - let mut min_seq = MAX_SEQUENCE_NUMBER; - let mut max_seq = MIN_SEQUENCE_NUMBER; + self.seq_ranges.clear(); + // Scan all records in the segment while pos < file_size as usize { let data = &mmap[pos..]; @@ -262,8 +267,20 @@ impl Segment { start: pos, end: pos + record.len(), }); - min_seq = min_seq.min(record.sequence_num); - max_seq = max_seq.max(record.sequence_num); + + // Update min and max sequence number + self.min_seq = min(self.min_seq, record.sequence_num); + self.max_seq = max(self.max_seq, record.sequence_num); + + // Update seq_ranges + self.seq_ranges + .entry(record.table_id) + .and_modify(|seq_range| { + seq_range.0 = min(seq_range.0, record.sequence_num); + seq_range.1 = max(seq_range.1, record.sequence_num); + }) + .or_insert((record.sequence_num, record.sequence_num)); + pos += record.len(); } Err(_) => { @@ -279,8 +296,6 @@ impl Segment { self.current_size = pos; self.write_count = 0; self.last_flushed_position = pos; - self.min_seq = min_seq; - self.max_seq = max_seq; Ok(()) } @@ -360,6 +375,7 @@ impl Segment { &mut self, data: &[u8], positions: &mut Vec, + table_id: TableId, prev_sequence_num: SequenceNumber, next_sequence_num: SequenceNumber, ) -> Result<()> { @@ -370,8 +386,17 @@ impl Segment { self.record_position.append(positions); // Update min and max sequence number - self.min_seq = self.min_seq.min(prev_sequence_num); - self.max_seq = self.max_seq.max(next_sequence_num - 1); + self.min_seq = min(self.min_seq, prev_sequence_num); + self.max_seq = max(self.max_seq, next_sequence_num - 1); + + // Update sequence range + self.seq_ranges + .entry(table_id) + .and_modify(|seq_range| { + seq_range.0 = min(seq_range.0, prev_sequence_num); + seq_range.1 = max(seq_range.1, next_sequence_num); + }) + .or_insert((prev_sequence_num, next_sequence_num - 1)); Ok(()) } @@ -437,14 +462,6 @@ impl SegmentManager { Ok(()) } - pub fn mark_delete_entries_up_to( - &self, - _location: WalLocation, - _sequence_num: SequenceNumber, - ) -> Result<()> { - todo!() - } - pub fn close_all(&self) -> Result<()> { let mut cache = 
self.cache.lock().unwrap(); cache.clear(); @@ -485,6 +502,8 @@ pub struct Region { /// The latest segment for appending logs current_segment: Mutex>>, + + last_mark_deleted: Mutex>, } impl Region { @@ -569,6 +588,7 @@ impl Region { next_sequence_num: AtomicU64::new(next_sequence_num), runtime, current_segment: Mutex::new(latest_segment), + last_mark_deleted: Mutex::new(HashMap::new()), }) } @@ -646,6 +666,7 @@ impl Region { guard.append_records( &data, &mut record_position, + table_id, prev_sequence_num, next_sequence_num - 1, )?; @@ -655,7 +676,16 @@ impl Region { pub fn read(&self, ctx: &ReadContext, req: &ReadRequest) -> Result { // Check read range's validity. let start = if let Some(start) = req.start.as_start_sequence_number() { - start + if let Some(last_mark_deleted) = self + .last_mark_deleted + .lock() + .unwrap() + .get(&req.location.table_id) + { + max(start, *last_mark_deleted + 1) + } else { + start + } } else { MAX_SEQUENCE_NUMBER }; @@ -675,6 +705,7 @@ impl Region { Some(req.location.table_id), start, end, + None, )?; Ok(BatchLogIteratorAdapter::new_with_sync( @@ -692,6 +723,7 @@ impl Region { None, MIN_SEQUENCE_NUMBER, MAX_SEQUENCE_NUMBER, + Some(self.last_mark_deleted.lock().unwrap().clone()), )?; Ok(BatchLogIteratorAdapter::new_with_sync( Box::new(iter), @@ -705,8 +737,9 @@ impl Region { location: WalLocation, sequence_num: SequenceNumber, ) -> Result<()> { - self.segment_manager - .mark_delete_entries_up_to(location, sequence_num) + let mut guard = self.last_mark_deleted.lock().unwrap(); + guard.insert(location.table_id, sequence_num); + Ok(()) } pub fn close(&self) -> Result<()> { @@ -864,6 +897,10 @@ struct SegmentLogIterator { /// Optional identifier for the table, which is used to filter logs. table_id: Option, + /// Optional reference to a hashmap of last mark deleted sequence number, + /// which is used to filter logs. + last_mark_deleted: Option>, + /// Starting sequence number for log iteration. start: SequenceNumber, @@ -884,8 +921,8 @@ impl SegmentLogIterator { segment: Arc>, segment_manager: Arc, table_id: Option, - start: SequenceNumber, - end: SequenceNumber, + last_mark_deleted: Option>, + range: (SequenceNumber, SequenceNumber), ) -> Result { // Open the segment if it is not open let mut segment = segment.lock().unwrap(); @@ -905,8 +942,9 @@ impl SegmentLogIterator { segment_content, record_positions, table_id, - start, - end, + last_mark_deleted, + start: range.0, + end: range.1, current_record_idx: 0, no_more_data: false, }) @@ -952,6 +990,15 @@ impl SegmentLogIterator { } } + // Filter by last_mark_deleted + if let Some(last_mark_deleted) = &self.last_mark_deleted { + if let Some(&last_deleted_seq) = last_mark_deleted.get(&record.table_id) { + if record.sequence_num <= last_deleted_seq { + continue; + } + } + } + // Decode the value let value = self .log_encoding @@ -992,6 +1039,10 @@ pub struct MultiSegmentLogIterator { /// Optional identifier for the table, which is used to filter logs. table_id: Option, + /// Optional hashmap of last mark deleted sequence number, which is used to + /// filter logs. + last_mark_deleted: Option>, + /// Starting sequence number for log iteration. 
start: SequenceNumber, @@ -1010,6 +1061,7 @@ impl MultiSegmentLogIterator { table_id: Option, start: SequenceNumber, end: SequenceNumber, + last_mark_deleted: Option>, ) -> Result { // Find all segments that contain the requested sequence numbers let mut relevant_segments = Vec::new(); @@ -1036,6 +1088,7 @@ impl MultiSegmentLogIterator { log_encoding, record_encoding, table_id, + last_mark_deleted, start, end, current_payload: Vec::new(), @@ -1055,14 +1108,14 @@ impl MultiSegmentLogIterator { let segment = self.segments[self.current_segment_idx]; let segment = self.segment_manager.get_segment(segment)?; - let iterator = SegmentLogIterator::new( + let iterator: SegmentLogIterator = SegmentLogIterator::new( self.log_encoding.clone(), self.record_encoding.clone(), segment, self.segment_manager.clone(), self.table_id, - self.start, - self.end, + self.last_mark_deleted.clone(), + (self.start, self.end) )?; self.current_iterator = Some(iterator); diff --git a/src/wal/tests/read_write.rs b/src/wal/tests/read_write.rs index cccde53cfc..d2180c4bd1 100644 --- a/src/wal/tests/read_write.rs +++ b/src/wal/tests/read_write.rs @@ -72,7 +72,6 @@ fn test_kafka_wal() { } #[test] -#[ignore = "this test cannot pass completely, since delete is not supported yet"] fn test_local_storage_wal() { let builder = LocalStorageWalBuilder; test_all(builder, false); From 89918e1edc3cdebf1bfdc9078d5a2ca7cde4f7cf Mon Sep 17 00:00:00 2001 From: draco Date: Sat, 7 Sep 2024 18:01:14 +0800 Subject: [PATCH 02/18] Refactor and add a testcase --- src/wal/src/local_storage_impl/segment.rs | 301 ++++++++++++++++------ 1 file changed, 225 insertions(+), 76 deletions(-) diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 19636786d4..1139b8baef 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -19,7 +19,7 @@ use std::{ cmp::{max, min}, collections::{HashMap, VecDeque}, fmt::Debug, - fs::{self, File, OpenOptions}, + fs::{self, remove_file, File, OpenOptions}, io::{self, Write}, path::Path, sync::{ @@ -161,7 +161,7 @@ pub struct Segment { /// A hashmap storing both min and max sequence numbers of records within /// this segment for each `TableId`. - seq_ranges: HashMap, + table_ranges: HashMap, /// The encoding format used for records within this segment. record_encoding: RecordEncoding, @@ -192,16 +192,16 @@ impl Segment { version: NEWEST_WAL_SEGMENT_VERSION, path: path.clone(), id: segment_id, - current_size: SEGMENT_HEADER.len(), + current_size: VERSION_SIZE + SEGMENT_HEADER.len(), segment_size, min_seq: MAX_SEQUENCE_NUMBER, max_seq: MIN_SEQUENCE_NUMBER, - seq_ranges: HashMap::new(), + table_ranges: HashMap::new(), record_encoding: RecordEncoding::newest(), mmap: None, record_position: Vec::new(), write_count: 0, - last_flushed_position: SEGMENT_HEADER.len(), + last_flushed_position: VERSION_SIZE + SEGMENT_HEADER.len(), }; if !Path::new(&path).exists() { @@ -215,7 +215,7 @@ impl Segment { } // Open the segment file to update min and max sequence number and file size - segment.open()?; + segment.open(true)?; // Close the segment file. 
If the segment is to be used for read or write, it // will be opened again @@ -224,7 +224,7 @@ impl Segment { Ok(segment) } - pub fn open(&mut self) -> Result<()> { + pub fn open(&mut self, update_meta: bool) -> Result<()> { // Open the segment file let file = OpenOptions::new() .read(true) @@ -251,11 +251,16 @@ impl Segment { let header = &mmap[VERSION_SIZE..VERSION_SIZE + header_len]; ensure!(header == SEGMENT_HEADER, InvalidHeader); + if !update_meta { + self.mmap = Some(mmap); + return Ok(()); + } + // Read and validate all records let mut pos = VERSION_SIZE + header_len; let mut record_position = Vec::new(); - self.seq_ranges.clear(); + self.table_ranges.clear(); // Scan all records in the segment while pos < file_size as usize { @@ -272,8 +277,8 @@ impl Segment { self.min_seq = min(self.min_seq, record.sequence_num); self.max_seq = max(self.max_seq, record.sequence_num); - // Update seq_ranges - self.seq_ranges + // Update table_ranges + self.table_ranges .entry(record.table_id) .and_modify(|seq_range| { seq_range.0 = min(seq_range.0, record.sequence_num); @@ -390,16 +395,39 @@ impl Segment { self.max_seq = max(self.max_seq, next_sequence_num - 1); // Update sequence range - self.seq_ranges + self.table_ranges .entry(table_id) .and_modify(|seq_range| { seq_range.0 = min(seq_range.0, prev_sequence_num); - seq_range.1 = max(seq_range.1, next_sequence_num); + seq_range.1 = max(seq_range.1, next_sequence_num - 1); }) .or_insert((prev_sequence_num, next_sequence_num - 1)); Ok(()) } + + fn contains_table(&self, table_id: TableId) -> bool { + self.table_ranges.contains_key(&table_id) + } + + fn mark_deleted(&mut self, table_id: TableId, sequence_num: SequenceNumber) { + if let Some(range) = self.table_ranges.get_mut(&table_id) { + range.0 = max(range.0, sequence_num + 1); + if range.0 > range.1 { + self.table_ranges.remove(&table_id); + } + } + } + + fn is_empty(&self) -> bool { + self.table_ranges.is_empty() + } + + fn delete(&mut self) -> Result<()> { + self.close()?; + remove_file(&self.path).context(SegmentAppend)?; + Ok(()) + } } #[derive(Debug)] @@ -412,6 +440,9 @@ pub struct SegmentManager { /// Maximum size of the cache cache_size: usize, + + /// The latest segment for appending logs + current_segment: Mutex>>, } impl SegmentManager { @@ -445,7 +476,7 @@ impl SegmentManager { } // If not in cache, load from disk - segment.open()?; + segment.open(false)?; // Add to cache if cache.len() == self.cache_size { @@ -471,6 +502,81 @@ impl SegmentManager { } Ok(()) } + + pub fn mark_delete_entries_up_to( + &self, + location: WalLocation, + sequence_num: SequenceNumber, + ) -> Result<()> { + let mut segments_to_remove: Vec = Vec::new(); + let mut all_segments = self.all_segments.lock().unwrap(); + let current_segment_id = self.current_segment.lock().unwrap().lock().unwrap().id; + + for (_, segment) in all_segments.iter() { + let mut segment = segment.lock().unwrap(); + + // Skip segments that are not relevant + if segment.min_seq > sequence_num || !segment.contains_table(location.table_id) { + continue; + } + + segment.mark_deleted(location.table_id, sequence_num); + + // Delete this segment if it is empty + if segment.is_empty() && segment.id != current_segment_id { + let mut cache = self.cache.lock().unwrap(); + + // Check if segment is already in cache + if let Some(index) = cache.iter().position(|id| *id == segment.id) { + cache.remove(index); + } + + segments_to_remove.push(segment.id); + segment.delete()?; + } + } + + for segment_id in segments_to_remove { + 
all_segments.remove(&segment_id); + } + + Ok(()) + } + + pub fn get_relevant_segments( + &self, + table_id: Option, + start: SequenceNumber, + end: SequenceNumber, + ) -> Result> { + // Find all segments that contain the requested sequence numbers + let mut relevant_segments = Vec::new(); + + let all_segments = self.all_segments.lock().unwrap(); + + for (_, segment) in all_segments.iter() { + let segment = segment.lock().unwrap(); + match table_id { + Some(table_id) => { + if let Some(range) = segment.table_ranges.get(&table_id) { + if range.0 <= end && range.1 >= start { + relevant_segments.push(segment.id); + } + } + } + None => { + if segment.min_seq <= end && segment.max_seq >= start { + relevant_segments.push(segment.id); + } + } + } + } + + // Sort by segment id + relevant_segments.sort_unstable(); + + Ok(relevant_segments) + } } #[derive(Debug)] @@ -499,11 +605,6 @@ pub struct Region { /// Sequence number for the next log next_sequence_num: AtomicU64, - - /// The latest segment for appending logs - current_segment: Mutex>>, - - last_mark_deleted: Mutex>, } impl Region { @@ -576,6 +677,7 @@ impl Region { all_segments: Mutex::new(all_segments), cache: Mutex::new(VecDeque::new()), cache_size, + current_segment: Mutex::new(latest_segment), }; Ok(Self { @@ -587,8 +689,6 @@ impl Region { region_dir, next_sequence_num: AtomicU64::new(next_sequence_num), runtime, - current_segment: Mutex::new(latest_segment), - last_mark_deleted: Mutex::new(HashMap::new()), }) } @@ -598,7 +698,7 @@ impl Region { // Perhaps we could avoid acquiring the lock here and instead allocate the // position that needs to be written in the segment, then fill it within // spawn_blocking. However, I’m not sure about the correctness of this approach. - let mut current_segment = self.current_segment.lock().unwrap(); + let mut current_segment = self.segment_manager.current_segment.lock().unwrap(); let entries_num = batch.len() as u64; let table_id = batch.location.table_id; @@ -668,7 +768,7 @@ impl Region { &mut record_position, table_id, prev_sequence_num, - next_sequence_num - 1, + next_sequence_num, )?; Ok(next_sequence_num - 1) } @@ -676,16 +776,7 @@ impl Region { pub fn read(&self, ctx: &ReadContext, req: &ReadRequest) -> Result { // Check read range's validity. let start = if let Some(start) = req.start.as_start_sequence_number() { - if let Some(last_mark_deleted) = self - .last_mark_deleted - .lock() - .unwrap() - .get(&req.location.table_id) - { - max(start, *last_mark_deleted + 1) - } else { - start - } + start } else { MAX_SEQUENCE_NUMBER }; @@ -705,7 +796,6 @@ impl Region { Some(req.location.table_id), start, end, - None, )?; Ok(BatchLogIteratorAdapter::new_with_sync( @@ -723,7 +813,6 @@ impl Region { None, MIN_SEQUENCE_NUMBER, MAX_SEQUENCE_NUMBER, - Some(self.last_mark_deleted.lock().unwrap().clone()), )?; Ok(BatchLogIteratorAdapter::new_with_sync( Box::new(iter), @@ -737,8 +826,8 @@ impl Region { location: WalLocation, sequence_num: SequenceNumber, ) -> Result<()> { - let mut guard = self.last_mark_deleted.lock().unwrap(); - guard.insert(location.table_id, sequence_num); + self.segment_manager + .mark_delete_entries_up_to(location, sequence_num)?; Ok(()) } @@ -897,9 +986,8 @@ struct SegmentLogIterator { /// Optional identifier for the table, which is used to filter logs. table_id: Option, - /// Optional reference to a hashmap of last mark deleted sequence number, - /// which is used to filter logs. - last_mark_deleted: Option>, + /// A hashmap of start and end sequence number for each table. 
+ table_ranges: HashMap, /// Starting sequence number for log iteration. start: SequenceNumber, @@ -921,8 +1009,8 @@ impl SegmentLogIterator { segment: Arc>, segment_manager: Arc, table_id: Option, - last_mark_deleted: Option>, - range: (SequenceNumber, SequenceNumber), + start: SequenceNumber, + end: SequenceNumber, ) -> Result { // Open the segment if it is not open let mut segment = segment.lock().unwrap(); @@ -936,15 +1024,18 @@ impl SegmentLogIterator { // Get record positions let record_positions = segment.record_position.clone(); + // Get sequence number ranges + let table_ranges = segment.table_ranges.clone(); + Ok(Self { log_encoding, record_encoding, segment_content, record_positions, table_id, - last_mark_deleted, - start: range.0, - end: range.1, + table_ranges, + start, + end, current_record_idx: 0, no_more_data: false, }) @@ -990,13 +1081,13 @@ impl SegmentLogIterator { } } - // Filter by last_mark_deleted - if let Some(last_mark_deleted) = &self.last_mark_deleted { - if let Some(&last_deleted_seq) = last_mark_deleted.get(&record.table_id) { - if record.sequence_num <= last_deleted_seq { - continue; - } + // Filter by sequence range + if let Some((start, end)) = &self.table_ranges.get(&record.table_id) { + if record.sequence_num < *start || record.sequence_num > *end { + continue; } + } else { + continue; } // Decode the value @@ -1039,10 +1130,6 @@ pub struct MultiSegmentLogIterator { /// Optional identifier for the table, which is used to filter logs. table_id: Option, - /// Optional hashmap of last mark deleted sequence number, which is used to - /// filter logs. - last_mark_deleted: Option>, - /// Starting sequence number for log iteration. start: SequenceNumber, @@ -1061,24 +1148,8 @@ impl MultiSegmentLogIterator { table_id: Option, start: SequenceNumber, end: SequenceNumber, - last_mark_deleted: Option>, ) -> Result { - // Find all segments that contain the requested sequence numbers - let mut relevant_segments = Vec::new(); - - { - let all_segments = segment_manager.all_segments.lock().unwrap(); - - for (_, segment) in all_segments.iter() { - let segment = segment.lock().unwrap(); - if segment.min_seq <= end && segment.max_seq >= start { - relevant_segments.push(segment.id); - } - } - } - - // Sort by segment id - relevant_segments.sort_unstable(); + let relevant_segments = segment_manager.get_relevant_segments(table_id, start, end)?; let mut iter = Self { segment_manager, @@ -1088,7 +1159,6 @@ impl MultiSegmentLogIterator { log_encoding, record_encoding, table_id, - last_mark_deleted, start, end, current_payload: Vec::new(), @@ -1114,8 +1184,8 @@ impl MultiSegmentLogIterator { segment, self.segment_manager.clone(), self.table_id, - self.last_mark_deleted.clone(), - (self.start, self.end) + self.start, + self.end, )?; self.current_iterator = Some(iterator); @@ -1204,7 +1274,7 @@ mod tests { let mut segment = Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap(); segment - .open() + .open(false) .expect("Expected to open segment successfully"); } @@ -1218,7 +1288,7 @@ mod tests { .unwrap() .to_string(); let mut segment = Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap(); - segment.open().unwrap(); + segment.open(false).unwrap(); let data = b"test_data"; let append_result = segment.append(data); @@ -1229,6 +1299,26 @@ mod tests { assert_eq!(read_result.unwrap(), data); } + #[test] + fn test_segment_delete() { + let dir = tempdir().unwrap(); + + let path = dir + .path() + .join("segment_0.wal") + .to_str() + .unwrap() + .to_string(); + + let mut segment = 
Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap(); + + segment.open(false).unwrap(); + assert!(Path::new(&path).exists()); + + segment.delete().unwrap(); + assert!(!Path::new(&path).exists()); + } + #[test] fn test_region_create_and_close() { let dir = tempdir().unwrap(); @@ -1353,4 +1443,63 @@ mod tests { let runtime = Arc::new(Builder::default().build().unwrap()); runtime.block_on(test_multi_segment_write_and_read_inner(runtime.clone())); } + + #[test] + fn test_region_mark_delete_entries_up_to() { + const SEGMENT_SIZE: usize = 4096; + + let dir = tempdir().unwrap(); + let runtime = Arc::new(Builder::default().build().unwrap()); + + // Create a new region + let region = Region::new( + 1, + 2, + SEGMENT_SIZE, + dir.path().to_str().unwrap().to_string(), + runtime.clone(), + ) + .unwrap(); + let region = Arc::new(region); + + // Write some log entries + let location = WalLocation::new(1, 1); // region_id = 1, table_id = 1 + let mut sequence = MIN_SEQUENCE_NUMBER + 1; + + for _i in 0..10 { + let log_entries = 0..100; + let log_batch_encoder = LogBatchEncoder::create(location); + let log_batch = log_batch_encoder + .encode_batch(log_entries.clone().map(|v| MemoryPayload { val: v })) + .expect("should succeed to encode payloads"); + + let write_ctx = WriteContext::default(); + region.write(&write_ctx, &log_batch).unwrap(); + sequence += 100; + } + + // Expect more than one segment + { + let all_segments = region.segment_manager.all_segments.lock().unwrap(); + assert!( + all_segments.len() > 1, + "Expected multiple segments, but got {}", + all_segments.len() + ); + } + + // Mark delete entries up to sequence - 1, so only the last segment should + // remain + let mark_delete_sequence = sequence - 1; + region + .mark_delete_entries_up_to(location, mark_delete_sequence) + .unwrap(); + + { + let all_segments = region.segment_manager.all_segments.lock().unwrap(); + assert_eq!(all_segments.len(), 1); + } + + region.close().unwrap(); + } } From f75d9a8970dfec3b38decedd7d7cd9266b35d954 Mon Sep 17 00:00:00 2001 From: draco Date: Sat, 7 Sep 2024 19:55:46 +0800 Subject: [PATCH 03/18] Minor fix --- src/wal/src/local_storage_impl/config.rs | 1 + src/wal/src/local_storage_impl/segment.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/src/wal/src/local_storage_impl/config.rs b/src/wal/src/local_storage_impl/config.rs index 8d018896e1..d8d1f3e2a5 100644 --- a/src/wal/src/local_storage_impl/config.rs +++ b/src/wal/src/local_storage_impl/config.rs @@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] pub struct LocalStorageConfig { pub path: String, pub max_segment_size: usize, diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 1139b8baef..73f05bc3e1 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -864,6 +864,8 @@ impl RegionManager { segment_size: usize, runtime: Arc, ) -> Result { + fs::create_dir_all(&root_dir).context(DirOpen)?; + let mut regions = HashMap::new(); // Naming conversion: / From feda6e90189c8abc73379a92347deff1e06b4855 Mon Sep 17 00:00:00 2001 From: draco Date: Sat, 7 Sep 2024 20:07:08 +0800 Subject: [PATCH 04/18] Minor fix mmap flush --- src/wal/src/local_storage_impl/segment.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 73f05bc3e1..0e8e0a7855 100644 --- 
a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -307,8 +307,11 @@ impl Segment { pub fn close(&mut self) -> Result<()> { if let Some(ref mut mmap) = self.mmap { // Flush before closing - mmap.flush_range(self.last_flushed_position, self.current_size) - .context(Flush)?; + mmap.flush_range( + self.last_flushed_position, + self.current_size - self.last_flushed_position, + ) + .context(Flush)?; // Reset the write count self.write_count = 0; // Update the last flushed position @@ -344,8 +347,11 @@ impl Segment { // Only flush if the write_count reaches FLUSH_INTERVAL if self.write_count >= FLUSH_INTERVAL { - mmap.flush_range(self.last_flushed_position, self.current_size + data.len()) - .context(Flush)?; + mmap.flush_range( + self.last_flushed_position, + self.current_size + data.len() - self.last_flushed_position, + ) + .context(Flush)?; // Reset the write count self.write_count = 0; // Update the last flushed position @@ -865,7 +871,7 @@ impl RegionManager { runtime: Arc, ) -> Result { fs::create_dir_all(&root_dir).context(DirOpen)?; - + let mut regions = HashMap::new(); // Naming conversion: / From 9c0f5eba6da5ef30cf29311ff89f008b4ea1b1b4 Mon Sep 17 00:00:00 2001 From: draco Date: Sun, 8 Sep 2024 14:33:16 +0800 Subject: [PATCH 05/18] Minor fix --- Cargo.lock | 1 + src/wal/Cargo.toml | 1 + src/wal/src/local_storage_impl/config.rs | 8 +++--- src/wal/src/local_storage_impl/segment.rs | 20 ++++++++++---- src/wal/src/local_storage_impl/wal_manager.rs | 27 ++++++++++++++----- src/wal/tests/read_write.rs | 2 +- 6 files changed, 42 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0a2d3a0f76..200574415f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8219,6 +8219,7 @@ checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" name = "wal" version = "2.0.0" dependencies = [ + "anyhow", "async-trait", "bytes_ext", "chrono", diff --git a/src/wal/Cargo.toml b/src/wal/Cargo.toml index 0d13ef36a4..30a5b00461 100644 --- a/src/wal/Cargo.toml +++ b/src/wal/Cargo.toml @@ -47,6 +47,7 @@ name = "read_write" required-features = ["wal-message-queue", "wal-table-kv", "wal-rocksdb", "wal-local-storage"] [dependencies] +anyhow = { workspace = true } async-trait = { workspace = true } bytes_ext = { workspace = true } chrono = { workspace = true } diff --git a/src/wal/src/local_storage_impl/config.rs b/src/wal/src/local_storage_impl/config.rs index d8d1f3e2a5..b3e70e9b5c 100644 --- a/src/wal/src/local_storage_impl/config.rs +++ b/src/wal/src/local_storage_impl/config.rs @@ -20,16 +20,16 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct LocalStorageConfig { - pub path: String, - pub max_segment_size: usize, + pub data_dir: String, + pub segment_size: usize, pub cache_size: usize, } impl Default for LocalStorageConfig { fn default() -> Self { Self { - path: "/tmp/horaedb".to_string(), - max_segment_size: 64 * 1024 * 1024, // 64MB + data_dir: "/tmp/horaedb".to_string(), + segment_size: 64 * 1024 * 1024, // 64MB cache_size: 3, } } diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 0e8e0a7855..cbc008656c 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -19,7 +19,7 @@ use std::{ cmp::{max, min}, collections::{HashMap, VecDeque}, fmt::Debug, - fs::{self, remove_file, File, OpenOptions}, + fs::{self, File, OpenOptions}, io::{self, Write}, path::Path, sync::{ @@ 
-116,6 +116,9 @@ pub enum Error { source: GenericError, backtrace: Backtrace, }, + + #[snafu(display("{}", source))] + Internal { source: anyhow::Error }, } define_result!(Error); @@ -296,6 +299,7 @@ impl Segment { } } + self.segment_size = file_size as usize; self.mmap = Some(mmap); self.record_position = record_position; self.current_size = pos; @@ -418,6 +422,11 @@ impl Segment { fn mark_deleted(&mut self, table_id: TableId, sequence_num: SequenceNumber) { if let Some(range) = self.table_ranges.get_mut(&table_id) { + // If sequence number is MAX, remove the range directly to prevent overflow + if sequence_num == MAX_SEQUENCE_NUMBER { + self.table_ranges.remove(&table_id); + return; + } range.0 = max(range.0, sequence_num + 1); if range.0 > range.1 { self.table_ranges.remove(&table_id); @@ -431,7 +440,9 @@ impl Segment { fn delete(&mut self) -> Result<()> { self.close()?; - remove_file(&self.path).context(SegmentAppend)?; + fs::remove_file(&self.path) + .map_err(anyhow::Error::new) + .context(Internal)?; Ok(()) } } @@ -833,8 +844,7 @@ impl Region { sequence_num: SequenceNumber, ) -> Result<()> { self.segment_manager - .mark_delete_entries_up_to(location, sequence_num)?; - Ok(()) + .mark_delete_entries_up_to(location, sequence_num) } pub fn close(&self) -> Result<()> { @@ -1186,7 +1196,7 @@ impl MultiSegmentLogIterator { let segment = self.segments[self.current_segment_idx]; let segment = self.segment_manager.get_segment(segment)?; - let iterator: SegmentLogIterator = SegmentLogIterator::new( + let iterator = SegmentLogIterator::new( self.log_encoding.clone(), self.record_encoding.clone(), segment, diff --git a/src/wal/src/local_storage_impl/wal_manager.rs b/src/wal/src/local_storage_impl/wal_manager.rs index 91c69fcd5a..e375750f5d 100644 --- a/src/wal/src/local_storage_impl/wal_manager.rs +++ b/src/wal/src/local_storage_impl/wal_manager.rs @@ -54,14 +54,14 @@ impl LocalStorageImpl { ) -> Result { let LocalStorageConfig { cache_size, - max_segment_size, + segment_size, .. 
} = config.clone(); let wal_path_str = wal_path.to_str().unwrap().to_string(); let region_manager = RegionManager::new( wal_path_str.clone(), cache_size, - max_segment_size, + segment_size, runtime.clone(), ) .box_err() @@ -104,6 +104,10 @@ impl WalManager for LocalStorageImpl { location: WalLocation, sequence_num: SequenceNumber, ) -> Result<()> { + debug!( + "Mark delete entries up to {} for location:{:?}", + sequence_num, location + ); self.region_manager .mark_delete_entries_up_to(location, sequence_num) .box_err() @@ -111,10 +115,7 @@ impl WalManager for LocalStorageImpl { } async fn close_region(&self, region_id: RegionId) -> Result<()> { - debug!( - "Close region for LocalStorage based WAL is noop operation, region_id:{}", - region_id - ); + debug!("Close region {} for LocalStorage based WAL", region_id); self.region_manager .close(region_id) .box_err() @@ -133,10 +134,18 @@ impl WalManager for LocalStorageImpl { ctx: &ReadContext, req: &ReadRequest, ) -> Result { + debug!( + "Read batch from LocalStorage based WAL, ctx:{:?}, req:{:?}", + ctx, req + ); self.region_manager.read(ctx, req).box_err().context(Read) } async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch) -> Result { + debug!( + "Write batch to LocalStorage based WAL, ctx:{:?}, batch:{:?}", + ctx, batch + ); self.region_manager .write(ctx, batch) .box_err() @@ -144,6 +153,10 @@ impl WalManager for LocalStorageImpl { } async fn scan(&self, ctx: &ScanContext, req: &ScanRequest) -> Result { + debug!( + "Scan from LocalStorage based WAL, ctx:{:?}, req:{:?}", + ctx, req + ); self.region_manager.scan(ctx, req).box_err().context(Read) } @@ -181,7 +194,7 @@ impl WalsOpener for LocalStorageWalsOpener { }; let write_runtime = runtimes.write_runtime.clone(); - let data_path = Path::new(&local_storage_wal_config.path); + let data_path = Path::new(&local_storage_wal_config.data_dir); let data_wal = if config.disable_data { Arc::new(crate::dummy::DoNothing) diff --git a/src/wal/tests/read_write.rs b/src/wal/tests/read_write.rs index d2180c4bd1..24c4c75f5b 100644 --- a/src/wal/tests/read_write.rs +++ b/src/wal/tests/read_write.rs @@ -996,7 +996,7 @@ impl WalBuilder for LocalStorageWalBuilder { async fn build(&self, data_path: &Path, runtime: Arc) -> Arc { let config = LocalStorageConfig { - path: data_path.to_str().unwrap().to_string(), + data_dir: data_path.to_str().unwrap().to_string(), ..LocalStorageConfig::default() }; Arc::new(LocalStorageImpl::new(data_path.to_path_buf(), config, runtime).unwrap()) From 0a31f8cf480d580c7dbd82679bc1970a429ba0e3 Mon Sep 17 00:00:00 2001 From: draco Date: Sun, 8 Sep 2024 15:02:10 +0800 Subject: [PATCH 06/18] Fix potential deadlock --- src/wal/src/local_storage_impl/segment.rs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index cbc008656c..92f2d9a3b4 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -416,10 +416,6 @@ impl Segment { Ok(()) } - fn contains_table(&self, table_id: TableId) -> bool { - self.table_ranges.contains_key(&table_id) - } - fn mark_deleted(&mut self, table_id: TableId, sequence_num: SequenceNumber) { if let Some(range) = self.table_ranges.get_mut(&table_id) { // If sequence number is MAX, remove the range directly to prevent overflow @@ -525,18 +521,13 @@ impl SegmentManager { location: WalLocation, sequence_num: SequenceNumber, ) -> Result<()> { + let current_segment_id = 
self.current_segment.lock().unwrap().lock().unwrap().id; let mut segments_to_remove: Vec = Vec::new(); let mut all_segments = self.all_segments.lock().unwrap(); - let current_segment_id = self.current_segment.lock().unwrap().lock().unwrap().id; for (_, segment) in all_segments.iter() { let mut segment = segment.lock().unwrap(); - // Skip segments that are not relevant - if segment.min_seq > sequence_num || !segment.contains_table(location.table_id) { - continue; - } - segment.mark_deleted(location.table_id, sequence_num); // Delete this segment if it is empty From f472115caca22dad84662b9a45fc787b4e996134 Mon Sep 17 00:00:00 2001 From: draco Date: Sun, 8 Sep 2024 15:04:18 +0800 Subject: [PATCH 07/18] Fix UT --- src/wal/src/local_storage_impl/segment.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 92f2d9a3b4..841709bcc3 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -1261,7 +1261,7 @@ mod tests { assert_eq!(segment.version, NEWEST_WAL_SEGMENT_VERSION); assert_eq!(segment.path, path); assert_eq!(segment.id, 0); - assert_eq!(segment.current_size, SEGMENT_HEADER.len()); + assert_eq!(segment.current_size, SEGMENT_HEADER.len() + VERSION_SIZE); let segment_content = fs::read(path).unwrap(); assert_eq!(segment_content[0], NEWEST_WAL_SEGMENT_VERSION); From 7c4a78d0676325f5227e3648bc22d832660845ff Mon Sep 17 00:00:00 2001 From: draco Date: Wed, 11 Sep 2024 14:51:13 +0800 Subject: [PATCH 08/18] Fix deadlock: the lock for SegmentManager::all_segments should be acquired before acquiring the lock for a segment. --- src/wal/src/local_storage_impl/segment.rs | 78 ++++++++++------------- 1 file changed, 33 insertions(+), 45 deletions(-) diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 841709bcc3..80216def98 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -57,8 +57,8 @@ pub enum Error { #[snafu(display("Failed to map file to memory: {}", source))] Mmap { source: io::Error }, - #[snafu(display("Segment full"))] - SegmentFull, + #[snafu(display("Segment {} is full, current size: {}, segment size: {}, try to append {}", id, current_size, segment_size, data_size))] + SegmentFull { id : u64, current_size: usize, segment_size: usize, data_size: usize }, #[snafu(display("Failed to append data to segment file: {}", source))] SegmentAppend { source: io::Error }, @@ -333,7 +333,7 @@ impl Segment { fn append(&mut self, data: &[u8]) -> Result<()> { ensure!( self.current_size + data.len() <= self.segment_size, - SegmentFull + SegmentFull { id: self.id, current_size: self.current_size, segment_size: self.segment_size, data_size: data.len() } ); // Ensure the segment file is open @@ -449,7 +449,7 @@ pub struct SegmentManager { all_segments: Mutex>>>, /// Cache for opened segments - cache: Mutex>, + cache: Mutex>)>>, /// Maximum size of the cache cache_size: usize, @@ -464,27 +464,13 @@ impl SegmentManager { all_segments.insert(id, segment); Ok(()) } - - /// Obtain the target segment - fn get_segment(&self, segment_id: u64) -> Result>> { - let all_segments = self.all_segments.lock().unwrap(); - - let segment = all_segments.get(&segment_id); - - let segment = match segment { - Some(segment) => segment, - None => return SegmentNotFound { id: segment_id }.fail(), - }; - - Ok(segment.clone()) - } - + /// Open segment if it is not in cache, need to 
acquire the lock outside - fn open_segment(&self, segment: &mut Segment) -> Result<()> { + fn open_segment(&self, segment: &mut Segment, segment_arc: Arc>) -> Result<()> { let mut cache = self.cache.lock().unwrap(); // Check if segment is already in cache - if cache.iter().any(|id| *id == segment.id) { + if cache.iter().any(|(id, _)| *id == segment.id) { return Ok(()); } @@ -494,14 +480,13 @@ impl SegmentManager { // Add to cache if cache.len() == self.cache_size { let evicted_segment_id = cache.pop_front(); - if let Some(evicted_segment_id) = evicted_segment_id { + if let Some((_, evicted_segment)) = evicted_segment_id { // The evicted segment should be closed first - let evicted_segment = self.get_segment(evicted_segment_id)?; let mut evicted_segment = evicted_segment.lock().unwrap(); evicted_segment.close()?; } } - cache.push_back(segment.id); + cache.push_back((segment.id, segment_arc)); Ok(()) } @@ -535,7 +520,7 @@ impl SegmentManager { let mut cache = self.cache.lock().unwrap(); // Check if segment is already in cache - if let Some(index) = cache.iter().position(|id| *id == segment.id) { + if let Some(index) = cache.iter().position(|(id, _)| *id == segment.id) { cache.remove(index); } @@ -556,34 +541,37 @@ impl SegmentManager { table_id: Option, start: SequenceNumber, end: SequenceNumber, - ) -> Result> { + ) -> Result>>> { // Find all segments that contain the requested sequence numbers let mut relevant_segments = Vec::new(); let all_segments = self.all_segments.lock().unwrap(); for (_, segment) in all_segments.iter() { - let segment = segment.lock().unwrap(); + let guard = segment.lock().unwrap(); match table_id { Some(table_id) => { - if let Some(range) = segment.table_ranges.get(&table_id) { + if let Some(range) = guard.table_ranges.get(&table_id) { if range.0 <= end && range.1 >= start { - relevant_segments.push(segment.id); + relevant_segments.push((guard.id, segment.clone())); } } } None => { - if segment.min_seq <= end && segment.max_seq >= start { - relevant_segments.push(segment.id); + if guard.min_seq <= end && guard.max_seq >= start { + relevant_segments.push((guard.id, segment.clone())); } } } } - // Sort by segment id - relevant_segments.sort_unstable(); + // 根据记录的 id 进行排序 + relevant_segments.sort_by_key(|(id, _)| *id); + + // 只保留排序后的 segment + let sorted_segments: Vec<_> = relevant_segments.into_iter().map(|(_, segment)| segment).collect(); - Ok(relevant_segments) + Ok(sorted_segments) } } @@ -764,7 +752,7 @@ impl Region { let mut guard = current_segment.lock().unwrap(); // Open the segment if not opened - self.segment_manager.open_segment(&mut guard)?; + self.segment_manager.open_segment(&mut guard, current_segment.clone())?; for pos in record_position.iter_mut() { pos.start += guard.current_size; pos.end += guard.current_size; @@ -1022,19 +1010,19 @@ impl SegmentLogIterator { end: SequenceNumber, ) -> Result { // Open the segment if it is not open - let mut segment = segment.lock().unwrap(); - if !segment.is_open() { - segment_manager.open_segment(&mut segment)?; + let mut guard = segment.lock().unwrap(); + if !guard.is_open() { + segment_manager.open_segment(&mut guard, segment.clone())?; } // Read the entire content of the segment - let segment_content = segment.read(0, segment.current_size)?; + let segment_content = guard.read(0, guard.current_size)?; // Get record positions - let record_positions = segment.record_position.clone(); + let record_positions = guard.record_position.clone(); // Get sequence number ranges - let table_ranges = 
segment.table_ranges.clone(); + let table_ranges = guard.table_ranges.clone(); Ok(Self { log_encoding, @@ -1122,7 +1110,7 @@ pub struct MultiSegmentLogIterator { segment_manager: Arc, /// All segments involved in this read operation. - segments: Vec, + segments: Vec>>, /// Current segment index. current_segment_idx: usize, @@ -1185,8 +1173,8 @@ impl MultiSegmentLogIterator { return Ok(false); } - let segment = self.segments[self.current_segment_idx]; - let segment = self.segment_manager.get_segment(segment)?; + let segment = self.segments[self.current_segment_idx].clone(); + // let segment = self.segment_manager.get_segment(segment)?; let iterator = SegmentLogIterator::new( self.log_encoding.clone(), self.record_encoding.clone(), @@ -1342,7 +1330,7 @@ mod tests { ) .unwrap(); - let _segment = region.segment_manager.get_segment(0).unwrap(); + // let _segment = region.segment_manager.get_segment(0).unwrap(); region.close().unwrap() } From 3ebfb77da160194b1a918e827f7b2391a097e36a Mon Sep 17 00:00:00 2001 From: draco Date: Wed, 11 Sep 2024 15:36:52 +0800 Subject: [PATCH 09/18] Fix comments --- src/wal/src/local_storage_impl/segment.rs | 69 ++++++++++++++++------- 1 file changed, 48 insertions(+), 21 deletions(-) diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 80216def98..e387eddba4 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -57,8 +57,19 @@ pub enum Error { #[snafu(display("Failed to map file to memory: {}", source))] Mmap { source: io::Error }, - #[snafu(display("Segment {} is full, current size: {}, segment size: {}, try to append {}", id, current_size, segment_size, data_size))] - SegmentFull { id : u64, current_size: usize, segment_size: usize, data_size: usize }, + #[snafu(display( + "Segment {} is full, current size: {}, segment size: {}, try to append: {}", + id, + current_size, + segment_size, + data_size + ))] + SegmentFull { + id: u64, + current_size: usize, + segment_size: usize, + data_size: usize, + }, #[snafu(display("Failed to append data to segment file: {}", source))] SegmentAppend { source: io::Error }, @@ -333,7 +344,12 @@ impl Segment { fn append(&mut self, data: &[u8]) -> Result<()> { ensure!( self.current_size + data.len() <= self.segment_size, - SegmentFull { id: self.id, current_size: self.current_size, segment_size: self.segment_size, data_size: data.len() } + SegmentFull { + id: self.id, + current_size: self.current_size, + segment_size: self.segment_size, + data_size: data.len() + } ); // Ensure the segment file is open @@ -464,7 +480,7 @@ impl SegmentManager { all_segments.insert(id, segment); Ok(()) } - + /// Open segment if it is not in cache, need to acquire the lock outside fn open_segment(&self, segment: &mut Segment, segment_arc: Arc>) -> Result<()> { let mut cache = self.cache.lock().unwrap(); @@ -479,8 +495,8 @@ impl SegmentManager { // Add to cache if cache.len() == self.cache_size { - let evicted_segment_id = cache.pop_front(); - if let Some((_, evicted_segment)) = evicted_segment_id { + let evicted_segment = cache.pop_front(); + if let Some((_, evicted_segment)) = evicted_segment { // The evicted segment should be closed first let mut evicted_segment = evicted_segment.lock().unwrap(); evicted_segment.close()?; @@ -507,30 +523,37 @@ impl SegmentManager { sequence_num: SequenceNumber, ) -> Result<()> { let current_segment_id = self.current_segment.lock().unwrap().lock().unwrap().id; - let mut segments_to_remove: Vec = Vec::new(); + let mut 
segments_to_remove = Vec::new(); let mut all_segments = self.all_segments.lock().unwrap(); for (_, segment) in all_segments.iter() { - let mut segment = segment.lock().unwrap(); + let mut guard = segment.lock().unwrap(); - segment.mark_deleted(location.table_id, sequence_num); + guard.mark_deleted(location.table_id, sequence_num); // Delete this segment if it is empty - if segment.is_empty() && segment.id != current_segment_id { + if guard.is_empty() && guard.id != current_segment_id { let mut cache = self.cache.lock().unwrap(); // Check if segment is already in cache - if let Some(index) = cache.iter().position(|(id, _)| *id == segment.id) { + if let Some(index) = cache.iter().position(|(id, _)| *id == guard.id) { cache.remove(index); } - segments_to_remove.push(segment.id); - segment.delete()?; + segments_to_remove.push((guard.id, segment.clone())); } } - for segment_id in segments_to_remove { - all_segments.remove(&segment_id); + // Delete all segments in memory + for (segment_id, _) in segments_to_remove.iter() { + all_segments.remove(segment_id); + } + + drop(all_segments); + + // Delete all segments on disk + for (_, segment) in segments_to_remove.iter() { + segment.lock().unwrap().delete()?; } Ok(()) @@ -547,7 +570,7 @@ impl SegmentManager { let all_segments = self.all_segments.lock().unwrap(); - for (_, segment) in all_segments.iter() { + for segment in all_segments.values() { let guard = segment.lock().unwrap(); match table_id { Some(table_id) => { @@ -565,13 +588,16 @@ impl SegmentManager { } } - // 根据记录的 id 进行排序 + // Sort by id relevant_segments.sort_by_key(|(id, _)| *id); - // 只保留排序后的 segment - let sorted_segments: Vec<_> = relevant_segments.into_iter().map(|(_, segment)| segment).collect(); + // id is not needed, so remove it + let relevant_segments = relevant_segments + .into_iter() + .map(|(_, segment)| segment) + .collect(); - Ok(sorted_segments) + Ok(relevant_segments) } } @@ -752,7 +778,8 @@ impl Region { let mut guard = current_segment.lock().unwrap(); // Open the segment if not opened - self.segment_manager.open_segment(&mut guard, current_segment.clone())?; + self.segment_manager + .open_segment(&mut guard, current_segment.clone())?; for pos in record_position.iter_mut() { pos.start += guard.current_size; pos.end += guard.current_size; From 2e247493f58c1e4e49cdba4b1726bb24571a6aeb Mon Sep 17 00:00:00 2001 From: draco Date: Wed, 11 Sep 2024 16:26:46 +0800 Subject: [PATCH 10/18] Remove sequence number check --- src/analytic_engine/src/instance/write.rs | 15 --------------- src/wal/src/local_storage_impl/segment.rs | 8 +++----- 2 files changed, 3 insertions(+), 20 deletions(-) diff --git a/src/analytic_engine/src/instance/write.rs b/src/analytic_engine/src/instance/write.rs index 95f2efce4e..8a007d5c98 100644 --- a/src/analytic_engine/src/instance/write.rs +++ b/src/analytic_engine/src/instance/write.rs @@ -531,21 +531,6 @@ impl<'a> Writer<'a> { e })?; - // When wal is disabled, there is no need to do this check. - if !self.instance.disable_wal { - // NOTE: Currently write wal will only increment seq by one, - // this may change in future. 
- let last_seq = table_data.last_sequence(); - if sequence != last_seq + 1 { - warn!( - "Sequence must be consecutive, table:{}, table_id:{}, last_sequence:{}, wal_sequence:{}", - table_data.name,table_data.id, - table_data.last_sequence(), - sequence - ); - } - } - debug!( "Instance write finished, update sequence, table:{}, table_id:{} last_sequence:{}", table_data.name, table_data.id, sequence diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index e387eddba4..583e9ff7c1 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -544,14 +544,14 @@ impl SegmentManager { } } - // Delete all segments in memory + // Delete segments in all_segments for (segment_id, _) in segments_to_remove.iter() { all_segments.remove(segment_id); } drop(all_segments); - // Delete all segments on disk + // Delete segments on disk for (_, segment) in segments_to_remove.iter() { segment.lock().unwrap().delete()?; } @@ -588,7 +588,7 @@ impl SegmentManager { } } - // Sort by id + // Sort by segment id relevant_segments.sort_by_key(|(id, _)| *id); // id is not needed, so remove it @@ -1357,8 +1357,6 @@ mod tests { ) .unwrap(); - // let _segment = region.segment_manager.get_segment(0).unwrap(); - region.close().unwrap() } From b695fb173218ecb5d7272ad8794e36f8f2352e67 Mon Sep 17 00:00:00 2001 From: draco Date: Wed, 11 Sep 2024 16:27:54 +0800 Subject: [PATCH 11/18] Remove redundant comment --- src/wal/src/local_storage_impl/segment.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 583e9ff7c1..5e95f0d5b9 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -1201,7 +1201,6 @@ impl MultiSegmentLogIterator { } let segment = self.segments[self.current_segment_idx].clone(); - // let segment = self.segment_manager.get_segment(segment)?; let iterator = SegmentLogIterator::new( self.log_encoding.clone(), self.record_encoding.clone(), From d24d4f987b2a78406d198c62a2b4e21f21cee8aa Mon Sep 17 00:00:00 2001 From: draco Date: Thu, 12 Sep 2024 15:18:24 +0800 Subject: [PATCH 12/18] Minor fix --- src/analytic_engine/src/compaction/scheduler.rs | 6 ++++-- src/wal/src/local_storage_impl/segment.rs | 6 ++++-- src/wal/src/local_storage_impl/wal_manager.rs | 5 +---- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/analytic_engine/src/compaction/scheduler.rs b/src/analytic_engine/src/compaction/scheduler.rs index 215bd0ac94..4e7422daf2 100644 --- a/src/analytic_engine/src/compaction/scheduler.rs +++ b/src/analytic_engine/src/compaction/scheduler.rs @@ -87,8 +87,10 @@ impl Default for SchedulerConfig { // 30 seconds schedule interval. schedule_interval: ReadableDuration(Duration::from_secs(30)), max_ongoing_tasks: 8, - // flush_interval default is 5h. - max_unflushed_duration: ReadableDuration(Duration::from_secs(60 * 60 * 5)), + // flush_interval default is 60s. + // The previous 5h duration for the local storage WAL is too long, which results in many + // segment files not being deleted in a timely manner. 
+ max_unflushed_duration: ReadableDuration(Duration::from_secs(60)), memory_limit: ReadableSize::gb(4), max_pending_compaction_tasks: 1024, } diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 5e95f0d5b9..211bede74a 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -522,9 +522,10 @@ impl SegmentManager { location: WalLocation, sequence_num: SequenceNumber, ) -> Result<()> { - let current_segment_id = self.current_segment.lock().unwrap().lock().unwrap().id; - let mut segments_to_remove = Vec::new(); + let current_segment = self.current_segment.lock().unwrap(); + let current_segment_id = current_segment.lock().unwrap().id; let mut all_segments = self.all_segments.lock().unwrap(); + let mut segments_to_remove = Vec::new(); for (_, segment) in all_segments.iter() { let mut guard = segment.lock().unwrap(); @@ -550,6 +551,7 @@ impl SegmentManager { } drop(all_segments); + drop(current_segment); // Delete segments on disk for (_, segment) in segments_to_remove.iter() { diff --git a/src/wal/src/local_storage_impl/wal_manager.rs b/src/wal/src/local_storage_impl/wal_manager.rs index e375750f5d..694831eae1 100644 --- a/src/wal/src/local_storage_impl/wal_manager.rs +++ b/src/wal/src/local_storage_impl/wal_manager.rs @@ -142,10 +142,7 @@ impl WalManager for LocalStorageImpl { } async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch) -> Result { - debug!( - "Write batch to LocalStorage based WAL, ctx:{:?}, batch:{:?}", - ctx, batch - ); + debug!("Write batch to LocalStorage based WAL, ctx:{:?}", ctx); self.region_manager .write(ctx, batch) .box_err() From 8e10f403971aceb225b0707e5dedc6568e2d2de5 Mon Sep 17 00:00:00 2001 From: draco Date: Thu, 12 Sep 2024 16:33:39 +0800 Subject: [PATCH 13/18] Revert some changes --- src/analytic_engine/src/compaction/scheduler.rs | 6 ++---- src/wal/src/local_storage_impl/segment.rs | 4 +--- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/analytic_engine/src/compaction/scheduler.rs b/src/analytic_engine/src/compaction/scheduler.rs index 4e7422daf2..215bd0ac94 100644 --- a/src/analytic_engine/src/compaction/scheduler.rs +++ b/src/analytic_engine/src/compaction/scheduler.rs @@ -87,10 +87,8 @@ impl Default for SchedulerConfig { // 30 seconds schedule interval. schedule_interval: ReadableDuration(Duration::from_secs(30)), max_ongoing_tasks: 8, - // flush_interval default is 60s. - // The previous 5h duration for the local storage WAL is too long, which results in many - // segment files not being deleted in a timely manner. - max_unflushed_duration: ReadableDuration(Duration::from_secs(60)), + // flush_interval default is 5h. 
+ max_unflushed_duration: ReadableDuration(Duration::from_secs(60 * 60 * 5)), memory_limit: ReadableSize::gb(4), max_pending_compaction_tasks: 1024, } diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 211bede74a..4ef1dab1e0 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -522,8 +522,7 @@ impl SegmentManager { location: WalLocation, sequence_num: SequenceNumber, ) -> Result<()> { - let current_segment = self.current_segment.lock().unwrap(); - let current_segment_id = current_segment.lock().unwrap().id; + let current_segment_id = self.current_segment.lock().unwrap().lock().unwrap().id; let mut all_segments = self.all_segments.lock().unwrap(); let mut segments_to_remove = Vec::new(); @@ -551,7 +550,6 @@ impl SegmentManager { } drop(all_segments); - drop(current_segment); // Delete segments on disk for (_, segment) in segments_to_remove.iter() { From 29e7ecdccb1ad392b2f345d1e37fc06b46052c51 Mon Sep 17 00:00:00 2001 From: draco Date: Fri, 13 Sep 2024 01:21:37 +0800 Subject: [PATCH 14/18] Split Segment::open into two methods --- src/wal/src/local_storage_impl/segment.rs | 98 +++++++++++++---------- 1 file changed, 56 insertions(+), 42 deletions(-) diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 4ef1dab1e0..ef105d911d 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -228,8 +228,11 @@ impl Segment { return Ok(segment); } - // Open the segment file to update min and max sequence number and file size - segment.open(true)?; + // Open the segment file + segment.open()?; + + // Restore meta info + segment.restore_meta()?; // Close the segment file. If the segment is to be used for read or write, it // will be opened again @@ -238,46 +241,21 @@ impl Segment { Ok(segment) } - pub fn open(&mut self, update_meta: bool) -> Result<()> { - // Open the segment file - let file = OpenOptions::new() - .read(true) - .write(true) - .open(&self.path) - .context(FileOpen)?; - - let metadata = file.metadata().context(FileOpen)?; - let file_size = metadata.len(); - - // Map the file to memory - let mmap = unsafe { MmapOptions::new().map_mut(&file).context(Mmap)? 
}; - - // Validate segment version - let version = mmap[0]; - ensure!(version == self.version, InvalidHeader); - - // Validate segment header - let header_len = SEGMENT_HEADER.len(); - ensure!( - file_size >= (VERSION_SIZE + header_len) as u64, - InvalidHeader - ); - let header = &mmap[VERSION_SIZE..VERSION_SIZE + header_len]; - ensure!(header == SEGMENT_HEADER, InvalidHeader); - - if !update_meta { - self.mmap = Some(mmap); - return Ok(()); - } + fn restore_meta(&mut self) -> Result<()> { + // Ensure the segment file is open + let Some(mmap) = &mut self.mmap else { + return SegmentNotOpen { id: self.id }.fail(); + }; + let file_size = mmap.len(); // Read and validate all records - let mut pos = VERSION_SIZE + header_len; + let mut pos = VERSION_SIZE + SEGMENT_HEADER.len(); let mut record_position = Vec::new(); self.table_ranges.clear(); // Scan all records in the segment - while pos < file_size as usize { + while pos < file_size { let data = &mmap[pos..]; match self.record_encoding.decode(data).box_err() { @@ -287,7 +265,7 @@ impl Segment { end: pos + record.len(), }); - // Update min and max sequence number + // Update max sequence number self.min_seq = min(self.min_seq, record.sequence_num); self.max_seq = max(self.max_seq, record.sequence_num); @@ -310,8 +288,7 @@ impl Segment { } } - self.segment_size = file_size as usize; - self.mmap = Some(mmap); + self.segment_size = file_size; self.record_position = record_position; self.current_size = pos; self.write_count = 0; @@ -319,6 +296,38 @@ impl Segment { Ok(()) } + pub fn open(&mut self) -> Result<()> { + // Open the segment file + let file = OpenOptions::new() + .read(true) + .write(true) + .open(&self.path) + .context(FileOpen)?; + + let metadata = file.metadata().context(FileOpen)?; + let file_size = metadata.len(); + + // Map the file to memory + let mmap = unsafe { MmapOptions::new().map_mut(&file).context(Mmap)? 
}; + + // Validate segment version + let version = mmap[0]; + ensure!(version == self.version, InvalidHeader); + + // Validate segment header + let header_len = SEGMENT_HEADER.len(); + ensure!( + file_size >= (VERSION_SIZE + header_len) as u64, + InvalidHeader + ); + let header = &mmap[VERSION_SIZE..VERSION_SIZE + header_len]; + ensure!(header == SEGMENT_HEADER, InvalidHeader); + + self.mmap = Some(mmap); + + Ok(()) + } + pub fn close(&mut self) -> Result<()> { if let Some(ref mut mmap) = self.mmap { // Flush before closing @@ -491,7 +500,7 @@ impl SegmentManager { } // If not in cache, load from disk - segment.open(false)?; + segment.open()?; // Add to cache if cache.len() == self.cache_size { @@ -1297,8 +1306,12 @@ mod tests { let mut segment = Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap(); segment - .open(false) + .open() .expect("Expected to open segment successfully"); + + segment + .restore_meta() + .expect("Expected to restore meta successfully"); } #[test] @@ -1311,7 +1324,8 @@ mod tests { .unwrap() .to_string(); let mut segment = Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap(); - segment.open(false).unwrap(); + segment.open().unwrap(); + segment.restore_meta().unwrap(); let data = b"test_data"; let append_result = segment.append(data); @@ -1335,7 +1349,7 @@ mod tests { let mut segment = Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap(); - segment.open(false).unwrap(); + segment.open().unwrap(); assert!(Path::new(&path).exists()); segment.delete().unwrap(); From 18f9f8a1a3b7558267ae41e144714592ed34e7ca Mon Sep 17 00:00:00 2001 From: draco Date: Fri, 13 Sep 2024 01:50:13 +0800 Subject: [PATCH 15/18] Fix by comments --- src/wal/src/local_storage_impl/segment.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index ef105d911d..534c22e69d 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -540,8 +540,9 @@ impl SegmentManager { guard.mark_deleted(location.table_id, sequence_num); - // Delete this segment if it is empty - if guard.is_empty() && guard.id != current_segment_id { + // Delete this segment if it is empty and its id is less than the current + // segment + if guard.is_empty() && guard.id < current_segment_id { let mut cache = self.cache.lock().unwrap(); // Check if segment is already in cache @@ -1509,19 +1510,21 @@ mod tests { .expect("should succeed to encode payloads"); let write_ctx = WriteContext::default(); - region.write(&write_ctx, &log_batch).unwrap(); + let actual_sequence = region.write(&write_ctx, &log_batch).unwrap(); + assert_eq!(actual_sequence, sequence + 100 - 1); sequence += 100; } - // Expect more than one segment - { + let latest_segment_id = { let all_segments = region.segment_manager.all_segments.lock().unwrap(); + // Expect more than one segment assert!( all_segments.len() > 1, "Expected multiple segments, but got {}", all_segments.len() ); - } + all_segments.keys().max().unwrap().to_owned() + }; // Mark delete entries up to sequence - 1, so only the last segment should // remain @@ -1533,8 +1536,13 @@ mod tests { { let all_segments = region.segment_manager.all_segments.lock().unwrap(); assert_eq!(all_segments.len(), 1); + assert!(all_segments.contains_key(&latest_segment_id)); } + // The num of segment in the dir should be 1 + let segment_count = fs::read_dir(dir.path()).unwrap().count(); + assert_eq!(segment_count, 1); + region.close().unwrap(); } } From 
42ac5b6fb2a17d5ca50f07229f63432d813fa680 Mon Sep 17 00:00:00 2001 From: draco Date: Fri, 13 Sep 2024 17:51:44 +0800 Subject: [PATCH 16/18] refactor open_segment --- src/wal/src/local_storage_impl/segment.rs | 83 ++++++++++------------- 1 file changed, 37 insertions(+), 46 deletions(-) diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 534c22e69d..dfa00f7b42 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -297,6 +297,8 @@ impl Segment { } pub fn open(&mut self) -> Result<()> { + assert!(self.mmap.is_none()); + // Open the segment file let file = OpenOptions::new() .read(true) @@ -345,10 +347,6 @@ impl Segment { Ok(()) } - pub fn is_open(&self) -> bool { - self.mmap.is_some() - } - /// Append a slice to the segment file. fn append(&mut self, data: &[u8]) -> Result<()> { ensure!( @@ -491,18 +489,18 @@ impl SegmentManager { } /// Open segment if it is not in cache, need to acquire the lock outside - fn open_segment(&self, segment: &mut Segment, segment_arc: Arc>) -> Result<()> { + /// otherwise the segment may get closed again. + fn open_segment(&self, guard: &mut Segment, segment: Arc>) -> Result<()> { let mut cache = self.cache.lock().unwrap(); - // Check if segment is already in cache - if cache.iter().any(|(id, _)| *id == segment.id) { + let already_opened = cache.iter().any(|(id, _)| *id == guard.id); + if already_opened { return Ok(()); } - // If not in cache, load from disk - segment.open()?; + guard.open()?; - // Add to cache + // Try evicting the oldest segment if the cache is full if cache.len() == self.cache_size { let evicted_segment = cache.pop_front(); if let Some((_, evicted_segment)) = evicted_segment { @@ -511,14 +509,15 @@ impl SegmentManager { evicted_segment.close()?; } } - cache.push_back((segment.id, segment_arc)); - + cache.push_back((guard.id, segment.clone())); Ok(()) } pub fn close_all(&self) -> Result<()> { - let mut cache = self.cache.lock().unwrap(); - cache.clear(); + { + let mut cache = self.cache.lock().unwrap(); + cache.clear(); + } let all_segments = self.all_segments.lock().unwrap(); for segment in all_segments.values() { segment.lock().unwrap().close()?; @@ -724,6 +723,19 @@ impl Region { }) } + fn create_new_segment(&self, id: u64) -> Result>> { + // Create a new segment + let new_segment = Segment::new( + format!("{}/segment_{}.wal", self.region_dir, id), + id, + self.segment_size, + )?; + let new_segment = Arc::new(Mutex::new(new_segment)); + self.segment_manager.add_segment(id, new_segment.clone())?; + + Ok(new_segment) + } + pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) -> Result { // In the WAL based on local storage, we need to ensure the sequence number in // segment is monotonically increasing. So we need to acquire a lock here. 
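// ----------------------------------------------------------------------------
// Illustrative sketch (not part of the patch series). The next hunk reworks
// `Region::write` so that the "does this batch still fit?" check runs in its
// own scope: the segment guard is dropped before the current segment is
// swapped for a fresh one obtained from `create_new_segment`. The stand-alone
// toy below mirrors only that rollover decision and guard-scoping pattern;
// `ToySegment` and `roll_if_needed` are hypothetical names, not the crate's
// real API.
// ----------------------------------------------------------------------------
use std::sync::{Arc, Mutex};

struct ToySegment {
    id: u64,
    current_size: usize,
    segment_size: usize,
}

fn roll_if_needed(current: &mut Arc<Mutex<ToySegment>>, incoming: usize) {
    // Scope the guard so it is released before `current` is reassigned.
    let (next_id, size) = {
        let guard = current.lock().unwrap();
        if guard.current_size + incoming <= guard.segment_size {
            return; // the batch still fits, keep appending to this segment
        }
        (guard.id + 1, guard.segment_size)
    };
    // Safe to replace the Arc now: no guard on the old segment is held here.
    *current = Arc::new(Mutex::new(ToySegment {
        id: next_id,
        current_size: 0,
        segment_size: size,
    }));
}

fn main() {
    let mut current = Arc::new(Mutex::new(ToySegment {
        id: 0,
        current_size: 60,
        segment_size: 64,
    }));
    roll_if_needed(&mut current, 16); // 60 + 16 > 64, so a new segment is created
    assert_eq!(current.lock().unwrap().id, 1);
    roll_if_needed(&mut current, 8); // 0 + 8 <= 64, no rollover
    assert_eq!(current.lock().unwrap().id, 1);
}
// ----------------------------------------------------------------------------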
@@ -760,36 +772,23 @@ impl Region { next_sequence_num += 1; } - let guard = current_segment.lock().unwrap(); - // Check if the current segment has enough space for the new data // If not, create a new segment and update current_segment - if guard.current_size + data.len() > guard.segment_size { - let new_segment_id = guard.id + 1; - // We need to drop guard to allow the update of current_segment - drop(guard); - - // Create a new segment - let new_segment = Segment::new( - format!("{}/segment_{}.wal", self.region_dir, new_segment_id), - new_segment_id, - self.segment_size, - )?; - let new_segment = Arc::new(Mutex::new(new_segment)); - self.segment_manager - .add_segment(new_segment_id, new_segment.clone())?; + { + let guard = current_segment.lock().unwrap(); + if guard.current_size + data.len() > guard.segment_size { + let new_segment_id = guard.id + 1; + drop(guard); - // Update current segment - *current_segment = new_segment; - } else { - drop(guard); + *current_segment = self.create_new_segment(new_segment_id)?; + } } - let mut guard = current_segment.lock().unwrap(); - // Open the segment if not opened + let mut guard = current_segment.lock().unwrap(); self.segment_manager .open_segment(&mut guard, current_segment.clone())?; + for pos in record_position.iter_mut() { pos.start += guard.current_size; pos.end += guard.current_size; @@ -1046,19 +1045,11 @@ impl SegmentLogIterator { start: SequenceNumber, end: SequenceNumber, ) -> Result { - // Open the segment if it is not open let mut guard = segment.lock().unwrap(); - if !guard.is_open() { - segment_manager.open_segment(&mut guard, segment.clone())?; - } - - // Read the entire content of the segment + // Open the segment if it is not open + segment_manager.open_segment(&mut guard, segment.clone())?; let segment_content = guard.read(0, guard.current_size)?; - - // Get record positions let record_positions = guard.record_position.clone(); - - // Get sequence number ranges let table_ranges = guard.table_ranges.clone(); Ok(Self { From 85cf569a5e6abd38819329abdb931ea99e78edc6 Mon Sep 17 00:00:00 2001 From: draco Date: Fri, 13 Sep 2024 19:17:46 +0800 Subject: [PATCH 17/18] Fix deadlock --- src/wal/src/local_storage_impl/segment.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index dfa00f7b42..93cb984b41 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -491,6 +491,10 @@ impl SegmentManager { /// Open segment if it is not in cache, need to acquire the lock outside /// otherwise the segment may get closed again. 
fn open_segment(&self, guard: &mut Segment, segment: Arc>) -> Result<()> { + if guard.mmap.is_some() { + return Ok(()); + } + let mut cache = self.cache.lock().unwrap(); let already_opened = cache.iter().any(|(id, _)| *id == guard.id); From 85814dc4ad95f50e9072b99c2d17a9d40479c7d4 Mon Sep 17 00:00:00 2001 From: draco Date: Fri, 13 Sep 2024 19:23:08 +0800 Subject: [PATCH 18/18] Fix style --- src/wal/src/local_storage_impl/segment.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index 93cb984b41..02bd1d13e0 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -494,7 +494,7 @@ impl SegmentManager { if guard.mmap.is_some() { return Ok(()); } - + let mut cache = self.cache.lock().unwrap(); let already_opened = cache.iter().any(|(id, _)| *id == guard.id);
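// ----------------------------------------------------------------------------
// Illustrative sketch (not part of the patch series). Patch 17 adds a fast
// path to `open_segment`: when the segment is already mmapped, it returns
// before ever touching the cache mutex. The hazard this appears to target is
// a lock-order inversion: `open_segment` runs with a segment lock held and
// then takes the cache lock, while eviction holds the cache lock and then
// locks the evicted segment in order to close it. The toy below mirrors only
// the cache bookkeeping surrounding that fast path, i.e. a bounded FIFO of
// open segment ids with eviction. All names here (`OpenCache`, `admit`) are
// hypothetical, not the crate's real API.
// ----------------------------------------------------------------------------
use std::collections::VecDeque;

struct OpenCache {
    capacity: usize,
    open_ids: VecDeque<u64>,
}

impl OpenCache {
    fn new(capacity: usize) -> Self {
        Self {
            capacity,
            open_ids: VecDeque::new(),
        }
    }

    /// Record `id` as open; return the id that must be closed, if any.
    fn admit(&mut self, id: u64) -> Option<u64> {
        if self.open_ids.iter().any(|&cached| cached == id) {
            return None; // already open, nothing to evict
        }
        let evicted = if self.open_ids.len() == self.capacity {
            self.open_ids.pop_front() // the oldest open segment gets closed
        } else {
            None
        };
        self.open_ids.push_back(id);
        evicted
    }
}

fn main() {
    let mut cache = OpenCache::new(2);
    assert_eq!(cache.admit(1), None);
    assert_eq!(cache.admit(2), None);
    assert_eq!(cache.admit(2), None); // hit: no eviction
    assert_eq!(cache.admit(3), Some(1)); // capacity reached, evict the oldest
}
// ----------------------------------------------------------------------------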