Skip to content

Commit

Permalink
refactor file_sink: remove file_upload as optional
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeldjeffrey committed Apr 10, 2024
1 parent 22e824c commit e8638aa
Showing 1 changed file with 17 additions and 25 deletions.
42 changes: 17 additions & 25 deletions file_store/src/file_sink.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use crate::{
file_upload::{self, FileUpload},
Error, Result,
};
use crate::{file_upload::FileUpload, Error, Result};
use async_compression::tokio::write::GzipEncoder;
use bytes::Bytes;
use chrono::{DateTime, Duration, Utc};
Expand Down Expand Up @@ -137,7 +134,7 @@ impl FileSinkBuilder {
tmp_path: self.tmp_path,
prefix: self.prefix,
max_size: self.max_size,
file_upload: Some(self.file_upload),
file_upload: self.file_upload,
roll_time: self.roll_time,
messages: rx,
staged_files: Vec::new(),
Expand Down Expand Up @@ -254,8 +251,7 @@ pub struct FileSink {
roll_time: Duration,

messages: MessageReceiver,
// deposits: Option<file_upload::MessageSender>,
file_upload: Option<FileUpload>,
file_upload: FileUpload,
staged_files: Vec<PathBuf>,
auto_commit: bool,

Expand Down Expand Up @@ -297,21 +293,19 @@ impl FileSink {
fs::create_dir_all(&self.tmp_path).await?;

// Notify all existing completed sinks via file uploads
if let Some(file_uploads) = &self.file_upload {
let mut dir = fs::read_dir(&self.target_path).await?;
loop {
match dir.next_entry().await {
Ok(Some(entry))
if entry
.file_name()
.to_string_lossy()
.starts_with(&self.prefix) =>
{
file_upload::upload_file(&file_uploads.sender, &entry.path()).await?;
}
Ok(None) => break,
_ => continue,
let mut dir = fs::read_dir(&self.target_path).await?;
loop {
match dir.next_entry().await {
Ok(Some(entry))
if entry
.file_name()
.to_string_lossy()
.starts_with(&self.prefix) =>
{
self.file_upload.upload_file(&entry.path()).await?;
}
Ok(None) => break,
_ => continue,
}
}

Expand Down Expand Up @@ -478,9 +472,7 @@ impl FileSink {
let target_path = self.target_path.join(target_filename);

fs::rename(&sink_path, &target_path).await?;
if let Some(file_upload) = &self.file_upload {
file_upload.upload_file(&target_path).await?;
};
self.file_upload.upload_file(&target_path).await?;

Ok(())
}
Expand Down Expand Up @@ -535,7 +527,7 @@ pub fn file_name(path_buf: &Path) -> Result<String> {
#[cfg(test)]
mod tests {
use super::*;
use crate::{file_source, FileInfo, FileType};
use crate::{file_source, file_upload, FileInfo, FileType};
use futures::stream::StreamExt;
use std::str::FromStr;
use tempfile::TempDir;
Expand Down

0 comments on commit e8638aa

Please sign in to comment.