[BUG] Failure when performing ORDER BY desc query with JIT_UNSPILL enabled #714

Closed
ChrisJar opened this issue Aug 30, 2021 · 4 comments · Fixed by #716

Comments

@ChrisJar

I get an unexpected error when running an ORDER BY ... DESC query with dask-sql on a dask-cuda cluster that has JIT unspilling enabled.

For example:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster(n_workers=16, device_memory_limit="15GB", enable_tcp_over_ucx=True, enable_nvlink=True, rmm_pool_size="29GB", jit_unspill=True)
client = Client(cluster)

import cudf, dask_cudf
from dask_sql import Context

c = Context()

df = cudf.DataFrame({"id":[1,4,4,5,3], "val":[4,6,6,3,8]})
ddf = dask_cudf.from_cudf(df, npartitions=1)
c.create_table("df", ddf)

query = "SELECT * FROM df ORDER BY id desc"

c.sql(query).compute()

fails with:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_75923/4184803532.py in <module>
     16 query = "SELECT * FROM df ORDER BY id desc"
     17 
---> 18 c.sql(query).compute()
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql, return_futures, dataframes)
    420         rel, select_names, _ = self._get_ral(sql)
    421 
--> 422         dc = RelConverter.convert(rel, context=self)
    423 
    424         if dc is None:
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     54             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     55         )
---> 56         df = plugin_instance.convert(rel, context=context)
     57         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     58         return df
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py in convert(self, rel, context)
     25     ) -> DataContainer:
     26         # Get the input of the previous step
---> 27         (dc,) = self.assert_inputs(rel, 1, context)
     28 
     29         df = dc.df
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in assert_inputs(rel, n, context)
     79         from dask_sql.physical.rel.convert import RelConverter
     80 
---> 81         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     82 
     83     @staticmethod
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in <listcomp>(.0)
     79         from dask_sql.physical.rel.convert import RelConverter
     80 
---> 81         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     82 
     83     @staticmethod
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     54             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     55         )
---> 56         df = plugin_instance.convert(rel, context=context)
     57         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     58         return df
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/rel/logical/sort.py in convert(self, rel, context)
     42 
     43             df = df.persist()
---> 44             df = apply_sort(df, sort_columns, sort_ascending, sort_null_first)
     45 
     46         offset = rel.offset
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/utils/sort.py in apply_sort(df, sort_columns, sort_ascending, sort_null_first)
     21     # As sorting is rather expensive, we bether persist here
     22     df = df.persist()
---> 23     df = _sort_first_column(
     24         df, first_sort_column, first_sort_ascending, first_null_first
     25     )
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/utils/sort.py in _sort_first_column(df, first_sort_column, first_sort_ascending, first_null_first)
     84     else:
     85         df_is_na = None
---> 86         df_not_is_na = df.set_index(first_sort_column, drop=False).reset_index(
     87             drop=True
     88         )
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_cudf/core.py in set_index(***failed resolving arguments***)
    219             return df2
    220 
--> 221         return super().set_index(
    222             other,
    223             sorted=pre_sorted,
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/core.py in set_index(***failed resolving arguments***)
   4235             from .shuffle import set_index
   4236 
-> 4237             return set_index(
   4238                 self,
   4239                 other,
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/shuffle.py in set_index(df, index, npartitions, shuffle, compute, drop, upsample, divisions, partition_size, **kwargs)
    160 
    161     if divisions is None:
--> 162         divisions, mins, maxes = _calculate_divisions(
    163             df, index2, repartition, npartitions, upsample, partition_size
    164         )
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/shuffle.py in _calculate_divisions(df, partition_col, repartition, npartitions, upsample, partition_size)
     33     mins = partition_col.map_partitions(M.min)
     34     maxes = partition_col.map_partitions(M.max)
---> 35     divisions, sizes, mins, maxes = base.compute(divisions, sizes, mins, maxes)
     36     divisions = methods.tolist(divisions)
     37     if type(sizes) is not list:
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2669                     should_rejoin = False
   2670             try:
-> 2671                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2672             finally:
   2673                 for f in futures.values():
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1946             else:
   1947                 local_worker = None
-> 1948             return self.sync(
   1949                 self._gather,
   1950                 futures,
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    843             return future
    844         else:
--> 845             return sync(
    846                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    847             )
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    323     if error[0]:
    324         typ, exc, tb = error[0]
--> 325         raise exc.with_traceback(tb)
    326     else:
    327         return result[0]
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/utils.py in f()
    306             if callback_timeout is not None:
    307                 future = asyncio.wait_for(future, callback_timeout)
--> 308             result[0] = yield future
    309         except Exception:
    310             error[0] = sys.exc_info()
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1811                             exc = CancelledError(key)
   1812                         else:
-> 1813                             raise exception.with_traceback(traceback)
   1814                         raise exc
   1815                     if errors == "skip":
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/partitionquantiles.py in percentiles_summary()
    419     ):
    420         interpolation = "nearest"
--> 421     vals, n = _percentile(data, qs, interpolation=interpolation)
    422     if (
    423         is_cupy_type(data)
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/utils.py in __call__()
    572         Call the corresponding method based on type of argument.
    573         """
--> 574         meth = self.dispatch(type(arg))
    575         return meth(arg, *args, **kwargs)
    576 
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/utils.py in dispatch()
    566                 lk[cls] = lk[cls2]
    567                 return lk[cls2]
--> 568         raise TypeError("No dispatch for {0}".format(cls))
    569 
    570     def __call__(self, arg, *args, **kwargs):
TypeError: No dispatch for <class 'dask_cuda.proxify_device_objects._register_cudf.<locals>.FrameProxyObject'>

Environment:
dask - 2021.8.1
dask-sql - 0.3.10
cudf - 21.10
dask-cudf - 21.10
dask-cuda - 21.10

@quasiben
Member

Thanks @ChrisJar, I can reproduce this too. Here is a possibly simpler reproducer:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster(n_workers=1, jit_unspill=True)
client = Client(cluster)

import cudf, dask_cudf
from dask_sql import Context

c = Context()

df = cudf.DataFrame({"id":[1,4,4,5,3], "val":[4,6,6,3,8]})
ddf = dask_cudf.from_cudf(df, npartitions=1)
c.create_table("df", ddf)

query = "SELECT * FROM df ORDER BY id desc"

c.sql(query).compute()

Seeing _percentile come up in the traceback, I'm wondering whether we need to add more dispatches in Dask. @galipremsagar do you have thoughts here?
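The TypeError at the bottom of the traceback comes from a type-keyed dispatcher: the handler is chosen from `type(arg)`, so a proxy type that nobody registered has no match. The sketch below is a simplified, hypothetical stand-in loosely modelled on the `dispatch()` frames visible in the traceback, not Dask's actual class. `FakeFrame` and `FakeProxy` are illustrative classes rather than cudf or dask-cuda types, and the percentile is a simple nearest-rank pick rather than Dask's implementation.

```python
class Dispatch:
    """Simplified, hypothetical type-keyed dispatcher (not dask.utils.Dispatch)."""

    def __init__(self, name):
        self.name = name
        self._lookup = {}

    def register(self, cls):
        def wrapper(func):
            self._lookup[cls] = func
            return func
        return wrapper

    def dispatch(self, cls):
        # Walk the method resolution order so subclasses of a registered type match.
        for base in cls.__mro__:
            if base in self._lookup:
                return self._lookup[base]
        raise TypeError(f"No dispatch for {cls}")

    def __call__(self, arg, *args, **kwargs):
        # The handler is chosen by the runtime type of the first argument.
        return self.dispatch(type(arg))(arg, *args, **kwargs)


percentile_lookup = Dispatch("percentile_lookup")


class FakeFrame:
    """Hypothetical stand-in for a cudf column/frame."""

    def __init__(self, values):
        self.values = list(values)


class FakeProxy:
    """Hypothetical stand-in for a proxy: wraps a FakeFrame but does not subclass it."""

    def __init__(self, obj):
        self._obj = obj


@percentile_lookup.register(FakeFrame)
def _percentile_frame(data, q):
    # Nearest-rank pick on the sorted values (illustrative only).
    vals = sorted(data.values)
    return vals[round(q / 100 * (len(vals) - 1))]


print(percentile_lookup(FakeFrame([1, 4, 4, 5, 3]), 50))  # 4
try:
    percentile_lookup(FakeProxy(FakeFrame([1, 4, 4, 5, 3])), 50)
except TypeError as exc:
    print(exc)  # the "No dispatch for <class ...FakeProxy>" error
```

Because `FakeProxy` is neither registered nor a subclass of a registered type, the MRO walk finds nothing, which mirrors the `FrameProxyObject` failure.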

@galipremsagar
Contributor

galipremsagar commented Sep 1, 2021

> @galipremsagar do you have thoughts here?

Looking into it.

@galipremsagar
Contributor

galipremsagar commented Sep 1, 2021

#716 contains the fix for this issue. However, I'd like the dask-cuda experts to confirm the expected result type of c.sql(query).compute(). I'm asking because the result is a FrameProxyObject, a proxy object that acts as a pass-through to Frame-like objects:

>>> from dask_cuda import LocalCUDACluster
>>> from dask.distributed import Client

>>> cluster = LocalCUDACluster(n_workers=1, jit_unspill=True)
>>> client = Client(cluster)

>>> import cudf, dask_cudf
>>> from dask_sql import Context

>>> c = Context()

>>> df = cudf.DataFrame({"id":[1,4,4,5,3], "val":[4,6,6,3,8]})
>>> ddf = dask_cudf.from_cudf(df, npartitions=1)
>>> c.create_table("df", ddf)

>>> query = "SELECT * FROM df ORDER BY id desc"

>>> c.sql(query).compute()

>>> x = c.sql(query).compute()
>>> x.to_pandas()
   id  val
4   5    3
3   4    6
2   4    6
1   3    8
0   1    4
>>> type(x)
<class 'dask_cuda.proxify_device_objects.FrameProxyObject'>
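The session above shows the pass-through behaviour: x.to_pandas() works, yet type(x) reports the proxy class. A minimal sketch of that idea, assuming simple `__getattr__` forwarding (the real FrameProxyObject is more involved, and `Frame` and `PassThroughProxy` here are hypothetical), shows why method calls succeed while anything keyed on `type()`, such as the dispatch in the traceback, sees the proxy rather than the wrapped frame.

```python
class Frame:
    """Hypothetical stand-in for a cudf.DataFrame."""

    def __init__(self, **columns):
        self.columns = columns

    def to_dict(self):
        return {name: list(vals) for name, vals in self.columns.items()}


class PassThroughProxy:
    """Hypothetical minimal proxy that forwards unknown attributes to the wrapped object."""

    def __init__(self, obj):
        self._obj = obj

    def __getattr__(self, name):
        # Only called when normal lookup fails, i.e. for anything not defined on
        # the proxy itself, so accessing self._obj here does not recurse.
        return getattr(self._obj, name)


x = PassThroughProxy(Frame(id=[5, 4, 4, 3, 1], val=[3, 6, 6, 8, 4]))
print(x.to_dict())       # forwarded to Frame.to_dict
print(type(x).__name__)  # PassThroughProxy, not Frame
```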

@madsbk
Member

madsbk commented Sep 6, 2021

> #716 contains the fix for this issue. However, I'd like the dask-cuda experts to confirm the expected result type of c.sql(query).compute(). I'm asking because the result is a FrameProxyObject, a proxy object that acts as a pass-through to Frame-like objects:

Yes, that is as expected. The proxy objects leak into user space unless DASK_JIT_UNSPILL_COMPATIBILITY_MODE=True is set.
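One way to opt into compatibility mode from Python is to set the environment variable before the cluster is created. This is a sketch under the assumption that the LocalCUDACluster worker processes inherit the parent's environment and read the variable at start-up; exporting the variable in the shell before launching Python should have the same effect.

```python
import os

# Assumption: the LocalCUDACluster workers inherit this process's environment,
# so the variable must be set *before* the cluster is created.
os.environ["DASK_JIT_UNSPILL_COMPATIBILITY_MODE"] = "True"

# ...then create the cluster as in the reproducers above, e.g.
# cluster = LocalCUDACluster(n_workers=1, jit_unspill=True)
```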

rapids-bot (bot) closed this as completed in #716 on Sep 8, 2021
rapids-bot (bot) pushed a commit that referenced this issue on Sep 8, 2021
Fixes: #714 

This PR registers `percentile_lookup` for `FrameProxyObject`

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: #716
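The fix registers `percentile_lookup` for `FrameProxyObject`. The general pattern (register the proxy type and delegate to the handler for the object it wraps) can be sketched with the standard library's `functools.singledispatch` standing in for Dask's dispatcher. This is an illustration, not the code from #716: `Frame`, `FrameProxy` and its `unwrap()` method are hypothetical, and the percentile is a simple nearest-rank pick.

```python
from functools import singledispatch


class Frame:
    """Hypothetical stand-in for a cudf frame."""

    def __init__(self, values):
        self.values = list(values)


class FrameProxy:
    """Hypothetical stand-in for FrameProxyObject (not a Frame subclass)."""

    def __init__(self, obj):
        self._obj = obj

    def unwrap(self):
        # Hypothetical accessor; the real proxy's API is not shown in this thread.
        return self._obj


@singledispatch
def percentile(data, q):
    # Fallback for unregistered types, mirroring the "No dispatch for" error.
    raise TypeError(f"No dispatch for {type(data)}")


@percentile.register(Frame)
def _(data, q):
    vals = sorted(data.values)
    return vals[round(q / 100 * (len(vals) - 1))]


# The fix pattern: register the proxy type and delegate to the wrapped object.
@percentile.register(FrameProxy)
def _(data, q):
    return percentile(data.unwrap(), q)


print(percentile(FrameProxy(Frame([1, 4, 4, 5, 3])), 50))  # 4
```

Delegating after unwrapping keeps a single real implementation per backend type; the proxy registration only adds the unwrap step.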