DIRT migration WIP (#301)
wpbonelli committed May 15, 2022
1 parent f369154 commit 31d91ef
Showing 8 changed files with 195 additions and 43 deletions.
64 changes: 51 additions & 13 deletions plantit/front_end/src/components/navigation.vue
@@ -854,6 +854,8 @@ export default {
dismissSecs: 10,
dismissCountDown: 0,
maintenanceWindows: [],
// DIRT migration
migrationData: null
};
},
computed: {
@@ -917,12 +919,18 @@ export default {
return;
}
let begin = Date.now();
console.log(begin);
await Promise.all([
this.loadVersion(),
this.loadProfile(),
this.loadDataModel(),
this.loadMaintenanceWindows(),
]);
let timing = Date.now() - begin;
console.log('Load time: ' + timing + 'ms');
// TODO move websockets to vuex
// connect to this user's event stream, pushed from backend channel
let wsProtocol = location.protocol === 'https:' ? 'wss://' : 'ws://';
@@ -941,9 +949,29 @@
},
methods: {
showDirtMigration() {
// TODO
axios
.get(`/apis/v1/users/start_dirt_migration/`)
.then(async (response) => {
await Promise.all([
this.$store.dispatch('user/setDirtMigrationStarted', true),
this.$store.dispatch('alerts/add', {
variant: 'success',
message: `Started DIRT migration (target collection: ${response.data.target_path})`,
guid: guid().toString(),
})
]);
})
.catch((error) => {
Sentry.captureException(error);
this.$store.dispatch('alerts/add', {
variant: 'danger',
message: `Failed to start DIRT migration`,
guid: guid().toString(),
})
if (error.response.status === 500) throw error;
});
},
async loadProfile() {
async loadDataModel() {
// feature flag to toggle between the old/new state loading method
if (process.env.VUE_APP_LOAD_STATE_SEPARATELY) {
await this.$store.dispatch('user/loadProfile');
@@ -969,8 +997,6 @@ export default {
}
await this.$store.dispatch('user/setProfileLoading', true);
const begin = Date.now();
console.log(begin);
await axios
.get(`/apis/v1/users/get_current/`)
.then((response) => {
@@ -1092,8 +1118,6 @@ export default {
response.data.projects
);
this.$store.dispatch('projects/setLoading', false);
const timing = Date.now() - begin;
console.log('Load time: ' + timing + 'ms');
})
.catch((error) => {
@@ -1247,15 +1271,32 @@ export default {
let data = JSON.parse(event.data);
if (data.task !== undefined) {
// task event
await this.handleTask(data.task);
await this.handleTaskEvent(data.task);
} else if (data.notification !== undefined) {
// notification event
await this.handleNotification(data.notification);
await this.handleNotificationEvent(data.notification);
} else if (data.migration !== undefined) {
// DIRT migration status event
await this.handleMigrationEvent(data.migration);
} else {
// TODO: log unrecognized event type
}
},
async handleTask(task) {
async handleMigrationEvent(migration) {
let data = migration; // already deserialized by the onmessage handler, so no extra JSON.parse is needed
this.migrationData = data;
// check if completed and update user profile & create an alert if so
let completed = data.completed;
if (completed !== null && completed !== undefined) {
await this.$store.dispatch('user/setDirtMigrationCompleted', true);
}
},
async handleNotificationEvent(notification) {
await this.$store.dispatch('notifications/update', notification);
},
async handleTaskEvent(task) {
await this.$store.dispatch('tasks/addOrUpdate', task);
await this.$store.dispatch('alerts/add', {
variant: 'success',
Expand All @@ -1274,9 +1315,6 @@ export default {
}
return task.status.toUpperCase();
},
async handleNotification(notification) {
await this.$store.dispatch('notifications/update', notification);
},
removeTask(task) {
axios
.get(`/apis/v1/tasks/${task.owner}/${task.name}/delete/`)
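Note on the websocket handling above: each frame the backend pushes carries exactly one top-level key (task, notification, or migration), which is how the onmessage handler picks between handleTaskEvent, handleNotificationEvent, and handleMigrationEvent. A minimal sketch of the frame shapes, written in Python to mirror the consumer changes later in this commit (field values are illustrative, not taken from the code):

import json

# one top-level key per frame; the frontend dispatches on whichever key is present
task_frame = json.dumps({'task': {'name': 'example-task', 'status': 'RUNNING'}})
notification_frame = json.dumps({'notification': {'message': 'example notification'}})
migration_frame = json.dumps({'migration': {'started': '2022-05-15T00:00:00Z', 'completed': None}})
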
14 changes: 14 additions & 0 deletions plantit/front_end/src/store/user.js
@@ -19,6 +19,8 @@ export const user = {
projects: [],
hints: false,
stats: null,
dirtMigrationStarted: null,
dirtMigrationCompleted: null
},
profileLoading: true
}),
@@ -58,6 +60,12 @@ export const user = {
},
setProfileLoading(state, loading) {
state.profileLoading = loading;
},
setDirtMigrationStarted(state, started) {
state.dirtMigrationStarted = started;
},
setDirtMigrationCompleted(state, completed) {
state.dirtMigrationCompleted = completed;
}
},
actions: {
@@ -170,6 +178,12 @@ export const user = {
setStats({commit}, stats) {
commit('setStats', stats);
},
setDirtMigrationStarted({ commit }, started) {
commit('setDirtMigrationStarted', started);
},
setDirtMigrationCompleted({ commit }, completed) {
commit('setDirtMigrationCompleted', completed);
}
},
getters: {
profile: state => state.profile,
111 changes: 89 additions & 22 deletions plantit/plantit/celery_tasks.py
@@ -1,10 +1,12 @@
import json
import traceback
from typing import List, TypedDict, Optional
from os import environ
from os.path import join
from datetime import datetime

from asgiref.sync import async_to_sync
from asgiref.sync import async_to_sync, sync_to_async
from channels.layers import get_channel_layer
from celery import group
from celery.utils.log import get_task_logger
from django.contrib.auth.models import User
@@ -256,7 +258,7 @@ def share_data(self, guid: str):
if 'input' in options:
input_path = options['input']['path']
if ('/iplant/home/shared' not in input_path and # no need for temporary access if input is public shared dir
input_path != output_path): # skip input permissions if reading and writing from same dir
input_path != output_path): # skip input permissions if reading and writing from same dir
paths.append({
'path': input_path,
'permission': 'write'
@@ -309,7 +311,8 @@ def submit_jobs(self, guid: str):
pull_id = submit_pull_to_scheduler(task, ssh)
async_to_sync(push_task_channel_event)(task)
job_ids.append(pull_id + ' (inbound transfer)')
else: pull_id = None
else:
pull_id = None

# schedule user workflow and outbound transfer jobs
job_id = submit_job_to_scheduler(task, ssh, pull_id=pull_id)
@@ -766,13 +769,13 @@ def refresh_all_users_stats():
redis = RedisClient.get()

for user in User.objects.all():
logger.info(f"Computing statistics for {user.username}")
logger.info(f"Computing statistics for {user.username}")

# overall statistics (no need to save result, just trigger reevaluation)
async_to_sync(q.get_user_statistics)(user, True)
# overall statistics (no need to save result, just trigger reevaluation)
async_to_sync(q.get_user_statistics)(user, True)

# timeseries (no need to save result, just trigger reevaluation)
q.get_user_timeseries(user, invalidate=True)
# timeseries (no need to save result, just trigger reevaluation)
q.get_user_timeseries(user, invalidate=True)

logger.info(f"Computing aggregate statistics")
redis.set("stats_counts", json.dumps(q.get_total_counts(True)))
@@ -882,8 +885,8 @@ def refresh_all_user_cyverse_tokens():
users = [task.user for task in list(tasks)]

if len(users) == 0:
logger.info(f"No users with running tasks, not refreshing CyVerse tokens")
return
logger.info(f"No users with running tasks, not refreshing CyVerse tokens")
return

group([refresh_cyverse_tokens.s(user.username) for user in users])()
logger.info(f"Refreshed CyVerse tokens for {len(users)} user(s)")
@@ -909,16 +912,42 @@ def agents_healthchecks():
checks_saved = int(settings.AGENTS_HEALTHCHECKS_SAVED)
if length > checks_saved: redis.rpop(f"healthchecks/{agent.name}")
check = {
'timestamp': timezone.now().isoformat(),
'healthy': healthy,
'output': output
}
'timestamp': timezone.now().isoformat(),
'healthy': healthy,
'output': output
}
redis.lpush(f"healthchecks/{agent.name}", json.dumps(check))
finally:
__release_lock(task_name)


# DIRT migration task
# DIRT migration


class DownloadedFile(TypedDict):
path: str


class UploadedFolder(TypedDict):
path: str
id: str


class Migration(TypedDict):
started: datetime
completed: Optional[datetime]
target_path: str
num_folders: Optional[int]
downloads: List[DownloadedFile]
uploads: List[UploadedFolder]


async def push_migration_event(user: User, migration: Migration):
await get_channel_layer().group_send(f"{user.username}", {
'type': 'migration_event',
'migration': migration,
})


@app.task(bind=True)
def migrate_dirt_datasets(self, username: str):
@@ -930,10 +959,10 @@ def migrate_dirt_datasets(self, username: str):
self.request.callbacks = None
return

# record starting time
start = timezone.now()
# get started time
start = profile.dirt_migration_started

# create SSH/SFTP client
# create SSH/SFTP client and open SFTP connection
ssh = SSH(
host='tucco.cyverse.org',
port=1657,
@@ -947,6 +976,8 @@
new_line = '\n'
logger.info(f"User {username} has {len(datasets)} datasets:{new_line}{new_line.join(datasets)}")

return  # WIP: bail out after listing the user's datasets; the migration steps below do not run yet

# create a client for the CyVerse APIs and create a collection for the migrated DIRT data
client = TerrainClient(access_token=profile.cyverse_access_token)
root_collection_path = f"/iplant/home/{user.username}/dirt_migration"
@@ -955,15 +986,30 @@
return
else: client.mkdir(root_collection_path)

# transfer all the user's datasets to the temporary staging directory on this server
# keep track of progress so we can update the UI in real time
downloads = []
uploads = []

# transfer all the user's datasets to temporary staging directory
for folder in datasets:
files = [f for f in sftp.listdir(join(user_dir, folder))]
folder_name = join(user_dir, folder)
files = [f for f in sftp.listdir(folder_name)]
logger.info(f"User {username} folder {folder} has {len(files)} datasets:{new_line}{new_line.join(files)}")

# download files (sftp.listdir returns plain filenames, so build the full remote path for each)
for filename in files:
sftp.get(join(folder_name, filename), join(settings.DIRT_STAGING_DIR, folder, filename))

# push download status update to UI
downloads.append(DownloadedFile(path=filename))
async_to_sync(push_migration_event)(user, Migration(
started=start.isoformat(),
completed=None,
num_folders=len(datasets),
target_path=root_collection_path,
downloads=downloads,
uploads=[]))

# create subcollection for this folder
collection_path = join(root_collection_path, folder.rpartition('/')[2])
if client.dir_exists(collection_path):
@@ -978,14 +1024,25 @@
to_prefix=collection_path)

# get ID of newly created collection
id = client.stat(collection_path)['id']
stat = client.stat(collection_path)
id = stat['id']

# mark collection as originating from DIRT
client.set_metadata(id, [
f"dirt_migration_timestamp={timezone.now().isoformat()}",
# TODO: anything else we need to add here?
])

# push upload status update to UI
uploads.append(UploadedFolder(path=collection_path, id=id))
async_to_sync(push_migration_event)(user, Migration(
started=start.isoformat(),
completed=None,
num_folders=len(datasets),
target_path=root_collection_path,
downloads=downloads,
uploads=uploads))

# get ID of newly created collection
root_collection_id = client.stat(root_collection_path)['id']

@@ -1004,10 +1061,20 @@
{})

# mark user's profile that DIRT transfer has been completed
profile.dirt_migrated = True
end = timezone.now()
profile.dirt_migration_completed = end
profile.save()
user.save()

# push completion update to the UI
async_to_sync(push_migration_event)(user, Migration(
started=start.isoformat(),
completed=end.isoformat(),
num_folders=len(datasets),
target_path=root_collection_path,
downloads=downloads,
uploads=uploads))


# see https://stackoverflow.com/a/41119054/6514033
# `@app.on_after_finalize.connect` is necessary for some reason instead of `@app.on_after_configure.connect`
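The Migration, DownloadedFile, and UploadedFolder TypedDicts above are plain dicts at runtime, and the task converts datetimes to ISO strings before handing the payload to the channel layer, so the consumer can JSON-encode it as-is. A minimal sketch of building and pushing one status update, plus how the task might be kicked off (the view behind /apis/v1/users/start_dirt_migration/ is not among the files shown here, so the import path and the .delay call are assumptions based on the file layout):

from asgiref.sync import async_to_sync
from django.contrib.auth.models import User
from django.utils import timezone
from plantit.celery_tasks import Migration, migrate_dirt_datasets, push_migration_event

user = User.objects.get(username='example-user')  # illustrative user

# Migration(...) just builds a dict; ISO-format strings keep it JSON-serializable
update = Migration(
    started=timezone.now().isoformat(),
    completed=None,
    target_path='/iplant/home/example-user/dirt_migration',  # illustrative path
    num_folders=0,
    downloads=[],
    uploads=[])
async_to_sync(push_migration_event)(user, update)

# hypothetical: the start_dirt_migration endpoint would presumably enqueue the Celery task like this
migrate_dirt_datasets.delay(user.username)
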
15 changes: 8 additions & 7 deletions plantit/plantit/consumers.py
@@ -25,13 +25,14 @@ def push_notification(self, event):
'notification': notification,
}))

# def task_log_event(self, event):
# task = event['task']
# logs = event['logs']
# self.logger.info(f"Sending user {self.username} task {task['name']} log event (status {task['status']}) to client ({len(logs)} lines)")
# self.send(text_data=json.dumps({'logs': logs}))

def task_event(self, event):
task = event['task']
self.logger.info(f"Sending user {self.username} task {task['name']} event (status {task['status']}) to client")
self.send(text_data=json.dumps({'task': task}))
self.send(text_data=json.dumps({'task': task}))

def dirt_migration_event(self, event):
migration = event['migration']
self.logger.info(f"DIRT migration status for user {self.username}: {migration}")
self.send(text_data=json.dumps({
'migration': migration,
}))
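
For context on how dirt_migration_event (and task_event above) get invoked: Django Channels dispatches a channel-layer group_send message to the consumer method whose name matches the message's 'type' key, with any dots replaced by underscores, so the 'type' string the producer sends and the handler name defined on the consumer have to agree for the event to reach the browser. A minimal sketch of the sending side under that convention (group name and payload are illustrative):

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

# the 'type' key selects the handler method on every consumer subscribed to this group;
# e.g. a type of 'migration_event' is dispatched to a consumer method named migration_event
async_to_sync(get_channel_layer().group_send)('example-username', {
    'type': 'migration_event',
    'migration': {'started': '2022-05-15T00:00:00Z', 'completed': None},
})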