diff --git a/django_project/core/tests/test_date.py b/django_project/core/tests/test_date.py
index c8fc3d33..070fe863 100644
--- a/django_project/core/tests/test_date.py
+++ b/django_project/core/tests/test_date.py
@@ -10,7 +10,8 @@
 from core.utils.date import (
     find_max_min_epoch_dates,
-    split_epochs_by_year
+    split_epochs_by_year,
+    split_epochs_by_year_month
 )
@@ -178,3 +179,89 @@ def test_same_start_and_end(self):
         self.assertEqual(
             split_epochs_by_year(int(start_epoch), int(start_epoch)), expected
         )
+
+
+class TestSplitEpochsByYearMonth(TestCase):
+    """Test method split_epochs_by_year_month."""
+
+    def test_same_month(self):
+        """Test same month."""
+        start_epoch = datetime(2023, 5, 10, tzinfo=timezone.utc).timestamp()
+        end_epoch = datetime(2023, 5, 25, tzinfo=timezone.utc).timestamp()
+        expected = [(2023, 5, int(start_epoch), int(end_epoch))]
+        self.assertEqual(
+            split_epochs_by_year_month(int(start_epoch), int(end_epoch)),
+            expected
+        )
+
+    def test_crossing_two_months(self):
+        """Test crossing two months."""
+        start_epoch = datetime(2023, 11, 20, tzinfo=timezone.utc).timestamp()
+        end_epoch = datetime(2023, 12, 10, tzinfo=timezone.utc).timestamp()
+        expected = [
+            (2023, 11, int(start_epoch),
+             int(datetime(2023, 11, 30, 23, 59, 59,
+                          tzinfo=timezone.utc).timestamp())),
+            (2023, 12,
+             int(datetime(2023, 12, 1, tzinfo=timezone.utc).timestamp()),
+             int(end_epoch))
+        ]
+        self.assertEqual(
+            split_epochs_by_year_month(int(start_epoch), int(end_epoch)),
+            expected
+        )
+
+    def test_crossing_year_boundary(self):
+        """Test crossing year."""
+        start_epoch = datetime(2023, 12, 20, tzinfo=timezone.utc).timestamp()
+        end_epoch = datetime(2024, 1, 10, tzinfo=timezone.utc).timestamp()
+        expected = [
+            (2023, 12, int(start_epoch),
+             int(datetime(2023, 12, 31, 23, 59, 59,
+                          tzinfo=timezone.utc).timestamp())),
+            (2024, 1,
+             int(datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp()),
+             int(end_epoch))
+        ]
+        self.assertEqual(
+            split_epochs_by_year_month(int(start_epoch), int(end_epoch)),
+            expected
+        )
+
+    def test_multiple_years_and_months(self):
+        """Test multiple years and months."""
+        start_epoch = datetime(2022, 10, 15, tzinfo=timezone.utc).timestamp()
+        end_epoch = datetime(2023, 2, 20, tzinfo=timezone.utc).timestamp()
+        expected = [
+            (2022, 10, int(start_epoch),
+             int(datetime(2022, 10, 31, 23, 59, 59,
+                          tzinfo=timezone.utc).timestamp())),
+            (2022, 11,
+             int(datetime(2022, 11, 1, tzinfo=timezone.utc).timestamp()),
+             int(datetime(2022, 11, 30, 23, 59, 59,
+                          tzinfo=timezone.utc).timestamp())),
+            (2022, 12,
+             int(datetime(2022, 12, 1, tzinfo=timezone.utc).timestamp()),
+             int(datetime(2022, 12, 31, 23, 59, 59,
+                          tzinfo=timezone.utc).timestamp())),
+            (2023, 1,
+             int(datetime(2023, 1, 1, tzinfo=timezone.utc).timestamp()),
+             int(datetime(2023, 1, 31, 23, 59, 59,
+                          tzinfo=timezone.utc).timestamp())),
+            (2023, 2,
+             int(datetime(2023, 2, 1, tzinfo=timezone.utc).timestamp()),
+             int(end_epoch))
+        ]
+        self.assertEqual(
+            split_epochs_by_year_month(int(start_epoch), int(end_epoch)),
+            expected
+        )
+
+    def test_same_start_and_end(self):
+        """Test same input."""
+        start_epoch = datetime(2023, 7, 15, tzinfo=timezone.utc).timestamp()
+        expected = [(2023, 7, int(start_epoch), int(start_epoch))]
+        self.assertEqual(
+            split_epochs_by_year_month(int(start_epoch), int(start_epoch)),
+            expected
+        )
diff --git a/django_project/core/utils/date.py b/django_project/core/utils/date.py
index a3c1b10d..c1fe4bf8 100644
--- a/django_project/core/utils/date.py
+++ b/django_project/core/utils/date.py
@@ -69,3 +69,52 @@ def split_epochs_by_year(start_epoch, end_epoch):
         current_year += 1
 
     return results
