Skip to content

Commit

Permalink
Update process execution only once
Browse files Browse the repository at this point in the history
  • Loading branch information
tengattack committed Sep 9, 2018
1 parent c86dcf6 commit ad03e67
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions dkron/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"net"
"time"

"github.com/sirupsen/logrus"
"github.com/abronan/valkeyrie/store"
metrics "github.com/armon/go-metrics"
"github.com/sirupsen/logrus"
"github.com/victorcoder/dkron/proto"
"golang.org/x/net/context"
"google.golang.org/grpc"
Expand Down Expand Up @@ -88,6 +88,9 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E
"job": execDoneReq.JobName,
}).Debug("grpc: Received execution done")

var execution Execution
processed := false

retry:
// Load the job from the store
job, jkv, err := grpcs.agent.Store.GetJobWithKVPair(execDoneReq.JobName, &JobOptions{
Expand All @@ -102,21 +105,25 @@ retry:
return nil, err
}

// Get the defined output types for the job, and call them
origExec := *NewExecutionFromProto(execDoneReq)
execution := origExec
for k, v := range job.Processors {
log.WithField("plugin", k).Debug("grpc: Processing execution with plugin")
if processor, ok := grpcs.agent.ProcessorPlugins[k]; ok {
v["reporting_node"] = grpcs.agent.config.NodeName
e := processor.Process(&ExecutionProcessorArgs{Execution: origExec, Config: v})
execution = e
if !processed {
// Get the defined output types for the job, and call them
origExec := *NewExecutionFromProto(execDoneReq)
execution = origExec
for k, v := range job.Processors {
log.WithField("plugin", k).Debug("grpc: Processing execution with plugin")
if processor, ok := grpcs.agent.ProcessorPlugins[k]; ok {
v["reporting_node"] = grpcs.agent.config.NodeName
e := processor.Process(&ExecutionProcessorArgs{Execution: origExec, Config: v})
execution = e
}
}

// Save the execution to store
if _, err := grpcs.agent.Store.SetExecution(&execution); err != nil {
return nil, err
}
}

// Save the execution to store
if _, err := grpcs.agent.Store.SetExecution(&execution); err != nil {
return nil, err
processed = true
}

if execution.Success {
Expand Down

0 comments on commit ad03e67

Please sign in to comment.