configure logging and error handler for batch ops
chrisbc committed Mar 11, 2024
1 parent 8697193 commit 8cae1cc
Showing 3 changed files with 35 additions and 28 deletions.
10 changes: 5 additions & 5 deletions scripts/store_hazard_v3.py
@@ -17,12 +17,12 @@
     print("WARNING: the transform module uses the optional openquake dependencies - h5py, pandas and openquake.")
 
 
-if USE_SQLITE_ADAPTER:
-    configure_adapter(adapter_model=SqliteAdapter)
+# if USE_SQLITE_ADAPTER:
+#     configure_adapter(adapter_model=SqliteAdapter)
 
 
 log = logging.getLogger()
-logging.basicConfig(level=logging.INFO)
+logging.basicConfig(level=logging.DEBUG)
 logging.getLogger('nshm_toshi_client.toshi_client_base').setLevel(logging.INFO)
 logging.getLogger('urllib3').setLevel(logging.INFO)
 logging.getLogger('botocore').setLevel(logging.INFO)
@@ -32,8 +32,8 @@
 root_handler = log.handlers[0]
 root_handler.setFormatter(formatter)
 
-log.debug('DEBUG message')
-log.info('INFO message')
+# log.debug('DEBUG message')
+# log.info('INFO message')
 
 
 def extract_and_save(args):
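Taken together, the store_hazard_v3.py changes set the root logger to DEBUG while pinning known-noisy dependencies to INFO. Below is a minimal runnable sketch of that pattern; the format string is an assumption, since the `formatter` used above is defined outside the visible hunk.

```python
# Sketch of the logging setup above (the format string is assumed; the real
# `formatter` is defined outside the visible diff hunk).
import logging

logging.basicConfig(level=logging.DEBUG)  # root logger emits everything
log = logging.getLogger()

# Pin chatty libraries to INFO so DEBUG output comes mostly from our own code.
for noisy in ('nshm_toshi_client.toshi_client_base', 'urllib3', 'botocore'):
    logging.getLogger(noisy).setLevel(logging.INFO)

formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')
log.handlers[0].setFormatter(formatter)  # basicConfig installed handlers[0]

log.debug('visible: root is DEBUG')
logging.getLogger('urllib3').debug('suppressed: urllib3 is pinned to INFO')
```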
27 changes: 18 additions & 9 deletions toshi_hazard_store/multi_batch.py
@@ -1,13 +1,14 @@
+import logging
 import multiprocessing
 
 import random
 
 from toshi_hazard_store import configure_adapter
 from toshi_hazard_store.config import USE_SQLITE_ADAPTER # noqa TODO
 from toshi_hazard_store.db_adapter.sqlite import SqliteAdapter
 from toshi_hazard_store.model import openquake_models
 
-if USE_SQLITE_ADAPTER:
-    configure_adapter(SqliteAdapter)
+log = logging.getLogger(__name__)
+
 class DynamoBatchWorker(multiprocessing.Process):
     """A worker that batches and saves records to DynamoDB.
@@ -24,15 +25,15 @@ def __init__(self, task_queue, toshi_id, model, batch_size):
         self.batch_size = batch_size
 
     def run(self):
-        print(f"worker {self.name} running with batch size: {self.batch_size}")
+        log.info(f"worker {self.name} running with batch size: {self.batch_size}")
         proc_name = self.name
         models = []
 
         while True:
             next_task = self.task_queue.get()
             if next_task is None:
                 # Poison pill means shutdown
-                print('%s: Exiting' % proc_name)
+                log.info('%s: Exiting' % proc_name)
                 # finally
                 if len(models):
                     self._batch_save(models)
@@ -57,28 +58,36 @@ def _batch_save(self, models):
         # elif self.model == model.ToshiOpenquakeHazardCurveRlzsV2:
         #     query.batch_save_hcurve_rlzs_v2(self.toshi_id, models=models)
         if self.model == openquake_models.OpenquakeRealization:
-            with openquake_models.OpenquakeRealization.batch_write() as batch:
-                for item in models:
-                    batch.save(item)
+            try:
+                with openquake_models.OpenquakeRealization.batch_write() as batch:
+                    for item in models:
+                        batch.save(item)
+            except Exception as err:
+                log.error(str(err))
+                raise
         else:
             raise ValueError("WHATT!")
 
 
 def save_parallel(toshi_id: str, model_generator, model, num_workers, batch_size=50):
     tasks: multiprocessing.JoinableQueue = multiprocessing.JoinableQueue()
 
-    print('Creating %d workers' % num_workers)
+    log.info('Creating %d workers' % num_workers)
     workers = [DynamoBatchWorker(tasks, toshi_id, model, batch_size) for i in range(num_workers)]
     for w in workers:
         w.start()
 
     # Enqueue jobs
+    task_count = 0
     for t in model_generator:
         tasks.put(t)
+        task_count += 1
 
     # Add a poison pill for each to signal we've done everything
     for i in range(num_workers):
         tasks.put(None)
 
     # Wait for all of the tasks to finish
     tasks.join()
+    log.info(f'save_parallel completed {task_count} tasks.')

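For context, `save_parallel` fans records from a generator out to worker processes over a `JoinableQueue`, then enqueues one `None` poison pill per worker so each `run()` loop exits cleanly; one pill per worker is needed because each worker stops at the first `None` it sees. A hypothetical usage sketch, with a made-up task ID and an empty generator standing in for real models:

```python
# Hypothetical usage of save_parallel(); the task ID and worker count are
# illustrative, and the generator yields nothing so the sketch stays inert.
from toshi_hazard_store import multi_batch
from toshi_hazard_store.model import openquake_models

def model_generator():
    # Real callers yield populated OpenquakeRealization models here,
    # e.g. the generate_models() generator from export_rlzs_v3().
    yield from []

if __name__ == '__main__':  # required for multiprocessing on spawn platforms
    multi_batch.save_parallel(
        toshi_id='HAZTASK_123',  # hypothetical ID
        model_generator=model_generator(),
        model=openquake_models.OpenquakeRealization,
        num_workers=4,
        batch_size=50,
    )
```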
26 changes: 12 additions & 14 deletions toshi_hazard_store/oq_import/export_v3.py
@@ -1,4 +1,5 @@
 import json
+import logging
 import math
 import random
 from dataclasses import dataclass
@@ -16,9 +17,7 @@
 NUM_BATCH_WORKERS = 1 if USE_SQLITE_ADAPTER else NUM_BATCH_WORKERS
 BATCH_SIZE = 1000 if USE_SQLITE_ADAPTER else random.randint(15, 50)
 
-if USE_SQLITE_ADAPTER:
-    configure_adapter(SqliteAdapter)
-
+log = logging.getLogger(__name__)
 
 @dataclass
 class OpenquakeMeta:
@@ -39,14 +38,14 @@ def export_meta_v3(extractor, toshi_hazard_id, toshi_gt_id, locations_id, source
         df_len += len(rlz_lt.to_json())
 
     if df_len >= 300e3:
-        print('WARNING: Dataframes for this job may be too large to store on DynamoDB.')
+        log.warning('WARNING: Dataframes for this job may be too large to store on DynamoDB.')
 
     vs30 = oq['reference_vs30_value']
 
     if math.isnan(vs30):
         vs30 = 0
 
-    print('vs30: ', vs30)
+    log.debug(f'vs30: {vs30}')
 
     obj = openquake_models.ToshiOpenquakeMeta(
         partition_key="ToshiOpenquakeMeta",
@@ -77,19 +76,15 @@ def export_rlzs_v3(extractor, oqmeta: OpenquakeMeta, return_rlz=False):
     rlz_keys = [k for k in rlzs.keys() if 'rlz-' in k]
     imtls = oq['hazard_imtls']  # dict of imt and the levels used at each imt, e.g. {'PGA': [0.011, 0.222]}
 
-    print('rlz', oqmeta.rlz_lt)
-    print()
-    print('src', oqmeta.source_lt)
-    print()
-    print('gsim', oqmeta.gsim_lt)
-    print()
+    log.debug(f'rlz {oqmeta.rlz_lt}')
+    log.debug(f'src {oqmeta.source_lt}')
+    log.debug(f'gsim {oqmeta.gsim_lt}')
 
     def generate_models():
+        count = 0
         for i_site in range(len(sites)):
             loc = normalise_site_code((sites.loc[i_site, 'lon'], sites.loc[i_site, 'lat']), True)
-            # print(f'loc: {loc}')
             for i_rlz, rlz in enumerate(rlz_keys):
-
                 values = []
                 for i_imt, imt in enumerate(imtls.keys()):
                     values.append(
@@ -110,6 +105,9 @@ def generate_models():
                 if oqmeta.model.vs30 == 0:
                     oq_realization.site_vs30 = sites.loc[i_site, 'vs30']
                 yield oq_realization.set_location(loc)
+                count += 1
+
+        log.debug(f'generate_models() produced {count} models.')
 
     # used for testing
     if return_rlz:
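A side note on the `count` bookkeeping added to `generate_models()`: because the summary log sits after the loop inside the generator body, it is only emitted when the consumer fully exhausts the generator. A minimal self-contained illustration of that behaviour (names here are invented for the example):

```python
# Counting inside a generator: the summary log fires only on exhaustion.
import logging

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)

def generate_items(n):
    count = 0
    for i in range(n):
        yield i
        count += 1
    log.debug(f'generate_items() produced {count} items.')

list(generate_items(3))   # consumes everything -> summary line is logged
next(generate_items(3))   # partial consumption -> no summary line
```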
