Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiregion Deployments (OSS integration) #8184

Merged
merged 13 commits into from
Jun 17, 2020
22 changes: 22 additions & 0 deletions api/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ func (d *Deployments) PromoteGroups(deploymentID string, groups []string, q *Wri
return &resp, wm, nil
}

// Unblock is used to unblock the given deployment.
func (d *Deployments) Unblock(deploymentID string, q *WriteOptions) (*DeploymentUpdateResponse, *WriteMeta, error) {
var resp DeploymentUpdateResponse
req := &DeploymentUnblockRequest{
DeploymentID: deploymentID,
}
wm, err := d.client.write("/v1/deployment/unblock/"+deploymentID, req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}

// SetAllocHealth is used to set allocation health for allocs that are part of
// the given deployment
func (d *Deployments) SetAllocHealth(deploymentID string, healthy, unhealthy []string, q *WriteOptions) (*DeploymentUpdateResponse, *WriteMeta, error) {
Expand Down Expand Up @@ -150,6 +163,9 @@ type Deployment struct {
// present the correct list of deployments for the job and not old ones.
JobCreateIndex uint64

// IsMultiregion specifies if this deployment is part of a multi-region deployment
IsMultiregion bool

// TaskGroups is the set of task groups effected by the deployment and their
// current deployment status.
TaskGroups map[string]*DeploymentState
Expand Down Expand Up @@ -257,6 +273,12 @@ type DeploymentFailRequest struct {
WriteRequest
}

// DeploymentUnblockRequest is used to unblock a particular deployment
type DeploymentUnblockRequest struct {
DeploymentID string
WriteRequest
}

// SingleDeploymentResponse is used to respond with a single deployment
type SingleDeploymentResponse struct {
Deployment *Deployment
Expand Down
81 changes: 81 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,78 @@ func (u *UpdateStrategy) Empty() bool {
return true
}

type Multiregion struct {
Strategy *MultiregionStrategy
Regions []*MultiregionRegion
}

func (m *Multiregion) Canonicalize() {
if m.Strategy == nil {
m.Strategy = &MultiregionStrategy{
MaxParallel: intToPtr(0),
OnFailure: stringToPtr(""),
}
} else {
if m.Strategy.MaxParallel == nil {
m.Strategy.MaxParallel = intToPtr(0)
}
if m.Strategy.OnFailure == nil {
m.Strategy.OnFailure = stringToPtr("")
}
}
if m.Regions == nil {
m.Regions = []*MultiregionRegion{}
}
for _, region := range m.Regions {
if region.Count == nil {
region.Count = intToPtr(1)
}
if region.Datacenters == nil {
region.Datacenters = []string{}
}
if region.Meta == nil {
region.Meta = map[string]string{}
}
}
}

func (m *Multiregion) Copy() *Multiregion {
if m == nil {
return nil
}
copy := new(Multiregion)
if m.Strategy != nil {
copy.Strategy = new(MultiregionStrategy)
copy.Strategy.MaxParallel = intToPtr(*m.Strategy.MaxParallel)
copy.Strategy.OnFailure = stringToPtr(*m.Strategy.OnFailure)
}
for _, region := range m.Regions {
copyRegion := new(MultiregionRegion)
copyRegion.Name = region.Name
copyRegion.Count = intToPtr(*region.Count)
for _, dc := range region.Datacenters {
copyRegion.Datacenters = append(copyRegion.Datacenters, dc)
}
for k, v := range region.Meta {
copyRegion.Meta[k] = v
}
copy.Regions = append(copy.Regions, copyRegion)
}
return copy
}

type MultiregionStrategy struct {
MaxParallel *int `mapstructure:"max_parallel"`
OnFailure *string `mapstructure:"on_failure"`
}

type MultiregionRegion struct {
Name string
Count *int
Datacenters []string
Meta map[string]string
}

// PeriodicConfig is for serializing periodic config for a job.
type PeriodicConfig struct {
Enabled *bool
Expand Down Expand Up @@ -711,6 +783,7 @@ type Job struct {
Affinities []*Affinity
TaskGroups []*TaskGroup
Update *UpdateStrategy
Multiregion *Multiregion
Spreads []*Spread
Periodic *PeriodicConfig
ParameterizedJob *ParameterizedJobConfig
Expand Down Expand Up @@ -741,6 +814,11 @@ func (j *Job) IsParameterized() bool {
return j.ParameterizedJob != nil && !j.Dispatched
}

// IsMultiregion returns whether a job is a multiregion job
func (j *Job) IsMultiregion() bool {
return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0
}

func (j *Job) Canonicalize() {
if j.ID == nil {
j.ID = stringToPtr("")
Expand Down Expand Up @@ -807,6 +885,9 @@ func (j *Job) Canonicalize() {
} else if *j.Type == JobTypeService {
j.Update = DefaultUpdateStrategy()
}
if j.Multiregion != nil {
j.Multiregion.Canonicalize()
}

for _, tg := range j.TaskGroups {
tg.Canonicalize(j)
Expand Down
62 changes: 62 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,68 @@ func TestJobs_Canonicalize(t *testing.T) {
},
},
},

{
name: "multiregion",
input: &Job{
Name: stringToPtr("foo"),
ID: stringToPtr("bar"),
ParentID: stringToPtr("lol"),
Multiregion: &Multiregion{
Regions: []*MultiregionRegion{
{
Name: "west",
Count: intToPtr(1),
},
},
},
},
expected: &Job{
Multiregion: &Multiregion{
Strategy: &MultiregionStrategy{
MaxParallel: intToPtr(0),
OnFailure: stringToPtr(""),
},
Regions: []*MultiregionRegion{
{
Name: "west",
Count: intToPtr(1),
Datacenters: []string{},
Meta: map[string]string{},
},
},
},
Namespace: stringToPtr(DefaultNamespace),
ID: stringToPtr("bar"),
Name: stringToPtr("foo"),
Region: stringToPtr("global"),
Type: stringToPtr("service"),
ParentID: stringToPtr("lol"),
Priority: intToPtr(50),
AllAtOnce: boolToPtr(false),
ConsulToken: stringToPtr(""),
VaultToken: stringToPtr(""),
Stop: boolToPtr(false),
Stable: boolToPtr(false),
Version: uint64ToPtr(0),
Status: stringToPtr(""),
StatusDescription: stringToPtr(""),
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
},
},
}

for _, tc := range testCases {
Expand Down
76 changes: 52 additions & 24 deletions command/agent/deployment_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
)

func (s *HTTPServer) DeploymentsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodGet {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

args := structs.DeploymentListRequest{}
Expand Down Expand Up @@ -47,15 +47,18 @@ func (s *HTTPServer) DeploymentSpecificRequest(resp http.ResponseWriter, req *ht
case strings.HasPrefix(path, "allocation-health/"):
deploymentID := strings.TrimPrefix(path, "allocation-health/")
return s.deploymentSetAllocHealth(resp, req, deploymentID)
case strings.HasPrefix(path, "unblock/"):
deploymentID := strings.TrimPrefix(path, "unblock/")
return s.deploymentUnblock(resp, req, deploymentID)
default:
return s.deploymentQuery(resp, req, path)
}
}

// TODO test and api
func (s *HTTPServer) deploymentFail(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodPut && req.Method != http.MethodPost {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}
args := structs.DeploymentFailRequest{
DeploymentID: deploymentID,
Expand All @@ -71,19 +74,19 @@ func (s *HTTPServer) deploymentFail(resp http.ResponseWriter, req *http.Request,
}

func (s *HTTPServer) deploymentPause(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodPut && req.Method != http.MethodPost {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

var pauseRequest structs.DeploymentPauseRequest
if err := decodeBody(req, &pauseRequest); err != nil {
return nil, CodedError(400, err.Error())
return nil, CodedError(http.StatusBadRequest, err.Error())
}
if pauseRequest.DeploymentID == "" {
return nil, CodedError(400, "DeploymentID must be specified")
return nil, CodedError(http.StatusBadRequest, "DeploymentID must be specified")
}
if pauseRequest.DeploymentID != deploymentID {
return nil, CodedError(400, "Deployment ID does not match")
return nil, CodedError(http.StatusBadRequest, "Deployment ID does not match")
}
s.parseWriteRequest(req, &pauseRequest.WriteRequest)

Expand All @@ -96,19 +99,19 @@ func (s *HTTPServer) deploymentPause(resp http.ResponseWriter, req *http.Request
}

func (s *HTTPServer) deploymentPromote(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodPut && req.Method != http.MethodPost {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

var promoteRequest structs.DeploymentPromoteRequest
if err := decodeBody(req, &promoteRequest); err != nil {
return nil, CodedError(400, err.Error())
return nil, CodedError(http.StatusBadRequest, err.Error())
}
if promoteRequest.DeploymentID == "" {
return nil, CodedError(400, "DeploymentID must be specified")
return nil, CodedError(http.StatusBadRequest, "DeploymentID must be specified")
}
if promoteRequest.DeploymentID != deploymentID {
return nil, CodedError(400, "Deployment ID does not match")
return nil, CodedError(http.StatusBadRequest, "Deployment ID does not match")
}
s.parseWriteRequest(req, &promoteRequest.WriteRequest)

Expand All @@ -120,20 +123,45 @@ func (s *HTTPServer) deploymentPromote(resp http.ResponseWriter, req *http.Reque
return out, nil
}

func (s *HTTPServer) deploymentUnblock(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != http.MethodPut && req.Method != http.MethodPost {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

var unblockRequest structs.DeploymentUnblockRequest
if err := decodeBody(req, &unblockRequest); err != nil {
return nil, CodedError(http.StatusBadRequest, err.Error())
}
if unblockRequest.DeploymentID == "" {
return nil, CodedError(http.StatusBadRequest, "DeploymentID must be specified")
}
if unblockRequest.DeploymentID != deploymentID {
return nil, CodedError(http.StatusBadRequest, "Deployment ID does not match")
}
s.parseWriteRequest(req, &unblockRequest.WriteRequest)

var out structs.DeploymentUpdateResponse
if err := s.agent.RPC("Deployment.Unblock", &unblockRequest, &out); err != nil {
return nil, err
}
setIndex(resp, out.Index)
return out, nil
}

func (s *HTTPServer) deploymentSetAllocHealth(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodPut && req.Method != http.MethodPost {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

var healthRequest structs.DeploymentAllocHealthRequest
if err := decodeBody(req, &healthRequest); err != nil {
return nil, CodedError(400, err.Error())
return nil, CodedError(http.StatusBadRequest, err.Error())
}
if healthRequest.DeploymentID == "" {
return nil, CodedError(400, "DeploymentID must be specified")
return nil, CodedError(http.StatusBadRequest, "DeploymentID must be specified")
}
if healthRequest.DeploymentID != deploymentID {
return nil, CodedError(400, "Deployment ID does not match")
return nil, CodedError(http.StatusBadRequest, "Deployment ID does not match")
}
s.parseWriteRequest(req, &healthRequest.WriteRequest)

Expand All @@ -146,8 +174,8 @@ func (s *HTTPServer) deploymentSetAllocHealth(resp http.ResponseWriter, req *htt
}

func (s *HTTPServer) deploymentAllocations(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != "GET" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodGet {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

args := structs.DeploymentSpecificRequest{
Expand All @@ -173,8 +201,8 @@ func (s *HTTPServer) deploymentAllocations(resp http.ResponseWriter, req *http.R
}

func (s *HTTPServer) deploymentQuery(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != "GET" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodGet {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

args := structs.DeploymentSpecificRequest{
Expand All @@ -191,7 +219,7 @@ func (s *HTTPServer) deploymentQuery(resp http.ResponseWriter, req *http.Request

setMeta(resp, &out.QueryMeta)
if out.Deployment == nil {
return nil, CodedError(404, "deployment not found")
return nil, CodedError(http.StatusNotFound, "deployment not found")
}
return out.Deployment, nil
}
Loading