
Commit

Added support for cloud-based processing
beatfactor committed Jun 13, 2024
1 parent ad50bfb commit aa93d5a
Showing 16 changed files with 469 additions and 54 deletions.
6 changes: 4 additions & 2 deletions oceanstream/__init__.py
@@ -1,3 +1,5 @@
from .core import compute_sv, combine, convert, process_raw_file
from .core import initialize, compute_sv, combine, convert, process_raw_file

__all__ = ["compute_sv", "combine", "convert", "process_raw_file"]
__all__ = [
"compute_sv", "combine", "convert", "process_raw_file", "initialize"
]
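
With initialize added to __all__, every core entry point can now be imported straight from the package root, for example:

from oceanstream import initialize, compute_sv, combine, convert, process_raw_file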
2 changes: 0 additions & 2 deletions oceanstream/cli/main.py
@@ -1,5 +1,3 @@
import traceback

import typer
import asyncio
import os
3 changes: 1 addition & 2 deletions oceanstream/cli/requirements.txt
@@ -1,2 +1 @@
typer
rich
typer
40 changes: 28 additions & 12 deletions oceanstream/core.py
@@ -5,20 +5,28 @@

from pathlib import Path
from oceanstream.settings import load_config
from oceanstream.process import convert_raw_file, convert_raw_files, compute_single_file

DEFAULT_OUTPUT_FOLDER = "output"
DEFAULT_SONAR_MODEL = "EK60"

logging.basicConfig(level="ERROR", format='%(asctime)s - %(levelname)s - %(message)s')


def initialize(settings, file_path, log_level=None):
def initialize(settings, file_path=None, log_level=None, chunks=None):
logging.debug(f"Initializing with settings: {settings}, file path: {file_path}, log level: {log_level}")
if "config" not in settings:
settings["config"] = ""

config_data = load_config(settings["config"])
config_data["raw_path"] = file_path

if chunks:
config_data['chunks'] = chunks
else:
config_data['chunks'] = config_data.get('base_chunk_sizes', None)

if file_path is not None:
config_data["raw_path"] = file_path

if log_level is not None:
logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(message)s', force=True)
@@ -27,10 +35,19 @@ def initialize(settings, file_path, log_level=None):
if 'sonar_model' in settings and settings["sonar_model"] is not None:
config_data["sonar_model"] = settings["sonar_model"]

if settings["output_folder"] is not None:
if settings.get('plot_echogram', None) is not None:
config_data["plot_echogram"] = settings["plot_echogram"]

if settings.get('waveform_mode', None) is not None:
config_data["waveform_mode"] = settings["waveform_mode"]

if settings.get('depth_offset', None) is not None:
config_data["depth_offset"] = settings["depth_offset"]

if settings.get("output_folder", None) is not None:
config_data["output_folder"] = settings["output_folder"]

if settings['cloud_storage'] is not None:
if settings.get('cloud_storage', None) is not None:
config_data['cloud_storage'] = settings['cloud_storage']

return config_data
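
A minimal sketch of how the reworked initialize() might be called directly; the settings keys and paths are illustrative, and only options the function is shown to read above are included:

from pathlib import Path
from oceanstream.core import initialize

# Illustrative settings; each key below is one that initialize() reads.
settings = {
    "config": "",              # empty string mirrors what initialize() sets when the key is missing
    "sonar_model": "EK80",
    "plot_echogram": True,
    "waveform_mode": "CW",
    "depth_offset": 0.0,
    "output_folder": "output",
    "cloud_storage": None,
}

# file_path and chunks are now optional; when chunks is omitted the function
# falls back to base_chunk_sizes from the loaded config.
config_data = initialize(settings, file_path=Path("data/D20240613-T000000.raw"),
                         log_level="INFO", chunks={"ping_time": 500})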
@@ -63,27 +80,29 @@ def process_raw_file(source, output=None, sonar_model=None, plot_echogram=False,
sys.exit(1)


def convert(source, output=None, workers_count=None, config=None, log_level="WARNING", chunks=None):
def convert(source, output=None, base_path=None, workers_count=None, config=None, log_level="WARNING", chunks=None):
logging.debug("Starting convert function")
settings = {
"config": config,
"output_folder": output or DEFAULT_OUTPUT_FOLDER
}

if config is not None:
settings.update(config)

file_path = Path(source)
config_data = initialize(settings, file_path, log_level=log_level)

if chunks:
config_data['chunks'] = chunks
else:
config_data['chunks'] = config_data.get('base_chunk_sizes', None)

if file_path.is_file():
logging.debug(f"Converting raw file: {file_path}")
from oceanstream.process import convert_raw_file
convert_raw_file(file_path, config_data)
convert_raw_file(file_path, config_data, base_path=base_path)
logging.info(f"Converted raw file {source} to Zarr and wrote output to: {config_data['output_folder']}")
elif file_path.is_dir():
logging.debug(f"Converting raw files in directory: {file_path}")
from oceanstream.process import convert_raw_files
convert_raw_files(config_data, workers_count=workers_count)
else:
logging.error(f"The provided path '{source}' is not a valid file/folder.")
@@ -127,7 +146,6 @@ def compute_sv(source, output=None, workers_count=None, sonar_model=DEFAULT_SONA

if config is not None:
settings_dict.update(config)
# settings_dict["config"] = ''

file_path = Path(source)
config_data = initialize(settings_dict, file_path, log_level=log_level)
@@ -139,8 +157,6 @@ def compute_sv(source, output=None, workers_count=None, sonar_model=DEFAULT_SONA

if file_path.is_dir() and source.endswith(".zarr"):
logging.debug(f"Computing Sv for Zarr root file: {file_path}")
from oceanstream.process import compute_single_file

compute_single_file(config_data, chunks=chunks, plot_echogram=plot_echogram, waveform_mode=waveform_mode,
depth_offset=depth_offset)
elif file_path.is_dir():
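
A corresponding sketch for compute_sv() on an already converted Zarr store; the keyword names beyond source and output follow the options forwarded to compute_single_file above, so treat them as assumptions about the full (truncated) signature:

from oceanstream.core import compute_sv

# Illustrative call on a converted .zarr root; chunks, plot_echogram, waveform_mode
# and depth_offset mirror the options passed through to compute_single_file.
compute_sv(
    "output/D20240613-T000000.zarr",
    output="output/sv",
    chunks={"ping_time": 500},
    plot_echogram=True,
    waveform_mode="CW",
    depth_offset=0.0,
)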
1 change: 0 additions & 1 deletion oceanstream/echodata/requirements.txt
@@ -1 +0,0 @@
pydantic
2 changes: 0 additions & 2 deletions oceanstream/echodata/sv_dataset_extension.py
@@ -96,8 +96,6 @@ def add_depth(Sv: Dataset, depth_offset: float = 0, tilt: float = 0, downward: b
first_channel = Sv["channel"].values[0]
first_ping_time = Sv["ping_time"].values[0]

original_echo_range = Sv["echo_range"].sel(channel=first_channel, ping_time=first_ping_time).values

# Slice the echo_range to get the desired range of values
selected_echo_range = Sv["echo_range"].sel(channel=first_channel, ping_time=first_ping_time)
selected_echo_range = selected_echo_range.values.tolist()
9 changes: 9 additions & 0 deletions oceanstream/process/aws/__init__.py
@@ -0,0 +1,9 @@
from .s3 import process_survey_data_with_progress, list_raw_files_from_bucket, convert_survey_data_from_bucket, \
download_file_from_bucket

__all__ = [
"process_survey_data_with_progress",
"list_raw_files_from_bucket",
"convert_survey_data_from_bucket",
"download_file_from_bucket"
]
93 changes: 93 additions & 0 deletions oceanstream/process/aws/s3.py
@@ -0,0 +1,93 @@
import os
import tempfile
import botocore
import boto3
from asyncio import CancelledError
from rich import print
from rich.traceback import install, Traceback
from tqdm.auto import tqdm
from dask import delayed
from botocore.config import Config


install(show_locals=False, width=120)


def process_survey_data_with_progress(files_by_day, bucket_name, client, config_data, process_func):
try:
total_files = sum(len(files) for files in files_by_day.values())
progress_bar = tqdm(total=total_files, desc="Processing Files", unit="file", ncols=100)

def update_progress(*args):
progress_bar.update()

temp_dir = tempfile.mkdtemp()
futures, temp_dir = convert_survey_data_from_bucket(files_by_day, temp_dir, bucket_name, client, config_data,
process_func)

for future in futures:
future.add_done_callback(update_progress)

client.gather(futures) # Ensure all tasks complete

progress_bar.close()
os.rmdir(temp_dir)
except KeyboardInterrupt:
print("Closing down.")
except CancelledError:
print("Closing down.")
except Exception as e:
print(f"[bold red]An error occurred:[/bold red] {e}")
print(f"{Traceback()}\n")


def convert_survey_data_from_bucket(files_by_day, temp_dir, bucket_name, dask_client, config_data, process_func):
"""Process survey data from S3."""
tasks = []

for day, files in files_by_day.items():
for file in files:
task = delayed(_process_raw_file)(file, day, temp_dir, bucket_name, config_data, process_func)
tasks.append(task)

# Execute all tasks in parallel
futures = dask_client.compute(tasks)

return futures, temp_dir


def list_raw_files_from_bucket(bucket_name, prefix):
"""List files in the S3 bucket along with their metadata."""
s3_client = boto3.client('s3', config=Config(signature_version=botocore.UNSIGNED))
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

files = [
{
'Key': item['Key'],
'Size': item['Size'],
'LastModified': item['LastModified']
}
for item in response.get('Contents', []) if item['Key'].endswith('.raw')
]

return files


def download_file_from_bucket(bucket_name, s3_key, local_dir):
"""Download a file from S3."""
s3_client = boto3.client('s3', config=Config(signature_version=botocore.UNSIGNED))
local_path = os.path.join(local_dir, os.path.basename(s3_key))
s3_client.download_file(bucket_name, s3_key, local_path)

return local_path


def _process_raw_file(file, day, temp_dir, bucket_name, config_data, process_func):
"""Process a single file."""
day_dir = os.path.join(temp_dir, day)
os.makedirs(day_dir, exist_ok=True)
local_path = download_file_from_bucket(bucket_name, file['Key'], day_dir)

process_func(local_path, config=config_data, base_path=temp_dir)
os.remove(local_path)
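
A sketch of wiring the S3 helpers above to a Dask distributed client; the bucket, prefix, day-grouping and processing callable are all illustrative stand-ins:

from collections import defaultdict
from dask.distributed import Client

from oceanstream.process.aws import (
    list_raw_files_from_bucket,
    process_survey_data_with_progress,
)


def process_func(local_path, config=None, base_path=None):
    # Hypothetical stand-in: _process_raw_file calls process_func(local_path,
    # config=config_data, base_path=temp_dir), so any callable with this shape works.
    print(f"converting {local_path} (staging under {base_path})")


bucket_name = "example-echosounder-data"   # illustrative public bucket
files = list_raw_files_from_bucket(bucket_name, prefix="survey-2024/")

# Group the listed .raw files by day using their LastModified timestamps.
files_by_day = defaultdict(list)
for f in files:
    files_by_day[f["LastModified"].strftime("%Y-%m-%d")].append(f)

client = Client()                           # local Dask cluster
config_data = {"output_folder": "output"}   # minimal illustrative config
process_survey_data_with_progress(files_by_day, bucket_name, client, config_data, process_func)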

8 changes: 8 additions & 0 deletions oceanstream/process/azure/__init__.py
@@ -0,0 +1,8 @@
from .blob_storage import list_zarr_files
from .azure_processor import process_survey_data_with_progress, process_survey_data

__all__ = [
"list_zarr_files",
"process_survey_data",
"process_survey_data_with_progress"
]
66 changes: 66 additions & 0 deletions oceanstream/process/azure/azure_processor.py
@@ -0,0 +1,66 @@
import re

from asyncio import CancelledError
from rich import print
from rich.traceback import install, Traceback
from tqdm.auto import tqdm
from dask import delayed
from .blob_storage import open_zarr_store

install(show_locals=False, width=120)


def process_survey_data_with_progress(files_by_day, azfs, container_name, client, config_data, process_func):
try:
total_files = sum(len(files) for files in files_by_day.values())
progress_bar = tqdm(total=total_files, desc="Processing Files", unit="file", ncols=100)

def update_progress(*args):
progress_bar.update()

futures = process_survey_data(files_by_day, azfs, container_name, client, config_data, process_func)

for future in futures:
future.add_done_callback(update_progress)

client.gather(futures) # Ensure all tasks complete

progress_bar.close()
except KeyboardInterrupt:
print("Closing down.")
except CancelledError:
print("Closing down.")
except Exception as e:
print(f"[bold red]An error occurred:[/bold red] {e}")
print(f"{Traceback()}\n")


def process_survey_data(files_by_day, azfs, container_name, dask_client, config_data, process_func):
"""Process survey data from S3."""

tasks = []

for day, files in files_by_day.items():
for file in files:
task = delayed(_process_zarr_file)(file['Key'], azfs, container_name, config_data, process_func)
tasks.append(task)

# Execute all tasks in parallel
futures = dask_client.compute(tasks)

return futures


def _process_zarr_file(file, azfs, container_name, config_data, process_func=None):
"""Process a single Zarr file."""
base_path = file.replace('.zarr', '')
pattern = rf"^{re.escape(container_name)}/"
base_path = re.sub(pattern, '', base_path)

echodata = open_zarr_store(azfs, file, chunks=config_data['chunks'])

process_func(echodata, config_data, base_path=base_path,
chunks=config_data.get('chunks'),
plot_echogram=config_data.get('plot_echogram', False),
waveform_mode=config_data.get('waveform_mode', "CW"),
depth_offset=config_data.get('depth_offset', 0.0))
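
A sketch of driving the Azure path end to end; it assumes an fsspec-style Azure filesystem (for example adlfs.AzureBlobFileSystem) for the azfs argument, and the account, container, grouping and processing callable are illustrative:

from adlfs import AzureBlobFileSystem  # assumed fsspec filesystem used as azfs
from dask.distributed import Client

from oceanstream.process.azure import list_zarr_files, process_survey_data_with_progress


def process_func(echodata, config_data, base_path=None, chunks=None,
                 plot_echogram=False, waveform_mode="CW", depth_offset=0.0):
    # Hypothetical stand-in matching the keyword arguments _process_zarr_file passes.
    print(f"computing Sv for {base_path} (chunks={chunks}, waveform_mode={waveform_mode})")


azfs = AzureBlobFileSystem(account_name="exampleaccount", anon=True)  # illustrative account
container_name = "echosounder-zarr"                                   # illustrative container

# Group the discovered Zarr stores by day; a single-day grouping is used here for brevity.
files_by_day = {"2024-06-13": list_zarr_files(azfs, container_name)}

client = Client()
config_data = {"chunks": {"ping_time": 500}, "plot_echogram": False,
               "waveform_mode": "CW", "depth_offset": 0.0}

process_survey_data_with_progress(files_by_day, azfs, container_name, client, config_data, process_func)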
54 changes: 54 additions & 0 deletions oceanstream/process/azure/blob_storage.py
@@ -0,0 +1,54 @@
import echopype as ep


def list_zarr_files(azfs, path):
"""List all Zarr files in the Azure Blob Storage container along with their metadata."""
zarr_files = []
for blob in azfs.ls(path, detail=True):
if blob['type'] == 'directory' and not blob['name'].endswith('.zarr'):
subdir_files = list_zarr_files(azfs, blob['name'])
zarr_files.extend(subdir_files)
elif blob['name'].endswith('.zarr'):
zarr_files.append({
'Key': blob['name'],
'Size': blob['size'] if blob['size'] else 0,
'LastModified': blob['last_modified'] if 'last_modified' in blob else 0
})

return zarr_files


def open_zarr_store(azfs, store_name, chunks=None):
"""Open a Zarr store from Azure Blob Storage."""
mapper = azfs.get_mapper(store_name)

return ep.open_converted(mapper, chunks=chunks)


def _list_zarr_files_extended(azfs, path):
"""List all Zarr files in the Azure Blob Storage container along with their metadata."""
zarr_files = []
for blob in azfs.ls(path, detail=True):
if blob['type'] == 'directory' and not blob['name'].endswith('.zarr'):
subdir_files = list_zarr_files(azfs, blob['name'])
zarr_files.extend(subdir_files)
else:
# Calculate the total size and most recent modification date for the .zarr folder
total_size = 0
last_modified = None
for sub_blob in azfs.ls(blob['name'], detail=True):
if sub_blob['type'] == 'file':
total_size += sub_blob['size']
if last_modified is None or sub_blob['last_modified'] > last_modified:
last_modified = sub_blob['last_modified']

zarr_files.append({
'Key': blob['name'],
'Size': total_size,
'LastModified': last_modified
})

return zarr_files
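
For the storage helpers on their own, a small sketch, again assuming an adlfs filesystem for azfs; the container name and chunking are illustrative:

from adlfs import AzureBlobFileSystem  # assumed fsspec filesystem used as azfs

from oceanstream.process.azure import list_zarr_files
from oceanstream.process.azure.blob_storage import open_zarr_store

azfs = AzureBlobFileSystem(account_name="exampleaccount", anon=True)  # illustrative

# Recursively list converted .zarr stores together with their size and last-modified metadata.
stores = list_zarr_files(azfs, "echosounder-zarr")
for store in stores:
    print(store["Key"], store["Size"], store["LastModified"])

# Lazily open the first store with echopype via the fsspec mapper, using illustrative chunking.
echodata = open_zarr_store(azfs, stores[0]["Key"], chunks={"ping_time": 500})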



2 changes: 1 addition & 1 deletion oceanstream/process/combine_zarr.py
@@ -86,4 +86,4 @@ def from_filename(file_name):
creation_time = datetime.strptime(date_str + time_str, '%Y%m%d%H%M%S')
return creation_time

return None
return None
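
The timestamp parsing shown above concatenates the date and time fragments into a single strptime() call; a tiny stdlib-only illustration with made-up values:

from datetime import datetime

# Illustrative fragments as they might be extracted from a raw file name.
date_str, time_str = "20240613", "142501"
creation_time = datetime.strptime(date_str + time_str, "%Y%m%d%H%M%S")
print(creation_time)  # 2024-06-13 14:25:01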