
Row-wise random sampling in LazyFrames #3933

Open
OneRaynyDay opened this issue Jul 7, 2022 · 15 comments
Labels: enhancement (New feature or an improvement of an existing feature)

Comments

@OneRaynyDay
Contributor

OneRaynyDay commented Jul 7, 2022

Row-wise random sampling

Suppose lf is a pl.LazyFrame. One popular transformation on a lazy frame is to select rows with some preset probability, parameterized by a Bernoulli distribution with parameter p. In SQL this is expressed as:

SELECT * FROM table TABLESAMPLE BERNOULLI(10)

which corresponds to a random sample where each row is selected with probability 10%. I would like a similar row-level API for LazyFrames, and I imagine it looking something like:

# Sample each row with probability 0.1
lf.sample(p=0.1)

Optional arguments may include distribution="bernoulli", but I don't see any other type of TABLESAMPLE distribution out there except for SYSTEM from Trino, which is storage-aware.

@zundertj
Collaborator

zundertj commented Jul 8, 2022

Could you clarify what this request would add on top of the existing sample method? https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.sample.html. I see two pointers, but it is unclear what is blocking you:

  1. the existing method exists on DataFrame and Expr, but not directly on LazyFrame
  2. the existing method only supports Bernoulli (but how exactly would other distributions be interpreted? Would you specify a row-specific mean, variance, or whatever parameterization the distribution takes?)
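
For reference, a hedged sketch of the existing eager API referred to above (the frac keyword was later renamed fraction in newer Polars versions, so treat exact parameter names as version-dependent):

import polars as pl

df = pl.DataFrame({"a": range(100)})

# Eager sampling on a DataFrame: either a fixed number of rows or a fixed fraction.
df.sample(n=5, seed=42)  # exactly 5 rows, reproducible via the seed
df.sample(frac=0.1)      # exactly 10 rows out of 100

# An equivalent also exists at the expression level (Expr.sample), but not on
# LazyFrame, which is point 1 above.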

@zundertj
Collaborator

zundertj commented Jul 8, 2022

It would be good to highlight that, on the distributions side, there is an open request for adding weights: #2661

@cbilot

cbilot commented Jul 8, 2022

Let me see if I can help. Since it may be easier to demonstrate with data, let's start with this dataset of 100 records.

import polars as pl

nbr_obs = 100
df = pl.DataFrame({
    'row_nr': pl.arange(0, nbr_obs, eager=True),
})
df
shape: (100, 1)
┌────────┐
│ row_nr │
│ ---    │
│ i64    │
╞════════╡
│ 0      │
├╌╌╌╌╌╌╌╌┤
│ 1      │
├╌╌╌╌╌╌╌╌┤
│ 2      │
├╌╌╌╌╌╌╌╌┤
│ 3      │
├╌╌╌╌╌╌╌╌┤
│ ...    │
├╌╌╌╌╌╌╌╌┤
│ 96     │
├╌╌╌╌╌╌╌╌┤
│ 97     │
├╌╌╌╌╌╌╌╌┤
│ 98     │
├╌╌╌╌╌╌╌╌┤
│ 99     │
└────────┘

The Sample Method

The sample method on a DataFrame has two parameters: n, which returns a specified number of rows, and frac, which returns a specified fraction of rows from the DataFrame. For example, if we want a sample that gives us exactly 70% of the rows, we can use the frac keyword.

>>> df.sample(frac=0.7).shape
(70, 1)

Note that each time we run this, we will get exactly 70 rows (70% of 100 rows). We may get different rows in the result set, but we will always get 70 total rows.
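
For completeness, a hedged example of the other parameter, n, which draws an exact row count rather than a fraction:

>>> df.sample(n=10).shape
(10, 1)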

Row-wise Sampling

What I think this issue is requesting is row-wise sampling, which is something slightly different.

In row-wise sampling, we are making a decision, not on the DataFrame as a whole, but on a row-by-row basis. Essentially, for each row, we are flipping a coin and asking "Should this row be in the result set?". There is no consideration for the total number of rows that will be in the result set - only whether this particular row should be selected.

With our dataset above, we could simulate this by first drawing a random number for each row (from a uniform distribution). I am purposely omitting the seed parameter in the default_rng method, to demonstrate something later.

import numpy as np
(
    df
    .with_column(
        pl.Series(
            name="random_nbr",
            values=np.random.default_rng().uniform(0.0, 1.0, df.height),
        )
    )
)
shape: (100, 2)
┌────────┬────────────┐
│ row_nr ┆ random_nbr │
│ ---    ┆ ---        │
│ i64    ┆ f64        │
╞════════╪════════════╡
│ 0      ┆ 0.0422     │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 1      ┆ 0.100758   │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2      ┆ 0.665123   │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 3      ┆ 0.581239   │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ ...    ┆ ...        │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 96     ┆ 0.1509     │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 97     ┆ 0.376525   │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 98     ┆ 0.845115   │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 99     ┆ 0.188381   │
└────────┴────────────┘

Next, we'll choose a p value of 0.7 (70%). In essence, we are saying "each row independently has a 70% chance of being included in the result set", with no consideration as to how many rows have already been selected or how many rows will be selected in total.

We'll simulate that by keeping only the rows with random_nbr < 0.70.

p = 0.7
(
    df
    .with_column(
        pl.Series(
            name="random_nbr",
            values=np.random.default_rng().uniform(0.0, 1.0, df.height),
        )
    )
    .filter(pl.col('random_nbr') < p)
)
shape: (75, 2)
┌────────┬────────────┐
│ row_nr ┆ random_nbr │
│ ---    ┆ ---        │
│ i64    ┆ f64        │
╞════════╪════════════╡
│ 0      ┆ 0.674584   │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 1      ┆ 0.0712     │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2      ┆ 0.372068   │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 3      ┆ 0.113028   │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ ...    ┆ ...        │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 96     ┆ 0.302775   │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 97     ┆ 0.652685   │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 98     ┆ 0.026801   │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 99     ┆ 0.016079   │
└────────┴────────────┘

Note that for this run, I got 75 rows. Each time I run this, the total number of rows selected may change. There is no guarantee of exactly 70 rows.

Row-wise sampling can be extended further, such as choosing not just whether a row is included, but how many times a row should be included in the result set. Indeed, in some cases, the p value or distribution for each row may be different. (I won't go any further.)
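
As a hedged sketch of that extension, one could draw a per-row count rather than a yes/no decision (the Poisson distribution and the take method are choices for illustration only; take was later renamed gather):

import numpy as np

rng = np.random.default_rng()

# Draw how many times each row should appear (0, 1, 2, ...) instead of a yes/no decision.
counts = rng.poisson(lam=0.7, size=df.height)

# Repeat each row index by its drawn count, then take those rows.
idx = np.repeat(np.arange(df.height), counts)
resampled = df.take(idx)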

Lazy Row-Wise Sampling

How might we accomplish the above in Lazy mode? Here's one simple workaround.

I'll first define a function that will encapsulate my choice of distribution and p value. The Series s passed in (in this case) is used solely to obtain the number of records in the LazyFrame, and for no other reason. (I'll discuss this further later.)

def sample_it(s: pl.Series) -> pl.Series:
    return pl.Series(
        values=np.random.binomial(1, 0.7, s.len()),
        dtype=pl.Boolean,
    )

We'll put our DataFrame in lazy mode and then use the map method. The pl.first expression simply selects the first column of the LazyFrame. We are using it only to obtain the length of the LazyFrame; it plays no other role in this example.

(
    df
    .lazy()
    .with_column(pl.first().map(sample_it).alias("_sample"))
    .filter(pl.col("_sample"))
    # .drop("_sample")
    .collect()
)
shape: (64, 2)
┌────────┬─────────┐
│ row_nr ┆ _sample │
│ ---    ┆ ---     │
│ i64    ┆ bool    │
╞════════╪═════════╡
│ 1      ┆ true    │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2      ┆ true    │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 3      ┆ true    │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 8      ┆ true    │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ ...    ┆ ...     │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 96     ┆ true    │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 97     ┆ true    │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 98     ┆ true    │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 99     ┆ true    │
└────────┴─────────┘

Only 64 records on this run.

Extensions

The above technique can be extended to pass not just the first column to our function, but also columns that represent weights and/or parameters, so that the probability distribution that determines inclusion differs for each row. Stratified sampling might be such an example, where the p value differs by group. (The sample_it function above would obviously need to change.)
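
A hedged sketch of that stratified idea, assuming a hypothetical group column and a dict of per-group probabilities (the column name, the probabilities, and the apply-based lookup are illustrative only):

import numpy as np
import polars as pl

# Hypothetical per-group inclusion probabilities.
p_by_group = {"a": 0.9, "b": 0.1}

def sample_stratified(df: pl.DataFrame) -> pl.DataFrame:
    draws = np.random.default_rng().uniform(0.0, 1.0, df.height)
    return (
        df
        .with_column(pl.Series("_u", draws))
        .with_column(pl.col("group").apply(p_by_group.get).alias("_p"))
        .filter(pl.col("_u") < pl.col("_p"))
        .drop(["_u", "_p"])
    )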

Was this helpful? If not, I apologize for the intrusion.

@OneRaynyDay
Contributor Author

OneRaynyDay commented Jul 8, 2022

Could you clarify what this request would add on top of the existing sample method? https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.sample.html. I see two pointers, but it is unclear what is blocking you:

  1. the existing method exists on DataFrame and Expr, but not directly on LazyFrame
  2. the existing method only supports Bernoulli (but how exactly would other distributions be interpreted? Would you specify a row-specific mean, variance, or whatever parameterization the distribution takes?)

Hi @zundertj and @cbilot, thanks for the prompt replies! I am specifically talking about 1). I was only looking at the documentation for LazyFrame, not DataFrame, so I wasn't aware there was an in-memory sampling method for this. I would like to do this for LazyFrames since my datasets are too big to fit in memory. Thank you @cbilot for the LazyFrame example (and you pointed out correctly that row-wise sampling only gives me the number of rows in expectation, as opposed to the DataFrame sample function, which gives exactly the specified number of rows). However, if I understand correctly, that approach requires me to build a boolean mask in pl.Series() in memory, which may once again hit memory problems (100+ billion rows). Is there a way to do this in a truly lazy way?

@zundertj
Collaborator

I had not realized that in Polars, sample with a fraction guarantees that exactly the pre-specified fraction of rows is returned. The Pandas docs are more precise on this point.

Unfortunately, I cannot find a way to achieve this. The closest I have got without constructing the series in advance is this (using df as defined by @cbilot):

from random import random

# need to wrap, as apply supplies the value of the column whilst random.random does not take one
def random2(x):
    return random()

# this adds an additional column with +/- 70% True values
df.lazy().with_column((pl.first().apply(random2) < 0.7).alias("_sample")).collect()

# but the same expression cannot be used by filter
df.lazy().filter(pl.first().apply(random2) < 0.7).collect()  
pyo3_runtime.PanicException: should be no nth at this point

@indigoviolet

indigoviolet commented Aug 20, 2022

I am also interested in this (row-wise sampling on lazy frames).

@zundertj I was wondering why

df.lazy().with_column((pl.first().apply(random2) < 0.7).alias("_sample")).filter(pl.col("_sample")).collect()

would not suffice; I guess I'm not understanding why with_column must instantiate the entire _sample column.

Edit: I didn't realize that .apply() must materialize the DataFrame.

@elbaro
Contributor

elbaro commented Jan 5, 2023

Please separate the API for uniform sampling and weighted sampling.
The former can avoid iterating over all rows.

In Spark, if you have 1,000,000 rows in S3 and sample 100 rows, it downloads the entire dataset.

@akdienes

I am interested in this feature.

@blais

blais commented Jun 10, 2023

Me too. I was going to implement it with pyarrow, but then thought I'd try DuckDB or Polars. DuckDB has it, but it's slow. Going coding, I guess.

@stinodego added the enhancement (New feature or an improvement of an existing feature) label and removed the feature label on Jul 14, 2023
@icfly2

icfly2 commented Aug 11, 2023

Please separate the API for uniform sampling and weighted sampling. The former can avoid iterating all rows.

In Spark, if you have 1,000,000 rows in S3 and sample 100 rows, it downloads the entire dataset.
I'm not sure, but

    import math

    height = lazy_frame.select(pl.count()).collect().row(0)[0]
    lazy_frame = lazy_frame.take_every(math.floor(height / sample_size))

still reads the whole data set; see #8664.

.fetch() doesn't help much either, but perhaps once the streaming option works, one could append a random filter that only passes a fraction of the rows and drops the rest. That could bring some performance improvement.
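
A hedged sketch of what "append a random filter" could look like, reusing the map-based mask from the earlier workaround; whether the mask is actually built batch-by-batch under the streaming engine is not confirmed anywhere in this thread, so treat it as illustrative only:

import numpy as np
import polars as pl

p = 0.1  # keep roughly 10% of rows

def random_mask(s: pl.Series) -> pl.Series:
    # one uniform draw per row of the incoming series
    return pl.Series(values=np.random.default_rng().uniform(size=s.len()) < p)

sampled = (
    lazy_frame
    .with_column(pl.first().map(random_mask).alias("_keep"))
    .filter(pl.col("_keep"))
    .drop("_keep")
    .collect(streaming=True)
)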

@dazzle-me

I would like to have it too!
For now, I need to materialize DataFrames in memory to do this.

@akdienes

if you are comfortable with using hash functions for randomness, a workaround is something like

lazy_df.with_row_index().filter(pl.col("index").hash(seed) % 10 == 1).drop("index")

to filter to 10% of rows, etc. You can change the seed on each invocation if you want to allow overlapping subsamples.

obviously this is kind of ugly
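
Spelled out a bit more, a minimal runnable sketch of the same idea (lazy_df as in the snippet above; with_row_count was the method name at the time, newer versions call it with_row_index and name the column differently):

import polars as pl

seed = 42  # change per invocation for a different (possibly overlapping) subsample

sampled = (
    lazy_df
    .with_row_count("row_nr")
    .filter(pl.col("row_nr").hash(seed) % 10 == 1)  # keeps roughly 10% of rows for a given seed
    .drop("row_nr")
    .collect()
)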

@leechungpa

I am also looking forward to a sample method being implemented on LazyFrame.

@wasade

wasade commented Aug 1, 2024

A potential way to stream this is by pushing a tuple of a random uniform value and the row index into a heap, where the heap is sorted on the random value and constrained to a maximum size. It's not exactly the same as sample(...), as this is specifying the maximum number of elements to retain rather than a probability. But, if you know the number of elements in advance, you can set the maximum to be the desired proportion of elements.

An example is here. It does still require scanning the full column, though, so that each item gets a chance to be included in the resulting set. However, in the case where the number of elements is known in advance (or easily obtainable), the set of row indices to retain could be computed prior to fetching the column, at the expense of storing those indices in advance.
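
A hedged pure-Python sketch of that heap idea (bottom-k by random key, kept as a max-heap via negated keys; k and the source of row indices are placeholders):

import heapq
import random

def sample_k_indices(row_indices, k):
    """Keep the row indices attached to the k smallest random keys (a uniform sample of at most k rows)."""
    heap = []  # max-heap of (-key, index): heap[0] holds the largest key currently kept
    for idx in row_indices:
        key = random.random()
        if len(heap) < k:
            heapq.heappush(heap, (-key, idx))
        elif key < -heap[0][0]:
            # this key beats the worst one currently kept, so replace it
            heapq.heapreplace(heap, (-key, idx))
    return [idx for _, idx in heap]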

@arturdaraujo

This would be great
