Add unit tests for index manifests and local append op.
coufon committed Dec 21, 2023
1 parent 321a7dd commit 67e879e
Showing 13 changed files with 467 additions and 108 deletions.
29 changes: 19 additions & 10 deletions python/src/space/core/manifests/index.py
@@ -35,9 +35,15 @@
 _MAX_FIELD = "_MAX"
 
 
-def _stats_field_name(name: str) -> str:
-  """Column stats struct field name."""
-  return f"{_STATS_FIELD}_{name}"
+def _stats_field_name(field_id_: int) -> str:
+  """Column stats struct field name.
+
+  The name is derived from the field ID instead of the field name. A manifest
+  file references many Parquet files and is not tied to a single Parquet
+  schema, so table field names cannot be projected to file field names; the
+  field ID always identifies a field uniquely.
+  """
+  return f"{_STATS_FIELD}_f{field_id_}"
 
 
 def _stats_subfields(type_: pa.DataType) -> List[pa.Field]:
@@ -62,9 +68,10 @@ def _manifest_schema(
     if f.name not in primary_keys_:
       continue
 
+    field_id_ = field_id(f)
     fields.append(
-        (_stats_field_name(f.name), pa.struct(_stats_subfields(f.type))))
-    stats_fields.append((field_id(f), f.type))
+        (_stats_field_name(field_id_), pa.struct(_stats_subfields(f.type))))
+    stats_fields.append((field_id_, f.type))
 
   return pa.schema(fields), stats_fields  # type: ignore[arg-type]
 
@@ -140,9 +147,6 @@ def __init__(self, metadata_dir: str, schema: pa.Schema,
       self._stats_column_ids.append(column_id)
       self._field_stats_dict[column_id] = _FieldStats(type_)
 
-    # Cache for manifest data to materialize.
-    self._manifest_data: List[pa.Table] = []
-
   def write(self, file_path: str,
             parquet_metadata: pq.FileMetaData) -> meta.StorageStatistics:
     """Write a new manifest row.
Expand Down Expand Up @@ -182,6 +186,8 @@ def write(self, file_path: str,
def finish(self) -> Optional[str]:
"""Materialize the manifest file and return the file path."""
# Convert cached manifest data to Arrow.
all_manifest_data: List[pa.Table] = []

if self._file_paths:
arrays = [
self._file_paths, self._num_rows, self._index_compressed_bytes,
@@ -191,12 +197,15 @@ def finish(self) -> Optional[str]:
       for column_id in self._stats_column_ids:
         arrays.append(self._field_stats_dict[column_id].to_arrow())
 
-      self._manifest_data.append(
+      all_manifest_data.append(
          pa.Table.from_arrays(
              arrays=arrays,
              schema=self._manifest_schema))  # type: ignore[call-arg]
 
-    manifest_data = pa.concat_tables(self._manifest_data)
+    if not all_manifest_data:
+      return None
+
+    manifest_data = pa.concat_tables(all_manifest_data)
     if manifest_data.num_rows == 0:
       return None
 
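For illustration, here is a minimal standalone sketch of the naming scheme this change introduces. It is not the repository's code: it assumes only pyarrow, and the _MIN_FIELD constant is inferred by symmetry with the _MAX_FIELD constant visible above.

import pyarrow as pa

_STATS_FIELD = "_STATS"
_MIN_FIELD = "_MIN"
_MAX_FIELD = "_MAX"


def stats_field_name(field_id_: int) -> str:
  # "_STATS_f0", "_STATS_f1", ...: stable even if a column is later renamed,
  # because the field ID never changes.
  return f"{_STATS_FIELD}_f{field_id_}"


def stats_struct(type_: pa.DataType) -> pa.DataType:
  # Per-field min/max statistics, matching the _STATS_f* structs asserted in
  # the tests below.
  return pa.struct([pa.field(_MIN_FIELD, type_), pa.field(_MAX_FIELD, type_)])


# A primary key with field ID 0 and type int64 produces a manifest column:
#   _STATS_f0: struct<_MIN: int64, _MAX: int64>
print(stats_field_name(0), stats_struct(pa.int64()))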
3 changes: 3 additions & 0 deletions python/src/space/core/ops/__init__.py
@@ -12,3 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+"""Space local data operations."""
+
+from space.core.ops.append import LocalAppendOp
12 changes: 6 additions & 6 deletions python/src/space/core/ops/append.py
@@ -51,7 +51,7 @@ def finish(self) -> Optional[runtime.Patch]:
 
 
 @dataclass
-class IndexWriterInfo:
+class _IndexWriterInfo:
   """Contain information of index file writer."""
   writer: pq.ParquetWriter
   file_path: str
@@ -73,7 +73,7 @@ def __init__(self, location: str, metadata: meta.StorageMetadata):
     self._schema = arrow_schema(self._metadata.schema.fields)
 
     # Data file writers.
-    self._index_writer_info: Optional[IndexWriterInfo] = None
+    self._index_writer_info: Optional[_IndexWriterInfo] = None
 
     # Local runtime caches.
     self._cached_index_data: Optional[pa.Table] = None
@@ -98,9 +98,6 @@ def finish(self) -> Optional[runtime.Patch]:
     Returns:
       A patch to the storage or None if no actual storage modification happens.
     """
-    if self._patch.storage_statistics_update.num_rows == 0:
-      return None
-
     # Flush all cached index data.
     if self._cached_index_data is not None:
       self._maybe_create_index_writer()
@@ -115,6 +112,9 @@ def finish(self) -> Optional[runtime.Patch]:
     self._patch.added_index_manifest_files.append(
         self.short_path(index_manifest_full_path))
 
+    if self._patch.storage_statistics_update.num_rows == 0:
+      return None
+
     return self._patch
 
   def _append_arrow(self, data: pa.Table) -> None:
@@ -150,7 +150,7 @@ def _maybe_create_index_writer(self) -> None:
     if self._index_writer_info is None:
       full_file_path = paths.new_index_file_path(self._data_dir)
       writer = pq.ParquetWriter(full_file_path, self._schema)
-      self._index_writer_info = IndexWriterInfo(
+      self._index_writer_info = _IndexWriterInfo(
          writer, self.short_path(full_file_path))
 
   def _finish_index_writer(self) -> None:
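Why move the empty-patch check to the end of finish()? Presumably because row counts only reach the patch statistics when cached data is flushed, so checking first would return None while data was still pending. A toy model of the corrected ordering (simplified names and logic, not the repository's code):

from typing import List, Optional


class ToyAppendOp:
  """Row counts only reach the statistics when the cache is flushed, so the
  emptiness check must come after the flush."""

  def __init__(self) -> None:
    self._cached_rows: List[int] = []
    self._num_rows = 0

  def write(self, rows: List[int]) -> None:
    self._cached_rows.extend(rows)

  def _flush(self) -> None:
    self._num_rows += len(self._cached_rows)
    self._cached_rows = []

  def finish(self) -> Optional[int]:
    self._flush()              # flush first,
    if self._num_rows == 0:    # then decide whether anything was written
      return None
    return self._num_rows


op = ToyAppendOp()
op.write([1, 2, 3])
assert op.finish() == 3  # with the check before the flush, this would be None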
2 changes: 1 addition & 1 deletion python/src/space/core/ops/utils.py
@@ -22,6 +22,6 @@ def update_index_storage_statistics(
     update: meta.StorageStatistics,
 ) -> None:
   """Update index storage statistics."""
-  base.num_rows += base.num_rows
+  base.num_rows += update.num_rows
   base.index_compressed_bytes += update.index_compressed_bytes
   base.index_uncompressed_bytes += update.index_uncompressed_bytes
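The one-line fix above corrects a self-accumulation bug. A quick sketch of the fixed behavior (it assumes the first parameter is named base, matching the body shown above):

import space.core.proto.metadata_pb2 as meta
from space.core.ops.utils import update_index_storage_statistics

base = meta.StorageStatistics(num_rows=5)
update = meta.StorageStatistics(num_rows=2,
                                index_compressed_bytes=100,
                                index_uncompressed_bytes=120)
update_index_storage_statistics(base=base, update=update)
# Rows now accumulate from the update: 5 + 2.
assert base.num_rows == 7  # the old code computed base + base = 10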
1 change: 1 addition & 0 deletions python/src/space/core/storage.py
@@ -88,6 +88,7 @@ def create(
     primary_keys: un-enforced primary keys.
   """
   # TODO: to verify that location is an empty directory.
+  # TODO: to verify primary key fields (and types) are valid.
 
   field_id_mgr = FieldIdManager()
   schema = field_id_mgr.assign_field_ids(schema)
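As a sketch of what the new TODO might entail, a hypothetical validator (this helper does not exist in the repository) could check that each primary key names a schema field whose type supports column stats:

from typing import List

import pyarrow as pa

# Types exercised by the column-stats tests in this commit; the actual
# supported set may differ.
_STATS_TYPES = (pa.int64(), pa.float64(), pa.bool_(), pa.string())


def _validate_primary_keys(schema: pa.Schema,
                           primary_keys: List[str]) -> None:
  for key in primary_keys:
    if schema.get_field_index(key) < 0:
      raise ValueError(f"Primary key {key} not found in schema")
    if schema.field(key).type not in _STATS_TYPES:
      raise TypeError(f"Primary key {key} has unsupported type")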
166 changes: 166 additions & 0 deletions python/tests/core/manifests/test_index.py
@@ -0,0 +1,166 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict, List

import pyarrow as pa
import pyarrow.parquet as pq

from space.core.manifests.index import IndexManifestWriter
from space.core.schema.arrow import field_metadata

_SCHEMA = pa.schema([
    pa.field("int64", pa.int64(), metadata=field_metadata(0)),
    pa.field("float64", pa.float64(), metadata=field_metadata(1)),
    pa.field("bool", pa.bool_(), metadata=field_metadata(2)),
    pa.field("string", pa.string(), metadata=field_metadata(3))
])


def _write_parquet_file(
    file_path: str, schema: pa.Schema,
    batches: List[Dict[str, List[Any]]]) -> pq.FileMetaData:
  writer = pq.ParquetWriter(file_path, schema)
  for batch in batches:
    writer.write_table(pa.Table.from_pydict(batch))

  writer.close()
  return writer.writer.metadata


class TestIndexManifestWriter:

  def test_write_all_types(self, tmp_path):
    data_dir = tmp_path / "dataset" / "data"
    data_dir.mkdir(parents=True)
    metadata_dir = tmp_path / "dataset" / "metadata"
    metadata_dir.mkdir(parents=True)

    schema = _SCHEMA
    manifest_writer = IndexManifestWriter(
        metadata_dir=str(metadata_dir),
        schema=schema,
        primary_keys=["int64", "float64", "bool", "string"])

    file_path = str(data_dir / "file0")
    # TODO: the test should cover all types supported by column stats.
    manifest_writer.write(
        file_path,
        _write_parquet_file(file_path, schema, [{
            "int64": [1, 2, 3],
            "float64": [0.1, 0.2, 0.3],
            "bool": [True, False, False],
            "string": ["a", "b", "c"]
        }, {
            "int64": [0, 10],
            "float64": [-0.1, 100.0],
            "bool": [False, False],
            "string": ["A", "z"]
        }]))
    file_path = str(data_dir / "file1")
    manifest_writer.write(
        file_path,
        _write_parquet_file(file_path, schema, [{
            "int64": [1000, 1000000],
            "float64": [-0.001, 0.001],
            "bool": [False, False],
            "string": ["abcedf", "ABCDEF"]
        }]))

    manifest_path = manifest_writer.finish()

    data_dir_str = str(data_dir)
    assert manifest_path is not None
    assert pq.read_table(manifest_path).to_pydict() == {
        "_FILE": [f"{data_dir_str}/file0", f"{data_dir_str}/file1"],
        "_INDEX_COMPRESSED_BYTES": [645, 334],
        "_INDEX_UNCOMPRESSED_BYTES": [624, 320],
        "_NUM_ROWS": [5, 2],
        "_STATS_f0": [{
            "_MAX": 10,
            "_MIN": 0
        }, {
            "_MAX": 1000000,
            "_MIN": 1000
        }],
        "_STATS_f1": [{
            "_MAX": 100.0,
            "_MIN": -0.1
        }, {
            "_MAX": 0.001,
            "_MIN": -0.001
        }],
        "_STATS_f2": [{
            "_MAX": True,
            "_MIN": False
        }, {
            "_MAX": False,
            "_MIN": False
        }],
        "_STATS_f3": [{
            "_MAX": "z",
            "_MIN": "A"
        }, {
            "_MAX": "abcedf",
            "_MIN": "ABCDEF"
        }]
    }

  def test_write_collect_stats_for_primary_keys_only(self, tmp_path):
    data_dir = tmp_path / "dataset" / "data"
    data_dir.mkdir(parents=True)
    metadata_dir = tmp_path / "dataset" / "metadata"
    metadata_dir.mkdir(parents=True)

    schema = _SCHEMA
    manifest_writer = IndexManifestWriter(metadata_dir=str(metadata_dir),
                                          schema=schema,
                                          primary_keys=["int64"])

    file_path = str(data_dir / "file0")
    # TODO: the test should cover all types supported by column stats.
    manifest_writer.write(
        file_path,
        _write_parquet_file(file_path, schema, [{
            "int64": [1, 2, 3],
            "float64": [0.1, 0.2, 0.3],
            "bool": [True, False, False],
            "string": ["a", "b", "c"]
        }]))

    manifest_path = manifest_writer.finish()

    data_dir_str = str(data_dir)
    assert manifest_path is not None
    assert pq.read_table(manifest_path).to_pydict() == {
        "_FILE": [f"{data_dir_str}/file0"],
        "_INDEX_COMPRESSED_BYTES": [110],
        "_INDEX_UNCOMPRESSED_BYTES": [109],
        "_NUM_ROWS": [3],
        "_STATS_f0": [{
            "_MAX": 3,
            "_MIN": 1
        }]
    }

  def test_empty_manifest_should_return_none(self, tmp_path):
    metadata_dir = tmp_path / "dataset" / "metadata"
    metadata_dir.mkdir(parents=True)

    schema = _SCHEMA
    manifest_writer = IndexManifestWriter(metadata_dir=str(metadata_dir),
                                          schema=schema,
                                          primary_keys=["int64"])

    assert manifest_writer.finish() is None
85 changes: 85 additions & 0 deletions python/tests/core/ops/test_append.py
@@ -0,0 +1,85 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pyarrow as pa
import pyarrow.parquet as pq

from space.core.ops import LocalAppendOp
import space.core.proto.metadata_pb2 as meta
from space.core.storage import Storage


class TestLocalAppendOp:

  def test_write_pydict_all_types(self, tmp_path):
    location = tmp_path / "dataset"
    schema = pa.schema([
        pa.field("int64", pa.int64()),
        pa.field("float64", pa.float64()),
        pa.field("bool", pa.bool_()),
        pa.field("string", pa.string())
    ])
    storage = Storage.create(location=str(location),
                             schema=schema,
                             primary_keys=["int64"])

    op = LocalAppendOp(str(location), storage.metadata)

    # TODO: the test should cover all types supported by column stats.
    op.write({
        "int64": [1, 2, 3],
        "float64": [0.1, 0.2, 0.3],
        "bool": [True, False, False],
        "string": ["a", "b", "c"]
    })
    op.write({
        "int64": [0, 10],
        "float64": [-0.1, 100.0],
        "bool": [False, False],
        "string": ["A", "z"]
    })

    patch = op.finish()
    assert patch is not None

    index_manifests = []
    for f in patch.added_index_manifest_files:
      index_manifests.append(pq.read_table(storage.full_path(f)))

    index_manifest = pa.concat_tables(index_manifests).to_pydict()
    assert "_FILE" in index_manifest

    assert index_manifest == {
        "_FILE": index_manifest["_FILE"],
        "_INDEX_COMPRESSED_BYTES": [114],
        "_INDEX_UNCOMPRESSED_BYTES": [126],
        "_NUM_ROWS": [5],
        "_STATS_f0": [{
            "_MAX": 10,
            "_MIN": 0
        }]
    }

    assert patch.storage_statistics_update == meta.StorageStatistics(
        num_rows=5, index_compressed_bytes=114, index_uncompressed_bytes=126)

  def test_empty_op_returns_none(self, tmp_path):
    location = tmp_path / "dataset"
    schema = pa.schema([pa.field("int64", pa.int64())])
    storage = Storage.create(location=str(location),
                             schema=schema,
                             primary_keys=["int64"])

    op = LocalAppendOp(str(location), storage.metadata)
    assert op.finish() is None
