ML-280: Support time range in readers #200

Merged
merged 21 commits on Apr 22, 2021
180 changes: 179 additions & 1 deletion integration/test_filesystems_integration.py
@@ -1,11 +1,15 @@
import asyncio
import sys
import random as rand

from .integration_test_utils import setup_teardown_test, _generate_table_name, V3ioHeaders, V3ioError
from storey import build_flow, ReadCSV, WriteToCSV, Source, Reduce, Map, FlatMap, AsyncSource, WriteToParquet
from storey import build_flow, ReadCSV, WriteToCSV, Source, Reduce, Map, FlatMap, AsyncSource, WriteToParquet, ReadParquet, DataframeSource
import pandas as pd
import aiohttp
import pytest
import v3io
import uuid
import datetime


@pytest.fixture()
@@ -246,3 +250,177 @@ def test_write_to_parquet_to_v3io_with_indices(setup_teardown_test):

read_back_df = pd.read_parquet(out_file, columns=columns)
assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"


def append_and_return(lst, x):
lst.append(x)
return lst

Review comment: Why not move it to some test_utils? (both here & in test_flow)


def test_filter_before_after_non_partitioned(setup_teardown_test):
columns = ['my_string', 'my_time']

df = pd.DataFrame([['good', pd.Timestamp('2018-05-07 13:52:37')],
['hello', pd.Timestamp('2019-01-26 14:52:37')],
['world', pd.Timestamp('2020-05-11 13:52:37')]],
columns=columns)
df.set_index('my_string')

out_file = f'v3io:///{setup_teardown_test}/'
controller = build_flow([
DataframeSource(df),
WriteToParquet(out_file, columns=columns, partition_cols=[]),
]).run()
controller.await_termination()

before = pd.Timestamp('2019-12-01 00:00:00')
after = pd.Timestamp('2019-01-01 23:59:59.999999')

controller = build_flow([
ReadParquet(out_file, end_filter=before, start_filter=after, filter_column='my_time'),
Reduce([], append_and_return)
]).run()
read_back_result = controller.await_termination()
expected = [{'my_string': 'hello', 'my_time': pd.Timestamp('2019-01-26 14:52:37')}]

assert read_back_result == expected, f"{read_back_result}\n!=\n{expected}"


def test_filter_before_after_partitioned_random(setup_teardown_test):
low_limit = pd.Timestamp('2018-01-01')
high_limit = pd.Timestamp('2020-12-31 23:59:59.999999')

delta = high_limit - low_limit

seed_value = rand.randrange(sys.maxsize)
print('Seed value:', seed_value)

rand.seed(seed_value)

random_second = rand.randrange(int(delta.total_seconds()))
middle_limit = low_limit + datetime.timedelta(seconds=random_second)

print("middle_limit is " + str(middle_limit))

number_below_middle_limit = rand.randrange(0, 10)

def create_rand_data(num_elements, low_limit, high_limit, char):
import datetime
delta = high_limit - low_limit

data = []
for i in range(0, num_elements):
element = {}
element['string'] = char + str(i)
random_second = rand.randrange(int(delta.total_seconds()))
element['datetime'] = low_limit + datetime.timedelta(seconds=random_second)
data.append(element)
return data

list1 = create_rand_data(number_below_middle_limit, low_limit, middle_limit, 'lower')
list2 = create_rand_data(10-number_below_middle_limit, middle_limit, high_limit, 'higher')
combined_list = list1 + list2

df = pd.DataFrame(combined_list)
print("data frame is " + str(df))

all_partition_columns = ['$year', '$month', '$day', '$hour', '$minute', '$second']
num_part_columns = rand.randrange(1, 6)
partition_columns = all_partition_columns[:num_part_columns]
print("partitioned by " + str(partition_columns))

out_file = f'v3io:///{setup_teardown_test}/'
controller = build_flow([
DataframeSource(df, time_field='datetime'),
WriteToParquet(out_file, columns=['string', 'datetime'], partition_cols=partition_columns),
]).run()

controller.await_termination()

controller = build_flow([
ReadParquet(out_file, end_filter=high_limit, start_filter=middle_limit, filter_column='datetime'),
Reduce([], append_and_return)
]).run()
read_back_result = controller.await_termination()
print("expecting " + str(10 - number_below_middle_limit) + " to be above middle limit")
assert(len(read_back_result)) == 10 - number_below_middle_limit

Review comment: Same here, why not verify the data?


controller = build_flow([
ReadParquet(out_file, end_filter=middle_limit, start_filter=low_limit, filter_column='datetime'),
Reduce([], append_and_return)
]).run()
read_back_result = controller.await_termination()
print("expecting " + str(number_below_middle_limit) + " to be below middle limit")
assert (len(read_back_result)) == number_below_middle_limit

Review comment: and here



def test_filter_before_after_partitioned_inner_other_partition(setup_teardown_test):
columns = ['my_string', 'my_time', 'my_city']

df = pd.DataFrame([['hello', pd.Timestamp('2020-12-31 14:05:00'), 'tel aviv'],
['world', pd.Timestamp('2018-12-30 09:00:00'), 'haifa'],
['sun', pd.Timestamp('2019-12-29 09:00:00'), 'tel aviv'],
['is', pd.Timestamp('2019-06-30 15:00:45'), 'hod hasharon'],
['shining', pd.Timestamp('2020-02-28 13:00:56'), 'hod hasharon']],
columns=columns)
df.set_index('my_string')

out_file = f'v3io:///{setup_teardown_test}/'
controller = build_flow([
DataframeSource(df, time_field='my_time'),
WriteToParquet(out_file, columns=columns, partition_cols=['$year', '$month', '$day', '$hour', 'my_city']),
]).run()
controller.await_termination()

before = pd.Timestamp('2020-12-31 14:00:00')
after = pd.Timestamp('2019-07-01 00:00:00')

controller = build_flow([
ReadParquet(out_file, end_filter=before, start_filter=after, filter_column='my_time'),
Reduce([], append_and_return)
]).run()
read_back_result = controller.await_termination()

expected = [{'my_string': 'sun', 'my_time': pd.Timestamp('2019-12-29 09:00:00'), 'my_city': 'tel aviv',
'year': 2019, 'month': 12, 'day': 29, 'hour': 9},
{'my_string': 'shining', 'my_time': pd.Timestamp('2020-02-28 13:00:56'), 'my_city': 'hod hasharon',
'year': 2020, 'month': 2, 'day': 28, 'hour': 13}]

assert read_back_result == expected, f"{read_back_result}\n!=\n{expected}"


def test_filter_before_after_partitioned_outer_other_partition(setup_teardown_test):
columns = ['my_string', 'my_time', 'my_city']

df = pd.DataFrame([['hello', pd.Timestamp('2020-12-31 15:05:00'), 'tel aviv'],
['world', pd.Timestamp('2020-12-30 09:00:00'), 'haifa'],
['sun', pd.Timestamp('2020-12-29 09:00:00'), 'tel aviv'],
['is', pd.Timestamp('2020-12-30 15:00:45'), 'hod hasharon'],
['shining', pd.Timestamp('2020-12-31 13:00:56'), 'hod hasharon']],
columns=columns)
df.set_index('my_string')

out_file = f'v3io:///{setup_teardown_test}/'
controller = build_flow([
DataframeSource(df, time_field='my_time'),
WriteToParquet(out_file, columns=columns, partition_cols=['my_city', '$year', '$month', '$day', '$hour']),
]).run()
controller.await_termination()

before = pd.Timestamp('2020-12-31 14:00:00')
after = pd.Timestamp('2020-12-30 08:53:00')

controller = build_flow([
ReadParquet(out_file, end_filter=before, start_filter=after, filter_column='my_time'),
Reduce([], append_and_return)
]).run()
read_back_result = controller.await_termination()
expected = [{'my_string': 'world', 'my_time': pd.Timestamp('2020-12-30 09:00:00'), 'my_city': 'haifa', 'year': 2020,
'month': 12, 'day': 30, 'hour': 9},
{'my_string': 'is', 'my_time': pd.Timestamp('2020-12-30 15:00:45'), 'my_city': 'hod hasharon', 'year': 2020,
'month': 12, 'day': 30, 'hour': 15},
{'my_string': 'shining', 'my_time': pd.Timestamp('2020-12-31 13:00:56'), 'my_city': 'hod hasharon', 'year': 2020,
'month': 12, 'day': 31, 'hour': 13}]

assert read_back_result == expected, f"{read_back_result}\n!=\n{expected}"
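
A note on the '$year'/'$month'/'$day'/'$hour' pseudo partition columns used in the tests above: the expected results include 'year', 'month', 'day' and 'hour' as plain columns, because partition values come back with each event when the table is read. A minimal, hedged sketch of inspecting such a table (the path is hypothetical, and the key=value directory layout is an assumption based on how pandas/pyarrow write partitioned datasets):

import glob

# Hypothetical layout produced by WriteToParquet with
# partition_cols=['$year', '$month', '$day', '$hour', 'my_city'], e.g.:
#   my-table/year=2019/month=12/day=29/hour=9/my_city=tel aviv/part-0.parquet
# These directory names are what the reader can prune by time range, and they
# reappear as columns ('year', 'month', ...) in the events read back.
for path in sorted(glob.glob('my-table/**/*.parquet', recursive=True)):
    print(path)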

40 changes: 36 additions & 4 deletions storey/sources.py
@@ -7,12 +7,13 @@
import warnings
from datetime import datetime
from typing import List, Optional, Union, Callable, Coroutine, Iterable
import pyarrow.parquet as pq

import pandas

from .dtypes import _termination_obj, Event
from .flow import Flow, Complete
from .utils import url_to_file_system
from .utils import url_to_file_system, find_filters


class AwaitableResult:
@@ -671,11 +672,42 @@ class ReadParquet(DataframeSource):
"""Reads Parquet files as input source for a flow.
:parameter paths: paths to Parquet files
:parameter columns: list, default=None. If not None, only these columns will be read from the file.
:parameter start_filter: datetime, default=None. If not None, the results are filtered by partitions and by 'filter_column' >= start_filter.
:parameter end_filter: datetime, default=None. If not None, the results are filtered by partitions and by 'filter_column' < end_filter.
:parameter filter_column: str, optional. The column by which start_filter and end_filter filter the results. Required if either filter is passed.
"""
def __init__(self, paths: Union[str, Iterable[str]], columns=None, start_filter: Optional[datetime] = None,
end_filter: Optional[datetime] = None, filter_column: Optional[str] = None, **kwargs):
if end_filter or start_filter:
start_filter = datetime.min if start_filter is None else start_filter
end_filter = datetime.max if end_filter is None else end_filter
if filter_column is None:
raise TypeError('Filter column is required when passing start/end filters')

def __init__(self, paths: Union[str, Iterable[str]], columns=None, **kwargs):
if isinstance(paths, str):
paths = [paths]
dfs = map(lambda path: pandas.read_parquet(path, columns=columns,
storage_options=kwargs.get('storage_options')), paths)

Review comment: Just noticed that it's the init. You should create a map and not actually read the parquets in this stage.

storage_options = kwargs.get('storage_options')

def read_filtered_parquet(path, start_filter, end_filter, storage_options, columns):
fs, file_path = url_to_file_system(path, storage_options)
dataset = pq.ParquetDataset(path, filesystem=fs)
if dataset.partitions:
partitions = dataset.partitions.partition_names
time_attributes = ['year', 'month', 'day', 'hour', 'minute', 'second']
partitions_time_attributes = [j for j in time_attributes if j in partitions]
else:
partitions_time_attributes = []
filters = []
find_filters(partitions_time_attributes, start_filter, end_filter, filters, filter_column)
return pandas.read_parquet(path, columns=columns, filters=filters,
storage_options=storage_options)

if start_filter or end_filter:
dfs = map(lambda path: read_filtered_parquet(path, start_filter, end_filter, storage_options, columns), paths)

else:
dfs = map(lambda path: pandas.read_parquet(path, columns=columns,
storage_options=storage_options), paths)
super().__init__(dfs, **kwargs)
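
A minimal usage sketch of the new time-range parameters (the table path and column name are hypothetical; the pattern mirrors the integration tests above):

from datetime import datetime
from storey import build_flow, ReadParquet, Reduce

# Read only events whose 'my_time' falls inside the requested range; on
# partitioned tables the reader also prunes partition directories via filters.
controller = build_flow([
    ReadParquet('path/to/my-table/',
                start_filter=datetime(2019, 1, 1),
                end_filter=datetime(2019, 12, 1),
                filter_column='my_time'),
    Reduce([], lambda acc, event: acc + [event]),
]).run()
events = controller.await_termination()

Passing start_filter or end_filter without filter_column raises a TypeError, per the validation in __init__ above.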
76 changes: 76 additions & 0 deletions storey/utils.py
@@ -178,3 +178,79 @@ def get_hashed_key(key_list):
return key_list[0]
else:
return key_list


def _create_filter_tuple(dtime, attr, sign, list_tuples):
if attr:
value = getattr(dtime, attr, None)
tuple1 = (attr, sign, value)
list_tuples.append(tuple1)


def _find_filter_helper(list_partitions, dtime, sign, first_sign, first_uncommon, filters, filter_column=None):
single_filter = []
if len(list_partitions) == 0 or first_uncommon is None:
return
last_partition = list_partitions[-1]
if len(list_partitions) == 1 or last_partition == first_uncommon:
return
list_partitions_without_last_element = list_partitions[:-1]
for partition in list_partitions_without_last_element:
_create_filter_tuple(dtime, partition, "=", single_filter)
if first_sign:
# only for the first iteration we need to have ">="/"<=" instead of ">"/"<"
_create_filter_tuple(dtime, last_partition, first_sign, single_filter)
tuple_last_range = (filter_column, sign, dtime)

single_filter.append(tuple_last_range)
else:
_create_filter_tuple(dtime, last_partition, sign, single_filter)
_find_filter_helper(list_partitions_without_last_element, dtime, sign, None, first_uncommon, filters)
filters.append(single_filter)

Review comment: It might be me, but all the naming confuses me (last_range vs first_sign)

Review comment: The same for single filter (maybe you can extract a find_single_filter function from here)


def _get_filters_for_filter_column(start, end, filter_column, side_range):
lower_limit_tuple = (filter_column, ">=", start)
upper_limit_tuple = (filter_column, "<=", end)
side_range.append(lower_limit_tuple)
side_range.append(upper_limit_tuple)


def find_filters(partitions_time_attributes, start, end, filters, filter_column):
# this method builds filters to be used by
# https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
common_partitions = []
first_uncommon = None
# finding the common attributes. for example for start=1.2.2018 08:53:15, end=5.2.2018 16:24:31, partitioned by
# year, month, day, hour. common_partitions=[year, month], first_uncommon=day
for part in partitions_time_attributes:
value_start = getattr(start, part, None)
value_end = getattr(end, part, None)
if value_end == value_start:
common_partitions.append(part)
else:
first_uncommon = part
break

# for start=1.2.2018 08:53:15, end=5.2.2018 16:24:31, this method will append to filters
# [(year=2018, month=2,day>=1, filter_column>1.2.2018 08:53:15)]
_find_filter_helper(partitions_time_attributes, start, ">", ">=", first_uncommon, filters, filter_column)

middle_range_filter = []
for partition in common_partitions:
_create_filter_tuple(start, partition, "=", middle_range_filter)

if len(filters) == 0:
# creating only the middle range
_create_filter_tuple(start, first_uncommon, ">=", middle_range_filter)
_create_filter_tuple(end, first_uncommon, "<=", middle_range_filter)
_get_filters_for_filter_column(start, end, filter_column, middle_range_filter)
else:
_create_filter_tuple(start, first_uncommon, ">", middle_range_filter)
_create_filter_tuple(end, first_uncommon, "<", middle_range_filter)
# for start=1.2.2018 08:53:15, end=5.2.2018 16:24:31, this will append to filters
# [(year=2018, month=2, 1<day<5)]
filters.append(middle_range_filter)

# for start=1.2.2018 08:53:15, end=5.2.2018 16:24:31, this method will append to filters
# [(year=2018, month=2,day<=5, filter_column<5.2.2018 16:24:31)]
_find_filter_helper(partitions_time_attributes, end, "<", "<=", first_uncommon, filters, filter_column)