diff --git a/.gitignore b/.gitignore
index d11d598..3c02128 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,6 @@ metastore_db/
 .DS_Store
 vignettes/metastore_db/
 vignettes/pipeline.RDS
+vignettes/*.out
+vignettes/*.R
+vignettes/*.html
diff --git a/DESCRIPTION b/DESCRIPTION
index ac81371..5a8cdcc 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: analysisPipelines
 Type: Package
 Title: Compose interoperable analysis pipelines, and put them into production
-Version: 0.1.1
+Version: 0.0.0.9002
 Authors@R: c(
     person("Naren","Srinivasan", email = "Naren.Srinivasan@mu-sigma.com", role = c("cre","aut")),
     person("Sanjay","", email = "Sanjay@mu-sigma.com", role = c("ctb")),
diff --git a/R/analysisPipelines_package.R b/R/analysisPipelines_package.R
index d60fc47..49eb1d6 100644
--- a/R/analysisPipelines_package.R
+++ b/R/analysisPipelines_package.R
@@ -1,9 +1,8 @@
 #' analysisPipelines
 #'
-#' This package allows data scientists to compose and generate reports as a set of analytical operations.
-#' The sequence of generation can be stored as a pipeline and reused, specifically for production systems where these
-#' tasks are run repetitively. Additionally, the package implements a form of lazy evaluation where the pipelines are
-#' run on datasets only when outputs/ reports need to be generated
+#' The package aims at enabling data scientists to compose pipelines of analysis which consist of data manipulation,
+#' exploratory analysis & reporting, as well as modeling steps. It also aims to enable data scientists to use tools
+#' of their choice through an R interface, and compose interoperable pipelines between R, Spark, and Python.
 #' @docType package
 #' @name analysisPipelines
 NULL
diff --git a/R/core-functions-batch.R b/R/core-functions-batch.R
index 086b414..6719c87 100644
--- a/R/core-functions-batch.R
+++ b/R/core-functions-batch.R
@@ -436,11 +436,6 @@ checkSchema <- function(dfOld, dfNew){
       })

-      ##outAsIn
-      # if(funcDetails$outAsIn){
-      #   outputCache$workingInput <- output
-      # }
-
       opName <- paste0("f", funcDetails$id, ".out") #eg: f1.out
       if(funcDetails$storeOutput){
         assign(opName, value = output, envir = outputCache)
diff --git a/R/core-functions.R b/R/core-functions.R
index 2b63f30..2378433 100644
--- a/R/core-functions.R
+++ b/R/core-functions.R
@@ -77,17 +77,6 @@ setMethod(
                            to = character()),
       loggerDetails <- list()
     )
-
-    # .Object@registry <- tibble(
-    #   functionName = character(),
-    #   heading = character(),
-    #   engine = character(),
-    #   exceptionHandlingFunction = character(),
-    #   userDefined = logical(),
-    #   isDataFunction = #logical(),
-    #   firstArgClass = character()
-    # )
-
     .Object@output <- list()

     .Object %>>% setLoggerDetails -> .Object
@@ -125,7 +114,7 @@ registerFunction <- function( functionName, heading = "",
                               isDataFunction = T, firstArgClass = "",
                               loadPipeline = F, userDefined = T
 ){
-  tryCatch({
+  # tryCatch({

    #Define data frame class according to engine type

@@ -142,11 +131,11 @@ registerFunction <- function( functionName, heading = "",

    ## Checking if already registered

-    existingM <- c()
+    doesMethodExist <- c()
     tryCatch({
-      existingM <- methods::findMethod(functionName, signature = childClass, where = .GlobalEnv)
+      doesMethodExist <- methods::findMethod(functionName, signature = childClass, where = .GlobalEnv)

-      if(length(existingM) > 0){
+      if(length(doesMethodExist) > 0){

        futile.logger::flog.error(paste("|| A function of name '%s' has already been registered.",
                                        "If you'd like to change the definition, please re-assign the function definition",
@@ -162,6 +151,7 @@ registerFunction <- function( functionName, heading = "",
       })
     })

+    ## Checking if class of first argument provided is valid
     if(!isDataFunction){
       tryCatch({
         getClass(firstArgClass)
@@ -172,73 +162,79 @@ registerFunction <- function( functionName, heading = "",
     }

-    futile.logger::flog.info(existingM)
-    #Skip registration if already exists
-    if(length(existingM) == 0){
-      parametersName <- ""
-
-
-      parametersName <- names(as.list(args(eval(parse(text=functionName)))))
-      parametersName <- paste0(parametersName[c(-1,-length(parametersName))],collapse=",")
-      if(parametersName != ""){
-        parametersName <- paste0(", ", parametersName)
-      }
-
+    if(length(doesMethodExist) == 0){

-      methodBody <- paste0(utils::capture.output(body(eval(parse(text=functionName)))),collapse="\n")
+      parametersName <- ""
+      objectArg <- alist(object = )
+      commonArgs <- alist(outAsIn = F, storeOutput = F)

-      originalArgs <- names(as.list(args(eval(parse(text=functionName)))))
-      firstArg <- originalArgs[1]
+      f <- get(functionName, .GlobalEnv)
+      originalArgs <- formals(f) %>>% as.list
+      firstArg <- names(originalArgs)[1]

       if(isDataFunction){
+        originalArgs <- originalArgs[-1]
         firstArgClass <- dataFrameClass
-      }else{
-        parametersName <- paste(", ", firstArg, parametersName)
       }

-      methodBody <- gsub(pattern = "\\{", replacement = paste0("{", firstArg , " = object"), x = methodBody)
+      parametersList <- originalArgs %>>% names
+      parametersName <- paste0(parametersList,collapse=", ")

-      methodArg <- paste0(utils::capture.output(args(eval(parse(text=functionName)))),collapse="")
-      methodArg <- strsplit(strsplit(methodArg,firstArg)[[1]][2],"NULL")[[1]][1]
+      if(parametersName != ""){
+        parametersName <- paste0(", ", parametersName)
+      }
+      methodBody <- paste0(utils::capture.output(body(eval(parse(text=functionName)))),collapse="\n")
+      methodBody <- gsub(pattern = "\\{", replacement = paste0("{", firstArg , " = object;"), x = methodBody)

       ##Assigning the exception function to the global Environment
       assign(exceptionFunction, get(x = exceptionFunction, envir = environment()),
-             envir = globalenv())
+             envir = .GlobalEnv)

-      #Register function
-      registerFunText <- # Generic
-        paste0('setGeneric(name = "', functionName,'",',
-               'signature = "object",',
-               'def = function(object ', ', ... ', ', outAsIn = F, storeOutput = F)',
-               # 'def = function(object, ',firstArg, ', ...)',
-               'standardGeneric("', functionName,'"));',
+      newArgs <- append(objectArg, originalArgs)
+      newArgs <- append(newArgs, commonArgs)
+      formals(f) <- newArgs
+      body(f) <- paste('standardGeneric("', functionName,'")')
+
+      registerFunText <-
+        paste0(
               # Adding to pipeline when run on a Analysis Pipeline object
               'setMethod(f = "', functionName,'",',
-                        'signature = "', childClass, '",',
-                        'definition = function(object',
-                        parametersName, ',',
-                        'outAsIn, storeOutput){',
-                        'parametersList <- unlist(strsplit(x = "', sub(", ", "", parametersName), '", split = ","' ,'));',
-                        'parametersPassed <- lapply(parametersList, function(x){eval(parse(text = x))});',
-                        'return(updateObject(object,
-                                operation = "', functionName, '",',
-                        'heading = "', heading, '",',
-                        'parameters = parametersPassed, outAsIn = outAsIn, storeOutput = storeOutput));});',
+                         'signature = "', childClass, '",',
+                         'definition = function(object',
+                         parametersName, ',',
+                         'outAsIn, storeOutput){',
+                         'parametersList <- unlist(strsplit(x = "', sub(", ", "", parametersName), '", split = ","' ,'));',
+                         'parametersPassed <- lapply(parametersList, function(x){eval(parse(text = x))});',
+                         'return(updateObject(object,
+                                 operation = "', functionName, '",',
+                         'heading = "', heading, '",',
+                         'parameters = parametersPassed, outAsIn = outAsIn, storeOutput = storeOutput));});',

               #Executing the actual function when pipeline is executed
               'setMethod(f = "',functionName,'",',
-                        'signature = "', firstArgClass, '",',
-                        'definition = function(object ', parametersName,')',
-                        methodBody, ')'
+                         'signature = "', firstArgClass, '",',
+                         'definition = function(object ', parametersName,')',
+                         methodBody, ')'
        )
+      #Register function
+      # Generic
+      setGeneric(name = functionName,
+                 signature = "object",
+                 def = f,
+                 where = .GlobalEnv)
+      try({
+        removeMethod(f = get(functionName, .GlobalEnv), signature = "ANY", where = .GlobalEnv)
+      }, silent = T)
+
+      # Methods for pipeline object, and first argument
       eval(parse(text = registerFunText), envir=.GlobalEnv)
-
+      # Updating registry
       if(loadPipeline==F){
         fn <- paste0(functionName)
         if(nrow(getRegistry() %>>% dplyr::filter(functionName == fn)) == 0){
@@ -254,12 +250,11 @@ registerFunction <- function( functionName, heading = "",
       }
     }
   }
-  }, error = function(e){
-    futile.logger::flog.error(e, name = "logger.base")
-    stop()
-  })
+  # }, error = function(e){
+  #   futile.logger::flog.error(e, name = "logger.base")
+  #   stop()
+  # })
 }
-# )

 .updateRegistry <- function(functionName, heading = "",
@@ -295,6 +290,8 @@ loadPredefinedFunctionRegistry <- function(){
                        heading = .batchPredefFunctions[['heading']][[rowNo]],
                        engine = .batchPredefFunctions[['engine']][[rowNo]],
                        exceptionFunction = .batchPredefFunctions[['exceptionHandlingFunction']][[rowNo]],
+                       isDataFunction = .batchPredefFunctions[['isDataFunction']][[rowNo]],
+                       firstArgClass = .batchPredefFunctions[['firstArgClass']][[rowNo]],
                        userDefined = F, loadPipeline = F )
     }
@@ -305,12 +302,14 @@ loadPredefinedFunctionRegistry <- function(){
                        heading = .streamingPredefFunctions[['heading']][[rowNo]],
                        engine = .streamingPredefFunctions[['engine']][[rowNo]],
                        exceptionFunction = .streamingPredefFunctions[['exceptionHandlingFunction']][[rowNo]],
+                       isDataFunction = .streamingPredefFunctions[['isDataFunction']][[rowNo]],
+                       firstArgClass = .streamingPredefFunctions[['firstArgClass']][[rowNo]],
                        userDefined = F, loadPipeline = F)
     }
+    futile.logger::flog.info('|| Predefined utility functions registered ||', name = "logger.base")
   }, error = function(e){
     futile.logger::flog.error(e, name = "logger.base")
   })
-  futile.logger::flog.info('|| Predefined utility functions registered ||', name = "logger.base")
 }
@@ -596,24 +595,6 @@ getRegistry <- function(){
   return(registry)
 }

-# setGeneric(
-#   name = "getRegistry",
-#   def = function(object)
-#   {
-#     standardGeneric("getRegistry")
-#   }
-# )
-#
-# .getRegistry = function(object){
-#   return(object@registry)
-# }
-#
-# setMethod(
-#   f = "getRegistry",
-#   signature = "BaseAnalysisPipeline",
-#   definition = .getRegistry
-# )
-
 #' @name getInput
 #' @title Obtains the initializedInput
 #' @param object The \code{AnalysisPipeline} or \code{StreamingAnalysisPipeline} object
diff --git a/R/spark-structured-streaming-utilities.R b/R/spark-structured-streaming-utilities.R
index 0a2826e..4da3719 100644
--- a/R/spark-structured-streaming-utilities.R
+++ b/R/spark-structured-streaming-utilities.R
@@ -45,7 +45,7 @@ sparkRSessionCreateIfNotPresent <- function(...){

 #' @export
 castKafkaStreamAsString <- function(streamObj){
-  streamObj <- selectExpr(streamObj, "CAST(key AS STRING)", "CAST(value AS STRING)","topic","timestamp")
+  streamObj <- SparkR::selectExpr(streamObj, "CAST(key AS STRING)", "CAST(value AS STRING)","topic","timestamp")
   return(streamObj)
 }
diff --git a/R/sysdata.rda b/R/sysdata.rda
index 754dee4..12b6e1c 100644
Binary files a/R/sysdata.rda and b/R/sysdata.rda differ
diff --git a/data-raw/predefFunctions.R b/data-raw/predefFunctions.R
index 4f99fa8..107b085 100644
--- a/data-raw/predefFunctions.R
+++ b/data-raw/predefFunctions.R
@@ -14,26 +14,31 @@
 ##################################################################################################
 .batchPredefFunctions <- data.frame(functionName = c("univarCatDistPlots"),
                                     heading = c("Univariate Distribution Categorical"),
-                                    # outAsIn = c(FALSE),
                                     engine = c("r"),
                                     exceptionHandlingFunction = c(as.character(substitute(genericPipelineException))),
+                                    isDataFunction = T,
+                                    firstArgClass = "",
                                     stringsAsFactors = F)

 .batchPredefFunctions %>>% dplyr::add_row(functionName = "outlierPlot",
                                           heading = "Univariate Outlier",
                                           # outAsIn = FALSE,
                                           engine = "r",
-                                          exceptionHandlingFunction = c(as.character(substitute(genericPipelineException)))) -> .batchPredefFunctions
+                                          exceptionHandlingFunction = c(as.character(substitute(genericPipelineException))),
+                                          isDataFunction = T,
+                                          firstArgClass = "") -> .batchPredefFunctions

 .batchPredefFunctions %>>% dplyr::add_row(functionName = "multiVarOutlierPlot",
                                           heading = "Multivariate Outlier",
-                                          # outAsIn = FALSE,
                                           engine = "r",
-                                          exceptionHandlingFunction = c(as.character(substitute(genericPipelineException)))) -> .batchPredefFunctions
+                                          exceptionHandlingFunction = c(as.character(substitute(genericPipelineException))),
+                                          isDataFunction = T,
+                                          firstArgClass = "") -> .batchPredefFunctions

 .batchPredefFunctions %>>% dplyr::add_row(functionName = "ignoreCols",
-                                          heading = "",
-                                          # outAsIn = TRUE,
+                                          heading = "Ignore Columns",
                                           engine = "r",
-                                          exceptionHandlingFunction = c(as.character(substitute(genericPipelineException)))) -> .batchPredefFunctions
+                                          exceptionHandlingFunction = c(as.character(substitute(genericPipelineException))),
+                                          isDataFunction = T,
+                                          firstArgClass = "") -> .batchPredefFunctions
 ##################################################################################################
@@ -46,17 +51,19 @@
 ##################################################################################################
 .streamingPredefFunctions <- data.frame(functionName = c("castKafkaStreamAsString"),
-                                        heading = c(""),
+                                        heading = c("Cast Kafka stream to a string"),
                                         engine = c("spark-structured-streaming"),
                                         exceptionHandlingFunction = c(as.character(substitute(genericPipelineException))),
-                                        # outAsIn = c(TRUE),
+                                        isDataFunction = T,
+                                        firstArgClass = "",
                                         stringsAsFactors = F)

 .streamingPredefFunctions %>>% dplyr::add_row(functionName = "convertKafkaValueFromJson",
-                                              heading = "",
+                                              heading = "Convert Kafka Value from JSON",
                                               engine = c("spark-structured-streaming"),
-                                              exceptionHandlingFunction = c(as.character(substitute(genericPipelineException)))
-                                              # outAsIn = TRUE
+                                              exceptionHandlingFunction = c(as.character(substitute(genericPipelineException))),
+                                              isDataFunction = T,
+                                              firstArgClass = ""
                                               ) -> .streamingPredefFunctions
diff --git a/man/analysisPipelines.Rd b/man/analysisPipelines.Rd
index 8c83a3f..cfd2b57 100644
--- a/man/analysisPipelines.Rd
+++ b/man/analysisPipelines.Rd
@@ -6,8 +6,7 @@
 \alias{analysisPipelines-package}
 \title{analysisPipelines}
 \description{
-This package allows data scientists to compose and generate reports as a set of analytical operations.
-The sequence of generation can be stored as a pipeline and reused, specifically for production systems where these
-tasks are run repetitively. Additionally, the package implements a form of lazy evaluation where the pipelines are
- run on datasets only when outputs/ reports need to be generated
+The package aims at enabling data scientists to compose pipelines of analysis which consist of data manipulation,
+exploratory analysis & reporting, as well as modeling steps. It also aims to enable data scientists to use tools
+of their choice through an R interface, and compose interoperable pipelines between R, Spark, and Python.
 }
diff --git a/vignettes/Analysis_pipelines_for_working_with_R_dataframes.Rmd b/vignettes/Analysis_pipelines_for_working_with_R_dataframes.Rmd
index 0a163a7..c6c1d47 100644
--- a/vignettes/Analysis_pipelines_for_working_with_R_dataframes.Rmd
+++ b/vignettes/Analysis_pipelines_for_working_with_R_dataframes.Rmd
@@ -208,7 +208,7 @@ obj2 <- obj2 %>>%
                     priColor = "blue", secColor = "black")

 # Printing the updated pipeline
-obj2 %>>% getPipeline 
+obj2 %>>% getPipeline
 ```

 The newly added function will also lazy evaluate depending on the trigger.
@@ -250,7 +250,7 @@ obj %>>% getColor(color = "blue") %>>% getColumnName(columnName = "Occupancy") %
 complexPipeline %>>% getPipeline
 complexPipeline %>>% generateOutput -> op
-op %>>% getOutputById("4")
+op %>>% getOutputById("3")
 ```

 # Visualizing pipelines
diff --git a/vignettes/Streaming_pipelines_for_working_Apache_Spark_Structured_Streaming.Rmd b/vignettes/Streaming_pipelines_for_working_Apache_Spark_Structured_Streaming.Rmd
index e49b7da..51833d9 100644
--- a/vignettes/Streaming_pipelines_for_working_Apache_Spark_Structured_Streaming.Rmd
+++ b/vignettes/Streaming_pipelines_for_working_Apache_Spark_Structured_Streaming.Rmd
@@ -73,7 +73,7 @@ Users can define their own functions and use it as a part of the pipeline. These

 # Function to convert datatype json struct to colums
 convertStructToDf <- function(streamObj) {
-  streamObj <- select(streamObj,list(getField(streamObj$`jsontostructs(value)`,"bannerId"),
+  streamObj <- SparkR::select(streamObj,list(getField(streamObj$`jsontostructs(value)`,"bannerId"),
                                      getField(streamObj$`jsontostructs(value)`,"mobile"),
                                      getField(streamObj$`jsontostructs(value)`,"homeAppliance"),
                                      getField(streamObj$`jsontostructs(value)`,"gamingConsole"),
@@ -99,17 +99,17 @@ convertStructToDf <- function(streamObj) {

 # Function to cast columns as string, integer, etc
 castDfColumns <- function(streamObj) {
-  streamObj <- selectExpr(streamObj, "bannerId","mobile","homeAppliance","gamingConsole","accessories","brand",
+  streamObj <- SparkR::selectExpr(streamObj, "bannerId","mobile","homeAppliance","gamingConsole","accessories","brand",
                           "CAST(previousPrice as INTEGER)","CAST(currentPrice as INTEGER)","CAST(discount as INTEGER)","emi",
                           "crossSale","customerId","ts","CAST(click as INTEGER)","CAST(conversion as INTEGER)",
                           "CAST(age as INTEGER)","CAST(income as INTEGER)","maritalStatus","segment")
-  streamObj$ts <- to_timestamp(streamObj$ts,"yyyy-MM-dd HH:mm:ss")
+  streamObj$ts <- SparkR::to_timestamp(streamObj$ts,"yyyy-MM-dd HH:mm:ss")
   return (streamObj)
 }

 # Function to convert datatype json struct to colums
 convertDfToKafkaKeyValuePairs <- function (streamObj, kafkaKey) {
-  streamObj <- toJSON(streamObj)
+  streamObj <- SparkR::toJSON(streamObj)
   streamObj$key <- kafkaKey
   return(streamObj)
 }
@@ -117,11 +117,11 @@ convertDfToKafkaKeyValuePairs <- function (streamObj, kafkaKey) {

 # Function to summarize click stream data
 globalUiMetrics <- function (streamObj) {
   ## Aggregation query
-  streamObj <- summarize(groupBy(streamObj,streamObj$bannerId),
+  streamObj <- SparkR::summarize(SparkR::groupBy(streamObj,streamObj$bannerId),
                          impressions=count(streamObj$customerId),
                          clicks=sum(streamObj$click),
                          conversions=sum(streamObj$conversion))
-  colnames(streamObj) <- c("banner_id","impressions","clicks","conversions")
+  SparkR::colnames(streamObj) <- c("banner_id","impressions","clicks","conversions")
   return (streamObj)
 }
@@ -166,7 +166,8 @@ getRegistry()

 # Define pipeline
 # Do data prep
-pipelineObj %>% castKafkaStreamAsString() %>% convertKafkaValueFromJson(schema = consumerDataSchema, outAsIn = T) %>% convertStructToDf(outAsIn = T) %>% castDfColumns(outAsIn = T, storeOutput = T) -> pipelineObj
+pipelineObj %>% castKafkaStreamAsString() %>%
+  convertKafkaValueFromJson(schema = consumerDataSchema, outAsIn = T) %>% convertStructToDf(outAsIn = T) %>% castDfColumns(outAsIn = T, storeOutput = T) -> pipelineObj
 pipelineObj %>>% getPipeline
 pipelineObj %>>% visualizePipeline
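Usage note (not part of the patch): a minimal sketch of how the revised registerFunction is expected to be called once this change lands, based on the signature and the new isDataFunction/firstArgClass registry columns shown above. The user-defined function and all names in it are hypothetical.

    library(analysisPipelines)

    # Hypothetical user-defined function: the first argument is the data frame the
    # pipeline passes along; the remaining arguments are ordinary parameters
    getColumnSummary <- function(dataset, columnName) {
      summary(dataset[[columnName]])
    }

    # Register it as a pipeline step. isDataFunction = TRUE marks the first argument
    # as the pipeline's data, so firstArgClass can stay at its default ""
    registerFunction(functionName = "getColumnSummary",
                     heading = "Column summary",
                     isDataFunction = TRUE,
                     firstArgClass = "")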