From 7ef3663eeed374bd195e9efade059f2b112b41f2 Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Thu, 17 Nov 2022 19:59:34 +0100 Subject: [PATCH] feat: improve write performance of `DeltaFileSystemHandler` (#943) # Description This PR builds on top of the changes to handling the runtime in #933. In my local tests this fixed #915. Additionally, I added the runtime as a property on the fs handler to avoid re-creating it on every call. In some non-representative tests with a large number of very small partitions it cut the execution time roughly in half. cc @wjones127 # Related Issue(s) # Documentation --- python/src/filesystem.rs | 143 ++++++++++++++++++++++++--------------- python/src/lib.rs | 8 ++- python/src/utils.rs | 13 ++-- 3 files changed, 96 insertions(+), 68 deletions(-) diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 370b078621..74793b4a4c 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use crate::utils::{delete_dir, wait_for_future, walk_tree}; +use crate::utils::{delete_dir, rt, walk_tree}; use crate::PyDeltaTableError; use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreError, Path}; @@ -10,11 +10,13 @@ use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError}; use pyo3::prelude::*; use pyo3::types::{IntoPyDict, PyBytes}; use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::runtime::Runtime; #[pyclass(subclass)] #[derive(Debug, Clone)] pub struct DeltaFileSystemHandler { pub(crate) inner: Arc, + pub(crate) rt: Arc, } #[pymethods] @@ -26,7 +28,10 @@ impl DeltaFileSystemHandler { .with_storage_options(options.unwrap_or_default()) .build_storage() .map_err(PyDeltaTableError::from_raw)?; - Ok(Self { inner: storage }) + Ok(Self { + inner: storage, + rt: Arc::new(rt()?), + }) } fn get_type_name(&self) -> String { @@ -39,10 +44,11 @@ impl DeltaFileSystemHandler { 
Ok(format!("{}{}", path, suffix)) } - fn copy_file(&self, src: String, dest: String, py: Python) -> PyResult<()> { + fn copy_file(&self, src: String, dest: String) -> PyResult<()> { let from_path = Path::from(src); let to_path = Path::from(dest); - wait_for_future(py, self.inner.copy(&from_path, &to_path)) + self.rt + .block_on(self.inner.copy(&from_path, &to_path)) .map_err(PyDeltaTableError::from_object_store)?; Ok(()) } @@ -52,16 +58,18 @@ impl DeltaFileSystemHandler { Ok(()) } - fn delete_dir(&self, path: String, py: Python) -> PyResult<()> { + fn delete_dir(&self, path: String) -> PyResult<()> { let path = Path::from(path); - wait_for_future(py, delete_dir(self.inner.as_ref(), &path)) + self.rt + .block_on(delete_dir(self.inner.as_ref(), &path)) .map_err(PyDeltaTableError::from_object_store)?; Ok(()) } - fn delete_file(&self, path: String, py: Python) -> PyResult<()> { + fn delete_file(&self, path: String) -> PyResult<()> { let path = Path::from(path); - wait_for_future(py, self.inner.delete(&path)) + self.rt + .block_on(self.inner.delete(&path)) .map_err(PyDeltaTableError::from_object_store)?; Ok(()) } @@ -81,12 +89,14 @@ impl DeltaFileSystemHandler { let mut infos = Vec::new(); for file_path in paths { let path = Path::from(file_path); - let listed = wait_for_future(py, self.inner.list_with_delimiter(Some(&path))) + let listed = self + .rt + .block_on(self.inner.list_with_delimiter(Some(&path))) .map_err(PyDeltaTableError::from_object_store)?; // TODO is there a better way to figure out if we are in a directory? 
if listed.objects.is_empty() && listed.common_prefixes.is_empty() { - let maybe_meta = wait_for_future(py, self.inner.head(&path)); + let maybe_meta = self.rt.block_on(self.inner.head(&path)); match maybe_meta { Ok(meta) => { let kwargs = HashMap::from([ @@ -138,22 +148,24 @@ impl DeltaFileSystemHandler { }; let path = Path::from(base_dir); - let list_result = - match wait_for_future(py, walk_tree(self.inner.clone(), &path, recursive)) { - Ok(res) => Ok(res), - Err(ObjectStoreError::NotFound { path, source }) => { - if allow_not_found { - Ok(ListResult { - common_prefixes: vec![], - objects: vec![], - }) - } else { - Err(ObjectStoreError::NotFound { path, source }) - } + let list_result = match self + .rt + .block_on(walk_tree(self.inner.clone(), &path, recursive)) + { + Ok(res) => Ok(res), + Err(ObjectStoreError::NotFound { path, source }) => { + if allow_not_found { + Ok(ListResult { + common_prefixes: vec![], + objects: vec![], + }) + } else { + Err(ObjectStoreError::NotFound { path, source }) } - Err(err) => Err(err), } - .map_err(PyDeltaTableError::from_object_store)?; + Err(err) => Err(err), + } + .map_err(PyDeltaTableError::from_object_store)?; let mut infos = vec![]; infos.extend( @@ -190,18 +202,25 @@ impl DeltaFileSystemHandler { Ok(infos) } - fn move_file(&self, src: String, dest: String, py: Python) -> PyResult<()> { + fn move_file(&self, src: String, dest: String) -> PyResult<()> { let from_path = Path::from(src); let to_path = Path::from(dest); // TODO check the if not exists semantics - wait_for_future(py, self.inner.rename(&from_path, &to_path)) + self.rt + .block_on(self.inner.rename(&from_path, &to_path)) .map_err(PyDeltaTableError::from_object_store)?; Ok(()) } - fn open_input_file(&self, path: String, py: Python) -> PyResult { + fn open_input_file(&self, path: String) -> PyResult { let path = Path::from(path); - let file = wait_for_future(py, ObjectInputFile::try_new(self.inner.clone(), path)) + let file = self + .rt + 
.block_on(ObjectInputFile::try_new( + self.rt.clone(), + self.inner.clone(), + path, + )) .map_err(PyDeltaTableError::from_object_store)?; Ok(file) } @@ -211,10 +230,15 @@ impl DeltaFileSystemHandler { &self, path: String, #[allow(unused)] metadata: Option>, - py: Python, ) -> PyResult { let path = Path::from(path); - let file = wait_for_future(py, ObjectOutputStream::try_new(self.inner.clone(), path)) + let file = self + .rt + .block_on(ObjectOutputStream::try_new( + self.rt.clone(), + self.inner.clone(), + path, + )) .map_err(PyDeltaTableError::from_object_store)?; Ok(file) } @@ -226,6 +250,7 @@ impl DeltaFileSystemHandler { #[derive(Debug, Clone)] pub struct ObjectInputFile { store: Arc, + rt: Arc, path: Path, content_length: i64, #[pyo3(get)] @@ -236,7 +261,11 @@ pub struct ObjectInputFile { } impl ObjectInputFile { - pub async fn try_new(store: Arc, path: Path) -> Result { + pub async fn try_new( + rt: Arc, + store: Arc, + path: Path, + ) -> Result { // Issue a HEAD Object to get the content-length and ensure any // errors (e.g. file not found) don't wait until the first read() call. let meta = store.head(&path).await?; @@ -245,6 +274,7 @@ impl ObjectInputFile { // https://github.com/apache/arrow/blob/f184255cbb9bf911ea2a04910f711e1a924b12b8/cpp/src/arrow/filesystem/s3fs.cc#L1083 Ok(Self { store, + rt, path, content_length, closed: false, @@ -338,7 +368,7 @@ impl ObjectInputFile { } #[args(nbytes = "None")] - fn read<'py>(&mut self, nbytes: Option, py: Python<'py>) -> PyResult<&'py PyBytes> { + fn read(&mut self, nbytes: Option) -> PyResult> { self.check_closed()?; let range = match nbytes { Some(len) => { @@ -356,13 +386,14 @@ impl ObjectInputFile { let nbytes = (range.end - range.start) as i64; self.pos += nbytes; let obj = if nbytes > 0 { - wait_for_future(py, self.store.get_range(&self.path, range)) + self.rt + .block_on(self.store.get_range(&self.path, range)) .map_err(PyDeltaTableError::from_object_store)? 
.to_vec() } else { Vec::new() }; - Ok(PyBytes::new(py, &obj)) + Python::with_gil(|py| Ok(PyBytes::new(py, &obj).into_py(py))) } fn fileno(&self) -> PyResult<()> { @@ -389,6 +420,7 @@ impl ObjectInputFile { #[pyclass(weakref)] pub struct ObjectOutputStream { store: Arc, + rt: Arc, path: Path, writer: Box, multipart_id: MultipartId, @@ -400,10 +432,15 @@ pub struct ObjectOutputStream { } impl ObjectOutputStream { - pub async fn try_new(store: Arc, path: Path) -> Result { + pub async fn try_new( + rt: Arc, + store: Arc, + path: Path, + ) -> Result { let (multipart_id, writer) = store.put_multipart(&path).await?; Ok(Self { store, + rt, path, writer, multipart_id, @@ -424,16 +461,14 @@ impl ObjectOutputStream { #[pymethods] impl ObjectOutputStream { - fn close(&mut self, py: Python) -> PyResult<()> { + fn close(&mut self) -> PyResult<()> { self.closed = true; - match wait_for_future(py, self.writer.shutdown()) { + match self.rt.block_on(self.writer.shutdown()) { Ok(_) => Ok(()), Err(err) => { - wait_for_future( - py, - self.store.abort_multipart(&self.path, &self.multipart_id), - ) - .map_err(PyDeltaTableError::from_object_store)?; + self.rt + .block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) + .map_err(PyDeltaTableError::from_object_store)?; Err(PyDeltaTableError::from_io(err)) } } @@ -477,31 +512,27 @@ impl ObjectOutputStream { Err(PyNotImplementedError::new_err("'read' not implemented")) } - fn write(&mut self, data: Vec, py: Python) -> PyResult { + fn write(&mut self, data: Vec) -> PyResult { self.check_closed()?; let len = data.len() as i64; - match wait_for_future(py, self.writer.write_all(&data)) { + match self.rt.block_on(self.writer.write_all(&data)) { Ok(_) => Ok(len), Err(err) => { - wait_for_future( - py, - self.store.abort_multipart(&self.path, &self.multipart_id), - ) - .map_err(PyDeltaTableError::from_object_store)?; + self.rt + .block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) + 
.map_err(PyDeltaTableError::from_object_store)?; Err(PyDeltaTableError::from_io(err)) } } } - fn flush(&mut self, py: Python) -> PyResult<()> { - match wait_for_future(py, self.writer.flush()) { + fn flush(&mut self) -> PyResult<()> { + match self.rt.block_on(self.writer.flush()) { Ok(_) => Ok(()), Err(err) => { - wait_for_future( - py, - self.store.abort_multipart(&self.path, &self.multipart_id), - ) - .map_err(PyDeltaTableError::from_object_store)?; + self.rt + .block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) + .map_err(PyDeltaTableError::from_object_store)?; Err(PyDeltaTableError::from_io(err)) } } diff --git a/python/src/lib.rs b/python/src/lib.rs index 051d2b8f14..794cab92dc 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -27,6 +27,7 @@ use pyo3::types::PyType; use std::collections::HashMap; use std::collections::HashSet; use std::convert::TryFrom; +use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -394,10 +395,11 @@ impl RawDeltaTable { Ok(()) } - pub fn get_py_storage_backend(&self) -> filesystem::DeltaFileSystemHandler { - filesystem::DeltaFileSystemHandler { + pub fn get_py_storage_backend(&self) -> PyResult { + Ok(filesystem::DeltaFileSystemHandler { inner: self._table.object_store(), - } + rt: Arc::new(rt()?), + }) } } diff --git a/python/src/utils.rs b/python/src/utils.rs index e554154edf..8d3c9d9eb3 100644 --- a/python/src/utils.rs +++ b/python/src/utils.rs @@ -1,20 +1,15 @@ -use std::future::Future; use std::sync::Arc; use deltalake::storage::{ListResult, ObjectStore, ObjectStoreError, ObjectStoreResult, Path}; use futures::future::{join_all, BoxFuture, FutureExt}; use futures::StreamExt; +use pyo3::exceptions::PyRuntimeError; use pyo3::prelude::*; use tokio::runtime::Runtime; -/// Utility to collect rust futures with GIL released -pub fn wait_for_future(py: Python, f: F) -> F::Output -where - F: Send, - F::Output: Send, -{ - let rt = Runtime::new().unwrap(); - py.allow_threads(|| 
rt.block_on(f)) +#[inline] +pub fn rt() -> PyResult { + Runtime::new().map_err(|_| PyRuntimeError::new_err("Couldn't start a new tokio runtime.")) } /// walk the "directory" tree along common prefixes in object store