Add local delete operation implementation. (#9)
* Add local delete operation implementation.

* Add a comment for the manifest_schema method

* Test append file schema
Zhou Fang authored Dec 24, 2023
1 parent b1f2201 commit 5a08e57
Showing 16 changed files with 538 additions and 89 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/python-ci.yml
@@ -47,7 +47,8 @@ jobs:
- name: Analysing test code with pylint
working-directory: ./python
run: |
pylint tests/**/* --disable="missing-module-docstring,missing-function-docstring,missing-class-docstring,duplicate-code"
pylint tests/**/* \
--disable="missing-module-docstring,missing-function-docstring,missing-class-docstring,duplicate-code"
- name: Checking type with mypy
working-directory: ./python/src
run: |
49 changes: 27 additions & 22 deletions python/src/space/core/manifests/index.py
@@ -30,10 +30,6 @@
from space.core.schema.arrow import field_id, field_id_to_column_id_dict
from space.core.utils import paths

# Manifest file fields.
_INDEX_COMPRESSED_BYTES_FIELD = '_INDEX_COMPRESSED_BYTES'
_INDEX_UNCOMPRESSED_BYTES_FIELD = '_INDEX_UNCOMPRESSED_BYTES'


def _stats_subfields(type_: pa.DataType) -> List[pa.Field]:
"""Column stats struct field sub-fields."""
@@ -51,8 +47,8 @@ def _manifest_schema(

fields = [(constants.FILE_PATH_FIELD, pa.utf8()),
(constants.NUM_ROWS_FIELD, pa.int64()),
(_INDEX_COMPRESSED_BYTES_FIELD, pa.int64()),
(_INDEX_UNCOMPRESSED_BYTES_FIELD, pa.int64())]
(constants.INDEX_COMPRESSED_BYTES_FIELD, pa.int64()),
(constants.INDEX_UNCOMPRESSED_BYTES_FIELD, pa.int64())]

# Fields to collect Parquet column statistics: [(field_id, type), ...].
stats_fields: List[Tuple[int, pa.DataType]] = []
@@ -139,6 +135,13 @@ def __init__(self, metadata_dir: str, schema: pa.Schema,
self._stats_column_ids.append(column_id)
self._field_stats_dict[column_id] = _FieldStats(type_)

self._cached_manifest_data: List[pa.Table] = []

@property
def manifest_schema(self) -> pa.Schema:
"""The schema of index manifest files."""
return self._manifest_schema

def write(self, file_path: str,
parquet_metadata: pq.FileMetaData) -> meta.StorageStatistics:
"""Write a new manifest row.
@@ -175,11 +178,13 @@ def write(self, file_path: str,
index_compressed_bytes=index_compressed_bytes,
index_uncompressed_bytes=index_uncompressed_bytes)

def write_arrow(self, manifest_data: pa.Table) -> None:
"""Write manifest rows in Arrow format."""
self._cached_manifest_data.append(manifest_data)

def finish(self) -> Optional[str]:
"""Materialize the manifest file and return the file path."""
# Convert cached manifest data to Arrow.
all_manifest_data: List[pa.Table] = []

if self._file_paths:
arrays = [
self._file_paths, self._num_rows, self._index_compressed_bytes,
@@ -189,15 +194,15 @@ def finish(self) -> Optional[str]:
for column_id in self._stats_column_ids:
arrays.append(self._field_stats_dict[column_id].to_arrow())

all_manifest_data.append(
self._cached_manifest_data.append(
pa.Table.from_arrays(
arrays=arrays,
schema=self._manifest_schema)) # type: ignore[call-arg]

if not all_manifest_data:
if not self._cached_manifest_data:
return None

manifest_data = pa.concat_tables(all_manifest_data)
manifest_data = pa.concat_tables(self._cached_manifest_data)
if manifest_data.num_rows == 0:
return None

@@ -217,11 +222,13 @@ class _IndexManifests:

def read_index_manifests(
manifest_path: str,
manifest_file_id: int,
filter_: Optional[pc.Expression] = None) -> runtime.FileSet:
"""Read an index manifest file.
Args:
manifest_path: full file path of the manifest file.
manifest_file_id: a temporary manifest file ID assigned by the caller.
filter_: a filter on the index manifest rows.
Returns:
@@ -234,15 +241,13 @@ def read_index_manifests(

file_set = runtime.FileSet()
for i in range(table.num_rows):
file = runtime.DataFile()
file.path = manifests.file_path[i].as_py()

stats = file.storage_statistics
stats.num_rows = manifests.num_rows[i].as_py()
stats.index_compressed_bytes = manifests.index_compressed_bytes[i].as_py()
stats.index_uncompressed_bytes = manifests.index_uncompressed_bytes[
i].as_py()

stats = meta.StorageStatistics(
num_rows=manifests.num_rows[i].as_py(),
index_compressed_bytes=manifests.index_compressed_bytes[i].as_py(),
index_uncompressed_bytes=manifests.index_uncompressed_bytes[i].as_py())
file = runtime.DataFile(path=manifests.file_path[i].as_py(),
manifest_file_id=manifest_file_id,
storage_statistics=stats)
file_set.index_files.append(file)

return file_set
@@ -254,6 +259,6 @@ def _index_manifests(table: pa.Table) -> _IndexManifests:
file_path=table.column(constants.FILE_PATH_FIELD).combine_chunks(),
num_rows=table.column(constants.NUM_ROWS_FIELD).combine_chunks(),
index_compressed_bytes=table.column(
_INDEX_COMPRESSED_BYTES_FIELD).combine_chunks(),
constants.INDEX_COMPRESSED_BYTES_FIELD).combine_chunks(),
index_uncompressed_bytes=table.column(
_INDEX_UNCOMPRESSED_BYTES_FIELD).combine_chunks())
constants.INDEX_UNCOMPRESSED_BYTES_FIELD).combine_chunks())
1 change: 1 addition & 0 deletions python/src/space/core/ops/__init__.py
@@ -15,4 +15,5 @@
"""Space local data operations."""

