
ML-280: Support time range in readers #200

Merged
merged 21 commits into mlrun:development on Apr 22, 2021

Conversation

katyakats
Contributor

No description provided.

katyakats and others added 9 commits April 18, 2021 18:14
Co-authored-by: Gal Topper <galt@iguazio.com>
* refactor

* working integ

* lint

* fix typo

* test fixes

* lint

* temp

* integ

* parametrize tests

* rename

* handle exceptions inside flush worker

* mid

* fix reuse

* flushing

* use changed items list instead of time lookup

* check if running loop exists

* add test

* fix test

* add flush interval enum

* update test and doc

* code review

* make flush interval an optional[int] and init_flush_task only from async code

* update doc

Co-authored-by: Dina Nimrodi <dinan@iguazio.com>
* iterate over a copy of changed keys

* don't add key to persist job if it's already pending

* fix several bugs

Co-authored-by: Dina Nimrodi <dinan@iguazio.com>
…of aggregations. (mlrun#199)

* Don't use now in tests.

* ML-389: Fix expected webapi error in case of concurrent modification of aggregations.

Co-authored-by: Gal Topper <galt@iguazio.com>
@gtopper gtopper changed the title Ml 280 ML-280 Apr 19, 2021
@gtopper gtopper changed the title ML-280 ML-280: Support time range in readers Apr 19, 2021
"""

def __init__(self, paths: Union[str, Iterable[str]], columns=None, **kwargs):
def __init__(self, paths: Union[str, Iterable[str]], columns=None, before=None, after=None, filter_column=None, **kwargs):
Collaborator
Missing type annotations?

Comment on lines 674 to 676
:parameter before: Optional. datetime. If not None, the results will be filtered by 'filter_column' <= before
:parameter after: Optional. datetime. If not None, the results will be filtered by 'filter_column' > after
:parameter filter_column: Optional. If not None, the results will be filtered by this column according to before and/or after
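The semantics described by these parameters can be sketched as a small pandas filter. This is a hypothetical helper for illustration only (the PR's real filtering lives inside storey), assuming `after` is an exclusive lower bound and `before` an inclusive upper bound:

```python
from datetime import datetime

import pandas as pd

# Hypothetical helper sketching the time-range semantics above:
# keep rows where after < filter_column <= before.
def filter_time_range(df, filter_column, before=None, after=None):
    if after is not None:
        df = df[df[filter_column] > after]
    if before is not None:
        df = df[df[filter_column] <= before]
    return df

df = pd.DataFrame({"ts": pd.to_datetime(["2021-04-01", "2021-04-10", "2021-04-20"])})
out = filter_time_range(df, "ts", before=datetime(2021, 4, 15), after=datetime(2021, 4, 5))
```

With these illustrative bounds, only the middle row survives the filter.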
Collaborator
Can remove type information from here. Better state the default at the end rather than "Optional" at the beginning.

]).run()
read_back_result = controller.await_termination()

assert len(read_back_result) == 1


Why not verify the specific data?
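The reviewer's suggestion can be sketched as comparing the rows read back against an expected value rather than only their count. The data below is illustrative, not from the actual test:

```python
import pandas as pd

# Sketch of the reviewer's suggestion: assert on the actual rows read back,
# not only on the result length. The values here are illustrative.
read_back_result = [{"ts": pd.Timestamp("2021-04-18 18:14"), "value": 1}]
expected = [{"ts": pd.Timestamp("2021-04-18 18:14"), "value": 1}]
assert read_back_result == expected, f"{read_back_result}\n!=\n{expected}"
```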

integration/test_filesystems_integration.py: 4 outdated review threads, resolved
]).run()
read_back_result = controller.await_termination()
print(f"expecting {10 - number_below_middle_limit} to be above middle limit")
assert len(read_back_result) == 10 - number_below_middle_limit


Same here, why not verify the data?

]).run()
read_back_result = controller.await_termination()
print(f"expecting {number_below_middle_limit} to be below middle limit")
assert len(read_back_result) == number_below_middle_limit


and here

@@ -246,3 +250,108 @@ def test_write_to_parquet_to_v3io_with_indices(setup_teardown_test):

read_back_df = pd.read_parquet(out_file, columns=columns)
assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"


def append_and_return(lst, x):
    lst.append(x)
    return lst


Why not move it to some test_utils? (both here & in test_flow)

storey/sources.py: 2 outdated review threads, resolved
storey/utils.py Outdated
return new_date


def get_filtered_path(dir_path, before, after, storage_options, dummy_date_first, dummy_date_last, filtered_paths):


Why not datetime.min & datetime.max as default values?
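The reviewer's suggestion can be sketched as follows: treat a missing bound as `datetime.min`/`datetime.max` so it matches everything, instead of passing sentinel "dummy" dates around. The function name here is illustrative:

```python
from datetime import datetime

# Sketch of the reviewer's suggestion: open-ended defaults instead of
# sentinel "dummy" dates. A missing bound simply matches all timestamps.
def normalize_bounds(before=None, after=None):
    after = after if after is not None else datetime.min
    before = before if before is not None else datetime.max
    return after, before

lo, hi = normalize_bounds()
```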

storey/utils.py: 7 review threads, resolved (6 outdated)
storey/sources.py: 2 review threads, resolved (1 outdated)
if isinstance(paths, str):
    paths = [paths]
dfs = map(lambda path: pandas.read_parquet(path, columns=columns,
                                           storage_options=kwargs.get('storage_options')), paths)


Just noticed that it's the init.
You should create a map and not actually read the parquets in this stage.
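The deferral the reviewer asks for can be sketched with a generator: build the mapping of paths to reads in `__init__`, but open no file until iteration actually starts. Paths and names here are illustrative:

```python
import pandas as pd

# Sketch of deferred I/O: keep a lazy generator of per-path reads and only
# materialize DataFrames once iteration begins.
def lazy_frames(paths, columns=None, storage_options=None):
    for path in paths:
        yield pd.read_parquet(path, columns=columns, storage_options=storage_options)

frames = lazy_frames(["a.parquet", "b.parquet"])  # no file is opened yet
```

Because generators are evaluated lazily, constructing `frames` performs no I/O; the first read happens on the first `next(frames)`.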

storey/utils.py Outdated

def _find_filter_helper(list_partitions, dtime, sign, first_sign, first_uncommon, filters, filter_column=None):
    single_filter = []
    if len(list_partitions) == 0:


if len(list_partitions) <= 1 or first_uncommon is None

_create_filter_tuple(dtime, partition, "=", single_filter)
if first_sign:
_create_filter_tuple(dtime, last_partition, first_sign, single_filter)
tuple_last_range = (filter_column, sign, dtime)


It might be me, but all the naming confuses me (last_range vs first_sign)


The same for single filter (maybe you can extract a find_single_filter function from here)
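For context on what these helpers produce: partition filters of this kind are usually expressed as an OR of AND-lists of `(column, op, value)` tuples. A hypothetical sketch of a strict lower time bound over Hive-style partitions (`year=`/`month=`/`day=`), with all names illustrative:

```python
from datetime import datetime

# Illustrative sketch of pyarrow-style partition filter tuples for
# "rows after `after`", as an OR (outer list) of ANDs (inner lists).
def filters_for_after(after):
    return [
        [("year", ">", after.year)],
        [("year", "=", after.year), ("month", ">", after.month)],
        [("year", "=", after.year), ("month", "=", after.month), ("day", ">=", after.day)],
    ]

flt = filters_for_after(datetime(2021, 4, 18))
```

Such a list can be passed to `pandas.read_parquet(..., filters=...)` so partitions outside the range are pruned before any data is read.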


else:
    dfs = map(lambda path: pandas.read_parquet(path, columns=columns,
                                               storage_options=kwargs.get('storage_options')), paths)


You can use storage_options instead of kwargs.get
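The reviewer's point can be sketched by making `storage_options` an explicit keyword parameter rather than fishing it out of `**kwargs`. The signature below is illustrative, not the PR's actual code:

```python
import pandas as pd

# Sketch: explicit storage_options keyword instead of kwargs.get(...).
def read_frames(paths, columns=None, storage_options=None):
    if isinstance(paths, str):
        paths = [paths]
    return map(
        lambda path: pd.read_parquet(path, columns=columns, storage_options=storage_options),
        paths,
    )

frames = read_frames(["a.parquet"])  # lazy: nothing is read until iterated
```

An explicit parameter also documents the option in the signature and lets type checkers and IDEs see it.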

@katyakats katyakats merged commit 05b6db8 into mlrun:development Apr 22, 2021
4 participants