Skip to content

Commit

Permalink
Write workflow and node execution events asynchronously (flyteorg#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Apr 9, 2021
1 parent 8825c6d commit f4f7acf
Show file tree
Hide file tree
Showing 34 changed files with 746 additions and 347 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package implementations

import (
"context"

"github.com/flyteorg/flyteadmin/pkg/async/events/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories"
"github.com/flyteorg/flyteadmin/pkg/repositories/transformers"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
)

// nodeExecutionEventWriter asynchronously persists node execution events. As flytepropeller sends node
// events, node execution processing doesn't have to wait on these to be committed.
type nodeExecutionEventWriter struct {
// db supplies the NodeExecutionEventRepo that Run writes transformed event models to.
db repositories.RepositoryInterface
// events queues the requests handed to Write until Run receives them.
events chan admin.NodeExecutionEventRequest
}

// Write enqueues a node execution event request for later persistence by Run.
// The send is a plain channel send, so it blocks whenever the channel cannot
// accept another value (for example when its buffer is full).
func (w *nodeExecutionEventWriter) Write(event admin.NodeExecutionEventRequest) {
	queue := w.events
	queue <- event
}

// Run receives queued node execution event requests until the events channel
// is closed, converting each request to its database model and storing it via
// the node execution event repository. Failures are logged as warnings and the
// offending event is dropped; processing then moves on to the next event.
func (w *nodeExecutionEventWriter) Run() {
	ctx := context.TODO()
	for {
		request, open := <-w.events
		if !open {
			// Channel closed and drained: nothing left to persist.
			return
		}
		model, transformErr := transformers.CreateNodeExecutionEventModel(request)
		if transformErr != nil {
			logger.Warnf(ctx, "Failed to transform event [%+v] to database model with err [%+v]", request, transformErr)
			continue
		}
		// Losing an event here is acceptable: these records are not the source of truth for execution
		// state, they only make it convenient to replay and inspect the event timeline.
		if createErr := w.db.NodeExecutionEventRepo().Create(ctx, *model); createErr != nil {
			logger.Warnf(ctx, "Failed to write event [%+v] to database with err [%+v]", request, createErr)
		}
	}
}

// NewNodeExecutionEventWriter builds a node execution event writer backed by
// db whose internal queue is a channel buffered to hold bufferSize requests.
func NewNodeExecutionEventWriter(db repositories.RepositoryInterface, bufferSize int) interfaces.NodeExecutionEventWriter {
	queue := make(chan admin.NodeExecutionEventRequest, bufferSize)
	writer := nodeExecutionEventWriter{
		db:     db,
		events: queue,
	}
	return &writer
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package implementations

import (
"testing"

"github.com/flyteorg/flyteadmin/pkg/repositories/mocks"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
event2 "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
)

// TestNodeExecutionEventWriter checks that an event can be written to the buffered writer without
// blocking the caller, then starts Run in a goroutine and closes the channel so the Run loop can end.
func TestNodeExecutionEventWriter(t *testing.T) {
db := mocks.NewMockRepository()

// Minimal request: only the identifiers are populated.
event := admin.NodeExecutionEventRequest{
RequestId: "request_id",
Event: &event2.NodeExecutionEvent{
Id: &core.NodeExecutionIdentifier{
NodeId: "node_id",
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "exec_name",
},
},
},
}

nodeExecEventRepo := mocks.NodeExecutionEventRepoInterface{}
// NOTE(review): the expectation is registered with the raw request as the only argument, while Run
// calls Create with a context and the transformed model — confirm this expectation can ever match.
nodeExecEventRepo.On("Create", event).Return(nil)
db.(*mocks.MockRepository).NodeExecutionEventRepoIface = &nodeExecEventRepo
// Buffer of 100 leaves room for the single Write below even though no reader is running yet.
writer := NewNodeExecutionEventWriter(db, 100)
// Assert we can write an event using the buffered channel without holding up this process.
writer.Write(event)
// NOTE(review): nothing waits for this goroutine, so the test can return before Run has processed
// the event; no assertion is made on the repository call.
go func() { writer.Run() }()
// Closing the channel lets the range loop in Run terminate once the buffered event is drained.
close(writer.(*nodeExecutionEventWriter).events)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package implementations

import (
"context"

"github.com/flyteorg/flyteadmin/pkg/async/events/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories"
"github.com/flyteorg/flyteadmin/pkg/repositories/transformers"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
)

// workflowExecutionEventWriter asynchronously persists workflow execution events. As flytepropeller sends workflow
// events, workflow execution processing doesn't have to wait on these to be committed.
type workflowExecutionEventWriter struct {
// db supplies the ExecutionEventRepo that Run writes transformed event models to.
db repositories.RepositoryInterface
// events queues the requests handed to Write until Run receives them.
events chan admin.WorkflowExecutionEventRequest
}

// Write enqueues a workflow execution event request for later persistence by
// Run. The send is a plain channel send, so it blocks whenever the channel
// cannot accept another value (for example when its buffer is full).
func (w *workflowExecutionEventWriter) Write(event admin.WorkflowExecutionEventRequest) {
	queue := w.events
	queue <- event
}

// Run receives queued workflow execution event requests until the events
// channel is closed, converting each request to its database model and storing
// it via the execution event repository. Failures are logged as warnings and
// the offending event is dropped; processing then moves on to the next event.
func (w *workflowExecutionEventWriter) Run() {
	ctx := context.TODO()
	for {
		request, open := <-w.events
		if !open {
			// Channel closed and drained: nothing left to persist.
			return
		}
		model, transformErr := transformers.CreateExecutionEventModel(request)
		if transformErr != nil {
			logger.Warnf(ctx, "Failed to transform event [%+v] to database model with err [%+v]", request, transformErr)
			continue
		}
		// Losing an event here is acceptable: these records are not the source of truth for execution
		// state, they only make it convenient to replay and inspect the event timeline.
		if createErr := w.db.ExecutionEventRepo().Create(ctx, *model); createErr != nil {
			logger.Warnf(ctx, "Failed to write event [%+v] to database with err [%+v]", request, createErr)
		}
	}
}

// NewWorkflowExecutionEventWriter builds a workflow execution event writer
// backed by db whose internal queue is a channel buffered to hold bufferSize
// requests.
func NewWorkflowExecutionEventWriter(db repositories.RepositoryInterface, bufferSize int) interfaces.WorkflowExecutionEventWriter {
	queue := make(chan admin.WorkflowExecutionEventRequest, bufferSize)
	writer := workflowExecutionEventWriter{
		db:     db,
		events: queue,
	}
	return &writer
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package implementations

import (
"testing"

"github.com/flyteorg/flyteadmin/pkg/repositories/mocks"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
event2 "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
)

// TestWorkflowExecutionEventWriter checks that an event can be written to the buffered writer without
// blocking the caller, then starts Run in a goroutine and closes the channel so the Run loop can end.
func TestWorkflowExecutionEventWriter(t *testing.T) {
db := mocks.NewMockRepository()

// Minimal request: only the execution identifier is populated.
event := admin.WorkflowExecutionEventRequest{
RequestId: "request_id",
Event: &event2.WorkflowExecutionEvent{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "exec_name",
},
},
}

workflowExecEventRepo := mocks.ExecutionEventRepoInterface{}
// NOTE(review): the expectation is registered with the raw request as the only argument, while Run
// calls Create with a context and the transformed model — confirm this expectation can ever match.
workflowExecEventRepo.On("Create", event).Return(nil)
db.(*mocks.MockRepository).ExecutionEventRepoIface = &workflowExecEventRepo
// Buffer of 100 leaves room for the single Write below even though no reader is running yet.
writer := NewWorkflowExecutionEventWriter(db, 100)
// Assert we can write an event using the buffered channel without holding up this process.
writer.Write(event)
// NOTE(review): nothing waits for this goroutine, so the test can return before Run has processed
// the event; no assertion is made on the repository call.
go func() { writer.Run() }()
// Closing the channel lets the range loop in Run terminate once the buffered event is drained.
close(writer.(*workflowExecutionEventWriter).events)
}
12 changes: 12 additions & 0 deletions flyteadmin/pkg/async/events/interfaces/node_execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package interfaces

import (
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
)

//go:generate mockery -name=NodeExecutionEventWriter -output=../mocks -case=underscore

// NodeExecutionEventWriter accepts node execution event requests and processes them separately
// from the caller that submitted them.
type NodeExecutionEventWriter interface {
// Run processes previously written events.
// NOTE(review): presumably intended to be started in its own goroutine — confirm with callers.
Run()
// Write submits a node execution event request to the writer.
Write(nodeExecutionEvent admin.NodeExecutionEventRequest)
}
12 changes: 12 additions & 0 deletions flyteadmin/pkg/async/events/interfaces/workflow_execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package interfaces

import (
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
)

//go:generate mockery -name=WorkflowExecutionEventWriter -output=../mocks -case=underscore

// WorkflowExecutionEventWriter accepts workflow execution event requests and processes them
// separately from the caller that submitted them.
type WorkflowExecutionEventWriter interface {
// Run processes previously written events.
// NOTE(review): presumably intended to be started in its own goroutine — confirm with callers.
Run()
// Write submits a workflow execution event request to the writer.
Write(workflowExecutionEvent admin.WorkflowExecutionEventRequest)
}
24 changes: 24 additions & 0 deletions flyteadmin/pkg/async/events/mocks/node_execution_event_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 12 additions & 10 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/storage"

eventWriter "github.com/flyteorg/flyteadmin/pkg/async/events/interfaces"
"github.com/flyteorg/flyteadmin/pkg/async/notifications"
notificationInterfaces "github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyteadmin/pkg/errors"
Expand Down Expand Up @@ -90,6 +91,7 @@ type ExecutionManager struct {
resourceManager interfaces.ResourceInterface
qualityOfServiceAllocator executions.QualityOfServiceAllocator
eventPublisher notificationInterfaces.Publisher
dbEventWriter eventWriter.WorkflowExecutionEventWriter
}

func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context {
Expand Down Expand Up @@ -1001,18 +1003,13 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
request.Event.ExecutionId, err)
return nil, err
}
executionEventModel, err := transformers.CreateExecutionEventModel(request)
if err != nil {
logger.Debugf(ctx, "failed to transform workflow execution event %s for [%+v] after receiving event with err: %v",
request.RequestId, request.Event.ExecutionId, err)
return nil, err
}
err = m.db.ExecutionRepo().Update(ctx, *executionEventModel, *executionModel)
err = m.db.ExecutionRepo().Update(ctx, *executionModel)
if err != nil {
logger.Debugf(ctx, "Failed to update execution with CreateWorkflowEvent [%+v] with err %v",
request, err)
return nil, err
}
m.dbEventWriter.Write(request)

if request.Event.Phase == core.WorkflowExecution_RUNNING {
// Workflow executions are created in state "UNDEFINED". All the time up until a RUNNING event is received is
Expand Down Expand Up @@ -1098,7 +1095,7 @@ func (m *ExecutionManager) GetExecutionData(
}
// Update model so as not to offload again.
executionModel.InputsURI = newInputsURI
if err := m.db.ExecutionRepo().UpdateExecution(ctx, *executionModel); err != nil {
if err := m.db.ExecutionRepo().Update(ctx, *executionModel); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -1296,7 +1293,7 @@ func (m *ExecutionManager) TerminateExecution(
logger.Debugf(ctx, "failed to add abort metadata for execution [%+v] with err: %v", request.Id, err)
return nil, err
}
err = m.db.ExecutionRepo().UpdateExecution(ctx, executionModel)
err = m.db.ExecutionRepo().Update(ctx, executionModel)
if err != nil {
logger.Debugf(ctx, "failed to save abort cause for terminated execution: %+v with err: %v", request.Id, err)
return nil, err
Expand Down Expand Up @@ -1332,7 +1329,11 @@ func newExecutionSystemMetrics(scope promutils.Scope) executionSystemMetrics {
}
}

func NewExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, workflowExecutor workflowengineInterfaces.Executor, systemScope promutils.Scope, userScope promutils.Scope, publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface, workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface, eventPublisher notificationInterfaces.Publisher) interfaces.ExecutionInterface {
func NewExecutionManager(db repositories.RepositoryInterface, config runtimeInterfaces.Configuration,
storageClient *storage.DataStore, workflowExecutor workflowengineInterfaces.Executor, systemScope promutils.Scope,
userScope promutils.Scope, publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface,
workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface,
eventPublisher notificationInterfaces.Publisher, eventWriter eventWriter.WorkflowExecutionEventWriter) interfaces.ExecutionInterface {
queueAllocator := executions.NewQueueAllocator(config, db)
systemMetrics := newExecutionSystemMetrics(systemScope)

Expand Down Expand Up @@ -1363,6 +1364,7 @@ func NewExecutionManager(db repositories.RepositoryInterface, config runtimeInte
resourceManager: resourceManager,
qualityOfServiceAllocator: executions.NewQualityOfServiceAllocator(config, resourceManager),
eventPublisher: eventPublisher,
dbEventWriter: eventWriter,
}
}

Expand Down
Loading

0 comments on commit f4f7acf

Please sign in to comment.