
Full support for multiindex in dataframes #1493

Open
dirkbike opened this issue Aug 20, 2016 · 55 comments · May be fixed by #8153

Comments

@dirkbike

Dask can load a dataframe from a pytables hdf5 file, and pytables already supports a hierarchy of tables. Why not simulate a multiindex (like in pandas) by loading all tables from an hdf5 file into one dask dataframe with nested column indices?

@mrocklin
Member

I encourage you to prototype this, perhaps with dask.delayed. http://dask.readthedocs.io/en/latest/delayed-collections.html

@dirkbike
Author

I was originally thinking of doing this as a dict that wraps a bunch of dask.dataframes, but as you recommended I'm trying this with dask.delayed. I am using pandas rather than pytables to read/write the hdf data, with these functions:

import pandas as pd

def custom_load(key):
    return pd.read_hdf('test.hdf', key)

def custom_save(df, key):
    df.to_hdf('test.hdf', key)

Unfortunately, when I try to use them to build a dask.dataframe I get a TypeError exception:

import dask.dataframe as dd
from dask.delayed import delayed

dfs = [delayed(custom_load)(key) for key in ['msft','aapl']]
df = dd.from_delayed(dfs)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-19-59138932b0db> in <module>()
----> 1 df = dd.from_delayed(dfs)

C:\Python34\lib\site-packages\dask\dataframe\io.py in from_delayed(dfs, metadata, divisions, columns, prefix)
    670         return Series(merge(dsk, dsk2), name, metadata, divisions)
    671     else:
--> 672         return DataFrame(merge(dsk, dsk2), name, metadata, divisions)
    673
    674

C:\Python34\lib\site-packages\dask\dataframe\core.py in __new__(cls, dask, name, columns, divisions)
   1322         result._name = name
   1323
-> 1324         result._pd, result._known_dtype = cls._build_pd(columns)
   1325         result.divisions = tuple(divisions)
   1326         return result

C:\Python34\lib\site-packages\dask\dataframe\core.py in _build_pd(cls, metadata)
    201         else:
    202             if np.isscalar(metadata) or metadata is None:
--> 203                 _pd = cls._partition_type([], name=metadata)
    204             else:
    205                 _pd = cls._partition_type(columns=metadata)

TypeError: __init__() got an unexpected keyword argument 'name'

I don't have much experience with dask.delayed so I'm not sure what the problem is. For reference, this is how I built test.hdf:

from pandas_datareader import data as web

df1 = web.get_data_yahoo('msft', '2000-01-01', '2016-01-01')
df2 = web.get_data_yahoo('aapl', '2000-01-01', '2016-01-01')

df1.to_hdf('test.hdf', 'msft', format='table', complevel=9, complib='blosc')
df2.to_hdf('test.hdf', 'aapl', format='table', complevel=9, complib='blosc')

@mrocklin
Member

Can you do me a favor and try this from git master?


@dirkbike
Author

That worked, thanks. What happened is that all of the 'aapl' data was concatenated to the end of the 'msft' data in one large dataframe. However, in this case it would be more desirable to have a top-level index that uses the key from custom_load, in a way similar to this:

df1 = pd.read_hdf('test.hdf', 'msft')
df2 = pd.read_hdf('test.hdf', 'aapl')
df = pd.concat([df1, df2], keys=['msft','aapl'], axis=1)
df['msft'].head() # first key
                Open     High       Low     Close    Volume  Adj Close
Date
2000-01-03  117.3750  118.625  112.0000  116.5625  53228400  39.840438
2000-01-04  113.5625  117.125  112.2500  112.6250  54119000  38.494621
2000-01-05  111.1250  116.375  109.3750  113.8125  64059600  38.900502
2000-01-06  112.1875  113.875  108.3750  110.0000  54976600  37.597410
2000-01-07  108.6250  112.250  107.3125  111.4375  62013600  38.088740

df['aapl'].head() # second key
                  Open        High         Low       Close     Volume  Adj Close
Date
2000-01-03  104.874997  112.499998  101.687501  111.937502  133949200   3.660058
2000-01-04  108.250001  110.625002  101.187503  102.500003  128094400   3.351477
2000-01-05  103.749998  110.562497  103.000001  103.999997  194580400   3.400523
2000-01-06  106.124999  106.999999   94.999998   94.999998  191993200   3.106247
2000-01-07   96.499999  101.000002   95.500003   99.500001  115183600   3.253385

@mrocklin
Member

Then perhaps you're right that your dict-of-dataframes idea would suit better.

@dirkbike
Author

Just curious, but why can't the dask.dataframe object support these kinds of keys internally? I think that is all that's necessary to simulate a pandas multiindex (at least across columns). Would modifying the _Frame class be the best place for this?

@mrocklin
Member

Eventually yes, it would be nice for DataFrame to support multiindices. It's non-trivial to change all functions within dask.dataframe to support this. I budget this task at somewhere between a week and a month of developer time, though I am often pessimistic in things like this.

Have you read through the design documentation of dask.dataframe? http://dask.readthedocs.io/en/latest/dataframe-partitions.html
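
As a minimal illustration of the partitioning model described in the linked documentation (the data below is made up):

import pandas as pd
import dask.dataframe as dd

# A dask DataFrame is a sequence of pandas DataFrames split along a sorted index;
# `divisions` records the boundary index values between partitions.
pdf = pd.DataFrame({'x': range(8)}, index=pd.date_range('2000-01-01', periods=8))
ddf = dd.from_pandas(pdf, npartitions=2)

print(ddf.npartitions)  # 2
print(ddf.divisions)    # boundary values, one more entry than there are partitions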

@dirkbike
Author

So if I'm understanding correctly, it seems that the best way to support a multiindex would be to map it to multiple dimensions of partitions, since the multiindex itself provides a natural place to create partitions. My example above would only add a second dimension to the partitions (partitions would span the time index and then the first-level column keys). It would be a lot easier to maintain these partitions outside of dask by managing multiple dask.dataframe objects, but you would lose the ability to slice across several of these partitions, and there would be no guarantee that the data stays aligned with the index. I agree that this is non-trivial, since it looks like it would require changing how all of the blocked algorithms are handled and might add undesired overhead.

@mrocklin
Member

Yes, that seems like a reasonable synopsis. We would choose some depth of the multi-index along which to partition. For example, we might partition along the second or third level of the multi-index. Partitions would then hold a list of tuples of values rather than a list of single values. Many of the operations can probably be changed in bulk, by changing some of the heavier functions like elemwise and reduction, but I would expect groupbys, joins, etc. to take a fair amount of finesse. I don't yet see a way to do this incrementally.
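
To make the tuple idea concrete, a small illustration (the symbols and dates are invented; this is not existing dask code):

# Today, divisions are scalar values along a single sorted index:
divisions_scalar = ('2000-01-01', '2008-01-01', '2016-01-01')

# With a two-level (symbol, date) index, boundaries would become tuples,
# compared lexicographically like ordinary Python tuples:
divisions_tuple = (('aapl', '2000-01-01'),
                   ('aapl', '2016-01-01'),
                   ('msft', '2016-01-01'))

# A row belongs to the partition whose boundary tuples bracket its index tuple.
lo, hi = divisions_tuple[0], divisions_tuple[1]
assert lo <= ('aapl', '2010-06-30') <= hi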

@dirkbike
Author

This may be a bit of a stretch, but maybe it's worth considering more abstract partitioning. I got some inspiration from this paper A Hierarchical Aggregation Framework for Efficient Multilevel Visual Exploration and Analysis that breaks data into hierarchical chunks of smaller and smaller index-slices to make data exploration faster. Partitioning the data would be an expensive operation, but can be done as data is collected. You would need at least one hierarchy for the index and possibly one for the columns (column groupings can come from an existing storage hierarchy or can be made dynamically using groupbys). The index hierarchy would define the partitions similar to dask's current structure except using multiple levels (i.e. days are grouped into months, which are grouped into years, etc.). Columns could use a pseudo-index to map to the main index (i.e. a range or years, months, or specific days) to keep the data dense (no filler NaNs) and allow calculations to quickly skip regions with no data. Index and column groupings would be exposed to the end user via indexing and slicing methods and would provide natural partition boundaries for applied computation. A column hierarchy also provides an organized structure for caching intermediate computation results.
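
A toy sketch of the hierarchical chunking idea in plain Python (purely illustrative, not an existing API):

# Each level of the index hierarchy groups the finer level below it,
# e.g. days grouped into months, months into years.
partitions = {
    2015: {1: ['2015-01-05', '2015-01-06'],
           2: ['2015-02-02']},
    2016: {1: ['2016-01-04']},
}

# Slicing by year touches one top-level bucket; slicing by month drills down
# without scanning unrelated chunks.
jan_2015 = partitions[2015][1]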

@mrocklin
Member

Sounds very cool. I encourage you to explore that further.

@dirkbike
Author

I started a prototype using basic python structures (dicts, and subclasses of lists) and realized that data columns either need to use a sequence of index labels to identify each element (because of the hierarchical index), or the columns can map to a flat representation of the hierarchical index (using a pseudo-index). I couldn't think of another way to do this, and mapping each element individually with labels would be very wasteful. The problem with using a pseudo-index is that when data is appended to the data set, the pseudo-index needs to be recalculated.

I'm starting to re-think the use of hierarchies at all. Relational databases can represent hierarchical structures by referencing keys between tables of data, and joining tables on a specific column already aligns tables to each other. Perhaps it's better to treat every chunk of data (of n-columns) as a regular 2D dataframe, and use a relational representation to tie all of the dataframes together. Each dataframe would have its own independent index, avoiding the pseudo-index problem, and only when chunks are joined would the index need to be adjusted. The end user could still reference a specific subset of data using slices and labels, but chunks of data would be dynamically joined (or split if necessary) behind the scenes.

I'm going to try and prototype something using sqlite and pandas with some more stock data and see how that might work.

@liorshk

liorshk commented May 5, 2017

Any update on this issue?

@dirkbike
Author

dirkbike commented May 5, 2017

I ended up using a regular SQL database to track chunks of data and assembling them as necessary into Pandas DataFrames.
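
A rough sketch of that approach (the table layout, file names, and keys here are hypothetical):

import sqlite3
import pandas as pd

# A small catalog table maps (symbol, date range) to the HDF key holding that chunk.
con = sqlite3.connect('catalog.db')
con.execute("""CREATE TABLE IF NOT EXISTS chunks
               (symbol TEXT, start TEXT, end TEXT, hdf_key TEXT)""")
con.execute("INSERT INTO chunks VALUES ('msft', '2000-01-01', '2016-01-01', 'msft')")
con.commit()

def load(symbol, start, end):
    # Find the chunks that overlap the requested range, then assemble them in pandas.
    rows = con.execute(
        "SELECT hdf_key FROM chunks WHERE symbol=? AND start<=? AND end>=?",
        (symbol, end, start)).fetchall()
    frames = [pd.read_hdf('test.hdf', key) for (key,) in rows]
    return pd.concat(frames).loc[start:end]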

@timothydmorton

Does dask support reading multi-level indices yet? I'm particularly interested in reading a table written to parquet with a multi-level column index, and I'm getting the following traceback when I try to do this:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<timed exec> in <module>()

/ssd/lsstsw/stack3_20171021/python/miniconda3-4.3.21/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine)
    797 
    798     return read(fs, fs_token, paths, columns=columns, filters=filters,
--> 799                 categories=categories, index=index)
    800 
    801 

/ssd/lsstsw/stack3_20171021/python/miniconda3-4.3.21/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in _read_pyarrow(fs, fs_token, paths, columns, filters, categories, index)
    557     dtypes = {storage_name_mapping.get(k, k): v for k, v in dtypes.items()}
    558 
--> 559     meta = _meta_from_dtypes(all_columns, dtypes, index_names, column_index_names)
    560 
    561     if out_type == Series:

/ssd/lsstsw/stack3_20171021/python/miniconda3-4.3.21/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in _meta_from_dtypes(to_read_columns, file_dtypes, index_cols, column_index_names)
    140         df.columns.name = column_index_names[0]
    141     else:
--> 142         df.columns.names = column_index_names
    143     return df
    144 

~/.local/lib/python3.6/site-packages/pandas/core/indexes/base.py in _set_names(self, values, level)
   1116         if len(values) != 1:
   1117             raise ValueError('Length of new names must be 1, got %d' %
-> 1118                              len(values))
   1119         self.name = values[0]
   1120 

ValueError: Length of new names must be 1, got 3

If multi-level indices are generally supported, but not in read_parquet, then perhaps this should be a new issue?

@vss888

vss888 commented Jun 1, 2018

To add my 5 cents: the absence of MultiIndex support is the show-stopper for me in terms of doing anything with Dask beyond poking around a bit. It is the most important missing feature. Please do consider implementing it in some form sooner rather than later.

@mrocklin
Member

mrocklin commented Jun 4, 2018

I definitely agree @vss888. If this is something that you'd like to contribute, that would be very welcome!

@mmann1123

This is maybe a simple, though not ideal, hack with less-than-ideal resolution: if there were a way to do element-wise concatenation on the two indexes, you could create a (sort of) unique multi-index value.

The issue I am running into is that I can't figure out how to do an element-wise concat on two dask arrays. Is there any way to do the following?

import numpy as np
from itertools import repeat  # '-' separator repeated element-wise

index = np.array([x1 + x2 + x3 for x1, x2, x3 in zip(index_id1.astype('str'),
                                                     repeat('-', len(index_id1)),
                                                     index_id2.astype('str'))])
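
One way to do the concatenation lazily on dask arrays is to push the NumPy string operation into each block with map_blocks. A sketch, with small stand-in arrays for index_id1 and index_id2:

import numpy as np
import dask.array as da

def concat_ids(a, b):
    # Element-wise string concatenation with a '-' separator, applied per block.
    return np.char.add(np.char.add(a.astype(str), '-'), b.astype(str))

index_id1 = da.from_array(np.array([1, 2, 3]), chunks=2)
index_id2 = da.from_array(np.array([10, 20, 30]), chunks=2)

index = da.map_blocks(concat_ids, index_id1, index_id2, dtype='<U64')
print(index.compute())  # ['1-10' '2-20' '3-30']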

@mUtterberg

Hi there. I'm trying to get a handle on what all might be involved in supporting development on this. It sounds like a few options were previously explored, but the method discussed by @dirkbike and @mrocklin above is the preferred path forward, although the main blocker to that is the amount of work and inability to implement such a change incrementally. @mrocklin do you have a ballpark on the number of functions in the DataFrame API that would be affected by this? I see that it's a complex issue, but I'd like to at least look into supporting this or breaking it down and finding some people to help chip away at it.

@mrocklin
Member

mrocklin commented Jun 29, 2019 via email

@jakirkham
Member

@TomAugspurger, do you have any thoughts on this one?

@TomAugspurger
Member

TomAugspurger commented Aug 8, 2019 via email

@jvivian-atreca

jvivian-atreca commented Aug 26, 2019

In the interim, does anyone have a workaround? I don't actually need the multi-index, but all of the intermediate operations I want to use output a multi-index dataframe (pivot_table, or groupby with first().unstack(), before I can call reset_index).

These two operations produce the same result for my data, but both produce a multi-index as an intermediate step:

# First, pivot_table
df.pivot_table(index=['Query', 'Target'], columns='Path', values='Percent_similar').reset_index()
# groupby with unstack
df.groupby(['Query', 'Target', 'Path'])['Percent_similar'].first().unstack().reset_index()

My only workaround is to iterate over the unique values of one of the desired indices (in this case, Query), perform pivot_table, assign a new column to the value I'm iterating over, and finally concatenate them all back together.
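
A sketch of that workaround, using the column names from the snippets above and assuming each per-Query group fits in memory:

import pandas as pd

pieces = []
for query in df['Query'].unique().compute():
    # Pivot each Query group on the pandas side so no multi-index ever reaches dask.
    subset = df[df['Query'] == query].compute()
    pivoted = subset.pivot_table(index='Target', columns='Path',
                                 values='Percent_similar').reset_index()
    pivoted['Query'] = query
    pieces.append(pivoted)

result = pd.concat(pieces, ignore_index=True)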

@jvivian-atreca

Would a suitable workaround be just concatenating the string versions of the multiple indices into one column, then making that column the index?

For example:

ddf['%s_%s_%s' %('id1', 'id2', 'id3')] = ddf['id1'].astype(str) + ddf['id2'].astype(str) + ddf['id3'].astype(str)
ddf = ddf.set_index('%s_%s_%s' %('id1', 'id2', 'id3'))

That's what I've done to get around this in the past.

@msacs09

msacs09 commented Feb 23, 2021

@TheXu How do we do this when using read_sql_table, please?

ddf['%s_%s_%s' %('id1', 'id2', 'id3')] = ddf['id1'].astype(str) + ddf['id2'].astype(str) + ddf['id3'].astype(str)
ddf = ddf.set_index('%s_%s_%s' %('id1', 'id2', 'id3'))

daskDF = ddf.read_sql_table('test_table', sqluri, index_col=['column1','column2'])

@TheXu

TheXu commented Feb 24, 2021

@TheXu How do we do this when doing read_sql_table please

ddf['%s_%s_%s' %('id1', 'id2', 'id3')] = ddf['id1'].astype(str) + ddf['id2'].astype(str) + ddf['id3'].astype(str)
ddf = ddf.set_index('%s_%s_%s' %('id1', 'id2', 'id3'))

daskDF = ddf.read_sql_table('test_table', sqluri, index_col=['column1','column2'])

Since dask doesn't support MultiIndex yet, you might have to concatenate column1 and column2 within SQL (naming it 'column1_and_column2' for example), then

daskDF = ddf.read_sql_table('test_table', sqluri, index_col=['column1_and_column2'])

@msacs09

msacs09 commented Feb 24, 2021

@TheXu Thank you. Can we construct this in dask, something like the following? If yes, I would appreciate it if you could share the syntax.

 Dindex_col = ddf['id1'].astype(str) + ddf['id2'].astype(str) + ddf['id3'].astype(str)
daskDF = ddf.read_sql_table('test_table', sqluri, index_col=Dindex_col)

Alternatively, can we do this? I get a syntax error:

sa_meta = sa.MetaData()
sa_table = sa.Table("test1", sa_meta, autoload=True, autoload_with=engine)
sa_query= sa.select("select a.* , a.id||a.period as d_idx from test1 a join test2 b on test1.id=test2.id where a.acct='Client'")
ddf = dd.read_sql_table(sa_query, sqluri, index_col="d_idx ")
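
One option, sketched here with assumed table, URI, and column names, is to read the table keyed on an existing column and then build the combined key on the dask side before set_index:

import dask.dataframe as dd

ddf = dd.read_sql_table('test_table', sqluri, index_col='id')
ddf = ddf.reset_index()  # make 'id' an ordinary column again
ddf['d_idx'] = ddf['id'].astype(str) + '_' + ddf['period'].astype(str)
ddf = ddf.set_index('d_idx')  # note: set_index on a dask DataFrame triggers a shuffle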

@martindurant changed the title from "Support for multiindex in dataframes" to "Full support for multiindex in dataframes" on Mar 12, 2021
@0x00b1

0x00b1 commented Apr 15, 2021

I ran into this problem earlier in the week when, for the first time, converting a package to use Dask instead of Pandas. It was a serious bummer since the performance benefits I was starting to see were dramatic. If I have more time in the future, I would be more than happy to do the implementation if one has already been designed.

@TomAugspurger
Member

@0x00b1 see #1493 (comment) / https://github.com/TomAugspurger/dask/tree/multiindex.

I don't recall where that branch is at, but the main choice is how to represent the data. The two choices are a list of arrays, or an ndarray of tuples. pandas does a bit of both, but tries to avoid "materializing the tuples" as long as possible. I think Dask has to take the tuple approach. Things like partitionquantiles seem pretty hard to do on a list of arrays, but are relatively easy on an ndarray of tuples (our existing code mostly works).
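
For illustration, the two representations being compared, built here with plain pandas/NumPy:

import numpy as np
import pandas as pd

mi = pd.MultiIndex.from_product([['aapl', 'msft'], [2000, 2001]])

# 1. A list of arrays, one per level (roughly pandas' internal layout).
as_arrays = [np.asarray(mi.get_level_values(i)) for i in range(mi.nlevels)]

# 2. An ndarray of tuples, which sorts and compares lexicographically,
#    convenient for computing partition boundaries.
as_tuples = np.empty(len(mi), dtype=object)
as_tuples[:] = list(mi)

print(sorted(as_tuples))  # element-wise tuple ordering: ('aapl', 2000), ('aapl', 2001), ...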

@Hoeze

Hoeze commented May 6, 2021

@TomAugspurger What about representing the MultiIndex as a sparse CSD array?
This would also allow for fast partitioning by slicing the outermost dimension of the CSD array.

Being more fancy, we could also have multi-dimensional partitioning along multiple dimensions of the array.
This would be especially useful as a secondary index.

@TomAugspurger
Member

I'm not sure; would any advantages of using a sparse CSD array apply equally well to a pandas MultiIndex?

Being more fancy, we could also have multi-dimensional partitioning along multiple dimensions of the array.
This would be especially useful as a secondary index.

I think I looked into this briefly, but struggled with the requirement that DataFrames are ordered so (AFAICT) the partitioning strategy needs to include information from every level of the MultiIndex.

@Hoeze

Hoeze commented May 7, 2021

Yes, you are right @TomAugspurger. Thinking it over, this would only make sense if the whole index were carried in memory.

Likely, it makes more sense to have a RangePartitioner and HashPartitioner together with skip indices, as done in Spark.

Then it would be easy to implement a ".from_pandas()" / ".to_pandas()" by mirroring the multiindex as columns.
This is probably a lot of work though...

@terramars

My company is interested in paying someone to implement this functionality. Would significantly simplify our infrastructure and be a great contribution to the community. Please reach out to me if you're interested or would like to suggest someone who can do it. @TomAugspurger @Hoeze @jsignell

@martindurant
Member

^ @mrocklin, I don't know if Coiled does paid feature requests or if there is any capacity. Perhaps you have a lead for this.

@0-ren

0-ren commented Aug 4, 2022

Hey @terramars, nice to virtually meet you :) Coiled would certainly like to try and help out! Especially since it would help out the community as well as make you and your teammates' lives easier.

Are you free sometime next week to talk over the specifics? Here's a link to my calendar where you can find a time that works best for you.

@terramars

terramars commented Aug 4, 2022 via email

@0-ren

0-ren commented Aug 5, 2022

Sounds great! Excited to help you and hopefully the community too! :)

@pfox89

pfox89 commented Dec 21, 2022

Hello, I'm also interested in this feature, but my use-case is perhaps simpler, and there might be a very simple way to implement it that would still be acceptable.

In my case, partitioning and chunking only need to be performed on the first level of the index.

Could a simple implementation involve Dask just keeping track of the first level of the index as a simple index, and delegating everything else to Pandas? This would restrict partitioning and chunking to the first level, but I think in a lot of cases that might be good enough.
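
A hedged sketch of what that can look like from user code today (the level and column names are invented): keep only the first level as dask's index, carry the inner level as a column, and restore the full MultiIndex on the pandas side when needed.

import pandas as pd
import dask.dataframe as dd

idx = pd.MultiIndex.from_product([[2020, 2021], ['a', 'b', 'c']], names=['year', 'key'])
pdf = pd.DataFrame({'x': range(6)}, index=idx)

flat = pdf.reset_index('key')              # 'year' remains as the partitionable index
ddf = dd.from_pandas(flat, npartitions=2)  # dask partitions on the outer level only

# ... index-aware dask operations keyed on 'year' go here ...

restored = ddf.compute().set_index('key', append=True)  # full MultiIndex back in pandas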

@a-reich

a-reich commented Oct 4, 2023

I’ll upvote this. In my work, many of our datasets have multiple “natural index” columns that we frequently join on together or filter/group by. Or sometimes there isn’t a single column that has high enough cardinality to effectively distribute a DF across many partitions. The result is that we can’t get the optimizations that dask has when operating with the index. This can also be a stumbling block in learning to transition from pandas code.
I’ll throw a thought out there and maybe one of the dask devs can say whether it’s totally off base: pandas now has pretty solid support for using Arrow-backed dtypes, including Structs. Would having the dask DF index be technically a single column, but consisting of a struct with multiple child arrays (kind of like the single MultiIndex object) help mitigate some of the technical difficulties that adding multiple-column indexes ran into?
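
To make the struct idea concrete, here is roughly what an Arrow struct-backed column looks like in pandas today (purely illustrative; dask does not currently use this for its index, and the .struct accessor needs pandas >= 2.1):

import pandas as pd
import pyarrow as pa

# One physical column whose values are (sym, year) structs with two child arrays,
# similar in spirit to a two-level MultiIndex.
struct_type = pa.struct([('sym', pa.string()), ('year', pa.int64())])
s = pd.Series(
    [{'sym': 'msft', 'year': 2000}, {'sym': 'aapl', 'year': 2000}],
    dtype=pd.ArrowDtype(struct_type),
)

print(s.struct.field('sym'))   # child array for the 'sym' level
print(s.struct.field('year'))  # child array for the 'year' level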