+
+
+def split_epochs_by_year_month(start_epoch, end_epoch):
+    """Split a datetime range that spans different years and months.
+
+    :param start_epoch: Start date time in epoch
+    :type start_epoch: int
+    :param end_epoch: End date time in epoch
+    :type end_epoch: int
+    :return: List of (year, month, start_epoch, end_epoch)
+    :rtype: list
+    """
+    results = []
+    start_dt = datetime.fromtimestamp(start_epoch, tz=timezone.utc)
+    end_dt = datetime.fromtimestamp(end_epoch, tz=timezone.utc)
+
+    current_year, current_month = start_dt.year, start_dt.month
+    while (current_year, current_month) <= (end_dt.year, end_dt.month):
+        month_start = datetime(
+            current_year, current_month, 1, tzinfo=timezone.utc
+        ).timestamp()
+
+        if current_month == 12:
+            # Last second of the month
+            month_end = datetime(
+                current_year + 1, 1, 1,
+                tzinfo=timezone.utc
+            ).timestamp() - 1
+        else:
+            month_end = datetime(
+                current_year, current_month + 1, 1,
+                tzinfo=timezone.utc
+            ).timestamp() - 1
+
+        start = max(start_epoch, month_start)
+        end = min(end_epoch, month_end)
+
+        results.append(
+            (current_year, current_month, int(start), int(end))
+        )
+
+        # Move to next month
+        if current_month == 12:
+            current_year += 1
+            current_month = 1
+        else:
+            current_month += 1
+
+    return results
diff --git a/django_project/gap/models/ingestor.py b/django_project/gap/models/ingestor.py
index a5779588..1ef9419d 100644
--- a/django_project/gap/models/ingestor.py
+++ b/django_project/gap/models/ingestor.py
@@ -215,7 +215,10 @@ def _run(self, working_dir):
         from gap.ingestor.cbam_bias_adjust import CBAMBiasAdjustIngestor
         from gap.ingestor.dcas_rule import DcasRuleIngestor
         from gap.ingestor.farm_registry import DCASFarmRegistryIngestor
-        from gap.utils.parquet import ParquetIngestorAppender
+        from gap.utils.parquet import (
+            ParquetIngestorAppender,
+            WindborneParquetIngestorAppender
+        )
 
         ingestor = None
         if self.ingestor_type == IngestorType.TAHMO:
@@ -250,7 +253,6 @@ def _run(self, working_dir):
            ingestor_obj.run()

        if (
-            self.status == IngestorSessionStatus.SUCCESS and
            self._trigger_parquet and
            self.ingestor_type in [
                IngestorType.ARABLE,
@@ -258,13 +260,16 @@ def _run(self, working_dir):
                IngestorType.WIND_BORNE_SYSTEMS_API
            ]
        ):
-            data_source, _ = ingestor._init_datasource()
+            data_source, _ = ingestor_obj._init_datasource()
            # run converter to parquet
