Skip to content

Commit

Permalink
migrate computation results, logs, and metadata files too (#301)
Browse files Browse the repository at this point in the history
  • Loading branch information
wpbonelli committed Jun 5, 2022
1 parent 8de1a1b commit d3dedd8
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 49 deletions.
215 changes: 168 additions & 47 deletions plantit/plantit/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,7 @@ class ManagedFile(NamedTuple):
id: str
name: str
path: str
type: str
folder: str
orphan: bool
missing: bool
Expand All @@ -945,40 +946,24 @@ async def push_migration_event(user: User, migration: Migration):
})


# queries for use with encode.io/databases
# SELECT_MANAGED_FILE_BY_PATH = """SELECT * FROM file_managed WHERE uri LIKE :path"""
# SELECT_MANAGED_FILE_BY_FID = """SELECT * FROM file_managed WHERE fid = :fid"""
# SELECT_ROOT_IMAGE = """SELECT * FROM field_data_field_root_image WHERE field_root_image_fid = :fid"""
# SELECT_ROOT_COLLECTION = """SELECT * FROM field_data_field_marked_coll_root_img_ref WHERE field_marked_coll_root_img_ref_target_id = :entity_id"""
# SELECT_ROOT_COLLECTION_TITLE = """SELECT * FROM node WHERE nid = :entity_id"""
# SELECT_ROOT_COLLECTION_METADATA = """SELECT * FROM field_data_field_collection_metadata WHERE entity_id = :entity_id"""
# SELECT_ROOT_COLLECTION_LOCATION = """SELECT * FROM field_data_field_collection_location WHERE entity_id = :entity_id"""
# SELECT_ROOT_COLLECTION_PLANTING = """SELECT * FROM field_data_field_collection_plantation WHERE entity_id = :entity_id"""
# SELECT_ROOT_COLLECTION_HARVEST = """SELECT * FROM field_data_field_collection_harvest WHERE entity_id = :entity_id"""
# SELECT_ROOT_COLLECTION_SOIL_GROUP = """SELECT * FROM field_data_field_collection_soil_group WHERE entity_id = :entity_id"""
# SELECT_ROOT_COLLECTION_SOIL_MOISTURE = """SELECT * FROM field_data_field_collection_soil_moisture WHERE entity_id = :entity_id"""
# SELECT_ROOT_COLLECTION_SOIL_N = """SELECT * FROM field_data_field_collection_soil_nitrogen WHERE entity_id = :entity_id"""
# SELECT_ROOT_COLLECTION_SOIL_P = """SELECT * FROM field_data_field_collection_soil_phosphorus WHERE entity_id = :entity_id"""
# SELECT_ROOT_COLLECTION_SOIL_K = """SELECT * FROM field_data_field_collection_soil_potassium WHERE entity_id = :entity_id"""
# SELECT_ROOT_COLLECTION_PESTICIDES = """SELECT * FROM field_data_field_collection_pesticides WHERE entity_id = :entity_id"""


