
Support for Async CSV Writer #3740

Closed
metesynnada opened this issue Feb 21, 2023 · 18 comments · Fixed by #3824
Labels
arrow (Changes to the arrow crate) · enhancement (Any new improvement worthy of an entry in the changelog) · object-store (Object Store Interface)

Comments

@metesynnada
Contributor

metesynnada commented Feb 21, 2023

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

I want an async CSV writer that can be used in an async context. The current implementation blocks even when we are in a tokio environment.

Describe the solution you'd like
A writer using tokio::io::AsyncWrite might be the solution.

Describe alternatives you've considered
NA

Additional context
NA

@metesynnada metesynnada added the enhancement Any new improvement worthy of an entry in the changelog label Feb 21, 2023
@tustvold
Contributor

I wonder if this could be achieved by simply writing a batch to an in-memory Vec using the current "blocking" writer, and then flushing the output to an async output. This would be more flexible, and likely significantly faster than an approach that integrates async at a lower level.

@metesynnada
Contributor Author

I wonder if this could be achieved by simply writing a batch to an in-memory Vec using the current "blocking" writer, and then flushing the output to an async output. This would be more flexible, and likely significantly faster than an approach that integrates async at a lower level.

I think this requires creating a new "blocking" writer for each record batch, since the writer owns the in-memory Vec. The only API for reaching the internal buffer is self.writer.into_inner(), which consumes the writer.

I couldn't think of a way to keep ownership of the buffer while writing with the usual Writer. Do you have any idea how I could code that?

By the way, I verified the performance degradation. I agree with you that CPU-bound computations like serialization shouldn't be async, since there is no gain. I am trying to isolate only the IO-bound operation (the flush) as async, as you suggested.

@tustvold
Contributor

I couldn't think of a way to keep ownership of the buffer while writing with the usual Writer

Perhaps something like (not tested)

let mut buffer = Vec::with_capacity(1024);

while let Some(batch) = stream.next().await.transpose()? {
    // Encode the batch synchronously into the in-memory buffer
    let mut writer = Writer::new(&mut buffer);
    writer.write(&batch)?;
    // Drop the writer to release its mutable borrow of `buffer`
    std::mem::drop(writer);
    // Flush the encoded bytes to the async sink, then reuse the buffer
    flush(&buffer).await?;
    buffer.clear();
}

Whilst creating the Writer each time is perhaps a little unfortunate, I am skeptical it will be relevant when amortised over the cost of encoding an entire batch - CSV encoding is very expensive.

@metesynnada
Contributor Author

metesynnada commented Feb 23, 2023

I benchmarked record batch writing for three cases: an ordinary (sync) writer, an async writer, and a buffered async writer (the approach under discussion).

For a small batch size (10) and batch count (1000) with the usual schema:

          Running benches/buffer_bench.rs (target/release/deps/buffer_bench-f7b991c50b44d14d)
async                   time:   [18.417 ms 18.942 ms 19.526 ms]
                        change: [-75.407% -74.625% -73.930%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe

async_buffered          time:   [17.519 ms 17.718 ms 18.016 ms]
                        change: [-75.844% -75.540% -75.034%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 15 outliers among 100 measurements (15.00%)
  8 (8.00%) high mild
  7 (7.00%) high severe

sync                    time:   [11.640 ms 12.062 ms 12.532 ms]
                        change: [-85.176% -84.617% -83.999%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 22 outliers among 100 measurements (22.00%)
  5 (5.00%) high mild
  17 (17.00%) high severe

For a larger batch size (1000) and batch count (100) with the usual schema:

Benchmarking async: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.7s, or reduce sample count to 60.
async                   time:   [74.173 ms 74.648 ms 75.173 ms]
                        change: [+490.63% +496.74% +502.77%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe

Benchmarking async_buffered: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.3s, or reduce sample count to 60.
async_buffered          time:   [72.195 ms 72.435 ms 72.695 ms]
                        change: [+623.27% +627.13% +630.93%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 13 outliers among 100 measurements (13.00%)
  1 (1.00%) low mild
  8 (8.00%) high mild
  4 (4.00%) high severe

Benchmarking sync: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.9s, or reduce sample count to 60.
sync                    time:   [78.149 ms 78.415 ms 78.772 ms]
                        change: [+730.79% +735.48% +740.48%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 11 outliers among 100 measurements (11.00%)
  4 (4.00%) low mild
  7 (7.00%) high severe

For the usual batch size (4096) and batch count (100) with the usual schema:

     Running benches/buffer_bench.rs (target/release/deps/buffer_bench-f7b991c50b44d14d)
Benchmarking async: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 30.4s, or reduce sample count to 10.
async                   time:   [300.46 ms 301.73 ms 303.03 ms]
                        change: [+1443.9% +1493.0% +1538.6%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild

Benchmarking async_buffered: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 28.1s, or reduce sample count to 10.
async_buffered          time:   [279.85 ms 281.20 ms 282.96 ms]
                        change: [+1459.3% +1487.1% +1508.2%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 6 outliers among 100 measurements (6.00%)
  6 (6.00%) high severe

Benchmarking sync: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 31.1s, or reduce sample count to 10.
sync                    time:   [310.66 ms 311.43 ms 312.36 ms]
                        change: [+2385.1% +2481.8% +2576.3%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) high mild
  4 (4.00%) high severe
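
(For reference, a Criterion harness along these lines could drive the sync baseline. This is only a sketch; make_batches is a hypothetical helper returning Vec<RecordBatch>, and the actual benchmark code behind the numbers above is not shown in this thread.)

use arrow::csv::Writer;
use criterion::{criterion_group, criterion_main, Criterion};

// `make_batches(rows, count)` is a hypothetical helper producing test RecordBatches.
fn bench_sync(c: &mut Criterion) {
    let batches = make_batches(4096, 100);
    c.bench_function("sync", |b| {
        b.iter(|| {
            // Encode every batch into an in-memory buffer with the blocking writer
            let mut buffer = Vec::new();
            let mut writer = Writer::new(&mut buffer);
            for batch in &batches {
                writer.write(batch).unwrap();
            }
        })
    });
}

criterion_group!(benches, bench_sync);
criterion_main!(benches);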

I think the buffered version can also be extended directly to JSON and Avro:

use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};

pub struct BufferWriter<W: AsyncWrite + Unpin + Send> {
    writer: BufWriter<W>,
    buffer: Vec<u8>,
}

impl<W: AsyncWrite + Unpin + Send> BufferWriter<W> {
    pub fn new(writer: W) -> Self {
        BufferWriter {
            writer: BufWriter::new(writer),
            buffer: Vec::with_capacity(4096),
        }
    }

    pub async fn write(&mut self, batch: &RecordBatch, header: bool) -> Result<(), ArrowError> {
        {
            // Encode the batch synchronously into the in-memory buffer
            let mut inner_writer = Writer::new_with_header(&mut self.buffer, header);
            inner_writer.write(batch)?;
        }
        // Flush the encoded bytes to the async sink and reuse the buffer
        self.writer.write_all(&self.buffer).await?;
        self.writer.flush().await?;
        self.buffer.clear();
        Ok(())
    }
}

If you are also satisfied with the results of the buffered version, I will add this functionality to CSV and JSON. cc @tustvold

@tustvold
Contributor

If you are also satisfied with the results of the buffered version

The performance across all three cases seems basically comparable. It would be interesting to see a profile, but I suspect the difference comes down to the sizing of the intermediate buffer, and the optimal size will depend heavily on the destination sink.

If you are also satisfied with the results of the buffered version, I will add this functionality to CSV and JSON

Thus far we have managed to avoid async within arrow-rs, and I think this encourages a nice separation of compute and IO. What do you think about adding this functionality instead to DataFusion and perhaps just adding a doc comment to arrow-rs showing how it can be done?

e.g. something like (not tested)

async fn write_async<I, F, Fut>(batches: I, flush: F) -> Result<(), ArrowError>
where
    I: IntoIterator<Item = RecordBatch>,
    F: Fn(&[u8]) -> Fut,
    Fut: Future<Output = Result<(), ArrowError>>,
{
    let mut buffer = Vec::with_capacity(4096);
    for batch in batches {
        {
            // Encode the batch synchronously into the in-memory buffer
            let mut writer = Writer::new(&mut buffer);
            writer.write(&batch)?;
        }
        // Hand the encoded bytes to the async flush callback, then reuse the buffer
        flush(&buffer).await?;
        buffer.clear();
    }
    Ok(())
}

@metesynnada
Contributor Author

I am OK with the separation. The main idea behind adding AsyncWrite support was to use the object store's put and put_multipart APIs in DataFusion.

//! # Multipart put object
//! Use the [`ObjectStore::put_multipart`] method to save a large amount of data in chunks.
//!
//! ```
//! # use object_store::local::LocalFileSystem;
//! # fn get_object_store() -> LocalFileSystem {
//! # LocalFileSystem::new_with_prefix("/tmp").unwrap()
//! # }
//! # async fn multi_upload() {
//! use object_store::ObjectStore;
//! use std::sync::Arc;
//! use bytes::Bytes;
//! use tokio::io::AsyncWriteExt;
//! use object_store::path::Path;
//!
//! let object_store: Arc<dyn ObjectStore> = Arc::new(get_object_store());
//! let path: Path = "data/large_file".try_into().unwrap();
//! let (_id, mut writer) = object_store
//!     .put_multipart(&path)
//!     .await
//!     .unwrap();
//! let bytes = Bytes::from_static(b"hello");
//! writer.write_all(&bytes).await.unwrap();
//! writer.flush().await.unwrap();
//! writer.shutdown().await.unwrap();
//! # }
//! ```

I was planning to add a new API, similar to put_multipart, that returns only a Box<dyn AsyncWrite + Unpin + Send>, to support single files without multipart support. What do you think about this feature?

It would look like

async fn put_singlepart(
    &self,
    location: &Path,
) -> Result<Box<dyn AsyncWrite + Unpin + Send>>;
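
(For clarity, the writer returned by such an API would be consumed like any other AsyncWrite. A small sketch, which compiles independently of whether put_singlepart is ever added:)

use tokio::io::{AsyncWrite, AsyncWriteExt};

// Sketch: write encoded bytes through any AsyncWrite (e.g. the writer the proposed
// put_singlepart would return, or the one put_multipart returns today).
async fn write_then_close<W: AsyncWrite + Unpin>(mut writer: W, encoded: &[u8]) -> std::io::Result<()> {
    writer.write_all(encoded).await?;
    writer.flush().await?;
    writer.shutdown().await?;
    Ok(())
}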

Consider the put_multipart implementation for the local file system, which writes to a temporary file and finalizes its name upon reaching the LocalUploadState::ShuttingDown stage to complete the write. However, what would you recommend for FIFO cases? Would it still be appropriate to use the object_store for individual files? In DataFusion, my approach would be to check whether the underlying source is unbounded and, if so, use put_singlepart().

@tustvold
Contributor

What do you think about this feature?

Store-specific functionality, let alone operating-system-specific functionality, doesn't seem like a great fit for the object_store crate. Python's fsspec, which is more filesystem-focused, doesn't support this either.

I'm not familiar with your use-case, but trying to shoehorn streaming through OS-specific filesystem APIs seems a little odd to me. Is there a reason you opted for this approach over a custom operator? A custom operator would also allow generalising to streaming brokers like Kafka, Kinesis, etc., and potentially using things like Unix domain sockets, which have better async stories.

@metesynnada
Contributor Author

metesynnada commented Feb 24, 2023

I'm not familiar with your use case, but trying to shoehorn streaming through OS-specific filesystem APIs seems a little odd to me.

Suppose you read a ListingTable in DataFusion. You need to use the object-store crate to read the data. More specifically, you would use the get API, which produces a byte stream (a futures::Stream) that you can consume.
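
(For example, a minimal sketch of that read path, assuming a circa-0.5 object_store API; store and path here are placeholders:)

use futures::StreamExt;
use object_store::{path::Path, ObjectStore};

// Sketch: `get` returns a GetResult whose `into_stream()` yields chunks of Bytes
// that can be consumed asynchronously, e.g. fed into a CSV reader.
async fn read_object(store: &dyn ObjectStore, path: &Path) -> object_store::Result<usize> {
    let mut stream = store.get(path).await?.into_stream();
    let mut total_bytes = 0;
    while let Some(chunk) = stream.next().await {
        total_bytes += chunk?.len();
    }
    Ok(total_bytes)
}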

I was looking for a put API with similar semantics: an async writer. As you said, we will look into extending this feature to streaming brokers like Kafka, Kinesis, etc. FIFO files were a way to unify the stream and batch approaches, which is why I was looking for ways to extend the object-store crate.

Overall, we believe that overfitting to batch solutions is mostly avoidable. However, I understand your concern.

@tustvold
Contributor

tustvold commented Feb 24, 2023

we believe that overfitting to batch solutions is mostly avoidable

I don't think this is avoidable: arrow is a columnar data format, and it fundamentally assumes batching to amortise dispatch overheads. Row-based streaming would require a completely different architecture, likely using a JIT?

FWIW, Kafka and Kinesis aren't really streaming APIs; under the hood they rely on aggressive batching for throughput.

Suppose you read a ListingTable in Datafusion. You need to use the object-store crate to read the data

I am aware; I wrote a lot of that logic. My confusion is why this is the point of integration, instead of, say, a custom operator or TableProvider? That would be more flexible and would avoid all these issues.

ListingTable is intended for ad-hoc querying of files in the absence of a catalog. I would expect service workloads to always make use of some sort of catalog, be it Hive Metastore, Glue, Delta, Lakehouse, etc.

@ozankabak

I don't think this is avoidable: arrow is a columnar data format, and it fundamentally assumes batching to amortise dispatch overheads. Row-based streaming would require a completely different architecture, likely using a JIT?

@tustvold, I think there is maybe some terminology-related confusion going on here w.r.t. batching. I am sure @metesynnada was not trying to say he wants to avoid batching in its entirety. I think what he envisions (albeit maybe not conveyed clearly) is simply an API that operates with an async writer, so that non-IO operations can carry on while the actual write to the object store is taking place.

The current API (i.e. the put function) is already async, and AFAICT it performs the actual write in a separate thread. If this is indeed true, it already doesn't block other non-IO operations. Given that we want to serialize synchronously for performance reasons, it doesn't really matter where we do it -- the API seems sufficient to me as is. I just had a discussion with @metesynnada on this; he seems to agree and can comment further if I'm missing something.

Given that we are analyzing this part of the code, one good thing we can do is investigate whether avoiding the extra IO thread and using async primitives to do the actual writing within the same thread makes sense. I am not entirely sure what the advantages/disadvantages of doing that would be; @metesynnada can do some measurements to quantify this. Maybe you can share the reasoning behind the current choice?

@tustvold
Contributor

tustvold commented Feb 24, 2023

using async primitives to do the actual writing within the same thread

FWIW tokio doesn't support non-blocking filesystem IO (tokio-uring is still experimental), so it will always dispatch to a separate blocking threadpool. This is what I was alluding to when I suggested sockets might be a more compelling primitive than filesystem APIs, as they support epoll.
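
(In practice, file writes under tokio end up looking roughly like the sketch below: the blocking std::fs call is shipped off to the blocking threadpool via spawn_blocking, which is essentially what tokio::fs does internally.)

use std::io::Write;

// Simplified sketch of how blocking file IO is dispatched under tokio today:
// the std::fs call runs on the blocking threadpool, not the async reactor.
async fn write_file(path: std::path::PathBuf, data: Vec<u8>) -> std::io::Result<()> {
    tokio::task::spawn_blocking(move || {
        let mut file = std::fs::File::create(path)?;
        file.write_all(&data)
    })
    .await
    .expect("spawn_blocking task panicked")
}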

The current API (i.e. the put function) is already async

This is true, but each put creates a new file, overwriting anything already present, which I suspect will be a problem?

@ozankabak

ozankabak commented Feb 24, 2023

OK, so we will need proper tokio support before we can investigate the thread vs. async-primitives distinction w.r.t. files; let's wait on that. For handling streaming brokers, a custom operator leveraging a socket-based approach makes sense. I will think about that.

This is true, but each put creates a new file, overwriting anything already present, which I suspect will be a problem?

Right. A simple mechanism to choose between overwrite/append behaviors should be enough for @metesynnada's purposes. Everything is already async. Any suggestions on how we can add this capability?

@alamb
Contributor

alamb commented Feb 25, 2023

For more context, I believe the use case for this feature may be related to apache/datafusion#5130

Having a "native" async csv writer in arrow-rs I can see being a compelling UX feature (aka it would make it easier to use and integrate). However, I don't know how much easier it would be and how much maintenance it would require.

I really like the idea, mentioned in a few places on this thread, of initially creating an async CSV writer by wrapping the existing blocking / batching one outside of arrow-rs (I don't think there is any reason this cannot be done).

Once we have the actual code and an example use of it, we will be in a much better position to evaluate how it performs compared to a "native" async writer, how it is actually used, and whether we should reconsider incorporating it into arrow-rs.
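
(A minimal sketch of what such a wrapper outside arrow-rs might look like, combining the buffered encoding discussed above with object_store's put_multipart. It assumes the arrow crate's csv feature; upload_csv is an illustrative name, not an existing API.)

use arrow::csv::Writer;
use arrow::record_batch::RecordBatch;
use object_store::{path::Path, ObjectStore};
use tokio::io::AsyncWriteExt;

// Encode each batch synchronously into an in-memory buffer, then push the bytes
// through the async multipart writer returned by the object store.
async fn upload_csv(
    store: &dyn ObjectStore,
    location: &Path,
    batches: impl IntoIterator<Item = RecordBatch>,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_id, mut writer) = store.put_multipart(location).await?;
    let mut buffer = Vec::with_capacity(4096);
    for batch in batches {
        {
            // Note: a real implementation would write the CSV header only once
            // (e.g. via WriterBuilder), not per batch.
            let mut csv = Writer::new(&mut buffer);
            csv.write(&batch)?;
        }
        writer.write_all(&buffer).await?;
        buffer.clear();
    }
    writer.shutdown().await?;
    Ok(())
}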

@metesynnada
Contributor Author

cc @tustvold, I introduced a new API for objects that support append. What do you think about it?

@tustvold
Contributor

tustvold commented Mar 1, 2023

What do you think about it

I'm not a massive fan of introducing an abstraction to the object_store crate that few, if any, actual object stores support... Perhaps @alamb might be able to facilitate a synchronous chat, if you would be amenable? I think we may be able to accommodate your use-case without needing to graft it onto filesystem APIs.

Edit: Whilst Azure does have an Append Blob concept, I think we would need to bifurcate the API, as the way you interact with them is different. The general approach of object_store has been to expose functionality common across multiple backends, and it is unclear how we could do this in a portable manner.

@metesynnada
Contributor Author

metesynnada commented Mar 1, 2023

A synchronous chat would be OK.

Meanwhile, you can check apache/datafusion#5130 for more insight into the use case. In short, we want to support appending to files (only those that support it).

@tustvold
Contributor

label_issue.py automatically added labels {'arrow'} from #2

@tustvold tustvold added the object-store Object Store Interface label Mar 24, 2023
@tustvold
Contributor

label_issue.py automatically added labels {'object-store'} from #2
