
Release q42021 #121

Merged
merged 42 commits on Jan 28, 2022

Commits (42 total; diff shown from 32 commits)
50e848b
new changes
rportilla-databricks Aug 10, 2021
30492b5
updated upsample
rportilla-databricks Aug 12, 2021
0980ad0
updated upsample
rportilla-databricks Aug 12, 2021
6579868
updated upsample
rportilla-databricks Aug 13, 2021
a1b2f21
committing read_yaml
rportilla-databricks Aug 13, 2021
0c13bb8
adding class1 with stacking
rportilla-databricks Aug 16, 2021
f043b87
adding class1 with stacking
rportilla-databricks Aug 16, 2021
c874d1c
removing streams
rportilla-databricks Aug 16, 2021
32f5fc3
removing streams
rportilla-databricks Aug 16, 2021
ef90f31
adding anomaly detection yaml support
rportilla-databricks Sep 8, 2021
bb2e34c
fixing merge conflict
rportilla-databricks Sep 8, 2021
c671764
making database configurable
rportilla-databricks Sep 10, 2021
7e5fd1b
making database configurable
rportilla-databricks Sep 10, 2021
cd853b7
making database configurable
rportilla-databricks Sep 10, 2021
2c2ed63
added option for empty string prefix
rportilla-databricks Sep 11, 2021
b49ef5f
added option for empty string prefix
rportilla-databricks Sep 12, 2021
d2f5a34
added option for empty string prefix
rportilla-databricks Sep 12, 2021
d7de342
removing anomaly detection in branch
rportilla-databricks Sep 12, 2021
cd16c10
remove anomaly detection code test file
rportilla-databricks Sep 12, 2021
73cb447
merging resample
rportilla-databricks Sep 12, 2021
b127082
removing dbl tempo egg files
rportilla-databricks Sep 12, 2021
bd05931
removing dbl tempo egg files
rportilla-databricks Sep 13, 2021
30377cd
removing dbl tempo egg files
rportilla-databricks Sep 13, 2021
3c5debf
removing dbl tempo egg files
rportilla-databricks Sep 13, 2021
d58f89e
removing dbl tempo egg files
rportilla-databricks Sep 14, 2021
5eb532f
merging results
rportilla-databricks Nov 27, 2021
58707a9
Fourier transform functionality release Q42021 (#111)
Spratiher9 Dec 27, 2021
3327792
Update README.md
rportilla-databricks Jan 9, 2022
ae7d023
committing latest fixes to release branch
rportilla-databricks Jan 12, 2022
4eb3e8a
fixing workflows
rportilla-databricks Jan 12, 2022
c43888f
feature: add interpolation functionality (#109)
guanjieshen Jan 20, 2022
ddb8135
commiting release file
rportilla-databricks Jan 25, 2022
fbb5519
removed unused code
rportilla-databricks Jan 27, 2022
784e1d8
make the sql opt optional
rportilla-databricks Jan 27, 2022
298886c
pushing prefix change
rportilla-databricks Jan 28, 2022
77fb538
pushing prefix change
rportilla-databricks Jan 28, 2022
d1b86a3
pushing prefix change
rportilla-databricks Jan 28, 2022
14a1789
pushing prefix change
rportilla-databricks Jan 28, 2022
34e9748
adding files
rportilla-databricks Jan 28, 2022
f5d92d9
adding files
rportilla-databricks Jan 28, 2022
e4cb60d
adding files
rportilla-databricks Jan 28, 2022
0832eeb
updating asof prefix logic for sql optimization
rportilla-databricks Jan 28, 2022
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
@@ -1,7 +1,7 @@
name: build
on:
push:
branches: [master, scala_refactor]
branches: [master]
jobs:
run:
runs-on: ${{ matrix.os }}
@@ -20,13 +20,13 @@ jobs:
- name: Set Spark env
run: |
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_SUBMIT_OPTS="--illegal-access=permit -Dio.netty.tryReflectionSetAccessible=true"
- name: Generate coverage report
working-directory: ./python
run: |
pip install -r requirements.txt
pip install coverage
coverage run -m unittest
coverage run -m unittest discover -s tests -p '*_tests.py'
coverage xml

- name: Publish test coverage
uses: codecov/codecov-action@v1
5 changes: 4 additions & 1 deletion .gitignore
@@ -1,6 +1,8 @@
# ignore IntelliJ/PyCharm IDE files
# ignore IntelliJ/PyCharm/VSCode IDE files
.idea
*.iml
.vscode


# coverage files
.coverage
@@ -10,6 +12,7 @@ coverage.xml
scala/tempo/target
scala/tempo/project/target/
scala/tempo/project/project/target/
scala/target/stream/*
.bsp

# local delta tables
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -51,4 +51,4 @@ Authorized Users (please list Github usernames):**


* Required field
** Please note that Authorized Users may not be immediately be granted authorization to submit Contributions; should more than one individual attempt to sign a CLA on behalf of a Corporation, the first such CLA will apply and later CLAs will be deemed void.
** Please note that Authorized Users may not be immediately be granted authorization to submit Contributions; should more than one individual attempt to sign a CLA on behalf of a Corporation, the first such CLA will apply and later CLAs will be deemed void.
89 changes: 89 additions & 0 deletions README.md
@@ -8,7 +8,10 @@
## Project Description
The purpose of this project is to make time series manipulation with Spark simpler. Operations covered under this package include AS OF joins, rolling statistics with user-specified window lengths, featurization of time series using lagged values, and Delta Lake optimization on time and partition fields.

[![image](https://github.com/databrickslabs/tempo/workflows/build/badge.svg)](https://github.com/databrickslabs/tempo/actions?query=workflow%3Abuild)
[![codecov](https://codecov.io/gh/databrickslabs/tempo/branch/master/graph/badge.svg)](https://codecov.io/gh/databrickslabs/tempo)
[![Downloads](https://pepy.tech/badge/dbl-tempo/month)](https://pepy.tech/project/dbl-tempo)
[![PyPI version](https://badge.fury.io/py/dbl-tempo.svg)](https://badge.fury.io/py/dbl-tempo)

## Using the Project

@@ -164,6 +167,92 @@ moving_avg.select('event_ts', 'x', 'y', 'z', 'mean_y').show(10, False)
```


#### 6 - Fourier Transform

Transforms the time series into the frequency domain based on the specified data column.

Parameters:

- `timestep`: the sampling interval used to derive the frequency scale
- `valueCol`: name of the time-domain data column to be transformed

```python
ft_df = tsdf.fourier_transform(timestep=1, valueCol="data_col")
display(ft_df)
```
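
A slightly fuller sketch of the same call, assuming an evenly sampled input DataFrame (the `partition_a`, `event_ts`, and `data_col` names and the tiny sample data below are illustrative, not part of the project):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from tempo import TSDF
from tempo.utils import display

spark = SparkSession.builder.getOrCreate()

# Illustrative input: one evenly sampled numeric signal per partition
df = spark.createDataFrame(
    [
        ("a", "2021-01-01 00:00:00", 1.0),
        ("a", "2021-01-01 00:00:01", 2.0),
        ("a", "2021-01-01 00:00:02", 3.0),
        ("a", "2021-01-01 00:00:03", 2.0),
    ],
    ["partition_a", "event_ts", "data_col"],
).withColumn("event_ts", F.to_timestamp("event_ts"))

tsdf = TSDF(df, partition_cols=["partition_a"], ts_col="event_ts")

# timestep matches the sampling interval of the signal (1 second here);
# valueCol names the time-domain column to transform
ft_df = tsdf.fourier_transform(timestep=1, valueCol="data_col")
display(ft_df)
```
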
#### 7 - Interpolation

Interpolate a series to fill in missing values using a specified function. The following interpolation methods are supported:

- Zero Fill : `zero`
- Null Fill: `null`
- Backwards Fill: `bfill`
- Forwards Fill: `ffill`
- Linear Fill: `linear`

The `interpolate` method can be used either in conjunction with `resample` or independently.

If `interpolate` is not chained after a `resample` operation, the method automatically re-samples the input dataset at the given frequency first, then performs interpolation on the re-sampled time-series dataset.

Possible values for `freq` include patterns such as `1 minute`, `4 hours`, `2 days`, or simply `sec`, `min`, `day`. The accepted aggregation functions (`func`) are `floor`, `ceil`, `min`, `max`, and `mean`.
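
For instance, a stand-alone down-sampling step with a coarser frequency and a different aggregation function might look like this (a minimal sketch reusing the `input_tsdf` constructed in the example further below):

```python
# Down-sample to 2-day intervals, keeping the per-interval maximum
two_day_max_tsdf = input_tsdf.resample(freq="2 days", func="max")

# Down-sample to 1-minute intervals, keeping the per-interval mean
per_minute_mean_tsdf = input_tsdf.resample(freq="1 minute", func="mean")
```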

`NULL` values after re-sampling are treated the same as missing values. The ability to treat `NULL` as a valid value is not currently supported.

Valid column data types for interpolation are: `["int", "bigint", "float", "double"]`.

```python
# Create an instance of the TSDF class
input_tsdf = TSDF(
    input_df,
    partition_cols=["partition_a", "partition_b"],
    ts_col="event_ts",
)


# The following chain of operations:
# 1. Aggregates all valid numeric columns using mean into 30-second intervals
# 2. Interpolates any missing intervals or null values using linear fill
# Note: when chaining interpolate after a resample, there is no need to provide
# freq or func parameters; only method is required.
interpolated_tsdf = input_tsdf.resample(freq="30 seconds", func="mean").interpolate(
    method="linear"
)

# The following interpolation call:
# 1. Aggregates columnA and columnB using mean into 30-second intervals
# 2. Interpolates any missing intervals or null values using linear fill
interpolated_tsdf = input_tsdf.interpolate(
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear"
)

# Alternatively, it is also possible to override default TSDF parameters,
# e.g. partition_cols and ts_col.
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear"
)

# The show_interpolated flag can be set to True to add a boolean column per
# target column, indicating for each row whether that value was interpolated.
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    method="linear",
    target_cols=["columnA", "columnB"],
    show_interpolated=True,
)

```

## Project Support
Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.
109 changes: 109 additions & 0 deletions python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
## Project Description
The purpose of this project is to make time series manipulation with Spark simpler. Operations covered under this package include AS OF joins, rolling statistics with user-specified window lengths, featurization of time series using lagged values, and Delta Lake optimization on time and partition fields.

[![image](https://github.com/databrickslabs/tempo/workflows/build/badge.svg)](https://github.com/databrickslabs/tempo/actions?query=workflow%3Abuild)
[![codecov](https://codecov.io/gh/databrickslabs/tempo/branch/master/graph/badge.svg)](https://codecov.io/gh/databrickslabs/tempo)
[![Downloads](https://pepy.tech/badge/dbl-tempo/month)](https://pepy.tech/project/dbl-tempo)

## Using the Project

@@ -144,7 +146,114 @@ moving_avg = watch_accel_tsdf.withRangeStats("y", rangeBackWindowSecs=600)
moving_avg.select('event_ts', 'x', 'y', 'z', 'mean_y').show(10, False)
```

#### 6 - Anomaly Detection

First, create a local YAML file listing the tables you wish to detect anomalies for.

Note: use `%sh` in a Databricks notebook, or run the following bash command in your local working directory:

```
echo """
table1:
database : "default"
name : "revenue_hourly_2021"
ts_col : "timestamp"
lookback_window : "84600"
mode : "new"
# include any grouping columns or metrics you wish to detect anomalies on
partition_cols : ["winner"]
metrics : ["advertiser_impressions", "publisher_net_revenue"]
""" > ad.yaml
```

The anomaly-detection job then reads this configuration to produce a stacked table of anomalies.
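
As a minimal sketch of the configuration side only (assuming PyYAML is installed; the detection entry point itself is not reproduced here), the file written above parses into a plain dictionary keyed by table name:

```python
import yaml  # PyYAML, assumed to be available

# Each top-level key (e.g. table1) describes one table to scan for anomalies.
with open("ad.yaml") as f:
    config = yaml.safe_load(f)

table_cfg = config["table1"]
print(table_cfg["name"])            # revenue_hourly_2021
print(table_cfg["partition_cols"])  # ['winner']
print(table_cfg["metrics"])         # ['advertiser_impressions', 'publisher_net_revenue']
```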

#### 7 - Fourier Transform

Transforms the time series into the frequency domain based on the specified data column.

Parameters:

- `timestep`: the sampling interval used to derive the frequency scale
- `valueCol`: name of the time-domain data column to be transformed

```python
ft_df = tsdf.fourier_transform(timestep=1, valueCol="data_col")
display(ft_df)
```

#### 8 - Interpolation

Interpolate a series to fill in missing values using a specified function. The following interpolation methods are supported:

- Zero Fill : `zero`
- Null Fill: `null`
- Backwards Fill: `bfill`
- Forwards Fill: `ffill`
- Linear Fill: `linear`

The `interpolate` method can be used either in conjunction with `resample` or independently.

If `interpolate` is not chained after a `resample` operation, the method automatically re-samples the input dataset at the given frequency first, then performs interpolation on the re-sampled time-series dataset.

Possible values for `freq` include patterns such as `1 minute`, `4 hours`, `2 days`, or simply `sec`, `min`, `day`. The accepted aggregation functions (`func`) are `floor`, `ceil`, `min`, `max`, and `mean`.

`NULL` values after re-sampling are treated the same as missing values. The ability to treat `NULL` as a valid value is not currently supported.

Valid column data types for interpolation are: `["int", "bigint", "float", "double"]`.

```python
# Create an instance of the TSDF class
input_tsdf = TSDF(
    input_df,
    partition_cols=["partition_a", "partition_b"],
    ts_col="event_ts",
)


# The following chain of operations:
# 1. Aggregates all valid numeric columns using mean into 30-second intervals
# 2. Interpolates any missing intervals or null values using linear fill
# Note: when chaining interpolate after a resample, there is no need to provide
# freq or func parameters; only method is required.
interpolated_tsdf = input_tsdf.resample(freq="30 seconds", func="mean").interpolate(
    method="linear"
)

# The following interpolation call:
# 1. Aggregates columnA and columnB using mean into 30-second intervals
# 2. Interpolates any missing intervals or null values using linear fill
interpolated_tsdf = input_tsdf.interpolate(
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear"
)

# Alternatively, it is also possible to override default TSDF parameters,
# e.g. partition_cols and ts_col.
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear"
)

# The show_interpolated flag can be set to True to add a boolean column per
# target column, indicating for each row whether that value was interpolated.
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    method="linear",
    target_cols=["columnA", "columnB"],
    show_interpolated=True,
)

```
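
Any of the listed fill methods can be substituted for `linear`; here is a brief sketch of two variants on the same assumed `input_tsdf`:

```python
# Forward-fill gaps after down-sampling to 30-second intervals
ffill_tsdf = input_tsdf.resample(freq="30 seconds", func="mean").interpolate(
    method="ffill"
)

# Zero-fill instead: missing intervals and null values become 0
zero_fill_tsdf = input_tsdf.resample(freq="30 seconds", func="mean").interpolate(
    method="zero"
)
```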

## Project Support
Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.
5 changes: 4 additions & 1 deletion python/requirements.txt
@@ -1,10 +1,13 @@
ipython==7.28.0
numpy==1.19.1
chispa==0.8.2
pandas==1.1.0
py4j==0.10.9
pyarrow==6.0.1
pyspark==3.0.0
pyspark-stubs==3.0.0
python-dateutil==2.8.1
pytz==2020.1
scipy==1.7.2
six==1.15.0
wheel==0.34.2
ipython==7.28.0
5 changes: 3 additions & 2 deletions python/setup.py
@@ -6,7 +6,7 @@

setuptools.setup(
name='dbl-tempo',
version='0.1.2',
version='0.1.3',
author='Ricardo Portilla, Tristan Nixon, Max Thone, Sonali Guleria',
author_email='labs@databricks.com',
description='Spark Time Series Utility Package',
@@ -16,7 +16,8 @@
packages=find_packages(where=".", include=["tempo"]),
install_requires=[
'ipython',
'pandas'
'pandas',
'scipy'
],
extras_require=dict(tests=["pytest"]),
classifiers=[
2 changes: 1 addition & 1 deletion python/tempo/__init__.py
@@ -1,2 +1,2 @@
from tempo.tsdf import TSDF
from tempo.utils import display
from tempo.utils import display