# queries for use with PyMySQL
SELECT_MANAGED_FILE_BY_PATH = """SELECT fid, filename, uri FROM file_managed WHERE uri LIKE %s"""
SELECT_MANAGED_FILE_BY_FID = """SELECT fid, filename, uri FROM file_managed WHERE fid = %s"""
SELECT_ROOT_IMAGE = """SELECT entity_id FROM field_data_field_root_image WHERE field_root_image_fid = %s"""
SELECT_ROOT_COLLECTION = """SELECT entity_id FROM field_data_field_marked_coll_root_img_ref WHERE field_marked_coll_root_img_ref_target_id = %s"""
SELECT_ROOT_COLLECTION_TITLE = """SELECT title, created, changed FROM node WHERE nid = %s"""
SELECT_ROOT_COLLECTION_METADATA = """SELECT field_collection_metadata_first, field_collection_metadata_second FROM field_data_field_collection_metadata WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_LOCATION = """SELECT field_collection_location_lat, field_collection_location_lng FROM field_data_field_collection_location WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_PLANTING = """SELECT field_collection_plantation_value FROM field_data_field_collection_plantation WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_HARVEST = """SELECT field_collection_harvest_value FROM field_data_field_collection_harvest WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_SOIL_GROUP = """SELECT field_collection_soil_group_tid FROM field_data_field_collection_soil_group WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_SOIL_MOISTURE = """SELECT field_collection_soil_moisture_value FROM field_data_field_collection_soil_moisture WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_SOIL_N = """SELECT field_collection_soil_nitrogen_value FROM field_data_field_collection_soil_nitrogen WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_SOIL_P = """SELECT field_collection_soil_phosphorus_value FROM field_data_field_collection_soil_phosphorus WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_SOIL_K = """SELECT field_collection_soil_potassium_value FROM field_data_field_collection_soil_potassium WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_PESTICIDES = """SELECT field_collection_pesticides_value FROM field_data_field_collection_pesticides WHERE entity_id = %s"""
# Queries for use with PyMySQL (%s placeholders are bound by cursor.execute).
# These read the legacy DIRT Drupal 7 database: `file_managed` holds uploaded
# files, and each `field_data_field_*` / `field_revision_field_*` table maps a
# Drupal field instance to the entity (node) that owns it.
SELECT_MANAGED_FILE_BY_PATH = """SELECT fid, filename, uri FROM file_managed WHERE uri LIKE %s"""
SELECT_MANAGED_FILE_BY_FID = """SELECT fid, filename, uri FROM file_managed WHERE fid = %s"""
SELECT_ROOT_IMAGE = """SELECT entity_id FROM field_data_field_root_image WHERE field_root_image_fid = %s"""
SELECT_ROOT_COLLECTION = """SELECT entity_id FROM field_data_field_marked_coll_root_img_ref WHERE field_marked_coll_root_img_ref_target_id = %s"""
SELECT_ROOT_COLLECTION_TITLE = """SELECT title, created, changed FROM node WHERE nid = %s"""
SELECT_ROOT_COLLECTION_METADATA = """SELECT field_collection_metadata_first, field_collection_metadata_second FROM field_data_field_collection_metadata WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_LOCATION = """SELECT field_collection_location_lat, field_collection_location_lng FROM field_data_field_collection_location WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_PLANTING = """SELECT field_collection_plantation_value FROM field_data_field_collection_plantation WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_HARVEST = """SELECT field_collection_harvest_value FROM field_data_field_collection_harvest WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_SOIL_GROUP = """SELECT field_collection_soil_group_tid FROM field_data_field_collection_soil_group WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_SOIL_MOISTURE = """SELECT field_collection_soil_moisture_value FROM field_data_field_collection_soil_moisture WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_SOIL_N = """SELECT field_collection_soil_nitrogen_value FROM field_data_field_collection_soil_nitrogen WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_SOIL_P = """SELECT field_collection_soil_phosphorus_value FROM field_data_field_collection_soil_phosphorus WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_SOIL_K = """SELECT field_collection_soil_potassium_value FROM field_data_field_collection_soil_potassium WHERE entity_id = %s"""
SELECT_ROOT_COLLECTION_PESTICIDES = """SELECT field_collection_pesticides_value FROM field_data_field_collection_pesticides WHERE entity_id = %s"""
SELECT_OUTPUT_FILE = """SELECT entity_id FROM field_data_field_exec_result_file WHERE field_exec_result_file_fid = %s"""
# FIX: Drupal field-storage tables name their fid column after their own field
# (cf. field_data_field_root_image.field_root_image_fid above), so the log and
# metadata tables cannot have a field_exec_result_file_fid column — querying it
# would raise "Unknown column". Use each table's own fid column instead.
# NOTE(review): SELECT_OUTPUT_LOG_FILE reads the field_revision_* table while
# every other query reads field_data_* — presumably intentional; confirm.
SELECT_OUTPUT_LOG_FILE = """SELECT entity_id FROM field_revision_field_output_log_file WHERE field_output_log_file_fid = %s"""
SELECT_METADATA_FILE = """SELECT entity_id FROM field_data_field_metadata_file WHERE field_metadata_file_fid = %s"""


@app.task(bind=True)
Expand All @@ -1004,6 +989,10 @@ def migrate_dirt_datasets(self, username: str):
logger.warning(f"Collection {root_collection_path} already exists, aborting DIRT migration for {user.username}")
return
client.mkdir(root_collection_path)
client.mkdir(join(root_collection_path, 'collections'))
client.mkdir(join(root_collection_path, 'metadata'))
client.mkdir(join(root_collection_path, 'outputs'))
client.mkdir(join(root_collection_path, 'logs'))

# check how many files the user has in managed collections in the DIRT database
# we want to keep track of progress and update the UI in real time,
Expand All @@ -1019,19 +1008,58 @@ def migrate_dirt_datasets(self, username: str):
cursor.execute(SELECT_MANAGED_FILE_BY_PATH, (storage_path,))
rows = cursor.fetchall()
db.close()

# filter root image files
image_files: List[ManagedFile] = [ManagedFile(
id=row[0],
name=row[1],
path=row[2].replace('public://', ''),
type='image',
folder=row[2].rpartition('root-images')[2].replace(row[1], '').replace('/', ''),
orphan=False,
missing=False,
uploaded=False) for row in rows if 'root-images' in row[2]]

# filter metadata files
metadata_files: list[ManagedFile] = [ManagedFile(
id=row[0],
name=row[1],
path=row[2].replace('public://', ''),
type='metadata',
folder=row[2].rpartition('metadata-files')[2].replace(row[1], '').replace('/', ''),
orphan=False,
missing=False,
uploaded=False) for row in rows if 'metadata-files' in row[2]]

# filter output files
output_files: list[ManagedFile] = [ManagedFile(
id=row[0],
name=row[1],
path=row[2].replace('public://', ''),
type='output',
folder=row[2].rpartition('output-files')[2].replace(row[1], '').replace('/', ''),
orphan=False,
missing=False,
uploaded=False) for row in rows if 'output-files' in row[2]]

# filter output logs
output_logs: list[ManagedFile] = [ManagedFile(
id=row[0],
name=row[1],
path=row[2].replace('public://', ''),
type='logs',
folder=row[2].rpartition('output-logs')[2].replace(row[1], '').replace('/', ''),
orphan=False,
missing=False,
uploaded=False) for row in rows if 'output-logs' in row[2]]

# TODO extract other kinds of managed files

# associate each root image with the collection it's a member of too
uploads = dict()
metadata = dict()
outputs = dict()
logs = dict()

ssh = SSH(
host=settings.DIRT_MIGRATION_HOST,
Expand All @@ -1040,14 +1068,13 @@ def migrate_dirt_datasets(self, username: str):
pkey=str(get_user_private_key_path(settings.DIRT_MIGRATION_USERNAME)))
with ssh:
with ssh.client.open_sftp() as sftp:
# check how many folders the user has on the DIRT server NFS
userdir = join(settings.DIRT_MIGRATION_DATA_DIR, username, 'root-images')
folders = [folder for folder in sftp.listdir(userdir)]
logger.info(f"User {username} has {len(folders)} DIRT folders: {', '.join(folders)}")

# persist number of folders and files
migration.num_folders = len(folders)
migration.num_files = len(image_files)
userdir = join(settings.DIRT_MIGRATION_DATA_DIR, username)

# persist number of each kind of managed file
migration.image_files = len(image_files)
migration.metadata_files = len(metadata_files)
migration.output_files = len(output_files)
migration.output_logs = len(output_logs)
migration.save()

for file in image_files:
Expand Down Expand Up @@ -1086,14 +1113,14 @@ def migrate_dirt_datasets(self, username: str):
logger.warning(f"DIRT root image collection with entity ID {file_entity_id} not found")

# create the folder if we need to
subcoll_path = join(root_collection_path, file.folder)
subcoll_path = join(root_collection_path, 'collections', file.folder)
if file.folder not in uploads.keys():
uploads[file.folder] = dict()
logger.info(f"Creating DIRT migration subcollection {subcoll_path}")
client.mkdir(subcoll_path)

# download the file
dirt_nfs_path = join(userdir, file.folder, file.name)
dirt_nfs_path = join(userdir, 'root-images', file.folder, file.name)
staging_path = join(staging_dir, file.name)
logger.info(f"Downloading file from {dirt_nfs_path} to {staging_path}")
try:
Expand All @@ -1106,6 +1133,7 @@ def migrate_dirt_datasets(self, username: str):
id=file.id,
name=file.name,
path=file.path,
type='image',
folder=file.folder,
orphan=False,
missing=True,
Expand All @@ -1125,6 +1153,7 @@ def migrate_dirt_datasets(self, username: str):
id=file.id,
name=file.name,
path=file.path,
type='image',
folder=file.folder,
orphan=True,
missing=False,
Expand All @@ -1151,7 +1180,7 @@ def migrate_dirt_datasets(self, username: str):
coll_title_row = cursor.fetchone()
db.close()
coll_title = coll_title_row[0]
coll_path = join(root_collection_path, coll_title)
coll_path = join(root_collection_path, 'collections', coll_title)
coll_created = datetime.fromtimestamp(int(coll_title_row[1]))
coll_changed = datetime.fromtimestamp(int(coll_title_row[2]))

Expand Down Expand Up @@ -1227,7 +1256,7 @@ def migrate_dirt_datasets(self, username: str):
client.set_metadata(id, props, [])

# download the file
dirt_nfs_path = join(userdir, file.folder, file.name)
dirt_nfs_path = join(userdir, 'root-images', file.folder, file.name)
staging_path = join(staging_dir, file.name)
logger.info(f"Downloading file from {dirt_nfs_path} to {staging_path}")
try:
Expand Down Expand Up @@ -1259,6 +1288,7 @@ def migrate_dirt_datasets(self, username: str):
id=file.id,
name=file.name,
path=file.path,
type='image',
folder=coll_title,
orphan=False,
missing=False,
Expand All @@ -1272,7 +1302,98 @@ def migrate_dirt_datasets(self, username: str):

# TODO attach metadata to the file

# TODO: output files, output logs, output images, metadata files
for file in metadata_files:
# create the folder if we need to
subcoll_path = join(root_collection_path, 'metadata', file.folder)
if file.folder not in metadata.keys():
metadata[file.folder] = dict()
logger.info(f"Creating DIRT migration subcollection {subcoll_path}")
client.mkdir(subcoll_path)

# download the file
dirt_nfs_path = join(userdir, 'metadata-files', file.folder, file.name)
staging_path = join(staging_dir, file.name)
logger.info(f"Downloading file from {dirt_nfs_path} to {staging_path}")
try:
sftp.get(dirt_nfs_path, staging_path)
except FileNotFoundError:
logger.warning(f"File {dirt_nfs_path} not found! Skipping")

# push a progress update to client
metadata[file.folder][file.name] = ManagedFile(
id=file.id,
name=file.name,
path=file.path,
type='metadata',
folder=file.folder,
orphan=False,
missing=True,
uploaded=False)
migration.metadata = json.dumps(metadata)
migration.save()
async_to_sync(push_migration_event)(user, migration)

for file in output_files:
# create the folder if we need to
subcoll_path = join(root_collection_path, 'outputs', file.folder)
if file.folder not in outputs.keys():
outputs[file.folder] = dict()
logger.info(f"Creating DIRT migration subcollection {subcoll_path}")
client.mkdir(subcoll_path)

# download the file
dirt_nfs_path = join(userdir, 'output-files', file.folder, file.name)
staging_path = join(staging_dir, file.name)
logger.info(f"Downloading file from {dirt_nfs_path} to {staging_path}")
try:
sftp.get(dirt_nfs_path, staging_path)
except FileNotFoundError:
logger.warning(f"File {dirt_nfs_path} not found! Skipping")

# push a progress update to client
outputs[file.folder][file.name] = ManagedFile(
id=file.id,
name=file.name,
path=file.path,
type='output',
folder=file.folder,
orphan=False,
missing=True,
uploaded=False)
migration.outputs = json.dumps(outputs)
migration.save()
async_to_sync(push_migration_event)(user, migration)

for file in output_logs:
# create the folder if we need to
subcoll_path = join(root_collection_path, 'logs', file.folder)
if file.folder not in outputs.keys():
logs[file.folder] = dict()
logger.info(f"Creating DIRT migration subcollection {subcoll_path}")
client.mkdir(subcoll_path)

# download the file
dirt_nfs_path = join(userdir, 'output-logs', file.folder, file.name)
staging_path = join(staging_dir, file.name)
logger.info(f"Downloading file from {dirt_nfs_path} to {staging_path}")
try:
sftp.get(dirt_nfs_path, staging_path)
except FileNotFoundError:
logger.warning(f"File {dirt_nfs_path} not found! Skipping")

# push a progress update to client
logs[file.folder][file.name] = ManagedFile(
id=file.id,
name=file.name,
path=file.path,
type='log',
folder=file.folder,
orphan=False,
missing=True,
uploaded=False)
migration.logs = json.dumps(logs)
migration.save()
async_to_sync(push_migration_event)(user, migration)

# get ID of newly created migration collection add collection timestamp as metadata
root_collection_id = client.stat(root_collection_path)['id']
Expand Down
5 changes: 4 additions & 1 deletion plantit/plantit/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,10 @@ def migration_to_dict(migration: Migration) -> dict:
'target_path': migration.target_path,
'num_folders': migration.num_folders,
'num_files': migration.num_files,
'uploads': json.loads(migration.uploads if migration.uploads is not None else '{}')
'uploads': json.loads(migration.uploads if migration.uploads is not None else '{}'),
'metadata': json.loads(migration.metadata if migration.metadata is not None else '{}'),
'outputs': json.loads(migration.outputs if migration.outputs is not None else '{}'),
'logs': json.loads(migration.logs if migration.logs is not None else '{}')
}


Expand Down
4 changes: 3 additions & 1 deletion plantit/plantit/users/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class Migration(models.Model):
started = models.DateTimeField(null=True, blank=True)
completed = models.DateTimeField(null=True, blank=True)
target_path = models.CharField(max_length=255, null=True, blank=True)
num_folders = models.IntegerField(null=True, blank=True)
num_files = models.IntegerField(null=True, blank=True)
uploads = models.JSONField(null=True, blank=True)
metadata = models.JSONField(null=True, blank=True)
outputs = models.JSONField(null=True, blank=True)
logs = models.JSONField(null=True, blank=True)

0 comments on commit d3dedd8

Please sign in to comment.