Add machine-learning Processes #91

Open · wants to merge 7 commits into base: develop
2 changes: 2 additions & 0 deletions DESCRIPTION
@@ -42,6 +42,8 @@ Collate:
'Job-class.R'
'ProcessGraph-class.R'
'SessionConfig-Class.R'
'apply_prediction.R'
'error.R'
'train_model.R'
Depends:
R (>= 4.3.0)
6 changes: 3 additions & 3 deletions Dockerfile
@@ -14,16 +14,16 @@ RUN R -e "install.packages('gdalcubes')"

# install other necessary packages
RUN apt-get install -y libsodium-dev libudunits2-dev
RUN Rscript -e "install.packages(c('plumber', 'useful', 'ids', 'R6', 'sf', 'rstac','bfast'))"
RUN Rscript -e "install.packages(c('plumber', 'useful', 'ids', 'R6', 'sf', 'rstac','bfast', 'caret', 'randomForest', 'stringr'))"

# create directories
RUN mkdir -p /opt/dockerfiles/ && mkdir -p /var/openeo/workspace/ && mkdir -p /var/openeo/workspace/data/

# install packages from local directory
COPY ./ /opt/dockerfiles/
RUN Rscript -e "remotes::install_local('/opt/dockerfiles',dependencies=TRUE)"
RUN Rscript -e "remotes::install_local('/opt/dockerfiles',dependencies=FALSE)"

# cmd or entrypoint for startup
CMD ["R", "-q", "--no-save", "-f /opt/dockerfiles/startProduction.R"]

EXPOSE 8000
1 change: 1 addition & 0 deletions NAMESPACE
@@ -8,6 +8,7 @@ export(Process)
export(ProcessGraph)
export(SessionConfig)
export(SessionInstance)
export(apply_prediction_opp)
export(createSessionInstance)
export(getJobIdIndex)
export(is.Collection)
2 changes: 1 addition & 1 deletion R/Job-class.R
@@ -119,7 +119,7 @@ Job <- R6Class(
},
error=function (e) {
self$status = "error"
self$results = NULL
self$results = e
writeJobInfo(self)
},
finally = {
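With this change a failed job stores its error condition instead of NULL, so callers can surface it later. A minimal sketch mirroring the new check in .executeSynchronous further below (process_graph is assumed to be defined):

job = Job$new(process = process_graph)$run()
if (job$status == "error") {
stop(job$results)  # job$results now holds the original error condition
}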
12 changes: 12 additions & 0 deletions R/SessionConfig-Class.R
@@ -69,6 +69,18 @@ SessionConfig = function(api.port = NULL, host = NULL, aws.ipv4 = NULL) {

)
),
RDS = list(
title = "R Data Set",
description = "Export to RDS",
gis_data_types = list("raster"),
parameters = list(
format = list(
type = "string",
description = "RDS"
)
)

),
inputFormats = list(
ImageCollection = list(
title = "ImageCollection",
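Since RDS is R's native serialization format, a client can load the exported result directly. A minimal sketch (the file name is hypothetical):

result = readRDS("result.rds")
class(result)
# depending on the job this is a JSON cube definition (character string),
# a caret "train" model, or an sf data.frame (see .executeSynchronous below)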
146 changes: 108 additions & 38 deletions R/api.R
@@ -157,45 +157,113 @@ NULL


.executeSynchronous = function(req, res) {
tryCatch({
sent_job = jsonlite::fromJSON(req$rook.input$read_lines(),simplifyDataFrame = FALSE)
process_graph = sent_job$process
newJob = Job$new(process = process_graph)

job = newJob$run()
format = job$output

if (class(format) == "list") {
if (format$title == "Network Common Data Form") {
file = gdalcubes::write_ncdf(job$results)
}
else if (format$title == "GeoTiff") {
file = gdalcubes::write_tif(job$results)
}
else {
throwError("FormatUnsupported")
}
}
else {
if (format == "NetCDF") {
file = gdalcubes::write_ncdf(job$results)
}
else if (format == "GTiff") {
file = gdalcubes::write_tif(job$results)
}
else {
throwError("FormatUnsupported")
}
}

first = file[1]
res$status = 200
res$body = readBin(first, "raw", n = file.info(first)$size)
content_type = plumber:::getContentType(tools::file_ext(first))
res$setHeader("Content-Type", content_type)
tryCatch(
{
sent_job = jsonlite::fromJSON(req$rook.input$read_lines(),simplifyDataFrame = FALSE)
process_graph = sent_job$process
newJob = Job$new(process = process_graph)

job = newJob$run()
format = job$output

if(job$status == "error")
{
# throw error stored in job$results
stop(job$results)
}

return(res)
},error=handleError)
# Only support short names of output formats to remove redundant code.
# To add a new supported class, just add a new 'else if' statement for the desired file format.
message("\nStart file saving...")

# a single tryCatch where all format evaluations happen
tryCatch(
{
if (format == "NetCDF")
{
# check class of the result
if ("cube" %in% class(job$results))
{
# result is a datacube
# write datacube as NetCDF and take first file path (see gdalcubes reference: https://gdalcubes.github.io/source/reference/ref/write_ncdf.html#value)
file = gdalcubes::write_ncdf(job$results)[1]
}
else if (all(c("sf", "data.frame") %in% class(job$results)))
{
# result is a sf data.frame
file = base::tempfile()

# save whole file
sf::st_write(job$results, file, driver = "netCDF")
}
else
{
# result class not supported for NetCDF export
throwError("FormatUnsupported")
}
}
else if (format == "GTiff")
{
# check class of the result
if ("cube" %in% class(job$results))
{
# result is a datacube
# write datacube as Tiff and take first file path (see gdalcubes reference: https://gdalcubes.github.io/source/reference/ref/write_tif.html#value)
file = gdalcubes::write_tif(job$results)[1]
}
else
{
# result class not supported for GTiff export
throwError("FormatUnsupported")
}
}
else if (format == "RDS")
{
file = base::tempfile()

# check class of the result
if ("cube" %in% class(job$results))
{
# result is a datacube

# save proxy cube as json
base::saveRDS(gdalcubes:::as_json(job$results), file)
}
else if (all(c("train", "train.formula") %in% class(job$results)))
{
# result is a caret model

base::saveRDS(job$results, file)
}
else if (all(c("sf", "data.frame") %in% class(job$results)))
{
# result is a sf data.frame

base::saveRDS(job$results, file)
}
}
else
{
throwError("FormatUnsupported")
}
},
error = function(error)
{
message('An error occurred while saving the result file.')
message(toString(error))
stop("FormatUnsupported")
},
warning = function(warning) {
message('A warning occurred while saving the result file.')
message(toString(warning))
})
message("File saving finished!")

message("\nPreparing HTTP result...")

# handle creation of the HTTP result

res$status = 200
res$body = readBin(file, "raw", n = file.info(file)$size)

content_type = plumber:::getContentType(tools::file_ext(file))
res$setHeader("Content-Type", content_type)

message("HTTP Result send!")
return(res)

},
error = handleError)
}

.cors_filter = function(req,res) {
@@ -315,5 +383,7 @@ addEndpoint = function() {
Session$assignProcess(multiply)
Session$assignProcess(divide)
Session$assignProcess(evi)
Session$assignProcess(train_model)
Session$assignProcess(apply_prediction)

}
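For context, a hedged sketch of how a client could invoke the newly registered process synchronously. The POST /result path follows the openEO API convention, and the process graph structure (including the load_collection node and collection id) is illustrative, not taken from this diff:

library(httr)
library(jsonlite)

# hypothetical process graph: classify a collection with a stored model
graph = list(process = list(process_graph = list(
load = list(process_id = "load_collection",
arguments = list(id = "sentinel-2-l2a")),
predict = list(process_id = "apply_prediction",
arguments = list(data = list(from_node = "load"),
model_id = "my_model",
keep_bands = FALSE),
result = TRUE)
)))

resp = POST("http://localhost:8000/result",
body = toJSON(graph, auto_unbox = TRUE),
content_type_json())

# extension depends on the requested output format (e.g. .rds for RDS)
writeBin(content(resp, "raw"), "result.rds")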
135 changes: 135 additions & 0 deletions R/apply_prediction.R
@@ -0,0 +1,135 @@
#' @export
apply_prediction_opp = function(data, model_id, keep_bands = FALSE, job) {

message("\napply_prediction called...")

message("\nCall parameters:")
message("\ndata:")
print(data)

message("\nmodel_id:")
message(model_id)

tryCatch({
message("\nTry loading the model from user workspace...")

path_to_model = paste0(Session$getConfig()$workspace.path, "/", model_id, ".rds")

message("Path to model: ", path_to_model)

# get model from user workspace
model = readRDS(path_to_model)

message("Model found in: ", path_to_model)
message("\nModel loaded successfully!")
},
error = function(err)
{
message("An Error occured!")
message(toString(err))
stop("No Model found!")
})

# get band names to later create a data.frame
band_names = names(data)

# the model and band_names need to be saved to a tempdir so that they can be accessed from the new process spawned by "FUN" in "apply_pixel" (see below)
t = tempdir()

message("Current tempdir: ", t)

# save variable in tempdir
saveRDS(model, paste0(t, "/model.rds"))
saveRDS(band_names, paste0(t, "/band_names.rds"))

# store the tempdir path in an environment variable so it can be retrieved in the worker process
Sys.setenv(TEMP_DIR = t)

# creates two bands "predicted_class" and "class_confidence" in the datacube
cube = gdalcubes::apply_pixel(
data,
names = c("predicted_class", "class_confidence"),
FUN = function(band_values_vector)
{
library(caret)
library(stringr)
library(randomForest)

# load the tempdir path from the environment variable to ensure it's the same as in the parent process
tmp = Sys.getenv("TEMP_DIR")
message("Tempdir in FUN: ", tmp)

# load variables needed for prediction
model = readRDS(paste0(tmp, "/model.rds"))
band_names = readRDS(paste0(tmp, "/band_names.rds"))

# named vector for df creation
named_vector = setNames(band_values_vector, band_names)

# create 1-row df per pixel of the datacube
band_value_df = named_vector |> t() |> as.data.frame()

tryCatch({
predicted_class = stats::predict(model, newdata = band_value_df)
class_confidence = stats::predict(model, newdata = band_value_df, type = "prob")

# parse the integer value from the factor label (e.g. "X1" -> 1)
predicted_class <- predicted_class |>
base::as.character() |>
stringr::str_extract("\\d+") |>
base::as.numeric()

# determine confidence value for the classified class
highest_class_confidence = base::apply(class_confidence, 1, base::max)

return(c(predicted_class, highest_class_confidence))
},
error = function(err)
{
stop("Error in apply_pixel!")
})

},
keep_bands = keep_bands)


message("\nDatacube: ")
print(cube)

return(cube)
}


#' apply_prediction
apply_prediction <- Process$new(
id = "apply_prediction",
description = "Apply a machine-learning model on each pixel of the datacube. This creates 2 new bands in the cube containing the predicted classes per pixel and the propability of the predicted class (class confidence). Bands of the source cube can optionally be included. This Algorithm will only save integer values in the datacube. If the levels of the model are factor values without an integer part like: 'expl', gdalcubes will assign a integer value instead. Therefore it is advised to provide a model with levels in the form of 'X1' 'X2', ..., 'Xn'.",
categories = as.array(c("cubes", "machine-learning")),
summary = "Apply a machine-learning based prediction on a datacube",
parameters = list(
Parameter$new(
name = "data",
description = "A data cube with bands.",
schema = list(
type = "object",
subtype = "raster-cube"
)
),
Parameter$new(
name = "model_id",
description = "Id of the model that should be used for prediction. The model will be searched in the user workspace.",
schema = list(
type = "string"
)
),
Parameter$new(
name = "keep_bands",
description = "Keep bands of input data cube, defaults to FALSE, i.e. original bands will be dropped.",
schema = list(
type = "boolean"
)
)
),
returns = eo_datacube,
operation = apply_prediction_opp
)
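The level-naming advice in the description can be satisfied at training time. A minimal sketch, assuming randomForest via caret and the container's workspace path from the Dockerfile (the data and file name are hypothetical):

library(caret)

# toy training data with factor levels "X1".."X3", as recommended above
df = data.frame(
B02 = runif(60),
B03 = runif(60),
B04 = runif(60),
label = factor(sample(paste0("X", 1:3), 60, replace = TRUE))
)

model = caret::train(label ~ ., data = df, method = "rf")

# place it in the user workspace so that model_id = "my_model" resolves to this file
saveRDS(model, "/var/openeo/workspace/my_model.rds")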