Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(advanced analytics): support groupby in resample #18045

Merged
merged 7 commits into from
Jan 17, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
* specific language governing permissions and limitations
* under the License.
*/
import { PostProcessingResample } from '@superset-ui/core';
import {
ensureIsArray,
isPhysicalColumn,
PostProcessingResample,
} from '@superset-ui/core';
import { PostProcessingFactory } from './types';
import { TIME_COLUMN } from './utils';

Expand All @@ -28,13 +32,21 @@ export const resampleOperator: PostProcessingFactory<
const resampleMethod = resampleZeroFill ? 'asfreq' : formData.resample_method;
const resampleRule = formData.resample_rule;
if (resampleMethod && resampleRule) {
const groupby_columns = ensureIsArray(queryObject.columns).map(column => {
if (isPhysicalColumn(column)) {
return column;
}
return column.label;
});
zhaoyongjie marked this conversation as resolved.
Show resolved Hide resolved

return {
operation: 'resample',
options: {
method: resampleMethod,
rule: resampleRule,
fill_value: resampleZeroFill ? 0 : null,
time_column: TIME_COLUMN,
groupby_columns,
},
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
import { QueryObject, SqlaFormData } from '@superset-ui/core';
import { resampleOperator } from '../../../src';
import { AdhocColumn, QueryObject, SqlaFormData } from '@superset-ui/core';
import { resampleOperator } from '@superset-ui/chart-controls';

const formData: SqlaFormData = {
metrics: [
Expand Down Expand Up @@ -75,6 +75,7 @@ test('should do resample', () => {
rule: '1D',
fill_value: null,
time_column: '__timestamp',
groupby_columns: [],
},
});
});
Expand All @@ -92,6 +93,80 @@ test('should do zerofill resample', () => {
rule: '1D',
fill_value: 0,
time_column: '__timestamp',
groupby_columns: [],
},
});
});

// Physical columns are given as plain strings in `queryObject.columns`.
// The operator is expected to forward them unchanged, and in the same order,
// as `groupby_columns` of the generated resample post-processing rule.
test('should append physical column to resample', () => {
expect(
resampleOperator(
{ ...formData, resample_method: 'zerofill', resample_rule: '1D' },
{ ...queryObject, columns: ['column1', 'column2'] },
),
).toEqual({
operation: 'resample',
options: {
// With `resample_method: 'zerofill'` the expected rule uses `asfreq`
// together with a fill value of 0.
method: 'asfreq',
rule: '1D',
fill_value: 0,
time_column: '__timestamp',
groupby_columns: ['column1', 'column2'],
},
});
});

// Mixed input: an adhoc (SQL expression) column followed by a physical column.
// The adhoc column is expected to contribute its `label`, the physical column
// its own name, and the original column order is expected to be kept.
test('should append label of adhoc column and physical column to resample', () => {
expect(
resampleOperator(
{ ...formData, resample_method: 'zerofill', resample_rule: '1D' },
{
...queryObject,
columns: [
{
hasCustomLabel: true,
label: 'concat_a_b',
expressionType: 'SQL',
sqlExpression: "'a' + 'b'",
} as AdhocColumn,
'column2',
],
},
),
).toEqual({
operation: 'resample',
options: {
method: 'asfreq',
rule: '1D',
fill_value: 0,
time_column: '__timestamp',
// 'concat_a_b' is the adhoc column's label, not its SQL expression.
groupby_columns: ['concat_a_b', 'column2'],
},
});
});

