diff --git a/agent/agent.go b/agent/agent.go
index c4b76dc4..06419478 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -3,9 +3,6 @@ package agent
 import (
 	"context"
 	"fmt"
-	"reflect"
-	"sync"
-	"time"
 
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad-autoscaler/agent/config"
@@ -15,7 +12,8 @@ import (
 	"github.com/hashicorp/nomad-autoscaler/plugins/manager"
 	strategypkg "github.com/hashicorp/nomad-autoscaler/plugins/strategy"
 	targetpkg "github.com/hashicorp/nomad-autoscaler/plugins/target"
-	"github.com/hashicorp/nomad-autoscaler/policystorage"
+	"github.com/hashicorp/nomad-autoscaler/policy"
+	nomadpolicy "github.com/hashicorp/nomad-autoscaler/policy/nomad"
 	"github.com/hashicorp/nomad/api"
 )
 
@@ -24,6 +22,7 @@ type Agent struct {
 	config        *config.Agent
 	nomadClient   *api.Client
 	pluginManager *manager.PluginManager
+	healthServer  *healthServer
 }
 
 func NewAgent(c *config.Agent, logger hclog.Logger) *Agent {
@@ -34,14 +33,13 @@ func NewAgent(c *config.Agent, logger hclog.Logger) *Agent {
 }
 
 func (a *Agent) Run(ctx context.Context) error {
+	defer a.stop()
 
 	// Generate the Nomad client.
 	if err := a.generateNomadClient(); err != nil {
 		return err
 	}
 
-	ps := policystorage.Nomad{Client: a.nomadClient}
-
 	// launch plugins
 	if err := a.setupPlugins(); err != nil {
 		return fmt.Errorf("failed to setup plugins: %v", err)
 	}
 
@@ -52,56 +50,37 @@ func (a *Agent) Run(ctx context.Context) error {
 	if err != nil {
 		return fmt.Errorf("failed to setup HTTP getHealth server: %v", err)
 	}
-	go healthServer.run()
 
-	// loop like there's no tomorrow
-	var wg sync.WaitGroup
-	ticker := time.NewTicker(a.config.ScanInterval)
-Loop:
+	a.healthServer = healthServer
+	go a.healthServer.run()
+
+	source := nomadpolicy.NewNomadSource(a.logger, a.nomadClient)
+	manager := policy.NewManager(a.logger, source, a.pluginManager)
+
+	policyEvalCh := make(chan *policy.Evaluation, 10)
+	go manager.Run(ctx, policyEvalCh)
+
 	for {
 		select {
-		case <-ticker.C:
-			logger := a.logger.With("policy_storage", reflect.TypeOf(ps))
-			logger.Info("reading policies")
-
-			// read policies
-			policies, err := ps.List()
-			if err != nil {
-				logger.Error("failed to fetch policies", "error", err)
-				continue
-			}
-			logger.Info(fmt.Sprintf("found %d policies", len(policies)))
-
-			// handle policies
-			for _, p := range policies {
-				wg.Add(1)
-				go func(ID string) {
-					defer wg.Done()
-					select {
-					case <-ctx.Done():
-						return
-					default:
-						policy, err := ps.Get(ID)
-						if err != nil {
-							logger.Error("failed to fetch policy", "policy_id", ID, "error", err)
-							return
-						}
-						a.handlePolicy(policy)
-					}
-				}(p.ID)
-			}
-			wg.Wait()
 		case <-ctx.Done():
-			// Stop the health server.
-			healthServer.stop()
-
-			// stop plugins before exiting
-			a.pluginManager.KillPlugins()
-			break Loop
+			a.logger.Info("context closed, shutting down")
+			return nil
+		case policyEval := <-policyEvalCh:
+			a.handlePolicy(policyEval.Policy)
 		}
 	}
+}
 
-	return nil
+func (a *Agent) stop() {
+	// Stop the health server.
+	if a.healthServer != nil {
+		a.healthServer.stop()
+	}
+
+	// Kill all the plugins.
+	if a.pluginManager != nil {
+		a.pluginManager.KillPlugins()
+	}
+}
 
 // generateNomadClient takes the internal Nomad configuration, translates and
@@ -161,7 +140,7 @@ func (a *Agent) generateNomadClient() error {
 	return nil
 }
 
-func (a *Agent) handlePolicy(p *policystorage.Policy) {
+func (a *Agent) handlePolicy(p *policy.Policy) {
 	logger := a.logger.With(
 		"policy_id", p.ID,
 		"source", p.Source,
@@ -169,10 +148,7 @@ func (a *Agent) handlePolicy(p *policystorage.Policy) {
 		"strategy", p.Strategy.Name,
 	)
 
-	if !p.Enabled {
-		logger.Info("policy not enabled")
-		return
-	}
+	logger.Info("received policy for evaluation")
 
 	var targetInst targetpkg.Target
 	var apmInst apmpkg.APM
@@ -202,11 +178,15 @@ func (a *Agent) handlePolicy(p *policystorage.Policy) {
 
 	// fetch target count
 	logger.Info("fetching current count")
-	currentCount, err := targetInst.Count(p.Target.Config)
+	currentStatus, err := targetInst.Status(p.Target.Config)
 	if err != nil {
 		logger.Error("failed to fetch current count", "error", err)
 		return
 	}
+	if !currentStatus.Ready {
+		logger.Info("target not ready")
+		return
+	}
 
 	// query policy's APM
 	logger.Info("querying APM")
@@ -220,7 +200,7 @@ func (a *Agent) handlePolicy(p *policystorage.Policy) {
 	logger.Info("calculating new count")
 	req := strategypkg.RunRequest{
 		PolicyID: p.ID,
-		Count:    currentCount,
+		Count:    currentStatus.Count,
 		Metric:   value,
 		Config:   p.Strategy.Config,
 	}
@@ -235,15 +215,15 @@ func (a *Agent) handlePolicy(p *policystorage.Policy) {
 
 	// no action to execute
 	var minMaxAction *strategypkg.Action
 
-	if currentCount < p.Min {
+	if currentStatus.Count < p.Min {
 		minMaxAction = &strategypkg.Action{
 			Count:  &p.Min,
-			Reason: fmt.Sprintf("current count (%d) below limit (%d)", currentCount, p.Min),
+			Reason: fmt.Sprintf("current count (%d) below limit (%d)", currentStatus.Count, p.Min),
 		}
-	} else if currentCount > p.Max {
+	} else if currentStatus.Count > p.Max {
 		minMaxAction = &strategypkg.Action{
 			Count:  &p.Max,
-			Reason: fmt.Sprintf("current count (%d) above limit (%d)", currentCount, p.Max),
+			Reason: fmt.Sprintf("current count (%d) above limit (%d)", currentStatus.Count, p.Max),
 		}
 	}
 
@@ -276,16 +256,16 @@ func (a *Agent) handlePolicy(p *policystorage.Policy) {
 
 		if action.Count == nil {
 			actionLogger.Info("registering scaling event",
-				"count", currentCount, "reason", action.Reason, "meta", action.Meta)
+				"count", currentStatus.Count, "reason", action.Reason, "meta", action.Meta)
 		} else {
 			// Skip action if count doesn't change.
-			if currentCount == *action.Count {
-				actionLogger.Info("nothing to do", "from", currentCount, "to", *action.Count)
+			if currentStatus.Count == *action.Count {
+				actionLogger.Info("nothing to do", "from", currentStatus.Count, "to", *action.Count)
 				continue
 			}
 
 			actionLogger.Info("scaling target",
-				"from", currentCount, "to", *action.Count,
+				"from", currentStatus.Count, "to", *action.Count,
 				"reason", action.Reason, "meta", action.Meta)
 		}
diff --git a/docs/policy/README.md b/docs/policy/README.md
index d9c5c429..37318da5 100644
--- a/docs/policy/README.md
+++ b/docs/policy/README.md
@@ -4,6 +4,7 @@ Nomad task groups can be configured for Autoscaling using the [scaling stanza](h
 ## General Options
 * `source` - The APM plugin that should handle the metric query. If omitted, this defaults to using the Nomad APM.
 * `query` - The query to run against the specified APM. Currently this query should return a single value.
+* `evaluation_interval` - Defines how often the policy is evaluated by the Autoscaler. It should be provided as a duration (e.g. `"5s"`, `"1m"`). If omitted, the agent's `scan_interval` configuration value will be used.
 * `target` - Defines where the autoscaling target is running. A Nomad task group for example has a target of Nomad, as it is running on a Nomad cluster. If omitted, this defaults to `name = "nomad"`.
 * `strategy` - The strategy to use, and its configuration, when calculating the desired state based on the current task group count and the metric returned by the APM.
@@ -17,6 +18,7 @@ scaling {
   policy {
     source = "prometheus"
     query = "scalar(avg((haproxy_server_current_sessions{backend=\"http_back\"}) and (haproxy_server_up{backend=\"http_back\"} == 1)))"
+    evaluation_interval = "10s"
 
     strategy = {
       name = "target-value"
diff --git a/go.mod b/go.mod
index 2cbdd6a1..208da45e 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
 	github.com/agext/levenshtein v1.2.3 // indirect
 	github.com/docker/go-units v0.4.0 // indirect
 	github.com/fatih/color v1.9.0 // indirect
+	github.com/google/go-cmp v0.4.0
 	github.com/gorilla/websocket v1.4.2 // indirect
 	github.com/hashicorp/go-hclog v0.12.0
 	github.com/hashicorp/go-multierror v1.0.0
diff --git a/go.sum b/go.sum
index 9984d27c..9a7ab3f0 100644
--- a/go.sum
+++ b/go.sum
@@ -80,7 +80,6 @@ github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
-github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
 github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
@@ -89,14 +88,11 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348 h1:MtvEpTB6LX3vkb4ax0b5D2DHbNAUsen0Gx5wZoq3lV4=
 github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
-github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4=
 github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
 github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA=
 github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
-github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI=
 github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
 github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
-github.com/mattn/go-isatty v0.0.10 h1:qxFzApOv4WsAL965uUPIsXzAKCZxN2p9UqdhFS4ZW10=
 github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
 github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
 github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
@@ -108,11 +104,9 @@ github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMK
 github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
 github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
 github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
-github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77 h1:7GoSOOW2jpsfkntVKaS2rAr1TJqfcxotyaUcuxoZSzg=
 github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
 github.com/mitchellh/go-testing-interface v1.0.0 h1:fzU/JVNcaqHQEcVFAKeR41fkiLdIPrefOvVG1VZ96U0=
 github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
-github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 h1:DpOJ2HYzCv8LZP15IdmG+YdwD2luVPHITV96TkirNBM=
 github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
 github.com/mitchellh/go-wordwrap v1.0.0 h1:6GlHJ/LTGMrIJbwgdqdl2eEH8o+Exx/0m8ir9Gns0u4=
 github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
@@ -149,59 +143,46 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
 github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
 github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
-github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
 github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
-github.com/vmihailenco/msgpack v3.3.3+incompatible h1:wapg9xDUZDzGCNFlwc5SqI1rvcciqcxEHac4CYj89xI=
 github.com/vmihailenco/msgpack v3.3.3+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
-github.com/zclconf/go-cty v1.2.0 h1:sPHsy7ADcIZQP3vILvTjrh74ZA175TFP5vqiNK1UmlI=
 github.com/zclconf/go-cty v1.2.0/go.mod h1:hOPWgoHbaTUnI5k4D2ld+GRpFJSCe6bCM7m1q/N4PQ8=
 github.com/zclconf/go-cty v1.3.1 h1:QIOZl+CKKdkv4l2w3lG23nNzXgLoxsWLSEdg1MlX4p0=
 github.com/zclconf/go-cty v1.3.1/go.mod h1:YO23e2L18AG+ZYQfSobnY4G65nvwvprPCxBHkufUH1k=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734 h1:p/H982KKEjUnLJkM3tt/LemDnOc1GiZL5FCVlORJ5zo=
 golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/net v0.0.0-20180811021610-c39426892332/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
-golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
-golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU=
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc h1:MeuS1UDyZyFH++6vVy44PuufTeFF0d0nfI6XB87YGSk=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc h1:WiYx1rIFmx8c0mXAFtv5D/mHyKe1+jmuP7PViuwqwuQ=
 golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20190502175342-a43fa875dd82 h1:vsphBvatvfbhlb4PO1BYSr9dzugGxJ/SQHoNufZJq1w=
 golang.org/x/sys v0.0.0-20190502175342-a43fa875dd82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20200122134326-e047566fdf82 h1:ywK/j/KkyTHcdyYSZNXGjMwgmDSfjglYZ3vStQ/gSCU=
 golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9 h1:1/DFK4b7JH8DmkqhUk48onnSfrPzImPoVxuomtbT2nk=
 golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
@@ -221,7 +202,6 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.5 h1:ymVxjfMaHvXD8RqPRmzHHsB3VvucivSkIAvJFDI5O3c=
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/helper/blocking/blocking.go b/helper/blocking/blocking.go
new file mode 100644
index 00000000..f16f5230
--- /dev/null
+++ b/helper/blocking/blocking.go
@@ -0,0 +1,5 @@
+package blocking
+
+// IndexHasChanged is used to check whether a returned blocking query has an
+// updated index, compared to a tracked value.
+func IndexHasChanged(new, old uint64) bool { return new > old }
diff --git a/helper/blocking/blocking_test.go b/helper/blocking/blocking_test.go
new file mode 100644
index 00000000..7abb90b0
--- /dev/null
+++ b/helper/blocking/blocking_test.go
@@ -0,0 +1,36 @@
+package blocking
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_indexHasChanged(t *testing.T) {
+	testCases := []struct {
+		newValue       uint64
+		oldValue       uint64
+		expectedReturn bool
+	}{
+		{
+			newValue:       13,
+			oldValue:       7,
+			expectedReturn: true,
+		},
+		{
+			newValue:       13696,
+			oldValue:       13696,
+			expectedReturn: false,
+		},
+		{
+			newValue:       7,
+			oldValue:       13,
+			expectedReturn: false,
+		},
+	}
+
+	for _, tc := range testCases {
+		res := IndexHasChanged(tc.newValue, tc.oldValue)
+		assert.Equal(t, tc.expectedReturn, res)
+	}
+}
diff --git a/helper/ptr/ptr.go b/helper/ptr/ptr.go
new file mode 100644
index 00000000..85d3fc91
--- /dev/null
+++ b/helper/ptr/ptr.go
@@ -0,0 +1,9 @@
+package ptr
+
+func BoolToPtr(b bool) *bool {
+	return &b
+}
+
+func Int64ToPtr(i int64) *int64 {
+	return &i
+}
diff --git a/plugins/builtin/target/nomad/main.go b/plugins/builtin/target/nomad/main.go
index d2314ea8..f17807c9 100644
--- a/plugins/builtin/target/nomad/main.go
+++ b/plugins/builtin/target/nomad/main.go
@@ -3,7 +3,7 @@ package main
 import (
 	hclog "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad-autoscaler/plugins"
-	nomadtarget "github.com/hashicorp/nomad-autoscaler/plugins/builtin/target/nomad/plugin"
+	nomadTarget "github.com/hashicorp/nomad-autoscaler/plugins/builtin/target/nomad/plugin"
 )
 
 func main() {
@@ -12,5 +12,5 @@ func main() {
 
 // factory returns a new instance of the Nomad Target plugin.
 func factory(log hclog.Logger) interface{} {
-	return nomadtarget.NewNomadPlugin(log)
+	return nomadTarget.NewNomadPlugin(log)
 }
diff --git a/plugins/builtin/target/nomad/plugin/plugin.go b/plugins/builtin/target/nomad/plugin/plugin.go
index 3095c2bf..97172842 100644
--- a/plugins/builtin/target/nomad/plugin/plugin.go
+++ b/plugins/builtin/target/nomad/plugin/plugin.go
@@ -2,18 +2,35 @@ package nomad
 
 import (
 	"fmt"
+	"sync"
+	"time"
 
-	"github.com/hashicorp/go-hclog"
+	hclog "github.com/hashicorp/go-hclog"
 	nomadHelper "github.com/hashicorp/nomad-autoscaler/helper/nomad"
 	"github.com/hashicorp/nomad-autoscaler/plugins"
 	"github.com/hashicorp/nomad-autoscaler/plugins/base"
 	"github.com/hashicorp/nomad-autoscaler/plugins/strategy"
+	"github.com/hashicorp/nomad-autoscaler/plugins/target"
 	"github.com/hashicorp/nomad/api"
 )
 
 const (
-	// pluginName is the name of the plugin
+	// pluginName is the unique name of this plugin amongst target
+	// plugins.
pluginName = "nomad-target" + + // configKeys are the accepted configuration map keys which can be + // processed when performing SetConfig(). + configKeyJobID = "job_id" + configKeyGroup = "group" + + // garbageCollectionNanoSecondThreshold is the nanosecond threshold used + // when performing garbage collection of job status handlers. + garbageCollectionNanoSecondThreshold = 14400000000000 + + // garbageCollectionSecondInterval is the interval in seconds at which the + // garbage collector will run. + garbageCollectionSecondInterval = 60 ) var ( @@ -32,19 +49,40 @@ var ( } ) +// Assert that TargetPlugin meets the target.Target interface. +var _ target.Target = (*TargetPlugin)(nil) + +// TargetPlugin is the Nomad implementation of the target.Target interface. type TargetPlugin struct { client *api.Client logger hclog.Logger + + // statusHandlers is a mapping of jobScaleStatusHandlers keyed by the jobID + // that the handler represents. The lock should be used when accessing the + // map. + statusHandlers map[string]*jobScaleStatusHandler + statusHandlersLock sync.RWMutex + + // gcRunning indicates whether the GC loop is running or not. + gcRunning bool } +// NewNomadPlugin returns the Nomad implementation of the target.Target +// interface. func NewNomadPlugin(log hclog.Logger) *TargetPlugin { return &TargetPlugin{ - logger: log, + logger: log, + statusHandlers: make(map[string]*jobScaleStatusHandler), } } +// SetConfig satisfies the SetConfig function on the base.Plugin interface. func (t *TargetPlugin) SetConfig(config map[string]string) error { + if !t.gcRunning { + go t.garbageCollectionLoop() + } + cfg := nomadHelper.ConfigFromMap(config) client, err := api.NewClient(cfg) @@ -56,27 +94,12 @@ func (t *TargetPlugin) SetConfig(config map[string]string) error { return nil } +// PluginInfo satisfies the PluginInfo function on the base.Plugin interface. func (t *TargetPlugin) PluginInfo() (*base.PluginInfo, error) { return pluginInfo, nil } -func (t *TargetPlugin) Count(config map[string]string) (int64, error) { - // TODO: validate if group is valid - allocs, _, err := t.client.Jobs().Allocations(config["job_id"], false, nil) - if err != nil { - return 0, fmt.Errorf("failed to retrieve Nomad job: %v", err) - } - - var count int64 - for _, alloc := range allocs { - if alloc.TaskGroup == config["group"] && alloc.ClientStatus == "running" { - count++ - } - } - - return count, nil -} - +// Scale satisfies the Scale function on the target.Target interface. func (t *TargetPlugin) Scale(action strategy.Action, config map[string]string) error { var countIntPtr *int if action.Count != nil { @@ -97,3 +120,84 @@ func (t *TargetPlugin) Scale(action strategy.Action, config map[string]string) e } return nil } + +// Status satisfies the Status function on the target.Target interface. +func (t *TargetPlugin) Status(config map[string]string) (*target.Status, error) { + + // Get the JobID from the config map. This is a required param and results + // in an error if not found or is an empty string. + jobID, ok := config[configKeyJobID] + if !ok || jobID == "" { + return nil, fmt.Errorf("required config key %q not found", configKeyJobID) + } + + // Get the GroupName from the config map. This is a required param and + // results in an error if not found or is an empty string. + group, ok := config[configKeyGroup] + if !ok || group == "" { + return nil, fmt.Errorf("required config key %q not found", configKeyGroup) + } + + // Create a read/write lock on the handlers so we can safely interact. 
+
+// Status satisfies the Status function on the target.Target interface.
+func (t *TargetPlugin) Status(config map[string]string) (*target.Status, error) {
+
+	// Get the JobID from the config map. This is a required param and results
+	// in an error if not found or is an empty string.
+	jobID, ok := config[configKeyJobID]
+	if !ok || jobID == "" {
+		return nil, fmt.Errorf("required config key %q not found", configKeyJobID)
+	}
+
+	// Get the GroupName from the config map. This is a required param and
+	// results in an error if not found or is an empty string.
+	group, ok := config[configKeyGroup]
+	if !ok || group == "" {
+		return nil, fmt.Errorf("required config key %q not found", configKeyGroup)
+	}
+
+	// Acquire a write lock on the handlers map so we can safely interact
+	// with it.
+	t.statusHandlersLock.Lock()
+	defer t.statusHandlersLock.Unlock()
+
+	// Create a handler for the job if one does not currently exist.
+	if _, ok := t.statusHandlers[jobID]; !ok {
+		t.statusHandlers[jobID] = newJobScaleStatusHandler(t.client, jobID, t.logger)
+	}
+
+	// If the handler is not in a running state, start it and wait for the
+	// first run to finish.
+	if !t.statusHandlers[jobID].isRunning {
+		go t.statusHandlers[jobID].start()
+		<-t.statusHandlers[jobID].initialDone
+	}
+
+	// Return the status data from the handler to the caller.
+	return t.statusHandlers[jobID].status(group)
+}
+
+// garbageCollectionLoop runs a long lived loop, triggering the garbage
+// collector at a specified interval.
+func (t *TargetPlugin) garbageCollectionLoop() {
+
+	// Setup the ticker and set that the loop is now running.
+	ticker := time.NewTicker(garbageCollectionSecondInterval * time.Second)
+	t.gcRunning = true
+
+	for {
+		select {
+		case <-ticker.C:
+			t.logger.Debug("triggering run of handler garbage collection")
+			t.garbageCollect()
+		}
+	}
+}
+
+// garbageCollect runs a single round of status handler garbage collection.
+func (t *TargetPlugin) garbageCollect() {
+
+	// Generate the GC threshold based on the current time.
+	threshold := time.Now().UTC().UnixNano() - garbageCollectionNanoSecondThreshold
+
+	// Iterate all the handlers, ensuring we lock for safety.
+	t.statusHandlersLock.Lock()
+
+	for jobID, handle := range t.statusHandlers {
+
+		// If the handler is running, there is no need to GC.
+		if handle.isRunning {
+			continue
+		}
+
+		// If the last updated time is before our threshold, the handler should
+		// be removed. Goodbye old friend.
+		if handle.lastUpdated < threshold {
+			delete(t.statusHandlers, jobID)
+			t.logger.Debug("removed inactive job status handler", "job_id", jobID)
+		}
+	}
+
+	t.statusHandlersLock.Unlock()
+}
diff --git a/plugins/builtin/target/nomad/plugin/plugin_test.go b/plugins/builtin/target/nomad/plugin/plugin_test.go
new file mode 100644
index 00000000..2bc34e06
--- /dev/null
+++ b/plugins/builtin/target/nomad/plugin/plugin_test.go
@@ -0,0 +1,34 @@
+package nomad
+
+import (
+	"testing"
+	"time"
+
+	hclog "github.com/hashicorp/go-hclog"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestTargetPlugin_garbageCollect(t *testing.T) {
+
+	curTime := time.Now().UTC().UnixNano()
+
+	// Build the plugin with some populated handlers and data to test.
+	targetPlugin := TargetPlugin{
+		logger: hclog.NewNullLogger(),
+		statusHandlers: map[string]*jobScaleStatusHandler{
+			"running":               {isRunning: true, lastUpdated: curTime},
+			"recently-stopped":      {isRunning: false, lastUpdated: curTime - 1800000000000},
+			"stopped-long-time-ago": {isRunning: false, lastUpdated: curTime - 18000000000000},
+		},
+	}
+
+	// Trigger the GC.
+	targetPlugin.garbageCollect()
+
+	// Perform our assertions to confirm the statusHandlers mapping has the
+	// entries expected after running the GC.
+	assert.Nil(t, targetPlugin.statusHandlers["stopped-long-time-ago"])
+	assert.NotNil(t, targetPlugin.statusHandlers["running"])
+	assert.NotNil(t, targetPlugin.statusHandlers["recently-stopped"])
+	assert.Len(t, targetPlugin.statusHandlers, 2)
+}
diff --git a/plugins/builtin/target/nomad/plugin/state.go b/plugins/builtin/target/nomad/plugin/state.go
new file mode 100644
index 00000000..4c646492
--- /dev/null
+++ b/plugins/builtin/target/nomad/plugin/state.go
@@ -0,0 +1,176 @@
+package nomad
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
+	hclog "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad-autoscaler/helper/blocking"
+	"github.com/hashicorp/nomad-autoscaler/plugins/target"
+	"github.com/hashicorp/nomad/api"
+)
+
+const (
+	// metaKeyPrefix is the key prefix to be used when adding items to the
+	// status response meta object.
+	metaKeyPrefix = "nomad_autoscaler.target.nomad."
+
+	// metaKeyJobStoppedSuffix is the key suffix used when adding a meta item
+	// to the status response detailing the job's current stopped status.
+	metaKeyJobStoppedSuffix = ".stopped"
+)
+
+// jobScaleStatusHandler is an individual handler on the /v1/job/<job_id>/scale
+// GET endpoint. It provides methods for obtaining the current scaling state of
+// a job and task group.
+type jobScaleStatusHandler struct {
+	client *api.Client
+	jobID  string
+	logger hclog.Logger
+
+	// scaleStatus is the internal reflection of the response objects from the
+	// job scale status API.
+	scaleStatus      *api.JobScaleStatusResponse
+	scaleStatusError error
+
+	// initialDone helps synchronise the caller waiting for the state to be
+	// populated after starting the API query loop.
+	initialDone chan bool
+	initialized bool
+
+	// isRunning details whether the loop within start() is currently running
+	// or not.
+	isRunning bool
+
+	// lastUpdated is the UnixNano UTC timestamp of the last update to the
+	// state. This helps with garbage collection.
+	lastUpdated int64
+}
+
+func newJobScaleStatusHandler(client *api.Client, jobID string, logger hclog.Logger) *jobScaleStatusHandler {
+	return &jobScaleStatusHandler{
+		client:      client,
+		initialDone: make(chan bool),
+		jobID:       jobID,
+		logger:      logger.With(configKeyJobID, jobID),
+	}
+}
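+
+// The intended lifecycle of a handler, as driven by TargetPlugin.Status
+// above, is roughly the following (editorial sketch; names are hypothetical):
+//
+//	h := newJobScaleStatusHandler(client, "example-job", logger)
+//	go h.start()    // begin the blocking query loop
+//	<-h.initialDone // wait for the first API response to be cached
+//	status, err := h.status("example-group")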
+// status returns the cached scaling status of the passed group.
+func (jsh *jobScaleStatusHandler) status(group string) (*target.Status, error) {
+
+	// If the last status response included an error, just return this to the
+	// caller.
+	if jsh.scaleStatusError != nil {
+		return nil, jsh.scaleStatusError
+	}
+
+	// If the scale status is nil, it means the main loop is stopped and
+	// therefore the job is not found on the cluster.
+	if jsh.scaleStatus == nil {
+		return nil, nil
+	}
+
+	var (
+		count int
+		found bool
+	)
+
+	for name, tg := range jsh.scaleStatus.TaskGroups {
+		if name == group {
+			// Currently "Running" is not populated:
+			// https://github.com/hashicorp/nomad/issues/7789
+			count = tg.Healthy
+			found = true
+			break
+		}
+	}
+
+	// If we did not find the task group in the status list, we can't reliably
+	// inform the caller of any details. Therefore return an error.
+	if !found {
+		return nil, fmt.Errorf("task group %q not found", group)
+	}
+
+	return &target.Status{
+		Ready: !jsh.scaleStatus.JobStopped,
+		Count: int64(count),
+		Meta: map[string]string{
+			metaKeyPrefix + jsh.jobID + metaKeyJobStoppedSuffix: strconv.FormatBool(jsh.scaleStatus.JobStopped),
+		},
+	}, nil
+}
+
+// start runs the blocking query loop that processes changes from the API and
+// reflects the status internally.
+func (jsh *jobScaleStatusHandler) start() {
+
+	// Log that we are starting, useful for debugging.
+	jsh.logger.Debug("starting job status handler")
+	jsh.isRunning = true
+
+	q := &api.QueryOptions{WaitTime: 5 * time.Minute, WaitIndex: 1}
+
+	for {
+		status, meta, err := jsh.client.Jobs().ScaleStatus(jsh.jobID, q)
+		if err != nil {
+
+			// If the job is not found on the cluster, stop the handler's loop
+			// and set the terminal state. It is still possible to read the
+			// state from the handler until it is deleted by the GC.
+			if strings.Contains(err.Error(), "404") {
+				jsh.setStopState()
+				return
+			}
+			jsh.updateStatusState(status, err)
+
+			// If the error was anything other than the job not being found,
+			// try again.
+			time.Sleep(10 * time.Second)
+			continue
+		}
+
+		// If the index has not changed, the query returned because the timeout
+		// was reached, therefore start the next query loop.
+		if !blocking.IndexHasChanged(meta.LastIndex, q.WaitIndex) {
+			continue
+		}
+
+		// Update the handler's state.
+		jsh.updateStatusState(status, nil)
+
+		// Mark the handler as initialized and notify the initialDone channel.
+		if !jsh.initialized {
+			jsh.handleFirstRun()
+			jsh.initialized = true
+		}
+
+		// Modify the wait index on the QueryOptions so the next blocking query
+		// uses the latest index value.
+		q.WaitIndex = meta.LastIndex
+	}
+}
+
+// handleFirstRun is a helper function which responds to channel listeners that
+// the first run of the blocking query has completed and therefore data is
+// available for querying.
+func (jsh *jobScaleStatusHandler) handleFirstRun() { jsh.initialDone <- true }
+
+// updateStatusState takes the API responses and updates the internal state
+// along with a timestamp.
+func (jsh *jobScaleStatusHandler) updateStatusState(status *api.JobScaleStatusResponse, err error) {
+	jsh.scaleStatus = status
+	jsh.scaleStatusError = err
+	jsh.lastUpdated = time.Now().UTC().UnixNano()
+}
+
+// setStopState handles updating state when the job status handler is going to
+// stop.
+func (jsh *jobScaleStatusHandler) setStopState() {
+	jsh.isRunning = false
+	jsh.scaleStatus = nil
+	jsh.scaleStatusError = nil
+	jsh.lastUpdated = time.Now().UTC().UnixNano()
+}
diff --git a/plugins/builtin/target/nomad/plugin/state_test.go b/plugins/builtin/target/nomad/plugin/state_test.go
new file mode 100644
index 00000000..e8706ee0
--- /dev/null
+++ b/plugins/builtin/target/nomad/plugin/state_test.go
@@ -0,0 +1,149 @@
+package nomad
+
+import (
+	"fmt"
+	"testing"
+
+	hclog "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad-autoscaler/plugins/target"
+	"github.com/hashicorp/nomad/api"
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_newJobStateHandler(t *testing.T) {
+
+	// Create an actual client so we can test it gets set properly.
+	c, err := api.NewClient(api.DefaultConfig())
+	assert.Nil(t, err)
+
+	// Create the new handler and perform assertions.
+	jsh := newJobScaleStatusHandler(c, "test", hclog.NewNullLogger())
+	assert.NotNil(t, jsh.client)
+	assert.Equal(t, "test", jsh.jobID)
+	assert.NotNil(t, jsh.initialDone)
+	assert.NotNil(t, jsh.client)
+}
+
+func Test_jobStateHandler_status(t *testing.T) {
+	testCases := []struct {
+		inputJSH       *jobScaleStatusHandler
+		inputGroup     string
+		expectedReturn *target.Status
+		expectedError  error
+		name           string
+	}{
+		{
+			inputJSH:       &jobScaleStatusHandler{scaleStatusError: fmt.Errorf("this is an error message")},
+			inputGroup:     "test",
+			expectedReturn: nil,
+			expectedError:  fmt.Errorf("this is an error message"),
+			name:           "job status response currently in error",
+		},
+		{
+			inputJSH:       &jobScaleStatusHandler{},
+			inputGroup:     "test",
+			expectedReturn: nil,
+			expectedError:  nil,
+			name:           "job no longer running on cluster",
+		},
+		{
+			inputJSH: &jobScaleStatusHandler{
+				scaleStatus: &api.JobScaleStatusResponse{
+					TaskGroups: map[string]api.TaskGroupScaleStatus{},
+				},
+			},
+			inputGroup:     "this-doesnt-exist",
+			expectedReturn: nil,
+			expectedError:  fmt.Errorf("task group \"this-doesnt-exist\" not found"),
+			name:           "job group not found within scale status task groups",
+		},
+		{
+			inputJSH: &jobScaleStatusHandler{
+				jobID: "cant-think-of-a-funny-name",
+				scaleStatus: &api.JobScaleStatusResponse{
+					JobStopped: false,
+					TaskGroups: map[string]api.TaskGroupScaleStatus{
+						"this-does-exist": {Healthy: 7},
+					},
+				},
+			},
+			inputGroup: "this-does-exist",
+			expectedReturn: &target.Status{
+				Ready: true,
+				Count: 7,
+				Meta: map[string]string{
+					"nomad_autoscaler.target.nomad.cant-think-of-a-funny-name.stopped": "false",
+				},
+			},
+			expectedError: nil,
+			name:          "job group found within scale status task groups and job is running",
+		},
+		{
+			inputJSH: &jobScaleStatusHandler{
+				jobID: "cant-think-of-a-funny-name",
+				scaleStatus: &api.JobScaleStatusResponse{
+					JobStopped: true,
+					TaskGroups: map[string]api.TaskGroupScaleStatus{
+						"this-does-exist": {Healthy: 7},
+					},
+				},
+			},
+			inputGroup: "this-does-exist",
+			expectedReturn: &target.Status{
+				Ready: false,
+				Count: 7,
+				Meta: map[string]string{
+					"nomad_autoscaler.target.nomad.cant-think-of-a-funny-name.stopped": "true",
+				},
+			},
+			expectedError: nil,
+			name:          "job group found within scale status task groups and job is not running",
+		},
+	}
+
+	for _, tc := range testCases {
+		actualReturn, actualErr := tc.inputJSH.status(tc.inputGroup)
+		assert.Equal(t, tc.expectedReturn, actualReturn, tc.name)
+		assert.Equal(t, tc.expectedError, actualErr, tc.name)
+	}
+}
+
+func Test_jobStateHandler_updateStatusState(t *testing.T) {
+	jsh := &jobScaleStatusHandler{}
+
+	// Assert that the lastUpdated timestamp is default. This helps confirm it
+	// gets updated later in the test.
+	assert.Equal(t, int64(0), jsh.lastUpdated)
+
+	// Write our first update.
+	jsh.updateStatusState(&api.JobScaleStatusResponse{JobID: "test"}, nil)
+	newTimestamp := jsh.lastUpdated
+	assert.Equal(t, &api.JobScaleStatusResponse{JobID: "test"}, jsh.scaleStatus)
+	assert.Nil(t, jsh.scaleStatusError)
+	assert.Greater(t, newTimestamp, int64(0))
+
+	// Write a second update and ensure it is persisted.
+	jsh.updateStatusState(nil, fmt.Errorf("oh no, something went wrong"))
+	assert.Greater(t, jsh.lastUpdated, newTimestamp)
+	assert.Equal(t, fmt.Errorf("oh no, something went wrong"), jsh.scaleStatusError)
+	assert.Nil(t, jsh.scaleStatus)
+}
+
+func Test_jobStateHandler_stop(t *testing.T) {
+	jsh := &jobScaleStatusHandler{}
+
+	// Assert that the lastUpdated timestamp is default. This helps confirm it
+	// gets updated later in the test.
+	assert.Equal(t, int64(0), jsh.lastUpdated)
+
+	// Set some data that will be overwritten by stop().
+	jsh.isRunning = true
+	jsh.scaleStatus = &api.JobScaleStatusResponse{JobID: "test"}
+
+	// Call stop and make assertions.
+	jsh.setStopState()
+	assert.False(t, jsh.isRunning)
+	assert.Nil(t, jsh.scaleStatus)
+	assert.Nil(t, jsh.scaleStatusError)
+	assert.Greater(t, jsh.lastUpdated, int64(0))
+}
diff --git a/plugins/target/target.go b/plugins/target/target.go
index 2f82729a..1f2aa34a 100644
--- a/plugins/target/target.go
+++ b/plugins/target/target.go
@@ -9,12 +9,18 @@ import (
 )
 
 type Target interface {
-	Count(config map[string]string) (int64, error)
 	Scale(action strategy.Action, config map[string]string) error
+	Status(config map[string]string) (*Status, error)
 	PluginInfo() (*base.PluginInfo, error)
 	SetConfig(config map[string]string) error
 }
 
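+// Status describes the state of a scaling target. The field notes below are
+// editorial, based on how the struct is used within this change:
+//
+//   - Ready: whether the target is ready to be evaluated and scaled; the
+//     Nomad target plugin reports false while the job is stopped.
+//   - Count: the target's current count, replacing the old Count() return.
+//   - Meta: optional details, e.g. the Nomad target adds
+//     "nomad_autoscaler.target.nomad.<job>.stopped".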
+type Status struct {
+	Ready bool
+	Count int64
+	Meta  map[string]string
+}
+
 // RPC is a plugin implementation that talks over net/rpc
 type RPC struct {
 	client *rpc.Client
@@ -37,19 +43,13 @@ func (r *RPC) SetConfig(config map[string]string) error {
 func (r *RPC) PluginInfo() (*base.PluginInfo, error) {
 	var resp base.PluginInfo
 	err := r.client.Call("Plugin.PluginInfo", new(interface{}), &resp)
-	if err != nil {
-		return &resp, err
-	}
-	return &resp, nil
+	return &resp, err
 }
 
-func (r *RPC) Count(config map[string]string) (int64, error) {
-	var resp int64
-	err := r.client.Call("Plugin.Count", config, &resp)
-	if err != nil {
-		return 0, err
-	}
-	return resp, nil
+func (r *RPC) Status(config map[string]string) (*Status, error) {
+	var resp Status
+	err := r.client.Call("Plugin.Status", config, &resp)
+	return &resp, err
 }
 
 func (r *RPC) Scale(action strategy.Action, config map[string]string) error {
@@ -84,13 +84,12 @@ func (s *RPCServer) PluginInfo(_ interface{}, r *base.PluginInfo) error {
 	return err
 }
 
-func (s *RPCServer) Count(config map[string]string, resp *int64) error {
-	count, err := s.Impl.Count(config)
-	if err != nil {
-		return err
+func (s *RPCServer) Status(config map[string]string, resp *Status) error {
+	status, err := s.Impl.Status(config)
+	if status != nil {
+		*resp = *status
 	}
-	*resp = count
-	return nil
+	return err
 }
 
 func (s *RPCServer) Scale(req RPCScaleRequest, resp *error) error {
diff --git a/policy/handler.go b/policy/handler.go
new file mode 100644
index 00000000..4aff569b
--- /dev/null
+++ b/policy/handler.go
@@ -0,0 +1,207 @@
+package policy
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	hclog "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad-autoscaler/plugins"
+	"github.com/hashicorp/nomad-autoscaler/plugins/manager"
+	targetpkg "github.com/hashicorp/nomad-autoscaler/plugins/target"
+)
+
+// Handler monitors a policy for changes and controls when it is sent for
+// evaluation.
+type Handler struct {
+	log hclog.Logger
+
+	// policyID is the ID of the policy the handler is responsible for.
+	policyID PolicyID
+
+	// pluginManager is used to retrieve an instance of the target plugin used
+	// by the policy.
+	pluginManager *manager.PluginManager
+
+	// policySource is used to monitor for changes to the policy the handler
+	// is responsible for.
+	policySource Source
+
+	// ticker controls the frequency the policy is sent for evaluation.
+	ticker *time.Ticker
+
+	// running is used to help keep track if the handler is active or not.
+	running     bool
+	runningLock sync.RWMutex
+
+	// ch is used to listen for policy updates.
+	ch chan Policy
+
+	// errCh is used to listen for errors from the policy source.
+	errCh chan error
+
+	// doneCh is used to signal the handler to stop.
+	doneCh chan struct{}
+}
+
+// NewHandler returns a new handler for a policy.
+func NewHandler(ID PolicyID, log hclog.Logger, pm *manager.PluginManager, ps Source) *Handler {
+	return &Handler{
+		policyID:      ID,
+		log:           log.Named("policy_handler").With("policy_id", ID),
+		pluginManager: pm,
+		policySource:  ps,
+		ch:            make(chan Policy),
+		errCh:         make(chan error),
+		doneCh:        make(chan struct{}),
+	}
+}
+
+// Run starts the handler and periodically sends the policy for evaluation.
+//
+// This function blocks until the context provided is canceled or the handler
+// is stopped with the Stop() method.
+func (h *Handler) Run(ctx context.Context, evalCh chan<- *Evaluation) {
+	h.log.Trace("starting policy handler")
+
+	defer h.Stop()
+
+	// Mark the handler as running.
+	h.runningLock.Lock()
+	h.running = true
+	h.runningLock.Unlock()
+
+	// Store a local copy of the policy so we can compare it for changes.
+	var currentPolicy *Policy
+
+	// Start with a long ticker until we receive the right interval.
+	// TODO(luiz): make this a config param
+	policyReadTimeout := 3 * time.Minute
+	h.ticker = time.NewTicker(policyReadTimeout)
+
+	// Create a separate context so we can stop the monitoring Go routine if
+	// doneCh is closed, but ctx is still valid.
+	monitorCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	// Start monitoring the policy for changes.
+	go h.policySource.MonitorPolicy(monitorCtx, h.policyID, h.ch, h.errCh)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-h.doneCh:
+			return
+		case err := <-h.errCh:
+			h.log.Error(err.Error())
+			return
+		case p := <-h.ch:
+			h.updateHandler(currentPolicy, &p)
+			currentPolicy = &p
+		case <-h.ticker.C:
+			eval, err := h.generateEvaluation(currentPolicy)
+			if err != nil {
+				h.log.Error(err.Error())
+				return
+			}
+			if eval != nil {
+				evalCh <- eval
+			}
+		}
+	}
+}
+
+// Stop stops the handler and the monitoring Go routine.
+func (h *Handler) Stop() {
+	h.runningLock.Lock()
+	defer h.runningLock.Unlock()
+
+	if h.running {
+		h.log.Trace("stopping handler")
+		h.ticker.Stop()
+		close(h.doneCh)
+	}
+
+	h.running = false
+}
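+
+// Wiring a handler follows the pattern used by the Manager below; a minimal
+// sketch (illustrative only, buffer size chosen arbitrarily):
+//
+//	evalCh := make(chan *Evaluation, 10)
+//	h := NewHandler(policyID, logger, pluginManager, source)
+//	go h.Run(ctx, evalCh)
+//	// ...consume evaluations from evalCh...
+//	h.Stop()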
+ h.log.Trace("getting target status") + + status, err := targetInst.Status(policy.Target.Config) + if err != nil { + h.log.Warn("failed to get target status", "error", err) + return nil, nil + } + + // A nil status indicates the target doesn't exist, so we don't need to + // monitor the policy anymore. + if status == nil { + h.log.Trace("target doesn't exist anymore", "target", policy.Target.Config) + h.Stop() + return nil, nil + } + + // Exit early if the target is not ready yet. + if !status.Ready { + h.log.Trace("target is not ready") + return nil, nil + } + + // Send policy for evaluation. + h.log.Trace("sending policy for evaluation") + return &Evaluation{ + Policy: policy, + TargetStatus: status, + }, nil +} + +// updateHandler updates the handler's internal state based on the changes in +// the policy being monitored. +func (h *Handler) updateHandler(current, next *Policy) { + if current == nil { + h.log.Trace("received policy") + } else { + h.log.Trace("received policy change") + h.log.Trace(cmp.Diff(current, next)) + } + + // Update ticker if it's the first time we receive the policy or if the + // policy's evaluation interval has changed. + if current == nil || current.EvaluationInterval != next.EvaluationInterval { + h.ticker.Stop() + h.ticker = time.NewTicker(next.EvaluationInterval) + } +} diff --git a/policy/manager.go b/policy/manager.go new file mode 100644 index 00000000..038ff789 --- /dev/null +++ b/policy/manager.go @@ -0,0 +1,129 @@ +package policy + +import ( + "context" + "fmt" + "sync" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad-autoscaler/plugins/manager" +) + +// Manager tracks policies and controls the lifecycle of each policy handler. +type Manager struct { + log hclog.Logger + policySource Source + pluginManager *manager.PluginManager + + // lock is used to synchronize parallel access to the maps below. + lock sync.RWMutex + + // handlers are used to track the Go routines monitoring policies. + handlers map[PolicyID]*Handler + + // keep is used to mark active policies during reconciliation. + keep map[PolicyID]bool +} + +// NewManager returns a new Manager. +func NewManager(log hclog.Logger, ps Source, pm *manager.PluginManager) *Manager { + return &Manager{ + log: log.Named("policy_manager"), + policySource: ps, + pluginManager: pm, + handlers: make(map[PolicyID]*Handler), + keep: make(map[PolicyID]bool), + } +} + +// Run starts the manager and blocks until the context is canceled. +// Policies that need to be evaluated are sent in the evalCh. +func (m *Manager) Run(ctx context.Context, evalCh chan<- *Evaluation) { + defer m.stopHandlers() + + policyIDsCh := make(chan []PolicyID) + policyIDsErrCh := make(chan error) + + // Start the policy source and listen for changes in the list of policy IDs + go m.policySource.MonitorIDs(ctx, policyIDsCh, policyIDsErrCh) + + for { + select { + case <-ctx.Done(): + m.log.Trace("stopping policy manager") + return + case policyIDs := <-policyIDsCh: + m.log.Trace(fmt.Sprintf("detected %d policies", len(policyIDs))) + + m.lock.Lock() + + // Reset set of policies to keep. We will remove the policies that + // are not in policyIDs to reconcile our state. + m.keep = make(map[PolicyID]bool) + + // Iterate over policy IDs and create new handlers if necessary + for _, policyID := range policyIDs { + + // Mark policy as must-keep so it doesn't get removed. + m.keep[policyID] = true + + // Check if we already have a handler for this policy. 
+				if _, ok := m.handlers[policyID]; ok {
+					m.log.Trace("handler already exists")
+					continue
+				}
+
+				// Create and store a new handler and use its channels to
+				// monitor the policy for changes.
+				m.log.Trace("creating new handler", "policy_id", policyID)
+
+				h := NewHandler(
+					policyID,
+					m.log.ResetNamed("policy_handler"),
+					m.pluginManager,
+					m.policySource)
+				m.handlers[policyID] = h
+
+				go func(ID PolicyID) {
+					h.Run(ctx, evalCh)
+
+					m.lock.Lock()
+					delete(m.handlers, ID)
+					m.lock.Unlock()
+				}(policyID)
+			}
+
+			// Remove and stop handlers for policies that don't exist anymore.
+			for k, h := range m.handlers {
+				if !m.keep[k] {
+					m.stopHandler(h)
+				}
+			}
+
+			m.lock.Unlock()
+		}
+	}
+}
+
+func (m *Manager) stopHandlers() {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	for _, h := range m.handlers {
+		m.stopHandler(h)
+	}
+}
+
+// stopHandler stops a handler and removes it from the manager's internal
+// state storage.
+//
+// This method is not thread-safe so a RW lock should be acquired before
+// calling it.
+func (m *Manager) stopHandler(h *Handler) {
+	if h == nil {
+		return
+	}
+
+	h.Stop()
+	delete(m.handlers, h.policyID)
+}
diff --git a/policy/nomad/parser.go b/policy/nomad/parser.go
new file mode 100644
index 00000000..d6cb8690
--- /dev/null
+++ b/policy/nomad/parser.go
@@ -0,0 +1,121 @@
+package nomad
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/hashicorp/nomad-autoscaler/plugins"
+	"github.com/hashicorp/nomad-autoscaler/policy"
+	"github.com/hashicorp/nomad/api"
+)
+
+func parsePolicy(p *api.ScalingPolicy) (policy.Policy, error) {
+	var to policy.Policy
+
+	if err := validateScalingPolicy(p); err != nil {
+		return to, err
+	}
+
+	source := p.Policy[keySource]
+	if source == nil {
+		source = ""
+	}
+
+	to = policy.Policy{
+		ID:                 p.ID,
+		Min:                *p.Min,
+		Max:                p.Max,
+		Enabled:            *p.Enabled,
+		Source:             source.(string),
+		Query:              p.Policy[keyQuery].(string),
+		EvaluationInterval: defaultEvaluationInterval, // TODO(luiz): use agent scan interval as default
+		Target:             parseTarget(p.Policy[keyTarget]),
+		Strategy:           parseStrategy(p.Policy[keyStrategy]),
+	}
+
+	canonicalizePolicy(p, &to)
+
+	return to, nil
+}
+
+func parseStrategy(s interface{}) *policy.Strategy {
+	strategyMap := s.([]interface{})[0].(map[string]interface{})
+	configMap := strategyMap["config"].([]interface{})[0].(map[string]interface{})
+	configMapString := make(map[string]string)
+	for k, v := range configMap {
+		configMapString[k] = fmt.Sprintf("%v", v)
+	}
+
+	return &policy.Strategy{
+		Name:   strategyMap["name"].(string),
+		Config: configMapString,
+	}
+}
+
+func parseTarget(t interface{}) *policy.Target {
+	if t == nil {
+		return &policy.Target{}
+	}
+
+	targetMap := t.([]interface{})[0].(map[string]interface{})
+	if targetMap == nil {
+		return &policy.Target{}
+	}
+
+	var configMapString map[string]string
+	if targetMap["config"] != nil {
+		configMap := targetMap["config"].([]interface{})[0].(map[string]interface{})
+		configMapString = make(map[string]string)
+		for k, v := range configMap {
+			configMapString[k] = fmt.Sprintf("%v", v)
+		}
+	}
+	return &policy.Target{
+		Name:   targetMap["name"].(string),
+		Config: configMapString,
+	}
+}
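+
+// For reference, an HCL strategy block such as
+//
+//	strategy = {
+//	  name = "target-value"
+//	  config = {
+//	    target = 20
+//	  }
+//	}
+//
+// arrives here decoded as []interface{}{map[string]interface{}{...}}, which
+// is why parseStrategy and parseTarget index the first list element before
+// asserting the map type. See parser_test.go below for a concrete case.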
+// canonicalizePolicy sets standardized values for missing fields.
+// It must be called after the policy has been validated.
+func canonicalizePolicy(from *api.ScalingPolicy, to *policy.Policy) {
+
+	if from.Enabled == nil {
+		to.Enabled = true
+	}
+
+	if evalInterval, ok := from.Policy[keyEvaluationInterval].(string); ok {
+		// Ignore the parse error since we assume canonicalization happens
+		// after validation.
+		to.EvaluationInterval, _ = time.ParseDuration(evalInterval)
+	}
+
+	if to.Target.Name == "" {
+		to.Target.Name = plugins.InternalTargetNomad
+	}
+
+	if to.Target.Config == nil {
+		to.Target.Config = make(map[string]string)
+	}
+
+	to.Target.Config["job_id"] = from.Target["Job"]
+	to.Target.Config["group"] = from.Target["Group"]
+
+	if to.Source == "" {
+		to.Source = plugins.InternalAPMNomad
+
+		// TODO(luiz) move default query logic handling to the Nomad APM plugin
+		parts := strings.Split(to.Query, "_")
+		op := parts[0]
+		metric := parts[1]
+
+		switch metric {
+		case "cpu":
+			metric = "nomad.client.allocs.cpu.total_percent"
+		case "memory":
+			metric = "nomad.client.allocs.memory.usage"
+		}
+
+		to.Query = fmt.Sprintf("%s/%s/%s/%s", metric, from.Target["Job"], from.Target["Group"], op)
+	}
+}
diff --git a/policy/nomad/parser_test.go b/policy/nomad/parser_test.go
new file mode 100644
index 00000000..c3028e4f
--- /dev/null
+++ b/policy/nomad/parser_test.go
@@ -0,0 +1,54 @@
+package nomad
+
+import (
+	"testing"
+
+	"github.com/hashicorp/nomad-autoscaler/policy"
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_parseStrategy(t *testing.T) {
+	testCases := []struct {
+		inputStrategy  interface{}
+		expectedOutput *policy.Strategy
+	}{
+		{
+			inputStrategy: []interface{}{
+				map[string]interface{}{
+					"name": "target-value",
+					"config": []interface{}{
+						map[string]interface{}{"target": float64(20)},
+					},
+				},
+			},
+			expectedOutput: &policy.Strategy{
+				Name:   "target-value",
+				Config: map[string]string{"target": "20"},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		actualOutput := parseStrategy(tc.inputStrategy)
+		assert.Equal(t, tc.expectedOutput, actualOutput)
+	}
+}
+
+func Test_parseTarget(t *testing.T) {
+	testCases := []struct {
+		inputTarget    interface{}
+		expectedOutput *policy.Target
+		name           string
+	}{
+		{
+			inputTarget:    nil,
+			expectedOutput: &policy.Target{},
+			name:           "nil passed target interface",
+		},
+	}
+
+	for _, tc := range testCases {
+		actualOutput := parseTarget(tc.inputTarget)
+		assert.Equal(t, tc.expectedOutput, actualOutput, tc.name)
+	}
+}
diff --git a/policy/nomad/source.go b/policy/nomad/source.go
new file mode 100644
index 00000000..43f48163
--- /dev/null
+++ b/policy/nomad/source.go
@@ -0,0 +1,162 @@
+package nomad
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	hclog "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad-autoscaler/helper/blocking"
+	"github.com/hashicorp/nomad-autoscaler/policy"
+	"github.com/hashicorp/nomad/api"
+)
+
+// Keys represent the scaling policy document keys and help translate
+// the opaque object into a usable autoscaling policy.
+const (
+	keySource             = "source"
+	keyQuery              = "query"
+	keyEvaluationInterval = "evaluation_interval"
+	keyTarget             = "target"
+	keyStrategy           = "strategy"
+)
+
+const (
+	defaultEvaluationInterval = 10 * time.Second
+)
+
+// Ensure Source satisfies the policy.Source interface.
+var _ policy.Source = (*Source)(nil)
+
+// Source is an implementation of the Source interface that retrieves
+// policies from a Nomad cluster.
+type Source struct {
+	log   hclog.Logger
+	nomad *api.Client
+}
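+
+// The agent wires this source into the policy manager; a short sketch
+// mirroring the agent.go changes in this diff:
+//
+//	source := nomadpolicy.NewNomadSource(logger, nomadClient)
+//	manager := policy.NewManager(logger, source, pluginManager)
+//
+//	policyEvalCh := make(chan *policy.Evaluation, 10)
+//	go manager.Run(ctx, policyEvalCh)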
+// NewNomadSource returns a new Nomad policy source.
+func NewNomadSource(log hclog.Logger, nomad *api.Client) *Source {
+	return &Source{
+		log:   log.Named("nomad_policy_source"),
+		nomad: nomad,
+	}
+}
+
+// MonitorIDs retrieves a list of policy IDs from a Nomad cluster and sends it
+// in the resultCh channel when a change is detected. Errors are sent through
+// the errCh channel.
+//
+// This function blocks until the context is closed.
+func (s *Source) MonitorIDs(ctx context.Context, resultCh chan<- []policy.PolicyID, errCh chan<- error) {
+	s.log.Debug("starting policy blocking query watcher")
+
+	q := &api.QueryOptions{WaitTime: 5 * time.Minute, WaitIndex: 1}
+
+	for {
+		select {
+		case <-ctx.Done():
+			s.log.Trace("stopping ID subscription")
+			return
+		default:
+			// Perform a blocking query on the Nomad API that returns a stub
+			// list of scaling policies. If we get an error at this point, we
+			// should sleep and try again.
+			//
+			// TODO(jrasell) in the future maybe use a better method than sleep.
+			policies, meta, err := s.nomad.Scaling().ListPolicies(q)
+
+			// Return immediately if the context is closed.
+			if ctx.Err() != nil {
+				s.log.Trace("stopping ID subscription")
+				return
+			}
+
+			if err != nil {
+				errCh <- fmt.Errorf("failed to call the Nomad list policies API: %v", err)
+				time.Sleep(10 * time.Second)
+				continue
+			}
+
+			// If the index has not changed, the query returned because the
+			// timeout was reached, therefore start the next query loop.
+			if !blocking.IndexHasChanged(meta.LastIndex, q.WaitIndex) {
+				continue
+			}
+
+			var policyIDs []policy.PolicyID
+
+			// Iterate all policies in the list.
+			for _, p := range policies {
+				policyIDs = append(policyIDs, policy.PolicyID(p.ID))
+			}
+
+			// Update the Nomad API wait index to start long polling from the
+			// correct point and update our recorded lastChangeIndex so we have
+			// the correct point to use during the next API return.
+			q.WaitIndex = meta.LastIndex
+
+			// Send the new policy IDs over the channel.
+			resultCh <- policyIDs
+		}
+	}
+}
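+
+// A consumer of MonitorIDs runs it in a Go routine and receives from both
+// channels; a minimal sketch (illustrative only; the Manager in this diff
+// consumes the ID channel inside its reconciliation loop):
+//
+//	idsCh := make(chan []policy.PolicyID)
+//	errCh := make(chan error)
+//	go source.MonitorIDs(ctx, idsCh, errCh)
+//
+//	select {
+//	case ids := <-idsCh:
+//		// reconcile handlers against ids
+//	case err := <-errCh:
+//		// log the error and decide whether to retry
+//	}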
+			if !blocking.IndexHasChanged(meta.LastIndex, q.WaitIndex) {
+				continue
+			}
+
+			var autoPolicy policy.Policy
+			// TODO(jrasell) once we have a better method for surfacing errors to the
+			// user, this error should be presented.
+			if autoPolicy, err = parsePolicy(p); err != nil {
+				errCh <- fmt.Errorf("failed to parse policy: %v", err)
+				return
+			}
+
+			resultCh <- autoPolicy
+			q.WaitIndex = meta.LastIndex
+		}
+	}
+}
diff --git a/policy/nomad/validate.go b/policy/nomad/validate.go
new file mode 100644
index 00000000..5673d4a5
--- /dev/null
+++ b/policy/nomad/validate.go
@@ -0,0 +1,188 @@
+package nomad
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/hashicorp/go-multierror"
+	"github.com/hashicorp/nomad/api"
+)
+
+func validateScalingPolicy(policy *api.ScalingPolicy) error {
+	var result *multierror.Error
+
+	if policy == nil {
+		result = multierror.Append(result, fmt.Errorf("ScalingPolicy is nil"))
+		return result
+	}
+
+	// Validate Min and Max values.
+	// 1. Min must not be nil.
+	// 2. Min must be positive.
+	// 3. Max must be positive.
+	// 4. Min must not be greater than Max.
+	if policy.Min == nil {
+		result = multierror.Append(result, fmt.Errorf("ScalingPolicy.Min is nil"))
+	} else {
+		min := *policy.Min
+		if min < 0 {
+			result = multierror.Append(result, fmt.Errorf("ScalingPolicy.Min can't be negative"))
+		}
+
+		if min > policy.Max {
+			result = multierror.Append(result, fmt.Errorf("ScalingPolicy.Min can't be greater than ScalingPolicy.Max"))
+		}
+	}
+
+	if policy.Max < 0 {
+		result = multierror.Append(result, fmt.Errorf("ScalingPolicy.Max can't be negative"))
+	}
+
+	// Validate Target.
+	if targetErr := validateTarget(policy.Target); targetErr != nil {
+		result = multierror.Append(result, targetErr)
+	}
+
+	// Validate Policy.
+	if policyErr := validatePolicy(policy.Policy); policyErr != nil {
+		result = multierror.Append(result, policyErr)
+	}
+
+	return result.ErrorOrNil()
+}
+
+func validateTarget(t map[string]string) error {
+	const path = "ScalingPolicy.Target"
+
+	var result *multierror.Error
+
+	if t == nil {
+		return multierror.Append(result, fmt.Errorf("%s is nil", path))
+	}
+
+	// Validate required keys are defined.
+	requiredKeys := []string{"Job", "Group"}
+	for _, k := range requiredKeys {
+		if v := t[k]; v == "" {
+			result = multierror.Append(result, fmt.Errorf(`%s is missing key "%s"`, path, k))
+		}
+	}
+
+	return result.ErrorOrNil()
+}
+
+func validatePolicy(p map[string]interface{}) error {
+	const path = "ScalingPolicy.Policy"
+
+	var result *multierror.Error
+
+	if p == nil {
+		return multierror.Append(result, fmt.Errorf("%s is nil", path))
+	}
+
+	// Validate Source.
+	// 1. Source value must be a string if defined.
+	source, ok := p[keySource]
+	if ok {
+		_, ok := source.(string)
+		if !ok {
+			result = multierror.Append(result, fmt.Errorf("%s[%s] must be string, found %T", path, keySource, source))
+		}
+	}
+
+	// Validate Query.
+	// 1. Query key must exist.
+	// 2. Query must have string value.
+	// 3. Query must not be empty.
+	query, ok := p[keyQuery]
+	if !ok {
+		result = multierror.Append(result, fmt.Errorf(`%s is missing key "%s"`, path, keyQuery))
+	} else {
+		queryStr, ok := query.(string)
+		if !ok {
+			result = multierror.Append(result, fmt.Errorf("%s[%s] must be string, found %T", path, keyQuery, query))
+		} else {
+			if queryStr == "" {
+				result = multierror.Append(result, fmt.Errorf("%s[%s] can't be empty", path, keyQuery))
+			}
+		}
+	}
+
+	// Validate EvaluationInterval.
+	// 1. EvaluationInterval must have string value if defined.
+	// 2. EvaluationInterval must have time.Duration format if defined.
+	evalInterval, ok := p[keyEvaluationInterval]
+	if ok {
+		evalIntervalString, ok := evalInterval.(string)
+		if !ok {
+			result = multierror.Append(result, fmt.Errorf("%s[%s] must be string, found %T", path, keyEvaluationInterval, evalInterval))
+		} else {
+			if _, err := time.ParseDuration(evalIntervalString); err != nil {
+				result = multierror.Append(result, fmt.Errorf("%s[%s] must have time.Duration format", path, keyEvaluationInterval))
+			}
+		}
+	}
+
+	// Validate Strategy.
+	// 1. Strategy key must exist.
+	// 2. Strategy must have []interface{} value.
+	//    This is due to the way HCL parses blocks: it creates a list to avoid
+	//    overwriting blocks of the same type.
+	// 3. Strategy must have just one element.
+	// 4. The element in Strategy must be of type map[string]interface{}.
+	strategyInterface, ok := p[keyStrategy]
+	if !ok {
+		result = multierror.Append(result, fmt.Errorf(`%s is missing key "%s"`, path, keyStrategy))
+	} else {
+		strategyList, ok := strategyInterface.([]interface{})
+		if !ok {
+			result = multierror.Append(result, fmt.Errorf("%s[%s] must be []interface{}, found %T", path, keyStrategy, strategyInterface))
+		} else {
+			if len(strategyList) != 1 {
+				result = multierror.Append(result, fmt.Errorf("%s[%s] must have length 1, found %d", path, keyStrategy, len(strategyList)))
+			} else {
+				strategyMap, ok := strategyList[0].(map[string]interface{})
+				if !ok {
+					result = multierror.Append(result, fmt.Errorf("%s[%s][0] must be map[string]interface{}, found %T", path, keyStrategy, strategyList[0]))
+				} else {
+					if strategyErrs := validateStrategy(strategyMap); strategyErrs != nil {
+						result = multierror.Append(result, strategyErrs)
+					}
+				}
+			}
+		}
+	}
+
+	return result.ErrorOrNil()
+}
+
+func validateStrategy(s map[string]interface{}) error {
+	var path = fmt.Sprintf("ScalingPolicy.Policy[%s]", keyStrategy)
+
+	var result *multierror.Error
+
+	if s == nil {
+		return multierror.Append(result, fmt.Errorf("%s is nil", path))
+	}
+
+	// Validate name.
+	// 1. Name key must exist.
+	// 2. Name must have string value.
+	// 3. Name must not be empty.
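+	//
+	// A minimal valid value therefore mirrors the test fixtures, e.g.:
+	//
+	//	map[string]interface{}{"name": "target-value"}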
+ nameKey := "name" + nameInterface, ok := s[nameKey] + if !ok { + result = multierror.Append(result, fmt.Errorf(`%s is missing key "%s"`, path, nameKey)) + } else { + nameString, ok := nameInterface.(string) + if !ok { + result = multierror.Append(result, fmt.Errorf("%s[%s] must be string, found %T", path, nameKey, nameInterface)) + } else { + if nameString == "" { + result = multierror.Append(result, fmt.Errorf("%s[%s] can't be empty", path, nameKey)) + } + } + } + + return result.ErrorOrNil() +} diff --git a/policy/nomad/validate_test.go b/policy/nomad/validate_test.go new file mode 100644 index 00000000..d42005ea --- /dev/null +++ b/policy/nomad/validate_test.go @@ -0,0 +1,433 @@ +package nomad + +import ( + "testing" + + "github.com/hashicorp/nomad-autoscaler/helper/ptr" + "github.com/hashicorp/nomad/api" + "github.com/stretchr/testify/assert" +) + +func Test_validateScalingPolicy(t *testing.T) { + testCases := []struct { + name string + input *api.ScalingPolicy + expectError bool + }{ + { + name: "valid policy", + input: validPolicy(), + expectError: false, + }, + { + name: "valid policy without optional values", + input: func() *api.ScalingPolicy { + p := validPolicy() + p.Policy[keySource] = "" + p.Enabled = nil + return p + }(), + expectError: false, + }, + { + name: "nil policy", + input: nil, + expectError: true, + }, + { + name: "empty policy", + input: &api.ScalingPolicy{}, + expectError: true, + }, + { + name: "invalid target", + input: func() *api.ScalingPolicy { + p := validPolicy() + p.Target = nil + return p + }(), + expectError: true, + }, + { + name: "invalid policy", + input: func() *api.ScalingPolicy { + p := validPolicy() + p.Policy = nil + return p + }(), + expectError: true, + }, + { + name: "min is nil", + input: func() *api.ScalingPolicy { + p := validPolicy() + p.Min = nil + return p + }(), + expectError: true, + }, + { + name: "min is negative", + input: func() *api.ScalingPolicy { + p := validPolicy() + p.Min = ptr.Int64ToPtr(-1) + return p + }(), + expectError: true, + }, + { + name: "max is negative", + input: func() *api.ScalingPolicy { + p := validPolicy() + p.Max = -1 + return p + }(), + expectError: true, + }, + { + name: "max less than min", + input: func() *api.ScalingPolicy { + p := validPolicy() + p.Max = 1 + p.Min = ptr.Int64ToPtr(2) + return p + }(), + expectError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := validateScalingPolicy(tc.input) + + assertFunc := assert.NoError + if tc.expectError { + assertFunc = assert.Error + } + + assertFunc(t, err) + }) + } +} + +func Test_validateTarget(t *testing.T) { + testCases := []struct { + name string + input map[string]string + expectError bool + }{ + { + name: "target is valid", + input: map[string]string{ + "Job": "example", + "Group": "cache", + }, + expectError: false, + }, + { + name: "target is nil", + input: nil, + expectError: true, + }, + { + name: "target.job is missing", + input: map[string]string{ + "Group": "cache", + }, + expectError: true, + }, + { + name: "target.group is missing", + input: map[string]string{ + "Job": "example", + }, + expectError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := validateTarget(tc.input) + + assertFunc := assert.NoError + if tc.expectError { + assertFunc = assert.Error + } + + assertFunc(t, err) + }) + } +} + +func Test_validatePolicy(t *testing.T) { + testCases := []struct { + name string + input map[string]interface{} + expectError bool + }{ + { + name: 
"policy is valid", + input: map[string]interface{}{ + keySource: "source", + keyQuery: "query", + keyEvaluationInterval: "5s", + keyStrategy: []interface{}{map[string]interface{}{ + "name": "strategy", + "config": map[string]string{"key": "value"}, + }}, + }, + expectError: false, + }, + { + name: "policy without optional values is valid", + input: map[string]interface{}{ + keyQuery: "query", + keyStrategy: []interface{}{map[string]interface{}{ + "name": "strategy", + }}, + }, + expectError: false, + }, + { + name: "policy is nil", + input: nil, + expectError: true, + }, + { + name: "policy.source is not a string", + input: map[string]interface{}{ + keySource: 2, + keyQuery: "query", + keyEvaluationInterval: "5s", + keyStrategy: []interface{}{map[string]interface{}{ + "name": "strategy", + "config": map[string]string{"key": "value"}, + }}, + }, + expectError: true, + }, + { + name: "policy.query is missing", + input: map[string]interface{}{ + keySource: "source", + keyEvaluationInterval: "5s", + keyStrategy: []interface{}{map[string]interface{}{ + "name": "strategy", + "config": map[string]string{"key": "value"}, + }}, + }, + expectError: true, + }, + { + name: "policy.query is not a string", + input: map[string]interface{}{ + keySource: "source", + keyQuery: 2, + keyEvaluationInterval: "5s", + keyStrategy: []interface{}{map[string]interface{}{ + "name": "strategy", + "config": map[string]string{"key": "value"}, + }}, + }, + expectError: true, + }, + { + name: "policy.query is empty", + input: map[string]interface{}{ + keySource: "source", + keyQuery: "", + keyEvaluationInterval: "5s", + keyStrategy: []interface{}{map[string]interface{}{ + "name": "strategy", + "config": map[string]string{"key": "value"}, + }}, + }, + expectError: true, + }, + { + name: "policy.evaluation_interval has wrong type", + input: map[string]interface{}{ + keySource: "source", + keyQuery: "query", + keyEvaluationInterval: 5, + keyStrategy: []interface{}{map[string]interface{}{ + "name": "strategy", + "config": map[string]string{"key": "value"}, + }}, + }, + expectError: true, + }, + { + name: "policy.evaluation_interval has wrong format", + input: map[string]interface{}{ + keySource: "source", + keyQuery: "query", + keyEvaluationInterval: "5 seconds", + keyStrategy: []interface{}{map[string]interface{}{ + "name": "strategy", + "config": map[string]string{"key": "value"}, + }}, + }, + expectError: true, + }, + { + name: "policy.strategy is missing", + input: map[string]interface{}{ + keySource: "source", + keyQuery: "query", + keyEvaluationInterval: "5s", + }, + expectError: true, + }, + { + name: "policy.strategy has wrong type", + input: map[string]interface{}{ + keySource: "source", + keyQuery: "query", + keyEvaluationInterval: "5s", + keyStrategy: true, + }, + expectError: true, + }, + { + name: "policy.strategy is empty", + input: map[string]interface{}{ + keySource: "source", + keyQuery: "query", + keyEvaluationInterval: "5s", + keyStrategy: []interface{}{}, + }, + expectError: true, + }, + { + name: "policy.strategy has more than 1 element", + input: map[string]interface{}{ + keySource: "source", + keyQuery: "query", + keyEvaluationInterval: "5s", + keyStrategy: []interface{}{1, 2}, + }, + expectError: true, + }, + { + name: "policy.strategy[0] has wrong type", + input: map[string]interface{}{ + keySource: "source", + keyQuery: "query", + keyEvaluationInterval: "5s", + keyStrategy: []interface{}{1}, + }, + expectError: true, + }, + { + name: "policy.strategy[0] is invalid", + input: map[string]interface{}{ + 
keySource: "source", + keyQuery: "query", + keyEvaluationInterval: "5s", + keyStrategy: []interface{}{map[string]interface{}{}}, + }, + expectError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := validatePolicy(tc.input) + + assertFunc := assert.NoError + if tc.expectError { + assertFunc = assert.Error + } + + assertFunc(t, err) + }) + } +} + +func Test_validateStrategy(t *testing.T) { + testCases := []struct { + name string + input map[string]interface{} + expectError bool + }{ + { + name: "strategy is valid", + input: map[string]interface{}{ + "name": "strategy", + "config": map[string]string{"key": "value"}, + }, + expectError: false, + }, + { + name: "strategy without optional values is valid", + input: map[string]interface{}{ + "name": "strategy", + }, + expectError: false, + }, + { + name: "strategy is nil", + input: nil, + expectError: true, + }, + { + name: "strategy.name is missing", + input: map[string]interface{}{ + "config": map[string]string{"key": "value"}, + }, + expectError: true, + }, + { + name: "strategy.name is not a string", + input: map[string]interface{}{ + "name": 2, + "config": map[string]string{"key": "value"}, + }, + expectError: true, + }, + { + name: "strategy.name is empty", + input: map[string]interface{}{ + "name": "", + "config": map[string]string{"key": "value"}, + }, + expectError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := validateStrategy(tc.input) + + assertFunc := assert.NoError + if tc.expectError { + assertFunc = assert.Error + } + + assertFunc(t, err) + }) + } +} + +func validPolicy() *api.ScalingPolicy { + return &api.ScalingPolicy{ + ID: "id", + Namespace: "default", + Target: map[string]string{ + "Job": "example", + "Group": "cache", + }, + Min: ptr.Int64ToPtr(1), + Max: 5, + Policy: map[string]interface{}{ + keySource: "source", + keyQuery: "query", + keyStrategy: []interface{}{map[string]interface{}{ + "name": "strategy", + "config": map[string]string{"key": "value"}, + }}, + }, + Enabled: ptr.BoolToPtr(true), + } +} diff --git a/policy/policy.go b/policy/policy.go new file mode 100644 index 00000000..473cead0 --- /dev/null +++ b/policy/policy.go @@ -0,0 +1,34 @@ +package policy + +import ( + "time" + + "github.com/hashicorp/nomad-autoscaler/plugins/target" +) + +type Policy struct { + ID string + Min int64 + Max int64 + Source string + Query string + Enabled bool + EvaluationInterval time.Duration + Target *Target + Strategy *Strategy +} + +type Strategy struct { + Name string + Config map[string]string +} + +type Target struct { + Name string + Config map[string]string +} + +type Evaluation struct { + Policy *Policy + TargetStatus *target.Status +} diff --git a/policy/source.go b/policy/source.go new file mode 100644 index 00000000..023b279d --- /dev/null +++ b/policy/source.go @@ -0,0 +1,12 @@ +package policy + +import "context" + +// Source is the interface that must be implemented by backends which +// provide the canonical source for scaling policies. 
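+//
+// Implementations are expected to block in both methods until the passed
+// context is closed, streaming updates over resultCh and failures over
+// errCh, as the Nomad implementation in policy/nomad does.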
+type Source interface { + MonitorIDs(ctx context.Context, resultCh chan<- []PolicyID, errCh chan<- error) + MonitorPolicy(ctx context.Context, ID PolicyID, resultCh chan<- Policy, errCh chan<- error) +} + +type PolicyID string diff --git a/policystorage/nomad.go b/policystorage/nomad.go deleted file mode 100644 index 584dc065..00000000 --- a/policystorage/nomad.go +++ /dev/null @@ -1,148 +0,0 @@ -package policystorage - -import ( - "fmt" - "strings" - - "github.com/hashicorp/nomad-autoscaler/plugins" - "github.com/hashicorp/nomad/api" -) - -type Nomad struct { - Client *api.Client -} - -func (n *Nomad) List() ([]*PolicyListStub, error) { - fromPolicies, _, err := n.Client.Scaling().ListPolicies(nil) - if err != nil { - return nil, err - } - - var toPolicies []*PolicyListStub - for _, policy := range fromPolicies { - toPolicy := &PolicyListStub{ - ID: policy.ID, - } - toPolicies = append(toPolicies, toPolicy) - } - - return toPolicies, nil -} - -func (n *Nomad) Get(ID string) (*Policy, error) { - fromPolicy, _, err := n.Client.Scaling().GetPolicy(ID, nil) - if err != nil { - return nil, err - } - - errs := validate(fromPolicy) - if len(errs) > 0 { - return nil, fmt.Errorf("failed to parse Policy: %v", errs) - } - - if fromPolicy.Policy["source"] == nil { - fromPolicy.Policy["source"] = "" - } - toPolicy := &Policy{ - ID: fromPolicy.ID, - Min: *fromPolicy.Min, - Max: fromPolicy.Max, - Source: fromPolicy.Policy["source"].(string), - Query: fromPolicy.Policy["query"].(string), - Enabled: *fromPolicy.Enabled, - Strategy: parseStrategy(fromPolicy.Policy["strategy"]), - Target: parseTarget(fromPolicy.Policy["target"]), - } - canonicalize(fromPolicy, toPolicy) - return toPolicy, nil -} - -func canonicalize(from *api.ScalingPolicy, to *Policy) { - - if from.Enabled == nil { - to.Enabled = true - } - - if to.Target.Name == "" { - to.Target.Name = plugins.InternalTargetNomad - } - - if to.Target.Config == nil { - to.Target.Config = make(map[string]string) - } - - to.Target.Config["job_id"] = from.Target["Job"] - to.Target.Config["group"] = from.Target["Group"] - - if to.Source == "" { - to.Source = plugins.InternalAPMNomad - - parts := strings.Split(to.Query, "_") - op := parts[0] - metric := parts[1] - - switch metric { - case "cpu": - metric = "nomad.client.allocs.cpu.total_percent" - case "memory": - metric = "nomad.client.allocs.memory.usage" - } - - to.Query = fmt.Sprintf("%s/%s/%s/%s", metric, to.Target.Config["job_id"], to.Target.Config["group"], op) - } -} - -func validate(policy *api.ScalingPolicy) []error { - var errs []error - - strategyList, ok := policy.Policy["strategy"].([]interface{}) - if !ok { - errs = append(errs, fmt.Errorf("Policy.strategy (%T) is not a []interface{}", policy.Policy["strategy"])) - return errs - } - - _, ok = strategyList[0].(map[string]interface{}) - if !ok { - errs = append(errs, fmt.Errorf("Policy.strategy[0] (%T) is not a map[string]string", strategyList[0])) - } - - return errs -} - -func parseStrategy(s interface{}) *Strategy { - strategyMap := s.([]interface{})[0].(map[string]interface{}) - configMap := strategyMap["config"].([]interface{})[0].(map[string]interface{}) - configMapString := make(map[string]string) - for k, v := range configMap { - configMapString[k] = fmt.Sprintf("%v", v) - } - - return &Strategy{ - Name: strategyMap["name"].(string), - Config: configMapString, - } -} - -func parseTarget(t interface{}) *Target { - if t == nil { - return &Target{} - } - - targetMap := t.([]interface{})[0].(map[string]interface{}) - if targetMap == nil { - 
return &Target{} - } - - var configMapString map[string]string - if targetMap["config"] != nil { - configMap := targetMap["config"].([]interface{})[0].(map[string]interface{}) - configMapString = make(map[string]string) - for k, v := range configMap { - configMapString[k] = fmt.Sprintf("%v", v) - } - } - return &Target{ - Name: targetMap["name"].(string), - Config: configMapString, - } -} diff --git a/policystorage/nomad_test.go b/policystorage/nomad_test.go deleted file mode 100644 index c0591ffa..00000000 --- a/policystorage/nomad_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package policystorage - -import ( - "fmt" - "reflect" - "testing" - - "github.com/hashicorp/nomad/api" -) - -func Test_storagNomad_Validate(t *testing.T) { - tests := []struct { - name string - policy *api.ScalingPolicy - want []error - }{ - { - name: "validate missing", - policy: &api.ScalingPolicy{ - Policy: map[string]interface{}{}, - }, - want: []error{fmt.Errorf("Policy.strategy () is not a []interface{}")}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := validate(tt.policy); !reflect.DeepEqual(got, tt.want) { - t.Errorf("validate() = %v, want %v", got, tt.want) - } - }) - } -} \ No newline at end of file diff --git a/policystorage/policy.go b/policystorage/policy.go deleted file mode 100644 index f3f59d0b..00000000 --- a/policystorage/policy.go +++ /dev/null @@ -1,35 +0,0 @@ -package policystorage - -type PolicyStorage interface { - List() ([]*PolicyListStub, error) - Get(string) (Policy, error) -} - -type Policy struct { - ID string - Min int64 - Max int64 - Source string - Query string - Enabled bool - Target *Target - Strategy *Strategy -} - -type PolicyListStub struct { - ID string - Source string - Query string - Target - Strategy -} - -type Strategy struct { - Name string - Config map[string]string -} - -type Target struct { - Name string - Config map[string]string -}