Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KED-2254] Add datatable csv dataset (#592) #616

Closed
Closed
38 changes: 38 additions & 0 deletions kedro/extras/datasets/datatable/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2020 QuantumBlack Visual Analytics Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
# NONINFRINGEMENT. IN NO EVENT WILL THE LICENSOR OR OTHER CONTRIBUTORS
# BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF, OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# The QuantumBlack Visual Analytics Limited ("QuantumBlack") name and logo
# (either separately or in combination, "QuantumBlack Trademarks") are
# trademarks of QuantumBlack. The License does not grant you any right or
# license to the QuantumBlack Trademarks. You may not use the QuantumBlack
# Trademarks or any confusingly similar mark as a trademark for your product,
# or use the QuantumBlack Trademarks in any other manner that might cause
# confusion in the marketplace, including but not limited to in advertising,
# on websites, or on software.
#
# See the License for the specific language governing permissions and
# limitations under the License.

"""``AbstractDataSet`` implementations that produce datatable Frames."""

__all__ = [
"CSVDataSet",
]

from contextlib import suppress

with suppress(ImportError):
from .csv_dataset import CSVDataSet # NOQA
185 changes: 185 additions & 0 deletions kedro/extras/datasets/datatable/csv_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# Copyright 2020 QuantumBlack Visual Analytics Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
# NONINFRINGEMENT. IN NO EVENT WILL THE LICENSOR OR OTHER CONTRIBUTORS
# BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF, OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# The QuantumBlack Visual Analytics Limited ("QuantumBlack") name and logo
# (either separately or in combination, "QuantumBlack Trademarks") are
# trademarks of QuantumBlack. The License does not grant you any right or
# license to the QuantumBlack Trademarks. You may not use the QuantumBlack
# Trademarks or any confusingly similar mark as a trademark for your product,
# or use the QuantumBlack Trademarks in any other manner that might cause
# confusion in the marketplace, including but not limited to in advertising,
# on websites, or on software.
#
# See the License for the specific language governing permissions and
# limitations under the License.

"""``CSVDataSet`` loads/saves data from/to a CSV file using an underlying
filesystem (e.g.: local, S3, GCS). It uses datatable to handle the CSV file.
"""
import copy
from pathlib import PurePosixPath
from typing import Any, Dict

import datatable as dt
import fsspec

from kedro.io.core import (
AbstractVersionedDataSet,
DataSetError,
Version,
get_filepath_str,
get_protocol_and_path,
)


class CSVDataSet(AbstractVersionedDataSet):
"""``CSVDataSet`` loads/saves data from/to a CSV file using an underlying
filesystem (e.g.: local, S3, GCS). It uses datatable to handle the CSV file.

The main differences with respect to the ``pandas`` implementation:

- speed-up of csv reading, as datatable allows multi-thread reading from csv;
- usage of data manipulation API that is familiar to R users,
who might not have experience with ``pandas``.

Note that ``pandas.DataFrame`` is used for writing out.
mlisovyi marked this conversation as resolved.
Show resolved Hide resolved

Example:
::

>>> from kedro.extras.datasets.datatable import CSVDataSet
>>> import datatable as dt
>>>
>>> data = dt.Frame(A=range(5), B=[1.7, 3.4, 0, None, -dt.math.inf],
>>> stypes={"A": dt.int64})
>>>
>>> data_set = CSVDataSet(filepath="test.csv")
>>> data_set.save(data)

"""

DEFAULT_LOAD_ARGS = {} # type: Dict[str, Any]
DEFAULT_SAVE_ARGS = {"index": False} # type: Dict[str, Any]

# pylint: disable=too-many-arguments
def __init__(
self,
filepath: str,
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
version: Version = None,
credentials: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
) -> None:
"""Creates a new instance of ``CSVDataSet`` pointing to a concrete CSV file
on a specific filesystem.

Args:
filepath: Filepath in POSIX format to a CSV file prefixed with a protocol like `s3://`.
If prefix is not provided, `file` protocol (local filesystem) will be used.
The prefix should be any protocol supported by ``fsspec``.
Note: `http(s)` doesn't support versioning.
load_args: Datatable options for loading CSV files.
Here you can find all available arguments:
https://datatable.readthedocs.io/en/latest/api/dt/fread.html
All defaults are preserved.
save_args: Datatable options for saving CSV files.
mlisovyi marked this conversation as resolved.
Show resolved Hide resolved
Saving is done via pandas.
Here you can find all available arguments:
https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_csv.html
All defaults are preserved, but "index", which is set to False.
version: If specified, should be an instance of
``kedro.io.core.Version``. If its ``load`` attribute is
None, the latest version will be loaded. If its ``save``
attribute is None, save version will be autogenerated.
credentials: Credentials required to get access to the underlying filesystem.
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Extra arguments to pass into underlying filesystem class constructor
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``), as well as
to pass to the filesystem's `open` method through nested keys
`open_args_load` and `open_args_save`.
Here you can find all available arguments for `open`:
https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open
"""
_fs_args = copy.deepcopy(fs_args) or {}
_fs_open_args_load = _fs_args.pop("open_args_load", {})
_fs_open_args_save = _fs_args.pop("open_args_save", {})
_credentials = copy.deepcopy(credentials) or {}

protocol, path = get_protocol_and_path(filepath, version)

self._protocol = protocol
self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args)

