Remove multi table feature (#431)
* Remove dataset multi table feature from API

* Drop merge datasets pipelines

* Add changelog

* Remove dead code

* Enable ipython warning on exception again

* Adjust some docs

* Raise on instantiation of datasetmetadata if there are too many tables

* Remove obsolete todo
fjetter authored Mar 16, 2021
1 parent dd7b244 commit 032856a
Showing 69 changed files with 1,301 additions and 4,305 deletions.
18 changes: 18 additions & 0 deletions CHANGES.rst
@@ -2,6 +2,24 @@
Changelog
=========

Kartothek 4.0.0 (2021-03-XY)
============================

This is a major release of kartothek with breaking API changes; a short sketch of the new single-table usage follows the list below.

* Removal of complex user input (see gh427)
* Removal of multi table feature
* Removal of `kartothek.io.merge` module
* Class :class:`~kartothek.core.dataset.DatasetMetadata` now has an attribute called `schema`, which replaces the previous attribute `table_meta` and holds only a single schema
* All outputs which previously returned a sequence of dictionaries, where each key-value pair corresponded to a table-data pair, now return only one :class:`pandas.DataFrame`
* All read pipelines will now automatically infer the table to read, so it is no longer necessary to provide `table` or `table_name` as an input argument
* All writing pipelines which previously supported a complex user input type now expose an argument `table_name` which can be used to continue working with legacy datasets (i.e. datasets with an intrinsic, non-trivial table name). This usage is discouraged and we recommend that users migrate to the default table name (i.e. leave it at `None` / `table`)
* All pipelines which previously accepted an argument `tables` to select the subset of tables to load no longer accept this keyword. Instead, the table to load is inferred
* Trying to read a multi-table dataset will now raise an exception telling users that this is no longer supported with kartothek 4.0
* The dict schema for :meth:`~kartothek.core.dataset.DatasetMetadataBase.to_dict` and :meth:`~kartothek.core.dataset.DatasetMetadata.from_dict` changed, replacing the dictionary in `table_meta` with the single `schema`
* All pipeline arguments which previously accepted a dictionary of sequences to describe a table-specific subset of columns now accept plain sequences (e.g. `columns`, `categoricals`)
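
For illustration, a minimal sketch of the new single-table usage (not part of this commit; the store URL, dataset UUID, and column names are placeholders, and the import path follows the updated docs):

.. code-block:: python

    import tempfile

    import pandas as pd

    from kartothek.api.dataset import read_table, store_dataframes_as_dataset

    # Placeholder local store; any storefact URL works here.
    store_url = "hfs://{}".format(tempfile.mkdtemp())

    # Write: pass plain DataFrames instead of {"label": ..., "data": {table: df}} dicts.
    dm = store_dataframes_as_dataset(
        store_url,
        dataset_uuid="migration_example",
        dfs=[pd.DataFrame({"A": range(10)}), pd.DataFrame({"A": range(10, 20)})],
    )

    dm.table_name  # single table, "table" by default
    dm.schema      # replaces the former `table_meta` mapping

    # Read: no `table` argument is needed, the table is inferred.
    df = read_table("migration_example", store_url, predicates=[[("A", "<", 5)]])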


Version 3.20.0 (2021-03-15)
===========================

4 changes: 1 addition & 3 deletions asv_bench/benchmarks/index.py
@@ -131,9 +131,7 @@ def setup(self, cardinality, num_values, partitions_to_merge):
unique_vals = ["{:010d}".format(n) for n in range(cardinality)]
array = [unique_vals[x % len(unique_vals)] for x in range(num_values)]
self.df = pd.DataFrame({self.column: array})
self.mp = MetaPartition(
label=self.table, data={"core": self.df}, metadata_version=4
)
self.mp = MetaPartition(label=self.table, data=self.df, metadata_version=4)
self.mp_indices = self.mp.build_indices([self.column])
self.merge_indices.append(self.mp_indices)

6 changes: 2 additions & 4 deletions asv_bench/benchmarks/metapartition.py
@@ -33,15 +33,14 @@ def setup(self, num_rows, dtype):
self.mp = MetaPartition(
label="primary_key={}/base_label".format(dtype[0]),
metadata_version=4,
table_meta={"table": self.schema},
schema=self.schema,
)

def time_reconstruct_index(self, num_rows, dtype):

self.mp._reconstruct_index_columns(
df=self.df,
key_indices=[("primary_key", str(dtype[1]))],
table="table",
columns=None,
categories=None,
date_as_object=False,
@@ -51,8 +50,7 @@ def time_reconstruct_index_categorical(self, num_rows, dtype):
self.mp._reconstruct_index_columns(
df=self.df,
key_indices=[("primary_key", str(dtype[1]))],
table="table",
columns=None,
categories={"table": ["primary_key"]},
categories="primary_key",
date_as_object=False,
)
16 changes: 8 additions & 8 deletions asv_bench/benchmarks/write.py
@@ -17,12 +17,11 @@
from .config import AsvBenchmarkConfig


def generate_mp(dataset_metadata=None):
def generate_mp():
return MetaPartition(
label=uuid.uuid4().hex,
table_meta={"table": make_meta(get_dataframe_alltypes(), origin="alltypes")},
files={"table": "fakefile"},
dataset_metadata=dataset_metadata,
schema=make_meta(get_dataframe_alltypes(), origin="alltypes"),
file="fakefile",
)


@@ -50,8 +49,7 @@ class TimeStoreDataset(AsvBenchmarkConfig):

def setup(self, num_partitions, max_depth, num_leafs):
self.store = get_store_from_url("hfs://{}".format(tempfile.mkdtemp()))
dataset_metadata = generate_metadata(max_depth, num_leafs)
self.partitions = [generate_mp(dataset_metadata) for _ in range(num_partitions)]
self.partitions = [generate_mp() for _ in range(num_partitions)]
self.dataset_uuid = "dataset_uuid"
self.user_dataset_metadata = {}

@@ -70,8 +68,10 @@ class TimePersistMetadata(AsvBenchmarkConfig):

def setup(self, num_partitions):
self.store = get_store_from_url("hfs://{}".format(tempfile.mkdtemp()))
self.partitions = [generate_mp() for _ in range(num_partitions)]
self.schemas = [generate_mp().schema for _ in range(num_partitions)]
self.dataset_uuid = "dataset_uuid"

def time_persist_common_metadata(self, num_partitions):
persist_common_metadata(self.partitions, None, self.store, self.dataset_uuid)
persist_common_metadata(
self.schemas, None, self.store, self.dataset_uuid, "name"
)
5 changes: 0 additions & 5 deletions docs/conf.py
@@ -129,8 +129,3 @@
"kartothek.serialization._generic": "kartothek.serialization",
"kartothek.serialization._parquet": "kartothek.serialization",
}

# In particular the deprecation warning in DatasetMetadata.table_schema is
# raising too many warning to handle sensibly using ipython directive pseudo
# decorators. Remove this with 4.X again
ipython_warning_is_error = False
58 changes: 13 additions & 45 deletions docs/guide/examples.rst
@@ -36,7 +36,7 @@ Setup a store
# Load your data
# By default the single dataframe is stored in the 'core' table
df_from_store = read_table(store=store_url, dataset_uuid=dataset_uuid, table="table")
df_from_store = read_table(store=store_url, dataset_uuid=dataset_uuid)
df_from_store
@@ -58,14 +58,8 @@ Write
# We'll define two partitions which both have two tables
input_list_of_partitions = [
{
"label": "FirstPartition",
"data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
},
{
"label": "SecondPartition",
"data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
},
pd.DataFrame({"A": range(10)}),
pd.DataFrame({"A": range(10, 20)}),
]
# The pipeline will return a :class:`~kartothek.core.dataset.DatasetMetadata` object
@@ -96,17 +90,10 @@ Read
# In case you were using the dataset created in the Write example
for d1, d2 in zip(
list_of_partitions,
[
# FirstPartition
{"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
# SecondPartition
{"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
],
[pd.DataFrame({"A": range(10)}), pd.DataFrame({"A": range(10, 20)}),],
):
for kv1, kv2 in zip(d1.items(), d2.items()):
k1, v1 = kv1
k2, v2 = kv2
assert k1 == k2 and all(v1 == v2)
for k1, k2 in zip(d1, d2):
assert k1 == k2
Iter
@@ -120,14 +107,8 @@ Write
from kartothek.api.dataset import store_dataframes_as_dataset__iter
input_list_of_partitions = [
{
"label": "FirstPartition",
"data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
},
{
"label": "SecondPartition",
"data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
},
pd.DataFrame({"A": range(10)}),
pd.DataFrame({"A": range(10, 20)}),
]
# The pipeline will return a :class:`~kartothek.core.dataset.DatasetMetadata` object
@@ -160,17 +141,10 @@ Read
# In case you were using the dataset created in the Write example
for d1, d2 in zip(
list_of_partitions,
[
# FirstPartition
{"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
# SecondPartition
{"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
],
[pd.DataFrame({"A": range(10)}), pd.DataFrame({"A": range(10, 20)}),],
):
for kv1, kv2 in zip(d1.items(), d2.items()):
k1, v1 = kv1
k2, v2 = kv2
assert k1 == k2 and all(v1 == v2)
for k1, k2 in zip(d1, d2):
assert k1 == k2
Dask
````
@@ -184,14 +158,8 @@ Write
from kartothek.api.dataset import store_delayed_as_dataset
input_list_of_partitions = [
{
"label": "FirstPartition",
"data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
},
{
"label": "SecondPartition",
"data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
},
pd.DataFrame({"A": range(10)}),
pd.DataFrame({"A": range(10, 20)}),
]
# This will return a :class:`~dask.delayed`. The figure below
84 changes: 19 additions & 65 deletions docs/guide/getting_started.rst
@@ -5,10 +5,6 @@ Getting Started
===============


Kartothek manages datasets that consist of files that contain tables. It does so by offering
a metadata definition to handle these datasets efficiently.

Datasets in Kartothek are made up of one or more ``tables``, each with a unique schema.
When working with Kartothek tables as a Python user, we will use :class:`~pandas.DataFrame`
as the user-facing type.

@@ -131,33 +127,25 @@ This class holds information about the structure and schema of the dataset.

.. ipython:: python
dm.tables
dm.table_name
sorted(dm.partitions.keys())
dm.table_meta["table"].remove_metadata() # Arrow schema
dm.schema.remove_metadata()
For this guide, two attributes that are noteworthy are ``tables`` and ``partitions``:
- Each dataset has one or more ``tables``, where each table is a logical collection of data,
bound together by a common schema.
- ``partitions`` are the physical "pieces" of data which together constitute the
contents of a dataset. Data is written to storage on a per-partition basis.
See the section on partitioning for further details: :ref:`partitioning_section`.
For this guide we want to take a closer look at the ``partitions`` attribute.
``partitions`` are the physical "pieces" of data which together constitute the
contents of a dataset. Data is written to storage on a per-partition basis. See
the section on partitioning for further details: :ref:`partitioning_section`.

The attribute ``table_meta`` can be accessed to see the underlying schema of the dataset.
The attribute ``schema`` can be accessed to see the underlying schema of the dataset.
See :ref:`type_system` for more information.

To store multiple dataframes into a dataset, it is possible to pass a collection of
dataframes; the exact format will depend on the I/O backend used.

Additionally, Kartothek supports several data input formats,
it does not need to always be a plain ``pd.DataFrame``.
See :func:`~kartothek.io_components.metapartition.parse_input_to_metapartition` for
further details.

If table names are not specified when passing an iterator of dataframes,
Kartothek assumes these dataframes are different chunks of the same table
and expects their schemas to be identical. A ``ValueError`` will be thrown otherwise.
Kartothek assumes these dataframes are different chunks of the same table and
therefore requires them to have the same schema. A ``ValueError`` will be
thrown otherwise.
For example,

.. ipython:: python
@@ -194,39 +182,6 @@ For example,
.. note:: Read these sections for more details: :ref:`type_system`, :ref:`dataset_spec`


When we do not explicitly define the name of the table and partition, Kartothek uses the
default table name ``table`` and generates a UUID for the partition name.

.. admonition:: A more complex example: multiple named tables

Sometimes it may be useful to write multiple dataframes with different schemas into
a single dataset. This can be achieved by creating a dataset with multiple tables.

In this example, we create a dataset with two tables: ``core-table`` and ``aux-table``.
The schemas of the tables are identical across partitions (each dictionary in the
``dfs`` list argument represents a partition).

.. ipython:: python
dfs = [
{
"data": {
"core-table": pd.DataFrame({"id": [22, 23], "f": [1.1, 2.4]}),
"aux-table": pd.DataFrame({"id": [22], "col1": ["x"]}),
}
},
{
"data": {
"core-table": pd.DataFrame({"id": [29, 31], "f": [3.2, 0.6]}),
"aux-table": pd.DataFrame({"id": [31], "col1": ["y"]}),
}
},
]
dm = store_dataframes_as_dataset(store_url, dataset_uuid="two-tables", dfs=dfs)
dm.tables
Reading data from storage
=========================

Expand All @@ -238,24 +193,24 @@ table of the dataset as a pandas DataFrame.
from kartothek.api.dataset import read_table
read_table("a_unique_dataset_identifier", store_url, table="table")
read_table("a_unique_dataset_identifier", store_url)
We can also read a dataframe iteratively, using
:func:`~kartothek.io.iter.read_dataset_as_dataframes__iterator`. This will return a generator
of dictionaries (one dictionary for each `partition`), where the keys of each dictionary
represent the `tables` of the dataset. For example,
:func:`~kartothek.io.iter.read_dataset_as_dataframes__iterator`. This will return a generator of :class:`pandas.DataFrame` where every element represents one file. For example,

.. ipython:: python
from kartothek.api.dataset import read_dataset_as_dataframes__iterator
for partition_index, df_dict in enumerate(
read_dataset_as_dataframes__iterator(dataset_uuid="two-tables", store=store_url)
for partition_index, df in enumerate(
read_dataset_as_dataframes__iterator(
dataset_uuid="a_unique_dataset_identifier", store=store_url
)
):
# Note: There is no guarantee on the ordering
print(f"Partition #{partition_index}")
for table_name, table_df in df_dict.items():
print(f"Table: {table_name}. Data: \n{table_df}")
print(f"Data: \n{df}")
Respectively, the ``dask.delayed`` back-end provides the function
:func:`~kartothek.io.dask.delayed.read_dataset_as_delayed`, which has a very similar
@@ -275,8 +230,7 @@ function but returns a collection of ``dask.delayed`` objects.

.. ipython:: python
# Read only values table `core-table` where `f` < 2.5
read_table("two-tables", store_url, table="core-table", predicates=[[("f", "<", 2.5)]])
read_table("a_unique_dataset_identifier", store_url, predicates=[[("A", "<", 2.5)]])
.. _storefact: https://github.com/blue-yonder/storefact
.. _dask: https://docs.dask.org/en/latest/