
Don't send 429 when max concurrency is reached #19

Merged · 1 commit · Jun 26, 2023
2 changes: 1 addition & 1 deletion pkg/executor/executortype/poolmgr/gp.go
@@ -639,7 +639,7 @@ func (gp *GenericPool) getFuncSvc(ctx context.Context, fn *fv1.Function) (*fscac

gp.fsCache.PodToFsvc.Store(pod.GetObjectMeta().GetName(), fsvc)
gp.podFSVCMap.Store(pod.ObjectMeta.Name, []interface{}{crd.CacheKey(fsvc.Function), fsvc.Address})
-gp.fsCache.AddFunc(ctx, *fsvc, fn.GetRequestPerPod())
+gp.fsCache.AddFunc(ctx, *fsvc, fn.GetRequestPerPod(), fn.GetConcurrency())

logger.Info("added function service",
zap.String("pod", pod.ObjectMeta.Name),
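For reference, fn.GetRequestPerPod() and fn.GetConcurrency() are accessors on the Function type. A minimal sketch of the shape such accessors usually have; the type, field names, and defaults below are assumptions for illustration, not taken from this PR:

// Sketch only: assumed spec fields and defaults, not part of this diff.
type FunctionSpec struct {
	Concurrency    int // maximum number of specialized pods
	RequestsPerPod int // requests per pod before another is specialized
}

type Function struct {
	Spec FunctionSpec
}

func (fn *Function) GetConcurrency() int {
	if fn.Spec.Concurrency <= 0 {
		return 500 // assumed default when the spec leaves it unset
	}
	return fn.Spec.Concurrency
}

func (fn *Function) GetRequestPerPod() int {
	if fn.Spec.RequestsPerPod <= 0 {
		return 1 // assumed default
	}
	return fn.Spec.RequestsPerPod
}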
4 changes: 2 additions & 2 deletions pkg/executor/fscache/functionServiceCache.go
@@ -249,8 +249,8 @@ func (fsc *FunctionServiceCache) GetByFunctionUID(uid types.UID) (*FuncSvc, erro
}

// AddFunc adds a function service to pool cache.
-func (fsc *FunctionServiceCache) AddFunc(ctx context.Context, fsvc FuncSvc, requestsPerPod int) {
-fsc.connFunctionCache.SetSvcValue(ctx, crd.CacheKey(fsvc.Function), fsvc.Address, &fsvc, fsvc.CPULimit, requestsPerPod)
+func (fsc *FunctionServiceCache) AddFunc(ctx context.Context, fsvc FuncSvc, requestsPerPod, concurrency int) {
+fsc.connFunctionCache.SetSvcValue(ctx, crd.CacheKey(fsvc.Function), fsvc.Address, &fsvc, fsvc.CPULimit, requestsPerPod, concurrency)
now := time.Now()
fsvc.Ctime = now
fsvc.Atime = now
125 changes: 108 additions & 17 deletions pkg/executor/fscache/poolcache.go
@@ -21,6 +21,7 @@ import (
"context"
"fmt"
"io"
"math/rand"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/resource"
@@ -45,15 +46,15 @@ const (
type (
funcSvcInfo struct {
val *FuncSvc
-activeRequests int // number of requests served by function pod
+activeRequests int // number of requests currently being sent to the pod (can be greater than requestsPerPod if max concurrency is reached)
currentCPUUsage resource.Quantity // current cpu usage of the specialized function pod
cpuLimit resource.Quantity // if currentCPUUsage is more than cpuLimit cache miss occurs in getValue request
}

funcSvcGroup struct {
-svcWaiting int
-svcs map[string]*funcSvcInfo
-queue *Queue
+svcWaiting int // number of requests waiting for a pod to finish specializing
+svcs map[string]*funcSvcInfo // map of pod ip -> specialized pod
+queue *Queue // requests waiting for a pod to finish specializing (does not include the first request that caused the pod to specialize)
}

// PoolCache implements a simple cache implementation having values mapped by two keys [function][address].
@@ -71,10 +72,10 @@ type (
address string
dumpWriter io.Writer
value *FuncSvc
-requestsPerPod int
+requestsPerPod int // number of requests to send to a single pod before specializing another one
cpuUsage resource.Quantity
responseChannel chan *response
-concurrency int
+concurrency int // maximum number of pods that can be specialized for this function
}
response struct {
error
@@ -115,13 +116,16 @@ func (c *PoolCache) service() {
case getValue:
funcSvcGroup, ok := c.cache[req.function]
if !ok {
+// this function has never been specialized before, return not found so that the executor specializes a pod
c.cache[req.function] = NewFuncSvcGroup()
c.cache[req.function].svcWaiting++
resp.error = ferror.MakeError(ferror.ErrorNotFound,
fmt.Sprintf("function Name '%v' not found", req.function))
req.responseChannel <- resp
continue
}

+// this function has been specialized before, check if there's a specialized pod with room for another request
found := false
totalActiveRequests := 0
for addr := range funcSvcGroup.svcs {
@@ -138,13 +142,21 @@
break
}
}

if found {
+// we found a specialized pod with room for another request, use it
req.responseChannel <- resp
continue
}
-specializationInProgress := funcSvcGroup.svcWaiting - funcSvcGroup.queue.Len()
-capacity := ((specializationInProgress + len(funcSvcGroup.svcs)) * req.requestsPerPod) - (totalActiveRequests + funcSvcGroup.svcWaiting)
-if capacity > 0 {
+// none of our specialized pods have room for another request
+// check if we're already specializing a pod, and if we are, check if it has room for another request
+numPodsSpecializing := funcSvcGroup.svcWaiting - funcSvcGroup.queue.Len()
+maxRequestsForSpecializingPods := numPodsSpecializing * req.requestsPerPod
+
+if maxRequestsForSpecializingPods-funcSvcGroup.svcWaiting > 0 {
+// we're already specializing a pod and it has room for another request
+// increment the number of requests waiting for a pod to finish specializing and push this request onto the queue
funcSvcGroup.svcWaiting++
svcWait := &svcWait{
svcChannel: make(chan *FuncSvc),
@@ -156,14 +168,55 @@
continue
}

-// concurrency should not be set to zero and
-//sum of specialization in progress and specialized pods should be less then req.concurrency
-if req.concurrency > 0 && (specializationInProgress+len(funcSvcGroup.svcs)) >= req.concurrency {
-resp.error = ferror.MakeError(ferror.ErrorTooManyRequests, fmt.Sprintf("function '%s' concurrency '%d' limit reached.", req.function, req.concurrency))
-} else {
-funcSvcGroup.svcWaiting++
-resp.error = ferror.MakeError(ferror.ErrorNotFound, fmt.Sprintf("function '%s' all functions are busy", req.function))
+numPodsSpecialized := len(funcSvcGroup.svcs)
+
+// we're either not specializing a pod, or we are and none of them have room for another request
+// check if we can specialize another pod
+if req.concurrency > 0 && numPodsSpecialized+numPodsSpecializing >= req.concurrency {
+// we can't specialize another pod because our specialized pods + specializing pods is >= this function's concurrency limit
+// check if we have any pods that are already specialized
+if numPodsSpecialized > 0 {
+// we have specialized pod(s), just pick one at random and use it for this request
+svc := randomSvc(funcSvcGroup.svcs)

Review comment:
Curious what you all think about this scenario:

  1. One specialized pod
  2. A massive burst of requests, going well beyond the max requests
  3. All requests that spill over the max requests go to that one specialized pod because the others are specializing

Could we do more harm than good by having one pod take those requests? Should we consider an upper bound where we start putting these requests on the queue, waiting for a specializing pod to become available?

Author:
That's a good point, how about this instead:

if numPodsSpecializing > 0 {
	// we still have pods that are specializing
	// increment the number of requests waiting for a pod to finish specializing and push this request onto the queue
} else {
	// we have the maximum amount of specialized pods, just pick one at random and use it for this request
}

☝️ This queues the requests for the specializing pods instead of the specialized one(s). We already have logic below to randomly distribute the queue among all the pods once we've specialized the last one.

Reviewer:

Yeah, I think in the absolute worst-case scenario outlined above, I'd rather have those requests, or at least some of them, distributed across the specializing pods so we don't overwhelm the one specialized pod. Some questions that come to my mind:

  • Do we have any sense of what would be overwhelming? I know Node could definitely take on a ton of requests, but eventually it would topple over.
  • Latency-wise, is there anything we can measure? Would it be better to wait for specialization, or to have the specialized pod slowly work through the requests?
  • If specialization fails, we keep things in the queue. Does that mean, for now, it would be best to send it to the specialized pod instead, and follow up by popping things off the queue when an existing specialized pod can serve more requests, i.e. in markAvailable?

I'm not sure what's best, but I'll let you make that call!
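To make the merged behavior concrete, here is a small, self-contained walkthrough of the dispatch decision as it reads after this PR. The decide function below is hypothetical and simplified (the real service() loop tracks per-pod active requests and CPU usage, and replies over channels), but the branch order mirrors the getValue case above:

package main

import "fmt"

// decide sketches the getValue branch order after this PR: reuse a pod
// with spare capacity, else queue behind a specializing pod with room,
// else (at the concurrency limit) fall back to a random specialized pod
// or queue, else ask the executor to specialize a new pod.
func decide(specialized, specializing, active, waiting, requestsPerPod, concurrency int) string {
	if active < specialized*requestsPerPod {
		return "use a specialized pod with spare capacity"
	}
	if specializing*requestsPerPod-waiting > 0 {
		return "queue behind a pod that is still specializing"
	}
	if concurrency > 0 && specialized+specializing >= concurrency {
		if specialized > 0 {
			return "concurrency limit reached: send to a random specialized pod"
		}
		return "concurrency limit reached, none specialized yet: queue"
	}
	return "not found: the executor specializes a new pod"
}

func main() {
	// The burst scenario from this thread: requestsPerPod=2, concurrency=3,
	// one saturated specialized pod, and two specializing pods whose
	// capacity is already claimed by waiting requests.
	fmt.Println(decide(1, 2, 2, 4, 2, 3))
	// Before this PR, this case was answered with a 429 instead.
}

Note that only the last-resort branch overloads a specialized pod; whenever a specializing pod still has headroom, the request waits on the queue, which is the compromise settled on in this thread.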

+otelUtils.LoggerWithTraceID(req.ctx, c.logger).
+Info("max concurrency reached, sending request to random pod",
+zap.String("function", req.function),
+zap.String("address", svc.val.Address),
+zap.Int("active_requests", svc.activeRequests),
+zap.Int("svc_waiting", funcSvcGroup.svcWaiting),
+zap.Int("queue_len", funcSvcGroup.queue.Len()),
+)
Review comment on lines +181 to +188:

This log feels like it could be fairly noisy for our big apps. Is it intended to be temporary? Should we make it a debug log?

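For comparison, this same file already gates a verbose message behind a level check (the zap.DebugLevel guard visible further down in this diff). A sketch of the debug-level variant suggested above, assuming nothing else changes:

// Sketch: demote the new log to debug, reusing the level-check pattern
// that already appears later in poolcache.go.
if c.logger.Core().Enabled(zap.DebugLevel) {
	otelUtils.LoggerWithTraceID(req.ctx, c.logger).
		Debug("max concurrency reached, sending request to random pod",
			zap.String("function", req.function),
			zap.String("address", svc.val.Address),
		)
}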
+svc.activeRequests++
+resp.value = svc.val
+req.responseChannel <- resp
+continue
+} else {
+// we don't have any specialized pods yet, they're all still specializing
+// increment the number of requests waiting for a pod to finish specializing and push this request onto the queue
+otelUtils.LoggerWithTraceID(req.ctx, c.logger).
+Info("max concurrency reached, queuing request for specializing pod",
+zap.String("function", req.function),
+zap.Int("svc_waiting", funcSvcGroup.svcWaiting),
+zap.Int("queue_len", funcSvcGroup.queue.Len()),
+)
+
+funcSvcGroup.svcWaiting++
+svcWait := &svcWait{
+svcChannel: make(chan *FuncSvc),
+ctx: req.ctx,
+}
+resp.svcWaitValue = svcWait
+funcSvcGroup.queue.Push(svcWait)
+req.responseChannel <- resp
+continue
+}
+}

+// we can specialize another pod
+// increment the number of requests waiting for a pod to finish specializing and return not found so that the executor specializes one

Review comment:

<3 this comment for clarifying this causes specialization to happen!

+funcSvcGroup.svcWaiting++
+resp.error = ferror.MakeError(ferror.ErrorNotFound, fmt.Sprintf("function '%s' all functions are busy", req.function))
req.responseChannel <- resp
case setValue:
if _, ok := c.cache[req.function]; !ok {
@@ -173,8 +226,10 @@ func (c *PoolCache) service() {
c.cache[req.function].svcs[req.address] = &funcSvcInfo{}
}
c.cache[req.function].svcs[req.address].val = req.value
+// we increment the active requests here because the pod that was just set is being used to serve the request that caused it to specialize
c.cache[req.function].svcs[req.address].activeRequests++
if c.cache[req.function].svcWaiting > 0 {
+// we decrement for the same reason as above: the request that was waiting for the pod to finish specializing is now being served
c.cache[req.function].svcWaiting--
svcCapacity := req.requestsPerPod - c.cache[req.function].svcs[req.address].activeRequests
queueLen := c.cache[req.function].queue.Len()
@@ -194,6 +249,29 @@
close(popped.svcChannel)
c.cache[req.function].svcWaiting--
}
+if len(c.cache[req.function].svcs) == req.concurrency {
+// this is the last pod this function will specialize, make sure all the queued requests are served
+otelUtils.LoggerWithTraceID(req.ctx, c.logger).
+Info("max concurrency reached, sending queued requests to random pods",
+zap.String("function", req.function),
+zap.Int("svc_waiting", c.cache[req.function].svcWaiting),
+zap.Int("queue_len", c.cache[req.function].queue.Len()),
+)
+
+for {
+popped := c.cache[req.function].queue.Pop()
+if popped == nil {
+break
+}
+if popped.ctx.Err() == nil {
+svc := randomSvc(c.cache[req.function].svcs)
+popped.svcChannel <- svc.val
+svc.activeRequests++
+}
+close(popped.svcChannel)
+c.cache[req.function].svcWaiting--
+}
+}
}
if c.logger.Core().Enabled(zap.DebugLevel) {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with setValue", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function].svcs[req.address].activeRequests))
@@ -334,7 +412,7 @@ func (c *PoolCache) ListAvailableValue() []*FuncSvc {
}

// SetValue marks the value at key [function][address] as active (being used)
-func (c *PoolCache) SetSvcValue(ctx context.Context, function, address string, value *FuncSvc, cpuLimit resource.Quantity, requestsPerPod int) {
+func (c *PoolCache) SetSvcValue(ctx context.Context, function, address string, value *FuncSvc, cpuLimit resource.Quantity, requestsPerPod, concurrency int) {
respChannel := make(chan *response)
c.requestChannel <- &request{
ctx: ctx,
@@ -345,6 +423,7 @@ func (c *PoolCache) SetSvcValue(ctx context.Context, function, address string, v
cpuUsage: cpuLimit,
requestsPerPod: requestsPerPod,
responseChannel: respChannel,
+concurrency: concurrency,
}
}

@@ -403,3 +482,15 @@ func (c *PoolCache) LogFnSvcGroup(ctx context.Context, file io.Writer) error {
resp := <-respChannel
return resp.error
}

+func randomSvc(svcs map[string]*funcSvcInfo) *funcSvcInfo {
+i := 0
+iToChoose := rand.Intn(len(svcs))
+for _, svc := range svcs {
+if i == iToChoose {
+return svc
+}
+i++
+}
+panic("unreachable")
+}
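A note on the randomSvc helper: Go's map iteration order is unspecified and not guaranteed to be uniformly random, so the function draws an explicit uniform index with rand.Intn and walks the map until it reaches it; the final panic is unreachable because the loop always visits all len(svcs) entries. rand.Intn panics when its argument is zero, but both call sites in this diff reach the helper only when at least one specialized pod exists. A tiny usage sketch (the addresses are made up):

svcs := map[string]*funcSvcInfo{
	"10.1.0.4:8888": {}, // hypothetical pod addresses
	"10.1.0.7:8888": {},
}
picked := randomSvc(svcs) // each entry is chosen with probability 1/len(svcs)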