feature: add interpolation functionality #109
Conversation
This pull request introduces 1 alert when merging 012c7f5 into 88c4d05 - view on LGTM.com.

This pull request introduces 1 alert when merging 862ae78 into 88c4d05 - view on LGTM.com.
@guanjieshen, this looks like a really great start to a highly requested feature that delivers a lot of value. One thing that is still needed is unit tests. Ideally, most of the functions in the code below would have an associated test. Could you add these so we can take a look at the interface? I'm happy to meet in person to sort this out next week as well.
python/tempo/interpol.py (outdated):

    :linear_udf register linear calculation UDF
    """
    self.linear_udf = udf(Interpolation.__calc_linear, FloatType())
@guanjieshen, is there any reason this needs to be a Python UDF vs. a pandas UDF? We should test this on a billion-record dataset to measure performance, and I'm happy to help with that.
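For comparison, a vectorized pandas UDF version of the same calculation could look roughly like this; this is a sketch only, and the (x, x0, x1, y0, y1) column layout is an assumption rather than the PR's actual schema:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def linear_interp(x: pd.Series, x0: pd.Series, x1: pd.Series,
                  y0: pd.Series, y1: pd.Series) -> pd.Series:
    # Operates on whole Arrow batches at once, unlike a row-at-a-time Python UDF.
    return y0 + (y1 - y0) * (x - x0) / (x1 - x0)
```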
Good catch. I'm thinking of reimplementing this either in native Spark through `selectExpr`, or converting it into a pandas UDF.
I have switched this to use native Spark functionality rather than UDFs.
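A minimal sketch of what the UDF-free version can look like with plain column arithmetic; the epoch/value helper columns are assumptions, not the PR's actual names:

```python
from pyspark.sql import DataFrame, functions as F

def linear_fill(df: DataFrame) -> DataFrame:
    # Hypothetical columns: epoch is the missing point's timestamp (as a number);
    # *_before/*_after are the nearest known points on either side of the gap.
    return df.withColumn(
        "value_interpolated",
        F.col("value_before")
        + (F.col("value_after") - F.col("value_before"))
        * (F.col("epoch") - F.col("epoch_before"))
        / (F.col("epoch_after") - F.col("epoch_before")),
    )
```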
python/tempo/interpol.py (outdated):

        return output_df

    def interpolate(
@guanjieshen, I'm wondering if there is a way to extend TSDF so that this operation can be applied in the same way that `resample` is applied. This is not required, but having a consistent way to call functions on TSDF makes things simpler - something like `tsdf.interpolate(...)` as opposed to `Interpolate.interpolate(tsdf)`.
Yup, can definitely make that change!
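One hedged sketch of that wiring, assuming the helper class stays in python/tempo/interpol.py; names and signatures are illustrative:

```python
# tempo/tsdf.py -- delegate to the helper so callers can write tsdf.interpolate(...)
from tempo.interpol import Interpolation

class TSDF:
    # ... existing methods such as resample() ...

    def interpolate(self, *args, **kwargs):
        # Thin wrapper: the heavy lifting stays in the Interpolation class,
        # mirroring how resample() delegates to its own helper module.
        return Interpolation().interpolate(self, *args, **kwargs)
```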
This pull request introduces 1 alert when merging 35cdd54 into 88c4d05 - view on LGTM.com.

This pull request introduces 2 alerts when merging 32220b0 into 88c4d05 - view on LGTM.com.
Great, that sounds good! This is still a WIP right now, but hopefully we can run some benchmarks on a larger dataset (than the one I've currently been working on) in order to get a sense of how it scales.

@rportilla-databricks, also, any concerns with creating a separate test file for interpolation? The main tests.py seems fairly large right now.
This pull request introduces 1 alert when merging 2fc47fc into 88c4d05 - view on LGTM.com.

This pull request introduces 2 alerts when merging 8638b0e into 88c4d05 - view on LGTM.com.
Separate test file sounds good to me
python/tempo/tsdf.py (outdated):

@@ -517,6 +519,32 @@ def resample(self, freq, func=None, metricCols = None, prefix=None, fill = None)
     enriched_tsdf = rs.aggregate(self, freq, func, metricCols, prefix, fill)
     return(enriched_tsdf)

+    def interpolate(self, target_cols: List[str], sample_freq: str, sample_func: str, fill: str, ts_col: str = None, partition_cols: List[str]=None):
@guanjieshen , let's discuss why a frequency or function is required here at all. Users should be able to interpolate without resampling at all - https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.interpolate.html
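For reference, pandas will interpolate over whatever index the frame already has, with no resampling step; a tiny self-contained example of that behavior:

```python
import pandas as pd

# Linear interpolation over existing rows only; no frequency argument needed.
s = pd.Series([0.0, None, None, 3.0])
print(s.interpolate(method="linear").tolist())  # [0.0, 1.0, 2.0, 3.0]
```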
Some suggestions:
- Change sample_freq -> freq (to be consistent with resample)
- Change sample_func -> func (to be consistent with resample)
- Make freq/func and target_cols optional (you can use the code block below to populate target_cols in case the user forgets, or really just wants everything interpolated):

    if target_cols is None:
        prohibited_cols = self.partitionCols + [self.ts_col]
        summarizable_types = ['int', 'bigint', 'float', 'double']
        # filter columns to find summarizable columns
        target_cols = [datatype[0] for datatype in self.df.dtypes if
                       ((datatype[1] in summarizable_types) and
                        (datatype[0].lower() not in prohibited_cols))]
Made the changes to the naming, and modified the default behaviour for target_cols based on the code snippet. For freq and func, wouldn't these need to be required? Unless we are thinking that a user needs to call resample first before interpolate?
python/tempo/tsdf.py (outdated):

+    def interpolate(self, target_cols: List[str], sample_freq: str, sample_func: str, fill: str, ts_col: str = None, partition_cols: List[str]=None):
Suggestion - could you call the fill parameter 'method' instead of 'fill'? This is more consistent with pandas and I think is a little clearer.
Fixed!
python/tempo/tsdf.py (outdated):

+    def interpolate(self, target_cols: List[str], sample_freq: str, sample_func: str, fill: str, ts_col: str = None, partition_cols: List[str]=None):
Suggestions:
- Make an option for adding the 'is_interpolated' columns. I see this being really valuable, but if there are 90 columns, this could get really cumbersome to maintain in the table.
- Make an option for forcing a resample - in this case, the user can supply an argument for freq/func. Otherwise, we assume it's done and interpolate won't do much.
Changed this to a boolean show_interpolated that can be set when calling interpolate; by default it's set to not show.
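An illustrative call with the flag enabled (input_tsdf is a placeholder TSDF):

```python
# show_interpolated defaults to False, so the extra is_*_interpolated
# columns only appear when explicitly requested.
interpolated_df = input_tsdf.interpolate(
    freq="1 minute",
    func="mean",
    method="linear",
    show_interpolated=True,
).df
```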
python/tempo/interpol.py (outdated):

    # Join output_list into single DataFrame to output
    output: DataFrame = output_list[0]
    output.cache()
A memory cache will be expensive here. Instead of using this, I would rely on Delta Lake - we can make a comment that this step is optimized on NVMe-supported cloud VMs.
Removed output.cache()
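A hedged sketch of that Delta-based alternative; the path and the spark session name are illustrative:

```python
# Persist the intermediate result to Delta Lake instead of caching in memory;
# on NVMe-backed cloud VMs the local disk cache makes the re-read cheap.
output.write.format("delta").mode("overwrite").save("/tmp/tempo/interp_scratch")
output = spark.read.format("delta").load("/tmp/tempo/interp_scratch")
```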
python/tempo/interpol.py (outdated):

    :param fill - interpolation function to fill missing values
    """
    # Generate Fill for Dataset
    filled_series: DataFrame = self.__fill_series(
If we can return a list of new columns [last(col(f)) for f in target_list] then expand in the final data frame, we can avoid the joins. I'll work on a prototype for that - this would be really helpful in avoiding the join even though logically we are still going through each column to compute the last non-null value.
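A minimal sketch of that no-join approach, assuming hypothetical partition/timestamp/target column parameters:

```python
from typing import List
from pyspark.sql import DataFrame, functions as F
from pyspark.sql.window import Window

def fill_forward(df: DataFrame, partition_cols: List[str],
                 ts_col: str, target_cols: List[str]) -> DataFrame:
    # One window pass fills every target column with its last non-null value,
    # instead of computing each column separately and joining the results back.
    w = (
        Window.partitionBy(*partition_cols)
        .orderBy(ts_col)
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )
    return df.select(
        "*",
        *[F.last(F.col(c), ignorenulls=True).over(w).alias(f"{c}_filled")
          for c in target_cols],
    )
```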
This pull request introduces 2 alerts when merging 4a10e1d into 4eb3e8a - view on LGTM.com.
@@ -20,13 +20,13 @@ jobs:
      - name: Set Spark env
        run: |
          export SPARK_LOCAL_IP=127.0.0.1
          export SPARK_SUBMIT_OPTS="--illegal-access=permit -Dio.netty.tryReflectionSetAccessible=true"
Added to mitigate issue identified in SPARK-29923
    .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
    .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
Added to mitigate issue identified in SPARK-29923
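For context, a minimal local SparkSession carrying the same flags might look like this (the app name is illustrative):

```python
from pyspark.sql import SparkSession

# Mitigates the JDK 9+ Netty reflection issue tracked in SPARK-29923.
spark = (
    SparkSession.builder.appName("tempo-tests")
    .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
    .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
    .getOrCreate()
)
```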
python/tempo/interpol.py
Outdated
from pyspark.sql.window import Window | ||
|
||
# Interpolation fill options | ||
method_options = ["zero", "null", "back", "forward", "linear"] |
I'd like to suggest we use 'bfill', 'ffill' for the back/forward options.
Changed in this commit!
@@ -585,6 +586,41 @@ def resample(self, freq, func=None, metricCols = None, prefix=None, fill = None)
     enriched_tsdf = rs.aggregate(self, freq, func, metricCols, prefix, fill)
     return(enriched_tsdf)

+    def interpolate(self, freq: str, func: str, method: str, target_cols: List[str] = None, ts_col: str = None, partition_cols: List[str] = None, show_interpolated: bool = False):
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make the freq/func optional in the case that the user supplies a resampled data frame? So, I'd like to be able to do this:
    actual_df: DataFrame = simple_input_tsdf.resample(freq='1 minute', func='min').interpolate(
        method="linear"
    ).df
I can get rid of func, but freq is required so that the interpolation function knows what interval to fill the gaps with. Alternatively, we can store freq and func as private variables in the TSDF class if a resample has been done on the dataset?
Oh, this makes sense. When this is done using pandas, it's typically another subclass of data frame (a resampled data frame). We could add a subclass of TSDF in a similar way (ResampledTSDF) - this would have the resampled frequency as a data attribute. Then the interpolate method always knows it can safely skip. What do you think?
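A rough sketch of that ResampledTSDF idea; the constructor signature is an assumption:

```python
from tempo import TSDF  # assumed import path

class ResampledTSDF(TSDF):
    """Returned by resample(); remembers how the data was resampled."""

    def __init__(self, df, ts_col, partition_cols, freq, func):
        super().__init__(df, ts_col=ts_col, partition_cols=partition_cols)
        self._freq = freq   # data attributes carried over from resample()
        self._func = func

    def interpolate(self, method: str, **kwargs):
        # The frequency is already known, so interpolate() can safely skip
        # the resample step; callers only need to choose a fill method.
        return super().interpolate(
            freq=self._freq, func=self._func, method=method, **kwargs
        )
```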
Good idea, made the change in this commit: guanjieshen@f871c2f

So now something like this will work:

    interpolated_df: DataFrame = (
        input_tsdf.resample(freq="30 seconds", func="mean").interpolate(method="linear").df
    )

@rportilla-databricks Thoughts on deprecating the fill parameter within the resample method? Since, with the latest change, this:

    input_tsdf.resample(freq="30 seconds", func="mean", fill=True)

should now be equivalent to:

    input_tsdf.resample(freq="30 seconds", func="mean").interpolate(method="null")

I think leaving it in may create unexpected results, especially if users try something like this:

    input_tsdf.resample(freq="30 seconds", func="mean", fill=True).interpolate(method="null")
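One hedged way to phase the parameter out without breaking existing callers (a sketch, not the shipped code):

```python
import warnings

class TSDF:
    # ... sketch: only the deprecation shim is shown ...
    def resample(self, freq, func=None, metricCols=None, prefix=None, fill=None):
        if fill is not None:
            warnings.warn(
                "resample(fill=...) is deprecated; use "
                ".interpolate(method='null') instead",
                DeprecationWarning,
            )
        # ... existing aggregation logic unchanged ...
```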
Just added one unit test. This looks great!
* new changes
* updated upsample
* updated upsample
* updated upsample
* committing read_yaml
* adding class1 with stacking
* adding class1 with stacking
* removing streams
* removing streams
* adding anomaly detection yaml support
* making database configurable
* making database configurable
* making database configurable
* added option for empty string prefix
* added option for empty string prefix
* added option for empty string prefix
* removing anomaly detection in branch
* remove anomaly detection code test file
* merging resample
* removing dbl tempo egg files
* removing dbl tempo egg files
* removing dbl tempo egg files
* removing dbl tempo egg files
* removing dbl tempo egg files
* Fourier transform functionality release Q42021 (#111)
* fourier transformation functionality in tempo
* fourier transform method docstrings added
* fourier transform unit test added
* updating readme with the fourier transform usage and the fourier function's variable naming pattern standard
* Updating requirements
* minor logic correction of naming the data column as 'val'
* adding the corrected buildTestDF and also adding pyarrow in requirements.txt
* Fourier unit test fixed and contributions information updated
* data column in tests and logic is corrected with the name changed to tdval
* original contribution restoration
* bringing the pandas_udf method inside the calling method to ensure the reference is not lost in the executors
* committing the correct timestep variable position
* adding self to timestep
* inherit timestep directly from parameter
* tidying up the codebase
* removed the set_timestep method as it is no longer required
* removing the unnecessary orderby
* adding order by inside the pandas function
* removed the redundant imports
* Update README.md
* fixing workflows
* feature: add interpolation functionality (#109)
* feat: add interpolation
* feat(interpolation): add support for multiple partitions, and target columns
* test: add interpolation zero fill test
* test: add additional interpolation tests
* chore: convert linear interpolation to use spark native functions
* chore: allow for interpolation to be called directly from the TSDF object
* chore: update series fill logic
* chore: change default behaviour for target_cols
* chore: rename to be more consistent with pandas and the tsdf class
* chore(interpolation): make show if interpolated column optional
* chore(interpolation): remove caching
* Troubleshooting (#2)
* Refactor interpolation code to remove joins, and double `resample`
* Added additional test coverage to interpolation code
* Updated `test` folder structure
* chore: add additional comments
* chore: update branches in test.yml
* fix: update interpolate_column params
* chore: add interpolation details in readme.md
* chore: update main readme.md
* chore: update main readme.md
* Merge branch 'master' of github.com:guanjieshen/tempo
* chore: make readme more consistent
* chore: add build and downloads badge to readme
* changes
* fix: fourier test java error
* fix: try to configure netty changes so tests for fourier will work
* change
* housekeeping: organize imports on tsdf.py
* chore(interpolation): change back to bfill, change forward to ffill
* interpolation: add the ability to call interpolate after resample
* housekeeping: add missing type hint
* chore(interpolate): update readme
* chore: update interpolation documentation to be more clear
* adding one unit test
* committing release file
* removed unused code
* make the sql opt optional
* pushing prefix change
* pushing prefix change
* pushing prefix change
* pushing prefix change
* adding files
* adding files
* adding files
* updating asof prefix logic for sql optimization

Co-authored-by: Guanjie Shen <guanjie.shen@databricks.com>
Co-authored-by: Ricardo Portilla <ricardo.portilla@databricks.com>
Co-authored-by: Souvik Pratiher <70095944+Spratiher9@users.noreply.github.com>
Co-authored-by: Guanjie Shen <75445106+guanjieshen@users.noreply.github.com>
Add the ability to perform interpolation on TSDF objects. This will support the following types of fill for interpolation: zero, null, bfill (backward fill), ffill (forward fill), and linear.

Core config options for this interpolation method will be in line with the TSDF class, such as the ability to support multiple partitions, and also the option to interpolate multiple columns at once (using the same fill type).

This implementation leverages the resample method already present in the TSDF class in order to normalize the dataset such that it can be interpolated; it can therefore be thought of as a wrapper around the resample feature that adds additional fill capabilities.

Also, if the show_interpolated flag is set to True, the output will provide a new column is_{column_name}_interpolated for each interpolated column that specifies, at the row level, whether the data for that column was interpolated. There will also be a more general is_ts_interpolated column that specifies whether the time series row itself has been interpolated, regardless of whether data within the columns are null.

This implementation assumes that nulls are treated the same as missing values, which means that users do not have to filter out nulls before interpolation.
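Putting it together, an illustrative end-to-end call based on the interface described above; the input data and column names are assumptions:

```python
from pyspark.sql import SparkSession, functions as F
from tempo import TSDF

spark = SparkSession.builder.getOrCreate()

# Two known points two minutes apart; interpolate() fills the 30-second grid between them.
df = spark.createDataFrame(
    [("A", "2021-01-01 00:00:00", 0.0), ("A", "2021-01-01 00:02:00", 4.0)],
    ["partition_a", "event_ts", "value"],
).withColumn("event_ts", F.to_timestamp("event_ts"))

tsdf = TSDF(df, ts_col="event_ts", partition_cols=["partition_a"])

interpolated_df = tsdf.interpolate(
    freq="30 seconds",
    func="mean",
    method="linear",
    target_cols=["value"],
    show_interpolated=True,
).df
```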
Items that still need to be addressed in a later PR: