Skip to content

Commit

Permalink
windborne partition by year and month (#424)
Browse files Browse the repository at this point in the history
* windborne partition by year and month

* fix lint

* fix ingestor

* fix parquet appender class in ingestor

* fix windborne ingestor

* fix init windborne appender

* add test windborne parquet ingestor

* skip writing parquet when count is zero

* use existing nearest_stations function

* refactor parquet reader for windborne

* close connection

* fix altitude and add to_json

* fix lint

* fix tests
  • Loading branch information
danangmassandy authored Feb 4, 2025
1 parent 273248e commit 0d43799
Show file tree
Hide file tree
Showing 14 changed files with 591 additions and 88 deletions.
89 changes: 88 additions & 1 deletion django_project/core/tests/test_date.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

from core.utils.date import (
find_max_min_epoch_dates,
split_epochs_by_year
split_epochs_by_year,
split_epochs_by_year_month
)


Expand Down Expand Up @@ -178,3 +179,89 @@ def test_same_start_and_end(self):
self.assertEqual(
split_epochs_by_year(int(start_epoch), int(start_epoch)), expected
)


class TestSplitEpochsByYearMonth(TestCase):
    """Unit tests for split_epochs_by_year_month."""

    @staticmethod
    def _epoch(*parts):
        """Return the integer UTC epoch for the given datetime parts."""
        return int(datetime(*parts, tzinfo=timezone.utc).timestamp())

    def test_same_month(self):
        """A range inside one month yields a single entry."""
        begin = self._epoch(2023, 5, 10)
        finish = self._epoch(2023, 5, 25)
        self.assertEqual(
            split_epochs_by_year_month(begin, finish),
            [(2023, 5, begin, finish)]
        )

    def test_crossing_two_months(self):
        """A range over two adjacent months is split at the boundary."""
        begin = self._epoch(2023, 11, 20)
        finish = self._epoch(2023, 12, 10)
        expected = [
            (2023, 11, begin, self._epoch(2023, 11, 30, 23, 59, 59)),
            (2023, 12, self._epoch(2023, 12, 1), finish),
        ]
        self.assertEqual(
            split_epochs_by_year_month(begin, finish),
            expected
        )

    def test_crossing_year_boundary(self):
        """A range from December into January rolls the year over."""
        begin = self._epoch(2023, 12, 20)
        finish = self._epoch(2024, 1, 10)
        expected = [
            (2023, 12, begin, self._epoch(2023, 12, 31, 23, 59, 59)),
            (2024, 1, self._epoch(2024, 1, 1), finish),
        ]
        self.assertEqual(
            split_epochs_by_year_month(begin, finish),
            expected
        )

    def test_multiple_years_and_months(self):
        """A range over several months and two years is fully split."""
        begin = self._epoch(2022, 10, 15)
        finish = self._epoch(2023, 2, 20)
        expected = [
            (2022, 10, begin, self._epoch(2022, 10, 31, 23, 59, 59)),
            (
                2022, 11,
                self._epoch(2022, 11, 1),
                self._epoch(2022, 11, 30, 23, 59, 59)
            ),
            (
                2022, 12,
                self._epoch(2022, 12, 1),
                self._epoch(2022, 12, 31, 23, 59, 59)
            ),
            (
                2023, 1,
                self._epoch(2023, 1, 1),
                self._epoch(2023, 1, 31, 23, 59, 59)
            ),
            (2023, 2, self._epoch(2023, 2, 1), finish),
        ]
        self.assertEqual(
            split_epochs_by_year_month(begin, finish),
            expected
        )

    def test_same_start_and_end(self):
        """Identical start and end produce one zero-length entry."""
        moment = self._epoch(2023, 7, 15)
        self.assertEqual(
            split_epochs_by_year_month(moment, moment),
            [(2023, 7, moment, moment)]
        )
49 changes: 49 additions & 0 deletions django_project/core/utils/date.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,52 @@ def split_epochs_by_year(start_epoch, end_epoch):
current_year += 1

return results


def split_epochs_by_year_month(start_epoch, end_epoch):
    """Split an epoch range into one sub-range per UTC calendar month.

    Every month from the month of ``start_epoch`` to the month of
    ``end_epoch`` (inclusive) contributes one tuple. Each sub-range is
    clamped to the requested range and to the first/last second of its
    month. If the start month is later than the end month the result is
    an empty list.

    :param start_epoch: Start date time in epoch
    :type start_epoch: int
    :param end_epoch: End date time in epoch
    :type end_epoch: int
    :return: List of (year, month, start_epoch, end_epoch)
    :rtype: list
    """
    results = []
    start_dt = datetime.fromtimestamp(start_epoch, tz=timezone.utc)
    end_dt = datetime.fromtimestamp(end_epoch, tz=timezone.utc)

    current_year, current_month = start_dt.year, start_dt.month
    last_year_month = (end_dt.year, end_dt.month)
    while (current_year, current_month) <= last_year_month:
        month_start = datetime(
            current_year, current_month, 1, tzinfo=timezone.utc
        ).timestamp()

        # Work out the following month once; it gives both the end of the
        # current month and the next loop position (December rolls over).
        if current_month == 12:
            next_year, next_month = current_year + 1, 1
        else:
            next_year, next_month = current_year, current_month + 1

        # Last second of the current month
        month_end = datetime(
            next_year, next_month, 1, tzinfo=timezone.utc
        ).timestamp() - 1

        start = max(start_epoch, month_start)
        end = min(end_epoch, month_end)

        results.append(
            (current_year, current_month, int(start), int(end))
        )

        # Move to next month
        current_year, current_month = next_year, next_month

    return results
19 changes: 12 additions & 7 deletions django_project/gap/models/ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ def _run(self, working_dir):
from gap.ingestor.cbam_bias_adjust import CBAMBiasAdjustIngestor
from gap.ingestor.dcas_rule import DcasRuleIngestor
from gap.ingestor.farm_registry import DCASFarmRegistryIngestor
from gap.utils.parquet import ParquetIngestorAppender
from gap.utils.parquet import (
ParquetIngestorAppender,
WindborneParquetIngestorAppender
)

ingestor = None
if self.ingestor_type == IngestorType.TAHMO:
Expand Down Expand Up @@ -250,21 +253,23 @@ def _run(self, working_dir):
ingestor_obj.run()

if (
self.status == IngestorSessionStatus.SUCCESS and
self._trigger_parquet and
self.ingestor_type in [
IngestorType.ARABLE,
IngestorType.TAHMO_API,
IngestorType.WIND_BORNE_SYSTEMS_API
]
):
data_source, _ = ingestor._init_datasource()
data_source, _ = ingestor_obj._init_datasource()
# run converter to parquet
converter = ParquetIngestorAppender(
ingestor._init_dataset(),
appender_cls = ParquetIngestorAppender
if self.ingestor_type == IngestorType.WIND_BORNE_SYSTEMS_API:
appender_cls = WindborneParquetIngestorAppender
converter = appender_cls(
ingestor_obj._init_dataset(),
data_source,
ingestor.min_ingested_date,
ingestor.max_ingested_date
ingestor_obj.min_ingested_date,
ingestor_obj.max_ingested_date
)
converter.setup()
converter.run()
Expand Down
7 changes: 5 additions & 2 deletions django_project/gap/providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

from gap.ingestor.wind_borne_systems import PROVIDER as WINBORNE_PROVIDER
from gap.models import Dataset, DatasetStore
from gap.providers.airborne_observation import ObservationAirborneDatasetReader
from gap.providers.airborne_observation import (
ObservationAirborneDatasetReader,
ObservationAirborneParquetReader
)
from gap.providers.cbam import CBAMZarrReader, CBAMNetCDFReader # noqa
from gap.providers.observation import (
ObservationDatasetReader, ObservationParquetReader
Expand Down Expand Up @@ -42,7 +45,7 @@ def get_reader_from_dataset(dataset: Dataset, use_parquet=False):
return ObservationDatasetReader
elif dataset.provider.name in [WINBORNE_PROVIDER]:
if use_parquet:
return ObservationParquetReader
return ObservationAirborneParquetReader
return ObservationAirborneDatasetReader
elif (
dataset.provider.name == TIO_PROVIDER and
Expand Down
15 changes: 14 additions & 1 deletion django_project/gap/providers/airborne_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
from django.contrib.gis.geos import Polygon, Point

from gap.models import Measurement, StationHistory
from gap.providers.observation import ObservationDatasetReader
from gap.providers.observation import (
ObservationDatasetReader,
ObservationParquetReader
)


class ObservationAirborneDatasetReader(ObservationDatasetReader):
Expand Down Expand Up @@ -100,3 +103,13 @@ def get_measurements(self, start_date: datetime, end_date: datetime):
dataset_attribute__in=self.attributes,
station_history__in=nearest_histories
).order_by('date_time')


class ObservationAirborneParquetReader(
    ObservationParquetReader, ObservationAirborneDatasetReader
):
    """Parquet-backed reader for the Airborne observation dataset."""

    # NOTE(review): these flags are presumably consumed by
    # ObservationParquetReader -- confirm against the base class.
    station_id_key = 'st_hist_id'
    has_altitudes = True
    has_month_partition = True
Loading

0 comments on commit 0d43799

Please sign in to comment.