Evaluation broker #282
Conversation
No significant problems. One potential (but unlikely) race condition and some comments about logging.
👍👍👍
```
@@ -15,6 +15,12 @@ func decodeFile(file string, p *sdk.ScalingPolicy) error {
        return err
    }

    // Assume file policies are cluster policies unless specified.
    // TODO: revisit this assumption.
```
This is as good as anything... Until we get our hands dirty with this, I'm not sure which policy type we'd be more likely to put in a file source (and, therefore, which one should be favored with the "default" type).
```
@@ -199,6 +201,12 @@ func (s *Source) canonicalizePolicy(p *sdk.ScalingPolicy) {
        return
    }

    // Assume a policy coming from Nomad without a type is a horizontal policy.
    // TODO: review this assumption.
```
This one makes sense... Moving forward with the next Nomad release, all policies should have a type. So a Nomad policy without a type is from a previous version of Nomad, and the only policy type in previous versions of Nomad was "horizontal".
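For reference, a minimal sketch of the defaulting that both of the diff comments above describe. The helper name is made up and the string values simply mirror the comments; the real code presumably uses constants from the sdk package rather than literals.

```go
package policy

import "github.com/hashicorp/nomad-autoscaler/sdk"

// setDefaultPolicyType is a reader's sketch, not the PR's actual code:
// untyped policies loaded from files default to "cluster", while untyped
// policies read from the Nomad API default to "horizontal" (the only type
// older Nomad versions could produce).
func setDefaultPolicyType(p *sdk.ScalingPolicy, fromFile bool) {
	if p.Type != "" {
		// An explicit type always wins; only fill in the blanks.
		return
	}
	if fromFile {
		p.Type = "cluster"
		return
	}
	p.Type = "horizontal"
}
```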
```
    default:
    }

    eval, token, err := w.broker.Dequeue(ctx, w.queue)
```
This is a blocking call, correct?
Yup, it will block until there's work in the queue or return nil if `ctx` is closed.
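A rough sketch of how a worker loop might sit around that blocking call. Only Dequeue's shape comes from the diff; the broker interface, the Ack/Nack signatures, and the stand-in types below are assumptions for illustration.

```go
package policyeval

import (
	"context"
	"log"
)

// evaluation is a stand-in for the autoscaler's policy evaluation type; the
// real worker uses a type from the project's sdk package.
type evaluation struct {
	ID string
}

// evalBroker lists just the broker methods this sketch needs. The Ack/Nack
// signatures are assumptions; only Dequeue's shape comes from the diff above.
type evalBroker interface {
	Dequeue(ctx context.Context, queue string) (*evaluation, string, error)
	Ack(evalID, token string) error
	Nack(evalID, token string) error
}

// worker is a simplified stand-in for the PR's eval worker.
type worker struct {
	broker evalBroker
	queue  string
}

// run loops over the blocking Dequeue call shown in the diff: Dequeue parks
// until an eval is available, or returns a nil eval once ctx is closed.
func (w *worker) run(ctx context.Context) {
	for {
		eval, token, err := w.broker.Dequeue(ctx, w.queue)
		if err != nil {
			log.Printf("failed to dequeue evaluation: %v", err)
			continue
		}

		// A nil eval means ctx was cancelled while Dequeue was blocking.
		if eval == nil {
			return
		}

		if err := w.handleEval(ctx, eval); err != nil {
			// NACK so the broker can hand the eval to another worker.
			w.broker.Nack(eval.ID, token)
			continue
		}
		w.broker.Ack(eval.ID, token)
	}
}

// handleEval would run the actual policy evaluation; elided in this sketch.
func (w *worker) handleEval(ctx context.Context, eval *evaluation) error {
	return nil
}
```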
```
@@ -70,6 +73,10 @@ func (a *Agent) Run() error {
    policyEvalCh := a.setupPolicyManager()
    go a.policyManager.Run(ctx, policyEvalCh)

    // Launch eval broker and workers.
    a.evalBroker = policyeval.NewBroker(a.logger.ResetNamed("policy_eval"), 5*time.Minute, 3)
    a.initWorkers(ctx)
```
May want to make this dynamic later, for autoscaling the number of brokers or simply SIGHUP'ing the config.
Yeah, I was trying to decide between a global config or a per-policy value.
I think per-policy would be better since it can be configured as needed, but global is appealing because you can just set it to your worst-case value.
So probably both? 😄
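A tiny sketch of the "probably both" idea, using the ACK deadline from the PR description as the example; none of these knobs exist in this PR, and the names are hypothetical.

```go
package policyeval

import "time"

// ackDeadlineFor is a purely hypothetical resolver: use a per-policy ACK
// deadline when one is configured, otherwise fall back to a global
// agent-level default. The same pattern could apply to worker counts.
func ackDeadlineFor(globalDefault time.Duration, perPolicy map[string]time.Duration, policyID string) time.Duration {
	if d, ok := perPolicy[policyID]; ok && d > 0 {
		return d
	}
	return globalDefault
}
```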
Minor comments alongside those of @cgbaker, but it's looking very nice! 🥳
changelog: add entry for #282.
This PR introduces the evaluation broker, which is responsible for storing, deduping and controlling the distribution and flow of policy evaluation requests to workers.
The eval requests are stored in a set of heaps, sorted by priority and create time (FIFO).
Evals are picked from the broker by workers, which must ACK the evaluation if it completes successfully or NACK it otherwise. If an ACK doesn't arrive within the deadline (5 minutes for now, but it should probably be configurable per policy), the eval is considered NACK'd.
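To illustrate the ordering described above, here is a sketch of how one of those heaps might compare entries. The types are a reader's stand-in rather than the PR's internals, and only the sort.Interface half of a container/heap implementation is shown (Push/Pop omitted).

```go
package policyeval

import "time"

// pendingEval is a stand-in for an entry in one of the broker's heaps.
type pendingEval struct {
	ID         string
	Priority   int
	CreateTime time.Time
}

// evalHeap orders pending evals as the description above explains: higher
// priority first, and FIFO (older CreateTime first) within the same priority.
type evalHeap []*pendingEval

func (h evalHeap) Len() int      { return len(h) }
func (h evalHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h evalHeap) Less(i, j int) bool {
	if h[i].Priority != h[j].Priority {
		return h[i].Priority > h[j].Priority // higher priority dequeues first
	}
	return h[i].CreateTime.Before(h[j].CreateTime) // FIFO within a priority
}
```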
A new agent configuration block was also added to control the number of workers for each policy type:
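The excerpt cuts off before the block itself; it presumably looks something like the following, though the block and attribute names here are guesses rather than text copied from the PR.

```hcl
# Hypothetical agent configuration: one worker-count entry per policy type.
policy_eval {
  workers = {
    cluster    = 2
    horizontal = 10
  }
}
```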