This repository has been archived by the owner on Feb 3, 2023. It is now read-only.

Syntax for suggested merging of data assets #6

Open · rabernat opened this issue Oct 7, 2019 · 26 comments

@rabernat commented Oct 7, 2019

It should always be an option for an application to open the data assets (the rows of the CSV file) one-by-one. However, as discussed in the design document, we also may want to "suggest" how the assets can be merged / aggregated.

@andersy005 originally had this in YAML:

# define the set of attributes for which common values indicate
# datasets that can be merged or concatenated. `intake-esm` applies
# `df.groupby(col_compatible_dset_attrs)` to determine the unique groups
col_compatible_dset_attrs: 
     - institution_id
     - source_id
     - experiment_id
     - table_id
     - grid_label

# define a set of new dimensions across which to 
# concatenate and construct a return dataset.
col_concat_newdim_attrs: 
     - member_id

# define a set of collection attributes across which to 
# merge returned datasets
col_merge_attrs: 
     - variable_id
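
As a rough sketch of that grouping step, here is how an application might apply df.groupby over those columns with pandas (the toy DataFrame below is invented for illustration; a real catalog would be read from the CSV):

import pandas as pd

# toy stand-in for the catalog CSV (rows are made up)
df = pd.DataFrame({
    "institution_id": ["BCC", "BCC"],
    "source_id": ["BCC-CSM2-MR", "BCC-CSM2-MR"],
    "experiment_id": ["amip", "amip"],
    "table_id": ["Amon", "Amon"],
    "grid_label": ["gn", "gn"],
    "member_id": ["r1i1p1f1", "r2i1p1f1"],
    "variable_id": ["pr", "pr"],
    "path": ["gs://cmip6/.../r1i1p1f1/...", "gs://cmip6/.../r2i1p1f1/..."],
})

col_compatible_dset_attrs = [
    "institution_id", "source_id", "experiment_id", "table_id", "grid_label",
]

# rows sharing these attribute values form one merge/concat-compatible group
for key, group in df.groupby(col_compatible_dset_attrs):
    print(key, "->", len(group), "assets")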

I'm not sure exactly how the new syntax would look. We should try to leverage the vocabulary from NcML rather than inventing something new.

This issue is also discussed in pydata/xarray#2697.

@rabernat commented Oct 9, 2019

What I would like to see is a standalone package called something like xarray-mergetool.

@matt-long commented

I think we should postpone effort on developing a separate package like xarray-mergetool. There are likely to be relatively few implications for the user-facing API, so we should focus first on functionality. I would be happy with a separate package, but note that constructing the input to control merging is not trivial, and that is in part the value-added piece here.

@matt-long commented Oct 9, 2019

@andersy005 and I propose the following aggregation_control as an extension of the catalog spec definition:

"aggregation_control": {
  "type": "object",
  "required": [
    "variable_column_name"
  ],
  "properties": {
    "variable_column_name": {
      "title": "Variable Column Name",
      "description": "Name of the attribute column in csv file that contains the variable name.",
      "type": "string"
    },
    "join_newdim_column_names": {
      "title": "joinNewDim column names",
      "description": "Column names across which join datasets along a new dimension.",
      "type": "array",
     "dset_groupby_column_names": {
        "title": "Compatible dataset column names",
        "description": "Column names for which common values indicate datasets that can be joined.",
        "type": "array"
      }
    }
  }
}

For CMIP6, this would look like this:

"aggregation_control": {
    "dset_groupby_column_names": [
            "activity_id",
            "institution_id",
            "source_id",
            "experiment_id",
            "table_id",
            "grid_label"],
    "join_newdim_column_names": ["member_id"],
    "variable_column_name": "variable_id"
}

variable_column_name is currently required to prevent xarray from doing the wrong thing with static variables.

The code that accomplishes the aggregation works as follows.

  1. Query results are grouped by dset_groupby_column_names to create compatible_groups.
  2. Loop over compatible_groups and groupby join_newdim_column_names to create join_newdim_groups.
  3. Loop over join_newdim_groups, open the assets in this group, apply expand_dims as necessary, append to dset_list.
  4. Apply combine_by_coords to dset_list and return.
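
A rough sketch of those four steps, assuming a catalog DataFrame df with a hypothetical path column (zarr assets would use open_zarr instead; the actual prototype in intake-esm differs in detail):

import xarray as xr

def aggregate(df, dset_groupby_column_names, join_newdim_column_names, path_column="path"):
    """Sketch: successive groupbys, expand_dims on the new dims, then combine."""
    dsets = {}
    # 1. group query results into compatible groups
    for compat_key, compat_group in df.groupby(dset_groupby_column_names):
        if not isinstance(compat_key, tuple):
            compat_key = (compat_key,)
        dset_list = []
        # 2. within each compatible group, group by the new-dimension columns
        for newdim_key, newdim_group in compat_group.groupby(join_newdim_column_names):
            if not isinstance(newdim_key, tuple):
                newdim_key = (newdim_key,)
            # 3. open each asset and promote the new-dimension values to length-1 dims
            for _, row in newdim_group.iterrows():
                ds = xr.open_dataset(row[path_column])
                ds = ds.expand_dims({c: [v] for c, v in zip(join_newdim_column_names, newdim_key)})
                dset_list.append(ds)
        # 4. let xarray align the pieces by their coordinates (including time)
        dsets[".".join(map(str, compat_key))] = xr.combine_by_coords(dset_list)
    return dsets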

I think we have a working prototype in hand.

It is a requirement that successive "groupby" operations applied to dset_groupby_column_names and then to join_newdim_column_names yield a collection of datasets that can be merged.

aggregation_control is a property of the collection and is immutable. If a user wants to avoid merge behavior (returning, for example, a single ensemble member), they must do so by adding specificity to their query.

We automatically concatenate multiple files in time, should multiple files exist after the successive grouping, but this happens under the hood in xarray using the appropriate coordinate information. Again, the search API provides a mechanism to avoid this, but it might be desirable for some collections to have attributes like epoch or date_range that enable appropriate searching.

Another use case we considered is the Decadal Prediction experiments. These might look similar to the CMIP6 example above, except for the following.

    "join_newdim_column_names": ["member_id", "start_year"],

@rabernat, what do you think?

@rabernat commented Oct 9, 2019

Where does the time concatenation come in?

@matt-long commented

It's handled implicitly by xarray.

@andersy005 commented

And here's the implementation that I've come up with:

def _open_dataset(self):

    path_column_name = self._col_data['assets']['column_name']
    if 'format' in self._col_data['assets']:
        data_format = self._col_data['assets']['format']
        use_format_column = False
    else:
        format_column_name = self._col_data['assets']['format_column_name']
        use_format_column = True

    compatible_attrs = self._col_data["aggregation_control"].get("dset_groupby_column_names", [])
    concat_newdim_attrs = self._col_data["aggregation_control"].get("join_newdim_column_names", [])
    variable_column_name = self._col_data["aggregation_control"]["variable_column_name"]

    if compatible_attrs:
        groups = self.df.groupby(compatible_attrs)
    else:
        groups = self.df.groupby(self.df.columns.tolist())

    dsets = {}

    for compat_key, compatible_group in groups:
        if concat_newdim_attrs:
            concat_groups = compatible_group.groupby(concat_newdim_attrs)
        else:
            concat_groups = compatible_group.groupby(compatible_group.columns.tolist())

        datasets = []
        for concat_key, concat_group in concat_groups:
            temp_ds = []
            for _, row in concat_group.iterrows():
                if use_format_column:
                    data_format = row[format_column_name]
                expand_dims = {}
                if concat_newdim_attrs:
                    if isinstance(concat_key, str):
                        concat_key = [concat_key]
                    # promote the group-key values to new, length-1 dimensions
                    expand_dims = {dim_name: [dim_value]
                                   for dim_name, dim_value in zip(concat_newdim_attrs, concat_key)}
                varname = [row[variable_column_name]]
                # note: this calls the module-level _open_dataset helper, not this method
                temp_ds.append(_open_dataset(row, path_column_name, varname, data_format,
                                             expand_dims=expand_dims,
                                             zarr_kwargs=self.zarr_kwargs, cdf_kwargs=self.cdf_kwargs))
            datasets.extend(temp_ds)
        attrs = dict_union(*[ds.attrs for ds in datasets])
        dset = xr.combine_by_coords(datasets)
        dset = _restore_non_dim_coords(dset)
        dset.attrs = attrs
        group_id = '.'.join(compat_key)
        dsets[group_id] = dset

    self._ds = dsets

@rabernat commented Oct 9, 2019

> It's handled implicitly by xarray.

Explicit is better than implicit.

> We should try to leverage the vocabulary from NcML rather than inventing something new.

I still stand by this. They have thought this problem through.

Why can't we have something like this?

"aggregations": [
  {
    "type": "union",
    "attribute_name": "variable_id"
  },
  {
    "type": "join_new",
    "attribute_name": "member_id"
  },
  {
    "type": "join_existing",
    "attribute_name": "time_range",
    "dim_name": "time"
  }
]

@matt-long commented

I think that approach could work, and I like its clarity and simplicity.

I could see @andersy005's implementation essentially working as is, but first deriving dset_groupby_column_names from the columns not specified in aggregations and proceeding along a similar path, with the exception of join_existing, for which we're relying on xarray's coordinate-aware combine. That is explicit in a sense.

I'd have to think more about an alternative implementation.

@rabernat commented Oct 10, 2019

I wasn't trying to provide a complete example, just referring to the aggregation part. I agree that we should still explicitly have something like dset_groupby_column_names. In my version, the complete example would look like this:

"suggested_aggregations": {
  "groupby_attributes": [
              "activity_id",
              "institution_id",
              "source_id",
              "experiment_id",
              "table_id",
              "grid_label"],
  "aggregations": [
    {
      "type": "union",
      "attribute_name": "variable_id"
    },
    {
      "type": "join_new",
      "attribute_name": "member_id",
      "options": {"coords": "minimal"}
    },
    {
      "type": "join_existing",
      "attribute_name": "time_range",
      "dim_name": "time"
    }
  ]
}

An implementation would do something like this for each group:

[flowchart of the per-group aggregation steps]

Doing the aggregations in a general way requires a recursive algorithm.
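
For what it's worth, here is a sketch (not from the linked notebook) of how the nested dictionary fed to such a recursive algorithm could be built from one group of catalog rows, assuming a hypothetical path column:

def build_nested_dict(group_df, agg_columns, path_column="path"):
    """Nest the rows of one group by the aggregation columns, with asset paths at the leaves."""
    nd = {}
    for _, row in group_df.iterrows():
        node = nd
        # create/descend one level per aggregation column, except the last
        for col in agg_columns[:-1]:
            node = node.setdefault(row[col], {})
        node[row[agg_columns[-1]]] = row[path_column]
    return nd

# with agg_columns = ['variable_id', 'member_id'], this yields something like
# {'pr': {'r1i1p1f1': 'gs://...', 'r2i1p1f1': 'gs://...'}}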

@rabernat commented

I hacked up a little demo of how this sort of recursive algorithm might work:

https://nbviewer.jupyter.org/gist/rabernat/eb15709b2b7cac33b64f1164dfd49992

Hope this is useful. I would eventually like to implement this in the hypothetical xarray-mergetool package. For now, feel free to use it in intake-esm if it fits.

@andersy005 commented

@rabernat

In apply_aggregation you pointed out that we don't actually care about the keys, and I was wondering: how are we going to know the values for the join_new dimension?

def apply_aggregation(v, level=0):
    """Recursively descend into nested dictionary and aggregate items.
    level tells how deep we are."""
    
    assert level <= n_agg
    
    if level == n_agg:
        # bottom of the hierarchy - should be an actual path at this point
        return open_dataset(v)
    else:
        agg_column = agg_columns[level]
        agg_function = agg_function_map[aggregation_dict[agg_column]['type']]
        # we don't actually care about the keys
        dsets = [apply_aggregation(w, level=level+1)
                 for w in v.values()]
        return agg_function(*dsets)

For instance, with the following group:

{'pr': {'r1i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/',
  'r2i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/',
  'r3i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/'}}

I end up with three datasets from the three ensemble members:

'union(join_new(OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/), OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/), OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/)))'

When join_new() is called, it gets a list of three datasets, but it is not aware of the r1i1p1f1, r2i1p1f1, r3i1p1f1 values for the new member_id. I am not sure how the join_new() operation is going to work without knowing the actual values for member_id.

In my previous implementation, I was expanding every single asset (zarr store, netCDF file) with a new dim after the open_dataset operation:

  • ds = OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/)
  • ds = ds.expand_dims({'member_id': ['r2i1p1f1']})
  • return ds
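
For reference, expand_dims behaves like this on a toy dataset (toy data, not a real CMIP6 store):

import numpy as np
import xarray as xr

ds = xr.Dataset({"pr": (("time",), np.arange(3.0))}, coords={"time": [0, 1, 2]})
ds = ds.expand_dims({"member_id": ["r2i1p1f1"]})
print(ds.pr.dims)           # ('member_id', 'time')
print(ds.member_id.values)  # ['r2i1p1f1']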

@rabernat commented Oct 10, 2019

Good observation.

So apply_aggregation does need to care about the keys.

  • change its signature to apply_aggregation(keys, values, level=0)
  • The agg functions should accept a keys keyword (but maybe do nothing with it other than assert len(dsets) == len(keys))
  • Then we call agg_function(*dsets, keys=keys)

Alternatively, we could make all functions just accept a dictionary of keys: dsets. More concise, but also maybe more confusing to read.
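
A toy, string-building sketch of that keys-aware variant, in the style of the notebook demo (the paths and the two-column aggregation setup below are invented):

def open_dataset(path):
    return f"OPEN({path})"

def join_new(*dsets, keys=None):
    # keys carries the coordinate values for the new dimension
    assert keys is None or len(dsets) == len(keys)
    return f"join_new({', '.join(dsets)}, keys={keys})"

def union(*dsets, keys=None):
    return f"union({', '.join(dsets)})"

agg_columns = ["variable_id", "member_id"]
aggregation_dict = {"variable_id": {"type": "union"}, "member_id": {"type": "join_new"}}
agg_function_map = {"union": union, "join_new": join_new}
n_agg = len(agg_columns)

def apply_aggregation(v, level=0):
    """Recurse into the nested dict, passing the dict keys alongside the datasets."""
    if level == n_agg:
        return open_dataset(v)  # bottom of the hierarchy: v is a path
    agg_column = agg_columns[level]
    agg_function = agg_function_map[aggregation_dict[agg_column]["type"]]
    keys = list(v.keys())
    dsets = [apply_aggregation(w, level=level + 1) for w in v.values()]
    return agg_function(*dsets, keys=keys)

nd = {"pr": {"r1i1p1f1": "path1", "r2i1p1f1": "path2"}}
print(apply_aggregation(nd))
# union(join_new(OPEN(path1), OPEN(path2), keys=['r1i1p1f1', 'r2i1p1f1']))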

@andersy005 commented

Awesome! Thank you for the prompt reply! I am going to try this out.

@andersy005 commented Oct 10, 2019

> To understand recursion you must first understand recursion. :)

Well, I am having a hard time getting it to work. :)

@rabernat, I was having a hard time wrapping my head around your approach, so I made a few changes:

def open_dataset(d, agg_column, key):
    return f'OPEN({d}.EXPAND_DIMS({{{agg_column} : {[key]}}})'

def join_new(*d):
    return 'join_new(' + ', '.join(d) + ')'

def join_existing(*d):
    return 'join_existing(' + ', '.join(d) + ')'

def union(*d):
    return 'union(' + ', '.join(d) + ')'

def apply_aggregation(v, agg_column=None, k=None, level=0):
    """Recursively descend into nested dictionary and aggregate items.
    level tells how deep we are."""
    
    assert level <= n_agg
    
    
    if level == n_agg:
        
        # bottom of the hierarchy - should be an actual path at this point
        return open_dataset(v, agg_column, k)
    else:
        
        agg_column = agg_columns[level]
        agg_type = aggregation_dict[agg_column]['type']
        agg_function = agg_function_map[agg_type]
        dsets = [apply_aggregation(value, agg_column=agg_column, k=key, level=level+1)
                 for key, value in v.items()]
        return agg_function(*dsets)

Everything seemed to be working when there's only one new dimension to add in join_new():

nd = {'pr': {'r1i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/',
  'r2i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/',
  'r3i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/'}}
apply_aggregation(nd)

"union(join_new(OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/.EXPAND_DIMS({member_id : ['r1i1p1f1']}), OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/.EXPAND_DIMS({member_id : ['r2i1p1f1']}), OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/.EXPAND_DIMS({member_id : ['r3i1p1f1']})))"

I tried simulating one other use case, for Decadal Prediction datasets. This case requires adding two new dimensions (start_year, member_id), with:

nd={'pr': {'r1i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/'},
  'r2i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/'},
  'r3i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/'}}}

aggregation_dict = {'variable_id': {'type': 'union'},
 'member_id': {'type': 'join_new', 'options': {'coords': 'minimal'}},
 'start_year': {'type': 'join_new', 'options': {'coords': 'minimal'}}}

apply_aggregation produces the wrong result:

apply_aggregation(nd)

"union(join_new(join_new(OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/.EXPAND_DIMS({start_year : ['1960']})), join_new(OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/.EXPAND_DIMS({start_year : ['1960']})), join_new(OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/.EXPAND_DIMS({start_year : ['1960']}))))"

Any ideas on how to do this the right way are appreciated.

Cc @matt-long

@rabernat commented Oct 10, 2019

I see a few issues in what you posted.

  1. How can you apply an aggregation over start_year when you only have a single item for each year? Aggregation only makes sense when you have multiple datasets to combine, for example:

'r2i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/1960',
             '1980': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/1980'}

Either remove the start_year aggregation or else provide more values to operate on. (This error shows that some input validation / consistency checks would be very useful here.)

  2. Why are you calling join_new on a timeseries? You should be doing join_existing, possibly specifying dim='time' as part of the join options.

  3. I would not call expand_dims in open_dataset. Expand dims is only necessary in join_new, so it should be called there. Moreover, I'm not sure expand_dims is necessary at all. Note that xarray.concat is meant to "Concatenate xarray objects along a new or existing dimension." You can use a DataArray as the concat dim, like this:

concat_dim = xr.DataArray(member_ids, dims=['member_id'], name='member_id')
return xr.concat(dsets, dim=concat_dim)
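
A runnable toy version of that concat-over-a-new-dimension idea (toy datasets standing in for the opened ensemble members):

import numpy as np
import xarray as xr

member_ids = ["r1i1p1f1", "r2i1p1f1", "r3i1p1f1"]
dsets = [
    xr.Dataset({"pr": (("time",), np.random.rand(3))}, coords={"time": [0, 1, 2]})
    for _ in member_ids
]

# the DataArray both names the new dimension and supplies its coordinate values
concat_dim = xr.DataArray(member_ids, dims=["member_id"], name="member_id")
combined = xr.concat(dsets, dim=concat_dim)
print(combined.pr.dims)           # ('member_id', 'time')
print(combined.member_id.values)  # ['r1i1p1f1' 'r2i1p1f1' 'r3i1p1f1']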

@rabernat commented

One implication of what I said above is that the catalog needs to be fully explicit about each data granule. I don't know if you are currently relying on globbing / open_mfdataset to discover time granules; that should not be done anymore.

@rabernat commented

> 2. Why are you calling join_new on a timeseries? You should be doing join_existing, possibly specifying dim='time' as part of the join options.

OK, I see how I misunderstood this. In decadal prediction, the start_date IS clearly a new dimension. However, I think my points 1 and 3 still stand.

@andersy005 commented

> Aggregation only makes sense when you have multiple datasets to combine.

The right representation for my example

nd={'pr': {'r1i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/'},
  'r2i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/'},
  'r3i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/'}}}

should have been

nd = {'pr': {'1960': {'r1i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/',
                      'r2i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/',
                      'r3i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/'}}}

@andersy005 commented

> However, I think my points 1 and 3 still stand.

I concur... Let me take another stab at it.

> I don't know if you are currently relying on globbing / open_mfdataset to discover time granules

I wasn't relying on globbing or open_mfdataset. I was opening each file independently with open_dataset() and using xr.combine_by_coords() to do the magic time concatenation.

@rabernat commented

You have changed the nesting order, but I still don't see the point. Why are you trying to do join_new over start_date when you only have one key for start_date? Your example would make more sense to me if you had multiple start dates to concatenate.

@rabernat commented

> I wasn't relying on globbing or open_mfdataset. I was opening each file independently with open_dataset() and using xr.combine_by_coords() to do the magic time concatenation.

But you are still assuming that you will "list" the directory contents to discover the files to open, correct? What I am suggesting is that, going forward, every single individual file must appear explicitly in the CSV catalog.

@andersy005 commented Oct 10, 2019

> You have changed the nesting order, but I still don't see the point. Why are you trying to do join_new over start_date when you only have one key for start_date? Your example would make more sense to me if you had multiple start dates to concatenate.

Assuming that every file/zarr store in nd ends up having the following dimensions:

pr         (time, lat, lon) float32 dask.array<shape=(432, 160, 320), chunksize=(432, 160, 320)>

nd = {'pr': {'1960': {'r1i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/',
                      'r2i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/',
                      'r3i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/'}}}

What I am hoping to achieve after all the join_new() calls is a dataset with:

pr         (start_year, member_id, time, lat, lon) float32 dask.array<shape=(1, 3, 432, 160, 320), chunksize=(1, 1, 432, 160, 320)>

where start_year=1960 and member_id=['r1i1p1f1', 'r2i1p1f1', 'r3i1p1f1'].

I might be missing something here.

@andersy005 commented

> But you are still assuming that you will "list" the directory contents to discover the files to open, correct?

Yes

@andersy005 commented

Now I am realizing that this process might be a little different for netCDF files, where the time axis is split across multiple netCDF files that appear as multiple entries in the CSV catalog.

Let me revisit your approach and stop relying on expand_dims (and use xarray.concat explicitly). I will get back to you if I have more questions.

@rabernat commented

> Now I am realizing that this process might be a little different for netCDF files, where the time axis is split across multiple netCDF files that appear as multiple entries in the CSV catalog.

In this case, you would just have to include an explicit join, like

    'time_granule': {'type': 'join_existing', 'options': {'dim': 'time'}}

I think this is really preferable to what you are currently doing in intake-esm, where time concatenation is implicit.

In the end, it will feel the same to the user (data is provided as a unified xarray dataset). But under the hood, the architecture will be much cleaner.
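
For example, the aggregation settings for such a netCDF collection could then read (time_granule here is a hypothetical column name for the per-file time chunk, following the entry above):

aggregation_dict = {
    'variable_id': {'type': 'union'},
    'member_id': {'type': 'join_new', 'options': {'coords': 'minimal'}},
    'time_granule': {'type': 'join_existing', 'options': {'dim': 'time'}},
}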

@matt-long commented Oct 10, 2019

@rabernat, first of all, this is a very impressive bit of thinking on your part.

We have it working with real data and actual calls to xarray. We have tested it on Decadal Prediction runs, adding two new dims, and on the vanilla CMIP data. It works!

The aggregation functions have different signatures (they may not be fully developed here).

import xarray as xr

def open_dataset(path, user_kwargs={'chunks': {'time': 36}}):  # hardwired chunking for example
    return xr.open_dataset(path, **user_kwargs)

def join_new(dsets, dim_name, coord_value, options={}):
    concat_dim = xr.DataArray(coord_value, dims=(dim_name,), name=dim_name)
    return xr.concat(dsets, dim=concat_dim, **options)

def join_existing(dsets, options={}):
    # options (e.g. a configurable dim name) are not applied here yet; 'time' is hardwired
    return xr.concat(dsets, dim='time')

def union(dsets, options={}):
    return xr.merge(dsets, **options)

apply_aggregation is modified as follows.

def apply_aggregation(v, agg_column=None, key=None, level=0):
    """Recursively descend into nested dictionary and aggregate items.
    level tells how deep we are."""

    assert level <= n_agg

    if level == n_agg:
        # bottom of the hierarchy - should be an actual path at this point
        return open_dataset(v)

    else:
        agg_column = agg_columns[level]

        agg_info = aggregation_dict[agg_column]
        agg_type = agg_info['type']        

        if 'options' in agg_info:
            agg_options = agg_info['options']
        else:
            agg_options = {}            

        dsets = [apply_aggregation(value, agg_column, key=key, level=level+1)
                 for key, value in v.items()]        
        keys = list(v.keys())

        if agg_type == 'join_new':
            return join_new(dsets, dim_name=agg_column, coord_value=keys, options=agg_options)

        elif agg_type == 'join_existing':  
            return join_existing(dsets, options=agg_options)

        elif agg_type == 'union':
            return union(dsets, options=agg_options)
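
A toy driver for this version might look like the following, assuming the functions above are in scope; the file names and settings here are invented, and a netCDF backend plus dask are needed because of the hardwired chunks in open_dataset:

import numpy as np
import xarray as xr

# write two toy single-member files so the recursion has something real to open
paths = {}
for member in ['r1i1p1f1', 'r2i1p1f1']:
    toy = xr.Dataset({'pr': (('time',), np.random.rand(36))}, coords={'time': np.arange(36)})
    paths[member] = f'./pr_{member}.nc'
    toy.to_netcdf(paths[member])

# module-level settings read by apply_aggregation
agg_columns = ['variable_id', 'member_id']
aggregation_dict = {
    'variable_id': {'type': 'union'},
    'member_id': {'type': 'join_new', 'options': {'coords': 'minimal'}},
}
n_agg = len(agg_columns)

nd = {'pr': paths}
combined = apply_aggregation(nd)
print(combined.pr.dims)  # ('member_id', 'time')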

@andersy005 will clean this up and implement for real; hopefully we'll have it in intake-esm tonight or tomorrow.

This approach requires that the variable "time" is universally present in the collection; i.e., you couldn't mix datasets with variables named "Time" and "time" in the same collection. We could work around this (adding a time_coord_name column, for instance), but we could probably just live with multiple collections to handle that use case.

I am curious if there are any performance implications associated with the order of aggregation operations. I don't see any a priori reason for there to be, but maybe I am naive.

cc @dcherian