-            converter = ParquetIngestorAppender(
-                ingestor._init_dataset(),
+            appender_cls = ParquetIngestorAppender
+            if self.ingestor_type == IngestorType.WIND_BORNE_SYSTEMS_API:
+                appender_cls = WindborneParquetIngestorAppender
+            converter = appender_cls(
+                ingestor_obj._init_dataset(),
                data_source,
-                ingestor.min_ingested_date,
-                ingestor.max_ingested_date
+                ingestor_obj.min_ingested_date,
+                ingestor_obj.max_ingested_date
            )
            converter.setup()
            converter.run()
diff --git a/django_project/gap/providers/__init__.py b/django_project/gap/providers/__init__.py
index 542e284f..bb84a42f 100644
--- a/django_project/gap/providers/__init__.py
+++ b/django_project/gap/providers/__init__.py
@@ -7,7 +7,10 @@
 from gap.ingestor.wind_borne_systems import PROVIDER as WINBORNE_PROVIDER
 from gap.models import Dataset, DatasetStore
-from gap.providers.airborne_observation import ObservationAirborneDatasetReader
+from gap.providers.airborne_observation import (
+    ObservationAirborneDatasetReader,
+    ObservationAirborneParquetReader
+)
 from gap.providers.cbam import CBAMZarrReader, CBAMNetCDFReader  # noqa
 from gap.providers.observation import (
     ObservationDatasetReader, ObservationParquetReader
@@ -42,7 +45,7 @@ def get_reader_from_dataset(dataset: Dataset, use_parquet=False):
         return ObservationDatasetReader
     elif dataset.provider.name in [WINBORNE_PROVIDER]:
         if use_parquet:
-            return ObservationParquetReader
+            return ObservationAirborneParquetReader
         return ObservationAirborneDatasetReader
     elif (
         dataset.provider.name == TIO_PROVIDER and
diff --git a/django_project/gap/providers/airborne_observation.py b/django_project/gap/providers/airborne_observation.py
index 9c5305f1..8ef5c8b7 100644
--- a/django_project/gap/providers/airborne_observation.py
+++ b/django_project/gap/providers/airborne_observation.py
@@ -13,7 +13,10 @@
 from django.contrib.gis.geos import Polygon, Point
 
 from gap.models import Measurement, StationHistory
-from gap.providers.observation import ObservationDatasetReader
+from gap.providers.observation import (
+    ObservationDatasetReader,
+    ObservationParquetReader
+)
 
 
 class ObservationAirborneDatasetReader(ObservationDatasetReader):
@@ -100,3 +103,13 @@ def get_measurements(self, start_date: datetime, end_date: datetime):
             dataset_attribute__in=self.attributes,
             station_history__in=nearest_histories
         ).order_by('date_time')
+
+
+class ObservationAirborneParquetReader(
+    ObservationParquetReader, ObservationAirborneDatasetReader
+):
+    """Class for parquet reader for Airborne dataset."""
+
+    has_month_partition = True
+    has_altitudes = True
+    station_id_key = 'st_hist_id'
diff --git a/django_project/gap/providers/observation.py b/django_project/gap/providers/observation.py
index d31784d1..59d1a5c2 100644
--- a/django_project/gap/providers/observation.py
+++ b/django_project/gap/providers/observation.py
@@ -9,6 +9,7 @@
 import json
 import duckdb
 import uuid
+import numpy as np
 from _collections_abc import dict_values
 from datetime import datetime
 import pandas as pd
@@ -576,6 +577,25 @@ def conn(self) -> duckdb.DuckDBPyConnection:
         """Get DuckDB Connection."""
         return self._val
 
+    def to_json(self):
+        """Generate json."""
+        output = {
+            'geometry': json.loads(self.location_input.geometry.json),
+        }
+        # Convert query results to a DataFrame
+        df = self.conn.sql(self.query).df()
+        # Combine date and time columns
+        df['datetime'] = pd.to_datetime(
+            df['date'].dt.strftime('%Y-%m-%d') + ' ' + df['time']
+        )
+        df = df.drop(columns=['date', 'time', 'lat', 'lon'])
+        # Replace NaN with None
+        df = df.replace({np.nan: None})
+        output['data'] = df.to_dict(orient="records")
+        # TODO: the current structure is not consistent with others
+        self.conn.close()
+        return output
+
     def to_csv_stream(self, suffix='.csv', separator=','):
         """Generate csv bytes stream.
 
@@ -635,6 +655,8 @@ def to_csv(self, suffix='.csv', separator=','):
         except Exception as e:
             print(f"Error generating CSV: {e}")
             raise
+        finally:
+            self.conn.close()
 
         return output
 
@@ -664,6 +686,7 @@ def to_netcdf_stream(self, suffix='.nc'):
             with open(tmp_file.name, 'rb') as f:
                 while chunk := f.read(self.chunk_size_in_bytes):
                     yield chunk
+        self.conn.close()
 
     def to_netcdf(self, suffix=".nc"):
         """Generate NetCDF file and save directly to object storage.
@@ -704,6 +727,8 @@ def to_netcdf(self, suffix=".nc"):
         except Exception as e:
             print(f"Error generating NetCDF: {e}")
             raise
+        finally:
+            self.conn.close()
 
         return output
 
@@ -728,6 +753,10 @@ def _get_file_remote_url(self, suffix):
 class ObservationParquetReader(ObservationDatasetReader):
     """Class to read tahmo dataset in GeoParquet."""
 
+    has_month_partition = False
+    has_altitudes = False
+    station_id_key = 'st_id'
+
     def __init__(
             self, dataset: Dataset, attributes: List[DatasetAttribute],
             location_input: DatasetReaderInput, start_date: datetime,
@@ -825,25 +854,6 @@ def _get_connection(self):
         conn.load_extension("spatial")
         return conn
 
-    def _get_nearest_stations_from_postgis(self, points):
-        """Find the nearest stations for a list of points using PostGIS."""
-        nearest_stations = []
-
-        for point in points:
-            nearest_station = Station.objects.filter(
-                provider=self.dataset.provider
-            ).annotate(
-                distance=Distance("geometry", point)
-            ).order_by("distance").first()
-
-            if nearest_station:
-                nearest_stations.append(nearest_station.id)
-
-        if not nearest_stations:
-            raise ValueError("No nearest stations found!")
-
-        return nearest_stations
-
     def read_historical_data(self, start_date: datetime, end_date: datetime):
         """Read historical data from dataset.
 
@@ -855,7 +865,13 @@ def read_historical_data(self, start_date: datetime, end_date: datetime):
         attributes = ', '.join(
             [a.attribute.variable_name for a in self.attributes]
         )
+        if self.has_altitudes:
+            attributes = 'altitude, ' + attributes
         s3_path = self._get_directory_path()
+        if self.has_month_partition:
+            s3_path += 'year=*/month=*/*.parquet'
+        else:
+            s3_path += 'year=*/*.parquet'
 
         # Determine if dataset has time column
         time_column = (
@@ -872,9 +888,7 @@ def read_historical_data(self, start_date: datetime, end_date: datetime):
                     {time_column}
                     loc_y as lat, loc_x as lon, st_code as station_id,
                     {attributes}
-                FROM read_parquet(
-                    '{s3_path}year=*/*.parquet',
-                    hive_partitioning=true)
+                FROM read_parquet('{s3_path}', hive_partitioning=true)
                 WHERE year>={start_date.year} AND year<={end_date.year} AND
                 date_time>='{start_date}' AND date_time<='{end_date}' AND
                 ST_Within(geometry, ST_MakeEnvelope(
@@ -885,30 +899,26 @@ def read_historical_data(self, start_date: datetime, end_date: datetime):
 
         # Handle Point Query
         elif self.location_input.type == LocationInputType.POINT:
-            query_point = self.location_input.point
-            # **Step 1: Find the nearest station using PostGIS**
-            nearest_station = Station.objects.filter(
-                provider=self.dataset.provider
-            ).annotate(
-                distance=Distance("geometry", query_point)
-            ).order_by("distance").first()
+            nearest_stations = self.get_nearest_stations()
 
-            if not nearest_station:
+            if (
+                nearest_stations is None or
+                self._get_count(nearest_stations) == 0
+            ):
                 raise ValueError("No nearest station found!")
+            nearest_station = nearest_stations[0]
 
             # **Step 2: Use the nearest station ID in the DuckDB query**
             self.query = (
                 f"""
                 SELECT date_time::date as date,
                     {time_column}
                     loc_y as lat, loc_x as lon, st_code as station_id,
                     {attributes}
-                FROM read_parquet(
-                    '{s3_path}year=*/*.parquet',
-                    hive_partitioning=true)
+                FROM read_parquet('{s3_path}', hive_partitioning=true)
                 WHERE year>={start_date.year} AND year<={end_date.year} AND
                 date_time>='{start_date}' AND date_time<='{end_date}' AND
-                st_code = '{nearest_station.code}'
+                {self.station_id_key} = '{nearest_station.id}'
                 ORDER BY date_time
                 """
             )
@@ -921,9 +931,7 @@ def read_historical_data(self, start_date: datetime, end_date: datetime):
                     {time_column}
                     loc_y as lat, loc_x as lon, st_code as station_id,
                     {attributes}
-                FROM read_parquet(
-                    '{s3_path}year=*/*.parquet',
-                    hive_partitioning=true)
+                FROM read_parquet('{s3_path}', hive_partitioning=true)
                 WHERE year>={start_date.year} AND year<={end_date.year} AND
                 date_time>='{start_date}' AND date_time<='{end_date}' AND
                 ST_Within(geometry, ST_GeomFromText('{polygon_wkt}'))
@@ -932,23 +940,23 @@ def read_historical_data(self, start_date: datetime, end_date: datetime):
             )
         # Handle List of Points Query
         elif self.location_input.type == LocationInputType.LIST_OF_POINT:
-            points = self.location_input.points
-            nearest_station_ids = (
-                self._get_nearest_stations_from_postgis(points)
-            )
+            nearest_stations = self.get_nearest_stations()
+            if (
+                nearest_stations is None or
+                self._get_count(nearest_stations) == 0
+            ):
+                raise ValueError("No nearest station found!")
+            station_ids = ", ".join(f"'{s.id}'" for s in nearest_stations)
             self.query = (
                 f"""
                 SELECT date_time::date as date,
                     {time_column}
                     loc_y as lat, loc_x as lon, st_code as station_id,
                     {attributes}
-                FROM read_parquet(
-                    '{s3_path}year=*/*.parquet',
-                    hive_partitioning=true)
+                FROM read_parquet('{s3_path}', hive_partitioning=true)
                 WHERE year>={start_date.year} AND year<={end_date.year} AND
                 date_time>='{start_date}' AND date_time<='{end_date}' AND
-                st_code IN (
-                    {", ".join(f"'{s}'" for s in nearest_station_ids)})
+                {self.station_id_key} IN ({station_ids})
                 ORDER BY date_time
                 """
             )
diff --git a/django_project/gap/tests/ingestor/test_arable.py b/django_project/gap/tests/ingestor/test_arable.py
index 631fc487..f66f1612 100644
--- a/django_project/gap/tests/ingestor/test_arable.py
+++ b/django_project/gap/tests/ingestor/test_arable.py
@@ -119,7 +119,8 @@ def test_run(self):
         os.environ[API_KEY_ENV_NAME] = 'API_KEY_ENV_NAME'
         session = IngestorSession.objects.create(
             ingestor_type=self.ingestor_type,
-            trigger_task=False
+            trigger_task=False,
+            trigger_parquet=False
         )
         session.run()
         session.refresh_from_db()
diff --git a/django_project/gap/tests/ingestor/test_tahmo.py b/django_project/gap/tests/ingestor/test_tahmo.py
index a7edcb56..9f476202 100644
--- a/django_project/gap/tests/ingestor/test_tahmo.py
+++ b/django_project/gap/tests/ingestor/test_tahmo.py
@@ -176,6 +176,7 @@ def test_with_reset_data_config(self):
             file=self.correct_file,
             ingestor_type=self.ingestor_type,
             trigger_task=False,
+            trigger_parquet=False,
             additional_config={
                 'reset_data': True
             }
@@ -203,6 +204,7 @@ def test_with_min_date_config(self):
             file=self.correct_file,
             ingestor_type=self.ingestor_type,
             trigger_task=False,
+            trigger_parquet=False,
             additional_config={
                 'min_date': '2018-03-14'
             }
diff --git a/django_project/gap/tests/ingestor/test_tahmo_api.py b/django_project/gap/tests/ingestor/test_tahmo_api.py
index 88d84e8e..031fa683 100644
--- a/django_project/gap/tests/ingestor/test_tahmo_api.py
+++ b/django_project/gap/tests/ingestor/test_tahmo_api.py
@@ -207,7 +207,8 @@ def test_run(self, mock_timezone):
         os.environ[TAHMO_API_PASSWORD_ENV_NAME] = 'password'
         session = IngestorSession.objects.create(
             ingestor_type=self.ingestor_type,
-            trigger_task=False
+            trigger_task=False,
+            trigger_parquet=False
         )
         session.run()
         session.refresh_from_db()
@@ -255,7 +256,8 @@ def test_run(self, mock_timezone):
         )
         session = IngestorSession.objects.create(
             ingestor_type=self.ingestor_type,
-            trigger_task=False
+            trigger_task=False,
+            trigger_parquet=False
         )
         session.run()
         session.refresh_from_db()
diff --git a/django_project/gap/tests/ingestor/test_wind_borne_sistems.py b/django_project/gap/tests/ingestor/test_wind_borne_sistems.py
index faf7f46f..c94ad183 100644
--- a/django_project/gap/tests/ingestor/test_wind_borne_sistems.py
+++ b/django_project/gap/tests/ingestor/test_wind_borne_sistems.py
@@ -9,7 +9,9 @@
 import responses
 from django.contrib.gis.gdal import DataSource
 from django.contrib.gis.geos import GEOSGeometry, Point
-from django.test import TestCase
+from django.test import TestCase, override_settings
+from django.core.files.storage import storages
+from django.utils import timezone
 
 from core.settings.utils import absolute_path
 from gap.ingestor.exceptions import EnvIsNotSetException
@@ -20,8 +22,10 @@
 )
 from gap.models import (
     Provider, StationType, Country, Station, IngestorSession,
-    IngestorSessionStatus, IngestorType
+    IngestorSessionStatus, IngestorType, Dataset, DataSourceFile,
+    DatasetStore
 )
+from gap.utils.parquet import WindborneParquetConverter
 from gap.tests.mock_response import BaseTestWithPatchResponses, PatchRequest
 
 
@@ -121,6 +125,9 @@ def mock_requests(self):
 
     def setUp(self):
         """Init test case."""
+        self.dataset = Dataset.objects.get(
+            name='WindBorne Balloons Observations'
+        )
         # Init kenya Country
         shp_path = os.path.join(
             os.path.dirname(os.path.abspath(__file__)),
@@ -202,7 +209,8 @@ def test_run(self):
         # First import
         session = IngestorSession.objects.create(
             ingestor_type=self.ingestor_type,
-            trigger_task=False
+            trigger_task=False,
+            trigger_parquet=False
         )
         session.run()
         session.refresh_from_db()
@@ -336,3 +344,144 @@ def test_run(self):
             ),
             [3, 400, 30, 30]
         )
+
+    def _remove_output(self, s3_path, s3, year):
+        """Remove parquet output from object storage."""
+        s3_storage = storages['gap_products']
+        path = (
+            f'{s3_path.replace(f's3://{s3['AWS_BUCKET_NAME']}/', '')}'
+            f'year={year}'
+        )
+        _, files = s3_storage.listdir(path)
+        print(files)
+        for file in files:
+            s3_storage.delete(file)
+
+    @responses.activate
+    @override_settings(DEBUG=True)
+    def test_convert_to_parquet(self):
+        """Test convert windborne data to parquet."""
+        self.init_mock_requests()
+        os.environ[USERNAME_ENV_NAME] = 'Username'
+        os.environ[PASSWORD_ENV_NAME] = 'password'
+
+        # Create mission 2
+        point = Point(
+            x=36.756561,
+            y=-1.131241,
+            srid=4326
+        )
+        provider = Provider.objects.get(
+            name=PROVIDER
+        )
+        station_type = StationType.objects.get(
+            name=STATION_TYPE
+        )
+        Station.objects.update_or_create(
+            provider=provider,
+            station_type=station_type,
+            code='mission-2',
+            defaults={
+                'name': 'mission-2',
+                'geometry': point,
+                'altitude': 500,
+            }
+        )
+
+        # First import
+        session = IngestorSession.objects.create(
+            ingestor_type=self.ingestor_type,
+            trigger_task=False,
+            trigger_parquet=False
+        )
+        session.run()
+        session.refresh_from_db()
+        self.assertEqual(session.status, IngestorSessionStatus.SUCCESS)
+        self.assertEqual(Station.objects.count(), 2)
+        first_station = Station.objects.first()
+        self.assertEqual(
+            first_station.stationhistory_set.count(), 2
+        )
+
+        data_source = DataSourceFile(name='winborne_test_store')
+        converter = WindborneParquetConverter(
+            self.dataset, data_source
+        )
+        converter.setup()
+        converter.run()
+        self.assertTrue(converter._check_parquet_exists(
+            converter._get_directory_path(data_source),
+            2024,
+            month=9
+        ))
+        self._remove_output(
+            converter._get_directory_path(data_source),
+            converter._get_s3_variables(),
+            2024
+        )
+
+    @responses.activate
+    @override_settings(DEBUG=True)
+    def test_ingestor_to_parquet(self):
+        """Test ingest windborne data to parquet."""
+        self.init_mock_requests()
+        os.environ[USERNAME_ENV_NAME] = 'Username'
+        os.environ[PASSWORD_ENV_NAME] = 'password'
+
+        # Create mission 2
+        point = Point(
+            x=36.756561,
+            y=-1.131241,
+            srid=4326
+        )
+        provider = Provider.objects.get(
+            name=PROVIDER
+        )
+        station_type = StationType.objects.get(
+            name=STATION_TYPE
+        )
+        Station.objects.update_or_create(
+            provider=provider,
+            station_type=station_type,
+            code='mission-2',
+            defaults={
+                'name': 'mission-2',
+                'geometry': point,
+                'altitude': 500,
+            }
+        )
+
+        # First import
+        data_source = DataSourceFile.objects.create(
+            name='winborne_test_store',
+            dataset=self.dataset,
+            format=DatasetStore.PARQUET,
+            start_date_time=timezone.now(),
+            end_date_time=timezone.now(),
+            created_on=timezone.now()
+        )
+        session = IngestorSession.objects.create(
+            ingestor_type=self.ingestor_type,
+            trigger_task=False,
+            additional_config={
+                'datasourcefile_id': data_source.id
+            }
+        )
+        session.run()
+        session.refresh_from_db()
+        self.assertEqual(session.status, IngestorSessionStatus.SUCCESS)
+        self.assertEqual(Station.objects.count(), 2)
+        converter = WindborneParquetConverter(
+            self.dataset, data_source
+        )
+        converter.setup()
+        self.assertTrue(converter._check_parquet_exists(
+            converter._get_directory_path(data_source),
+            2024,
+            month=9
+        ))
+        self._remove_output(
+            converter._get_directory_path(data_source),
+            converter._get_s3_variables(),
+            2024
+        )
diff --git a/django_project/gap/tests/providers/test_observation.py b/django_project/gap/tests/providers/test_observation.py
index 1d316caa..3ca9cabd 100644
--- a/django_project/gap/tests/providers/test_observation.py
+++ b/django_project/gap/tests/providers/test_observation.py
@@ -372,6 +372,13 @@ def test_generate_query_for_point(
             geometry=Point(26.97, -12.56, srid=4326),
             provider=self.dataset.provider
         )
+        dt1 = datetime(2019, 11, 1, 0, 0, 0)
+        MeasurementFactory.create(
+            station=self.station,
+            dataset_attribute=self.dataset_attr,
+            date_time=dt1,
+            value=100
+        )
 
         # Mock the S3 directory path to prevent querying DataSourceFile
         mock_get_directory_path.return_value = "s3://test-bucket/tahmo/"
@@ -390,7 +397,7 @@ def test_generate_query_for_point(
         # Assert query is generated correctly
         self.assertIn("FROM read_parquet(", reader.query)
         self.assertIn("WHERE year>=", reader.query)
-        self.assertIn("st_code =", reader.query)
+        self.assertIn("st_id =", reader.query)
 
     @patch(
         "gap.providers.observation.ObservationParquetReader._get_connection"
@@ -496,6 +503,25 @@ def test_generate_query_for_list_of_points(
             geometry=Point(36.9, -1.4, srid=4326),
             provider=self.dataset.provider
         )
+        dt1 = datetime(2019, 11, 1, 0, 0, 0)
+        MeasurementFactory.create(
+            station=self.station_1,
+            dataset_attribute=self.dataset_attr,
+            date_time=dt1,
+            value=100
+        )
+        MeasurementFactory.create(
+            station=self.station_2,
+            dataset_attribute=self.dataset_attr,
+            date_time=dt1,
+            value=100
+        )
+        MeasurementFactory.create(
+            station=self.station_3,
+            dataset_attribute=self.dataset_attr,
+            date_time=dt1,
+            value=100
+        )
 
         # Create a LIST_OF_POINT location input
         points = MultiPoint([
@@ -517,7 +543,7 @@ def test_generate_query_for_list_of_points(
         # Assert query is generated correctly
         self.assertIn("FROM read_parquet(", reader.query)
         self.assertIn("WHERE year>=", reader.query)
-        self.assertIn("st_code IN (", reader.query)
+        self.assertIn("st_id IN (", reader.query)
 
     @patch(
         "gap.providers.observation.ObservationParquetReader._get_connection"
@@ -543,6 +569,13 @@ def test_read_historical_data(
             geometry=Point(26.97, -12.56, srid=4326),
             provider=self.dataset.provider
         )
+        dt1 = datetime(2019, 11, 1, 0, 0, 0)
+        MeasurementFactory.create(
+            station=self.station,
+            dataset_attribute=self.dataset_attr,
+            date_time=dt1,
+            value=100
+        )
 
         mock_get_directory_path.return_value = "s3://test-bucket/tahmo/"
 
         # Mock DuckDB connection to prevent real queries
@@ -643,6 +676,13 @@ def test_read_historical_data_netcdf(
             geometry=Point(26.97, -12.56, srid=4326),
             provider=self.dataset.provider
         )
+        dt1 = datetime(2019, 11, 1, 0, 0, 0)
+        MeasurementFactory.create(
+            station=self.station,
+            dataset_attribute=self.dataset_attr,
+            date_time=dt1,
+            value=100
+        )
 
         mock_get_directory_path.return_value = "s3://test-bucket/tahmo/"
 
         # Mock DuckDB connection to prevent real queries
diff --git a/django_project/gap/tests/utils/test_netcdf.py b/django_project/gap/tests/utils/test_netcdf.py
index 1ab07126..22ec0e3d 100644
--- a/django_project/gap/tests/utils/test_netcdf.py
+++ b/django_project/gap/tests/utils/test_netcdf.py
@@ -35,7 +35,8 @@
     SalientZarrReader,
     CBAMZarrReader,
     get_reader_from_dataset,
-    ObservationParquetReader
+    ObservationParquetReader,
+    ObservationAirborneParquetReader
 )
 from gap.factories import (
     ProviderFactory,
@@ -487,7 +488,7 @@ def test_from_dataset(self):
         dataset4 = DatasetFactory.create(
             provider=ProviderFactory(name='WindBorne Systems'))
         reader = get_reader_from_dataset(dataset4, use_parquet=True)
-        self.assertEqual(reader, ObservationParquetReader)
+        self.assertEqual(reader, ObservationAirborneParquetReader)
 
     def test_read_variables_by_point(self):
         """Test read variables xarray by point."""
diff --git a/django_project/gap/tests/utils/test_parquet.py b/django_project/gap/tests/utils/test_parquet.py
index 0531c599..f5f64c0e 100644
--- a/django_project/gap/tests/utils/test_parquet.py
+++ b/django_project/gap/tests/utils/test_parquet.py
@@ -63,7 +63,8 @@ def test_store(self):
         session = IngestorSession.objects.create(
             file=self.correct_file,
             ingestor_type=self.ingestor_type,
-            trigger_task=False
+            trigger_task=False,
+            trigger_parquet=False
         )
         session.run()
         session.refresh_from_db()
@@ -99,7 +100,8 @@ def test_convert_dataset_to_parquet(self):
         session = IngestorSession.objects.create(
             file=self.correct_file,
             ingestor_type=self.ingestor_type,
-            trigger_task=False
+            trigger_task=False,
+            trigger_parquet=False
         )
         session.run()
         session.refresh_from_db()
diff --git a/django_project/gap/utils/parquet.py b/django_project/gap/utils/parquet.py
index 251cc190..1410e726 100644
--- a/django_project/gap/utils/parquet.py
+++ b/django_project/gap/utils/parquet.py
@@ -15,7 +15,8 @@
 from django.db.models.functions.datetime import (
     ExtractYear,
     ExtractMonth,
-    ExtractDay
+    ExtractDay,
+    TruncMonth
 )
 from django.contrib.gis.db.models import Union
 from django.contrib.gis.db.models.functions import AsWKB
@@ -29,7 +30,7 @@
 )
 from gap.providers.observation import ST_X, ST_Y
 from gap.utils.ingestor_config import get_ingestor_config_from_preferences
-from core.utils.date import split_epochs_by_year
+from core.utils.date import split_epochs_by_year, split_epochs_by_year_month
 
 logger = logging.getLogger(__name__)
 
@@ -149,16 +150,20 @@ def _get_connection(self, s3):
         conn.load_extension("spatial")
         return conn
 
-    def _check_parquet_exists(self, s3_path: str, year: int):
+    def _check_parquet_exists(self, s3_path: str, year: int, month=None):
         s3_storage = storages['gap_products']
         path = (
            f'{s3_path.replace(f's3://{self.s3['AWS_BUCKET_NAME']}/', '')}'
            f'year={year}'
        )
+        if month:
+            path += f'/month={month}'
        _, files = s3_storage.listdir(path)
        return len(files) > 0

-    def _store_dataframe_as_geoparquet(self, df: pd.DataFrame, s3_path, bbox):
+    def _store_dataframe_as_geoparquet(
+        self, df: pd.DataFrame, s3_path, bbox, use_month=False
+    ):
         print('Writing a new parquet file')
         conn = self._get_connection(self.s3)
         # copy df to duckdb table
@@ -178,11 +183,15 @@ def _store_dataframe_as_geoparquet(self, df: pd.DataFrame, s3_path, bbox):
         )
         conn.sql(sql)
         # export to parquet file
+        partition_by = 'year'
+        if use_month:
+            partition_by += ',month'
         sql = (
             f"""
             COPY (SELECT * FROM weather)
             TO '{s3_path}'
-            (FORMAT 'parquet', COMPRESSION 'zstd', PARTITION_BY (year),
+            (FORMAT 'parquet', COMPRESSION 'zstd',
+            PARTITION_BY ({partition_by}),
             OVERWRITE_OR_IGNORE true);
             """
         )
@@ -190,19 +199,24 @@ def _store_dataframe_as_geoparquet(self, df: pd.DataFrame, s3_path, bbox):
         conn.close()
 
     def _append_dataframe_to_geoparquet(
-        self, df: pd.DataFrame, s3_path, bbox, year
+        self, df: pd.DataFrame, s3_path, bbox, year, month=None
     ):
         print(f'Appending dataframe to existing {year}')
         conn = self._get_connection(self.s3)
         # copy original parquet to duckdb table
+        parquet_dir = f'{s3_path}year=*/*.parquet'
+        month_cond = ''
+        if month:
+            parquet_dir = f'{s3_path}year=*/month=*/*.parquet'
+            month_cond = f'AND month={month}'
         sql = (
             f"""
             CREATE TABLE tmp_weather AS
             SELECT *
-            FROM read_parquet('{s3_path}year=*/*.parquet',
+            FROM read_parquet('{parquet_dir}',
             hive_partitioning=true)
-            WHERE year={year}
+            WHERE year={year} {month_cond}
             """
         )
         conn.sql(sql)
@@ -234,11 +248,15 @@ def _append_dataframe_to_geoparquet(
         conn.sql(sql)
 
         # export to parquet file
+        partition_by = 'year'
+        if month:
+            partition_by += ',month'
         sql = (
             f"""
             COPY (SELECT * FROM weather)
             TO '{s3_path}'
-            (FORMAT 'parquet', COMPRESSION 'zstd', PARTITION_BY (year),
+            (FORMAT 'parquet', COMPRESSION 'zstd',
+            PARTITION_BY ({partition_by}),
             OVERWRITE_OR_IGNORE true);
             """
         )
@@ -253,11 +271,13 @@ def _get_station_bounds(self):
         )
         return combined_bbox['combined_geometry'].extent
 
-    def _get_station_df(self, year):
+    def _get_station_df(self, year, month=None):
         station_ids = Measurement.objects.filter(
             dataset_attribute__dataset=self.dataset,
             date_time__year=year
-        ).distinct('station_id').values_list(
+        )
+
+        station_ids = station_ids.distinct('station_id').values_list(
             'station_id',
             flat=True
         )
@@ -280,7 +300,7 @@ def _get_station_df(self, year):
         df['altitude'] = df['altitude'].astype('double')
         return df
 
-    def _process_weather_df(self, year: int, measurements: list):
+    def _process_weather_df(self, year: int, measurements: list, month=None):
         # Convert to DataFrame
         df = pd.DataFrame(measurements)
         # Pivot the data to make attributes as columns
@@ -310,22 +330,31 @@ def _process_weather_df(self, year: int, measurements: list):
             missing_cols.insert(0, df)
             df = pd.concat(missing_cols, axis=1)
 
-        station_df = self._get_station_df(year)
+        station_df = self._get_station_df(year, month=month)
         # merge df with station_df
         return df.merge(station_df, on=[self.STATION_JOIN_KEY], how='inner')
 
-    def _process_year(self, year: int):
+    def _process_subset(self, year: int, month=None):
         measurements = Measurement.objects.filter(
             dataset_attribute__dataset=self.dataset,
             date_time__year=year
-        ).order_by('date_time', 'station_id')
+        )
+        if month:
+            measurements = measurements.filter(
+                date_time__month=month
+            )
+
+        measurements = measurements.order_by('date_time', 'station_id')
         print(f'Year {year} total_count: {measurements.count()}')
 
+        if measurements.count() == 0:
+            return None
+
         measurements = measurements.annotate(**self.WEATHER_FIELDS).values(
             *(list(self.WEATHER_FIELDS.keys()) + ['value'])
         )
-        return self._process_weather_df(year, list(measurements))
+        return self._process_weather_df(year, list(measurements), month=month)
 
     def run(self):
         """Run the converter."""
@@ -344,7 +373,10 @@ def run(self):
         station_bbox = self._get_station_bounds()
         for year in years:
             parquet_exists = self._check_parquet_exists(s3_path, year)
-            df = self._process_year(year)
+            df = self._process_subset(year)
+
+            if df is None:
+                continue
 
             if self.mode == 'a' and parquet_exists:
                 self._append_dataframe_to_geoparquet(
@@ -386,10 +418,11 @@ def _get_station_bounds(self):
         )
         return combined_bbox['combined_geometry'].extent
 
-    def _get_station_df(self, year):
+    def _get_station_df(self, year, month=None):
         station_hist_ids = Measurement.objects.filter(
             dataset_attribute__dataset=self.dataset,
-            date_time__year=year
+            date_time__year=year,
+            date_time__month=month
         ).distinct('station_history_id').values_list(
             'station_history_id',
             flat=True
@@ -414,6 +447,41 @@ def _get_station_df(self, year):
         df['altitude'] = df['altitude'].astype('double')
         return df
 
+    def run(self):
+        """Run the converter."""
+        s3_path = self._get_directory_path(self.data_source)
+
+        # get all distinct year and months
+        months = list(Measurement.objects.annotate(
+            month=TruncMonth('date_time')
+        ).filter(
+            dataset_attribute__dataset=self.dataset
+        ).order_by('month').distinct('month').values_list(
+            'month',
+            flat=True
+        ))
+
+        station_bbox = self._get_station_bounds()
+        for month_year in months:
+            year = month_year.year
+            month = month_year.month
+            parquet_exists = self._check_parquet_exists(
+                s3_path, year, month=month
+            )
+            df = self._process_subset(year, month=month)
+
+            if df is None:
+                continue
+
+            if self.mode == 'a' and parquet_exists:
+                self._append_dataframe_to_geoparquet(
+                    df, s3_path, station_bbox, year, month=month
+                )
+            else:
+                self._store_dataframe_as_geoparquet(
+                    df, s3_path, station_bbox, use_month=True
+                )
+
 
 class ParquetIngestorAppender(ParquetConverter):
     """Class to append data to parquet from Ingestor."""
 
@@ -440,6 +508,9 @@ def _process_date_range(
             f'{start_date} to {end_date} total_count: {measurements.count()}'
         )
 
+        if measurements.count() == 0:
+            return None
+
         measurements = measurements.annotate(**self.WEATHER_FIELDS).values(
             *(list(self.WEATHER_FIELDS.keys()) + ['value'])
         )
@@ -462,9 +533,79 @@ def run(self):
             parquet_exists = self._check_parquet_exists(s3_path, year)
             df = self._process_date_range(year, start_dt, end_dt)
 
+            if df is None:
+                continue
+
             if self.mode == 'a' and parquet_exists:
                 self._append_dataframe_to_geoparquet(
                     df, s3_path, station_bbox, year
                 )
             else:
                 self._store_dataframe_as_geoparquet(df, s3_path, station_bbox)
+
+
+class WindborneParquetIngestorAppender(WindborneParquetConverter):
+    """Class to append data to parquet from Ingestor."""
+
+    def __init__(
+        self, dataset: Dataset, data_source: DataSourceFile,
+        start_date: datetime, end_date: datetime, mode='a'
+    ):
+        """Initialize WindborneParquetIngestorAppender."""
+        super().__init__(dataset, data_source, mode)
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def _process_date_range(
+        self, year: int, start_date: datetime, end_date: datetime
+    ):
+        measurements = Measurement.objects.filter(
+            dataset_attribute__dataset=self.dataset,
+            date_time__year=year,
+            date_time__gte=start_date,
+            date_time__lte=end_date
+        ).order_by('date_time', 'station_id')
+        print(
+            f'{start_date} to {end_date} total_count: {measurements.count()}'
+        )
+
+        if measurements.count() == 0:
+            return None
+
+        measurements = measurements.annotate(**self.WEATHER_FIELDS).values(
+            *(list(self.WEATHER_FIELDS.keys()) + ['value'])
+        )
+
+        return self._process_weather_df(
+            year, list(measurements), month=start_date.month
+        )
+
+    def run(self):
+        """Run the converter."""
+        s3_path = self._get_directory_path(self.data_source)
+        date_list = split_epochs_by_year_month(
+            int(self.start_date.timestamp()),
+            int(self.end_date.timestamp()),
+        )
+
+        station_bbox = self._get_station_bounds()
+        for year, month, start_epoch, end_epoch in date_list:
+            start_dt = datetime.fromtimestamp(start_epoch, tz=timezone.utc)
+            end_dt = datetime.fromtimestamp(end_epoch, tz=timezone.utc)
+
+            parquet_exists = self._check_parquet_exists(
+                s3_path, year, month=month
+            )
+            df = self._process_date_range(year, start_dt, end_dt)
+
+            if df is None:
+                continue
+
+            if self.mode == 'a' and parquet_exists:
+                self._append_dataframe_to_geoparquet(
+                    df, s3_path, station_bbox, year, month=month
+                )
+            else:
+                self._store_dataframe_as_geoparquet(
+                    df, s3_path, station_bbox, use_month=True
+                )