from space.core.ops.append import LocalAppendOp
from space.core.ops.delete import FileSetDeleteOp
from space.core.ops.read import FileSetReadOp
36 changes: 27 additions & 9 deletions python/src/space/core/ops/append.py
@@ -80,7 +80,14 @@ class LocalAppendOp(BaseAppendOp, StoragePaths):
Not thread safe.
"""

def __init__(self, location: str, metadata: meta.StorageMetadata):
def __init__(self,
location: str,
metadata: meta.StorageMetadata,
record_address_input: bool = False):
"""
Args:
record_address_input: if true, input record fields are addresses.
"""
StoragePaths.__init__(self, location)

self._metadata = metadata
Expand All @@ -91,6 +98,8 @@ def __init__(self, location: str, metadata: meta.StorageMetadata):
self._index_fields, self._record_fields = arrow.classify_fields(
self._physical_schema, record_fields, selected_fields=None)

self._record_address_input = record_address_input

# Data file writers.
self._index_writer_info: Optional[_IndexWriterInfo] = None

@@ -151,6 +160,10 @@ def finish(self) -> Optional[runtime.Patch]:

return self._patch

def append_index_manifest(self, index_manifest_table: pa.Table) -> None:
"""Append external index manifest data."""
return self._index_manifest_writer.write_arrow(index_manifest_table)

def _append_arrow(self, data: pa.Table) -> None:
# TODO: to verify the schema of input data.
if data.num_rows == 0:
@@ -165,15 +178,20 @@ def _append_arrow(self, data: pa.Table) -> None:

# Write record fields into files.
# TODO: to parallelize it.
record_addresses = [
self._write_record_column(f, data.column(f.name))
for f in self._record_fields
]

# TODO: to preserve the field order in schema.
for field_name, address_column in record_addresses:
# TODO: the field/column added must have field ID.
index_data = index_data.append_column(field_name, address_column)
if self._record_address_input:
for f in self._record_fields:
index_data = index_data.append_column(f.name, data.column(
f.name)) # type: ignore[arg-type]
else:
record_addresses = [
self._write_record_column(f, data.column(f.name))
for f in self._record_fields
]

for field_name, address_column in record_addresses:
# TODO: the field/column added must have field ID.
index_data = index_data.append_column(field_name, address_column)

# Write index fields into files.
self._cached_index_file_bytes += index_data.nbytes
191 changes: 191 additions & 0 deletions python/src/space/core/ops/delete.py
@@ -0,0 +1,191 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Local delete operation implementation."""

from abc import abstractmethod
from typing import List, Optional, Set

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc

from space.core.ops import utils
from space.core.ops.append import LocalAppendOp
from space.core.ops.base import BaseOp
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as runtime
from space.core.utils.paths import StoragePaths
from space.core.schema import constants


class BaseDeleteOp(BaseOp):
"""Abstract base delete operation class.
The deletion only applies to index files. The rows in record files are
cleaned up by a separate garbage collection operation.
"""

@abstractmethod
def delete(self) -> Optional[runtime.Patch]:
"""Delete data matching the filter from the storage.
TODO: a class is not needed for the current single thread implementation.
To revisit the interface.
"""


class FileSetDeleteOp(BaseDeleteOp, StoragePaths):
"""Delete operation of a given file set running locally.
It can be used as a component of more complex operations and of distributed
delete operations.
Not thread safe.
"""

