Skip to content

Commit

Permalink
add filesystem argument
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisbc committed Jun 17, 2024
1 parent 49ed167 commit b578900
Showing 1 changed file with 8 additions and 10 deletions.
18 changes: 8 additions & 10 deletions toshi_hazard_store/model/revision_4/pyarrow_aggr_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
import pathlib
import uuid
from functools import partial
from typing import TYPE_CHECKING, Iterable, Union
from typing import TYPE_CHECKING, Iterable, Optional, Union

import pandas as pd
import pyarrow as pa
import pyarrow.dataset
import pyarrow.dataset as ds
from pyarrow import fs

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,8 +51,9 @@ def write_metadata(output_folder: pathlib.Path, visited_file: pyarrow.dataset.Wr

def append_models_to_dataset(
models: Iterable[Union['HazardRealizationCurve', 'HazardAggregation']],
output_folder: pathlib.Path,
base_dir: str,
dataset_format: str = 'parquet',
filesystem: Optional[fs.FileSystem] = None,
) -> int:
"""
append realisation models to dataset using the pyarrow library
Expand All @@ -60,25 +62,21 @@ def append_models_to_dataset(
TODO: schema checks
"""

def groomed_models(models):
for model in models:
yield model.as_pandas_model()

df = pd.DataFrame(groomed_models(models))

df = pd.DataFrame([model.as_pandas_model() for model in models])
table = pa.Table.from_pandas(df)

write_metadata_fn = partial(write_metadata, output_folder)
write_metadata_fn = partial(write_metadata, base_dir)

ds.write_dataset(
table,
base_dir=str(output_folder),
base_dir=base_dir,
basename_template="%s-part-{i}.%s" % (uuid.uuid4(), dataset_format),
partitioning=['nloc_0'],
partitioning_flavor="hive",
existing_data_behavior="overwrite_or_ignore",
format=dataset_format,
file_visitor=write_metadata_fn,
filesystem=filesystem,
)

return df.shape[0]

0 comments on commit b578900

Please sign in to comment.