
Capturing wide to long transformations of entire dataframes using Hamilton #17

Closed
HamiltonRepoMigrationBot opened this issue Feb 26, 2023 · 13 comments
Labels: documentation (Improvements or additions to documentation), enhancement (New feature or request), migrated-from-old-repo (Migrated from old repository)

Comments


Issue by latlan1
Thursday Feb 03, 2022 at 01:17 GMT
Originally opened as stitchfix/hamilton#46


Is your feature request related to a problem? Please describe.
Is there a way to transform a dataframe from wide to long (or vice versa) using Hamilton, so that the transformation is tracked? I concluded no, since all of the input/output columns would need to be specified, which could be a lot of typing.

Describe the solution you'd like
It would be nice if I could define a function that accepts df_wide and outputs df_long with pd.melt.
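
For illustration, a minimal sketch of what I have in mind (df_wide, df_long, and the melt arguments are placeholders):

import pandas as pd

def df_long(df_wide: pd.DataFrame) -> pd.DataFrame:
    # the whole wide-to-long reshape captured as a single Hamilton node
    return pd.melt(df_wide, var_name="name", value_name="value")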

Describe alternatives you've considered
I performed the melt operation outside of Hamilton, so this operation is not directly captured in the DAG.


Comment by skrawcz
Thursday Feb 03, 2022 at 02:24 GMT


A recap of our Discord conversation:

What you can implement now

If you implement it as a function right now, you lose expressiveness, since the transform takes in a fixed set of columns. You would not be able to pick and choose what goes into it dynamically when using the Hamilton driver.
E.g.

@extract_columns("index_value", "name", "value")
def wide_to_long(col1: pd.Series, col2: pd.Series, ...) -> pd.DataFrame:
    df = pd.DataFrame({"col1": col1, "col2": col2, ...})  # assemble the fixed set of columns
    return df.melt(...)

Future Option 1

Introduce the ability to tell Hamilton how to "combine" results. We'd pass in an object of some type to tell the driver what to do.
e.g.

dr = driver.Driver(config_or_data, modules, adapter=LongFormatDF())
long_df = dr.execute(['col1', ..., 'col3'])
# more hamilton stuff
dr2 = driver.Driver({'col1': long_df.col1, ...}, modules)
df2 = dr2.execute(['colX', ..., 'colY'])

This would not enable a single pass through, but it would enable you to chain computation in a way that enables changing what columns are transformed to long format easily.
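
To make this concrete -- LongFormatDF doesn't exist yet, so purely as a sketch, the "combine" object could be a result builder along these lines (wrapped in a graph adapter, as shown further down this thread):

import pandas as pd

from hamilton import base

class LongFormatDF(base.ResultMixin):
    """Hypothetical combiner: assemble the requested outputs into a
    wide frame, then melt it to long format."""

    @staticmethod
    def build_result(**outputs: pd.Series) -> pd.DataFrame:
        wide = pd.DataFrame(outputs)  # one column per requested output
        return wide.reset_index().melt(
            id_vars="index", var_name="name", value_name="value"
        )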

Future Option 2

We enable the ability to configure the inputs to a function via some configuration, and in the driver you'd be able to specify the final result, and have the DAG know what inputs are required.
e.g.

@extract_columns("index_value", "name", "value")
@variable_args
def wide_to_long(**variable_args) -> pd.DataFrame:
    df = pd.DataFrame(variable_args)
    return df.melt(...)

dr = driver.Driver({..., "wide_to_long_args": ["col1", "col2", ...]}, modules)
df_final = dr.execute(...)

Directional thinking

We're pretty close to enabling option 1 with #36. I think that it forces a nice decoupling of concerns in a way -- it's more obvious what's happening, versus people having to understand configuration that's passed in via option 2. But I could see a future where we enable both options actually...


Comment by elijahbenizzy
Thursday Feb 03, 2022 at 19:42 GMT


Agreed that both might be nice, and that (1) is pretty clean for most use-cases. (2) could actually be implemented pretty cleanly with model (soon to be renamed dynamic_node): you could build a decorator @wide_to_long that implements model and requires a config option that gets passed in, e.g. (with names chosen just to demonstrate feasibility):

@wide_to_long(where_to_find_columns_in_config='my_pipeline_outputs') 
def outputs_of_my_pipeline_converted_to_long() -> pd.DataFrame:
    pass

And this should be pretty easy to implement. I have a fairly long queue of things right now, but I'm happy to give this a spin when I'm writing documentation on building decorators.


Comment by elijahbenizzy
Saturday Oct 29, 2022 at 17:32 GMT


OK, this is related to, but not exactly the same as, stitchfix/hamilton#121. We need to think through where we'd want this to live...

Also @latlan1, we haven't heard from you in a while! If you have a minute, it would be great to know how you're handling this and what your needs are.


Comment by Arnechos
Monday Feb 13, 2023 at 14:55 GMT


Hi guys, while the wide_to_long function from @skrawcz works for me, I'm having a bit of trouble going long-to-wide. I have n columns that each have x unique values. My goal is to encode them all as 0/1 dynamically, as in the LabelEncoder examples (with @parametrize). Is there a way to somehow connect a list of output columns with the @extract_columns decorator?


Comment by skrawcz
Monday Feb 13, 2023 at 19:39 GMT


Hi guys, while the wide_to_long function from @skrawcz works for me, I'm having a bit of trouble going long-to-wide. I have n columns that each have x unique values. My goal is to encode them all as 0/1 dynamically, as in the LabelEncoder examples (with @parametrize). Is there a way to somehow connect a list of output columns with the @extract_columns decorator?

Hi @Arnechos, thanks for the question! To clarify my understanding: your question relates to knowing the names of the one-hot-encoded columns so you can connect them with @extract_columns?

If my understanding is correct, then there are two high level approaches based on whether:

  1. You know the possible values ahead of time.
  2. You don't know the values ahead of time.

For (1), Hamilton tries to make what is happening quite explicit, so you would put the possible column values into code. Yes, this is more work, and it means that if the data changes your code needs to change explicitly, but the upside is that it's easy to debug and maintain. E.g. the example below shows one-hot-encoding a "seasons" column that has four values (1, 2, 3, 4). Seasons could be a dataframe, or the function could take in multiple input columns -- the point is that you know how to create the column names; this is required if you want to write functions that declare those columns as specific inputs.

@extract_columns(*[f"seasons_{v}" for v in ["1", "2", "3", "4"]])
def seasons_encoded(seasons: pd.Series) -> pd.DataFrame:
    """One hot encodes seasons into 4 dimensions because there are 4 values:
    1 - first season
    2 - second season
    3 - third season
    4 - fourth season
    """
    return pd.get_dummies(seasons, prefix="seasons")

def some_feature(seasons_1: pd.Series, ... ) -> pd.Series:
    """function taking one of the one hot encoded columns..."""

For (2), you wouldn't be able to use @extract_columns, so you'd have to operate at the dataframe level.

def seasons_encoded(seasons: pd.Series) -> pd.DataFrame:
    """One hot encodes seasons into 4 dimensions because there are 4 values:
    1 - first season
    2 - second season
    3 - third season
    4 - fourth season
    """
    return pd.get_dummies(seasons, prefix="seasons")

def some_feature(seasons_encoded: pd.DataFrame, ...) -> pd.Series:  # or pd.DataFrame
    """function taking the dataframe instead of the series..."""

Now regarding (1), we haven't added the feature yet, but it could be possible to inject what should be extracted at "Driver" time, i.e. pass in a list of values as configuration. If you'd be interested in that functionality, we'd need to open an issue (I don't think we have one created for that feature request) -- would that be of interest to you?

Otherwise, if you can provide more context on your use case, perhaps there's a better way to do things? E.g. what consumes the output of these columns?


Comment by Arnechos
Monday Feb 13, 2023 at 22:46 GMT


@skrawcz Thanks for the input!

Suppose a dataframe like this:

A      B      C
cat_1  cat_1  cat_1
cat_2  cat_1  cat_2
cat_3  cat_1  cat_2

Converting to wide format (using pd.get_dummies) gives this output:

A_cat_1  A_cat_2  A_cat_3  B_cat_1  C_cat_1  C_cat_2
1        0        0        1        1        0
0        1        0        1        0        1
0        0        1        1        0        1
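
For reference, a minimal pandas snippet (outside of Hamilton) that reproduces this:

import pandas as pd

df = pd.DataFrame({
    "A": ["cat_1", "cat_2", "cat_3"],
    "B": ["cat_1", "cat_1", "cat_1"],
    "C": ["cat_1", "cat_2", "cat_2"],
})
wide = pd.get_dummies(df)  # prefixes each dummy column with its source column name
print(wide.astype(int))    # 0/1 rather than booleans on newer pandas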

For (2), the first function does work -- it creates the dummy table. But I have trouble extracting all of those encoded columns through the Driver without specifying their names upfront (in this case, all the unique values from the original dataframe after encoding).


Comment by skrawcz
Tuesday Feb 14, 2023 at 04:57 GMT


@Arnechos understood. You should be able to do one of the following:

Create a function

# one function to create the one-hot-encoded columns, like you have
def seasons_encoded(seasons: pd.Series) -> pd.DataFrame:
    """One hot encodes seasons into 4 dimensions because there are 4 values:
    1 - first season
    2 - second season
    3 - third season
    4 - fourth season
    """
    return pd.get_dummies(seasons, prefix="seasons")

def my_final_df(seasons_encoded: pd.DataFrame, other_df: pd.DataFrame, other_series: pd.Series) -> pd.DataFrame:
    """Use this function to stitch together what you want if you know things ahead of time."""
    _df = seasons_encoded.join(...)
    _df["other_series"] = other_series
    return _df

And then in your driver just request my_final_df as the output.
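
Concretely, assuming dr is a driver built from the module above, that would be:

df = dr.execute(["my_final_df"])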

Create a custom result builder

This is useful if you want the flexibility of creating a dataframe at execution time without knowing the outputs ahead of time. The standard Hamilton result builder doesn't concatenate dataframes (we could make it do that though), so that's why you'd need a custom one.

from typing import Any, Dict

import pandas as pd

from hamilton.base import ResultMixin


class MyPandasDataFrameResult(ResultMixin):
    """Custom result builder that concatenates DataFrame/Series outputs."""

    @staticmethod
    def build_result(**outputs: Dict[str, Any]) -> pd.DataFrame:
        if len(outputs) == 1:
            (value,) = outputs.values()  # this works because it's length 1
            if isinstance(value, pd.DataFrame):
                return value
            else:
                return pd.DataFrame(outputs)
        _df = None
        _scalars = []
        for name, output in outputs.items():
            if isinstance(output, pd.DataFrame):
                if _df is None:
                    _df = output
                else:
                    _df = pd.concat([_df, output], axis=1)
            elif isinstance(output, pd.Series):
                if _df is None:
                    _df = pd.DataFrame({name: output})
                else:
                    _df[name] = output
            else:
                _scalars.append((name, output))
        # attach scalars last so they broadcast across the assembled index
        for _name, _scalar in _scalars:
            _df[_name] = _scalar
        return _df

You'd then just instantiate your driver like so:

from hamilton import base, driver

adapter = base.SimplePythonGraphAdapter(MyPandasDataFrameResult())
dr = driver.Driver(dag_config, modules, adapter=adapter)
df = dr.execute(outputs, ...)


Comment by elijahbenizzy
Tuesday Feb 14, 2023 at 05:02 GMT


Heh! We had both drafted a response and sent at the same time :) Stefan said basically everything I did -- agreed with the two options. Note, you can also use a DictResult if you want to do some initial ad-hoc work:

def long_df(...) -> pd.DataFrame:
    ... # load up your data/whatever you're doing

def some_other_column(...) -> pd.Series:
    ... # other columns
    
def wide_df(long_df: pd.DataFrame) -> pd.DataFrame:
    return pd.get_dummies(long_df, prefix=['A', 'B', 'C'])

Then in Driver:

import above_code
dr = driver.Driver({}, above_code, adapter=base.SimplePythonGraphAdapter(base.DictResult()))
dr.execute(['wide_df', 'some_other_column'])
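
(DictResult hands back a plain dict keyed by output name, so you're free to assemble whatever frame you want from wide_df and some_other_column afterwards.)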

One more thing -- we're happy to hop on a call if you want to talk through your use-case!


Comment by Arnechos
Tuesday Feb 14, 2023 at 08:27 GMT


@skrawcz @elijahbenizzy

Thanks a lot guys! I've just run a few tests using the custom result builder -- it works flawlessly.

The standard Hamilton result builder doesn't concatenate dataframes (we could make it do that though), so that's why you'd need a custom one.

I think at some point it could be needed, as some transformations are easier done with concatenation. For now, adding a long-to-wide example to the documentation should be enough.


Comment by skrawcz
Monday Feb 20, 2023 at 06:19 GMT


@Arnechos I merged #321 and have a release candidate that I think should mean you can remove that custom result builder -- pip install sf-hamilton==1.17.0rc0

Could you give it a spin if you get a chance please?


Comment by Arnechos
Tuesday Feb 21, 2023 at 13:10 GMT


@skrawcz On sf-hamilton==1.17 there are no issues; I just had to remove the prefix parameter from pd.get_dummies.


Comment by skrawcz
Tuesday Feb 21, 2023 at 18:11 GMT


great, thanks -- yes we now prefix it automatically :)



skrawcz commented Jul 18, 2024

We have enabled this now, I believe.
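
E.g. since #321 the standard pandas result builder can handle DataFrame outputs, so a dataframe-level melt is captured in the DAG out of the box. A rough sketch (module and node names are illustrative, not from this issue):

import pandas as pd

from hamilton import driver
import reshape_module  # hypothetical module containing df_long below

# in reshape_module.py:
def df_long(df_wide: pd.DataFrame) -> pd.DataFrame:
    """Wide-to-long as an ordinary dataframe-level node."""
    return df_wide.melt(var_name="name", value_name="value")

# driver side -- my_wide_df is your source wide dataframe:
dr = driver.Driver({}, reshape_module)
result = dr.execute(["df_long"], inputs={"df_wide": my_wide_df})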

@skrawcz skrawcz closed this as completed Jul 18, 2024
elijahbenizzy added a commit that referenced this issue Sep 9, 2024
# This is the 1st commit message:

Update graph_functions.py

Describes what to do in `graph_functions.py`
# This is the commit message #2:

Adds comments to lifecycle base
# This is the commit message #3:

Update h_ray.py with comments for ray tracking compatibility
# This is the commit message #4:

Replicate previous error

# This is the commit message #5:

Inline function; unsure if catching errors and exceptions should be handled differently

# This is the commit message #6:

BaseDoRemoteExecute has the added Callable function that sandwiches the lifecycle hooks

# This is the commit message #7:

method fails, says AssertionError about ray.remote decorator

# This is the commit message #8:

simple script for now to check telemetry, execution yields the ray.remote AssertionError

# This is the commit message #9:

passing pointer through and arguments to lifecycle wrapper into ray.remote

# This is the commit message #10:

post-execute hook for node not called

# This is the commit message #11:

finally executed only when exception occurs, hamilton tracker not executed

# This is the commit message #12:

atexit.register does not work, node keeps running in UI

# This is the commit message #13:

added stop() method, but doesn't get called

# This is the commit message #14:

Ray telemetry works for single node, problem with connected nodes

# This is the commit message #15:

Ray telemetry works for single node, problem with connected nodes

# This is the commit message #16:

Ray telemetry works for single node, problem with connected nodes

# This is the commit message #17:

Fixes ray object dereferencing

Ray does not resolve nested arguments:
https://docs.ray.io/en/latest/ray-core/objects.html#passing-object-arguments

So one option is to make them all top level:

- one way to do that is to make the other arguments not clash with any
possible user parameters -- hence the `__` prefix. This is what I did.
- another way would be in the ray adapter, wrap the incoming function,
and explicitly do a ray.get() on any ray object references in the
kwargs arguments. i.e. keep the nested structure, but when the ray
task starts wait for all inputs... not sure which is best, but this
now works correctly.

# This is the commit message #18:

ray works checkpoint, pre-commit fixed

# This is the commit message #19:

fixed graph-level telemetry proposal

# This is the commit message #20:

pinned ruff

# This is the commit message #21:

Correct output, added option to start ray cluster

# This is the commit message #22:

Unit test mimics the DoNodeExecute unit test