Skip to content

Commit

Permalink
fix cr
Browse files Browse the repository at this point in the history
  • Loading branch information
baojinri committed Nov 22, 2024
1 parent 85e4c7b commit 61d5140
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions horaedb/metric_engine/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ impl Manifest {
merge_options,
)
.await?;
// Merge all delta files when startup
merger.do_merge().await?;

{
let merger = merger.clone();
Expand Down Expand Up @@ -142,7 +140,7 @@ impl Manifest {
.context("failed to encode manifest file")?;
let put_payload = PutPayload::from_bytes(Bytes::from(buf));

// 1. Persist the manifest file
// 1. Persist the delta manifest
self.store
.put(&new_sst_path, put_payload)
.await
Expand All @@ -154,6 +152,9 @@ impl Manifest {
payload.files.push(new_sst);
}

// 3. Update delta files num
self.merger.add_delta_num();

Ok(())
}

Expand Down Expand Up @@ -213,7 +214,7 @@ impl ManifestMerger {
merge_options: ManifestMergeOptions,
) -> Result<Arc<Self>> {
let (tx, rx) = mpsc::channel(merge_options.channel_size);
let manifest_merge = Self {
let merger = Self {
snapshot_path,
delta_dir,
store,
Expand All @@ -222,8 +223,10 @@ impl ManifestMerger {
deltas_num: AtomicUsize::new(0),
merge_options,
};
// Merge all delta files when startup
merger.do_merge().await?;

Ok(Arc::new(manifest_merge))
Ok(Arc::new(merger))
}

async fn run(&self) {
Expand All @@ -237,11 +240,12 @@ impl ManifestMerger {
error!("Failed to merge delta, err:{err}");
}
}

}
_merge_type = receiver.recv() => {
if let Err(err) = self.do_merge().await {
error!("Failed to merge delta, err:{err}");
if self.deltas_num.load(Ordering::Relaxed) > self.merge_options.min_merge_threshold {
if let Err(err) = self.do_merge().await {
error!("Failed to merge delta, err:{err}");
}
}
}
}
Expand Down Expand Up @@ -270,14 +274,18 @@ impl ManifestMerger {
Ok(())
}

fn add_delta_num(&self) {
self.deltas_num.fetch_add(1, Ordering::Relaxed);
}

async fn do_merge(&self) -> Result<()> {
let paths = self
.store
.list(Some(&self.delta_dir))
.map(|value| {
value
.map(|v| v.location)
.context("failed to get delta file")
.context("failed to get delta file path")
})
.try_collect::<Vec<_>>()
.await?;
Expand All @@ -294,7 +302,7 @@ impl ManifestMerger {
}
let mut delta_files = Vec::with_capacity(stream_read.len());
while let Some(res) = stream_read.next().await {
let res = res.context("failed to read delta file")??;
let res = res.context("Failed to join read delta task")??;
delta_files.push(res);
}

Expand Down

0 comments on commit 61d5140

Please sign in to comment.