REFACTOR-#3853: interacting with Dask interface through 'DaskWrapper' class #3854
Branch updated from 57203c5 to 31c6a0b.
@anmyachev, that is what we were discussing earlier when restructuring, great changes! What about Ray?
I plan to do the same for Ray, but in a separate PR. Is that OK with you? :)
Codecov Report
@@ Coverage Diff @@
## master #3854 +/- ##
===========================================
+ Coverage 62.00% 89.90% +27.89%
===========================================
Files 214 214
Lines 17370 17359 -11
===========================================
+ Hits 10771 15607 +4836
+ Misses 6599 1752 -4847
Yes, that should be done in a separate PR.
        Returns
        -------
        ray.ObjectRef or list
            Ray identifier of the result being put to Plasma store.
        """
-        return deploy_ray_func.options(num_returns=num_returns).remote(func, kwargs)
+        return deploy_ray_func.options(num_returns=num_returns).remote(
+            func, *args, kwargs
This looks strange. For this to be `**kwargs`, the `deploy_ray_func` signature should be changed.
Historically we have not been able to pass `**kwargs` and `*args` to a Ray task for implementation reasons. To get around this we pass `kwargs` as a dictionary.
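The workaround described above can be sketched in plain Python, without Ray. The names `deploy_func` and `scale` are hypothetical and only illustrate the calling convention, assuming the keyword arguments travel as an ordinary dictionary in the last positional slot and are unpacked on the worker side.

```python
def deploy_func(func, *args):
    """Hypothetical worker-side helper (not Modin's actual `deploy_ray_func`).

    The last positional argument is expected to be a dict of keyword
    arguments; everything before it is passed to `func` positionally.
    """
    *positional, kwargs = args
    return func(*positional, **kwargs)


def scale(value, factor=1):
    # Toy function standing in for the real task body.
    return value * factor


# Caller side: kwargs are sent as a plain dict, mirroring `func, *args, kwargs`.
args = (21,)
kwargs = {"factor": 2}
result = deploy_func(scale, *args, kwargs)
print(result)
```

The cost of this convention is that the call site reads `func, *args, kwargs` rather than `func, *args, **kwargs`, which is what the reviewer found surprising.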
Do I understand correctly that a remote call like this did not work before?
>>> def t2(*args, **kwargs):
... print(args)
... print(kwargs)
...
>>> t2_rem = ray.remote(t2)
>>> t2_rem.remote("test_args", 2, 6, test_keyword="some value")
ObjectRef(4ee449587774c1f0ffffffffffffffffffffffff0100000001000000)
>>> (pid=28664) ('test_args', 2, 6)
(pid=28664) {'test_keyword': 'some value'}
@devin-petersohn, ping
Looks good overall. I wonder if we can align the two (ray and dask) implementations based on this interface.
@@ -62,3 +67,22 @@ def materialize(cls, future):
        """
        client = default_client()
        return client.gather(future)
+
+    @classmethod
+    def scatter(cls, data, **kwargs):
I prefer to call this `put`. It will be a little weird to say `DaskTask.put` or `DaskTask.scatter` because tasks aren't an object store.
What about using `DaskWrapper` as the class name instead of `DaskTask`?
Will change to `put`.
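To make the naming discussion concrete, here is a minimal sketch of a wrapper with `deploy`, `materialize`, and `put` class methods. It is not the code from this PR: `EngineWrapper` is a hypothetical name, and a standard-library thread pool is swapped in for the Dask client so the example runs without Dask installed.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class EngineWrapper:
    """Hypothetical stand-in for a `DaskWrapper`-style class.

    A stdlib thread pool replaces the Dask client so the sketch runs
    anywhere; only the shape of the interface is the point.
    """

    _pool = ThreadPoolExecutor(max_workers=2)

    @classmethod
    def deploy(cls, func, *args, **kwargs):
        # Submit work to the engine and return a future immediately.
        return cls._pool.submit(func, *args, **kwargs)

    @classmethod
    def materialize(cls, future):
        # Block until the result behind the future is available.
        return future.result()

    @classmethod
    def put(cls, data):
        # Wrap already-available data in a completed future, so callers
        # can treat stored data and task results uniformly.
        future = Future()
        future.set_result(data)
        return future


stored = EngineWrapper.put([1, 2, 3])
total = EngineWrapper.deploy(sum, EngineWrapper.materialize(stored))
print(EngineWrapper.materialize(total))
```

With a neutral class name, `put` reads naturally as "place data into the engine", which is harder to say of a class named after tasks.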
@devin-petersohn, ping
Will this make sense to create a common interface for every |
This is the main goal of the refactoring, but it will only be done in the third step :) (After updating |
@anmyachev is there another PR that does something similar? Or is this a different PR?
There is a separate pull request where I'm trying to align the Ray and Dask interfaces. The main remaining problem in that pull request is synchronizing the interface for working with actors. This pull request (#3854) can be considered separately.
@@ -64,9 +62,9 @@ def get(self):
        """
        self.drain_call_queue()
        # blocking operation
-        if isinstance(self.future, pandas.DataFrame):
+        if not isinstance(self.future, Future):
Why do we need to change this?
This way the code is clearer: we materialize only when the object can be materialized.
A partition should hold a pandas DataFrame. We may run into undefined behavior or errors in this case.
Some code is written as if there could also be an object of `pandas.Series` type.
Each partition holds a pandas DataFrame, not a pandas Series. That is what we require and state in the docs. So I am in favor of checking for a pandas DataFrame only, to keep the restriction strict.
- We should add a check in the constructor that the passed future really is a Dask `Future` (as is done for Ray).
- I am still inclined to add a check in `get` that the materialized data is a pandas DataFrame.
- We should remove the `isinstance` check for a pandas Series in `to_pandas`.
- The check is added.
- This is not possible, because after calling the `apply` function, the partition can contain an arbitrary object under the `Future` object (more details in "REFACTOR-#4206: add assert check into `__init__` method of `PandasOnDaskDataframePartition` class" #4207).
- This is impossible because of the second point.
- I see, `to_numpy` is presumably the only case where we have a NumPy array in a partition. Okay, let's leave it as is, but we should think about this.
- We can remove the `isinstance` check for a pandas Series in `to_pandas`, since we don't add that check in `get`.
- This should not be done, because after calling some function on the `DataFrame` object, an object of `Series` type may be obtained, which will be stored in the partition (in that case, the `get` function will return a `Series`).
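The outcome of this thread can be illustrated with a small sketch. It assumes a partition that accepts only futures in its constructor and returns whatever the future holds from `get`. `SketchPartition` and `make_partition` are hypothetical names, a stdlib thread pool stands in for Dask, and a dict and a list stand in for a pandas DataFrame and Series.

```python
from concurrent.futures import Future, ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=1)


class SketchPartition:
    """Hypothetical partition; not Modin's `PandasOnDaskDataframePartition`."""

    def __init__(self, future):
        # Constructor check discussed above: only futures are accepted.
        assert isinstance(future, Future), f"expected Future, got {type(future)}"
        self.future = future

    def apply(self, func):
        # The function runs on the materialized data; its result may be of
        # any type, so no type check on the payload is possible here.
        return SketchPartition(_pool.submit(func, self.future.result()))

    def get(self):
        # Blocking operation: returns whatever object the future holds.
        return self.future.result()


def make_partition(data):
    # Helper that wraps plain data in a completed future.
    future = Future()
    future.set_result(data)
    return SketchPartition(future)


table = make_partition({"a": [1, 2], "b": [3, 4]})  # dict stands in for a DataFrame
column = table.apply(lambda frame: frame["a"])  # list stands in for a Series
print(type(table.get()).__name__, type(column.get()).__name__)
```

The type of the future can be validated up front, but the type of the payload cannot, because `apply` may legitimately produce a different kind of object.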
Branch updated from f2aec07 to d004417.
@YarShev ready for review
Please fix conflicts.
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
Co-authored-by: Yaroslav Igoshev <Poolliver868@mail.ru>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
@YarShev ready for review. I answered all the comments except the one about the filename.
@YarShev ready for review
I left one last comment. LGTM! Let's try to land similar changes for Ray in 0.14 as well.
Also, do not forget to add a release note.
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
@YarShev the error with |
@anmyachev, LGTM, thanks!
Signed-off-by: Anatoly Myachev anatoly.myachev@intel.com
What do these changes do?
- `deploy` functions.
- `client.submit` function, `DaskTask.deploy` function is called.
- `client.materialize` instead of `client.gather`
- `client.scatter` (maybe rename to `put`) instead of `client.scatter`
- `flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py`
- `black --check modin/ asv_bench/benchmarks scripts/doc_checker.py`
- `git commit -s`
- `distributed.client.default_client` function into the single place #3853
- added and passing
- `docs/developer/architecture.rst` is up-to-date