Skip to content

Commit

Permalink
calculate_impacts signature change (#320)
Browse files Browse the repository at this point in the history
* calculate_impacts signature change

Modify the signature of calculate_impacts to report more than one value by impact key

Signed-off-by: EglantineGiraud <eglantine.giraud@gmail.com>

* Modify the signature of calculate_impacts to report more than one value by impact key  Signed-off-by: EglantineGiraud <eglantine.giraud@gmail.com>

Add asset_id in the asset impact result if not None

Signed-off-by: EglantineGiraud <eglantine.giraud@gmail.com>

---------

Signed-off-by: EglantineGiraud <eglantine.giraud@gmail.com>
  • Loading branch information
EglantineGiraud authored Jul 10, 2024
1 parent 74a55ed commit e5fc013
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 117 deletions.
37 changes: 12 additions & 25 deletions src/physrisk/kernel/impact.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from physrisk.kernel.assets import Asset
from physrisk.kernel.hazard_event_distrib import HazardEventDistrib
from physrisk.kernel.hazard_model import HazardDataFailedResponse, HazardDataRequest, HazardDataResponse, HazardModel
from physrisk.kernel.impact_distrib import EmptyImpactDistrib, ImpactDistrib
from physrisk.kernel.impact_distrib import ImpactDistrib
from physrisk.kernel.vulnerability_distrib import VulnerabilityDistrib
from physrisk.kernel.vulnerability_model import (
DataRequester,
Expand Down Expand Up @@ -51,7 +51,7 @@ def calculate_impacts( # noqa: C901
*,
scenario: str,
year: int,
) -> Dict[ImpactKey, AssetImpactResult]:
) -> Dict[ImpactKey, List[AssetImpactResult]]:
"""Calculate asset level impacts."""

model_assets: Dict[DataRequester, List[Asset]] = defaultdict(
Expand All @@ -63,7 +63,7 @@ def calculate_impacts( # noqa: C901
mappings = vulnerability_models.vuln_model_for_asset_of_type(asset_type)
for mapping in mappings:
model_assets[mapping].append(asset)
results = {}
results: Dict[ImpactKey, List[AssetImpactResult]] = {}

asset_requests, responses = _request_consolidated(hazard_model, model_assets, scenario, year)

Expand All @@ -77,37 +77,24 @@ def calculate_impacts( # noqa: C901
for asset in assets:
requests = asset_requests[(model, asset)]
hazard_data = [responses[req] for req in get_iterable(requests)]
assert isinstance(model, VulnerabilityModelBase)
if ImpactKey(asset=asset, hazard_type=model.hazard_type, scenario=scenario, key_year=year) not in results:
results[ImpactKey(asset=asset, hazard_type=model.hazard_type, scenario=scenario, key_year=year)] = []
if any(isinstance(hd, HazardDataFailedResponse) for hd in hazard_data):
assert isinstance(model, VulnerabilityModelBase)
if (
ImpactKey(asset=asset, hazard_type=model.hazard_type, scenario=scenario, key_year=year)
not in results
):
results[ImpactKey(asset=asset, hazard_type=model.hazard_type, scenario=scenario, key_year=year)] = (
AssetImpactResult(EmptyImpactDistrib())
)
continue
try:
if isinstance(model, VulnerabilityModelAcuteBase):
impact, vul, event = model.get_impact_details(asset, hazard_data)
results[ImpactKey(asset=asset, hazard_type=model.hazard_type, scenario=scenario, key_year=year)] = (
AssetImpactResult(impact, vulnerability=vul, event=event, hazard_data=hazard_data)
)
results[
ImpactKey(asset=asset, hazard_type=model.hazard_type, scenario=scenario, key_year=year)
].append(AssetImpactResult(impact, vulnerability=vul, event=event, hazard_data=hazard_data))
elif isinstance(model, VulnerabilityModelBase):
impact = model.get_impact(asset, hazard_data)
results[ImpactKey(asset=asset, hazard_type=model.hazard_type, scenario=scenario, key_year=year)] = (
AssetImpactResult(impact, hazard_data=hazard_data)
)
results[
ImpactKey(asset=asset, hazard_type=model.hazard_type, scenario=scenario, key_year=year)
].append(AssetImpactResult(impact, hazard_data=hazard_data))
except Exception as e:
logger.exception(e)
assert isinstance(model, VulnerabilityModelBase)
if (
ImpactKey(asset=asset, hazard_type=model.hazard_type, scenario=scenario, key_year=year)
not in results
):
results[ImpactKey(asset=asset, hazard_type=model.hazard_type, scenario=scenario, key_year=year)] = (
AssetImpactResult(EmptyImpactDistrib())
)
return results


Expand Down
17 changes: 11 additions & 6 deletions src/physrisk/kernel/risk.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def _calculate_all_impacts(
):
# ensure "historical" is present, e.g. needed for risk measures
scenarios = set(["historical"] + list(prosp_scens)) if include_histo else prosp_scens
impact_results: Dict[ImpactKey, AssetImpactResult] = {}
impact_results: Dict[ImpactKey, List[AssetImpactResult]] = {}

# in case of multiple calculation, run on separate threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
Expand Down Expand Up @@ -170,13 +170,18 @@ def calculate_risk_measures(self, assets: Sequence[Asset], prosp_scens: Sequence
for prosp_scen in prosp_scens:
for year in years:
for hazard_type in measure_calc.supported_hazards():
base_impact = impacts.get(
base_impacts = impacts.get(
ImpactKey(asset=asset, hazard_type=hazard_type, scenario="historical", key_year=None)
)
prosp_impact = impacts.get(
prosp_impacts = impacts.get(
ImpactKey(asset=asset, hazard_type=hazard_type, scenario=prosp_scen, key_year=year)
)
risk_ind = measure_calc.calc_measure(hazard_type, base_impact, prosp_impact)
if risk_ind is not None:
measures[MeasureKey(asset, prosp_scen, year, hazard_type)] = risk_ind
risk_inds = [
measure_calc.calc_measure(hazard_type, base_impact, prosp_impact)
for base_impact, prosp_impact in zip(base_impacts, prosp_impacts)
]
risk_ind = [risk_ind for risk_ind in risk_inds if risk_ind is not None]
if 0 < len(risk_ind):
# TODO: Aggregate measures instead of picking the first value.
measures[MeasureKey(asset, prosp_scen, year, hazard_type)] = risk_ind[0]
return impacts, measures
91 changes: 48 additions & 43 deletions src/physrisk/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,12 +360,14 @@ def _get_asset_impacts(
return AssetImpactResponse(asset_impacts=asset_impacts, risk_measures=risk_measures)


def compile_asset_impacts(impacts: Dict[ImpactKey, AssetImpactResult], assets: List[Asset], include_calc_details: bool):
"""Convert (internal) AssetImpactResult objects to a list of AssetLevelImpact objects
ready for serialization.
def compile_asset_impacts(
impacts: Dict[ImpactKey, List[AssetImpactResult]], assets: List[Asset], include_calc_details: bool
):
"""Convert (internal) list of AssetImpactResult objects to a list of AssetLevelImpact
objects ready for serialization.
Args:
impacts (Dict[ImpactKey, AssetImpactResult]): Impact results.
impacts (Dict[ImpactKey, List[AssetImpactResult]]): Impact results.
assets (List[Asset]): Assets: the list will be returned using this order.
include_calc_details (bool): Include calculation details.
Expand All @@ -375,46 +377,49 @@ def compile_asset_impacts(impacts: Dict[ImpactKey, AssetImpactResult], assets: L
ordered_impacts: Dict[Asset, List[AssetSingleImpact]] = {}
for asset in assets:
ordered_impacts[asset] = []
for k, v in impacts.items():
if include_calc_details:
if v.event is not None and v.vulnerability is not None:
hazard_exceedance = v.event.to_exceedance_curve()

vulnerability_distribution = VulnerabilityDistrib(
intensity_bin_edges=v.vulnerability.intensity_bins,
impact_bin_edges=v.vulnerability.impact_bins,
prob_matrix=v.vulnerability.prob_matrix,
)
calc_details = AcuteHazardCalculationDetails(
hazard_exceedance=ExceedanceCurve(
values=hazard_exceedance.values, exceed_probabilities=hazard_exceedance.probs
),
hazard_distribution=Distribution(bin_edges=v.event.intensity_bin_edges, probabilities=v.event.prob),
vulnerability_distribution=vulnerability_distribution,
hazard_path=v.impact.path,
)
else:
calc_details = None

if isinstance(v.impact, EmptyImpactDistrib):
continue

impact_exceedance = v.impact.to_exceedance_curve()
key = APIImpactKey(hazard_type=k.hazard_type.__name__, scenario_id=k.scenario, year=str(k.key_year))
hazard_impacts = AssetSingleImpact(
key=key,
impact_type=v.impact.impact_type.name,
impact_exceedance=ExceedanceCurve(
values=impact_exceedance.values, exceed_probabilities=impact_exceedance.probs
),
impact_distribution=Distribution(bin_edges=v.impact.impact_bins, probabilities=v.impact.prob),
impact_mean=v.impact.mean_impact(),
impact_std_deviation=v.impact.stddev_impact(),
calc_details=None if v.event is None else calc_details,
)
ordered_impacts[k.asset].append(hazard_impacts)
for k, value in impacts.items():
for v in value:
if include_calc_details:
if v.event is not None and v.vulnerability is not None:
hazard_exceedance = v.event.to_exceedance_curve()

vulnerability_distribution = VulnerabilityDistrib(
intensity_bin_edges=v.vulnerability.intensity_bins,
impact_bin_edges=v.vulnerability.impact_bins,
prob_matrix=v.vulnerability.prob_matrix,
)
calc_details = AcuteHazardCalculationDetails(
hazard_exceedance=ExceedanceCurve(
values=hazard_exceedance.values, exceed_probabilities=hazard_exceedance.probs
),
hazard_distribution=Distribution(
bin_edges=v.event.intensity_bin_edges, probabilities=v.event.prob
),
vulnerability_distribution=vulnerability_distribution,
hazard_path=v.impact.path,
)
else:
calc_details = None

if isinstance(v.impact, EmptyImpactDistrib):
continue

impact_exceedance = v.impact.to_exceedance_curve()
key = APIImpactKey(hazard_type=k.hazard_type.__name__, scenario_id=k.scenario, year=str(k.key_year))
hazard_impacts = AssetSingleImpact(
key=key,
impact_type=v.impact.impact_type.name,
impact_exceedance=ExceedanceCurve(
values=impact_exceedance.values, exceed_probabilities=impact_exceedance.probs
),
impact_distribution=Distribution(bin_edges=v.impact.impact_bins, probabilities=v.impact.prob),
impact_mean=v.impact.mean_impact(),
impact_std_deviation=v.impact.stddev_impact(),
calc_details=None if v.event is None else calc_details,
)
ordered_impacts[k.asset].append(hazard_impacts)
# note that this does rely on ordering of dictionary (post 3.6)
return [AssetLevelImpact(asset_id="", impacts=a) for a in ordered_impacts.values()]
return [AssetLevelImpact(asset_id=k.id if k.id is not None else "", impacts=v) for k, v in ordered_impacts.items()]


def _create_risk_measures(
Expand Down
39 changes: 20 additions & 19 deletions src/physrisk/risk_models/loss_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,26 @@ def get_financial_impacts(

rg = np.random.Generator(np.random.MT19937(seed=111))

for impact_key, result in results.items():
# look up keys for results
impact = result.impact
keys = aggregator.get_aggregation_keys(impact_key.asset, impact)
# transform units of impact into currency for aggregation

# Monte-Carlo approach: note that if correlations of distributions are simple and model is otherwise linear
# then calculation by closed-form expression is preferred
impact_samples = self.uncorrelated_samples(impact, sims, rg)

if impact.impact_type == ImpactType.damage:
loss = financial_model.damage_to_loss(impact_key.asset, impact_samples, currency)
else: # impact.impact_type == ImpactType.disruption:
loss = financial_model.disruption_to_loss(impact_key.asset, impact_samples, year, currency)

for key in keys:
if key not in aggregation_pools:
aggregation_pools[key] = np.zeros(sims)
aggregation_pools[key] += loss # type: ignore
for impact_key, impact_values in results.items():
for result in impact_values:
# look up keys for results
impact = result.impact
keys = aggregator.get_aggregation_keys(impact_key.asset, impact)
# transform units of impact into currency for aggregation

# Monte-Carlo approach: note that if correlations of distributions are simple and
# model is otherwise linear then calculation by closed-form expression is preferred
impact_samples = self.uncorrelated_samples(impact, sims, rg)

if impact.impact_type == ImpactType.damage:
loss = financial_model.damage_to_loss(impact_key.asset, impact_samples, currency)
else: # impact.impact_type == ImpactType.disruption:
loss = financial_model.disruption_to_loss(impact_key.asset, impact_samples, year, currency)

for key in keys:
if key not in aggregation_pools:
aggregation_pools[key] = np.zeros(sims)
aggregation_pools[key] += loss # type: ignore

measures = {}
percentiles = [0, 10, 20, 40, 60, 80, 90, 95, 97.5, 99, 99.5, 99.9]
Expand Down
4 changes: 2 additions & 2 deletions tests/kernel/chronic_asset_impact_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ def test_chronic_vulnerability_model(self):

results = calculate_impacts(assets, hazard_model, vulnerability_models, scenario=scenario, year=year)

value_test = list(results.values())[0].impact.mean_impact()
value_test = list(results.values())[0].impact.prob
value_test = list(results.values())[0][0].impact.mean_impact()
value_test = list(results.values())[0][0].impact.prob
value_exp = np.array(
[
0.02656777935,
Expand Down
2 changes: 1 addition & 1 deletion tests/kernel/hazard_models_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,6 @@ def test_using_point_based_hazard_model():
hazard_model = PointBasedHazardModel([point])
vulnerability_models = DictBasedVulnerabilityModels({RealEstateAsset: [GenericTropicalCycloneModel()]})
results = calculate_impacts(assets, hazard_model, vulnerability_models, scenario=scenario, year=year)
impact_distrib = results[(assets[0], Wind, scenario, year)].impact
impact_distrib = results[(assets[0], Wind, scenario, year)][0].impact
mean_impact = impact_distrib.mean_impact()
np.testing.assert_almost_equal(mean_impact, 0.009909858317497338)
2 changes: 1 addition & 1 deletion tests/models/example_models_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,5 @@ def test_user_supplied_model(self):
results = calculate_impacts(assets, hazard_model, vulnerability_models, scenario=scenario, year=year)

self.assertAlmostEqual(
results[assets[0], RiverineInundation, scenario, year].impact.to_exceedance_curve().probs[0], 0.499
results[assets[0], RiverineInundation, scenario, year][0].impact.to_exceedance_curve().probs[0], 0.499
)
8 changes: 5 additions & 3 deletions tests/models/power_generating_asset_models_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def test_create_synthetic_portfolios_and_test(self):
assets = [assets[i] for i in [0, 100, 200, 300, 400, 500, 600, 700, 800, 900]]
detailed_results = calculate_impacts(assets, scenario="ssp585", year=2030)
keys = list(detailed_results.keys())
means = np.array([detailed_results[key].impact.mean_impact() for key in detailed_results.keys()])
means = np.array([detailed_results[key][0].impact.mean_impact() for key in detailed_results.keys()])
interesting = [k for (k, m) in zip(keys, means) if m > 0]
assets_out = self.api_assets(item[0] for item in interesting[0:10])
with open(os.path.join(cache_folder, "assets_example_industrial_activity_small.json"), "w") as f:
Expand All @@ -104,7 +104,7 @@ def test_create_synthetic_portfolios_and_test(self):
]
detailed_results = calculate_impacts(assets, scenario="ssp585", year=2030)
keys = list(detailed_results.keys())
means = np.array([detailed_results[key].impact.mean_impact() for key in detailed_results.keys()])
means = np.array([detailed_results[key][0].impact.mean_impact() for key in detailed_results.keys()])
interesting = [k for (k, m) in zip(keys, means) if m > 0]
assets_out = self.api_assets(item[0] for item in interesting[0:10])
with open(os.path.join(cache_folder, "assets_example_real_estate_small.json"), "w") as f:
Expand Down Expand Up @@ -160,7 +160,9 @@ def test_thermal_power_generation_portfolio(self):
"latitude": result.asset.latitude,
"longitude": result.asset.longitude,
"impact_mean": (
None if isinstance(results[key].impact, EmptyImpactDistrib) else results[key].impact.mean_impact()
None
if isinstance(results[key][0].impact, EmptyImpactDistrib)
else results[key].impact.mean_impact()
),
"hazard_type": key.hazard_type.__name__,
}
Expand Down
Loading

0 comments on commit e5fc013

Please sign in to comment.