Skip to content
This repository has been archived by the owner on Jul 6, 2022. It is now read-only.

Commit

Permalink
Provision and Deprovision enhancements to use new async features (#204)
Browse files Browse the repository at this point in the history
* Enhancement of asynch engine

This PR enhances the asynch engine to enable the
concenpt of delayed or paused tasks. Tasks can be
added to the engine in a queue separate from the main
work queue. As delayed tasks are added, a  resumer is
notified and will periodically start the tasks up. Delayed
tasks can also be started by engine clients.

Using this, pkg/api/provision and deprovision were enhanced to
determine if a provision or deprovision action needs to wait for
another operation to finish. If so, they submit delayed tasks, either
wait_for_parent or wait_for_children, whichever is appropriate. Those
new jobs will make the same determination, and either readd themselves
as delayed tasks, or initiate the task that would otherwise originally
have been created.

* Added bad parent response message

* Merge master into feature branch (#209)

* refactor event function signatures (#147)

* pass around instances and bindings instead of pointers

* Refactor instance encryption/decryption

Previously, encryption and decryption of certain instance
fields was accomplished through special getters and setters.
To use these, you had to pass in a codec. This meant that any
bit of code needing to access these fields needs access to the
codec. Since I'm entertaining changes to the service.Module
interface that will result in modules having access to instance
objects, I wanted a way for modules to be able to extract
information from encrypted instance fields _without_ needing
them to be burdened with additional moving parts-- like codecs.
So this commit refactors instances and the storage layer so that
encryption occurs during instance marshaling and decryption occurs
during instance unmarshaling.

* Refactor binding encryption/decryption

Previously, encryption and decryption of certain binding
fields was accomplished through special getters and setters.
To use these, you had to pass in a codec. This meant that any
bit of code needing to access these fields needs access to the
codec. Since I'm entertaining changes to the service.Module
interface that will result in modules having access to binding
objects, I wanted a way for modules to be able to extract
information from encrypted binding fields _without_ needing
them to be burdened with additional moving parts-- like codecs.
So this commit refactors bindings and the storage layer so that
encryption occurs during bindings marshaling and decryption occurs
during bindings unmarshaling.

* clean up a few errant lint errors

* pass instance to provisioning steps

* pass instance to deprovisioning steps

* make bind and unbind accept instances as arguments

* get defensive about possible unanticipated modifications to instances by module code

* pass instance to updating steps

* minor lint fixes

* refactor lifecycle tests

* Firewall rule parameterization (#152)

* Refactor to support allowing firewall rules.

Fixes: #146

Changes to suport optional parameters to allow specifying start
and end IP address for firewall rules on MySQL, PostgreSQL and
MSSQL. Previously, this defaulted to allow any connection. This
isn't ecure in practice, so allowing the customer to specify desired
rules. Also changed the default to 0.0.0.0 for both, which results in
only Azure internal addresses. Added some parameter validation
for the firewall values as well:

* Valid IPV4 addresses
* Start <= End

Extracted a method to build the arm template parameter map. Providing
empty strings is invalid for the ARM template, so only including
when non-empty.

Added parameters to the lifecycle tests to allow running them
from local machines (will need ao enhance the charts in helm-charts)

Added some tests.

* Fixing merge conflicts

* Test cleanup

* json field cleanup

* Resolving code review comments

* error message cleanup

* error message cleanup

* specify firewall rules for relational databased in example manifests

* Few minor corrections to the Quickstart (#136)

* Correcting the az ad sp delete command

* Update quickstart-minikube.md

Remove a leading $ from a few bash commands. copy paste on mac is messed up :-)

* Update quickstart-minikube.md

Clarification around logging in to WordPress

* restoring prompts in quickstart

* minor punctuation fix

* removing distracting echo command

* make prompts used consistently

* kick of redis tests sooner (#155)

* s/module lifecycle tests/service lifecycle tests/g (#156)

* make storage know how to deal with service-specific types (#153)

* don't use codec where we don't need it anymore (#161)

* Updating CircleCI to define DOCKER_REPO env var (#167)

* Updating CircleCI to define REGISTRY env var

We previously set the REGISTRY environment variable in a deploy script.
When that was removed to streamline the release process, we need to
define it in Circle. Added two environment blocks to the publish-rc-images
and publish-release-images jobs.

Fixes: 145

* Qualified registry with docker.io

* Changed REGISTRY to DOCKER_REPO

* s/context/details/g (#163)

* make fake bind/unbind functionality not rely on standard context

* collapse standard provisioning context onto instance

* do away with (sort of) redundant standard provisioning parameters

* s/provisioning context/instance details/

* s/binding context/binding details/

* fix error handling when standard params aren't expected types

* add comment that adds some clarity to instance comparisons

* don't store credentials as their own field (#169)

* quickstart fixes (#174)

* adding missing docker pushes to release process

* Adding build flags to inject the version and commit (#173)

* Adding build flags to inject the version and commit

This commit injects the version and commit to the OSBA binary. If there
is no version set, then sets the version to “devel”

* Using new variable for version

* Removing redundant setting for main.commit

* Printing version and commit on broker startup

* Adding fields to the log message on startup

* Removing build flags on the CLI builds

* broker doesn't need to hang on to codec (#180)

* remove quotes from devel pseudo-version string (#184)

Merging without CI.

* fix bug decoding tags (#183)

* move version info to avoid import cycle later (#185)

* Adding user-agent string to requests to ARM (#172)

* Adding user-agent string to requests to ARM

* get version for user agent string from version package

* use separate redis clients for storage and async engine (#182)

* Adding ability to filter lifecycle tests (#181)

* Adding ability to filter lifecycle tests

This adds the ability to filter lifecycle tests using an environment
variable (TEST_MODULES). Via this variable, the developer can provide
a comma delimited list of module names and only the test cases matching
that module will be enabled. If the variable is empty, all tests will run.
If an invalid module name is provided, no tests will run.

* fix typo and lint error

* remove unnecessary funtion

* use redis namespaces (#188)

* use redis namespaces for instances and bindings

* use redis namespaces in async engine

* start using lightweight-docker-go image for development tasks (#190)

* simplify ci config (#194)

* README work (#196)

* README formatting + one cmdline fix

* Link fixes

* Revert heading to title case

* append instead of replacing user-agent (#198)

* append instead of replacing user-agent

* fix lint errors

* add deferred task execution to async package (#208)

achieving this involved a significant refactor
of the entire package. the good news is that the
tests are improved beyond what we had before,
so i think we can be more confident in the
refactored package than the original.

* Enhancement of asynch engine

This PR enhances the asynch engine to enable the
concenpt of delayed or paused tasks. Tasks can be
added to the engine in a queue separate from the main
work queue. As delayed tasks are added, a  resumer is
notified and will periodically start the tasks up. Delayed
tasks can also be started by engine clients.

Using this, pkg/api/provision and deprovision were enhanced to
determine if a provision or deprovision action needs to wait for
another operation to finish. If so, they submit delayed tasks, either
wait_for_parent or wait_for_children, whichever is appropriate. Those
new jobs will make the same determination, and either readd themselves
as delayed tasks, or initiate the task that would otherwise originally
have been created.

* Remove resumer changes

* removed references to submitdelayed task

* Fixed debug statements that got clobbered in merge

* Debug message

* Fixed response code

* Whitespace

* Code review comments

* Code review comments

* Refactor based on code-review comments.

* Code review comments

* Lint fixes
  • Loading branch information
jeremyrickard authored Jan 16, 2018
1 parent 4fd3c11 commit f785b7e
Show file tree
Hide file tree
Showing 9 changed files with 513 additions and 19 deletions.
42 changes: 34 additions & 8 deletions pkg/api/deprovision.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"net/http"
"strconv"
"time"

"github.com/Azure/open-service-broker-azure/pkg/async/model"
"github.com/Azure/open-service-broker-azure/pkg/service"
Expand Down Expand Up @@ -142,21 +143,46 @@ func (s *server) deprovision(w http.ResponseWriter, r *http.Request) {
return
}

task := model.NewTask(
"deprovisionStep",
map[string]string{
"stepName": firstStepName,
"instanceID": instanceID,
},
)
childCount, err := s.store.GetInstanceChildCountByAlias(instance.Alias)
if err != nil {
logFields["step"] = firstStepName
logFields["error"] = err
log.WithFields(logFields).Error(
"deprovisioning error: error determining child count",
)
s.writeResponse(w, http.StatusInternalServerError, responseEmptyJSON)
}

var task model.Task
if childCount > 0 {
logFields["provisionedChildren"] = childCount
task = model.NewDelayedTask(
"checkChildrenStatuses",
map[string]string{
"instanceID": instanceID,
},
time.Minute*1,
)
log.WithFields(logFields).Debug("children not deprovisioned, waiting")
} else {
task = model.NewTask(
"deprovisionStep",
map[string]string{
"stepName": firstStepName,
"instanceID": instanceID,
},
)
log.WithFields(logFields).Debug(
"no provisioned children, starting deprovision",
)
}
if err = s.asyncEngine.SubmitTask(task); err != nil {
logFields["step"] = firstStepName
logFields["error"] = err
log.WithFields(logFields).Error(
"deprovisioning error: error submitting deprovisioning task",
)
s.writeResponse(w, http.StatusInternalServerError, responseEmptyJSON)
return
}

// If we get all the way to here, we've been successful!
Expand Down
98 changes: 90 additions & 8 deletions pkg/api/provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,17 @@ func (s *server) provision(w http.ResponseWriter, r *http.Request) {
Details: serviceManager.GetEmptyInstanceDetails(),
Created: time.Now(),
}

waitForParent, err := s.isParentProvisioning(instance)
if err != nil {
logFields["error"] = err
log.WithFields(logFields).Error(
"provisioning error: error related to parent instance",
)
s.writeResponse(w, http.StatusBadRequest, responseParentInvalid)
return
}

if err = s.store.WriteInstance(instance); err != nil {
logFields["error"] = err
log.WithFields(logFields).Error(
Expand All @@ -395,13 +406,29 @@ func (s *server) provision(w http.ResponseWriter, r *http.Request) {
return
}

task := model.NewTask(
"provisionStep",
map[string]string{
"stepName": firstStepName,
"instanceID": instanceID,
},
)
var task model.Task
if waitForParent {
task = model.NewDelayedTask(
"checkParentStatus",
map[string]string{
"instanceID": instanceID,
},
time.Minute*1,
)
log.WithFields(logFields).Debug("parent not provisioned, waiting")
} else {
task = model.NewTask(
"provisionStep",
map[string]string{
"stepName": firstStepName,
"instanceID": instanceID,
},
)
log.WithFields(logFields).Debug(
"no need to wait for parent, starting provision",
)
}

if err = s.asyncEngine.SubmitTask(task); err != nil {
logFields["step"] = firstStepName
logFields["error"] = err
Expand All @@ -411,13 +438,68 @@ func (s *server) provision(w http.ResponseWriter, r *http.Request) {
s.writeResponse(w, http.StatusInternalServerError, responseEmptyJSON)
return
}

// If we get all the way to here, we've been successful!
s.writeResponse(w, http.StatusAccepted, responseProvisioningAccepted)

log.WithFields(logFields).Debug("asynchronous provisioning initiated")
}

func (s *server) isParentProvisioning(instance service.Instance) (bool, error) {
//No parent, so no need to wait
if instance.ParentAlias == "" {
return false, nil
}

parent, parentFound, err := s.store.GetInstanceByAlias(instance.ParentAlias)

if err != nil {
log.WithFields(log.Fields{
"error": "waitforParent",
"instanceID": instance.InstanceID,
"parentAlias": instance.ParentAlias,
}).Error(
"bad provision request: unable to retrieve parent",
)
return false, err
}

//Parent has was not found, so wait for that that to occur
if !parentFound {
return true, nil
}

//If parent failed, we should not even attempt to provision this
if parent.Status == service.InstanceStateProvisioningFailed {
log.WithFields(log.Fields{
"error": "waitforParent",
"instanceID": instance.InstanceID,
"parentID": instance.Parent.InstanceID,
}).Info(
"bad provision request: parent failed provisioning",
)
return false, fmt.Errorf("error provisioning: parent provision failed")
}

//If parent is deprovisioning, we should not even attempt to provision this
if parent.Status == service.InstanceStateDeprovisioning {
log.WithFields(log.Fields{
"error": "waitforParent",
"instanceID": instance.InstanceID,
"parentID": instance.Parent.InstanceID,
}).Info(
"bad provision request: parent is deprovisioning",
)
return false, fmt.Errorf("error provisioning: parent is deprovisioning")
}

//If parent is provisioned, then no need to wait.
if parent.Status == service.InstanceStateProvisioned {
return false, nil
}

return true, nil
}

func (s *server) validateLocation(svc service.Service, location string) error {
// Validate location only if this is a "root" service type (i.e. has no
// parent)
Expand Down
6 changes: 6 additions & 0 deletions pkg/api/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,9 @@ var responseOperationInvalid = []byte(
`{ "error": "OperationInvalid", "description": "The polling request ` +
`included an invalid value for the required operation query parameter" }`,
)

var responseParentInvalid = []byte(
`{ "error": "InvalidParent", "description": "The parentAlias provided ` +
`refers to a service instance that failed to provision or is currently ` +
`deprovisioning" }`,
)
18 changes: 18 additions & 0 deletions pkg/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,24 @@ func NewBroker(
)
}

err = b.asyncEngine.RegisterJob("checkParentStatus", b.doCheckParentStatus)
if err != nil {
return nil, errors.New(
"error registering async job for executing check of parent status",
)
}

err = b.asyncEngine.RegisterJob(
"checkChildrenStatuses",
b.doCheckChildrenStatuses,
)
if err != nil {
return nil, errors.New(
"error registering async job for executing check of children " +
"statuses",
)
}

b.apiServer, err = api.NewServer(
8080,
b.store,
Expand Down
170 changes: 170 additions & 0 deletions pkg/broker/check_children_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package broker

import (
"context"
"errors"
"fmt"
"time"

"github.com/Azure/open-service-broker-azure/pkg/async/model"
"github.com/Azure/open-service-broker-azure/pkg/service"
log "github.com/Sirupsen/logrus"
)

func (b *broker) doCheckChildrenStatuses(
_ context.Context,
args map[string]string,
) error {
instanceID, ok := args["instanceID"]
if !ok {
return errors.New(`missing required argument "instanceID"`)
}
instance, ok, err := b.store.GetInstance(instanceID)
if !ok {
return b.handleDeprovisioningError(
instanceID,
"checkChildrenStatus",
nil,
"error loading persisted instance",
)
}
if err != nil {
return b.handleDeprovisioningError(
instanceID,
"checkChildrenStatus",
err,
"error loading persisted instance",
)
}
childCount, err := b.store.GetInstanceChildCountByAlias(instance.Alias)
if err != nil {
log.WithFields(log.Fields{
"step": "checkChildrenStatus",
"instanceID": instanceID,
"error": err,
}).Error(
"deprovisioning error: error determining child count",
)
return b.handleDeprovisioningError(
instance,
"checkChildrenStatus",
err,
"error determining child count",
)
}
var task model.Task
if childCount > 0 {
//Put this task back into the queue
task = model.NewDelayedTask(
"checkChildrenStatus",
map[string]string{
"instanceID": instanceID,
},
time.Minute*1,
)
log.WithFields(log.Fields{
"instanceID": instanceID,
"provisionedChildren": childCount,
}).Debug("children not deprovisioned, will wait again")
} else {
svc, ok := b.catalog.GetService(instance.ServiceID)
if !ok {
// If we don't find the Service in the catalog, something is really wrong.
// (It should exist, because an instance with this serviceID exists.)
log.WithFields(log.Fields{
"step": "checkChildrenStatus",
"instanceID": instanceID,
"serviceID": instance.ServiceID,
}).Error(
"deprovisioning error: no Service found for serviceID",
)
return b.handleDeprovisioningError(
instance,
"checkChildrenStatus",
nil,
"error: no Service found for serviceID",
)
}
plan, ok := svc.GetPlan(instance.PlanID)
if !ok {
// If we don't find the Service in the catalog, something is really wrong.
// (It should exist, because an instance with this serviceID exists.)
log.WithFields(log.Fields{
"instanceID": instanceID,
"serviceID": instance.ServiceID,
"planID": instance.PlanID,
}).Error(
"deprovisioning error: no Plan found for planID in Service",
)
return b.handleDeprovisioningError(
instance,
"checkChildrenStatus",
nil,
"error: no Plan found for planID",
)
}
serviceManager := svc.GetServiceManager()
var deprovisioner service.Deprovisioner
deprovisioner, err = serviceManager.GetDeprovisioner(plan)
if err != nil {
log.WithFields(log.Fields{
"instanceID": instanceID,
"serviceID": instance.ServiceID,
"planID": instance.PlanID,
"error": err,
}).Error(
"deprovisioning error: error retrieving deprovisioner for " +
"service and plan",
)
return b.handleDeprovisioningError(
instance,
"checkChildrenStatus",
err,
"error retrieving deprovisioner for service and service",
)
}
deprovisionFirstStep, ok := deprovisioner.GetFirstStepName()
if !ok {
log.WithFields(log.Fields{
"instanceID": instanceID,
"serviceID": instance.ServiceID,
"planID": instance.PlanID,
}).Error(
"pre-deprovisioning error: no steps found for deprovisioning " +
"service and plan",
)
return b.handleDeprovisioningError(
instance,
"checkChildrenStatus",
nil,
"error: no steps found for deprovisioning service ance plan",
)
}

//Put the real deprovision task into the queue
task = model.NewTask(
"deprovisionStep",
map[string]string{
"stepName": deprovisionFirstStep,
"instanceID": instanceID,
},
)
log.WithFields(log.Fields{
"step": "checkChildrenStatus",
"instanceID": instanceID,
}).Debug("children deprovisioned, sending start deprovision task")
}
if err = b.asyncEngine.SubmitTask(task); err != nil {
return b.handleDeprovisioningError(
instance,
"checkChildrenStatus",
err,
fmt.Sprintf(
`error submitting task %s from checkChildrenStatuses`,
task.GetJobName(),
),
)
}

return nil
}
Loading

0 comments on commit f785b7e

Please sign in to comment.