refactor: create ingest modules with convert_dataset function for each use case
annehaley committed Aug 2, 2024
1 parent fb2cb07 commit 48a769d
Showing 7 changed files with 207 additions and 160 deletions.
149 changes: 0 additions & 149 deletions sample_data/ingest_sample_data.py

This file was deleted.

166 changes: 166 additions & 0 deletions sample_data/ingest_use_case.py
@@ -0,0 +1,166 @@
from datetime import datetime
import json
import os
from pathlib import Path

from django.contrib.gis.geos import Point
from django.core.files.base import ContentFile
import requests

from uvdat.core.models import Chart, Context, Dataset, FileItem

from .use_cases.boston_floods import ingest as boston_floods_ingest
from .use_cases.new_york_energy import ingest as new_york_energy_ingest


USE_CASE_FOLDER = Path('sample_data/use_cases')
DOWNLOADS_FOLDER = Path('sample_data/downloads')


def ingest_file(file_info, index=0, dataset=None, chart=None):
    file_path = file_info.get('path')
    file_name = file_info.get('name', file_path.split('/')[-1])
    file_url = file_info.get('url')
    file_metadata = file_info.get('metadata', {})

    file_location = Path(DOWNLOADS_FOLDER, file_path)
    file_type = file_path.split('.')[-1]
    if not file_location.exists():
        print(f'\t Downloading data file {file_name}.')
        file_location.parent.mkdir(parents=True, exist_ok=True)
        with open(file_location, 'wb') as f:
            r = requests.get(file_url)
            f.write(r.content)

    existing = FileItem.objects.filter(name=file_name)
    if existing.count():
        print('\t', f'FileItem {file_name} already exists.')
    else:
        new_file_item = FileItem.objects.create(
            name=file_name,
            dataset=dataset,
            chart=chart,
            file_type=file_type,
            file_size=os.path.getsize(file_location),
            metadata=dict(
                **file_metadata,
                uploaded=str(datetime.now()),
            ),
            index=index,
        )
        print('\t', f'FileItem {new_file_item.name} created.')
        with file_location.open('rb') as f:
            new_file_item.file.save(file_path, ContentFile(f.read()))


def ingest_contexts(use_case):
    context_file_path = USE_CASE_FOLDER / use_case / 'contexts.json'
    if context_file_path.exists():
        print('Creating Context objects...')
        with open(context_file_path) as contexts_json:
            data = json.load(contexts_json)
            for context in data:
                print('\t- ', context['name'])
                existing = Context.objects.filter(name=context['name'])
                if existing.count():
                    context_for_setting = existing.first()
                else:
                    context_for_setting = Context.objects.create(
                        name=context['name'],
                        default_map_center=Point(*context['default_map_center']),
                        default_map_zoom=context['default_map_zoom'],
                    )
                    print('\t', f'Context {context_for_setting.name} created.')

                context_for_setting.datasets.set(Dataset.objects.filter(name__in=context['datasets']))


def ingest_charts(use_case):
    chart_file_path = USE_CASE_FOLDER / use_case / 'charts.json'
    if chart_file_path.exists():
        print('Creating Chart objects...')
        with open(chart_file_path) as charts_json:
            data = json.load(charts_json)
            for chart in data:
                print('\t- ', chart['name'])
                existing = Chart.objects.filter(name=chart['name'])
                if existing.count():
                    chart_for_conversion = existing.first()
                else:
                    new_chart = Chart.objects.create(
                        name=chart['name'],
                        description=chart['description'],
                        context=Context.objects.get(name=chart['context']),
                        chart_options=chart.get('chart_options'),
                        metadata=chart.get('metadata'),
                        editable=chart.get('editable', False),
                    )
                    print('\t', f'Chart {new_chart.name} created.')
                    for index, file_info in enumerate(chart.get('files', [])):
                        ingest_file(
                            file_info,
                            index=index,
                            chart=new_chart,
                        )
                    chart_for_conversion = new_chart

                print('\t', f'Converting data for {chart_for_conversion.name}...')
                chart_for_conversion.spawn_conversion_task(
                    conversion_options=chart.get('conversion_options'),
                    asynchronous=False,
                )


def ingest_datasets(use_case, include_large=False, dataset_indexes=None):
    dataset_file_path = USE_CASE_FOLDER / use_case / 'datasets.json'
    if dataset_file_path.exists():
        print('Creating Dataset objects...')
        with open(dataset_file_path) as datasets_json:
            data = json.load(datasets_json)
            for index, dataset in enumerate(data):
                if dataset_indexes is None or index in dataset_indexes:
                    existing = Dataset.objects.filter(name=dataset['name'])
                    if existing.count():
                        dataset_for_conversion = existing.first()
                    else:
                        # Create dataset
                        new_dataset = Dataset.objects.create(
                            name=dataset['name'],
                            description=dataset['description'],
                            category=dataset['category'],
                            dataset_type=dataset.get('type', 'vector').upper(),
                            metadata=dataset.get('metadata', {}),
                        )
                        print('\t', f'Dataset {new_dataset.name} created.')
                        # Use a distinct loop variable so the outer dataset index
                        # is not shadowed
                        for file_index, file_info in enumerate(dataset.get('files', [])):
                            ingest_file(
                                file_info,
                                index=file_index,
                                dataset=new_dataset,
                            )
                        dataset_for_conversion = new_dataset

                    dataset_size_mb = dataset_for_conversion.get_size() >> 20
                    if include_large or dataset_size_mb < 50:
                        if use_case == 'boston_floods':
                            boston_floods_ingest.convert_dataset(dataset_for_conversion, dataset)
                        elif use_case == 'new_york_energy':
                            new_york_energy_ingest.convert_dataset(dataset_for_conversion, dataset)
                    else:
                        print(
                            '\t', f'Dataset too large ({dataset_size_mb} MB); skipping conversion step.'
                        )
                        print('\t', 'Use `--include_large` to include conversions for large datasets.')


def ingest_use_case(use_case_name, include_large=False, dataset_indexes=None):
    print(f'Populating server with data for use case {use_case_name}...')
    ingest_datasets(
        use_case=use_case_name,
        include_large=include_large,
        dataset_indexes=dataset_indexes,
    )
    ingest_charts(use_case=use_case_name)
    ingest_contexts(use_case=use_case_name)
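
For reference, a minimal sketch of the entry shape that `ingest_datasets` and `ingest_file` read from a use case's `datasets.json`. The keys mirror the `[]` and `.get()` accesses above; the name, URL, and option values are illustrative, not taken from the sample data.

```python
# Hypothetical datasets.json entry, written as a Python literal.
# 'name', 'description', and 'category' are indexed directly, so required;
# everything else is read with .get() and therefore optional.
example_dataset_entry = {
    'name': 'Example Dataset',                 # illustrative
    'description': 'A small vector layer',
    'category': 'environment',
    'type': 'vector',                          # upper-cased into dataset_type
    'metadata': {},
    'files': [
        {
            'url': 'https://example.com/data.geojson',  # fetched if not yet downloaded
            'path': 'example/data.geojson',             # stored under sample_data/downloads
            'name': 'data.geojson',                     # defaults to the basename of 'path'
            'metadata': {},
        }
    ],
    # The whole entry is also passed to convert_dataset as `options`, which reads:
    'style_options': {},
    'network_options': {},
    'region_options': {},
}
```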
9 changes: 9 additions & 0 deletions sample_data/use_cases/boston_floods/ingest.py
@@ -0,0 +1,9 @@

def convert_dataset(dataset, options):
    print('\t', f'Converting data for {dataset.name}...')
    dataset.spawn_conversion_task(
        style_options=options.get('style_options'),
        network_options=options.get('network_options'),
        region_options=options.get('region_options'),
        asynchronous=False,
    )
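
A usage sketch showing how `ingest_datasets` hands a record to this module; it assumes `sample_data` is importable as a package, and the dataset name and options are illustrative.

```python
from uvdat.core.models import Dataset

from sample_data.use_cases.boston_floods import ingest as boston_floods_ingest

# Illustrative name; in practice the Dataset comes from the datasets.json loop.
dataset = Dataset.objects.get(name='Example Boston Dataset')
# `options` is the raw datasets.json entry; only the keys read above matter here.
options = {'style_options': {}, 'network_options': None, 'region_options': None}
boston_floods_ingest.convert_dataset(dataset, options)
```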
1 change: 0 additions & 1 deletion sample_data/use_cases/new_york_energy/charts.json

This file was deleted.

4 changes: 1 addition & 3 deletions sample_data/use_cases/new_york_energy/datasets.json
@@ -49,8 +49,6 @@
         "url": "https://data.kitware.com/api/v1/item/66a7bdab0ea2cce8e698b958/download",
         "path": "nyc/networks.zip"
       }
-    ],
-    "module": "import_networks",
-    "function": "perform_import"
+    ]
   }
 ]
27 changes: 27 additions & 0 deletions sample_data/use_cases/new_york_energy/ingest.py
@@ -0,0 +1,27 @@
from pathlib import Path

from .import_networks import perform_import
from .export_networks import perform_export
from .nysdp import create_consolidated_network


DOWNLOADS_FOLDER = Path('../../sample_data/downloads')
PULL_LATEST = False


def convert_dataset(dataset, options):
    print('\t', f'Converting data for {dataset.name}...')
    if dataset.name == 'National Grid County Networks':
        if PULL_LATEST:
            # Pull the latest data from NYSDP and run the network interpretation algorithm
            dataset.source_files.delete()
            create_consolidated_network(dataset, downloads_folder=DOWNLOADS_FOLDER)
            perform_export()
        else:
            perform_import(dataset, downloads_folder=DOWNLOADS_FOLDER)
    else:
        dataset.spawn_conversion_task(
            style_options=options.get('style_options'),
            network_options=options.get('network_options'),
            region_options=options.get('region_options'),
            asynchronous=False,
        )
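
Under this pattern, a third use case would supply its own module with the same `convert_dataset(dataset, options)` entry point, plus a matching branch in the `if`/`elif` dispatch in `ingest_use_case.py`. A hypothetical skeleton (the folder name is made up):

```python
# sample_data/use_cases/my_use_case/ingest.py (hypothetical)

def convert_dataset(dataset, options):
    print('\t', f'Converting data for {dataset.name}...')
    # Special-case any datasets that need custom handling, then fall back to
    # the generic conversion task, as both shipped modules do.
    dataset.spawn_conversion_task(
        style_options=options.get('style_options'),
        network_options=options.get('network_options'),
        region_options=options.get('region_options'),
        asynchronous=False,
    )
```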
11 changes: 4 additions & 7 deletions uvdat/core/management/commands/populate.py
@@ -1,6 +1,6 @@
 from django.core.management.base import BaseCommand

-from sample_data.ingest_sample_data import ingest_charts, ingest_contexts, ingest_datasets
+from sample_data.ingest_use_case import ingest_use_case


 class Command(BaseCommand):
@@ -20,17 +20,14 @@ def add_arguments(self, parser):
         parser.add_argument('--dataset_indexes', nargs='*', type=int)

     def handle(self, *args, **kwargs):
-        use_case = kwargs['use_case']
+        use_case_name = kwargs['use_case']
         include_large = kwargs['include_large']
         dataset_indexes = kwargs['dataset_indexes']
         if dataset_indexes is None or len(dataset_indexes) == 0:
             dataset_indexes = None

-        print(f'Populating server with sample data for use case {use_case}...')
-        ingest_datasets(
-            use_case,
+        ingest_use_case(
+            use_case_name,
             include_large=include_large,
             dataset_indexes=dataset_indexes,
         )
-        ingest_contexts(use_case)
-        ingest_charts(use_case)
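
A sketch of invoking the updated command from Python; it assumes the `use_case` argument is declared in the collapsed portion of `add_arguments`, which this hunk does not show.

```python
from django.core.management import call_command

# 'boston_floods' matches one of the folders under sample_data/use_cases;
# dataset_indexes corresponds to the nargs='*' integer option above.
call_command('populate', use_case='boston_floods', include_large=False, dataset_indexes=[0, 1])
```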