// Pins the current behaviour for an adhoc column that has no `label`:
// the operator emits `undefined` in that position of `groupby_columns`
// rather than dropping the entry or falling back to the SQL expression.
test('should append `undefined` if adhoc non-existing label', () => {
expect(
resampleOperator(
{ ...formData, resample_method: 'zerofill', resample_rule: '1D' },
{
...queryObject,
columns: [
{
// Deliberately incomplete adhoc column: only the expression, no label.
sqlExpression: "'a' + 'b'",
} as AdhocColumn,
'column2',
],
},
),
).toEqual({
operation: 'resample',
options: {
method: 'asfreq',
rule: '1D',
fill_value: 0,
time_column: '__timestamp',
groupby_columns: [undefined, 'column2'],
},
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ export interface PostProcessingResample {
rule: string;
fill_value?: number | null;
time_column: string;
// If AdhocColumn doesn't have a label, it will be undefined.
// todo: we have to give an explicit label for AdhocColumn.
groupby_columns?: Array<string | undefined>;
zhaoyongjie marked this conversation as resolved.
Show resolved Hide resolved
};
}

Expand Down
26 changes: 20 additions & 6 deletions superset/utils/pandas_postprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -958,27 +958,41 @@ def outliers(series: Series) -> Set[float]:
return aggregate(df, groupby=groupby, aggregates=aggregates)


def resample(
@validate_column_args("groupby_columns")
def resample( # pylint: disable=too-many-arguments
df: DataFrame,
rule: str,
method: str,
time_column: str,
groupby_columns: Optional[Tuple[Optional[str], ...]] = None,
fill_value: Optional[Union[float, int]] = None,
) -> DataFrame:
"""
resample a timeseries dataframe.
support upsampling in resample

:param df: DataFrame to resample.
:param rule: The offset string representing target conversion.
:param method: How to fill the NaN value after resample.
:param time_column: existing columns in DataFrame.
:param groupby_columns: columns except time_column in dataframe
:param fill_value: Value used to fill missing entries.
:return: DataFrame after resample
:raises QueryObjectValidationError: If the request is incorrect
"""
df = df.set_index(time_column)
if method == "asfreq" and fill_value is not None:
df = df.resample(rule).asfreq(fill_value=fill_value)

def _upsampling(_df: DataFrame) -> DataFrame:
_df = _df.set_index(time_column)
if method == "asfreq" and fill_value is not None:
return _df.resample(rule).asfreq(fill_value=fill_value)
return getattr(_df.resample(rule), method)()

if groupby_columns:
df = (
df.set_index(keys=list(groupby_columns))
.groupby(by=list(groupby_columns))
.apply(_upsampling)
)
df = df.reset_index().set_index(time_column).sort_index()
else:
df = getattr(df.resample(rule), method)()
df = _upsampling(df)
return df.reset_index()
67 changes: 67 additions & 0 deletions tests/integration_tests/pandas_postprocessing_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,3 +1029,70 @@ def test_resample(self):
)
self.assertListEqual(post_df["label"].tolist(), ["x", "y", 0, 0, "z", 0, "q"])
self.assertListEqual(post_df["y"].tolist(), [1.0, 2.0, 0, 0, 3.0, 0, 4.0])

def test_resample_with_groupby(self):
"""
The Dataframe contains a timestamp column, a string column and a numeric column.
__timestamp city val
0 2022-01-13 Chicago 6.0
1 2022-01-13 LA 5.0
2 2022-01-13 NY 4.0
3 2022-01-11 Chicago 3.0
4 2022-01-11 LA 2.0
5 2022-01-11 NY 1.0
"""
df = DataFrame(
{
"__timestamp": to_datetime(
[
"2022-01-13",
"2022-01-13",
"2022-01-13",
"2022-01-11",
"2022-01-11",
"2022-01-11",
]
),
"city": ["Chicago", "LA", "NY", "Chicago", "LA", "NY"],
"val": [6.0, 5.0, 4.0, 3.0, 2.0, 1.0],
}
)
post_df = proc.resample(
df=df,
rule="1D",
method="asfreq",
fill_value=0,
time_column="__timestamp",
groupby_columns=tuple(["city"]),
zhaoyongjie marked this conversation as resolved.
Show resolved Hide resolved
)
assert list(post_df.columns) == [
"__timestamp",
"city",
"val",
]
assert [str(dt.date()) for dt in post_df["__timestamp"]] == (
["2022-01-11"] * 3 + ["2022-01-12"] * 3 + ["2022-01-13"] * 3
)
assert list(post_df["val"]) == [3.0, 2.0, 1.0, 0, 0, 0, 6.0, 5.0, 4.0]

# should raise error when get a non-existent column
with pytest.raises(QueryObjectValidationError):
proc.resample(
df=df,
rule="1D",
method="asfreq",
fill_value=0,
time_column="__timestamp",
groupby_columns=tuple(["city", "unkonw_column"]),
zhaoyongjie marked this conversation as resolved.
Show resolved Hide resolved
)

# should raise error when get a None value in groupby list
with pytest.raises(QueryObjectValidationError):
proc.resample(
df=df,
rule="1D",
method="asfreq",
fill_value=0,
time_column="__timestamp",
groupby_columns=tuple(["city", None]),
zhaoyongjie marked this conversation as resolved.
Show resolved Hide resolved
)