diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 9952a6c1..b320d1bb 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -13,7 +13,7 @@ jobs:
     strategy:
       matrix:
         os: [ ubuntu-latest ]
-        python-version: [ '3.8', '3.9', '3.x' ]
+        python-version: ['3.8', '3.9', '3.x']
     steps:
       - name: checkout repository
        uses: actions/checkout@v2
diff --git a/examples/Sflux/gen_sflux_gfs2.py b/examples/Sflux/gen_sflux_gfs2.py
index b7321f63..4499331f 100644
--- a/examples/Sflux/gen_sflux_gfs2.py
+++ b/examples/Sflux/gen_sflux_gfs2.py
@@ -2,14 +2,17 @@
 from time import time
 import multiprocessing as mp
 import logging
+import pathlib
 
 import numpy as np
 import pandas as pd
+from matplotlib.transforms import Bbox
 
 from pyschism.mesh.hgrid import Hgrid
 from pyschism.forcing.nws.nws2.gfs2 import GFS
 from pyschism.dates import nearest_cycle
 
+
 logging.basicConfig(
     format="[%(asctime)s] %(name)s %(levelname)s: %(message)s",
     force=True,
@@ -20,16 +23,20 @@
 logging.getLogger('pyschism').setLevel(log_level)
 
 if __name__ == "__main__":
     t0 = time()
-    startdate = datetime(2021, 10, 13)
-    pscr = '/sciclone/pscr/lcui01/GFS/'
-    rnday = 5
-    record = 1
+    #now = datetime.now()
+    #last_cycle = np.datetime64(pd.DatetimeIndex([now-timedelta(hours=2)]).floor('6H').values[0], 'h').tolist()
+    #start = (last_cycle - timedelta(days=1)).replace(hour=0)
+    start = datetime(2023, 10, 1)
+    rnday = 10
+    #record = 5
+    #outdir = path = pathlib.Path('./GFS_2023')
 
     hgrid = Hgrid.open('../../static/hgrid.gr3', crs='epsg:4326')
-
-    gfs = GFS(start_date=startdate, rnday=rnday, pscr=pscr, record=record, bbox=hgrid.bbox)
-
-    print(f'It took {(time()-t0)/60} mins to process {rnday} days, {record*24} records/day')
+    pscr = '/sciclone/pscr/lcui01/GFS/'
+    gfs = GFS(level=1, pscr=pscr, bbox=hgrid.bbox)
+    gfs.write(start_date=start, rnday=rnday, air=True, prc=True, rad=True)
+
+    print(f'It took {(time()-t0)/60} mins to process {rnday} days')
diff --git a/examples/Sflux/gen_sflux_hrrr3.py b/examples/Sflux/gen_sflux_hrrr3.py
index 45f2fed6..63557cd5 100644
--- a/examples/Sflux/gen_sflux_hrrr3.py
+++ b/examples/Sflux/gen_sflux_hrrr3.py
@@ -1,6 +1,11 @@
 from datetime import datetime, timedelta
 from time import time
 import logging
+import pathlib
+
+import numpy as np
+import pandas as pd
+from matplotlib.transforms import Bbox
 
 from pyschism.mesh.hgrid import Hgrid
 from pyschism.forcing.nws.nws2.hrrr3 import HRRR
@@ -17,11 +22,24 @@ if __name__ == "__main__":
 
     t0 = time()
-    rnday = 1
-    record = 1
-    startdate = nearest_cycle()
-    hgrid = Hgrid.open('../../static/hgrid.gr3', crs='epsg:4326')
+
+    #now = datetime.now()
+    #last_cycle = np.datetime64(pd.DatetimeIndex([now-timedelta(hours=2)]).floor('6H').values[0], 'h').tolist()
+    #print(last_cycle)
+
+    #start = (last_cycle - timedelta(days=1)).replace(hour=0)
+    #end = last_cycle + timedelta(days=2)
+
+    #rndelta = end - start
+    #rnday = rndelta.total_seconds() / 86400.
+    start = datetime(2023, 10, 26)
+    rnday = 2
+    hgrid = Hgrid.open('./hgrid.gr3', crs='epsg:4326')
+    #outdir = path = pathlib.Path('./HRRR_2017_2')
+    bbox = Bbox.from_extents(-162, 60.79, -143, 69.1)
+    pscr = '/sciclone/pscr/lcui01/HRRR/'
 
-    hrrr = HRRR(start_date=startdate, rnday=rnday, pscr=pscr, record=record, bbox=hgrid.bbox)
-    print(f'It took {(time()-t0)/60} mins to process {rnday} days, {record*24} files/day')
+    hrrr = HRRR(level=2, region='alaska', pscr=pscr, bbox=bbox)
+    hrrr.write(start_date=start, rnday=rnday, air=True, prc=True, rad=True)
+    print(f'It took {(time()-t0)/60} mins to process {rnday} days!')
diff --git a/pyschism/forcing/nws/nws2/gfs2.py b/pyschism/forcing/nws/nws2/gfs2.py
index 5267fcd1..e5123741 100644
--- a/pyschism/forcing/nws/nws2/gfs2.py
+++ b/pyschism/forcing/nws/nws2/gfs2.py
@@ -1,3 +1,4 @@
+import os
 from datetime import datetime, timedelta
 import logging
 import pathlib
@@ -27,44 +28,94 @@ def __init__(
         product='atmos',
     ):
         """
-        This will download the GFS data.
+        Download GFS data from AWS.
+        This dataset (GFS v16.3.0) starts on Feb 26, 2021.
         """
         self.start_date = nearest_cycle() if start_date is None else start_date
-        self.record = record
         self.pscr = pscr
         self.product = product
-        self.forecast_cycle = self.start_date
-        self.cycle=self.forecast_cycle.hour
 
-        paginator=self.s3.get_paginator('list_objects_v2')
-        pages=paginator.paginate(Bucket=self.bucket,
-                Prefix=f'gfs.{self.forecast_cycle.strftime("%Y%m%d")}'
-                f'/{self.cycle:02d}/{self.product}/')
+        self.forecast_cycle = self.start_date
 
-        data=[]
-        for page in pages:
-            for obj in page['Contents']:
-                data.append(obj)
+        timevector = np.arange(
+            self.start_date,
+            self.start_date + timedelta(hours=record*24+1),
+            np.timedelta64(1, 'h')
+        ).astype(datetime)
 
-        file_metadata = list(sorted([
-            _['Key'] for _ in data if '0p25' in _['Key'] and 'pgrb2' in _['Key'] and not 'goessim' in _['Key'] and not 'anl' in _['Key'] and not 'idx' in _['Key']]))
+        file_metadata = self.get_file_namelist(timevector)
 
-        for key in file_metadata[1:self.record*24+1]:
-            key2 = key + '.grib2'
-            filename = pathlib.Path(self.tmpdir) / key2
+        for dt in timevector:
+
+            outfile_name = f"gfs.{self.start_date.strftime('%Y%m%d')}/gfs.pgrb2.0p25.{dt.strftime('%Y%m%d%H')}.grib2"
+            filename = pathlib.Path(self.tmpdir) / outfile_name
             filename.parent.mkdir(parents=True, exist_ok=True)
 
             with open(filename, 'wb') as f:
-                logger.info(f'Downloading file {key}, ')
-                try:
-                    self.s3.download_fileobj(self.bucket, key, f)
-                except:
-                    logger.info(f'file {key} is not available')
+                while file_metadata[dt]:
+                    try:
+                        key = file_metadata[dt].pop(0)
+                        logger.info(f"Downloading file {key} for {dt}")
+                        self.s3.download_fileobj(self.bucket, key, f)
+                        logger.info("Success!")
+                        break
+                    except Exception:
+                        if not file_metadata[dt]:
+                            logger.info(f'No file for {dt}')
+                            if os.path.exists(filename):
+                                os.remove(filename)
+                        else:
+                            logger.info(f'file {key} is not available, try next file')
+                            continue
+
+    def get_file_namelist(self, requested_dates):
+
+        file_metadata = {}
+        hours = (datetime.utcnow() - self.start_date).days * 24 + (datetime.utcnow() - self.start_date).seconds // 3600
+        n_cycles = hours // 6 if hours < 25 else 4
+        cycle_index = (int(self.start_date.hour) - 1) // 6
+        dt = requested_dates[0]
+        for it, dt in enumerate(requested_dates[:n_cycles*6+1]):
+            levels = 3
+            i = 0
+            fhour = int(dt.hour)
+            cycle_index = (fhour - 1) // 6
+            date2 = (dt - timedelta(days=1)).strftime('%Y%m%d') if dt.hour == 0 else dt.strftime('%Y%m%d')
+
+            while levels:
+
+                cycle = self.fcst_cycles[cycle_index - i]
+                fhour2 = fhour + i * 6 if cycle_index == 0 else fhour - cycle_index * 6 + i * 6
+
+                file_metadata.setdefault(dt, []).append(f"gfs.{date2}/{cycle}/{self.product}/gfs.t{cycle}z.pgrb2.0p25.f{fhour2:03d}")
+                levels -= 1
+                i += 1
+
+        if it < 25:
+            date2 = (dt - timedelta(days=1)).strftime('%Y%m%d') if dt.hour == 0 else dt.strftime('%Y%m%d')
+        for it, dt in enumerate(requested_dates[n_cycles*6+1:]):
+            levels = 3
+            i = 0
+            hours = (dt - self.forecast_cycle).days * 24 + (dt - self.forecast_cycle).seconds // 3600
+
+            while levels:
+                #starting from the last cycle
+                cycle = self.fcst_cycles[cycle_index - i]
+                fhour2 = (hours - (n_cycles - 1) * 6) + i * 6
+                file_metadata.setdefault(dt, []).append(f"gfs.{date2}/{cycle}/{self.product}/gfs.t{cycle}z.pgrb2.0p25.f{fhour2:03d}")
+                levels -= 1
+                i += 1
+
+        return file_metadata
 
     @property
     def bucket(self):
         return 'noaa-gfs-bdp-pds'
 
+    @property
+    def fcst_cycles(self):
+        return ['00', '06', '12', '18']
+
     @property
     def s3(self):
         try:
@@ -82,42 +133,63 @@ def tmpdir(self):
 
     @property
     def files(self):
-        grbfiles=glob.glob(f'{self.tmpdir}/gfs.{self.forecast_cycle.strftime("%Y%m%d")}/{self.cycle:02d}/{self.product}/gfs.t{self.cycle:02d}z.pgrb2.0p25.f*.grib2')
+        grbfiles=glob.glob(f'{self.tmpdir}/gfs.{self.forecast_cycle.strftime("%Y%m%d")}/gfs.pgrb2.0p25.*.grib2')
         grbfiles.sort()
         return grbfiles
 
 class GFS:
-    def __init__(self, start_date=None, rnday=None, pscr=None, record=1, bbox=None, outdir=None):
-        start_date = nearest_cycle() if start_date is None else start_date
-        logger.info(f'start_date is {start_date}')
+    def __init__(self, level=1, bbox=None, pscr=None, outdir=None):
 
+        self.level = level
         self.bbox = bbox
+        self.pscr = pscr
+        self.outdir = outdir
+        self.record = 1
 
-        if outdir is None:
-            self.path = pathlib.Path(start_date.strftime("%Y%m%d"))
-            self.path.mkdir(parents=True, exist_ok=True)
-        else:
-            self.path = outdir
+    def write(self, start_date, rnday, air: bool=True, prc: bool=True, rad: bool=True):
 
-        end_date = start_date + timedelta(days=rnday)
+        start_date = nearest_cycle() if start_date is None else start_date
+
+        if (start_date + timedelta(days=rnday)) > datetime.utcnow():
+            logger.info('End date is beyond the current time, set rnday to 1 day and record to 5 days')
+            rnday = 1
+            self.record = 5 #days
+
+        end_date = start_date + timedelta(hours=rnday * self.record + 1)
+        logger.info(f'start time is {start_date}, end time is {end_date}')
+
+        if self.outdir is None:
+            #self.outdir = pathlib.Path(start_date.strftime("%Y%m%d"))
+            self.outdir = pathlib.Path("sflux")
+            self.outdir.mkdir(parents=True, exist_ok=True)
 
-        datevector = np.arange(start_date, end_date, np.timedelta64(1, 'D'),
-                dtype='datetime64')
+        datevector = np.arange(
+            start_date,
+            start_date + timedelta(days=rnday),
+            np.timedelta64(1, 'D'),
+            dtype='datetime64',
+        )
         datevector = pd.to_datetime(datevector)
 
         npool = len(datevector) if len(datevector) < mp.cpu_count()/2 else mp.cpu_count()/2
         logger.info(f'npool is {npool}')
         pool = mp.Pool(int(npool))
 
-        pool.starmap(self.gen_sflux, [(date, record, pscr) for date in datevector])
-
+        pool.starmap(self.gen_sflux, [(istack+1, date, air, prc, rad) for istack, date in enumerate(datevector)])
         pool.close()
-
-    def gen_sflux(self, date, record, pscr):
-
-        inventory = AWSGrib2Inventory(date, record, pscr)
+        #self.gen_sflux(1, datevector[0], air, prc, rad)
+
+    def gen_sflux(
+        self,
+        istack,
+        date,
+        air: bool = True,
+        prc: bool = True,
+        rad: bool = True,
+    ):
+
+        inventory = AWSGrib2Inventory(date, self.record, self.pscr)
         grbfiles = inventory.files
-        cycle = date.hour
-
+        #cycle = date.hour
 
         prate = []
         dlwrf = []
@@ -139,6 +211,7 @@ def gen_sflux(self, date, record, pscr):
 
         #Get lon/lat
         lon, lat, idx_ymin, idx_ymax, idx_xmin, idx_xmax = self.modified_latlon(grbfiles[0])
+        times = []
 
         for ifile, file in enumerate(grbfiles):
             logger.info(f'file {ifile} is {file}')
@@ -159,6 +232,7 @@ def gen_sflux(self, date, record, pscr):
                     for key2, value2 in value.items():
                         tmp = ds[key2][idx_ymin:idx_ymax+1, idx_xmin:idx_xmax+1].astype('float32')
                         value2[1].append(tmp[::-1,:])
+                    times.append(ds.valid_time.values)
                     ds.close()
 
                 elif key == 'group3':
@@ -179,90 +253,154 @@ def gen_sflux(self, date, record, pscr):
                         value2[1].append(tmp[::-1, :])
                     ds.close()
 
-        fout = xr.Dataset({'stmp': (['time', 'ny_grid', 'nx_grid'], np.array(stmp)),
+        #write to netcdf
+        bdate = date.strftime('%Y %m %d %H').split(' ')
+        bdate = [int(q) for q in bdate[:4]] + [0]
+
+        if air:
+            ds = xr.Dataset({
+                'stmp': (['time', 'ny_grid', 'nx_grid'], np.array(stmp)),
                 'spfh': (['time', 'ny_grid', 'nx_grid'], np.array(spfh)),
                 'uwind': (['time', 'ny_grid', 'nx_grid'], np.array(uwind)),
                 'vwind': (['time', 'ny_grid', 'nx_grid'], np.array(vwind)),
                 'prmsl': (['time', 'ny_grid', 'nx_grid'], np.array(prmsl)),
+                },
+                coords={
+                    'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'),
+                    'lon': (['ny_grid', 'nx_grid'], lon),
+                    'lat': (['ny_grid', 'nx_grid'], lat)
+                })
+
+            ds.time.attrs = {
+                'long_name': 'Time',
+                'standard_name': 'time',
+                'base_date': bdate,
+                'units': f"days since {date.strftime('%Y-%m-%d %H:00')} UTC",
+            }
+
+            ds.lat.attrs = {
+                'units': 'degrees_north',
+                'long_name': 'Latitude',
+                'standard_name': 'latitude',
+            }
+
+            ds.lon.attrs = {
+                'units': 'degrees_east',
+                'long_name': 'Longitude',
+                'standard_name': 'longitude',
+            }
+
+            ds.uwind.attrs={
+                'units': 'm/s',
+                'long_name': '10m_above_ground/UGRD',
+                'standard_name':'eastward_wind'
+            }
+
+            ds.vwind.attrs={
+                'units': 'm/s',
+                'long_name': '10m_above_ground/VGRD',
+                'standard_name':'northward_wind'
+            }
+
+            ds.spfh.attrs={
+                'units': 'kg kg-1',
+                'long_name': '2m_above_ground/Specific Humidity',
+                'standard_name':'specific_humidity'
+            }
+
+            ds.prmsl.attrs = {
+                'units': 'Pa',
+                'long_name': 'Pressure reduced to MSL',
+                'standard_name': 'air_pressure_at_sea_level'
+            }
+
+            ds.stmp.attrs={
+                'units': 'K',
+                'long_name': '2m_above_ground/Temperature',
+            }
+
+            ds.to_netcdf(f'{self.outdir}/sflux_air_{self.level}.{istack:04d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time')
+            ds.close()
+
+        if prc:
+            ds = xr.Dataset({
                 'prate': (['time', 'ny_grid', 'nx_grid'], np.array(prate)),
+                },
+                coords={
+                    'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'),
+                    'lon': (['ny_grid', 'nx_grid'], lon),
+                    'lat': (['ny_grid', 'nx_grid'], lat)
+                })
+
+            ds.time.attrs = {
+                'long_name': 'Time',
+                'standard_name': 'time',
+                'base_date': bdate,
+                'units': f"days since {date.strftime('%Y-%m-%d %H:00')} UTC"
+            }
+
+            ds.lat.attrs = {
+                'units': 'degrees_north',
+                'long_name': 'Latitude',
+                'standard_name': 'latitude',
+            }
+
+            ds.lon.attrs = {
+                'units': 'degrees_east',
+                'long_name': 'Longitude',
+                'standard_name': 'longitude',
+            }
+
+            ds.prate.attrs={
+                'units': 'kg m-2 s-1',
+                'long_name': 'Precipitation rate'
+            }
+
+            ds.to_netcdf(f'{self.outdir}/sflux_prc_{self.level}.{istack:04d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time')
+            ds.close()
+
+        if rad:
+            ds = xr.Dataset({
                 'dlwrf': (['time', 'ny_grid', 'nx_grid'], np.array(dlwrf)),
                 'dswrf': (['time', 'ny_grid', 'nx_grid'], np.array(dswrf)),
-            },
+                },
                 coords={
-                    'time': np.round(np.arange(1, len(grbfiles)+1)/24, 4).astype('float32'),
+                    'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'),
                     'lon': (['ny_grid', 'nx_grid'], lon),
                     'lat': (['ny_grid', 'nx_grid'], lat)
-                })
-
-        bdate = date.strftime('%Y %m %d %H').split(' ')
-        bdate = [int(q) for q in bdate[:4]] + [0]
-
-        fout.time.attrs = {
-            'long_name': 'Time',
-            'standard_name': 'time',
-            'base_date': bdate,
-            'units': f"days since {date.year}-{date.month}-{date.day} {cycle:02d}:00 UTC"
-        }
-
-        fout.lat.attrs = {
-            'units': 'degrees_north',
-            'long_name': 'Latitude',
-            'standard_name': 'latitude',
-        }
-
-        fout.lon.attrs = {
-            'units': 'degrees_east',
-            'long_name': 'Longitude',
-            'standard_name': 'longitude',
-        }
-
-        fout.uwind.attrs={
-            'units': 'm/s',
-            'long_name': '10m_above_ground/UGRD',
-            'standard_name':'eastward_wind'
-        }
-
-        fout.vwind.attrs={
-            'units': 'm/s',
-            'long_name': '10m_above_ground/VGRD',
-            'standard_name':'northward_wind'
-        }
-
-
-        fout.spfh.attrs={
-            'units': 'kg kg-1',
-            'long_name': '2m_above_ground/Specific Humidity',
-            'standard_name':'specific_humidity'
-        }
-
-        fout.prmsl.attrs = {
-            'units': 'Pa',
-            'long_name': 'Pressure reduced to MSL',
-            'standard_name': 'air_pressure_at_sea_level'
-        }
-
-        fout.stmp.attrs={
-            'units': 'K',
-            'long_name': '2m_above_ground/Temperature',
-        }
-
-        fout.prate.attrs={
-            'units': 'kg m-2 s-1',
-            'long_name': 'Precipitation rate'
-        }
-
-        fout.dlwrf.attrs = {
-            'units': 'W m-2',
-            'long_name': 'Downward short-wave radiation flux'
-        }
-
-        fout.dswrf.attrs = {
-            'units': 'W m-2',
-            'long_name': 'Downward long-wave radiation flux'
-        }
-
-        fout.to_netcdf(self.path / f'gfs_{date.strftime("%Y%m%d")}{cycle:02d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time')
-
+                })
+
+            ds.time.attrs = {
+                'long_name': 'Time',
+                'standard_name': 'time',
+                'base_date': bdate,
+                'units': f"days since {date.strftime('%Y-%m-%d %H:00')} UTC"
+            }
+
+            ds.lat.attrs = {
+                'units': 'degrees_north',
+                'long_name': 'Latitude',
+                'standard_name': 'latitude',
+            }
+
+            ds.lon.attrs = {
+                'units': 'degrees_east',
+                'long_name': 'Longitude',
+                'standard_name': 'longitude',
+            }
+
+            ds.dlwrf.attrs = {
+                'units': 'W m-2',
+                'long_name': 'Downward long-wave radiation flux'
+            }
+
+            ds.dswrf.attrs = {
+                'units': 'W m-2',
+                'long_name': 'Downward short-wave radiation flux'
+            }
+
+            ds.to_netcdf(f'{self.outdir}/sflux_rad_{self.level}.{istack:04d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time')
+            ds.close()
 
     def modified_latlon(self, grbfile):
         xmin, xmax, ymin, ymax = self.bbox.xmin, self.bbox.xmax, self.bbox.ymin, self.bbox.ymax
diff --git a/pyschism/forcing/nws/nws2/hrrr3.py b/pyschism/forcing/nws/nws2/hrrr3.py
index 0389cb3a..bcbd12e4 100644
--- a/pyschism/forcing/nws/nws2/hrrr3.py
+++ b/pyschism/forcing/nws/nws2/hrrr3.py
@@ -26,7 +26,7 @@ class AWSGrib2Inventory:
     def __init__(
         self,
         start_date: datetime = None,
-        record = 2,
+        record = 1,
         pscr = None, #tmpdir to save grib files
         product='conus',
     ):
@@ -36,64 +36,89 @@
 
         self.forecast_cycle = self.start_date #nearest_cycle()
 
-        # paginator=self.s3.get_paginator('list_objects_v2')
-        # pages=paginator.paginate(Bucket=self.bucket,
-        #         Prefix=f'hrrr.{self.forecast_cycle.strftime("%Y%m%d")}'
-        #         f'/{self.product}/')
-        # data=[]
-        # for page in pages:
-        #     for obj in page['Contents']:
-        #         data.append(obj)
 
-        # self.cycle=self.forecast_cycle.hour
-        # tz='t{:02d}z'.format(self.cycle)
-        # self.file_metadata = list(sorted([
-        #     _['Key'] for _ in data if 'wrfsfcf' in _['Key'] and tz in _['Key'] and not 'idx' in _['Key']
-        # ]))
-        self.get_file_namelist(self.forecast_cycle)
-
-        while (not self.file_metadata):
-            logger.info(f'No data for cycle t{self.forecast_cycle.hour:02d}z on {self.forecast_cycle}, try next cycle: +1!')
-            self.forecast_cycle += timedelta(hours=1)
-            self.get_file_namelist(self.forecast_cycle)
-
-        for key in self.file_metadata[1:record*24+1]:
-            filename = pathlib.Path(self.tmpdir) / key
+        timevector = np.arange(
+            self.start_date,
+            self.start_date + timedelta(hours=record*24+1),
+            np.timedelta64(1, 'h')
+        ).astype(datetime)
+
+        file_metadata = self.get_file_namelist(timevector)
+
+        for dt in timevector:
+
+            outfile_name = f"hrrr.{self.start_date.strftime('%Y%m%d')}/hrrr_wrfsfcf{dt.strftime('%Y%m%d%H')}.grib2"
+            filename = pathlib.Path(self.tmpdir) / outfile_name
             filename.parent.mkdir(parents=True, exist_ok=True)
 
             with open(filename, 'wb') as f:
-                logger.info(f'Downloading file {key}, ')
-                try:
-                    self.s3.download_fileobj(self.bucket, key, f)
-                except:
-                    logger.info(f'file {key} is not available')
-
-    def get_file_namelist(self, requested_date):
-        paginator=self.s3.get_paginator('list_objects_v2')
-        pages=paginator.paginate(Bucket=self.bucket,
-                Prefix=f'hrrr.{requested_date.strftime("%Y%m%d")}'
-                f'/{self.product}/')
-        data=[]
-        for page in pages:
-            for obj in page['Contents']:
-                data.append(obj)
-
-        self.cycle = requested_date.hour
-        tz='t{:02d}z'.format(self.cycle)
-        self.file_metadata = list(sorted([
-            _['Key'] for _ in data if 'wrfsfcf' in _['Key'] and tz in _['Key'] and not 'idx' in _['Key']
-        ]))
-        return self.file_metadata
+                while file_metadata[dt]:
+                    try:
+                        key = file_metadata[dt].pop(0)
+                        logger.info(f"Downloading file {key} for {dt}")
+                        self.s3.download_fileobj(self.bucket, key, f)
+                        logger.info("Success!")
+                        break
+                    except Exception:
+                        if not file_metadata[dt]:
+                            logger.info(f'No file for {dt}')
+                            if os.path.exists(filename):
+                                os.remove(filename)
+                        else:
+                            logger.info(f'file {key} is not available, try next file')
+                            continue
+
+    def get_file_namelist(self, requested_dates):
+
+        file_metadata = {}
+        hours = (datetime.utcnow() - self.start_date).days * 24 + (datetime.utcnow() - self.start_date).seconds // 3600
+        n_cycles = hours // 6 if hours < 25 else 4
+        cycle_index = (int(self.start_date.hour) - 1) // 6
+        dt = requested_dates[0]
+        for it, dt in enumerate(requested_dates[:n_cycles*6+1]):
+            levels = 3
+            i = 0
+            fhour = int(dt.hour)
+            cycle_index = (fhour - 1) // 6
+            date2 = (dt - timedelta(days=1)).strftime('%Y%m%d') if dt.hour == 0 else dt.strftime('%Y%m%d')
+
+            while levels:
+
+                cycle = self.fcst_cycles[cycle_index - i]
+                fhour2 = fhour + i * 6 if cycle_index == 0 else fhour - cycle_index * 6 + i * 6
+                if self.product == 'conus':
+                    file_metadata.setdefault(dt, []).append(f"hrrr.{date2}/{self.product}/hrrr.t{cycle}z.wrfsfcf{fhour2:02d}.grib2")
+                elif self.product == 'alaska':
+                    file_metadata.setdefault(dt, []).append(f"hrrr.{date2}/{self.product}/hrrr.t{cycle}z.wrfsfcf{fhour2:02d}.ak.grib2")
+                levels -= 1
+                i += 1
+
+        if it < 25:
+            date2 = (dt - timedelta(days=1)).strftime('%Y%m%d') if dt.hour == 0 else dt.strftime('%Y%m%d')
+        for it, dt in enumerate(requested_dates[n_cycles*6+1:]):
+            levels = 3
+            i = 0
+            hours = (dt - self.forecast_cycle).days * 24 + (dt - self.forecast_cycle).seconds // 3600
+
+            while levels:
+                cycle = self.fcst_cycles[cycle_index - i]
+                fhour2 = (hours - (n_cycles - 1) * 6) + i * 6
+                if self.product == 'conus':
+                    file_metadata.setdefault(dt, []).append(f"hrrr.{date2}/{self.product}/hrrr.t{cycle}z.wrfsfcf{fhour2:02d}.grib2")
[]).append(f"hrrr.{date2}/{self.product}/hrrr.t{cycle}z.wrfsfcf{fhour2:02d}.grib2") + elif self.product == 'alaska': + file_metadata.setdefault(dt, []).append(f"hrrr.{date2}/{self.product}/hrrr.t{cycle}z.wrfsfcf{fhour2:02d}.ak.grib2") + + levels -= 1 + i += 1 + + return file_metadata @property def bucket(self): return 'noaa-hrrr-bdp-pds' @property - def output_interval(self) -> timedelta: - return { - 'conus': timedelta(hours=1) - }[self.product] + def fcst_cycles(self): + return ['00', '06', '12', '18'] @property def s3(self): @@ -112,28 +138,44 @@ def tmpdir(self): @property def files(self): - grbfiles=glob.glob(f'{self.tmpdir}/hrrr.{self.forecast_cycle.strftime("%Y%m%d")}/conus/hrrr.t{self.cycle:02d}z.wrfsfcf*.grib2') + grbfiles=glob.glob(f'{self.tmpdir}/hrrr.{self.forecast_cycle.strftime("%Y%m%d")}/hrrr_wrfsfcf*.grib2') grbfiles.sort() return grbfiles class HRRR: - def __init__(self, start_date=None, rnday=None, pscr=None, record=2, bbox=None, outdir=None): + def __init__(self, level=2, region='conus', bbox=None, pscr=None, outdir=None): - start_date = nearest_cycle() if start_date is None else start_date + self.level = level + self.region = region self.bbox = bbox + self.pscr = pscr + self.outdir = outdir + self.record = 1 - if outdir is None: - self.path = pathlib.Path(start_date.strftime("%Y%m%d")) - self.path.mkdir(parents=True, exist_ok=True) - else: - self.path = outdir + def write(self, start_date, rnday, air: bool=True, prc: bool=True, rad: bool=True): - end_date = start_date + timedelta(days=rnday) - logger.info(f'start_date is {start_date}, end_date is {end_date}') + start_date = nearest_cycle() if start_date is None else start_date + + if (start_date + timedelta(days=rnday)) > datetime.utcnow(): + logger.info(f"End date is beyond current time, set rnday to one day and record to 2 days") + rnday = 1 + self.record = 2 #days + + end_date = start_date + timedelta(hours=rnday * self.record + 1) + logger.info(f'start time is {start_date}, end time is {end_date}') + + if self.outdir is None: + #self.outdir = pathlib.Path(start_date.strftime("%Y%m%d")) + self.outdir = pathlib.Path("sflux") + self.outdir.mkdir(parents=True, exist_ok=True) - datevector = np.arange(start_date, end_date, np.timedelta64(1, 'D'), - dtype='datetime64') + datevector = np.arange( + start_date, + start_date + timedelta(days=rnday), + np.timedelta64(1, 'D'), + dtype='datetime64', + ) datevector = pd.to_datetime(datevector) #Use all CPUs may cause memory issue @@ -141,16 +183,23 @@ def __init__(self, start_date=None, rnday=None, pscr=None, record=2, bbox=None, logger.info(f'npool is {npool}') pool = mp.Pool(int(npool)) - pool.starmap(self.gen_sflux, [(date, record, pscr) for date in datevector]) + pool.starmap(self.gen_sflux, [(istack+1, date, air, prc, rad) for istack, date in enumerate(datevector)]) #self.gen_sflux(datevector[0], record, pscr) pool.close() - def gen_sflux(self, date, record, pscr): - - inventory = AWSGrib2Inventory(date, record, pscr) + def gen_sflux( + self, + istack, + date, + air: bool = True, + prc: bool = True, + rad: bool = True, + ): + + inventory = AWSGrib2Inventory(date, self.record, self.pscr, self.region) grbfiles = inventory.files - cycle = date.hour + #cycle = date.hour stmp = [] spfh = [] @@ -169,7 +218,6 @@ def gen_sflux(self, date, record, pscr): #Get lon/lat lon, lat, idx_ymin, idx_ymax, idx_xmin, idx_xmax = self.modified_latlon(grbfiles[0]) - times = [] for ifile, fname in enumerate(grbfiles): @@ -215,89 +263,162 @@ def gen_sflux(self, date, record, pscr): ds.close() #write 
-        fout = xr.Dataset({
-                'stmp': (['time', 'ny_grid', 'nx_grid'], np.array(stmp)),
-                'spfh': (['time', 'ny_grid', 'nx_grid'], np.array(spfh)),
-                'uwind': (['time', 'ny_grid', 'nx_grid'], np.array(uwind)),
-                'vwind': (['time', 'ny_grid', 'nx_grid'], np.array(vwind)),
-                'prmsl': (['time', 'ny_grid', 'nx_grid'], np.array(prmsl)),
-                'prate': (['time', 'ny_grid', 'nx_grid'], np.array(prate)),
-                'dlwrf': (['time', 'ny_grid', 'nx_grid'], np.array(dlwrf)),
-                'dswrf': (['time', 'ny_grid', 'nx_grid'], np.array(dswrf)),
-                },
-                coords={
-                    'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'),
-                    'lon': (['ny_grid', 'nx_grid'], lon),
-                    'lat': (['ny_grid', 'nx_grid'], lat)})
-
-        #date_string = np.datetime_as_string(times[0], unit='m')
-        #bdate = datetime.strptime(date_string, '%Y-%m-%dT%H:%M')
         bdate = date.strftime('%Y %m %d %H').split(' ')
         bdate = [int(q) for q in bdate[:4]] + [0]
 
-        fout.time.attrs = {
-            'long_name': 'Time',
-            'standard_name': 'time',
-            'base_date': bdate,
-            'units': f'days since {date.year}-{date.month}-{date.day} {cycle:02d}:00 UTC'
-        }
-
-        fout.lat.attrs = {
-            'units': 'degrees_north',
-            'long_name': 'Latitude',
-            'standard_name': 'latitude',
-        }
-
-        fout.lon.attrs = {
-            'units': 'degrees_east',
-            'long_name': 'Longitude',
-            'standard_name': 'longitude',
-        }
+        if air:
+            ds = xr.Dataset(
+                {
+                    'stmp': (['time', 'ny_grid', 'nx_grid'], np.array(stmp)),
+                    'spfh': (['time', 'ny_grid', 'nx_grid'], np.array(spfh)),
+                    'uwind': (['time', 'ny_grid', 'nx_grid'], np.array(uwind)),
+                    'vwind': (['time', 'ny_grid', 'nx_grid'], np.array(vwind)),
+                    'prmsl': (['time', 'ny_grid', 'nx_grid'], np.array(prmsl)),
+                },
+                coords = {
+                    'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'),
+                    'lon': (['ny_grid', 'nx_grid'], lon),
+                    'lat': (['ny_grid', 'nx_grid'], lat)
+                }
+            )
 
-        fout.uwind.attrs={
-            'units': 'm/s',
-            'long_name': '10m_above_ground/UGRD',
-            'standard_name':'eastward_wind'
-        }
+            ds.time.attrs = {
+                'long_name': 'Time',
+                'standard_name': 'time',
+                'base_date': bdate,
+                'units': f"days since {date.strftime('%Y-%m-%d %H:00')} UTC",
+            }
+
+            ds.lat.attrs = {
+                'units': 'degrees_north',
+                'long_name': 'Latitude',
+                'standard_name': 'latitude',
+            }
+
+            ds.lon.attrs = {
+                'units': 'degrees_east',
+                'long_name': 'Longitude',
+                'standard_name': 'longitude',
+            }
+
+            ds.uwind.attrs={
+                'units': 'm/s',
+                'long_name': '10m_above_ground/UGRD',
+                'standard_name':'eastward_wind'
+            }
 
-        fout.vwind.attrs={
-            'units': 'm/s',
-            'long_name': '10m_above_ground/UGRD',
-            'standard_name':'northward_wind'
-        }
-
-        fout.spfh.attrs={
-            'units': 'kg kg-1',
-            'long_name': '2m_above_ground/Specific Humidity',
-            'standard_name':'specific_humidity'
-        }
+            ds.vwind.attrs={
+                'units': 'm/s',
+                'long_name': '10m_above_ground/VGRD',
+                'standard_name':'northward_wind'
+            }
+
+            ds.spfh.attrs={
+                'units': 'kg kg-1',
+                'long_name': '2m_above_ground/Specific Humidity',
+                'standard_name':'specific_humidity'
+            }
+
+            ds.prmsl.attrs = {
+                'units': 'Pa',
+                'long_name': 'Pressure reduced to MSL',
+                'standard_name': 'air_pressure_at_sea_level'
+            }
+
+            ds.stmp.attrs={
+                'units': 'K',
+                'long_name': '2m_above_ground/Temperature',
+            }
+
+            #write to file
+            ds.to_netcdf(self.outdir / f'sflux_air_{self.level}.{istack:04d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time')
+            ds.close()
 
-        fout.prmsl.attrs = {
-            'units': 'Pa',
-            'long_name': 'Pressure reduced to MSL',
-            'standard_name': 'air_pressure_at_sea_level'
-        }
+        if prc:
+            ds = xr.Dataset(
+                {
+                    'prate': (['time', 'ny_grid', 'nx_grid'], np.array(prate)),
+                },
+                coords = {
+                    'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'),
+                    'lon': (['ny_grid', 'nx_grid'], lon),
+                    'lat': (['ny_grid', 'nx_grid'], lat)
+                }
+            )
 
-        fout.stmp.attrs={
-            'units': 'K',
-            'long_name': '2m_above_ground/Temperature',
-        }
+            ds.time.attrs = {
+                'long_name': 'Time',
+                'standard_name': 'time',
+                'base_date': bdate,
+                'units': f"days since {date.strftime('%Y-%m-%d %H:00')} UTC",
+            }
+
+            ds.lat.attrs = {
+                'units': 'degrees_north',
+                'long_name': 'Latitude',
+                'standard_name': 'latitude',
+            }
+
+            ds.lon.attrs = {
+                'units': 'degrees_east',
+                'long_name': 'Longitude',
+                'standard_name': 'longitude',
+            }
+
+            ds.prate.attrs={
+                'units': 'kg m-2 s-1',
+                'long_name': 'Precipitation rate'
+            }
+
+            #write to file
+            ds.to_netcdf(self.outdir / f'sflux_prc_{self.level}.{istack:04d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time')
+            ds.close()
 
-        fout.prate.attrs={
-            'units': 'kg m-2 s-1',
-            'long_name': 'Precipitation rate'
-        }
-
-        fout.dlwrf.attrs = {
-            'units': 'W m-2',
-            'long_name': 'Downward short-wave radiation flux'
-        }
+        if rad:
+            ds = xr.Dataset(
+                {
+                    'dlwrf': (['time', 'ny_grid', 'nx_grid'], np.array(dlwrf)),
+                    'dswrf': (['time', 'ny_grid', 'nx_grid'], np.array(dswrf)),
+                },
+                coords = {
+                    'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'),
+                    'lon': (['ny_grid', 'nx_grid'], lon),
+                    'lat': (['ny_grid', 'nx_grid'], lat)
+                }
+            )
 
-        fout.dswrf.attrs = {
-            'units': 'W m-2',
-            'long_name': 'Downward long-wave radiation flux'
-        }
+            ds.time.attrs = {
+                'long_name': 'Time',
+                'standard_name': 'time',
+                'base_date': bdate,
+                'units': f"days since {date.strftime('%Y-%m-%d %H:00')} UTC",
+            }
+
+            ds.lat.attrs = {
+                'units': 'degrees_north',
+                'long_name': 'Latitude',
+                'standard_name': 'latitude',
+            }
+
+            ds.lon.attrs = {
+                'units': 'degrees_east',
+                'long_name': 'Longitude',
+                'standard_name': 'longitude',
+            }
+
+            ds.dlwrf.attrs = {
+                'units': 'W m-2',
+                'long_name': 'Downward long-wave radiation flux'
+            }
+
+            ds.dswrf.attrs = {
+                'units': 'W m-2',
+                'long_name': 'Downward short-wave radiation flux'
+            }
 
-        fout.to_netcdf(self.path / f'hrrr_{date.strftime("%Y%m%d")}{cycle:02d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time')
+            #write to file
+            ds.to_netcdf(self.outdir / f'sflux_rad_{self.level}.{istack:04d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time')
+            ds.close()
 
     def modified_latlon(self, grbfile):
diff --git a/setup.py b/setup.py
index be27b4ac..755978dd 100755
--- a/setup.py
+++ b/setup.py
@@ -93,7 +93,7 @@ def run(self):
         long_description_content_type="text/markdown",
         url=meta['url'],
         packages=setuptools.find_packages(exclude=['tests', 'examples', 'docs', 'docker']),
-        python_requires='>3.6',
+        python_requires='>=3.8',
         setup_requires=['wheel', 'setuptools_scm', 'setuptools>=41.2', 'netcdf-flattener>=1.2.0'],
         include_package_data=True,
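Usage note (illustrative, not part of the patch): with this change the downloaders no longer fetch data in the constructor; the grid subset, sflux level, and scratch directory are configured up front and the sflux stacks are produced by write(), one stack per simulation day. A minimal sketch of the new call sequence, assuming a local hgrid.gr3 and a writable scratch directory (both paths hypothetical):

    from datetime import datetime
    from pyschism.mesh.hgrid import Hgrid
    from pyschism.forcing.nws.nws2.gfs2 import GFS

    hgrid = Hgrid.open('hgrid.gr3', crs='epsg:4326')       # hypothetical local grid file
    gfs = GFS(level=1, pscr='/tmp/GFS/', bbox=hgrid.bbox)  # pscr: scratch dir for grib2 downloads
    # Writes sflux_air_1.0001.nc, sflux_prc_1.0001.nc, sflux_rad_1.0001.nc, ...
    # under ./sflux (the default outdir), one 24-hour stack per day of rnday.
    gfs.write(start_date=datetime(2023, 10, 1), rnday=2, air=True, prc=True, rad=True)

HRRR follows the same pattern, with an extra region argument ('conus' or 'alaska') selecting which bucket prefix get_file_namelist() builds keys for.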