From d3dedd84fb9ee17aa69dff98d7582ba2e05f7b3e Mon Sep 17 00:00:00 2001 From: Wes Bonelli Date: Sun, 5 Jun 2022 14:41:21 -0400 Subject: [PATCH] migrate computation results, logs, and metadata files too (#301) --- plantit/plantit/celery_tasks.py | 215 +++++++++++++++++++++++++------- plantit/plantit/queries.py | 5 +- plantit/plantit/users/models.py | 4 +- 3 files changed, 175 insertions(+), 49 deletions(-) diff --git a/plantit/plantit/celery_tasks.py b/plantit/plantit/celery_tasks.py index 96b1375c..69a87805 100644 --- a/plantit/plantit/celery_tasks.py +++ b/plantit/plantit/celery_tasks.py @@ -932,6 +932,7 @@ class ManagedFile(NamedTuple): id: str name: str path: str + type: str folder: str orphan: bool missing: bool @@ -945,40 +946,24 @@ async def push_migration_event(user: User, migration: Migration): }) -# queries for use with encode.io/databases -# SELECT_MANAGED_FILE_BY_PATH = """SELECT * FROM file_managed WHERE uri LIKE :path""" -# SELECT_MANAGED_FILE_BY_FID = """SELECT * FROM file_managed WHERE fid = :fid""" -# SELECT_ROOT_IMAGE = """SELECT * FROM field_data_field_root_image WHERE field_root_image_fid = :fid""" -# SELECT_ROOT_COLLECTION = """SELECT * FROM field_data_field_marked_coll_root_img_ref WHERE field_marked_coll_root_img_ref_target_id = :entity_id""" -# SELECT_ROOT_COLLECTION_TITLE = """SELECT * FROM node WHERE nid = :entity_id""" -# SELECT_ROOT_COLLECTION_METADATA = """SELECT * FROM field_data_field_collection_metadata WHERE entity_id = :entity_id""" -# SELECT_ROOT_COLLECTION_LOCATION = """SELECT * FROM field_data_field_collection_location WHERE entity_id = :entity_id""" -# SELECT_ROOT_COLLECTION_PLANTING = """SELECT * FROM field_data_field_collection_plantation WHERE entity_id = :entity_id""" -# SELECT_ROOT_COLLECTION_HARVEST = """SELECT * FROM field_data_field_collection_harvest WHERE entity_id = :entity_id""" -# SELECT_ROOT_COLLECTION_SOIL_GROUP = """SELECT * FROM field_data_field_collection_soil_group WHERE entity_id = :entity_id""" -# 
SELECT_ROOT_COLLECTION_SOIL_MOISTURE = """SELECT * FROM field_data_field_collection_soil_moisture WHERE entity_id = :entity_id""" -# SELECT_ROOT_COLLECTION_SOIL_N = """SELECT * FROM field_data_field_collection_soil_nitrogen WHERE entity_id = :entity_id""" -# SELECT_ROOT_COLLECTION_SOIL_P = """SELECT * FROM field_data_field_collection_soil_phosphorus WHERE entity_id = :entity_id""" -# SELECT_ROOT_COLLECTION_SOIL_K = """SELECT * FROM field_data_field_collection_soil_potassium WHERE entity_id = :entity_id""" -# SELECT_ROOT_COLLECTION_PESTICIDES = """SELECT * FROM field_data_field_collection_pesticides WHERE entity_id = :entity_id""" - - -# queries for use with PyMySQL -SELECT_MANAGED_FILE_BY_PATH = """SELECT fid, filename, uri FROM file_managed WHERE uri LIKE %s""" -SELECT_MANAGED_FILE_BY_FID = """SELECT fid, filename, uri FROM file_managed WHERE fid = %s""" -SELECT_ROOT_IMAGE = """SELECT entity_id FROM field_data_field_root_image WHERE field_root_image_fid = %s""" -SELECT_ROOT_COLLECTION = """SELECT entity_id FROM field_data_field_marked_coll_root_img_ref WHERE field_marked_coll_root_img_ref_target_id = %s""" -SELECT_ROOT_COLLECTION_TITLE = """SELECT title, created, changed FROM node WHERE nid = %s""" -SELECT_ROOT_COLLECTION_METADATA = """SELECT field_collection_metadata_first, field_collection_metadata_second FROM field_data_field_collection_metadata WHERE entity_id = %s""" -SELECT_ROOT_COLLECTION_LOCATION = """SELECT field_collection_location_lat, field_collection_location_lng FROM field_data_field_collection_location WHERE entity_id = %s""" -SELECT_ROOT_COLLECTION_PLANTING = """SELECT field_collection_plantation_value FROM field_data_field_collection_plantation WHERE entity_id = %s""" -SELECT_ROOT_COLLECTION_HARVEST = """SELECT field_collection_harvest_value FROM field_data_field_collection_harvest WHERE entity_id = %s""" -SELECT_ROOT_COLLECTION_SOIL_GROUP = """SELECT field_collection_soil_group_tid FROM field_data_field_collection_soil_group WHERE entity_id = 
%s""" -SELECT_ROOT_COLLECTION_SOIL_MOISTURE = """SELECT field_collection_soil_moisture_value FROM field_data_field_collection_soil_moisture WHERE entity_id = %s""" -SELECT_ROOT_COLLECTION_SOIL_N = """SELECT field_collection_soil_nitrogen_value FROM field_data_field_collection_soil_nitrogen WHERE entity_id = %s""" -SELECT_ROOT_COLLECTION_SOIL_P = """SELECT field_collection_soil_phosphorus_value FROM field_data_field_collection_soil_phosphorus WHERE entity_id = %s""" -SELECT_ROOT_COLLECTION_SOIL_K = """SELECT field_collection_soil_potassium_value FROM field_data_field_collection_soil_potassium WHERE entity_id = %s""" -SELECT_ROOT_COLLECTION_PESTICIDES = """SELECT field_collection_pesticides_value FROM field_data_field_collection_pesticides WHERE entity_id = %s""" +SELECT_MANAGED_FILE_BY_PATH = """SELECT fid, filename, uri FROM file_managed WHERE uri LIKE %s""" +SELECT_MANAGED_FILE_BY_FID = """SELECT fid, filename, uri FROM file_managed WHERE fid = %s""" +SELECT_ROOT_IMAGE = """SELECT entity_id FROM field_data_field_root_image WHERE field_root_image_fid = %s""" +SELECT_ROOT_COLLECTION = """SELECT entity_id FROM field_data_field_marked_coll_root_img_ref WHERE field_marked_coll_root_img_ref_target_id = %s""" +SELECT_ROOT_COLLECTION_TITLE = """SELECT title, created, changed FROM node WHERE nid = %s""" +SELECT_ROOT_COLLECTION_METADATA = """SELECT field_collection_metadata_first, field_collection_metadata_second FROM field_data_field_collection_metadata WHERE entity_id = %s""" +SELECT_ROOT_COLLECTION_LOCATION = """SELECT field_collection_location_lat, field_collection_location_lng FROM field_data_field_collection_location WHERE entity_id = %s""" +SELECT_ROOT_COLLECTION_PLANTING = """SELECT field_collection_plantation_value FROM field_data_field_collection_plantation WHERE entity_id = %s""" +SELECT_ROOT_COLLECTION_HARVEST = """SELECT field_collection_harvest_value FROM field_data_field_collection_harvest WHERE entity_id = %s""" +SELECT_ROOT_COLLECTION_SOIL_GROUP = """SELECT 
field_collection_soil_group_tid FROM field_data_field_collection_soil_group WHERE entity_id = %s""" +SELECT_ROOT_COLLECTION_SOIL_MOISTURE = """SELECT field_collection_soil_moisture_value FROM field_data_field_collection_soil_moisture WHERE entity_id = %s""" +SELECT_ROOT_COLLECTION_SOIL_N = """SELECT field_collection_soil_nitrogen_value FROM field_data_field_collection_soil_nitrogen WHERE entity_id = %s""" +SELECT_ROOT_COLLECTION_SOIL_P = """SELECT field_collection_soil_phosphorus_value FROM field_data_field_collection_soil_phosphorus WHERE entity_id = %s""" +SELECT_ROOT_COLLECTION_SOIL_K = """SELECT field_collection_soil_potassium_value FROM field_data_field_collection_soil_potassium WHERE entity_id = %s""" +SELECT_ROOT_COLLECTION_PESTICIDES = """SELECT field_collection_pesticides_value FROM field_data_field_collection_pesticides WHERE entity_id = %s""" +SELECT_OUTPUT_FILE = """SELECT entity_id FROM field_data_field_exec_result_file WHERE field_exec_result_file_fid = %s""" +SELECT_OUTPUT_LOG_FILE = """SELECT entity_id FROM field_revision_field_output_log_file WHERE field_output_log_file_fid = %s""" +SELECT_METADATA_FILE = """SELECT entity_id FROM field_data_field_metadata_file WHERE field_metadata_file_fid = %s""" @app.task(bind=True) @@ -1004,6 +989,10 @@ def migrate_dirt_datasets(self, username: str): logger.warning(f"Collection {root_collection_path} already exists, aborting DIRT migration for {user.username}") return client.mkdir(root_collection_path) + client.mkdir(join(root_collection_path, 'collections')) + client.mkdir(join(root_collection_path, 'metadata')) + client.mkdir(join(root_collection_path, 'outputs')) + client.mkdir(join(root_collection_path, 'logs')) # check how many files the user has in managed collections in the DIRT database # we want to keep track of progress and update the UI in real time, @@ -1019,19 +1008,58 @@ def migrate_dirt_datasets(self, username: str): cursor.execute(SELECT_MANAGED_FILE_BY_PATH, (storage_path,)) rows = 
cursor.fetchall() db.close() + + # filter root image files image_files: List[ManagedFile] = [ManagedFile( id=row[0], name=row[1], path=row[2].replace('public://', ''), + type='image', folder=row[2].rpartition('root-images')[2].replace(row[1], '').replace('/', ''), orphan=False, missing=False, uploaded=False) for row in rows if 'root-images' in row[2]] + # filter metadata files + metadata_files: list[ManagedFile] = [ManagedFile( + id=row[0], + name=row[1], + path=row[2].replace('public://', ''), + type='metadata', + folder=row[2].rpartition('metadata-files')[2].replace(row[1], '').replace('/', ''), + orphan=False, + missing=False, + uploaded=False) for row in rows if 'metadata-files' in row[2]] + + # filter output files + output_files: list[ManagedFile] = [ManagedFile( + id=row[0], + name=row[1], + path=row[2].replace('public://', ''), + type='output', + folder=row[2].rpartition('output-files')[2].replace(row[1], '').replace('/', ''), + orphan=False, + missing=False, + uploaded=False) for row in rows if 'output-files' in row[2]] + + # filter output logs + output_logs: list[ManagedFile] = [ManagedFile( + id=row[0], + name=row[1], + path=row[2].replace('public://', ''), + type='logs', + folder=row[2].rpartition('output-logs')[2].replace(row[1], '').replace('/', ''), + orphan=False, + missing=False, + uploaded=False) for row in rows if 'output-logs' in row[2]] + # TODO extract other kinds of managed files # associate each root image with the collection it's a member of too uploads = dict() + metadata = dict() + outputs = dict() + logs = dict() ssh = SSH( host=settings.DIRT_MIGRATION_HOST, @@ -1040,14 +1068,13 @@ def migrate_dirt_datasets(self, username: str): pkey=str(get_user_private_key_path(settings.DIRT_MIGRATION_USERNAME))) with ssh: with ssh.client.open_sftp() as sftp: - # check how many folders the user has on the DIRT server NFS - userdir = join(settings.DIRT_MIGRATION_DATA_DIR, username, 'root-images') - folders = [folder for folder in sftp.listdir(userdir)] - 
logger.info(f"User {username} has {len(folders)} DIRT folders: {', '.join(folders)}") - - # persist number of folders and files - migration.num_folders = len(folders) - migration.num_files = len(image_files) + userdir = join(settings.DIRT_MIGRATION_DATA_DIR, username) + + # persist number of each kind of managed file + migration.image_files = len(image_files) + migration.metadata_files = len(metadata_files) + migration.output_files = len(output_files) + migration.output_logs = len(output_logs) migration.save() for file in image_files: @@ -1086,14 +1113,14 @@ def migrate_dirt_datasets(self, username: str): logger.warning(f"DIRT root image collection with entity ID {file_entity_id} not found") # create the folder if we need to - subcoll_path = join(root_collection_path, file.folder) + subcoll_path = join(root_collection_path, 'collections', file.folder) if file.folder not in uploads.keys(): uploads[file.folder] = dict() logger.info(f"Creating DIRT migration subcollection {subcoll_path}") client.mkdir(subcoll_path) # download the file - dirt_nfs_path = join(userdir, file.folder, file.name) + dirt_nfs_path = join(userdir, 'root-images', file.folder, file.name) staging_path = join(staging_dir, file.name) logger.info(f"Downloading file from {dirt_nfs_path} to {staging_path}") try: @@ -1106,6 +1133,7 @@ def migrate_dirt_datasets(self, username: str): id=file.id, name=file.name, path=file.path, + type='image', folder=file.folder, orphan=False, missing=True, @@ -1125,6 +1153,7 @@ def migrate_dirt_datasets(self, username: str): id=file.id, name=file.name, path=file.path, + type='image', folder=file.folder, orphan=True, missing=False, @@ -1151,7 +1180,7 @@ def migrate_dirt_datasets(self, username: str): coll_title_row = cursor.fetchone() db.close() coll_title = coll_title_row[0] - coll_path = join(root_collection_path, coll_title) + coll_path = join(root_collection_path, 'collections', coll_title) coll_created = datetime.fromtimestamp(int(coll_title_row[1])) coll_changed = 
datetime.fromtimestamp(int(coll_title_row[2])) @@ -1227,7 +1256,7 @@ def migrate_dirt_datasets(self, username: str): client.set_metadata(id, props, []) # download the file - dirt_nfs_path = join(userdir, file.folder, file.name) + dirt_nfs_path = join(userdir, 'root-images', file.folder, file.name) staging_path = join(staging_dir, file.name) logger.info(f"Downloading file from {dirt_nfs_path} to {staging_path}") try: @@ -1259,6 +1288,7 @@ def migrate_dirt_datasets(self, username: str): id=file.id, name=file.name, path=file.path, + type='image', folder=coll_title, orphan=False, missing=False, @@ -1272,7 +1302,98 @@ def migrate_dirt_datasets(self, username: str): # TODO attach metadata to the file - # TODO: output files, output logs, output images, metadata files + for file in metadata_files: + # create the folder if we need to + subcoll_path = join(root_collection_path, 'metadata', file.folder) + if file.folder not in metadata.keys(): + metadata[file.folder] = dict() + logger.info(f"Creating DIRT migration subcollection {subcoll_path}") + client.mkdir(subcoll_path) + + # download the file + dirt_nfs_path = join(userdir, 'metadata-files', file.folder, file.name) + staging_path = join(staging_dir, file.name) + logger.info(f"Downloading file from {dirt_nfs_path} to {staging_path}") + try: + sftp.get(dirt_nfs_path, staging_path) + except FileNotFoundError: + logger.warning(f"File {dirt_nfs_path} not found! 
Skipping") + + # push a progress update to client + metadata[file.folder][file.name] = ManagedFile( + id=file.id, + name=file.name, + path=file.path, + type='metadata', + folder=file.folder, + orphan=False, + missing=True, + uploaded=False) + migration.metadata = json.dumps(metadata) + migration.save() + async_to_sync(push_migration_event)(user, migration) + + for file in output_files: + # create the folder if we need to + subcoll_path = join(root_collection_path, 'outputs', file.folder) + if file.folder not in outputs.keys(): + outputs[file.folder] = dict() + logger.info(f"Creating DIRT migration subcollection {subcoll_path}") + client.mkdir(subcoll_path) + + # download the file + dirt_nfs_path = join(userdir, 'output-files', file.folder, file.name) + staging_path = join(staging_dir, file.name) + logger.info(f"Downloading file from {dirt_nfs_path} to {staging_path}") + try: + sftp.get(dirt_nfs_path, staging_path) + except FileNotFoundError: + logger.warning(f"File {dirt_nfs_path} not found! 
Skipping") + + # push a progress update to client + outputs[file.folder][file.name] = ManagedFile( + id=file.id, + name=file.name, + path=file.path, + type='output', + folder=file.folder, + orphan=False, + missing=True, + uploaded=False) + migration.outputs = json.dumps(outputs) + migration.save() + async_to_sync(push_migration_event)(user, migration) + + for file in output_logs: + # create the folder if we need to + subcoll_path = join(root_collection_path, 'logs', file.folder) + if file.folder not in logs.keys(): + logs[file.folder] = dict() + logger.info(f"Creating DIRT migration subcollection {subcoll_path}") + client.mkdir(subcoll_path) + + # download the file + dirt_nfs_path = join(userdir, 'output-logs', file.folder, file.name) + staging_path = join(staging_dir, file.name) + logger.info(f"Downloading file from {dirt_nfs_path} to {staging_path}") + try: + sftp.get(dirt_nfs_path, staging_path) + except FileNotFoundError: + logger.warning(f"File {dirt_nfs_path} not found! Skipping") + + # push a progress update to client + logs[file.folder][file.name] = ManagedFile( + id=file.id, + name=file.name, + path=file.path, + type='log', + folder=file.folder, + orphan=False, + missing=True, + uploaded=False) + migration.logs = json.dumps(logs) + migration.save() + async_to_sync(push_migration_event)(user, migration) # get ID of newly created migration collection add collection timestamp as metadata root_collection_id = client.stat(root_collection_path)['id'] diff --git a/plantit/plantit/queries.py b/plantit/plantit/queries.py index bbf43657..af466619 100644 --- a/plantit/plantit/queries.py +++ b/plantit/plantit/queries.py @@ -756,7 +756,10 @@ def migration_to_dict(migration: Migration) -> dict: 'target_path': migration.target_path, 'num_folders': migration.num_folders, 'num_files': migration.num_files, - 'uploads': json.loads(migration.uploads if migration.uploads is not None else '{}') + 'uploads': json.loads(migration.uploads if migration.uploads is not None else 
'{}'), + 'metadata': json.loads(migration.metadata if migration.metadata is not None else '{}'), + 'outputs': json.loads(migration.outputs if migration.outputs is not None else '{}'), + 'logs': json.loads(migration.logs if migration.logs is not None else '{}') } diff --git a/plantit/plantit/users/models.py b/plantit/plantit/users/models.py index 4cbd35f8..c19947e9 100644 --- a/plantit/plantit/users/models.py +++ b/plantit/plantit/users/models.py @@ -28,6 +28,8 @@ class Migration(models.Model): started = models.DateTimeField(null=True, blank=True) completed = models.DateTimeField(null=True, blank=True) target_path = models.CharField(max_length=255, null=True, blank=True) - num_folders = models.IntegerField(null=True, blank=True) num_files = models.IntegerField(null=True, blank=True) uploads = models.JSONField(null=True, blank=True) + metadata = models.JSONField(null=True, blank=True) + outputs = models.JSONField(null=True, blank=True) + logs = models.JSONField(null=True, blank=True)