diff --git a/policyeval/base_worker.go b/policyeval/base_worker.go index 92302566..ce4cef21 100644 --- a/policyeval/base_worker.go +++ b/policyeval/base_worker.go @@ -103,59 +103,68 @@ func (w *BaseWorker) handlePolicy(ctx context.Context, eval *sdk.ScalingEvaluati } logger := w.logger.With("policy_id", eval.Policy.ID, "target", eval.Policy.Target.Name) - checks := make(map[string]*checkHandler) - logger.Debug("received policy for evaluation") - handlersCtx, cancel := context.WithCancel(ctx) - defer cancel() + // Dispense taget plugin. + targetPlugin, err := w.pluginManager.Dispense(eval.Policy.Target.Name, plugins.PluginTypeTarget) + if err != nil { + return fmt.Errorf(`target plugin "%s" not initialized: %v`, eval.Policy.Target.Name, err) + } + targetInst, ok := targetPlugin.Plugin().(target.Target) + if !ok { + return fmt.Errorf(`"%s" is not a target plugin`, eval.Policy.Target.Name) + } - // Start check handlers. - for _, checkEval := range eval.CheckEvaluations { - checkHandler := newCheckHandler(logger, eval.Policy, checkEval, w.pluginManager) - checks[checkEval.Check.Name] = checkHandler - go checkHandler.start(handlersCtx) + // Fetch target status. + logger.Debug("fetching current count") + + currentStatus, err := w.runTargetStatus(targetInst, eval.Policy) + if err != nil { + return fmt.Errorf("failed to fetch current count: %v", err) + } + if !currentStatus.Ready { + return errTargetNotReady } + // Prepare handlers. + handlersCtx, cancel := context.WithCancel(ctx) + defer cancel() + // winningAction is the action to be executed after all checks' results are // reconciled. var winningAction *sdk.ScalingAction var winningHandler *checkHandler - // Initial results should return fairly quickly. - // Timeout if it is taking too long. - resultsTimeout := time.NewTimer(5 * time.Minute) + // Start check handlers. + for _, checkEval := range eval.CheckEvaluations { + checkHandler := newCheckHandler(logger, eval.Policy, checkEval, w.pluginManager) + + // Wrap target status call in a goroutine so we can listen for ctx as well. + var action *sdk.ScalingAction + var err error + doneCh := make(chan interface{}) + + go func() { + defer close(doneCh) + action, err = checkHandler.start(handlersCtx, currentStatus) + }() - // Wait for check results and pick the winner. - for check, handler := range checks { select { case <-ctx.Done(): - logger.Info("policy evaluation canceled") + w.logger.Info("stopping worker") return nil - case <-resultsTimeout.C: - return fmt.Errorf("timeout while waiting for policy check results") - case r := <-handler.results(): - if r.err != nil { - if r.err == errTargetNotReady { - logger.Info("target not ready") - return nil - } - - // TODO(luiz): properly handle errors. - logger.Warn("failed to evaluate check", "error", r.err, "check", check) - continue - } + case <-doneCh: + } - winningAction = sdk.PreemptScalingAction(winningAction, r.action) - if winningAction == r.action { - winningHandler = handler - } + if err != nil { + logger.Warn("failed to run check", "err", err) + continue } - } - // Stop and drain results timeout timer. - if !resultsTimeout.Stop() { - <-resultsTimeout.C + winningAction = sdk.PreemptScalingAction(winningAction, action) + if winningAction == action { + winningHandler = checkHandler + } } // At this point the checks have finished. Therefore emit of metric data @@ -170,32 +179,48 @@ func (w *BaseWorker) handlePolicy(ctx context.Context, eval *sdk.ScalingEvaluati logger.Trace(fmt.Sprintf("check %s selected", winningHandler.checkEval.Check.Name), "direction", winningAction.Direction, "count", winningAction.Count) - // Unblock winning handler and cancel the others. The default guards - // against the possibility of there being no receiver on the proceedCh. - for _, handler := range checks { - select { - case handler.proceedCh <- handler == winningHandler: - default: - } - } - // Measure how long it takes to invoke the scaling actions. This helps // understand the time taken to interact with the remote target and action // the scaling action. defer metrics.MeasureSinceWithLabels([]string{"scale", "invoke_ms"}, time.Now(), labels) - // Block until winning handler returns. + // If the policy is configured with dry-run:true then we set the + // action count to nil so its no-nop. This allows us to still + // submit the job, but not alter its state. + if val, ok := eval.Policy.Target.Config["dry-run"]; ok && val == "true" { + logger.Info("scaling dry-run is enabled, using no-op task group count") + winningAction.SetDryRun() + } + + if winningAction.Count == sdk.StrategyActionMetaValueDryRunCount { + logger.Debug("registering scaling event", + "count", currentStatus.Count, "reason", winningAction.Reason, "meta", winningAction.Meta) + } else { + logger.Info("scaling target", + "from", currentStatus.Count, "to", winningAction.Count, + "reason", winningAction.Reason, "meta", winningAction.Meta) + } + + // Last check for early exit before scaling the target, which we consider + // a non-preemptable action since we cannot be sure that a scaling action can + // be cancelled halfway through or undone. select { case <-ctx.Done(): - logger.Info("policy evaluation canceled") + w.logger.Info("stopping worker") return nil - case r := <-winningHandler.results(): - if r.err != nil { - return r.err - } - if r.action == nil { - return nil - } + default: + } + + // Scale the target. If we receive an error add this onto the result so the + // handler understand what do to. + err = w.runTargetScale(targetInst, eval.Policy, *winningAction) + if err != nil { + metrics.IncrCounter([]string{"scale", "invoke", "error_count"}, 1) + return fmt.Errorf("failed to scale target: %v", err) + } else { + logger.Info("successfully submitted scaling action to target", + "desired_count", winningAction.Count) + metrics.IncrCounter([]string{"scale", "invoke", "success_count"}, 1) } // Enforce the cooldown after a successful scaling event. @@ -205,19 +230,32 @@ func (w *BaseWorker) handlePolicy(ctx context.Context, eval *sdk.ScalingEvaluati return nil } +// runTargetStatus wraps the target.Status call to provide operational +// functionality. +func (w *BaseWorker) runTargetStatus(targetImpl target.Target, policy *sdk.ScalingPolicy) (*sdk.TargetStatus, error) { + // Trigger a metric measure to track latency of the call. + labels := []metrics.Label{{Name: "plugin_name", Value: policy.Target.Name}, {Name: "policy_id", Value: policy.ID}} + defer metrics.MeasureSinceWithLabels([]string{"plugin", "target", "status", "invoke_ms"}, time.Now(), labels) + + return targetImpl.Status(policy.Target.Config) +} + +// runTargetScale wraps the target.Scale call to provide operational +// functionality. +func (w *BaseWorker) runTargetScale(targetImpl target.Target, policy *sdk.ScalingPolicy, action sdk.ScalingAction) error { + // Trigger a metric measure to track latency of the call. + labels := []metrics.Label{{Name: "plugin_name", Value: policy.Target.Name}, {Name: "policy_id", Value: policy.ID}} + defer metrics.MeasureSinceWithLabels([]string{"plugin", "target", "scale", "invoke_ms"}, time.Now(), labels) + + return targetImpl.Scale(action, policy.Target.Config) +} + // checkHandler evaluates one of the checks of a policy. type checkHandler struct { logger hclog.Logger policy *sdk.ScalingPolicy checkEval *sdk.ScalingCheckEvaluation pluginManager *manager.PluginManager - resultCh chan checkHandlerResult - proceedCh chan bool -} - -type checkHandlerResult struct { - action *sdk.ScalingAction - err error } // newCheckHandler returns a new checkHandler instance. @@ -231,88 +269,51 @@ func newCheckHandler(l hclog.Logger, p *sdk.ScalingPolicy, c *sdk.ScalingCheckEv policy: p, checkEval: c, pluginManager: pm, - resultCh: make(chan checkHandlerResult), - proceedCh: make(chan bool), } } -// results returns a read-only version of resultCh. -func (h *checkHandler) results() <-chan checkHandlerResult { - return h.resultCh -} - // start begins the execution of the check handler. -// -// The process is split in two phases: -// -// 1) check evaluation -// The check handler uses the plugins defined in the policy and the check -// to calculate what the next scaling action should be (if any). -// -// 2) action execution -// If a scaling action is necessary, the check handler will call the -// target plugin to trigger a scaling event. -// -// Since there are multiple checks in a policy, after step 1 the action is -// sent back to the worker and the handler halts until an action is selected -// for execution. The check handler that produced the selected action is -// allowed to continue to step 2 while the others are cancelled. - -func (h *checkHandler) start(ctx context.Context) { - defer close(h.resultCh) - +func (h *checkHandler) start(ctx context.Context, currentStatus *sdk.TargetStatus) (*sdk.ScalingAction, error) { h.logger.Debug("received policy check for evaluation") - result := checkHandlerResult{} - - var targetInst target.Target var apmInst apm.APM var strategyInst strategy.Strategy // Dispense plugins. - targetPlugin, err := h.pluginManager.Dispense(h.policy.Target.Name, plugins.PluginTypeTarget) - if err != nil { - result.err = fmt.Errorf(`target plugin "%s" not initialized: %v`, h.policy.Target.Name, err) - h.resultCh <- result - return - } - targetInst = targetPlugin.Plugin().(target.Target) - apmPlugin, err := h.pluginManager.Dispense(h.checkEval.Check.Source, plugins.PluginTypeAPM) if err != nil { - result.err = fmt.Errorf(`apm plugin "%s" not initialized: %v`, h.checkEval.Check.Source, err) - h.resultCh <- result - return + return nil, fmt.Errorf(`apm plugin "%s" not initialized: %v`, h.checkEval.Check.Source, err) } - apmInst = apmPlugin.Plugin().(apm.APM) - - strategyPlugin, err := h.pluginManager.Dispense(h.checkEval.Check.Strategy.Name, plugins.PluginTypeStrategy) - if err != nil { - result.err = fmt.Errorf(`strategy plugin "%s" not initialized: %v`, h.checkEval.Check.Strategy.Name, err) - h.resultCh <- result - return + apmInst, ok := apmPlugin.Plugin().(apm.APM) + if !ok { + return nil, fmt.Errorf(`"%s" is not an APM plugin`, h.checkEval.Check.Source) } - strategyInst = strategyPlugin.Plugin().(strategy.Strategy) - // Fetch target status. - currentStatus, err := h.runTargetStatus(targetInst) + strategyPlugin, err := h.pluginManager.Dispense(h.checkEval.Check.Strategy.Name, plugins.PluginTypeStrategy) if err != nil { - result.err = fmt.Errorf("failed to fetch current count: %v", err) - h.resultCh <- result - return + return nil, fmt.Errorf(`strategy plugin "%s" not initialized: %v`, h.checkEval.Check.Strategy.Name, err) } - if !currentStatus.Ready { - result.err = errTargetNotReady - h.resultCh <- result - return + strategyInst, ok = strategyPlugin.Plugin().(strategy.Strategy) + if !ok { + return nil, fmt.Errorf(`"%s" is not a strategy plugin`, h.checkEval.Check.Strategy.Name) } // Query check's APM. - h.checkEval.Metrics, err = h.runAPMQuery(apmInst) + // Wrap call in a goroutine so we can listen for ctx as well. + apmQueryDoneCh := make(chan interface{}) + go func() { + defer close(apmQueryDoneCh) + h.checkEval.Metrics, err = h.runAPMQuery(apmInst) + }() + + select { + case <-ctx.Done(): + return nil, nil + case <-apmQueryDoneCh: + } + if err != nil { - result.err = fmt.Errorf("failed to query source: %v", err) - h.resultCh <- result - return + return nil, fmt.Errorf("failed to query source: %v", err) } // Make sure metrics are sorted consistently. @@ -320,16 +321,14 @@ func (h *checkHandler) start(ctx context.Context) { if len(h.checkEval.Metrics) == 0 { h.logger.Warn("no metrics available") - return + return &sdk.ScalingAction{Direction: sdk.ScaleDirectionNone}, nil } // Calculate new count using check's Strategy. h.logger.Debug("calculating new count", "count", currentStatus.Count) runResp, err := h.runStrategyRun(strategyInst, currentStatus.Count) if err != nil { - result.err = fmt.Errorf("failed to execute strategy: %v", err) - h.resultCh <- result - return + return nil, fmt.Errorf("failed to execute strategy: %v", err) } h.checkEval = runResp @@ -356,9 +355,7 @@ func (h *checkHandler) start(ctx context.Context) { h.checkEval.Action = minMaxAction } else { h.logger.Debug("nothing to do") - result.action = &sdk.ScalingAction{Direction: sdk.ScaleDirectionNone} - h.resultCh <- result - return + return &sdk.ScalingAction{Direction: sdk.ScaleDirectionNone}, nil } } @@ -371,82 +368,10 @@ func (h *checkHandler) start(ctx context.Context) { // Skip action if count doesn't change. if currentStatus.Count == h.checkEval.Action.Count { h.logger.Debug("nothing to do", "from", currentStatus.Count, "to", h.checkEval.Action.Count) - - result.action = &sdk.ScalingAction{Direction: sdk.ScaleDirectionNone} - h.resultCh <- result - return - } - - result.action = h.checkEval.Action - - // Send result back and wait to see if we should proceed. - h.resultCh <- result - select { - case <-ctx.Done(): - return - case proceed := <-h.proceedCh: - if !proceed { - h.logger.Debug("check not selected") - return - } + return &sdk.ScalingAction{Direction: sdk.ScaleDirectionNone}, nil } - // If the policy is configured with dry-run:true then we set the - // action count to nil so its no-nop. This allows us to still - // submit the job, but not alter its state. - if val, ok := h.policy.Target.Config["dry-run"]; ok && val == "true" { - h.logger.Info("scaling dry-run is enabled, using no-op task group count") - h.checkEval.Action.SetDryRun() - } - - if h.checkEval.Action.Count == sdk.StrategyActionMetaValueDryRunCount { - h.logger.Debug("registering scaling event", - "count", currentStatus.Count, "reason", h.checkEval.Action.Reason, "meta", h.checkEval.Action.Meta) - } else { - h.logger.Info("scaling target", - "from", currentStatus.Count, "to", h.checkEval.Action.Count, - "reason", h.checkEval.Action.Reason, "meta", h.checkEval.Action.Meta) - } - - // Scale the target. If we receive an error add this onto the result so the - // handler understand what do to. - if err = h.runTargetScale(targetInst, *h.checkEval.Action); err != nil { - result.err = fmt.Errorf("failed to scale target: %v", err) - h.logger.Error("failed to submit scaling action to target", "error", err) - metrics.IncrCounter([]string{"scale", "invoke", "error_count"}, 1) - } else { - h.logger.Info("successfully submitted scaling action to target", - "desired_count", h.checkEval.Action.Count) - metrics.IncrCounter([]string{"scale", "invoke", "success_count"}, 1) - } - - // Ensure we send a result otherwise the Worker.HandlePolicy routine will - // leak waiting endlessly for the result it will never receive, poor thing. - h.resultCh <- result -} - -// runTargetStatus wraps the target.Status call to provide operational -// functionality. -func (h *checkHandler) runTargetStatus(targetImpl target.Target) (*sdk.TargetStatus, error) { - - h.logger.Debug("fetching current count") - - // Trigger a metric measure to track latency of the call. - labels := []metrics.Label{{Name: "plugin_name", Value: h.policy.Target.Name}, {Name: "policy_id", Value: h.policy.ID}} - defer metrics.MeasureSinceWithLabels([]string{"plugin", "target", "status", "invoke_ms"}, time.Now(), labels) - - return targetImpl.Status(h.policy.Target.Config) -} - -// runTargetScale wraps the target.Scale call to provide operational -// functionality. -func (h *checkHandler) runTargetScale(targetImpl target.Target, action sdk.ScalingAction) error { - - // Trigger a metric measure to track latency of the call. - labels := []metrics.Label{{Name: "plugin_name", Value: h.policy.Target.Name}, {Name: "policy_id", Value: h.policy.ID}} - defer metrics.MeasureSinceWithLabels([]string{"plugin", "target", "scale", "invoke_ms"}, time.Now(), labels) - - return targetImpl.Scale(action, h.policy.Target.Config) + return h.checkEval.Action, nil } // runAPMQuery wraps the apm.Query call to provide operational functionality.