Skip to content

Commit

Permalink
Fix cancellation locking that I managed to screw up
Browse files Browse the repository at this point in the history
  • Loading branch information
evankanderson committed Jul 15, 2024
1 parent 81ef93c commit b4a4fe0
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 25 deletions.
24 changes: 13 additions & 11 deletions internal/engine/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,10 @@ func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {
// TODO: Make this timeout configurable
msgCtx := context.WithoutCancel(msg.Context())
msgCtx, shutdownCancel := context.WithCancel(msgCtx)
{
e.lock.Lock()
defer e.lock.Unlock()
e.cancels = append(e.cancels, &shutdownCancel)
}

e.lock.Lock()
e.cancels = append(e.cancels, &shutdownCancel)
e.lock.Unlock()

// Let's not share memory with the caller. Note that this does not copy Context
msg = msg.Copy()
Expand All @@ -127,9 +126,17 @@ func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {

ctx, cancel := context.WithTimeout(msgCtx, DefaultExecutionTimeout)
defer cancel()
defer func() {
e.lock.Lock()
e.cancels = slices.DeleteFunc(e.cancels, func(cf *context.CancelFunc) bool {
return cf == &shutdownCancel
})
e.lock.Unlock()
}()

ctx = engcontext.WithEntityContext(ctx, &engcontext.EntityContext{
Project: engcontext.Project{ID: inf.ProjectID},
// TODO: extract Provider name from ProviderID
// TODO: extract Provider name from ProviderID?
})

ts := minderlogger.BusinessRecord(ctx)
Expand Down Expand Up @@ -175,11 +182,6 @@ func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {
if err := e.evt.Publish(events.TopicQueueEntityFlush, msg); err != nil {
logger.Err(err).Msg("error publishing flush event")
}
e.lock.Lock()
defer e.lock.Unlock()
e.cancels = slices.DeleteFunc(e.cancels, func(cf *context.CancelFunc) bool {
return cf == &shutdownCancel
})
}()

return nil
Expand Down
36 changes: 22 additions & 14 deletions internal/engine/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func TestExecutorEventHandler_handleEntityEvent(t *testing.T) {
repositoryID := uuid.New()
executionID := uuid.New()

parallelOps := 2

// -- end expectations

evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{
Expand Down Expand Up @@ -80,9 +82,11 @@ func TestExecutorEventHandler_handleEntityEvent(t *testing.T) {
WithExecutionID(executionID)

executor := mockengine.NewMockExecutor(ctrl)
executor.EXPECT().
EvalEntityEvent(gomock.Any(), gomock.Eq(eiw)).
Return(nil)
for i := 0; i < parallelOps; i++ {
executor.EXPECT().
EvalEntityEvent(gomock.Any(), gomock.Eq(eiw)).
Return(nil)
}

handler := engine.NewExecutorEventHandler(
ctx,
Expand All @@ -97,19 +101,23 @@ func TestExecutorEventHandler_handleEntityEvent(t *testing.T) {
msg, err := eiw.BuildMessage()
require.NoError(t, err, "expected no error")

// Run in the background
go func() {
t.Log("Running entity event handler")
require.NoError(t, handler.HandleEntityEvent(msg), "expected no error")
}()
// Run in the background, twice
for i := 0; i < parallelOps; i++ {
go func() {
t.Log("Running entity event handler")
require.NoError(t, handler.HandleEntityEvent(msg), "expected no error")
}()
}

// expect flush
t.Log("waiting for flush")
result := <-queued
require.NotNil(t, result)
require.Equal(t, providerID.String(), msg.Metadata.Get(entities.ProviderIDEventKey))
require.Equal(t, "repository", msg.Metadata.Get(entities.EntityTypeEventKey))
require.Equal(t, projectID.String(), msg.Metadata.Get(entities.ProjectIDEventKey))
for i := 0; i < parallelOps; i++ {
t.Log("waiting for flush")
result := <-queued
require.NotNil(t, result)
require.Equal(t, providerID.String(), msg.Metadata.Get(entities.ProviderIDEventKey))
require.Equal(t, "repository", msg.Metadata.Get(entities.EntityTypeEventKey))
require.Equal(t, projectID.String(), msg.Metadata.Get(entities.ProjectIDEventKey))
}

require.NoError(t, evt.Close(), "expected no error")

Expand Down

0 comments on commit b4a4fe0

Please sign in to comment.