scaleutils: refactor to make external plugins possible and easier #395

Merged: 9 commits, Feb 24, 2021
12 changes: 5 additions & 7 deletions plugins/builtin/apm/nomad/plugin/node.go
@@ -8,14 +8,15 @@ import (

"github.com/hashicorp/nomad-autoscaler/sdk"
"github.com/hashicorp/nomad-autoscaler/sdk/helper/scaleutils"
"github.com/hashicorp/nomad-autoscaler/sdk/helper/scaleutils/nodepool"
"github.com/hashicorp/nomad/api"
)

// nodePoolQuery is the plugin's internal representation of a query and contains
// all the information needed to perform a Nomad APM query for a node pool.
type nodePoolQuery struct {
metric string
poolIdentifier *scaleutils.PoolIdentifier
poolIdentifier nodepool.ClusterNodePoolIdentifier
operation string
}

@@ -78,7 +79,7 @@ func (a *APMPlugin) queryNodePool(q string) (sdk.TimestampedMetrics, error) {
// specified node pool. Any error in calling the Nomad API for details will
// result in an error. This is because with missing data, we cannot reliably
// make calculations.
func (a *APMPlugin) getPoolResources(id *scaleutils.PoolIdentifier) (*nodePoolResources, error) {
func (a *APMPlugin) getPoolResources(id nodepool.ClusterNodePoolIdentifier) (*nodePoolResources, error) {

nodes, _, err := a.client.Nodes().List(nil)
if err != nil {
@@ -87,7 +88,7 @@ func (a *APMPlugin) getPoolResources(id *scaleutils.PoolIdentifier) (*nodePoolResources, error) {

// Perform our node filtering so we are left with a list of nodes that form
// our pool and that are in the correct state.
nodePoolList, err := id.IdentifyNodes(nodes)
nodePoolList, err := scaleutils.FilterNodes(nodes, id.IsPoolMember)
if err != nil {
return nil, fmt.Errorf("failed to identify nodes within pool: %v", err)
}
@@ -166,10 +167,7 @@ func parseNodePoolQuery(q string) (*nodePoolQuery, error) {
}

query := nodePoolQuery{
poolIdentifier: &scaleutils.PoolIdentifier{
IdentifierKey: scaleutils.IdentifierKey(mainParts[2]),
Value: mainParts[1],
},
poolIdentifier: nodepool.NewNodeClassPoolIdentifier(mainParts[1]),
}

opMetricParts := strings.SplitN(mainParts[0], "_", 3)
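The core of the node.go change is the move from the concrete *scaleutils.PoolIdentifier to the nodepool.ClusterNodePoolIdentifier interface, with node filtering delegated to scaleutils.FilterNodes and the identifier's IsPoolMember predicate. As a rough sketch of what this extension point allows, the hypothetical identifier below selects nodes by datacenter instead of node class; the package, type, and constructor names are illustrative, only the IsPoolMember predicate visible in this diff is assumed, and any further methods the interface may require are omitted.

package custompool

import "github.com/hashicorp/nomad/api"

// DatacenterPoolIdentifier is a hypothetical pool identifier that selects
// nodes by Nomad datacenter rather than by node class.
type DatacenterPoolIdentifier struct {
	dc string
}

// NewDatacenterPoolIdentifier returns an identifier for nodes registered in
// the named datacenter.
func NewDatacenterPoolIdentifier(dc string) *DatacenterPoolIdentifier {
	return &DatacenterPoolIdentifier{dc: dc}
}

// IsPoolMember mirrors the predicate passed to
// scaleutils.FilterNodes(nodes, id.IsPoolMember) in getPoolResources above;
// the node listing comes from Nodes().List, so it receives a node list stub.
func (d *DatacenterPoolIdentifier) IsPoolMember(n *api.NodeListStub) bool {
	return n.Datacenter == d.dc
}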
20 changes: 7 additions & 13 deletions plugins/builtin/apm/nomad/plugin/node_test.go
@@ -4,7 +4,7 @@ import (
"errors"
"testing"

"github.com/hashicorp/nomad-autoscaler/sdk/helper/scaleutils"
"github.com/hashicorp/nomad-autoscaler/sdk/helper/scaleutils/nodepool"
"github.com/hashicorp/nomad/api"
"github.com/stretchr/testify/assert"
)
@@ -19,25 +19,19 @@ func Test_parseNodePoolQuery(t *testing.T) {
{
inputQuery: "node_percentage-allocated_memory/high-memory/class",
expectedOutputQuery: &nodePoolQuery{
metric: "memory",
poolIdentifier: &scaleutils.PoolIdentifier{
IdentifierKey: "class",
Value: "high-memory",
},
operation: "percentage-allocated",
metric: "memory",
poolIdentifier: nodepool.NewNodeClassPoolIdentifier("high-memory"),
operation: "percentage-allocated",
},
expectError: nil,
name: "node percentage-allocated memory",
},
{
inputQuery: "node_percentage-allocated_cpu/high-compute/class",
expectedOutputQuery: &nodePoolQuery{
metric: "cpu",
poolIdentifier: &scaleutils.PoolIdentifier{
IdentifierKey: "class",
Value: "high-compute",
},
operation: "percentage-allocated",
metric: "cpu",
poolIdentifier: nodepool.NewNodeClassPoolIdentifier("high-compute"),
operation: "percentage-allocated",
},
expectError: nil,
name: "node percentage-allocated cpu",
74 changes: 17 additions & 57 deletions plugins/builtin/target/aws-asg/plugin/aws.go
@@ -3,20 +3,19 @@ package plugin
import (
"context"
"fmt"
"strconv"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/external"
"github.com/aws/aws-sdk-go-v2/service/autoscaling"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/hashicorp/nomad-autoscaler/sdk"
"github.com/hashicorp/nomad-autoscaler/sdk/helper/scaleutils"
"github.com/hashicorp/nomad/api"
)

const (
defaultRetryInterval = 10 * time.Second
defaultRetryLimit = 15
defaultRetryInterval = 10 * time.Second
defaultRetryLimit = 15
nodeAttrAWSInstanceID = "unique.platform.aws.instance-id"
)

// setupAWSClients takes the passed config mapping and instantiates the
@@ -94,12 +93,7 @@ func (t *TargetPlugin) scaleOut(ctx context.Context, asg *autoscaling.AutoScalin

func (t *TargetPlugin) scaleIn(ctx context.Context, asg *autoscaling.AutoScalingGroup, num int64, config map[string]string) error {

scaleReq, err := t.generateScaleReq(num, config)
if err != nil {
return fmt.Errorf("failed to generate scale in request: %v", err)
}

ids, err := t.scaleInUtils.RunPreScaleInTasks(ctx, scaleReq)
ids, err := t.clusterUtils.RunPreScaleInTasks(ctx, config, int(num))
if err != nil {
return fmt.Errorf("failed to perform pre-scale Nomad scale in tasks: %v", err)
}
@@ -109,7 +103,7 @@ func (t *TargetPlugin) scaleIn(ctx context.Context, asg *autoscaling.AutoScaling
var instanceIDs []string

for _, node := range ids {
instanceIDs = append(instanceIDs, node.RemoteID)
instanceIDs = append(instanceIDs, node.RemoteResourceID)
}

// Create the event writer and write that the drain event has been
@@ -141,57 +135,13 @@ func (t *TargetPlugin) scaleIn(ctx context.Context, asg *autoscaling.AutoScaling
eWriter.write(ctx, scalingEventTerminate)

// Run any post scale in tasks that are desired.
if err := t.scaleInUtils.RunPostScaleInTasks(config, ids); err != nil {
if err := t.clusterUtils.RunPostScaleInTasks(ctx, config, ids); err != nil {
return fmt.Errorf("failed to perform post-scale Nomad scale in tasks: %v", err)
}

return nil
}

func (t *TargetPlugin) generateScaleReq(num int64, config map[string]string) (*scaleutils.ScaleInReq, error) {

// Pull the class key from the config mapping. This is a required value and
// we cannot scale without this.
class, ok := config[sdk.TargetConfigKeyClass]
if !ok {
return nil, fmt.Errorf("required config param %q not found", sdk.TargetConfigKeyClass)
}

// The drain_deadline is an optional parameter so define our default and
// then attempt to find an operator specified value.
drain := scaleutils.DefaultDrainDeadline
ignoreSystemJobs := scaleutils.DefaultIgnoreSystemJobs

if drainString, ok := config[sdk.TargetConfigKeyDrainDeadline]; ok {
d, err := time.ParseDuration(drainString)
if err != nil {
return nil, fmt.Errorf("failed to parse %q as time duration", drainString)
}
drain = d
}

if ignoreSystemJobsString, ok := config[sdk.TargetConfigKeyIgnoreSystemJobs]; ok {
isj, err := strconv.ParseBool(ignoreSystemJobsString)
if err != nil {
return nil, fmt.Errorf("failed to parse %q as boolean", ignoreSystemJobsString)
}
ignoreSystemJobs = isj
}

return &scaleutils.ScaleInReq{
Num: int(num),
DrainDeadline: drain,
IgnoreSystemJobs: ignoreSystemJobs,

PoolIdentifier: &scaleutils.PoolIdentifier{
IdentifierKey: scaleutils.IdentifierKeyClass,
Value: class,
},
RemoteProvider: scaleutils.RemoteProviderAWSInstanceID,
NodeIDStrategy: scaleutils.IDStrategyNewestCreateIndex,
}, nil
}

func (t *TargetPlugin) detachInstances(ctx context.Context, asgName *string, instanceIDs []string) error {

asgInput := autoscaling.DetachInstancesInput{
@@ -363,3 +313,13 @@ func (t *TargetPlugin) ensureASGInstancesCount(ctx context.Context, desired int6

return retry(ctx, defaultRetryInterval, defaultRetryLimit, f)
}

// awsNodeIDMap is used to identify the AWS InstanceID of a Nomad node using
// the relevant attribute value.
func awsNodeIDMap(n *api.Node) (string, error) {
val, ok := n.Attributes[nodeAttrAWSInstanceID]
if !ok || val == "" {
return "", fmt.Errorf("attribute %q not found", nodeAttrAWSInstanceID)
}
return val, nil
}
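awsNodeIDMap is the provider-specific half of the new split: scaleutils decides which Nomad nodes to drain, and the plugin supplies a callback that resolves each node's remote identifier (registered via ClusterNodeIDLookupFunc in plugin.go below). A lookup for another, hypothetical provider would follow the same func(*api.Node) (string, error) shape; the package name and attribute used here are assumptions for illustration only.

package exampleprovider

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

// exampleNodeIDMap mirrors awsNodeIDMap for a hypothetical provider whose
// remote identifier is exposed through a different node attribute.
func exampleNodeIDMap(n *api.Node) (string, error) {
	const attr = "unique.hostname" // illustrative attribute choice
	val, ok := n.Attributes[attr]
	if !ok || val == "" {
		return "", fmt.Errorf("attribute %q not found", attr)
	}
	return val, nil
}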
96 changes: 20 additions & 76 deletions plugins/builtin/target/aws-asg/plugin/aws_test.go
@@ -3,104 +3,48 @@ package plugin
import (
"errors"
"testing"
"time"

"github.com/hashicorp/nomad-autoscaler/sdk/helper/scaleutils"
"github.com/hashicorp/nomad/api"
"github.com/stretchr/testify/assert"
)

func TestTargetPlugin_generateScaleReq(t *testing.T) {
func Test_awsNodeIDMap(t *testing.T) {
testCases := []struct {
inputNum int64
inputConfig map[string]string
expectedOutputReq *scaleutils.ScaleInReq
inputNode *api.Node
expectedOutputID string
expectedOutputError error
name string
}{
{
inputNum: 2,
inputConfig: map[string]string{
"node_class": "high-memory",
"node_drain_deadline": "5m",
},
expectedOutputReq: &scaleutils.ScaleInReq{
Num: 2,
DrainDeadline: 5 * time.Minute,
IgnoreSystemJobs: false,
PoolIdentifier: &scaleutils.PoolIdentifier{
IdentifierKey: scaleutils.IdentifierKeyClass,
Value: "high-memory",
},
RemoteProvider: scaleutils.RemoteProviderAWSInstanceID,
NodeIDStrategy: scaleutils.IDStrategyNewestCreateIndex,
inputNode: &api.Node{
Attributes: map[string]string{"unique.platform.aws.instance-id": "i-1234567890abcdef0"},
},
expectedOutputID: "i-1234567890abcdef0",
expectedOutputError: nil,
name: "valid request with drain_deadline in config",
name: "required attribute found",
},
{
inputNum: 2,
inputConfig: map[string]string{
"node_class": "high-memory",
"node_drain_ignore_system_jobs": "true",
},
expectedOutputReq: &scaleutils.ScaleInReq{
Num: 2,
IgnoreSystemJobs: true,
DrainDeadline: 15 * time.Minute,
PoolIdentifier: &scaleutils.PoolIdentifier{
IdentifierKey: scaleutils.IdentifierKeyClass,
Value: "high-memory",
},
RemoteProvider: scaleutils.RemoteProviderAWSInstanceID,
NodeIDStrategy: scaleutils.IDStrategyNewestCreateIndex,
inputNode: &api.Node{
Attributes: map[string]string{},
},
expectedOutputError: nil,
name: "valid request with node_drain_ignore_system_jobs in config",
expectedOutputID: "",
expectedOutputError: errors.New(`attribute "unique.platform.aws.instance-id" not found`),
name: "required attribute not found",
},
{
inputNum: 2,
inputConfig: map[string]string{},
expectedOutputReq: nil,
expectedOutputError: errors.New("required config param \"node_class\" not found"),
name: "no class key found in config",
},
{
inputNum: 2,
inputConfig: map[string]string{
"node_class": "high-memory",
},
expectedOutputReq: &scaleutils.ScaleInReq{
Num: 2,
IgnoreSystemJobs: false,
DrainDeadline: 15 * time.Minute,
PoolIdentifier: &scaleutils.PoolIdentifier{
IdentifierKey: scaleutils.IdentifierKeyClass,
Value: "high-memory",
},
RemoteProvider: scaleutils.RemoteProviderAWSInstanceID,
NodeIDStrategy: scaleutils.IDStrategyNewestCreateIndex,
inputNode: &api.Node{
Attributes: map[string]string{"unique.platform.aws.instance-id": ""},
},
expectedOutputError: nil,
name: "drain_deadline not specified within config",
},
{
inputNum: 2,
inputConfig: map[string]string{
"node_class": "high-memory",
"node_drain_deadline": "time to make a cuppa",
},
expectedOutputReq: nil,
expectedOutputError: errors.New("failed to parse \"time to make a cuppa\" as time duration"),
name: "malformed drain_deadline config value",
expectedOutputID: "",
expectedOutputError: errors.New(`attribute "unique.platform.aws.instance-id" not found`),
name: "required attribute found but empty",
},
}

tp := TargetPlugin{}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualReq, actualErr := tp.generateScaleReq(tc.inputNum, tc.inputConfig)
assert.Equal(t, tc.expectedOutputReq, actualReq, tc.name)
actualID, actualErr := awsNodeIDMap(tc.inputNode)
assert.Equal(t, tc.expectedOutputID, actualID, tc.name)
assert.Equal(t, tc.expectedOutputError, actualErr, tc.name)
})
}
30 changes: 14 additions & 16 deletions plugins/builtin/target/aws-asg/plugin/plugin.go
@@ -49,11 +49,14 @@ var _ target.Target = (*TargetPlugin)(nil)

// TargetPlugin is the AWS ASG implementation of the target.Target interface.
type TargetPlugin struct {
config map[string]string
logger hclog.Logger
asg *autoscaling.Client
ec2 *ec2.Client
scaleInUtils *scaleutils.ScaleIn
config map[string]string
logger hclog.Logger
asg *autoscaling.Client
ec2 *ec2.Client

// clusterUtils provides general cluster scaling utilities for querying the
// state of node pools and performing scaling tasks.
clusterUtils *scaleutils.ClusterScaleUtils
}

// NewAWSASGPlugin returns the AWS ASG implementation of the target.Target
@@ -73,11 +76,14 @@ func (t *TargetPlugin) SetConfig(config map[string]string) error {
return err
}

utils, err := scaleutils.NewScaleInUtils(nomad.ConfigFromNamespacedMap(config), t.logger)
clusterUtils, err := scaleutils.NewClusterScaleUtils(nomad.ConfigFromNamespacedMap(config), t.logger)
if err != nil {
return err
}
t.scaleInUtils = utils

// Store and set the remote ID callback function.
t.clusterUtils = clusterUtils
t.clusterUtils.ClusterNodeIDLookupFunc = awsNodeIDMap

return nil
}
@@ -138,18 +144,10 @@ func (t *TargetPlugin) Scale(action sdk.ScalingAction, config map[string]string)
// Status satisfies the Status function on the target.Target interface.
func (t *TargetPlugin) Status(config map[string]string) (*sdk.TargetStatus, error) {

class, ok := config[sdk.TargetConfigKeyClass]
if !ok {
return nil, fmt.Errorf("required config param %q not found", sdk.TargetConfigKeyClass)
}

// Perform our check of the Nomad node pool. If the pool is not ready, we
// can exit here and avoid calling the AWS API as it won't affect the
// outcome.
ready, err := t.scaleInUtils.Ready(scaleutils.PoolIdentifier{
IdentifierKey: scaleutils.IdentifierKeyClass,
Value: class,
})
ready, err := t.clusterUtils.IsPoolReady(config)
if err != nil {
return nil, fmt.Errorf("failed to run Nomad node readiness check: %v", err)
}
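Taken together, the plugin.go changes show the pattern an out-of-tree target plugin can now follow: build a ClusterScaleUtils once in SetConfig, register a provider-specific ClusterNodeIDLookupFunc, then rely on IsPoolReady and the pre/post scale-in tasks. The sketch below strings those calls together under stated assumptions: the plugin type, the looked-up attribute, the terminateRemoteInstances placeholder, and the use of api.DefaultConfig in place of nomad.ConfigFromNamespacedMap are all illustrative, not part of this change.

package exampletarget

import (
	"context"
	"fmt"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad-autoscaler/sdk/helper/scaleutils"
	"github.com/hashicorp/nomad/api"
)

// ExampleTargetPlugin is a hypothetical external target plugin built on the
// refactored scaleutils helpers.
type ExampleTargetPlugin struct {
	logger       hclog.Logger
	clusterUtils *scaleutils.ClusterScaleUtils
}

// SetConfig builds the shared cluster scale utilities and registers the
// provider-specific remote ID lookup, following the same steps as the AWS
// ASG plugin's SetConfig above. api.DefaultConfig stands in for the
// namespaced config handling used by the built-in plugin.
func (t *ExampleTargetPlugin) SetConfig(config map[string]string) error {
	clusterUtils, err := scaleutils.NewClusterScaleUtils(api.DefaultConfig(), t.logger)
	if err != nil {
		return err
	}
	t.clusterUtils = clusterUtils

	// The callback maps a Nomad node to the remote provider's identifier;
	// the attribute used here is illustrative only.
	t.clusterUtils.ClusterNodeIDLookupFunc = func(n *api.Node) (string, error) {
		val, ok := n.Attributes["unique.hostname"]
		if !ok || val == "" {
			return "", fmt.Errorf("attribute %q not found", "unique.hostname")
		}
		return val, nil
	}
	return nil
}

// poolReady wraps the shared readiness check used by the AWS plugin's Status
// method above; it lets the plugin skip remote API calls while the Nomad
// node pool is not yet stable.
func (t *ExampleTargetPlugin) poolReady(config map[string]string) (bool, error) {
	return t.clusterUtils.IsPoolReady(config)
}

// scaleIn drains the selected nodes via Nomad, terminates them with the
// remote provider, then runs the post scale-in cleanup, mirroring the AWS
// plugin's three-step flow.
func (t *ExampleTargetPlugin) scaleIn(ctx context.Context, num int64, config map[string]string) error {
	ids, err := t.clusterUtils.RunPreScaleInTasks(ctx, config, int(num))
	if err != nil {
		return fmt.Errorf("failed to perform pre-scale Nomad scale in tasks: %v", err)
	}

	// Collect the provider-side identifiers resolved by the lookup callback.
	var remoteIDs []string
	for _, node := range ids {
		remoteIDs = append(remoteIDs, node.RemoteResourceID)
	}

	if err := terminateRemoteInstances(ctx, remoteIDs); err != nil {
		return fmt.Errorf("failed to terminate remote instances: %v", err)
	}

	return t.clusterUtils.RunPostScaleInTasks(ctx, config, ids)
}

// terminateRemoteInstances stands in for the provider-specific termination
// request made between the pre and post scale-in tasks.
func terminateRemoteInstances(ctx context.Context, ids []string) error {
	return nil
}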