fix: add buffer flushing to filesystem writes (#1911)
# Description
The current implementation of `ObjectOutputStream` does not invoke flush
when writing files to Azure storage, which seems to cause intermittent
issues where `write_deltalake` hangs with no progress and no error.

I'm adding a periodic flush to the write process, based on the size of the
written buffer, which can be parameterized via the `storage_options`
parameter (I could not find another way without changing the interface).
I don't know if this is an acceptable approach (also, it requires string values).

Setting the `"max_buffer_size": f"{100 * 1024}"` in `storage_options`
passed to `write_deltalake` helps me resolve the issue with writing a
dataset to Azure which was otherwise failing constantly.
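
For illustration, a minimal sketch of how the option can be passed from Python (the Azure URI and credential keys below are placeholders, not part of this change; only `max_buffer_size` is new):

```python
import pyarrow as pa
from deltalake import write_deltalake

data = pa.table({"id": [1, 2, 3]})

# All storage_options values must be strings; "max_buffer_size" caps how many
# bytes are written to the output stream between flushes.
write_deltalake(
    "abfss://container@account.dfs.core.windows.net/my_table",  # placeholder URI
    data,
    storage_options={
        "account_name": "<storage-account>",  # placeholder credentials
        "account_key": "<access-key>",
        "max_buffer_size": str(100 * 1024),  # flush roughly every 100 KiB
    },
)
```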

The default max buffer size is set to 4 MB, which looks reasonable and is
used by other implementations I've seen (e.g.
https://github.com/fsspec/filesystem_spec/blob/3c247f56d4a4b22fc9ffec9ad4882a76ee47237d/fsspec/spec.py#L1577).

# Related Issue(s)
Can help with resolving #1770

# Documentation
If the approach is accepted, I will need to find the best way to add this
to the docs.

---------

Signed-off-by: Nikolay Ulmasov <ulmasov@hotmail.com>
r3stl355 authored Nov 29, 2023
1 parent e6ad2e0 commit 6628493
Showing 1 changed file with 23 additions and 2 deletions.
python/src/filesystem.rs
@@ -13,6 +13,8 @@ use tokio::runtime::Runtime;
 use crate::error::PythonError;
 use crate::utils::{delete_dir, rt, walk_tree};
 
+const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024;
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub(crate) struct FsConfig {
     pub(crate) root_url: String,
@@ -279,12 +281,20 @@ impl DeltaFileSystemHandler {
         #[allow(unused)] metadata: Option<HashMap<String, String>>,
     ) -> PyResult<ObjectOutputStream> {
         let path = Self::parse_path(&path);
+        let max_buffer_size = self
+            .config
+            .options
+            .get("max_buffer_size")
+            .map_or(DEFAULT_MAX_BUFFER_SIZE, |v| {
+                v.parse::<i64>().unwrap_or(DEFAULT_MAX_BUFFER_SIZE)
+            });
         let file = self
             .rt
             .block_on(ObjectOutputStream::try_new(
                 Arc::clone(&self.rt),
                 self.inner.clone(),
                 path,
+                max_buffer_size,
             ))
             .map_err(PythonError::from)?;
         Ok(file)
@@ -492,13 +502,16 @@ pub struct ObjectOutputStream {
     closed: bool,
     #[pyo3(get)]
     mode: String,
+    max_buffer_size: i64,
+    buffer_size: i64,
 }
 
 impl ObjectOutputStream {
     pub async fn try_new(
         rt: Arc<Runtime>,
         store: Arc<DynObjectStore>,
         path: Path,
+        max_buffer_size: i64,
     ) -> Result<Self, ObjectStoreError> {
         let (multipart_id, writer) = store.put_multipart(&path).await?;
         Ok(Self {
@@ -510,6 +523,8 @@ impl ObjectOutputStream {
             pos: 0,
             closed: false,
             mode: "wb".into(),
+            max_buffer_size,
+            buffer_size: 0,
         })
     }

@@ -582,15 +597,21 @@ impl ObjectOutputStream {
         let len = data.as_bytes().len() as i64;
         let py = data.py();
         let data = data.as_bytes();
-        py.allow_threads(|| match self.rt.block_on(self.writer.write_all(data)) {
+        let res = py.allow_threads(|| match self.rt.block_on(self.writer.write_all(data)) {
             Ok(_) => Ok(len),
             Err(err) => {
                 self.rt
                     .block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
                     .map_err(PythonError::from)?;
                 Err(PyIOError::new_err(err.to_string()))
             }
-        })
+        })?;
+        self.buffer_size += len;
+        if self.buffer_size >= self.max_buffer_size {
+            let _ = self.flush(py);
+            self.buffer_size = 0;
+        }
+        Ok(res)
     }
 
     fn flush(&mut self, py: Python<'_>) -> PyResult<()> {
