diff --git a/DESCRIPTION b/DESCRIPTION index 7171a9a0..922e722c 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: raveio Type: Package Title: File-System Toolbox for RAVE Project -Version: 0.9.0.61 +Version: 0.9.0.62 Language: en-US Authors@R: c( person("Zhengjia", "Wang", email = "dipterix.wang@gmail.com", role = c("aut", "cre", "cph")), diff --git a/NAMESPACE b/NAMESPACE index cf2a75ee..671f0496 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -114,7 +114,6 @@ export(as_rave_project) export(as_rave_subject) export(as_rave_unit) export(as_yael_process) -export(auto_process_blackrock) export(backup_file) export(cache_root) export(cache_to_filearray) diff --git a/R/class-pipeline_tools.R b/R/class-pipeline_tools.R index a850fb87..6b800a9c 100644 --- a/R/class-pipeline_tools.R +++ b/R/class-pipeline_tools.R @@ -415,13 +415,15 @@ PipelineTools <- R6::R6Class( #' @description fork (copy) the current pipeline to a new directory #' @param path path to the new pipeline, a folder will be created there - #' @param filter_pattern file pattern to copy + #' @param policy fork policy defined by module author, see text file + #' 'fork-policy' under the pipeline directory; if missing, then default to + #' avoid copying \code{main.html} and \code{shared} folder #' @returns A new pipeline object based on the path given - fork = function(path, filter_pattern = PIPELINE_FORK_PATTERN) { + fork = function(path, policy = "default") { pipeline_fork( src = self$pipeline_path, dest = path, - filter_pattern = filter_pattern, + policy = policy, activate = FALSE ) pipeline( diff --git a/R/class-watch-dog.R b/R/class-watch-dog.R index b0dfeb6c..2fa7762d 100644 --- a/R/class-watch-dog.R +++ b/R/class-watch-dog.R @@ -1,739 +1,739 @@ -RAVEWatchDog <- R6::R6Class( - classname = "RAVEWatchDog", - portable = TRUE, - private = list( - pipeline_names = c( - "import_lfp_native", - "notch_filter", - "wavelet_module", - 'reference_module' - ), - .raw_path = character(0), - .job_name = character(0), - .watch_path = character(0), - .time_threshold = NULL, - .project_name = character(0), - .file_pattern = "^([a-zA-Z0-9]+)_datafile_([a-zA-Z0-9]+)\\.nev$", - - .registry_cache = NULL, - .set_status = function(item) { - stopifnot( - is.data.frame(item) && nrow(item) == 1 && - setequal(names(item), c("Filename", "Subject", "Block", - "Status", "Details", "LastModified", - "TaskStarted", "TaskEnded", "Directory")) - ) - registry <- self$load_registry(update = TRUE) - sel <- registry$Filename == item$Filename - if(length(sel) && any(sel)) { - registry <- registry[!sel, ] - } - # Directly binding two items will result in error (with NA) - item$LastModified <- as.POSIXlt(item$LastModified) - item$TaskStarted <- as.POSIXlt(item$TaskStarted) - item$TaskEnded <- as.POSIXlt(item$TaskEnded) - registry <- rbind(item, registry) - registry <- registry[order(registry$TaskStarted, na.last = TRUE, decreasing = TRUE), ] - private$.registry_cache <- registry - - # save to csv - registry$LastModified <- strftime(registry$LastModified) - registry$TaskStarted <- strftime(registry$TaskStarted) - registry$TaskEnded <- strftime(registry$TaskEnded) - dir_create2(dirname(self$registry_path)) - safe_write_csv(registry, file = self$registry_path, row.names = FALSE, quiet = TRUE) - }, - - .get_status = function(file, update = FALSE) { - stopifnot(length(file) == 1 && file.exists(file.path(private$.watch_path, file))) - - mtime <- file.mtime(file.path(private$.watch_path, file)) - - # get subject code, block ID - fname <- filenames(file) - m <- gregexec(self$file_pattern, fname, ignore.case = TRUE, useBytes = TRUE)[[1]] - ml <- attr(m, "match.length") - - error_item <- data.frame( - Filename = file, - Subject = NA, - Block = NA, - Status = "parse-error", - Details = "Cannot parse subject code or block ID", - LastModified = mtime, - TaskStarted = NA, - TaskEnded = NA, - Directory = private$.watch_path - ) - if(length(m) != 3) { - return(error_item) - } - - subject_code <- substr(fname, m[[2]], m[[2]] + ml[[2]] - 1) - block <- substr(fname, m[[3]], m[[3]] + ml[[3]] - 1) - if(!nchar(subject_code) || !nchar(block)) { - return(error_item) - } - - if(startsWith(block, "0")) { - block <- sprintf("block%s", block) - } - - registry <- self$load_registry(update = update) - if(file %in% registry$Filename) { - sel <- which(registry$Filename == file) - item <- registry[sel[[1]], ] - item$LastModified <- mtime - } else { - item <- data.frame( - Filename = file, - Subject = subject_code, - Block = block, - Status = "ready", - Details = "Just found the subject", - LastModified = mtime, - TaskStarted = NA, - TaskEnded = NA, - Directory = private$.watch_path - ) - } - item - - }, - - .queue = NULL, - .jobs = NULL, - .max_jobs = numeric(0L) - ), - public = list( - - cache_path = character(0), - - NSType_LFP = "ns3", - NSType_stimuli = "ns5", - - initialize = function(watch_path, job_name = "RAVEWatchDog"){ - if(!grepl("^[a-zA-Z0-9-]+$", job_name)) { - stop("Watch dog name can only contain letters, digits, and dash [-].") - } - private$.job_name <- job_name - private$.watch_path <- normalizePath(watch_path, mustWork = FALSE) - - # Default values - private$.max_jobs <- 1L - private$.jobs <- dipsaus::fastmap2() - - # We do not allow users to change raw_data_dir once created - private$.raw_path <- raveio_getopt("raw_data_dir") - - # Automatically set cache path, it can be changed - self$cache_path <- file.path(cache_root(), "_automation_", job_name) - - dir_create2(self$log_path) - }, - - load_registry = function(update = TRUE) { - if(!is.data.frame(private$.registry_cache)) { - update <- TRUE - } - - if(update) { - if(file.exists(self$registry_path)) { - tbl <- utils::read.csv(self$registry_path, header = TRUE, - colClasses = "character") - colnms <- c("Filename", "Subject", "Block", - "Status", "Details", "LastModified", - "TaskStarted", "TaskEnded", "Directory") - if(all(colnms %in% names(tbl))) { - tbl <- tbl[, colnms] - tbl$LastModified <- as.POSIXlt(tbl$LastModified) - tbl$TaskStarted <- as.POSIXlt(tbl$TaskStarted) - tbl$TaskEnded <- as.POSIXlt(tbl$TaskEnded) - # re-order - tbl <- tbl[order(tbl$TaskStarted, decreasing = TRUE),] - private$.registry_cache <- tbl - return(tbl) - } - } - private$.registry_cache <- data.frame( - Filename = character(0L), - Subject = character(0L), - Block = character(0L), - Status = character(0L), - Details = character(0L), - LastModified = as.POSIXlt(character(0L)), - TaskStarted = as.POSIXlt(character(0L)), - TaskEnded = as.POSIXlt(character(0L)), - Directory = character(0L) - ) - } - - private$.registry_cache - - }, - - check_file_registry = function() { - - # check the watch path - fs <- list.files( - private$.watch_path, - all.files = FALSE, - full.names = FALSE, - include.dirs = FALSE, - recursive = TRUE - ) - fs <- fs[grepl(self$file_pattern, filenames(fs), ignore.case = TRUE)] - - if(!length(fs)) { return(character(0L)) } - - mtime <- file.mtime(file.path(private$.watch_path, fs)) - sel <- mtime >= self$time_threshold - - if(!any(sel)) { return(character(0L)) } - - fs <- fs[sel] - - registry <- self$load_registry() - - ignored_files <- registry$Filename[!registry$Status %in% c("queued")] - - fs <- fs[!fs %in% ignored_files] - - return(fs) - - }, - - add_to_queue = function(files, force = FALSE) { - files <- unique(files) - files <- files[!is.na(files)] - - if(!length(files)) { - return(length(private$.queue)) - } - - if(length(files) > 1) { - lapply(files, function(file) { - self$add_to_queue(file) - }) - return(length(private$.queue)) - } - - # extract information - file <- files - - tryCatch({ - item <- private$.get_status(file) - - if(force && item$Status == "running") { - stop("A process is working on the block. Only one process can work on a block at a time.") - } - - if(item$Status == "parse-error") { - private$.set_status(item) - stop(item$Details) - } - - if(force || item$Status == "ready") { - item$Details <- "" - item$Status <- "queued" - private$.set_status(item) - if(force) { - private$.queue <- unique(c(file, private$.queue)) - catgl("File [{file}] prepended to the task queue", level = "INFO") - } else { - private$.queue <- unique(c(private$.queue, file)) - catgl("File [{file}] appended to the task queue", level = "INFO") - } - } else if (item$Status == "queued") { - if(!file %in% private$.queue) { - private$.queue <- unique(c(private$.queue, file)) - } - } - }, error = function(e) { - catgl("Cannot add file [{file}] to queue. Reason: ", - e$message, level = "ERROR") - }) - - - return(length(private$.queue)) - - }, - - check_job_status = function() { - jobs <- private$.jobs - nms <- names(jobs) - lapply(nms, function(file) { - check <- jobs[[file]] - if(!is.function(check)) { - private$.queue <- private$.queue[!private$.queue %in% file] - jobs[["@remove"]](file) - catgl("Cannot find process handler of file [{file}]. Removed from queue", level = "WARNING") - return() - } - code <- check() - if(code == 0) { - item <- private$.get_status(file) - item$Status <- "finished" - item$Details <- "" - item$TaskEnded <- as.POSIXlt(Sys.time()) - private$.set_status(item) - # remove from queue - private$.queue <- private$.queue[!private$.queue %in% file] - jobs[["@remove"]](file) - catgl("File [{file}] finished. Removed from queue", level = "INFO") - return() - } - if(code < 0) { - item <- private$.get_status(file) - item$Status <- "errored" - item$Details <- paste(attr(code, "rs_exec_error"), collapse = "") - item$TaskEnded <- as.POSIXlt(Sys.time()) - private$.set_status(item) - # remove from queue - private$.queue <- private$.queue[!private$.queue %in% file] - jobs[["@remove"]](file) - catgl("File [{file}] errored (reason: {item$Details}). Removed from queue", level = "ERROR") - return() - } - }) - length(jobs) - }, - - get_pipeline_default_settings = function() { - re <- lapply(private$pipeline_names, function(pname) { - pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines")) - settings <- dipsaus::list_to_fastmap2(pipeline$get_settings()) - settings[["@remove"]](c( - "import_setup__project_name", - "import_setup__subject_code", - "force_import", - "skip_validation", - "import_channels__sample_rate", - "import_channels__electrodes", - "import_channels__electrode_file", - "import_blocks__format", - "import_blocks__session_block", - "project_name", - "subject_code", - "electrode_group", - "changes" - )) - if(pname == "notch_filter") { - settings$diagnostic_plot_params$path <- NULL - } - as.list(settings, sorted = TRUE) - }) - names(re) <- private$pipeline_names - re - }, - - create_settings_file = function(overwrite = FALSE) { - settings_path <- file.path(self$log_path, "settings.yaml") - if(!overwrite && file.exists(settings_path)) { - stop("Existing settings file already created. If you want to overwrite that file, use `overwrite=TRUE`") - } - backup_file(settings_path, remove = FALSE) - save_yaml(self$get_pipeline_default_settings(), file = settings_path) - catgl("A settings file has been created at [{settings_path}]", level = "INFO") - settings_path - }, - - get_pipeline_settings = function(pname, file, brfile) { - item <- private$.get_status(file) - - # load blackrock file - electrode_table <- brfile$electrode_table - electrodes <- electrode_table$Electrode[electrode_table$NSType == self$NSType_LFP] - - # load pipeline - pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines")) - settings <- dipsaus::list_to_fastmap2(pipeline$get_settings()) - - # override user-defined settings - settings_path <- file.path(self$log_path, "settings.yaml") - if(file.exists(settings_path)) { - tmp <- load_yaml(settings_path) - dipsaus::list_to_fastmap2(as.list(tmp[[pname]]), settings) - } - - # pipeline-specific settings - subject_code <- sprintf("%s__%s", item$Subject, item$Block) - - switch ( - pname, - "import_lfp_native" = { - settings$import_setup__project_name <- self$project_name - settings$import_setup__subject_code <- subject_code - settings$force_import <- TRUE - settings$skip_validation <- FALSE - srate <- brfile$sample_rates[[self$NSType_LFP]] - settings$import_channels__sample_rate <- srate - settings$import_channels__electrodes <- dipsaus::deparse_svec(electrodes) - settings$import_channels__electrode_file <- "auto" - settings$import_blocks__format <- names(IMPORT_FORMATS)[unlist(IMPORT_FORMATS) == 'native_blackrock'] - settings$import_blocks__session_block <- item$Block - }, - "notch_filter" = { - graph_path <- file.path(private$.raw_path, item$Subject, item$Block, "notch-diagnostic-plots") - settings$project_name <- self$project_name - settings$subject_code <- subject_code - - graph_path <- dir_create2(graph_path) - settings$diagnostic_plot_params$path <- file.path(graph_path, "notch-diagnostic-plots.pdf") - }, - "wavelet_module" = { - settings$project_name <- self$project_name - settings$subject_code <- subject_code - }, - "reference_module" = { - settings$project_name <- self$project_name - settings$subject_code <- subject_code - settings$reference_name <- "[new reference]" - }, - {} - ) - - return(settings) - - }, - - process_file = function(file) { - - item <- private$.get_status(file) - brfile <- BlackrockFile$new(path = file.path(private$.watch_path, file), block = item$Block) - - # prepare working directory - workdir <- file.path(self$cache_path, paste0(file, ".workdir")) - if(dir.exists(workdir)) { - unlink(workdir, force = TRUE, recursive = TRUE) - } - workdir <- dir_create2(workdir) - - - # copy pipelines - for(pname in private$pipeline_names) { - pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines")) - dest <- file.path(workdir, "pipelines", pipeline$pipeline_name) - pipeline_fork( - src = pipeline$pipeline_path, - dest = dest, - activate = FALSE - ) - pipeline <- pipeline(pname, paths = file.path(workdir, "pipelines")) - settings <- self$get_pipeline_settings(pname = pname, file = file, brfile = brfile) - pipeline$set_settings(.list = settings) - catgl("Set pipeline [{pname}]:", level = "DEFAULT") - save_yaml(settings, file = stdout()) - } - - # copy files - block_path <- file.path(private$.raw_path, item$Subject, item$Block) - dir_create2(block_path) - fs <- paste0(brfile$base_path, c(".nev", ".ccf", paste0(".ns", 1:9))) - fs <- fs[file.exists(fs)] - for(f in fs) { - file.copy(f, file.path(block_path, basename(f)), overwrite = TRUE, recursive = FALSE, copy.date = TRUE) - } - - fake_path <- file.path(private$.raw_path, sprintf("%s__%s", item$Subject, item$Block)) - fake_path <- dir_create2(fake_path) - if(!file.exists(file.path(fake_path, item$Block))) { - if(dipsaus::get_os() == "windows") { - file.copy(block_path, to = fake_path, recursive = TRUE, copy.date = TRUE) - } else { - file.symlink(block_path, to = fake_path) - } - } - - # Make sure the subject surface files can be loaded properly? - if(!file.exists(file.path(fake_path, 'rave-imaging'))) { - # check if original subject has the fs recon - imaging_path_orig <- file.path(private$.raw_path, item$Subject, 'rave-imaging') - if(file.exists(imaging_path_orig)) { - if(dipsaus::get_os() == "windows" || !dir.exists(imaging_path_orig)) { - # On windows, symlink does not work well so just copy - # On Unix, if rave-imaging is a symlink, then R (4.0) will treat - # the path as a file. Just copy over - file.copy(imaging_path_orig, to = fake_path, - recursive = TRUE, copy.date = TRUE) - } else { - file.symlink(imaging_path_orig, to = fake_path) - } - } - } - - # set to running - catgl("Start processing [{file}]", level = "INFO") - item$Status <- "running" - item$Details <- "" - item$TaskStarted <- as.POSIXlt(Sys.time()) - item$TaskEnded <- as.POSIXlt(NA) - private$.set_status(item) - private$.jobs[[file]] <- dipsaus::rs_exec( - name = file, - focus_on_console = TRUE, - rs = TRUE, - wait = FALSE, - quoted = TRUE, - nested_ok = TRUE, - expr = bquote({ - - workdir <- .(workdir) - cwd <- getwd() - - setwd(workdir) - on.exit({ setwd(cwd) }, add = TRUE, after = FALSE) - raveio <- asNamespace('raveio') - - if(dipsaus::package_installed('ravedash')){ - ravedash <- do.call('asNamespace', list('ravedash')) - ravedash$set_logger_path(root_path = .(self$log_path), max_files = 10L) - ravedash$logger_threshold("trace", type = 'file') - ravedash$logger_threshold("trace", type = 'console') - } else { - ravedash <- NULL - } - blackrock_src <- .(file) - - pname <- "import_lfp_native" - pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines")) - raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO") - pipeline$run(async = FALSE, as_promise = FALSE, - scheduler = "none", type = "smart") - raveio$catgl("[{blackrock_src}]: [{pname}] finished", level = "INFO") - pname <- "notch_filter" - pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines")) - raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO") - pipeline$run(names = "apply_notch", async = FALSE, as_promise = FALSE, - scheduler = "none", type = "smart") - - pname <- "wavelet_module" - pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines")) - raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO") - pipeline$run(async = FALSE, as_promise = FALSE, - scheduler = "none", type = "smart") - - subject <- pipeline$read("subject") - - # generate reference if exists - pname <- "reference_module" - pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines")) - raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] (reference_table_initial) at [{pipeline$pipeline_path}]", level = "INFO") - # check subject's localization - elec_path <- .(file.path(private$.raw_path, item$Subject, "rave-imaging", "electrodes.csv")) - if(!file.exists(elec_path)) { - elec_path <- .(file.path(raveio_getopt("data_dir"), private$.project_name, item$Subject, "rave", "meta", "electrodes.csv")) - } - - if(!file.exists(elec_path)) { - # list all projects, try to find - all_projects <- raveio$get_projects(refresh = TRUE) - elec_path <- file.path(raveio_getopt("data_dir"), all_projects, - .(item$Subject), "rave", "meta", "electrodes.csv") - elec_path <- elec_path[!is.na(elec_path) & file.exists(elec_path)] - } - if(length(elec_path)) { - elec_path <- elec_path[[1]] - } - - if(length(elec_path) == 1 && !is.na(elec_path) && file.exists(elec_path)) { - tryCatch({ - elec_path <- elec_path[[1]] - elec_table <- utils::read.csv(elec_path) - elec_table$Electrode <- as.integer(elec_table$Electrode) - if(length(elec_table$Electrode) == length(subject$electrodes)) { - o <- order(elec_table$Electrode) - elec_table <- elec_table[o, ] - elec_table$Electrode <- sort(subject$electrodes) - } - - raveio$safe_write_csv(elec_table, file.path(subject$meta_path, "electrodes.csv"), - row.names = FALSE) - - }, error = function(e) { - if(is.environment(ravedash)) { - ravedash$logger_error_condition(e, level = "warning") - } else { - warning(e) - } - }) - } - pipeline$run(names = "reference_table_initial", - async = FALSE, as_promise = FALSE, - scheduler = "none", type = "smart") - unsaved_meta <- file.path(subject$meta_path, "reference__unsaved.csv") - target_meta <- file.path(subject$meta_path, "reference_auto_generated.csv") - if(file.exists(unsaved_meta) && !file.exists(target_meta)) { - file.copy(unsaved_meta, target_meta, overwrite = TRUE) - } - - # make subject backward-compatible - raveio$catgl("[{blackrock_src}]: Making data format backward-compatible", level = "INFO") - raveio$rave_subject_format_conversion(subject$subject_id) - raveio$catgl("[{blackrock_src}]: Done", evel = "INFO") - - }) - ) - - }, - - scan = function() { - - files <- self$check_file_registry() - self$add_to_queue(files) - - # check job status - njobs <- self$check_job_status() - inactives <- private$.queue[!private$.queue %in% names(private$.jobs)] - - if(length(inactives) && njobs < private$.max_jobs) { - # schedule job - navails <- private$.max_jobs - njobs - inactives <- inactives[seq_len(min(navails, length(inactives)))] - for(file in inactives) { - self$process_file(file) - } - } - - }, - - reset_registry = function() { - if(!interactive()) { - stop("Cannot reset registry in non-interactive mode") - } - ans <- dipsaus::ask_yesno(sprintf("Clearing registry for [%s]?", private$.job_name)) - if(isTRUE(ans)) { - unlink(self$registry_path, force = TRUE) - } - }, - - watch = function(interval = 5) { - interval <- as.numeric(interval) - if(!isTRUE(interval >= 1)) { - stop("Min interval must be >= 1 seconds") - } - - if(dipsaus::package_installed('ravedash')){ - ravedash <- do.call('asNamespace', list('ravedash')) - ravedash$set_logger_path(root_path = self$log_path, max_files = 10L) - ravedash$logger_threshold("trace", type = 'file') - ravedash$logger_threshold("trace", type = 'console') - } else { - ravedash <- NULL - } - - on.exit({ - if(is.environment(ravedash)) { - ravedash$set_logger_path(NULL) - } - }, add = TRUE, after = TRUE) - - # make sure directories are there - dir_create2(self$log_path) - dir_create2(self$cache_path) - - while(TRUE) { - tryCatch({ - self$scan() - }, error = function(e) { - catgl("Error raised while the master process rescans/schedules tasks. Reason: {paste(e$message, collapse = '\n')}\nWill try again later", - level = "ERROR") - }) - Sys.sleep(interval) - } - } - - ), - active = list( - - log_path = function() { - file.path(private$.raw_path, "_automation", private$.job_name) - }, - - registry_path = function() { - file.path(self$log_path, "registry.csv") - }, - - time_threshold = function(v) { - if(!missing(v)) { - - if(length(v) != 1 || is.na(v)) { - stop("Cannot set time threshold with invalid time") - } - tm <- v - if(is.character(tm)) { - v <- as.POSIXlt(tm) - if(is.na(v)) { - stop("`time_threshold` must have format of [year-month-day hour:minute:second]. For example, '2022-08-03 16:38:00'") - } - } else if (!inherits(tm, "POSIXlt")) { - stop("`time_threshold` must be characters or a `POSIXlt` time object") - } - - private$.time_threshold <- v - - } - private$.time_threshold - }, - - project_name = function(v) { - if(!missing(v)) { - if(!length(v)) { - private$.project_name <- character(0) - } else if(length(v) > 1) { - stop("Project name must have length of 1") - } else if(!grepl("^[a-zA-Z0-9_]", v)) { - stop("Project name can only contain letters, digits, and underscore [_]") - } else { - private$.project_name <- as.character(v) - } - } - pn <- private$.project_name - if(!length(pn)) { - pn <- "automated" - } - pn - }, - - file_pattern = function(v) { - if(!missing(v)) { - v <- v[[1]] - m <- gregexpr("(\\([^\\(\\)]+\\))", v, ignore.case = TRUE) - m <- unique(m[[1]]) - if(length(m) < 2) { - stop("File pattern must be a regular expression containing at least two keyword extractors so I can decide the subject code and session block ID. For example, regular expression ['^([a-zA-Z0-9]+)_datafile_([a-zA-Z0-9]+)\\.nev$'] matches [YAB_datafile_001.nev]. RAVE will set subject code to [YAB], and block ID as [001].") - } - private$.file_pattern <- v - } - private$.file_pattern - }, - - queued = function() { - private$.queue - }, - - max_jobs = function(v) { - if(!missing(v)) { - errored <- TRUE - if(length(v) == 1) { - v <- as.integer(v) - if(isTRUE(v > 0)) { - private$.max_jobs <- v - errored <- FALSE - } - } - - if(errored) { - stop("Cannot set `max_jobs`, the value must be a positive integer") - } - - } - private$.max_jobs - } - - ) -) - +# RAVEWatchDog <- R6::R6Class( +# classname = "RAVEWatchDog", +# portable = TRUE, +# private = list( +# pipeline_names = c( +# "import_lfp_native", +# "notch_filter", +# "wavelet_module", +# 'reference_module' +# ), +# .raw_path = character(0), +# .job_name = character(0), +# .watch_path = character(0), +# .time_threshold = NULL, +# .project_name = character(0), +# .file_pattern = "^([a-zA-Z0-9]+)_datafile_([a-zA-Z0-9]+)\\.nev$", +# +# .registry_cache = NULL, +# .set_status = function(item) { +# stopifnot( +# is.data.frame(item) && nrow(item) == 1 && +# setequal(names(item), c("Filename", "Subject", "Block", +# "Status", "Details", "LastModified", +# "TaskStarted", "TaskEnded", "Directory")) +# ) +# registry <- self$load_registry(update = TRUE) +# sel <- registry$Filename == item$Filename +# if(length(sel) && any(sel)) { +# registry <- registry[!sel, ] +# } +# # Directly binding two items will result in error (with NA) +# item$LastModified <- as.POSIXlt(item$LastModified) +# item$TaskStarted <- as.POSIXlt(item$TaskStarted) +# item$TaskEnded <- as.POSIXlt(item$TaskEnded) +# registry <- rbind(item, registry) +# registry <- registry[order(registry$TaskStarted, na.last = TRUE, decreasing = TRUE), ] +# private$.registry_cache <- registry +# +# # save to csv +# registry$LastModified <- strftime(registry$LastModified) +# registry$TaskStarted <- strftime(registry$TaskStarted) +# registry$TaskEnded <- strftime(registry$TaskEnded) +# dir_create2(dirname(self$registry_path)) +# safe_write_csv(registry, file = self$registry_path, row.names = FALSE, quiet = TRUE) +# }, +# +# .get_status = function(file, update = FALSE) { +# stopifnot(length(file) == 1 && file.exists(file.path(private$.watch_path, file))) +# +# mtime <- file.mtime(file.path(private$.watch_path, file)) +# +# # get subject code, block ID +# fname <- filenames(file) +# m <- gregexec(self$file_pattern, fname, ignore.case = TRUE, useBytes = TRUE)[[1]] +# ml <- attr(m, "match.length") +# +# error_item <- data.frame( +# Filename = file, +# Subject = NA, +# Block = NA, +# Status = "parse-error", +# Details = "Cannot parse subject code or block ID", +# LastModified = mtime, +# TaskStarted = NA, +# TaskEnded = NA, +# Directory = private$.watch_path +# ) +# if(length(m) != 3) { +# return(error_item) +# } +# +# subject_code <- substr(fname, m[[2]], m[[2]] + ml[[2]] - 1) +# block <- substr(fname, m[[3]], m[[3]] + ml[[3]] - 1) +# if(!nchar(subject_code) || !nchar(block)) { +# return(error_item) +# } +# +# if(startsWith(block, "0")) { +# block <- sprintf("block%s", block) +# } +# +# registry <- self$load_registry(update = update) +# if(file %in% registry$Filename) { +# sel <- which(registry$Filename == file) +# item <- registry[sel[[1]], ] +# item$LastModified <- mtime +# } else { +# item <- data.frame( +# Filename = file, +# Subject = subject_code, +# Block = block, +# Status = "ready", +# Details = "Just found the subject", +# LastModified = mtime, +# TaskStarted = NA, +# TaskEnded = NA, +# Directory = private$.watch_path +# ) +# } +# item +# +# }, +# +# .queue = NULL, +# .jobs = NULL, +# .max_jobs = numeric(0L) +# ), +# public = list( +# +# cache_path = character(0), +# +# NSType_LFP = "ns3", +# NSType_stimuli = "ns5", +# +# initialize = function(watch_path, job_name = "RAVEWatchDog"){ +# if(!grepl("^[a-zA-Z0-9-]+$", job_name)) { +# stop("Watch dog name can only contain letters, digits, and dash [-].") +# } +# private$.job_name <- job_name +# private$.watch_path <- normalizePath(watch_path, mustWork = FALSE) +# +# # Default values +# private$.max_jobs <- 1L +# private$.jobs <- dipsaus::fastmap2() +# +# # We do not allow users to change raw_data_dir once created +# private$.raw_path <- raveio_getopt("raw_data_dir") +# +# # Automatically set cache path, it can be changed +# self$cache_path <- file.path(cache_root(), "_automation_", job_name) +# +# dir_create2(self$log_path) +# }, +# +# load_registry = function(update = TRUE) { +# if(!is.data.frame(private$.registry_cache)) { +# update <- TRUE +# } +# +# if(update) { +# if(file.exists(self$registry_path)) { +# tbl <- utils::read.csv(self$registry_path, header = TRUE, +# colClasses = "character") +# colnms <- c("Filename", "Subject", "Block", +# "Status", "Details", "LastModified", +# "TaskStarted", "TaskEnded", "Directory") +# if(all(colnms %in% names(tbl))) { +# tbl <- tbl[, colnms] +# tbl$LastModified <- as.POSIXlt(tbl$LastModified) +# tbl$TaskStarted <- as.POSIXlt(tbl$TaskStarted) +# tbl$TaskEnded <- as.POSIXlt(tbl$TaskEnded) +# # re-order +# tbl <- tbl[order(tbl$TaskStarted, decreasing = TRUE),] +# private$.registry_cache <- tbl +# return(tbl) +# } +# } +# private$.registry_cache <- data.frame( +# Filename = character(0L), +# Subject = character(0L), +# Block = character(0L), +# Status = character(0L), +# Details = character(0L), +# LastModified = as.POSIXlt(character(0L)), +# TaskStarted = as.POSIXlt(character(0L)), +# TaskEnded = as.POSIXlt(character(0L)), +# Directory = character(0L) +# ) +# } +# +# private$.registry_cache +# +# }, +# +# check_file_registry = function() { +# +# # check the watch path +# fs <- list.files( +# private$.watch_path, +# all.files = FALSE, +# full.names = FALSE, +# include.dirs = FALSE, +# recursive = TRUE +# ) +# fs <- fs[grepl(self$file_pattern, filenames(fs), ignore.case = TRUE)] +# +# if(!length(fs)) { return(character(0L)) } +# +# mtime <- file.mtime(file.path(private$.watch_path, fs)) +# sel <- mtime >= self$time_threshold +# +# if(!any(sel)) { return(character(0L)) } +# +# fs <- fs[sel] +# +# registry <- self$load_registry() +# +# ignored_files <- registry$Filename[!registry$Status %in% c("queued")] +# +# fs <- fs[!fs %in% ignored_files] +# +# return(fs) +# +# }, +# +# add_to_queue = function(files, force = FALSE) { +# files <- unique(files) +# files <- files[!is.na(files)] +# +# if(!length(files)) { +# return(length(private$.queue)) +# } +# +# if(length(files) > 1) { +# lapply(files, function(file) { +# self$add_to_queue(file) +# }) +# return(length(private$.queue)) +# } +# +# # extract information +# file <- files +# +# tryCatch({ +# item <- private$.get_status(file) +# +# if(force && item$Status == "running") { +# stop("A process is working on the block. Only one process can work on a block at a time.") +# } +# +# if(item$Status == "parse-error") { +# private$.set_status(item) +# stop(item$Details) +# } +# +# if(force || item$Status == "ready") { +# item$Details <- "" +# item$Status <- "queued" +# private$.set_status(item) +# if(force) { +# private$.queue <- unique(c(file, private$.queue)) +# catgl("File [{file}] prepended to the task queue", level = "INFO") +# } else { +# private$.queue <- unique(c(private$.queue, file)) +# catgl("File [{file}] appended to the task queue", level = "INFO") +# } +# } else if (item$Status == "queued") { +# if(!file %in% private$.queue) { +# private$.queue <- unique(c(private$.queue, file)) +# } +# } +# }, error = function(e) { +# catgl("Cannot add file [{file}] to queue. Reason: ", +# e$message, level = "ERROR") +# }) +# +# +# return(length(private$.queue)) +# +# }, +# +# check_job_status = function() { +# jobs <- private$.jobs +# nms <- names(jobs) +# lapply(nms, function(file) { +# check <- jobs[[file]] +# if(!is.function(check)) { +# private$.queue <- private$.queue[!private$.queue %in% file] +# jobs[["@remove"]](file) +# catgl("Cannot find process handler of file [{file}]. Removed from queue", level = "WARNING") +# return() +# } +# code <- check() +# if(code == 0) { +# item <- private$.get_status(file) +# item$Status <- "finished" +# item$Details <- "" +# item$TaskEnded <- as.POSIXlt(Sys.time()) +# private$.set_status(item) +# # remove from queue +# private$.queue <- private$.queue[!private$.queue %in% file] +# jobs[["@remove"]](file) +# catgl("File [{file}] finished. Removed from queue", level = "INFO") +# return() +# } +# if(code < 0) { +# item <- private$.get_status(file) +# item$Status <- "errored" +# item$Details <- paste(attr(code, "rs_exec_error"), collapse = "") +# item$TaskEnded <- as.POSIXlt(Sys.time()) +# private$.set_status(item) +# # remove from queue +# private$.queue <- private$.queue[!private$.queue %in% file] +# jobs[["@remove"]](file) +# catgl("File [{file}] errored (reason: {item$Details}). Removed from queue", level = "ERROR") +# return() +# } +# }) +# length(jobs) +# }, +# +# get_pipeline_default_settings = function() { +# re <- lapply(private$pipeline_names, function(pname) { +# pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines")) +# settings <- dipsaus::list_to_fastmap2(pipeline$get_settings()) +# settings[["@remove"]](c( +# "import_setup__project_name", +# "import_setup__subject_code", +# "force_import", +# "skip_validation", +# "import_channels__sample_rate", +# "import_channels__electrodes", +# "import_channels__electrode_file", +# "import_blocks__format", +# "import_blocks__session_block", +# "project_name", +# "subject_code", +# "electrode_group", +# "changes" +# )) +# if(pname == "notch_filter") { +# settings$diagnostic_plot_params$path <- NULL +# } +# as.list(settings, sorted = TRUE) +# }) +# names(re) <- private$pipeline_names +# re +# }, +# +# create_settings_file = function(overwrite = FALSE) { +# settings_path <- file.path(self$log_path, "settings.yaml") +# if(!overwrite && file.exists(settings_path)) { +# stop("Existing settings file already created. If you want to overwrite that file, use `overwrite=TRUE`") +# } +# backup_file(settings_path, remove = FALSE) +# save_yaml(self$get_pipeline_default_settings(), file = settings_path) +# catgl("A settings file has been created at [{settings_path}]", level = "INFO") +# settings_path +# }, +# +# get_pipeline_settings = function(pname, file, brfile) { +# item <- private$.get_status(file) +# +# # load blackrock file +# electrode_table <- brfile$electrode_table +# electrodes <- electrode_table$Electrode[electrode_table$NSType == self$NSType_LFP] +# +# # load pipeline +# pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines")) +# settings <- dipsaus::list_to_fastmap2(pipeline$get_settings()) +# +# # override user-defined settings +# settings_path <- file.path(self$log_path, "settings.yaml") +# if(file.exists(settings_path)) { +# tmp <- load_yaml(settings_path) +# dipsaus::list_to_fastmap2(as.list(tmp[[pname]]), settings) +# } +# +# # pipeline-specific settings +# subject_code <- sprintf("%s__%s", item$Subject, item$Block) +# +# switch ( +# pname, +# "import_lfp_native" = { +# settings$import_setup__project_name <- self$project_name +# settings$import_setup__subject_code <- subject_code +# settings$force_import <- TRUE +# settings$skip_validation <- FALSE +# srate <- brfile$sample_rates[[self$NSType_LFP]] +# settings$import_channels__sample_rate <- srate +# settings$import_channels__electrodes <- dipsaus::deparse_svec(electrodes) +# settings$import_channels__electrode_file <- "auto" +# settings$import_blocks__format <- names(IMPORT_FORMATS)[unlist(IMPORT_FORMATS) == 'native_blackrock'] +# settings$import_blocks__session_block <- item$Block +# }, +# "notch_filter" = { +# graph_path <- file.path(private$.raw_path, item$Subject, item$Block, "notch-diagnostic-plots") +# settings$project_name <- self$project_name +# settings$subject_code <- subject_code +# +# graph_path <- dir_create2(graph_path) +# settings$diagnostic_plot_params$path <- file.path(graph_path, "notch-diagnostic-plots.pdf") +# }, +# "wavelet_module" = { +# settings$project_name <- self$project_name +# settings$subject_code <- subject_code +# }, +# "reference_module" = { +# settings$project_name <- self$project_name +# settings$subject_code <- subject_code +# settings$reference_name <- "[new reference]" +# }, +# {} +# ) +# +# return(settings) +# +# }, +# +# process_file = function(file) { +# +# item <- private$.get_status(file) +# brfile <- BlackrockFile$new(path = file.path(private$.watch_path, file), block = item$Block) +# +# # prepare working directory +# workdir <- file.path(self$cache_path, paste0(file, ".workdir")) +# if(dir.exists(workdir)) { +# unlink(workdir, force = TRUE, recursive = TRUE) +# } +# workdir <- dir_create2(workdir) +# +# +# # copy pipelines +# for(pname in private$pipeline_names) { +# pipeline <- pipeline(pname, paths = file.path(R_user_dir('raveio', "data"), "pipelines")) +# dest <- file.path(workdir, "pipelines", pipeline$pipeline_name) +# pipeline_fork( +# src = pipeline$pipeline_path, +# dest = dest, +# activate = FALSE +# ) +# pipeline <- pipeline(pname, paths = file.path(workdir, "pipelines")) +# settings <- self$get_pipeline_settings(pname = pname, file = file, brfile = brfile) +# pipeline$set_settings(.list = settings) +# catgl("Set pipeline [{pname}]:", level = "DEFAULT") +# save_yaml(settings, file = stdout()) +# } +# +# # copy files +# block_path <- file.path(private$.raw_path, item$Subject, item$Block) +# dir_create2(block_path) +# fs <- paste0(brfile$base_path, c(".nev", ".ccf", paste0(".ns", 1:9))) +# fs <- fs[file.exists(fs)] +# for(f in fs) { +# file.copy(f, file.path(block_path, basename(f)), overwrite = TRUE, recursive = FALSE, copy.date = TRUE) +# } +# +# fake_path <- file.path(private$.raw_path, sprintf("%s__%s", item$Subject, item$Block)) +# fake_path <- dir_create2(fake_path) +# if(!file.exists(file.path(fake_path, item$Block))) { +# if(dipsaus::get_os() == "windows") { +# file.copy(block_path, to = fake_path, recursive = TRUE, copy.date = TRUE) +# } else { +# file.symlink(block_path, to = fake_path) +# } +# } +# +# # Make sure the subject surface files can be loaded properly? +# if(!file.exists(file.path(fake_path, 'rave-imaging'))) { +# # check if original subject has the fs recon +# imaging_path_orig <- file.path(private$.raw_path, item$Subject, 'rave-imaging') +# if(file.exists(imaging_path_orig)) { +# if(dipsaus::get_os() == "windows" || !dir.exists(imaging_path_orig)) { +# # On windows, symlink does not work well so just copy +# # On Unix, if rave-imaging is a symlink, then R (4.0) will treat +# # the path as a file. Just copy over +# file.copy(imaging_path_orig, to = fake_path, +# recursive = TRUE, copy.date = TRUE) +# } else { +# file.symlink(imaging_path_orig, to = fake_path) +# } +# } +# } +# +# # set to running +# catgl("Start processing [{file}]", level = "INFO") +# item$Status <- "running" +# item$Details <- "" +# item$TaskStarted <- as.POSIXlt(Sys.time()) +# item$TaskEnded <- as.POSIXlt(NA) +# private$.set_status(item) +# private$.jobs[[file]] <- dipsaus::rs_exec( +# name = file, +# focus_on_console = TRUE, +# rs = TRUE, +# wait = FALSE, +# quoted = TRUE, +# nested_ok = TRUE, +# expr = bquote({ +# +# workdir <- .(workdir) +# cwd <- getwd() +# +# setwd(workdir) +# on.exit({ setwd(cwd) }, add = TRUE, after = FALSE) +# raveio <- asNamespace('raveio') +# +# if(dipsaus::package_installed('ravedash')){ +# ravedash <- do.call('asNamespace', list('ravedash')) +# ravedash$set_logger_path(root_path = .(self$log_path), max_files = 10L) +# ravedash$logger_threshold("trace", type = 'file') +# ravedash$logger_threshold("trace", type = 'console') +# } else { +# ravedash <- NULL +# } +# blackrock_src <- .(file) +# +# pname <- "import_lfp_native" +# pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines")) +# raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO") +# pipeline$run(async = FALSE, as_promise = FALSE, +# scheduler = "none", type = "smart") +# raveio$catgl("[{blackrock_src}]: [{pname}] finished", level = "INFO") +# pname <- "notch_filter" +# pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines")) +# raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO") +# pipeline$run(names = "apply_notch", async = FALSE, as_promise = FALSE, +# scheduler = "none", type = "smart") +# +# pname <- "wavelet_module" +# pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines")) +# raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] at [{pipeline$pipeline_path}]", level = "INFO") +# pipeline$run(async = FALSE, as_promise = FALSE, +# scheduler = "none", type = "smart") +# +# subject <- pipeline$read("subject") +# +# # generate reference if exists +# pname <- "reference_module" +# pipeline <- raveio$pipeline(pname, paths = file.path(workdir, "pipelines")) +# raveio$catgl("[{blackrock_src}]: Running pipeline: [{pname}] (reference_table_initial) at [{pipeline$pipeline_path}]", level = "INFO") +# # check subject's localization +# elec_path <- .(file.path(private$.raw_path, item$Subject, "rave-imaging", "electrodes.csv")) +# if(!file.exists(elec_path)) { +# elec_path <- .(file.path(raveio_getopt("data_dir"), private$.project_name, item$Subject, "rave", "meta", "electrodes.csv")) +# } +# +# if(!file.exists(elec_path)) { +# # list all projects, try to find +# all_projects <- raveio$get_projects(refresh = TRUE) +# elec_path <- file.path(raveio_getopt("data_dir"), all_projects, +# .(item$Subject), "rave", "meta", "electrodes.csv") +# elec_path <- elec_path[!is.na(elec_path) & file.exists(elec_path)] +# } +# if(length(elec_path)) { +# elec_path <- elec_path[[1]] +# } +# +# if(length(elec_path) == 1 && !is.na(elec_path) && file.exists(elec_path)) { +# tryCatch({ +# elec_path <- elec_path[[1]] +# elec_table <- utils::read.csv(elec_path) +# elec_table$Electrode <- as.integer(elec_table$Electrode) +# if(length(elec_table$Electrode) == length(subject$electrodes)) { +# o <- order(elec_table$Electrode) +# elec_table <- elec_table[o, ] +# elec_table$Electrode <- sort(subject$electrodes) +# } +# +# raveio$safe_write_csv(elec_table, file.path(subject$meta_path, "electrodes.csv"), +# row.names = FALSE) +# +# }, error = function(e) { +# if(is.environment(ravedash)) { +# ravedash$logger_error_condition(e, level = "warning") +# } else { +# warning(e) +# } +# }) +# } +# pipeline$run(names = "reference_table_initial", +# async = FALSE, as_promise = FALSE, +# scheduler = "none", type = "smart") +# unsaved_meta <- file.path(subject$meta_path, "reference__unsaved.csv") +# target_meta <- file.path(subject$meta_path, "reference_auto_generated.csv") +# if(file.exists(unsaved_meta) && !file.exists(target_meta)) { +# file.copy(unsaved_meta, target_meta, overwrite = TRUE) +# } +# +# # make subject backward-compatible +# raveio$catgl("[{blackrock_src}]: Making data format backward-compatible", level = "INFO") +# raveio$rave_subject_format_conversion(subject$subject_id) +# raveio$catgl("[{blackrock_src}]: Done", evel = "INFO") +# +# }) +# ) +# +# }, +# +# scan = function() { +# +# files <- self$check_file_registry() +# self$add_to_queue(files) +# +# # check job status +# njobs <- self$check_job_status() +# inactives <- private$.queue[!private$.queue %in% names(private$.jobs)] +# +# if(length(inactives) && njobs < private$.max_jobs) { +# # schedule job +# navails <- private$.max_jobs - njobs +# inactives <- inactives[seq_len(min(navails, length(inactives)))] +# for(file in inactives) { +# self$process_file(file) +# } +# } +# +# }, +# +# reset_registry = function() { +# if(!interactive()) { +# stop("Cannot reset registry in non-interactive mode") +# } +# ans <- dipsaus::ask_yesno(sprintf("Clearing registry for [%s]?", private$.job_name)) +# if(isTRUE(ans)) { +# unlink(self$registry_path, force = TRUE) +# } +# }, +# +# watch = function(interval = 5) { +# interval <- as.numeric(interval) +# if(!isTRUE(interval >= 1)) { +# stop("Min interval must be >= 1 seconds") +# } +# +# if(dipsaus::package_installed('ravedash')){ +# ravedash <- do.call('asNamespace', list('ravedash')) +# ravedash$set_logger_path(root_path = self$log_path, max_files = 10L) +# ravedash$logger_threshold("trace", type = 'file') +# ravedash$logger_threshold("trace", type = 'console') +# } else { +# ravedash <- NULL +# } +# +# on.exit({ +# if(is.environment(ravedash)) { +# ravedash$set_logger_path(NULL) +# } +# }, add = TRUE, after = TRUE) +# +# # make sure directories are there +# dir_create2(self$log_path) +# dir_create2(self$cache_path) +# +# while(TRUE) { +# tryCatch({ +# self$scan() +# }, error = function(e) { +# catgl("Error raised while the master process rescans/schedules tasks. Reason: {paste(e$message, collapse = '\n')}\nWill try again later", +# level = "ERROR") +# }) +# Sys.sleep(interval) +# } +# } +# +# ), +# active = list( +# +# log_path = function() { +# file.path(private$.raw_path, "_automation", private$.job_name) +# }, +# +# registry_path = function() { +# file.path(self$log_path, "registry.csv") +# }, +# +# time_threshold = function(v) { +# if(!missing(v)) { +# +# if(length(v) != 1 || is.na(v)) { +# stop("Cannot set time threshold with invalid time") +# } +# tm <- v +# if(is.character(tm)) { +# v <- as.POSIXlt(tm) +# if(is.na(v)) { +# stop("`time_threshold` must have format of [year-month-day hour:minute:second]. For example, '2022-08-03 16:38:00'") +# } +# } else if (!inherits(tm, "POSIXlt")) { +# stop("`time_threshold` must be characters or a `POSIXlt` time object") +# } +# +# private$.time_threshold <- v +# +# } +# private$.time_threshold +# }, +# +# project_name = function(v) { +# if(!missing(v)) { +# if(!length(v)) { +# private$.project_name <- character(0) +# } else if(length(v) > 1) { +# stop("Project name must have length of 1") +# } else if(!grepl("^[a-zA-Z0-9_]", v)) { +# stop("Project name can only contain letters, digits, and underscore [_]") +# } else { +# private$.project_name <- as.character(v) +# } +# } +# pn <- private$.project_name +# if(!length(pn)) { +# pn <- "automated" +# } +# pn +# }, +# +# file_pattern = function(v) { +# if(!missing(v)) { +# v <- v[[1]] +# m <- gregexpr("(\\([^\\(\\)]+\\))", v, ignore.case = TRUE) +# m <- unique(m[[1]]) +# if(length(m) < 2) { +# stop("File pattern must be a regular expression containing at least two keyword extractors so I can decide the subject code and session block ID. For example, regular expression ['^([a-zA-Z0-9]+)_datafile_([a-zA-Z0-9]+)\\.nev$'] matches [YAB_datafile_001.nev]. RAVE will set subject code to [YAB], and block ID as [001].") +# } +# private$.file_pattern <- v +# } +# private$.file_pattern +# }, +# +# queued = function() { +# private$.queue +# }, +# +# max_jobs = function(v) { +# if(!missing(v)) { +# errored <- TRUE +# if(length(v) == 1) { +# v <- as.integer(v) +# if(isTRUE(v > 0)) { +# private$.max_jobs <- v +# errored <- FALSE +# } +# } +# +# if(errored) { +# stop("Cannot set `max_jobs`, the value must be a positive integer") +# } +# +# } +# private$.max_jobs +# } +# +# ) +# ) +# #' Monitors 'BlackRock' output folder and automatically import data into 'RAVE' #' @description Automatically import 'BlackRock' files from designated folder @@ -756,81 +756,82 @@ RAVEWatchDog <- R6::R6Class( #' is equal to \code{dry_run} #' @returns When \code{dry_run} is true, then the watcher's instance will be #' returned; otherwise nothing will be returned. -#' @export -auto_process_blackrock <- function( - watch_path, project_name = "automated", task_name = "RAVEWatchDog", - scan_interval = 10, time_threshold = Sys.time(), max_jobs = 1L, - as_job = NA, dry_run = FALSE, config_open = dry_run -) { - - time_threshold <- as.POSIXlt(time_threshold) - time_threshold <- time_threshold[!is.na(time_threshold)] - if(!length(time_threshold)) { - time_threshold <- as.POSIXlt(Sys.time()) - } else { - time_threshold <- time_threshold[[1]] - } - - if(!isFALSE(as_job)) { - as_job <- dipsaus::rs_avail() - } - - fun <- dipsaus::new_function2(body = bquote({ - - raveio <- asNamespace('raveio') - watcher <- raveio$RAVEWatchDog$new( - watch_path = .(watch_path), - job_name = .(task_name) - ) - watcher$time_threshold <- .(time_threshold) - watcher$max_jobs <- .(max_jobs) - watcher$project_name <- .(project_name) - - settings_path <- file.path(watcher$log_path, "settings.yaml") - if(!file.exists(settings_path)) { - watcher$create_settings_file() - } - - return(watcher) - - }), quote_type = "quote") - - watcher <- fun() - if( config_open ) { - - settings_path <- file.path(watcher$log_path, "settings.yaml") - if(!file.exists(settings_path)) { - watcher$create_settings_file() - } - - try({ - dipsaus::rs_edit_file(settings_path) - catgl("Watcher's settings file has been opened. Please check the settings, and edit if necessary. All auto-discovered BlackRock files will be preprocessed using this settings file.", level = "INFO") - }, silent = TRUE) - - } - - if( dry_run ) { - return(watcher) - } - - - - if(as_job) { - dipsaus::rs_exec( - bquote({ - fun <- dipsaus::new_function2(body = .(body(fun))) - # return(fun) - watcher <- fun() - watcher$watch(interval = .(scan_interval)) - }), - quoted = TRUE, - rs = TRUE, - name = task_name, - focus_on_console = TRUE - ) - } else { - watcher$watch(interval = scan_interval) - } - -} +#' @noRd +NULL +# auto_process_blackrock <- function( +# watch_path, project_name = "automated", task_name = "RAVEWatchDog", +# scan_interval = 10, time_threshold = Sys.time(), max_jobs = 1L, +# as_job = NA, dry_run = FALSE, config_open = dry_run +# ) { +# +# time_threshold <- as.POSIXlt(time_threshold) +# time_threshold <- time_threshold[!is.na(time_threshold)] +# if(!length(time_threshold)) { +# time_threshold <- as.POSIXlt(Sys.time()) +# } else { +# time_threshold <- time_threshold[[1]] +# } +# +# if(!isFALSE(as_job)) { +# as_job <- dipsaus::rs_avail() +# } +# +# fun <- dipsaus::new_function2(body = bquote({ +# +# raveio <- asNamespace('raveio') +# watcher <- raveio$RAVEWatchDog$new( +# watch_path = .(watch_path), +# job_name = .(task_name) +# ) +# watcher$time_threshold <- .(time_threshold) +# watcher$max_jobs <- .(max_jobs) +# watcher$project_name <- .(project_name) +# +# settings_path <- file.path(watcher$log_path, "settings.yaml") +# if(!file.exists(settings_path)) { +# watcher$create_settings_file() +# } +# +# return(watcher) +# +# }), quote_type = "quote") +# +# watcher <- fun() +# if( config_open ) { +# +# settings_path <- file.path(watcher$log_path, "settings.yaml") +# if(!file.exists(settings_path)) { +# watcher$create_settings_file() +# } +# +# try({ +# dipsaus::rs_edit_file(settings_path) +# catgl("Watcher's settings file has been opened. Please check the settings, and edit if necessary. All auto-discovered BlackRock files will be preprocessed using this settings file.", level = "INFO") +# }, silent = TRUE) +# +# } +# +# if( dry_run ) { +# return(watcher) +# } +# +# +# +# if(as_job) { +# dipsaus::rs_exec( +# bquote({ +# fun <- dipsaus::new_function2(body = .(body(fun))) +# # return(fun) +# watcher <- fun() +# watcher$watch(interval = .(scan_interval)) +# }), +# quoted = TRUE, +# rs = TRUE, +# name = task_name, +# focus_on_console = TRUE +# ) +# } else { +# watcher$watch(interval = scan_interval) +# } +# +# } diff --git a/R/pipeline-docs.R b/R/pipeline-docs.R index ad44bc44..55fd677e 100644 --- a/R/pipeline-docs.R +++ b/R/pipeline-docs.R @@ -41,8 +41,9 @@ #' @param func function to call when reading customized pipeline progress; #' default is \code{\link[targets]{tar_progress_summary}} #' @param src,dest pipeline folder to copy the pipeline script from and to -#' @param filter_pattern file name patterns used to filter the scripts to -#' avoid copying data files; default is \code{PIPELINE_FORK_PATTERN} +#' @param policy fork policy defined by module author, see text file +#' 'fork-policy' under the pipeline directory; if missing, then default to +#' avoid copying \code{main.html} and \code{shared} folder #' @param activate whether to activate the new pipeline folder from \code{dest}; #' default is false #' @param var_names variable name to fetch or to check diff --git a/R/pipeline-tools.R b/R/pipeline-tools.R index 9a3f80ba..4efeaf44 100644 --- a/R/pipeline-tools.R +++ b/R/pipeline-tools.R @@ -654,9 +654,11 @@ pipeline_progress <- function( pipeline_fork <- function( src = Sys.getenv("RAVE_PIPELINE", "."), dest = tempfile(pattern = "rave_pipeline_"), - filter_pattern = PIPELINE_FORK_PATTERN, - activate = FALSE + policy = "default", + activate = FALSE, + ... ){ + if(!dir.exists(src)){ stop("pipeline_fork: `src` must be a pipeline directory") } @@ -667,13 +669,73 @@ pipeline_fork <- function( stop("pipeline_fork: `src/make-main.R` is missing") } + # search for fork-policy + fork_policy_path <- file.path(src, "fork-policy") + fork_policy_regexp <- c("^shared", "^main\\.html$") + if(file.exists(fork_policy_path)) { + fork_policy <- readLines(fork_policy_path) + idx <- which(startsWith(fork_policy, "[")) + if(length(idx)) { + sel <- which(tolower(trimws(fork_policy[idx])) == sprintf("[%s]", tolower(policy))) + if(length(sel)) { + sel <- sel[[1]] + if(sel == length(idx)) { + start <- idx[[sel]] + 1 + end <- length(fork_policy) + if( start <= end ) { + fork_policy_regexp <- fork_policy[seq(start, end)] + } + } else { + start <- idx[[sel]] + 1 + end <- idx[[sel + 1]] - 1 + if( start <= end ) { + fork_policy_regexp <- fork_policy[seq(start, end)] + } + } + } + } + } + # clean up + fork_policy_regexp <- fork_policy_regexp[nzchar(trimws(fork_policy_regexp))] + + # ignore .files + fork_policy_regexp <- c(fork_policy_regexp, "(^\\.|/\\.)") + + # list all the files + fs <- list.files( + src, + include.dirs = FALSE, + full.names = FALSE, + ignore.case = TRUE, + all.files = TRUE, + recursive = TRUE, + no.. = TRUE + ) + # format backslashes + fs <- gsub("[/|\\\\]+", "/", fs) - fs <- list.files(src, include.dirs = TRUE, full.names = FALSE, pattern = filter_pattern, ignore.case = TRUE) + ignore_files <- lapply(fork_policy_regexp, function(regexp) { + fs[grepl(regexp, x = fs, ignore.case = TRUE)] + }) + ignore_files <- unique(unlist(ignore_files)) + fs <- fs[!fs %in% ignore_files] dir_create2(dest) dest <- normalizePath(dest, mustWork = TRUE) - file.copy(from = file.path(src, fs), to = dest, overwrite = TRUE, - recursive = TRUE, copy.date = TRUE) + + # Copy the files + lapply(fs, function(file) { + src_path <- file.path(src, file) + dst_path <- file.path(dest, file) + dir_create2(dirname(dst_path)) + file.copy( + from = src_path, + to = dst_path, + overwrite = TRUE, + recursive = FALSE, + copy.date = TRUE + ) + }) if( activate ){ pipeline_build(dest) diff --git a/inst/rave-pipelines/template-r/fork-policy b/inst/rave-pipelines/template-r/fork-policy new file mode 100644 index 00000000..fc5c6670 --- /dev/null +++ b/inst/rave-pipelines/template-r/fork-policy @@ -0,0 +1,3 @@ +[default] +^shared +^main\.html$ diff --git a/inst/rave-pipelines/template-r/migrate.R b/inst/rave-pipelines/template-r/migrate.R index 1b9d6116..9ce12837 100644 --- a/inst/rave-pipelines/template-r/migrate.R +++ b/inst/rave-pipelines/template-r/migrate.R @@ -60,8 +60,7 @@ migrate <- function(project_name, subject_code, ..., overwrite = FALSE, backup = raveio::pipeline_fork( src = ".", dest = file.path(subject$pipeline_path, target_name), - activate = FALSE, - filter_pattern = "\\.(R|yaml|txt|csv|fst|conf)$" + activate = FALSE ) # Modify the settings file of target pipeline diff --git a/inst/rave-pipelines/template-rmd-bare/fork-policy b/inst/rave-pipelines/template-rmd-bare/fork-policy new file mode 100644 index 00000000..fc5c6670 --- /dev/null +++ b/inst/rave-pipelines/template-rmd-bare/fork-policy @@ -0,0 +1,3 @@ +[default] +^shared +^main\.html$ diff --git a/inst/rave-pipelines/template-rmd-bare/migrate.R b/inst/rave-pipelines/template-rmd-bare/migrate.R index 1b9d6116..9ce12837 100644 --- a/inst/rave-pipelines/template-rmd-bare/migrate.R +++ b/inst/rave-pipelines/template-rmd-bare/migrate.R @@ -60,8 +60,7 @@ migrate <- function(project_name, subject_code, ..., overwrite = FALSE, backup = raveio::pipeline_fork( src = ".", dest = file.path(subject$pipeline_path, target_name), - activate = FALSE, - filter_pattern = "\\.(R|yaml|txt|csv|fst|conf)$" + activate = FALSE ) # Modify the settings file of target pipeline diff --git a/inst/rave-pipelines/template-rmd-scheduler/fork-policy b/inst/rave-pipelines/template-rmd-scheduler/fork-policy new file mode 100644 index 00000000..fc5c6670 --- /dev/null +++ b/inst/rave-pipelines/template-rmd-scheduler/fork-policy @@ -0,0 +1,3 @@ +[default] +^shared +^main\.html$ diff --git a/inst/rave-pipelines/template-rmd-scheduler/migrate.R b/inst/rave-pipelines/template-rmd-scheduler/migrate.R index 1b9d6116..9ce12837 100644 --- a/inst/rave-pipelines/template-rmd-scheduler/migrate.R +++ b/inst/rave-pipelines/template-rmd-scheduler/migrate.R @@ -60,8 +60,7 @@ migrate <- function(project_name, subject_code, ..., overwrite = FALSE, backup = raveio::pipeline_fork( src = ".", dest = file.path(subject$pipeline_path, target_name), - activate = FALSE, - filter_pattern = "\\.(R|yaml|txt|csv|fst|conf)$" + activate = FALSE ) # Modify the settings file of target pipeline diff --git a/inst/rave-pipelines/template-rmd/fork-policy b/inst/rave-pipelines/template-rmd/fork-policy new file mode 100644 index 00000000..fc5c6670 --- /dev/null +++ b/inst/rave-pipelines/template-rmd/fork-policy @@ -0,0 +1,3 @@ +[default] +^shared +^main\.html$ diff --git a/inst/rave-pipelines/template-rmd/migrate.R b/inst/rave-pipelines/template-rmd/migrate.R index 1b9d6116..9ce12837 100644 --- a/inst/rave-pipelines/template-rmd/migrate.R +++ b/inst/rave-pipelines/template-rmd/migrate.R @@ -60,8 +60,7 @@ migrate <- function(project_name, subject_code, ..., overwrite = FALSE, backup = raveio::pipeline_fork( src = ".", dest = file.path(subject$pipeline_path, target_name), - activate = FALSE, - filter_pattern = "\\.(R|yaml|txt|csv|fst|conf)$" + activate = FALSE ) # Modify the settings file of target pipeline diff --git a/man/PipelineTools.Rd b/man/PipelineTools.Rd index 0bc27643..6bcbe873 100644 --- a/man/PipelineTools.Rd +++ b/man/PipelineTools.Rd @@ -396,7 +396,7 @@ but also their ancestors will be excluded from the result.} \subsection{Method \code{fork()}}{ fork (copy) the current pipeline to a new directory \subsection{Usage}{ -\if{html}{\out{