Skip to content

Commit

Permalink
refactor: Utilize python built-in for temp files
Browse files Browse the repository at this point in the history
By default when destination path is None, now the program
will use the built in tempfile module. This will ensure
that swap files can be cleaned up by the Operating System.

Introduced a new global variable in 'io' called 'ECHOPYPE_TEMP_DIR'
to specify the path to the temporary 'echopype' directory within the OS
temp directory. This is initialized on echopype import.

Additionally, moved I/O related functions with swap files now to the
higher level 'io' module in 'utils'.
  • Loading branch information
lsetiawan committed Oct 4, 2023
1 parent 94db308 commit 04a18c0
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 102 deletions.
7 changes: 1 addition & 6 deletions echopype/convert/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def open_raw(
xml_path: Optional["PathHint"] = None,
convert_params: Optional[Dict[str, str]] = None,
storage_options: Optional[Dict[str, str]] = None,
destination_path: Optional[str] = None,
destination_path: Optional[str] = "auto",
destination_storage_options: Optional[Dict[str, str]] = None,
max_chunk_size: str = "100MB",
) -> Optional[EchoData]:
Expand Down Expand Up @@ -384,11 +384,6 @@ def open_raw(
echosounders: EK60, ES70, EK80, ES80, EA640. Additionally, this feature
is currently in beta.
"""

# Set initial destination_path of "no_swap"
if destination_path is None:
destination_path = "no_swap"

if raw_file is None:
raise FileNotFoundError("The path to the raw data file must be specified.")

Expand Down
5 changes: 3 additions & 2 deletions echopype/convert/parse_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import zarr
from dask.array.core import auto_chunks

from ..utils.io import create_temp_zarr_store
from ..utils.log import _init_logger
from .utils.ek_raw_io import RawSimradFile, SimradEOF
from .utils.ek_swap import calc_final_shapes, create_temp_store
from .utils.ek_swap import calc_final_shapes

FILENAME_DATETIME_EK60 = (
"(?P<survey>.+)?-?D(?P<date>\\w{1,8})-T(?P<time>\\w{1,6})-?(?P<postfix>\\w+)?.raw"
Expand Down Expand Up @@ -162,7 +163,7 @@ def rectangularize_data(
if dest_path in ["swap", "auto"]:
dest_path = None
# Setup temp store
zarr_store = create_temp_store(dest_path, dest_storage_options)
zarr_store = create_temp_zarr_store(dest_path, dest_storage_options)
# Setup zarr store
zarr_root = zarr.group(
store=zarr_store, overwrite=True, synchronizer=zarr.ThreadSynchronizer()
Expand Down
87 changes: 0 additions & 87 deletions echopype/convert/utils/ek_swap.py
Original file line number Diff line number Diff line change
@@ -1,94 +1,7 @@
import secrets
from operator import itemgetter
from typing import Any, Dict, List, Optional, Tuple

import fsspec
import numpy as np
from fsspec import AbstractFileSystem
from zarr.storage import FSStore

from ...utils.io import ECHOPYPE_DIR, check_file_permissions, validate_output_path

DEFAULT_ZARR_TEMP_DIR = ECHOPYPE_DIR / "temp_output" / "swap_files"


def _create_zarr_store_map(path, storage_options):
file_path = validate_output_path(
source_file=secrets.token_hex(16),
engine="zarr",
save_path=path,
output_storage_options=storage_options,
)
return fsspec.get_mapper(file_path, **storage_options)


def delete_store(store: "FSStore | str", fs: Optional[AbstractFileSystem] = None) -> None:
"""
Delete the store and all its contents.
Parameters
----------
store : FSStore or str
The store or store path to delete.
fs : AbstractFileSystem, optional
The fsspec file system to use
Returns
-------
None
"""
if isinstance(store, str):
if fs is None:
raise ValueError("Must provide fs if store is a path string")
store_path = store
else:
# Get the file system, this should already have the
# correct storage options
fs = store.fs

# Get the string path to the store
store_path: str = store.dir_path()

if fs.exists(store_path):
print(f"Deleting store: {store_path}")
# Delete the store when it exists
fs.rm(store_path, recursive=True)


def create_temp_store(dest_path, dest_storage_options=None, retries: int = 10):
if dest_path is None:
# Check permission of cwd, raise exception if no permission
check_file_permissions(ECHOPYPE_DIR)

# construct temporary directory that will hold the zarr file
dest_path = DEFAULT_ZARR_TEMP_DIR
if not dest_path.exists():
dest_path.mkdir(parents=True)

temp_zarr_dir = str(dest_path)

# Set default storage options if None
if dest_storage_options is None:
dest_storage_options = {}

# attempt to find different zarr_file_name
attempt = 0
exists = True
while exists:
zarr_store = _create_zarr_store_map(
path=temp_zarr_dir, storage_options=dest_storage_options
)
exists = zarr_store.fs.exists(zarr_store.root)
attempt += 1

if attempt == retries and exists:
raise RuntimeError(
(
"Unable to construct an unused zarr file name for swap ",
f"after {retries} retries!",
)
)
return zarr_store


def _get_datagram_max_shape(datagram_dict: Dict[Any, List[np.ndarray]]) -> Optional[Tuple[int]]:
Expand Down
5 changes: 2 additions & 3 deletions echopype/echodata/echodata.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
if TYPE_CHECKING:
from ..core import EngineHint, FileFormatHint, PathHint, SonarModelsHint

from ..convert.utils.ek_swap import delete_store
from ..utils.coding import sanitize_dtypes, set_time_encodings
from ..utils.io import check_file_existence, sanitize_file_path
from ..utils.io import check_file_existence, delete_zarr_store, sanitize_file_path
from ..utils.log import _init_logger
from ..utils.prov import add_processing_level
from .convention import sonarnetcdf_1
Expand Down Expand Up @@ -94,7 +93,7 @@ def cleanup_swap_files(self):
]
fs = zarr_stores[0].fs
for store in zarr_stores:
delete_store(store, fs)
delete_zarr_store(store, fs)

def __del__(self):
# TODO: this destructor seems to not work in Jupyter Lab if restart or
Expand Down
131 changes: 127 additions & 4 deletions echopype/utils/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@
import os
import pathlib
import platform
import secrets
import sys
import tempfile
import uuid
from pathlib import Path, WindowsPath
from typing import TYPE_CHECKING, Dict, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union

import fsspec
import xarray as xr
from fsspec import FSMap
from fsspec import AbstractFileSystem, FSMap
from fsspec.implementations.local import LocalFileSystem
from zarr.storage import FSStore

from ..utils.coding import set_storage_encodings
from ..utils.log import _init_logger
Expand All @@ -30,15 +33,21 @@

logger = _init_logger(__name__)


ECHOPYPE_DIR = Path(os.path.expanduser("~")) / ".echopype"
# Get root echopype package name
ECHOPYPE = __name__.split(".")[0]
ECHOPYPE_DIR = Path(os.path.expanduser("~")) / ".{ECHOPYPE}"
ECHOPYPE_TEMP_DIR = Path(tempfile.tempdir) / ECHOPYPE
_SWAP_PREFIX = "ep-swap"


def init_ep_dir():
"""Initialize hidden directory for echopype"""
if not ECHOPYPE_DIR.exists():
ECHOPYPE_DIR.mkdir(exist_ok=True)

if not ECHOPYPE_TEMP_DIR.exists():
ECHOPYPE_TEMP_DIR.mkdir(exist_ok=True)


def get_files_from_dir(folder):
"""Retrieves all Netcdf and Zarr files from a given folder"""
Expand Down Expand Up @@ -422,3 +431,117 @@ def validate_source_ds_da(
check_file_existence(file_path=source_ds_da, storage_options=storage_options)

return source_ds_da, file_type


# Utilities for creating temporary swap zarr files -------------------------------------
def _create_zarr_store_map(path: str, storage_options: dict) -> FSMap:
"""Create a zarr store map"""
file_path = validate_output_path(
# Use same swap prefix for swap zarr files
source_file=f"{_SWAP_PREFIX}--{secrets.token_hex(16)[:8]}",
engine="zarr",
save_path=path,
output_storage_options=storage_options,
)
return fsspec.get_mapper(file_path, **storage_options)


def create_temp_zarr_store(
dest_path: Optional[str] = None, dest_storage_options: Dict[Any, Any] = {}, retries: int = 10
) -> FSMap:
"""Create a temporary zarr store for swapping data.
Parameters
----------
dest_path : str, optional
The destination path to create the swap file, by default None.
If None, then use system temp directory.
dest_storage_options : dict, optional
The storage options for the destination path
retries : int, default 10
The number of retries to attempt to create a unique swap file
Returns
-------
FSMap
The zarr store for swapping data
Raises
------
RuntimeError
If unable to construct an unused zarr file name for swap after retries
"""
if dest_path is None:
# Use system temp directory to create swap file by default
with tempfile.TemporaryDirectory(
suffix=".zarr",
prefix=f"{_SWAP_PREFIX}--",
dir=ECHOPYPE_TEMP_DIR,
) as zarr_path:
return fsspec.get_mapper(zarr_path)

# User is able to specify a path to create the swap file
temp_zarr_dir = str(dest_path)

# Set default storage options if None
if dest_storage_options is None:
dest_storage_options = {}

# attempt to find different zarr_file_name
attempt = 0
exists = True
while exists:
zarr_store = _create_zarr_store_map(
path=temp_zarr_dir, storage_options=dest_storage_options
)
exists = zarr_store.fs.exists(zarr_store.root)
attempt += 1

if attempt == retries and exists:
raise RuntimeError(
(
"Unable to construct an unused zarr file name for swap ",
f"after {retries} retries!",
)
)
return zarr_store


def delete_zarr_store(store: "FSStore | str", fs: Optional[AbstractFileSystem] = None) -> None:
"""
Delete the zarr store and all its contents.
Parameters
----------
store : FSStore or str
The store or store path to delete.
fs : AbstractFileSystem, optional
The fsspec file system to use
Returns
-------
None
Raises
------
ValueError
If store is a string path and fs is not provided
"""
if isinstance(store, str):
if fs is None:
raise ValueError("Must provide fs if store is a path string")
store_path = store
else:
# Get the file system, this should already have the
# correct storage options
fs = store.fs

# Get the string path to the store
store_path: str = store.dir_path()

if fs.exists(store_path):
# Delete the store when it exists
fs.rm(store_path, recursive=True)


# End of utilities for creating temporary swap zarr files ------------------------------

0 comments on commit 04a18c0

Please sign in to comment.