super().__init__(
filepath=PurePosixPath(path),
version=version,
exists_function=self._fs.exists,
glob_function=self._fs.glob,
)

# Handle default load and save arguments
self._load_args = copy.deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)
self._save_args = copy.deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)

_fs_open_args_save.setdefault("mode", "w")
self._fs_open_args_load = _fs_open_args_load
self._fs_open_args_save = _fs_open_args_save

def _describe(self) -> Dict[str, Any]:
return dict(
filepath=self._filepath,
protocol=self._protocol,
load_args=self._load_args,
save_args=self._save_args,
version=self._version,
)

def _load(self) -> dt.Frame:
load_path = get_filepath_str(self._get_load_path(), self._protocol)

with self._fs.open(load_path, **self._fs_open_args_load) as fs_file:
return dt.fread(file=fs_file, **self._load_args)

def _save(self, data: dt.Frame) -> None:
save_path = get_filepath_str(self._get_save_path(), self._protocol)

with self._fs.open(save_path, **self._fs_open_args_save) as fs_file:
# convert to pandas before saving as otherwise can not use fs_file
data.to_pandas().to_csv(path_or_buf=fs_file, **self._save_args)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you still using pandas to save the data?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, that is inefficient and wierd.
The reason is that datatable doesn't seem to support file-like object as input for writing out a table. Therefore one can not benefit from access to multiple various filesystems that is provided by fsspec.open()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, so the main purpose of this dataset would be that you can load data in datatable and then work with it, before saving it again as pandas? If so, I'd suggest to update the description of the class to make this clear.

Copy link
Author

@mlisovyi mlisovyi Dec 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, correct. the main 2 benefits would be:

  • speed-up of csv reading, as datatable allows multi-thread reading from csv;
  • usage of data manipulation API that is familiar to R users, who might not have experience with pandas.

There is a nice list of advantages summarised in the original issue (#592).

Ok, i'll modify the doc-string of the class. This and the other proposed change will take a couple of days due to other tasks on my TODO.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, the changed have been pushed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to Frame.to_csv() docs, "If no path is given, then the Frame will be serialized into a string, and that string will be returned".

So maybe we should just get such string, .encode("utf-8") it and send to fsspec instead of converting to pandas?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, I do not have enough experience to judge on it. I can blindly implement your suggestion, but I will not be able to validate it due to lack of time.


self._invalidate_cache()

def _exists(self) -> bool:
try:
load_path = get_filepath_str(self._get_load_path(), self._protocol)
except DataSetError:
return False

return self._fs.exists(load_path)

def _release(self) -> None:
super()._release()
self._invalidate_cache()

def _invalidate_cache(self) -> None:
"""Invalidate underlying filesystem caches."""
filepath = get_filepath_str(self._filepath, self._protocol)
self._fs.invalidate_cache(filepath)
1 change: 1 addition & 0 deletions test_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ behave==1.2.6
biopython~=1.73
black==v19.10.b0
dask[complete]~=2.6
datatable>=0.11.1, <1.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Datatable should also be added into extras_require in setup.py

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Thanks.

I'm not really familiar with the extras_require configuration for setuptools. I have added requirements as far as I get the logic, but could you please have a look that it is correct? In particular the cross-dependency of entries with datatable and pandas requirements is not fully clear to me.

flake8~=3.5
gcsfs>=0.3.0, <0.7.0 # gcsfs 0.7.0 requires fsspec>=0.8.0, but we pin fsspec <0.7.0
geopandas>=0.6.0, <1.0
Expand Down
Loading