
Add possibility to iterate over table data (streaming) #440

Open
hagenw opened this issue Jun 24, 2024 · 5 comments
Labels
enhancement New feature or request

Comments

@hagenw
Member

hagenw commented Jun 24, 2024

To support loading large tables that might not fit into memory, it would be a good idea to add an option to Table.get() (or another method?) to read the data piece-wise.

Let us first create an example table and store it as CSV and PARQUET:

import audformat

entries = 100
db = audformat.Database("mydb")
db.schemes["int"] = audformat.Scheme("int")
index = audformat.filewise_index([f"file{n}.wav" for n in range(entries)])
db["files"] = audformat.Table(index)
db["files"]["int"] = audformat.Column(scheme_id="int")
db["files"]["int"].set([int(n) for n in range(entries)])

db["files"].save("files", storage_format="csv")
db["files"].save("files", storage_format="parquet", update_other_formats=False)

Stream PARQUET tables

The table stored in PARQUET can be iterated with:

import pyarrow.parquet as parquet

stream = parquet.ParquetFile("files.parquet")
nrows = 10
for batch in stream.iter_batches(batch_size=nrows):
    print(batch.to_pandas())

Stream CSV tables

Streaming a CSV file with pyarrow seems to be more complicated, as we cannot directly pass the number of rows we want per batch, but only the size of a batch in bytes. The problem is that the number of bytes per line can vary:

with open("files.csv") as file:
    for line in file:
        bytes_per_line = len(line) + 1
        print(bytes_per_line)

returns

10
13
13
13
13
13
13
13
13
13
13
15
...
15

If we find a way to calculate the correct block_size value, we could do:

import pyarrow.csv as csv

block_size = 140  # returns same result as streaming from PARQUET
read_options = csv.ReadOptions(column_names=["file", "int"], skip_rows=1, block_size=block_size)
convert_options = csv.ConvertOptions(column_types=db["files"]._pyarrow_csv_schema())
stream = csv.open_csv("files.csv", read_options=read_options, convert_options=convert_options)
for batch in stream:
    print(batch.to_pandas())
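
One way to approximate block_size, sketched below, is to estimate the number of bytes per row from the first few lines of the file and multiply by the desired number of rows. estimate_block_size is a made-up helper, and the result is only an approximation, since later rows can be longer and pyarrow may still split a block into several record batches:

import pyarrow.csv as csv

def estimate_block_size(path, nrows, sample_lines=10, margin=1.5):
    # Guess a block_size that yields roughly nrows rows per batch
    with open(path) as file:
        file.readline()  # header, skipped via skip_rows=1 below
        lines = [file.readline() for _ in range(sample_lines)]
    max_bytes_per_line = max(len(line) for line in lines if line)
    return int(max_bytes_per_line * nrows * margin)

block_size = estimate_block_size("files.csv", nrows=10)
read_options = csv.ReadOptions(column_names=["file", "int"], skip_rows=1, block_size=block_size)
convert_options = csv.ConvertOptions(column_types=db["files"]._pyarrow_csv_schema())
stream = csv.open_csv("files.csv", read_options=read_options, convert_options=convert_options)
for batch in stream:
    print(batch.to_pandas())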

Fallback solution using pandas (as far as I understand, it uses the Python read engine under the hood, which should be much slower than pyarrow):

import pandas as pd

for batch in pd.read_csv("files.csv", chunksize=nrows):
    print(batch)

Argument name

The most straightforward implementation seems to be to add a single argument to audformat.Table.get() that specifies the number of rows we want to read. This could be named nrows, n_rows, chunksize, batch_size, or similar:

for batch in db["files"].get(batch_size=10):
    print(batch)

We might want to add a second argument to specify an offset for the first row we start reading from. This way, we would be able to read a particular part of the table, e.g.

next(db["files"].get(batch_size=5, offset=5)

We might also want to consider integration with audb already. In audb, we might want the option to stream the data directly from the backend instead of from the cache. This means we load only the requested part of the table file from the backend, and we also load the corresponding media files.

/cc @ChristianGeng

@hagenw hagenw added the enhancement label Jun 24, 2024
@ChristianGeng
Member

ChristianGeng commented Jun 24, 2024

You mention that pd.read_csv used with the `chunksize` argument will result in using the `python` engine instead of the pyarrow one that we would prefer.

Afaics it is correct: according to the blog post linked here, pandas with the pyarrow engine uses pyarrow.csv.read_csv, which is the "non-streaming version" of CSV parsing in the pyarrow lib. About the C engine I do not know whether this would be possible. In any case, using the pyarrow open_csv that you use would imo be favourable, as no additional deserialization after parsing the CSV would be required (as suggested by the diagrams in the blog post above), and it uses pyarrow's streaming natively in the first place.

The ideal block_size can a priori only be determined by scanning over the whole file first, but that would incur extra computational cost. Also, I think that block_size and chunk_size should be treated independently: chunk_size is what you want to return and is output oriented, block_size is input oriented and has to balance memory size and performance. Normally, block_size would be a lot larger than the 140 that you chose above.

Alternatively, you can buffer the data and return them once they reach the desired chunk_size. Maybe something similar to this (one would probably implement it as a generator in real life):

    data_file = "files.csv"
    import pyarrow as pa
    import pyarrow.csv as csv
    block_size = 141
    block_size = 19  # returns same result as streaming from PARQUET
    read_options = csv.ReadOptions(column_names=["file", "int"], skip_rows=1, block_size=block_size)
    # read_options = csv.ReadOptions(column_names=["file", "int"], skip_rows=1)
    convert_options = csv.ConvertOptions(column_types=db["files"]._pyarrow_csv_schema())

    chunk_size = 10

    stream = csv.open_csv(data_file, read_options=read_options, convert_options=convert_options)
    df_spillover = None

    for next_chunk in stream:
        if next_chunk is None:
            break

        df = pa.Table.from_batches([next_chunk]).to_pandas()

        if df_spillover is not None:
            df = pd.concat([df_spillover, df], axis=0)

        list_df = [df[i:i+chunk_size] for i in range(0,df.shape[0],chunk_size)]

        for df_chunk in list_df:
            if chunk_size == df_chunk.shape[0]:
                print(df_chunk.shape)
                df_spillover = None
            else:
                df_spillover = df_chunk


    df_chunk = df_spillover
    print(df_chunk.shape)
    print()

@hagenw
Member Author

hagenw commented Jun 24, 2024

Combining streaming with audb

A few thoughts on how we might want to combine streaming with audb.

The implementation discussed above for audformat assumes that the table files are stored locally (e.g. in the cache of audb). But when doing something like audb.load(..., streaming=True), we might be interested in not loading any files into the cache beforehand.
In that case, we need to somehow track in the database if we are in streaming mode, e.g. by storing db.meta["audb"]["streaming"] = True. Every time a user then calls get(batch_size=...) on a table, it will have to execute the following steps:

  1. Load the row from the dependency table corresponding to the table ID, in order to know the location of the table on the backend.
    A possible workflow would be:
import pyarrow as pa
import pyarrow.parquet as parquet

stream = parquet.ParquetFile("db.parquet")
for batch in stream.iter_batches(batch_size=batch_size):
    selection = batch.filter(pa.compute.equal(batch["file"], file))
    if len(selection):
        # use the selected row to get infos to download the table
        break
  2. Stream the rows of the table, e.g. as outlined at Add possibility to preview tables audbcards#59 (comment)
  3. Extract a list of audio files from the table batch and download only those media files to the cache (delete them afterwards?); see the sketch after this list.
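
A rough sketch of step 3, assuming the table batches are already being streamed as in step 2; table_stream, cache_root, and download_media are hypothetical placeholders for the streamed table, the audb cache path, and the backend download call:

import os

def media_files_in_batch(batch):
    # Return the unique media files referenced by one table batch
    return batch.to_pandas()["file"].unique().tolist()

for batch in table_stream.iter_batches(batch_size=batch_size):
    for file in media_files_in_batch(batch):
        if not os.path.exists(os.path.join(cache_root, file)):
            download_media(file)  # hypothetical helper wrapping the backend call
    # ... hand the batch to the user, then possibly delete the media files again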

@ChristianGeng
Member

> Combining streaming with audb
>
> A few thoughts on how we might want to combine streaming with audb.
>
> The implementation discussed above for audformat assumes that the table files are stored locally (e.g. in the cache of audb). But when doing something like audb.load(..., streaming=True), we might be interested in not loading any files into the cache beforehand. In that case, we need to somehow track in the database if we are in streaming mode, e.g. by storing db.meta["audb"]["streaming"] = True.

This would be a straightforward approach and is based on what we already have, with Artifactory set up as the "server side". What you do in the linked issue is to mount Artifactory as a kind of filesystem. One would then have to locally cache the bandwidth-sensitive media data on the local machine in order to further consume them by means of a download. This would be a kind of pseudo-streaming mode, and if it meets the requirements that are lurking around the corner, it will be a good way to go. In case it does not meet the speed/bandwidth requirements, one could also think more radically, but that would require a stronger departure from what we already have, as one would probably need the capability to stream from the server side and more flexible behavior there. For example, Apache Arrow comes with "Arrow Flight", see https://blog.djnavarro.net/posts/2022-10-18_arrow-flight/. This is probably out of scope for now, and I have no clue about future requirements. But as you say, for now this would be a good approach.

@hagenw
Member Author

hagenw commented Jun 25, 2024

> Stream CSV tables
>
> Alternatively, you can buffer the data and return them once they reach the desired chunk_size. Maybe something similar to this (one would probably implement it as a generator in real life).

Thanks for the CSV suggestion; this totally makes sense. For the actual implementation, we then only need to decide if we want to use a fixed block_size or a variable one, influenced by batch_size.

I also made my own version of your code:

import pandas as pd
import pyarrow.csv as csv

batch_size = 10  # number of rows to return from CSV
block_size = 200  # could be set variable, e.g. 1000 * batch_size

read_options = csv.ReadOptions(column_names=["file", "int"], skip_rows=1, block_size=block_size)
convert_options = csv.ConvertOptions(column_types=db["files"]._pyarrow_csv_schema())
stream = csv.open_csv("files.csv", read_options=read_options, convert_options=convert_options)

df_buffer = pd.DataFrame([])
for block in stream:
    # Collect rows until at least one complete batch is available
    df_buffer = pd.concat([df_buffer, block.to_pandas()])
    while len(df_buffer) >= batch_size:
        df = df_buffer.iloc[:batch_size, :]
        df_buffer = df_buffer.iloc[batch_size:, :]
        print(df)  # return value here
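
If this were turned into a generator (as suggested above), the loop body would yield instead of print. A quick sketch; stream_csv is a made-up helper name, the column names and schema are taken from the example above, and yielding the leftover rows at the end is an addition:

def stream_csv(path, batch_size, block_size=200):
    # Yield dataframes with batch_size rows from a CSV table
    read_options = csv.ReadOptions(column_names=["file", "int"], skip_rows=1, block_size=block_size)
    convert_options = csv.ConvertOptions(column_types=db["files"]._pyarrow_csv_schema())
    stream = csv.open_csv(path, read_options=read_options, convert_options=convert_options)
    df_buffer = pd.DataFrame([])
    for block in stream:
        df_buffer = pd.concat([df_buffer, block.to_pandas()])
        while len(df_buffer) >= batch_size:
            yield df_buffer.iloc[:batch_size, :]
            df_buffer = df_buffer.iloc[batch_size:, :]
    if len(df_buffer) > 0:
        yield df_buffer  # rows that did not fill a complete batch

for df in stream_csv("files.csv", batch_size=10):
    print(df)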

@ChristianGeng
Member

> I also made my own version of your code:

Great. This is more condensed and less clunky, as it does not require, for example, a separate print statement (or yield, if it were implemented as a generator) after the loop has finished.