def __init__(self, location: str, metadata: meta.StorageMetadata,
file_set: runtime.FileSet, filter_: pc.Expression):
StoragePaths.__init__(self, location)

if not _validate_files(file_set):
raise RuntimeError(f"Invalid input file set for delete op:\n{file_set}")

self._file_set = file_set
# Rows not matching the filter will be reinserted.
self._reinsert_filter = ~filter_

self._append_op = LocalAppendOp(location,
metadata,
record_address_input=True)

def delete(self) -> Optional[runtime.Patch]:
# Index files and manifests containing deleted rows; they will be removed
# from the index manifests.
deleted_files: List[str] = []
deleted_manifest_ids: Set[int] = set()

deleted_rows = 0
stats_before_delete = meta.StorageStatistics()
for file in self._file_set.index_files:
utils.update_index_storage_stats(stats_before_delete,
file.storage_statistics)

index_data = pq.read_table(self.full_path(file.path),
filters=self._reinsert_filter)

# No rows were deleted from this file; no need to re-insert rows.
if index_data.num_rows == file.storage_statistics.num_rows:
continue

# Collect statistics.
deleted_rows += (file.storage_statistics.num_rows - index_data.num_rows)
all_deleted = index_data.num_rows == 0

# Record deleted files and manifests information.
deleted_files.append(file.path)
deleted_manifest_ids.add(file.manifest_file_id)

# Write a reinsert file for the surviving rows.
if all_deleted:
continue

# Re-insert surviving rows.
self._append_op.write(index_data)

if deleted_rows == 0:
return None

# Carry over unmodified index files from the deleted manifests.
deleted_manifest_files: List[str] = []
for manifest_id in deleted_manifest_ids:
if manifest_id not in self._file_set.index_manifest_files:
raise RuntimeError(
f"Index manifest ID {manifest_id} not found in file set")

deleted_manifest_files.append(
self._file_set.index_manifest_files[manifest_id])

# Carry over survivor (i.e., unmodified) files to the new manifest data.
survivor_files_filter = ~_build_file_path_filter(constants.FILE_PATH_FIELD,
deleted_files)
survivor_index_manifests = pq.ParquetDataset(
[self.full_path(f) for f in deleted_manifest_files],
filters=survivor_files_filter).read()
if survivor_index_manifests.num_rows > 0:
self._append_op.append_index_manifest(survivor_index_manifests)

reinsert_patch = self._append_op.finish()

# Populate the patch for the delete.
patch = runtime.Patch()
if reinsert_patch is not None:
patch.addition.index_manifest_files.extend(
reinsert_patch.addition.index_manifest_files)

for f in deleted_manifest_files:
patch.deletion.index_manifest_files.append(f)

# Compute storage statistics update.
survivor_stats = _read_index_statistics(survivor_index_manifests)
reinsert_stats = (reinsert_patch.storage_statistics_update if
reinsert_patch is not None else meta.StorageStatistics())

deleted_compressed_bytes = (reinsert_stats.index_compressed_bytes +
survivor_stats.index_compressed_bytes
) - stats_before_delete.index_compressed_bytes
deleted_uncompressed_bytes = (
reinsert_stats.index_uncompressed_bytes +
survivor_stats.index_uncompressed_bytes
) - stats_before_delete.index_uncompressed_bytes

patch.storage_statistics_update.CopyFrom(
meta.StorageStatistics(
num_rows=-deleted_rows,
index_compressed_bytes=deleted_compressed_bytes,
index_uncompressed_bytes=deleted_uncompressed_bytes))
return patch


def _build_file_path_filter(field_name: str, file_paths: List[str]):
filter_ = pc.scalar(False)
for f in file_paths:
filter_ |= (pc.field(field_name) == pc.scalar(f))
return filter_


def _read_index_statistics(manifest_data: pa.Table) -> meta.StorageStatistics:
"""Read storage statistics of unmodified index files from manifests."""

def sum_bytes(field_name: str) -> int:
return sum(manifest_data.column(
field_name).combine_chunks().to_pylist()) # type: ignore[arg-type]

return meta.StorageStatistics(index_compressed_bytes=sum_bytes(
constants.INDEX_COMPRESSED_BYTES_FIELD),
index_uncompressed_bytes=sum_bytes(
constants.INDEX_UNCOMPRESSED_BYTES_FIELD))


def _validate_files(file_set: runtime.FileSet) -> bool:
"""Return false if the file set does not contain sufficient information for
deletion.
"""
for f in file_set.index_files:
if (not f.path or f.storage_statistics.num_rows == 0
or f.manifest_file_id == 0):
return False

return len(file_set.index_manifest_files) > 0
