preserve collection names and metadata in DIRT migration (#301)
wpbonelli committed May 26, 2022
1 parent 3791a04 commit ee5f10e
Showing 1 changed file with 129 additions and 74 deletions.
plantit/plantit/celery_tasks.py
@@ -944,14 +944,30 @@ async def push_migration_event(user: User, migration: Migration):
     })
 
 
+SELECT_MANAGED_FILE_BY_PATH = """SELECT * FROM file_managed WHERE uri LIKE :path"""
+SELECT_MANAGED_FILE_BY_FID = """SELECT * FROM file_managed WHERE fid = :fid"""
+SELECT_ROOT_IMAGE = """SELECT * FROM field_data_field_root_image WHERE field_root_image_fid = :fid"""
+SELECT_ROOT_COLLECTION = """SELECT * FROM field_data_field_marked_coll_root_img_ref WHERE field_marked_coll_root_img_ref_target_id = :entity_id"""
+SELECT_ROOT_COLLECTION_TITLE = """SELECT * FROM node WHERE nid = :entity_id"""
+SELECT_ROOT_COLLECTION_METADATA = """SELECT * FROM field_data_field_collection_metadata WHERE entity_id = :entity_id"""
+SELECT_ROOT_COLLECTION_LOCATION = """SELECT * FROM field_data_field_collection_location WHERE entity_id = :entity_id"""
+SELECT_ROOT_COLLECTION_PLANTING = """SELECT * FROM field_data_field_collection_plantation WHERE entity_id = :entity_id"""
+SELECT_ROOT_COLLECTION_HARVEST = """SELECT * FROM field_data_field_collection_harvest WHERE entity_id = :entity_id"""
+SELECT_ROOT_COLLECTION_SOIL_GROUP = """SELECT * FROM field_data_field_collection_soil_group WHERE entity_id = :entity_id"""
+SELECT_ROOT_COLLECTION_SOIL_MOISTURE = """SELECT * FROM field_data_field_collection_soil_moisture WHERE entity_id = :entity_id"""
+SELECT_ROOT_COLLECTION_SOIL_N = """SELECT * FROM field_data_field_collection_soil_nitrogen WHERE entity_id = :entity_id"""
+SELECT_ROOT_COLLECTION_SOIL_P = """SELECT * FROM field_data_field_collection_soil_phosphorus WHERE entity_id = :entity_id"""
+SELECT_ROOT_COLLECTION_SOIL_K = """SELECT * FROM field_data_field_collection_soil_potassium WHERE entity_id = :entity_id"""
+SELECT_ROOT_COLLECTION_PESTICIDES = """SELECT * FROM field_data_field_collection_pesticides WHERE entity_id = :entity_id"""
+
 @app.task(bind=True)
 def migrate_dirt_datasets(self, username: str):
     try:
         user = User.objects.get(username=username)
         profile = Profile.objects.get(user=user)
         migration = Migration.objects.get(profile=profile)
     except:
-        logger.warning(f"Could not find user {username}")
+        logger.warning(f"Couldn't find DIRT migration info for user {username}, aborting")
         self.request.callbacks = None
         return

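The constants added above replace inline f-string SQL with named `:path` / `:fid` parameters, so the database driver binds values instead of having them interpolated into the query text. A minimal sketch of how such a constant is used with the `databases` package (the DSN and path pattern below are hypothetical, not taken from the commit):

    # sketch: binding a named parameter with the `databases` package
    import asyncio
    from databases import Database

    SELECT_MANAGED_FILE_BY_PATH = """SELECT * FROM file_managed WHERE uri LIKE :path"""

    async def main():
        db = Database("mysql://user:secret@localhost/dirt")  # hypothetical DSN
        await db.connect()
        # the driver binds :path, so the LIKE wildcard needs no manual quoting
        rows = await db.fetch_all(query=SELECT_MANAGED_FILE_BY_PATH,
                                  values={"path": "public://someuser/root-images/plot-1/%"})
        print([row["fid"] for row in rows])
        await db.disconnect()

    asyncio.run(main())

The Celery task itself is synchronous, which is why the diff wraps these coroutines in `async_to_sync` rather than running an event loop directly.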
@@ -963,13 +979,13 @@ def migrate_dirt_datasets(self, username: str):
     with ssh:
         with ssh.client.open_sftp() as sftp:
 
-            # check how many datasets the user has on the DIRT server
-            user_dir = join(settings.DIRT_MIGRATION_DATA_DIR, username, 'root-images')
-            datasets = [folder for folder in sftp.listdir(user_dir)]
-            logger.info(f"User {username} has {len(datasets)} DIRT folders: {', '.join(datasets)}")
+            # check how many folders the user has on the DIRT server NFS
+            userdir = join(settings.DIRT_MIGRATION_DATA_DIR, username, 'root-images')
+            folders = [folder for folder in sftp.listdir(userdir)]
+            logger.info(f"User {username} has {len(folders)} DIRT folders: {', '.join(folders)}")
 
-            # persist number of datasets
-            migration.num_folders = len(datasets)
+            # persist number of folders
+            migration.num_folders = len(folders)
             migration.save()
 
             # create a client for the CyVerse APIs and create a collection for the migrated DIRT data
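This hunk only renames `datasets`/`user_dir` to `folders`/`userdir`, reflecting that the entries under `root-images` are plain folders on the DIRT NFS share rather than curated datasets. The listing itself is ordinary SFTP; a self-contained sketch with bare paramiko (host, credentials, and paths are placeholders; the project's `ssh` object wraps a client exposing the same `open_sftp` call):

    # sketch: counting a user's DIRT folders over SFTP with paramiko
    from os.path import join
    import paramiko

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect("dirt.example.org", username="svc-migration", password="secret")  # placeholders
    with ssh.open_sftp() as sftp:
        userdir = join("/data/dirt", "someuser", "root-images")
        folders = sftp.listdir(userdir)
        print(f"someuser has {len(folders)} DIRT folders: {', '.join(folders)}")
    ssh.close()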
@@ -978,100 +994,139 @@ def migrate_dirt_datasets(self, username: str):
             if client.dir_exists(root_collection_path):
                 logger.warning(f"Collection {root_collection_path} already exists, aborting DIRT migration for {user.username}")
                 return
-            else: client.mkdir(root_collection_path)
+            client.mkdir(root_collection_path)
 
             # create a database client for the DIRT DB and open a connection
             db = Database(settings.DIRT_MIGRATION_DB_CONN_STR)
             async_to_sync(db.connect)()
 
+            # create local staging folder for this user
+            staging_dir = join(settings.DIRT_MIGRATION_STAGING_DIR, user.username)
+            Path(staging_dir).mkdir(parents=True, exist_ok=True)
+
             # keep track of progress so we can update the UI in real time
             downloads = []
             uploads = []
 
-            # transfer all the user's datasets to temporary staging directory, 1 file at a time (to preserve local disk space)
-            for folder in datasets:
-                folder_name = join(user_dir, folder)
-                files = [f for f in sftp.listdir(folder_name)]
-                logger.info(f"User {username} folder {folder} has {len(files)} files: {', '.join(files)}")
-
-                # create temp local folder for this dataset
-                staging_dir = join(settings.DIRT_MIGRATION_STAGING_DIR, user.username, folder)
-                Path(staging_dir).mkdir(parents=True, exist_ok=True)
-
-                # download files
-                for file in files:
-                    # download the file
-                    sftp.get(join(folder_name, file), join(staging_dir, file))
-
-                    # push a progress update to client
-                    downloads.append(DownloadedFile(name=file, folder=folder))
-                    migration.downloads = json.dumps(downloads)
-                    async_to_sync(push_migration_event)(user, migration)
-
-                # query DB for file IDs for each file in this folder
-                # files are stored under the user's CAS username if they used CAS to create their DIRT account, otherwise under their full name
-                dirt_folder_path = f"public://{user.username if profile.dirt_name is None else profile.dirt_name}/root-images/{folder}/%"
-                query = f"SELECT * FROM file_managed WHERE uri LIKE '{dirt_folder_path}';"
-                rows = async_to_sync(db.fetch_all)(query=query)
-                fids = [row['fid'] for row in rows]
-
-                # query DB for entity ID given file ID
-                query = """SELECT * FROM field_data_field_root_image WHERE field_root_image_fid = :fid"""
-                values = [{'fid': fid} for fid in fids]
-                rows = async_to_sync(db.fetch_all)(query=query)
-                file_entity_ids = [row['entity_id'] for row in rows]
-
-                # associate each file with the collection it's a member of
+            for folder in folders:
+                folder_name = join(userdir, folder)
+                file_names = [f for f in sftp.listdir(folder_name)]
+                logger.info(f"User {username} folder {folder} has {len(file_names)} files: {', '.join(file_names)}")
+
+                # get Drupal managed file IDs for each root image file in the current folder
+                # (files are stored under CAS username if CAS was used for the user's DIRT account, otherwise under their full name)
+                values = [{'path': f"public://{user.username if profile.dirt_name is None else profile.dirt_name}/root-images/{folder}/%"}]
+                rows = async_to_sync(db.fetch_all)(query=SELECT_MANAGED_FILE_BY_PATH, values=values)
+                managed_files = [{
+                    'fid': row['fid'],
+                    'name': row['filename'],
+                    'path': row['uri'].replace('public://', '')
+                } for row in rows]
+
+                # associate each root image with the collection it's a member of
                 collections = dict()
-                for fid in file_entity_ids:
-                    # query DB for each file's collection membership
-                    query = """SELECT * FROM field_data_field_marked_coll_root_img_ref WHERE field_marked_coll_root_img_ref_target_id = :entity_id"""
-                    values = [{'entity_id': eid} for eid in file_entity_ids]
-                    row = async_to_sync(db.fetch_one)(query=query)
+                for file in managed_files:
+                    file_id = file['fid']
+                    file_path = file['path']
+                    file_name = file['name']
+
+                    # get Drupal entity ID given root image file ID
+                    values = {'fid': file_id}
+                    rows = async_to_sync(db.fetch_one)(query=SELECT_ROOT_IMAGE, values=values)
+                    if not rows:
+                        logger.warning(f"DIRT root image with file ID {file_id} not found")
+                        continue
+                    file_entity_id = rows[0]['entity_id']
+
+                    # get Drupal entity ID for the collection this root image file is in
+                    values = {'entity_id': file_entity_id}
+                    row = async_to_sync(db.fetch_one)(query=SELECT_ROOT_COLLECTION, values=values)
                     coll_entity_id = row['entity_id']
 
-                    # create the association
-                    if coll_entity_id not in collections: collections[coll_entity_id] = []
-                    collections[coll_entity_id].append(fid)
-
-                # TODO create collection for each DIRT marked collection
-                # collection_path = join(root_collection_path, folder.rpartition('/')[2])
-                # if client.dir_exists(collection_path):
-                #     logger.warning(f"Collection {collection_path} already exists, aborting DIRT migration for {user.username}")
-                #     return
-                # else:
-                #     client.mkdir(collection_path)
+                    # if we haven't encountered this collection yet..
+                    if coll_entity_id not in collections:
+                        # get its title, creation/modification timestamps, metadata and environmental data
+                        values = {'entity_id': coll_entity_id}
+                        title_row = async_to_sync(db.fetch_one)(query=SELECT_ROOT_COLLECTION_TITLE, values=values)
+                        metadata_rows = async_to_sync(db.fetch_all)(query=SELECT_ROOT_COLLECTION_METADATA, values=values)
+                        location_rows = async_to_sync(db.fetch_one)(query=SELECT_ROOT_COLLECTION_LOCATION, values=values)
+                        planting_rows = async_to_sync(db.fetch_one)(query=SELECT_ROOT_COLLECTION_PLANTING, values=values)
+                        harvest_rows = async_to_sync(db.fetch_one)(query=SELECT_ROOT_COLLECTION_HARVEST, values=values)
+                        soil_group_rows = async_to_sync(db.fetch_one)(query=SELECT_ROOT_COLLECTION_SOIL_GROUP, values=values)
+                        soil_moist_rows = async_to_sync(db.fetch_one)(query=SELECT_ROOT_COLLECTION_SOIL_MOISTURE, values=values)
+                        soil_n_rows = async_to_sync(db.fetch_one)(query=SELECT_ROOT_COLLECTION_SOIL_N, values=values)
+                        soil_p_rows = async_to_sync(db.fetch_one)(query=SELECT_ROOT_COLLECTION_SOIL_P, values=values)
+                        soil_k_rows = async_to_sync(db.fetch_one)(query=SELECT_ROOT_COLLECTION_SOIL_K, values=values)
+                        pesticides_rows = async_to_sync(db.fetch_one)(query=SELECT_ROOT_COLLECTION_PESTICIDES, values=values)
+
+                        title = title_row['title']
+                        created = datetime.fromtimestamp(int(title_row['created']))
+                        changed = datetime.fromtimestamp(int(title_row['changed']))
+                        metadata = {row['field_collection_metadata_first']: row['field_collection_metadata_second'] for row in metadata_rows}
+                        latitude = None if not location_rows else location_rows[0]['field_collection_location_lat']
+                        longitude = None if not location_rows else location_rows[0]['field_collection_location_lng']
+                        planting = None if not planting_rows else planting_rows[0]['field_collection_plantation_value']
+                        harvest = None if not harvest_rows else harvest_rows[0]['field_collection_harvest_value']
+                        soil_group = None if not soil_group_rows else soil_group_rows[0]['field_collection_soil_group_tid']
+                        soil_moist = None if not soil_moist_rows else soil_moist_rows[0]['field_collection_soil_moisture_value']
+                        soil_n = None if not soil_n_rows else soil_n_rows[0]['field_collection_soil_nitrogen_value']
+                        soil_p = None if not soil_p_rows else soil_p_rows[0]['field_collection_soil_phosphorus_value']
+                        soil_k = None if not soil_k_rows else soil_k_rows[0]['field_collection_soil_potassium_value']
+                        pesticides = None if not pesticides_rows else pesticides_rows[0]['field_collection_pesticides_value']
+
+                        # mark the collection as seen
+                        collections[coll_entity_id] = join(root_collection_path, title)
+
+                        # create collection in CyVerse data store for this marked collection
+                        collection_path = join(root_collection_path, title)
+                        if client.dir_exists(collection_path):
+                            logger.warning(f"Collection {collection_path} already exists, skipping")
+                            continue
+                        client.mkdir(collection_path)
+
+                        # get ID of newly created collection
+                        stat = client.stat(collection_path)
+                        id = stat['id']
+
+                        # attach metadata to collection
+                        props = [
+                            f"migrated={timezone.now().isoformat()}",
+                            f"created={created.isoformat()}",
+                            f"changed={changed.isoformat()}",
+                        ]
+                        if latitude is not None: props.append(f"latitude={latitude}")
+                        if longitude is not None: props.append(f"longitude={longitude}")
+                        if planting is not None: props.append(f"planting={planting}")
+                        if harvest is not None: props.append(f"harvest={harvest}")
+                        if soil_group is not None: props.append(f"soil_group={soil_group}")
+                        if soil_moist is not None: props.append(f"soil_moisture={soil_moist}")
+                        if soil_n is not None: props.append(f"soil_nitrogen={soil_n}")
+                        if soil_p is not None: props.append(f"soil_phosphorus={soil_p}")
+                        if soil_k is not None: props.append(f"soil_potassium={soil_k}")
+                        if pesticides is not None: props.append(f"pesticides={pesticides}")
+                        for k, v in metadata.items(): props.append(f"{k}={v}")
+                        client.set_metadata(id, props, [])

-                # TODO attach metadata to collections
+                    # download the file
+                    sftp.get(join(folder_name, file_name), join(staging_dir, file_name))
 
-                # upload files to each subcollection
-                for file in files:
-                    # upload the file
-                    client.upload(from_path=join(staging_dir, file), to_prefix=collection_path)
+                    # upload the file to the corresponding collection
+                    client.upload(from_path=join(staging_dir, file_name), to_prefix=collections[coll_entity_id])
 
                     # push a progress update to client
-                    uploads.append(UploadedFile(name=file, folder=folder))
+                    uploads.append(UploadedFile(name=file_name, folder=folder))
                     migration.uploads = json.dumps(uploads)
                     async_to_sync(push_migration_event)(user, migration)
 
                     # remove file from staging dir
-                    os.remove(join(staging_dir, file))
+                    os.remove(join(staging_dir, file_name))
 
-                # TODO attach metadata to each file
 
                 # persist migration status
                 migration.save()
 
-            # get ID of newly created collection
-            stat = client.stat(collection_path)
-            id = stat['id']
 
-            # mark collection as originating from DIRT
-            client.set_metadata(id, [
-                f"dirt_migration_timestamp={timezone.now().isoformat()}",
-                # TODO: anything else we need to add here?
-            ], [])
 
             # close the DB connection
             async_to_sync(db.disconnect)()

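The heart of the change is the metadata block in the last hunk: each optional DIRT attribute is flattened into a `key=value` string and attached to the new collection, with the free-form metadata pairs appended last. A standalone sketch of that assembly (the helper name is illustrative; field handling mirrors the diff, which uses Django's `timezone.now()` where the stdlib equivalent is shown here):

    # sketch: flattening optional collection attributes into key=value props
    from datetime import datetime, timezone

    def build_props(created: datetime, changed: datetime, metadata: dict, **env) -> list:
        """Build key=value metadata strings, skipping fields that are unset (None)."""
        props = [
            f"migrated={datetime.now(timezone.utc).isoformat()}",
            f"created={created.isoformat()}",
            f"changed={changed.isoformat()}",
        ]
        # optional environmental fields, e.g. latitude, soil_moisture, pesticides
        props.extend(f"{k}={v}" for k, v in env.items() if v is not None)
        # free-form DIRT metadata pairs come last
        props.extend(f"{k}={v}" for k, v in metadata.items())
        return props

    # e.g. build_props(created, changed, {"genotype": "B73"}, latitude=33.9, soil_group=None)
    # -> ['migrated=...', 'created=...', 'changed=...', 'latitude=33.9', 'genotype=B73']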
