
Use project as targeting key, propagate message context through entity evaluation #3827

Merged 3 commits on Jul 15, 2024
72 changes: 56 additions & 16 deletions internal/engine/handler.go
@@ -17,12 +17,14 @@ package engine
 import (
 	"context"
 	"fmt"
+	"slices"
 	"sync"
 	"time"

 	"github.com/ThreeDotsLabs/watermill/message"
 	"github.com/rs/zerolog"

+	"github.com/stacklok/minder/internal/engine/engcontext"
 	"github.com/stacklok/minder/internal/engine/entities"
 	"github.com/stacklok/minder/internal/events"
 	minderlogger "github.com/stacklok/minder/internal/logger"
@@ -44,10 +46,12 @@ type ExecutorEventHandler struct {
 	evt                    events.Publisher
 	handlerMiddleware      []message.HandlerMiddleware
 	wgEntityEventExecution *sync.WaitGroup
-	// terminationcontext is used to terminate the executor
-	// when the server is shutting down.
-	terminationcontext context.Context
-	executor           Executor
+	executor Executor
+	// cancels are a set of cancel functions for current entity events in flight.
+	// This allows us to cancel rule evaluation directly when terminationContext
+	// is cancelled.
+	cancels []*context.CancelFunc
+	lock    sync.Mutex
 }

 // NewExecutorEventHandler creates the event handler for the executor
@@ -57,13 +61,23 @@ func NewExecutorEventHandler(
 	handlerMiddleware []message.HandlerMiddleware,
 	executor Executor,
 ) *ExecutorEventHandler {
-	return &ExecutorEventHandler{
+	eh := &ExecutorEventHandler{
 		evt:                    evt,
 		wgEntityEventExecution: &sync.WaitGroup{},
-		terminationcontext:     ctx,
 		handlerMiddleware:      handlerMiddleware,
 		executor:               executor,
 	}
+	go func() {
+		<-ctx.Done()
+		eh.lock.Lock()
+		defer eh.lock.Unlock()
+
+		for _, cancel := range eh.cancels {
+			(*cancel)()
+		}
+	}()
Comment on lines +70 to +78
Contributor: If I got this right, you're still waiting for Done() on what was originally the terminationcontext, but keeping track of all downstream contexts so you can cancel them as well, right?

If yes, was this the source of some sort of bug?

Member (Author): What I really want is Context.TerminateOnAdditionalContext(ctx, additional), but that doesn't exist.

The problem is that HandleEntityEvent is (for performance) extracting the event processing out of the Watermill message lifecycle, but we still want to be able to cancel message processing during process shutdown.

The previous way we did this was basically to take the background context of the server and substitute it for the message context, but that meant we were losing all the message context in the event handler. The new pattern is to use context.WithoutCancel to escape the cancellation of the Watermill event handler (because it only allows one message in flight), but then store the cancel functions for this loop.

If we didn't have the performance problem with Watermill, we might still want to be able to early-terminate the in-flight event handling when the server shuts down, but we'd do that by passing the termination context to Watermill and having it cancel all the contexts. But when we call WithoutCancel, we end up putting that work on ourselves.

(I'm also back-intuiting some of this from #1654)


+
+	return eh
 }
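The pattern described in this thread can be distilled into a small, self-contained sketch: keep the message context's values while escaping its cancellation, and register a per-message cancel that a shutdown context can still reach. The sketch below is illustrative only (`tracker` and `detach` are invented names, not Minder APIs) and needs Go 1.21+ for context.WithoutCancel:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// tracker records cancel functions for in-flight work so that a shutdown
// context can cancel them even after each work context has escaped its
// parent's lifetime.
type tracker struct {
	mu      sync.Mutex
	cancels []*context.CancelFunc
}

func newTracker(shutdown context.Context) *tracker {
	t := &tracker{}
	go func() {
		<-shutdown.Done()
		t.mu.Lock()
		defer t.mu.Unlock()
		for _, c := range t.cancels {
			(*c)()
		}
	}()
	return t
}

// detach keeps parent's values (loggers, telemetry, trace IDs) but drops its
// cancellation, then registers a fresh cancel the shutdown goroutine can reach.
func (t *tracker) detach(parent context.Context) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.WithoutCancel(parent))
	t.mu.Lock()
	t.cancels = append(t.cancels, &cancel)
	t.mu.Unlock()
	return ctx, cancel
}

func main() {
	shutdown, stop := context.WithCancel(context.Background())
	tr := newTracker(shutdown)

	msgCtx, msgCancel := context.WithCancel(context.Background())
	work, workCancel := tr.detach(msgCtx)
	defer workCancel()

	msgCancel() // cancelling the message context no longer stops the work...
	fmt.Println("after message cancel:", work.Err()) // <nil>

	stop()                            // ...but server shutdown still does
	time.Sleep(10 * time.Millisecond) // let the shutdown goroutine run
	fmt.Println("after shutdown:", work.Err()) // context canceled
}
```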

 // Register implements the Consumer interface.
@@ -79,9 +93,23 @@ func (e *ExecutorEventHandler) Wait() {
 // HandleEntityEvent handles events coming from webhooks/signals
 // as well as the init event.
 func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {
-	// Grab the context before making a copy of the message
-	msgCtx := msg.Context()
-	// Let's not share memory with the caller
+	// NOTE: we're _deliberately_ "escaping" from the parent context's Cancel/Done
+	// completion, because the default watermill behavior for both Go channels and
+	// SQL is to process messages sequentially, but we need additional parallelism
+	// beyond that. When we switch to a different message processing system, we
+	// should aim to remove this goroutine altogether and have the messaging system
+	// provide the parallelism.
+	// We _do_ still want to cancel on shutdown, however.
+	// TODO: Make this timeout configurable
+	msgCtx := context.WithoutCancel(msg.Context())
+	msgCtx, shutdownCancel := context.WithCancel(msgCtx)
+
+	e.lock.Lock()
+	e.cancels = append(e.cancels, &shutdownCancel)
+	e.lock.Unlock()
+
+	// Let's not share memory with the caller. Note that this does not copy Context
 	msg = msg.Copy()

 	inf, err := entities.ParseEntityEvent(msg)
@@ -95,11 +123,23 @@ func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {
 	if inf.Type == pb.Entity_ENTITY_ARTIFACTS {
 		time.Sleep(ArtifactSignatureWaitPeriod)
 	}
-	// TODO: Make this timeout configurable
-	ctx, cancel := context.WithTimeout(e.terminationcontext, DefaultExecutionTimeout)
-	defer cancel()
-
-	ts := minderlogger.BusinessRecord(msgCtx)
+	ctx, cancel := context.WithTimeout(msgCtx, DefaultExecutionTimeout)
+	defer cancel()
+	defer func() {
+		e.lock.Lock()
+		e.cancels = slices.DeleteFunc(e.cancels, func(cf *context.CancelFunc) bool {
+			return cf == &shutdownCancel
+		})
+		e.lock.Unlock()
+	}()
+
+	ctx = engcontext.WithEntityContext(ctx, &engcontext.EntityContext{
+		Project: engcontext.Project{ID: inf.ProjectID},
+		// TODO: extract Provider name from ProviderID?
+	})
+
+	ts := minderlogger.BusinessRecord(ctx)
 	ctx = ts.WithTelemetry(ctx)

 	logger := zerolog.Ctx(ctx)
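A detail worth noting in the cleanup above: the slice element type is *context.CancelFunc rather than context.CancelFunc because Go function values are only comparable to nil, so slices.DeleteFunc needs a pointer to give each registration a comparable identity. A minimal standalone illustration (not Minder code):

```go
package main

import (
	"context"
	"fmt"
	"slices"
)

func main() {
	_, c1 := context.WithCancel(context.Background())
	_, c2 := context.WithCancel(context.Background())
	defer c1()
	defer c2()

	// `c1 == c2` would not compile: function values aren't comparable in Go.
	// Taking their addresses gives each registration a comparable identity.
	cancels := []*context.CancelFunc{&c1, &c2}

	cancels = slices.DeleteFunc(cancels, func(cf *context.CancelFunc) bool {
		return cf == &c1 // remove exactly the entry registered for c1
	})
	fmt.Println(len(cancels)) // 1
}
```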
@@ -116,14 +156,14 @@ func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {
 	// here even though we also record it in the middleware because the evaluation
 	// is done in a separate goroutine which usually still runs after the middleware
 	// had already recorded the telemetry.
-	logMsg := zerolog.Ctx(ctx).Info()
+	logMsg := logger.Info()
 	if err != nil {
-		logMsg = zerolog.Ctx(ctx).Error()
+		logMsg = logger.Error()
 	}
 	ts.Record(logMsg).Send()

 	if err != nil {
-		zerolog.Ctx(ctx).Info().
+		logger.Info().
 			Str("project", inf.ProjectID.String()).
 			Str("provider_id", inf.ProviderID.String()).
 			Str("entity", inf.Type.String()).
36 changes: 22 additions & 14 deletions internal/engine/handler_test.go
@@ -45,6 +45,8 @@ func TestExecutorEventHandler_handleEntityEvent(t *testing.T) {
 	repositoryID := uuid.New()
 	executionID := uuid.New()

+	parallelOps := 2
+
 	// -- end expectations

 	evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{
@@ -80,9 +82,11 @@ func TestExecutorEventHandler_handleEntityEvent(t *testing.T) {
 		WithExecutionID(executionID)

 	executor := mockengine.NewMockExecutor(ctrl)
-	executor.EXPECT().
-		EvalEntityEvent(gomock.Any(), gomock.Eq(eiw)).
-		Return(nil)
+	for i := 0; i < parallelOps; i++ {
+		executor.EXPECT().
+			EvalEntityEvent(gomock.Any(), gomock.Eq(eiw)).
+			Return(nil)
+	}

 	handler := engine.NewExecutorEventHandler(
 		ctx,
@@ -97,19 +101,23 @@ func TestExecutorEventHandler_handleEntityEvent(t *testing.T) {
 	msg, err := eiw.BuildMessage()
 	require.NoError(t, err, "expected no error")

-	// Run in the background
-	go func() {
-		t.Log("Running entity event handler")
-		require.NoError(t, handler.HandleEntityEvent(msg), "expected no error")
-	}()
+	// Run in the background, twice
+	for i := 0; i < parallelOps; i++ {
+		go func() {
+			t.Log("Running entity event handler")
+			require.NoError(t, handler.HandleEntityEvent(msg), "expected no error")
+		}()
+	}

 	// expect flush
-	t.Log("waiting for flush")
-	result := <-queued
-	require.NotNil(t, result)
-	require.Equal(t, providerID.String(), msg.Metadata.Get(entities.ProviderIDEventKey))
-	require.Equal(t, "repository", msg.Metadata.Get(entities.EntityTypeEventKey))
-	require.Equal(t, projectID.String(), msg.Metadata.Get(entities.ProjectIDEventKey))
+	for i := 0; i < parallelOps; i++ {
+		t.Log("waiting for flush")
+		result := <-queued
+		require.NotNil(t, result)
+		require.Equal(t, providerID.String(), msg.Metadata.Get(entities.ProviderIDEventKey))
+		require.Equal(t, "repository", msg.Metadata.Get(entities.EntityTypeEventKey))
+		require.Equal(t, projectID.String(), msg.Metadata.Get(entities.ProjectIDEventKey))
+	}

require.NoError(t, evt.Close(), "expected no error")

6 changes: 4 additions & 2 deletions internal/flags/flags.go
@@ -39,10 +39,12 @@ func fromContext(ctx context.Context) openfeature.EvaluationContext {
 	// Note: engine.EntityFromContext is best-effort, so these values may be zero.
 	ec := engcontext.EntityFromContext(ctx)
 	return openfeature.NewEvaluationContext(
-		jwt.GetUserSubjectFromContext(ctx),
+		ec.Project.ID.String(),
 		map[string]interface{}{
-			"project": ec.Project.ID.String(),
+			"project": ec.Project.ID.String(),
+			// TODO: is this useful, given how provider names are used?
Contributor: IMHO we should consider removing it, but I don't have strong feelings about it right now.

Member (Author): I think you're probably right.

"provider": ec.Provider.Name,
"user": jwt.GetUserSubjectFromContext(ctx),
},
)
}
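To make the flags.go change concrete: OpenFeature providers use the targeting key as the identity for percentage rollouts and sticky bucketing, so switching it from the JWT user subject to the project ID makes a partially rolled-out flag toggle per project rather than per user. A rough usage sketch follows; the client name, flag name, and IDs are invented, and the go-sdk import path may differ between SDK versions:

```go
package main

import (
	"context"
	"fmt"

	"github.com/open-feature/go-sdk/openfeature"
)

func main() {
	client := openfeature.NewClient("minder-example") // hypothetical client name

	// The first argument is the targeting key; after this PR it is the
	// project ID, so rollout percentages bucket consistently per project.
	evalCtx := openfeature.NewEvaluationContext(
		"00000000-0000-0000-0000-000000000001", // hypothetical project ID
		map[string]interface{}{
			"project": "00000000-0000-0000-0000-000000000001",
			"user":    "user-subject-from-jwt", // hypothetical JWT subject
		},
	)

	enabled, err := client.BooleanValue(
		context.Background(), "some_experimental_feature", false, evalCtx)
	fmt.Println(enabled, err)
}
```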