Skip to content

Commit

Permalink
Merge branch 'dev-rpackage' into 'master'
Browse files Browse the repository at this point in the history
R package version - 0.0.0.9002

See merge request eoc_foundation_wip/analysis-pipelines!8
  • Loading branch information
naren1991 committed Nov 16, 2018
2 parents f27a4a8 + aadfc35 commit c86d3b6
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 117 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ metastore_db/
.DS_Store
vignettes/metastore_db/
vignettes/pipeline.RDS
vignettes/*.out
vignettes/*.R
vignettes/*.html
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: analysisPipelines
Type: Package
Title: Compose interoperable analysis pipelines, and put them into production
Version: 0.1.1
Version: 0.0.0.9002
Authors@R: c(
person("Naren","Srinivasan", email = "Naren.Srinivasan@mu-sigma.com", role = c("cre","aut")),
person("Sanjay","", email = "Sanjay@mu-sigma.com", role = c("ctb")),
Expand Down
7 changes: 3 additions & 4 deletions R/analysisPipelines_package.R
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#' analysisPipelines
#'
#' This package allows data scientists to compose and generate reports as a set of analytical operations.
#' The sequence of generation can be stored as a pipeline and reused, specifically for production systems where these
#' tasks are run repetitively. Additionally, the package implements a form of lazy evaluation where the pipelines are
#' run on datasets only when outputs/ reports need to be generated
#' The package aims at enabling data scientists to compose pipelines of analysis which consist of data manipulation,
#' exploratory analysis & reporting, as well as modeling steps. It also aims to enable data scientists to use tools
#' of their choice through an R interface, and compose interoperable pipelines between R, Spark, and Python.
#' @docType package
#' @name analysisPipelines
NULL
5 changes: 0 additions & 5 deletions R/core-functions-batch.R
Original file line number Diff line number Diff line change
Expand Up @@ -436,11 +436,6 @@ checkSchema <- function(dfOld, dfNew){

})

##outAsIn
# if(funcDetails$outAsIn){
# outputCache$workingInput <- output
# }

opName <- paste0("f", funcDetails$id, ".out") #eg: f1.out
if(funcDetails$storeOutput){
assign(opName, value = output, envir = outputCache)
Expand Down
143 changes: 62 additions & 81 deletions R/core-functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,6 @@ setMethod(
to = character()),
loggerDetails <- list()
)

# .Object@registry <- tibble(
# functionName = character(),
# heading = character(),
# engine = character(),
# exceptionHandlingFunction = character(),
# userDefined = logical(),
# isDataFunction = #logical(),
# firstArgClass = character()
# )

.Object@output <- list()

.Object %>>% setLoggerDetails -> .Object
Expand Down Expand Up @@ -125,7 +114,7 @@ registerFunction <- function( functionName, heading = "",
isDataFunction = T, firstArgClass = "",
loadPipeline = F, userDefined = T
){
tryCatch({
# tryCatch({


#Define data frame class according to engine type
Expand All @@ -142,11 +131,11 @@ registerFunction <- function( functionName, heading = "",


## Checking if already registered
existingM <- c()
doesMethodExist <- c()
tryCatch({
existingM <- methods::findMethod(functionName, signature = childClass, where = .GlobalEnv)
doesMethodExist <- methods::findMethod(functionName, signature = childClass, where = .GlobalEnv)

if(length(existingM) > 0){
if(length(doesMethodExist) > 0){

futile.logger::flog.error(paste("|| A function of name '%s' has already been registered.",
"If you'd like to change the definition, please re-assign the function definition",
Expand All @@ -162,6 +151,7 @@ registerFunction <- function( functionName, heading = "",
})
})

## Checking if class of first argument provided is valid
if(!isDataFunction){
tryCatch({
getClass(firstArgClass)
Expand All @@ -172,73 +162,79 @@ registerFunction <- function( functionName, heading = "",

}

futile.logger::flog.info(existingM)

#Skip registration if already exists
if(length(existingM) == 0){
parametersName <- ""


parametersName <- names(as.list(args(eval(parse(text=functionName)))))
parametersName <- paste0(parametersName[c(-1,-length(parametersName))],collapse=",")
if(parametersName != ""){
parametersName <- paste0(", ", parametersName)
}

if(length(doesMethodExist) == 0){

methodBody <- paste0(utils::capture.output(body(eval(parse(text=functionName)))),collapse="\n")
parametersName <- ""
objectArg <- alist(object = )
commonArgs <- alist(outAsIn = F, storeOutput = F)

originalArgs <- names(as.list(args(eval(parse(text=functionName)))))
firstArg <- originalArgs[1]
f <- get(functionName, .GlobalEnv)
originalArgs <- formals(f) %>>% as.list
firstArg <- names(originalArgs)[1]

if(isDataFunction){
originalArgs <- originalArgs[-1]
firstArgClass <- dataFrameClass
}else{
parametersName <- paste(", ", firstArg, parametersName)
}

methodBody <- gsub(pattern = "\\{", replacement = paste0("{", firstArg , " = object"), x = methodBody)
parametersList <- originalArgs %>>% names
parametersName <- paste0(parametersList,collapse=", ")

methodArg <- paste0(utils::capture.output(args(eval(parse(text=functionName)))),collapse="")
methodArg <- strsplit(strsplit(methodArg,firstArg)[[1]][2],"NULL")[[1]][1]
if(parametersName != ""){
parametersName <- paste0(", ", parametersName)
}

methodBody <- paste0(utils::capture.output(body(eval(parse(text=functionName)))),collapse="\n")
methodBody <- gsub(pattern = "\\{", replacement = paste0("{", firstArg , " = object;"), x = methodBody)

##Assigning the exception function to the global Environment
assign(exceptionFunction, get(x = exceptionFunction,
envir = environment()),
envir = globalenv())
envir = .GlobalEnv)

#Register function
registerFunText <- # Generic
paste0('setGeneric(name = "', functionName,'",',
'signature = "object",',
'def = function(object ', ', ... ', ', outAsIn = F, storeOutput = F)',
# 'def = function(object, ',firstArg, ', ...)',
'standardGeneric("', functionName,'"));',

newArgs <- append(objectArg, originalArgs)
newArgs <- append(newArgs, commonArgs)
formals(f) <- newArgs
body(f) <- paste('standardGeneric("', functionName,'")')

registerFunText <-
paste0(
# Adding to pipeline when run on a Analysis Pipeline object
'setMethod(f = "', functionName,'",',
'signature = "', childClass, '",',
'definition = function(object',
parametersName, ',',
'outAsIn, storeOutput){',
'parametersList <- unlist(strsplit(x = "', sub(", ", "", parametersName), '", split = ","' ,'));',
'parametersPassed <- lapply(parametersList, function(x){eval(parse(text = x))});',
'return(updateObject(object,
operation = "', functionName, '",',
'heading = "', heading, '",',
'parameters = parametersPassed, outAsIn = outAsIn, storeOutput = storeOutput));});',
'signature = "', childClass, '",',
'definition = function(object',
parametersName, ',',
'outAsIn, storeOutput){',
'parametersList <- unlist(strsplit(x = "', sub(", ", "", parametersName), '", split = ","' ,'));',
'parametersPassed <- lapply(parametersList, function(x){eval(parse(text = x))});',
'return(updateObject(object,
operation = "', functionName, '",',
'heading = "', heading, '",',
'parameters = parametersPassed, outAsIn = outAsIn, storeOutput = storeOutput));});',

#Executing the actual function when pipeline is executed
'setMethod(f = "',functionName,'",',
'signature = "', firstArgClass, '",',
'definition = function(object ', parametersName,')',
methodBody, ')'
'signature = "', firstArgClass, '",',
'definition = function(object ', parametersName,')',
methodBody, ')'
)

#Register function
# Generic
setGeneric(name = functionName,
signature = "object",
def = f,
where = .GlobalEnv)
try({
removeMethod(f = get(functionName, .GlobalEnv), signature = "ANY", where = .GlobalEnv)
}, silent = T)

# Methods for pipeline object, and first argument
eval(parse(text = registerFunText), envir=.GlobalEnv)


# Updating registry
if(loadPipeline==F){
fn <- paste0(functionName)
if(nrow(getRegistry() %>>% dplyr::filter(functionName == fn)) == 0){
Expand All @@ -254,12 +250,11 @@ registerFunction <- function( functionName, heading = "",
}
}
}
}, error = function(e){
futile.logger::flog.error(e, name = "logger.base")
stop()
})
# }, error = function(e){
# futile.logger::flog.error(e, name = "logger.base")
# stop()
# })
}
# )

.updateRegistry <- function(functionName,
heading = "",
Expand Down Expand Up @@ -295,6 +290,8 @@ loadPredefinedFunctionRegistry <- function(){
heading = .batchPredefFunctions[['heading']][[rowNo]],
engine = .batchPredefFunctions[['engine']][[rowNo]],
exceptionFunction = .batchPredefFunctions[['exceptionHandlingFunction']][[rowNo]],
isDataFunction = .batchPredefFunctions[['isDataFunction']][[rowNo]],
firstArgClass = .batchPredefFunctions[['firstArgClass']][[rowNo]],
userDefined = F, loadPipeline = F )
}

Expand All @@ -305,12 +302,14 @@ loadPredefinedFunctionRegistry <- function(){
heading = .streamingPredefFunctions[['heading']][[rowNo]],
engine = .streamingPredefFunctions[['engine']][[rowNo]],
exceptionFunction = .streamingPredefFunctions[['exceptionHandlingFunction']][[rowNo]],
isDataFunction = .streamingPredefFunctions[['isDataFunction']][[rowNo]],
firstArgClass = .streamingPredefFunctions[['firstArgClass']][[rowNo]],
userDefined = F, loadPipeline = F)
}
futile.logger::flog.info('|| Predefined utility functions registered ||', name = "logger.base")
}, error = function(e){
futile.logger::flog.error(e, name = "logger.base")
})
futile.logger::flog.info('|| Predefined utility functions registered ||', name = "logger.base")
}

#' @name setInput
Expand Down Expand Up @@ -596,24 +595,6 @@ getRegistry <- function(){
return(registry)
}

# setGeneric(
# name = "getRegistry",
# def = function(object)
# {
# standardGeneric("getRegistry")
# }
# )
#
# .getRegistry = function(object){
# return(object@registry)
# }
#
# setMethod(
# f = "getRegistry",
# signature = "BaseAnalysisPipeline",
# definition = .getRegistry
# )

#' @name getInput
#' @title Obtains the initializedInput
#' @param object The \code{AnalysisPipeline} or \code{StreamingAnalysisPipeline} object
Expand Down
2 changes: 1 addition & 1 deletion R/spark-structured-streaming-utilities.R
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ sparkRSessionCreateIfNotPresent <- function(...){
#' @export

castKafkaStreamAsString <- function(streamObj){
streamObj <- selectExpr(streamObj, "CAST(key AS STRING)", "CAST(value AS STRING)","topic","timestamp")
streamObj <- SparkR::selectExpr(streamObj, "CAST(key AS STRING)", "CAST(value AS STRING)","topic","timestamp")
return(streamObj)
}

Expand Down
Binary file modified R/sysdata.rda
Binary file not shown.
31 changes: 19 additions & 12 deletions data-raw/predefFunctions.R
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,31 @@
##################################################################################################
.batchPredefFunctions <- data.frame(functionName = c("univarCatDistPlots"),
heading = c("Univariate Distribution Categorical"),
# outAsIn = c(FALSE),
engine = c("r"),
exceptionHandlingFunction = c(as.character(substitute(genericPipelineException))),
isDataFunction = T,
firstArgClass = "",
stringsAsFactors = F)

.batchPredefFunctions %>>% dplyr::add_row(functionName = "outlierPlot",
heading = "Univariate Outlier",
# outAsIn = FALSE,
engine = "r",
exceptionHandlingFunction = c(as.character(substitute(genericPipelineException)))) -> .batchPredefFunctions
exceptionHandlingFunction = c(as.character(substitute(genericPipelineException))),
isDataFunction = T,
firstArgClass = "") -> .batchPredefFunctions
.batchPredefFunctions %>>% dplyr::add_row(functionName = "multiVarOutlierPlot",
heading = "Multivariate Outlier",
# outAsIn = FALSE,
engine = "r",
exceptionHandlingFunction = c(as.character(substitute(genericPipelineException)))) -> .batchPredefFunctions
exceptionHandlingFunction = c(as.character(substitute(genericPipelineException))),
isDataFunction = T,
firstArgClass = "") -> .batchPredefFunctions
.batchPredefFunctions %>>% dplyr::add_row(functionName = "ignoreCols",
heading = "",
# outAsIn = TRUE,
heading = "Ignore Columns",
engine = "r",
exceptionHandlingFunction = c(as.character(substitute(genericPipelineException)))) -> .batchPredefFunctions
exceptionHandlingFunction = c(as.character(substitute(genericPipelineException))),
isDataFunction = T,
firstArgClass = "") -> .batchPredefFunctions

##################################################################################################

Expand All @@ -46,17 +51,19 @@
##################################################################################################

.streamingPredefFunctions <- data.frame(functionName = c("castKafkaStreamAsString"),
heading = c(""),
heading = c("Cast Kafka stream to a string"),
engine = c("spark-structured-streaming"),
exceptionHandlingFunction = c(as.character(substitute(genericPipelineException))),
# outAsIn = c(TRUE),
isDataFunction = T,
firstArgClass = "",
stringsAsFactors = F)

.streamingPredefFunctions %>>% dplyr::add_row(functionName = "convertKafkaValueFromJson",
heading = "",
heading = "Convert Kafka Value from JSON",
engine = c("spark-structured-streaming"),
exceptionHandlingFunction = c(as.character(substitute(genericPipelineException)))
# outAsIn = TRUE
exceptionHandlingFunction = c(as.character(substitute(genericPipelineException))),
isDataFunction = T,
firstArgClass = ""
) -> .streamingPredefFunctions


Expand Down
7 changes: 3 additions & 4 deletions man/analysisPipelines.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ obj2 <- obj2 %>>%
priColor = "blue", secColor = "black")
# Printing the updated pipeline
obj2 %>>% getPipeline
obj2 %>>% getPipeline
```

The newly added function will also lazy evaluate depending on the trigger.
Expand Down Expand Up @@ -250,7 +250,7 @@ obj %>>% getColor(color = "blue") %>>% getColumnName(columnName = "Occupancy") %
complexPipeline %>>% getPipeline
complexPipeline %>>% generateOutput -> op
op %>>% getOutputById("4")
op %>>% getOutputById("3")
```

# Visualizing pipelines
Expand Down
Loading

0 comments on commit c86d3b6

Please sign in to comment.