Merge pull request #42 from UNSW-CEEM/new_aemo_mms_naming_convention
New aemo mms naming convention
nick-gorman authored Oct 22, 2024
2 parents 00e7f52 + fb05b1a commit 151d397
Showing 8 changed files with 295 additions and 188 deletions.
176 changes: 96 additions & 80 deletions nemosis/data_fetch_methods.py
@@ -359,16 +359,22 @@ def _get_read_function(fformat, table_type, day):
return func


def _count_csv_lines(file_path):
with open(file_path, 'rb') as f:
return sum(1 for _ in f)


def _read_mms_csv(path_and_name, dtype=None, usecols=None, nrows=None, names=None):
last_line_number = _count_csv_lines(path_and_name) - 1
data = _pd.read_csv(
path_and_name,
skiprows=[0],
skiprows=[0, last_line_number],
dtype=dtype,
usecols=usecols,
nrows=nrows,
names=names,
)
return data[:-1]
return data


def _read_constructed_csv(
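For context on the _read_mms_csv change: AEMO MMS report CSVs are wrapped in "C" comment rows, a header line at the top and an "END OF REPORT" footer at the bottom, so both have to be excluded before parsing. A minimal sketch, using a hypothetical trimmed-down report, of why the footer is now skipped by row index via skiprows rather than by slicing off the last parsed row, which goes wrong when nrows limits the read:

import io
import pandas as pd

# Hypothetical, trimmed-down MMS-style report used purely for illustration.
mms_csv = (
    "C,NEMP.WORLD,DISPATCHPRICE,AEMO\n"                  # comment header row
    "I,DISPATCH,PRICE,1,SETTLEMENTDATE,REGIONID,RRP\n"   # column names
    "D,DISPATCH,PRICE,1,2024/10/01 00:05:00,NSW1,85.2\n"
    "D,DISPATCH,PRICE,1,2024/10/01 00:05:00,VIC1,79.9\n"
    'C,"END OF REPORT",5\n'                              # footer row
)

# Count the lines up front so the footer can be excluded by index, rather than
# slicing the parsed frame afterwards (which would drop a real data row if
# nrows stops the read before the footer is reached).
last_line_number = len(mms_csv.splitlines()) - 1
data = pd.read_csv(io.StringIO(mms_csv), skiprows=[0, last_line_number])
print(data)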
@@ -545,81 +551,90 @@ def _dynamic_data_fetch_loop(
date_gen = _processing_info_maps.date_gen[table_type](start_search, end_time)

for year, month, day, index in date_gen:
filename_stub, full_filename, path_and_name = _create_filename(
table_name, table_type, raw_data_location, fformat, day, month, year, index
)
check_for_next_data_chunk = True
chunk = 0
while check_for_next_data_chunk:
chunk += 1

if not (
_glob.glob(full_filename) or _glob.glob(path_and_name + ".[cC][sS][vV]")
) or (not _glob.glob(path_and_name + ".[cC][sS][vV]") and rebuild):
_download_data(
table_name,
table_type,
filename_stub,
day,
month,
year,
index,
raw_data_location,
filename_stub, full_filename, path_and_name = _create_filename(
table_name, table_type, raw_data_location, fformat, day, month, year, chunk, index
)

if _glob.glob(full_filename) and fformat != "csv" and not rebuild:
if not caching_mode:
data = _get_read_function(fformat, table_type, day)(full_filename)
else:
data = None
logger.info(
f"Cache for {table_name} in date range already compiled in"
+ f" {raw_data_location}."
if not (
_glob.glob(full_filename) or _glob.glob(path_and_name + ".[cC][sS][vV]")
) or (not _glob.glob(path_and_name + ".[cC][sS][vV]") and rebuild):
_download_data(
table_name,
table_type,
filename_stub,
day,
month,
year,
chunk,
index,
raw_data_location,
)

elif _glob.glob(path_and_name + ".[cC][sS][vV]"):
if _glob.glob(full_filename) and fformat != "csv" and not rebuild:
if not caching_mode:
data = _get_read_function(fformat, table_type, day)(full_filename)
else:
data = None
logger.info(
f"Cache for {table_name} in date range already compiled in"
+ f" {raw_data_location}."
)

if select_columns != "all":
read_all_columns = False
else:
read_all_columns = True
elif _glob.glob(path_and_name + ".[cC][sS][vV]"):

if not caching_mode:
dtypes = "str"
else:
dtypes = "all"
if select_columns != "all":
read_all_columns = False
else:
read_all_columns = True

csv_path_and_name = _glob.glob(path_and_name + ".[cC][sS][vV]")[0]
if not caching_mode:
dtypes = "str"
else:
dtypes = "all"

csv_read_function = _get_read_function(
fformat="csv", table_type=table_type, day=day
)
data = _determine_columns_and_read_csv(
table_name,
csv_path_and_name,
csv_read_function,
read_all_columns=read_all_columns,
dtypes=dtypes,
)
csv_path_and_name = _glob.glob(path_and_name + ".[cC][sS][vV]")[0]

if caching_mode:
data = _perform_column_selection(data, select_columns, full_filename)
csv_read_function = _get_read_function(
fformat="csv", table_type=table_type, day=day
)
data = _determine_columns_and_read_csv(
table_name,
csv_path_and_name,
csv_read_function,
read_all_columns=read_all_columns,
dtypes=dtypes,
)

if data is not None and fformat != "csv":
_log_file_creation_message(fformat, table_name, year, month, day, index)
_write_to_format(data, fformat, full_filename, write_kwargs)
if caching_mode:
data = _perform_column_selection(data, select_columns, full_filename)

if not keep_csv:
_os.remove(_glob.glob(path_and_name + ".[cC][sS][vV]")[0])
else:
data = None
if data is not None and fformat != "csv":
_log_file_creation_message(fformat, table_name, year, month, day, index)
_write_to_format(data, fformat, full_filename, write_kwargs)

if not caching_mode and data is not None:
if not keep_csv:
_os.remove(_glob.glob(path_and_name + ".[cC][sS][vV]")[0])
else:
data = None

if date_filter is not None:
data = date_filter(data, start_time, end_time)
if not caching_mode and data is not None:

data = _perform_column_selection(data, select_columns, full_filename)
if date_filter is not None:
data = date_filter(data, start_time, end_time)

data = _perform_column_selection(data, select_columns, full_filename)

data_tables.append(data)
elif not caching_mode:
logger.warning(f"Loading data from {full_filename} failed.")
data_tables.append(data)
elif not caching_mode and chunk == 1:
logger.warning(f"Loading data from {full_filename} failed.")

if data is None or '#' not in filename_stub:
check_for_next_data_chunk = False

return data_tables
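The restructured loop above introduces per-chunk fetching for the new MMS naming convention. A simplified sketch (not the PR code; names are illustrative) of the control flow: successive numbered chunks of a table are requested until one fails to load, and tables whose filename stub carries no '#' chunk marker are fetched exactly once.

def fetch_all_chunks(fetch_chunk, filename_stub):
    # fetch_chunk(chunk_number) is assumed to return a DataFrame, or None when
    # that chunk does not exist.
    data_tables = []
    chunk = 0
    check_for_next_data_chunk = True
    while check_for_next_data_chunk:
        chunk += 1
        data = fetch_chunk(chunk)
        if data is not None:
            data_tables.append(data)
        # Stop when a chunk is missing, or when the table is not split into
        # chunks at all (no '#' marker in the filename stub).
        if data is None or "#" not in filename_stub:
            check_for_next_data_chunk = False
    return data_tables

This is also why the failure warnings inside the loop and downloader are only logged for chunk == 1: probing for a chunk that does not exist is the expected way the loop terminates, not an error.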

@@ -642,7 +657,7 @@ def _perform_column_selection(data, select_columns, full_filename):


def _create_filename(
table_name, table_type, raw_data_location, fformat, day, month, year, index
table_name, table_type, raw_data_location, fformat, day, month, year, chunk, index
):
"""
Gather:
@@ -652,7 +667,7 @@
Returns: filename_stub, full_filename and path_and_name
"""
filename_stub, path_and_name = _processing_info_maps.write_filename[table_type](
table_name, month, year, day, index, raw_data_location
table_name, month, year, day, chunk, index, raw_data_location
)
full_filename = path_and_name + f".{fformat}"
return filename_stub, full_filename, path_and_name
@@ -751,31 +766,32 @@ def _write_to_format(data, fformat, full_filename, write_kwargs):


def _download_data(
table_name, table_type, filename_stub, day, month, year, index, raw_data_location
table_name, table_type, filename_stub, day, month, year, chunk, index, raw_data_location
):
"""
Dispatch table to downloader to be downloaded.
Returns: nothing
"""
if day is None:
logger.info(
f"Downloading data for table {table_name}, " + f"year {year}, month {month}"
)
elif index is None:
logger.info(
f"Downloading data for table {table_name}, "
+ f"year {year}, month {month}, day {day}"
)
else:
logger.info(
f"Downloading data for table {table_name}, "
+ f"year {year}, month {month}, day {day},"
+ f"time {index}."
)
if chunk == 1:
if day is None:
logger.info(
f"Downloading data for table {table_name}, " + f"year {year}, month {month}"
)
elif index is None:
logger.info(
f"Downloading data for table {table_name}, "
+ f"year {year}, month {month}, day {day}"
)
else:
logger.info(
f"Downloading data for table {table_name}, "
+ f"year {year}, month {month}, day {day},"
+ f"time {index}."
)

_processing_info_maps.downloader[table_type](
year, month, day, index, filename_stub, raw_data_location
year, month, day, chunk, index, filename_stub, raw_data_location
)
return

1 change: 1 addition & 0 deletions nemosis/date_generators.py
@@ -5,6 +5,7 @@

logger = logging.getLogger(__name__)


def year_and_month_gen(start_time, end_time):

if start_time.day == 1 and start_time.hour == 0 and start_time.minute == 0:
4 changes: 2 additions & 2 deletions nemosis/defaults.py
@@ -845,7 +845,7 @@
"FCAS_4s_SCADA_MAP": None,
"MARKET_PRICE_THRESHOLDS": "EFFECTIVEDATE",
"DAILY_REGION_SUMMARY": "SETTLEMENTDATE",
"ROOFTOP_PV_ACTUAL": "INTERVAL_DATETIME",
"ROOFTOP_PV_ACTUAL": "INTERVAL_DATETIME"
}

reg_exemption_list_tabs = {
@@ -888,4 +888,4 @@
join_type = ["inner", "left", "right"]

# Testing settings
raw_data_cache = "D:/nemosis_test_cache"
raw_data_cache = "/media/nick/Samsung_T5/nemosis_test_cache"
17 changes: 10 additions & 7 deletions nemosis/downloader.py
@@ -5,6 +5,7 @@
import zipfile
import io
import pandas as pd
from urllib.parse import quote

from . import defaults, custom_errors

@@ -20,7 +21,7 @@
}


def run(year, month, day, index, filename_stub, down_load_to):
def run(year, month, day, chunk, index, filename_stub, down_load_to):
"""This function"""

url = defaults.aemo_mms_url
@@ -31,12 +32,13 @@ def run(year, month, day, index, filename_stub, down_load_to):
try:
download_unzip_csv(url_formatted, down_load_to)
except Exception:
logger.warning(f"{filename_stub} not downloaded")
if chunk == 1:
logger.warning(f"{filename_stub} not downloaded")


def run_bid_tables(year, month, day, index, filename_stub, down_load_to):
def run_bid_tables(year, month, day, chunk, index, filename_stub, down_load_to):
if day is None:
run(year, month, day, index, filename_stub, down_load_to)
run(year, month, day, chunk, index, filename_stub, down_load_to)
else:
try:
filename_stub = "BIDMOVE_COMPLETE_{year}{month}{day}".format(year=year, month=month, day=day)
@@ -50,7 +52,7 @@ def run_bid_tables(year, month, day, index, filename_stub, down_load_to):
logger.warning(f"{filename_stub} not downloaded")


def run_next_day_region_tables(year, month, day, index, filename_stub, down_load_to):
def run_next_day_region_tables(year, month, day, chunk, index, filename_stub, down_load_to):
try:
filename_stub = "PUBLIC_DAILY_{year}{month}{day}".format(year=year, month=month, day=day)
download_url = _get_current_url(
@@ -63,7 +65,7 @@ def run_next_day_region_tables(year, month, day, index, filename_stub, down_load
logger.warning(f"{filename_stub} not downloaded")


def run_next_dispatch_tables(year, month, day, index, filename_stub, down_load_to):
def run_next_dispatch_tables(year, month, day, chunk, index, filename_stub, down_load_to):
try:
filename_stub = "PUBLIC_NEXT_DAY_DISPATCH_{year}{month}{day}".format(year=year, month=month, day=day)
download_url = _get_current_url(
@@ -189,7 +191,7 @@ def _find_start_row_nth_table(sub_folder_zipfile, file_name, n):



def run_fcas4s(year, month, day, index, filename_stub, down_load_to):
def run_fcas4s(year, month, day, chunk, index, filename_stub, down_load_to):
"""This function"""

# Add the year and month information to the generic AEMO data url
@@ -216,6 +218,7 @@ def download_unzip_csv(url, down_load_to):
This function downloads a zipped csv using a url,
extracts the csv and saves it a specified location
"""
url = url.replace('#', '%23')
r = requests.get(url, headers=USR_AGENT_HEADER)
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall(down_load_to)
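The new url.replace('#', '%23') line matters because filenames under the new MMS naming convention can contain a literal '#'; in a URL an un-encoded '#' starts the fragment, so everything after it would be dropped from the request path. A quick illustration with a hypothetical URL (the quote helper imported at the top of this file can produce the same encoding):

from urllib.parse import quote, urlsplit

raw = "https://example.com/Data_Archive/FILE#0001.zip"  # hypothetical URL

# Without encoding, the '#' begins the fragment and the chunk suffix is lost.
print(urlsplit(raw).path)      # /Data_Archive/FILE
print(urlsplit(raw).fragment)  # 0001.zip

# Percent-encoding keeps the whole file name in the request path.
encoded = raw.replace("#", "%23")
print(urlsplit(encoded).path)  # /Data_Archive/FILE%230001.zip

# quote() gives the same result while leaving ':' and '/' untouched.
print(quote(raw, safe=":/"))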
6 changes: 4 additions & 2 deletions nemosis/processing_info_maps.py
@@ -237,7 +237,9 @@
],
"MARKET_PRICE_THRESHOLDS": None,
"DAILY_REGION_SUMMARY": None,
"ROOFTOP_PV_ACTUAL": None,
"ROOFTOP_PV_ACTUAL": [
query_wrappers.drop_duplicates_by_primary_key
],
}

date_gen = {
Expand All @@ -249,7 +251,7 @@
}

write_filename = {
"MMS": write_file_names.write_file_names,
"MMS": write_file_names.write_mms_file_names,
"NEXT_DAY_DISPATCHLOAD": write_file_names.write_file_names_current,
"BIDDING": write_file_names.write_file_names_mms_and_current,
"DAILY_REGION_SUMMARY": write_file_names.write_file_names_mms_and_current,