diff --git a/dkron/grpc.go b/dkron/grpc.go index b2560320d..d14752bc0 100644 --- a/dkron/grpc.go +++ b/dkron/grpc.go @@ -6,9 +6,9 @@ import ( "net" "time" - "github.com/sirupsen/logrus" "github.com/abronan/valkeyrie/store" metrics "github.com/armon/go-metrics" + "github.com/sirupsen/logrus" "github.com/victorcoder/dkron/proto" "golang.org/x/net/context" "google.golang.org/grpc" @@ -88,6 +88,9 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E "job": execDoneReq.JobName, }).Debug("grpc: Received execution done") + var execution Execution + processed := false + retry: // Load the job from the store job, jkv, err := grpcs.agent.Store.GetJobWithKVPair(execDoneReq.JobName, &JobOptions{ @@ -102,21 +105,25 @@ retry: return nil, err } - // Get the defined output types for the job, and call them - origExec := *NewExecutionFromProto(execDoneReq) - execution := origExec - for k, v := range job.Processors { - log.WithField("plugin", k).Debug("grpc: Processing execution with plugin") - if processor, ok := grpcs.agent.ProcessorPlugins[k]; ok { - v["reporting_node"] = grpcs.agent.config.NodeName - e := processor.Process(&ExecutionProcessorArgs{Execution: origExec, Config: v}) - execution = e + if !processed { + // Get the defined output types for the job, and call them + origExec := *NewExecutionFromProto(execDoneReq) + execution = origExec + for k, v := range job.Processors { + log.WithField("plugin", k).Debug("grpc: Processing execution with plugin") + if processor, ok := grpcs.agent.ProcessorPlugins[k]; ok { + v["reporting_node"] = grpcs.agent.config.NodeName + e := processor.Process(&ExecutionProcessorArgs{Execution: origExec, Config: v}) + execution = e + } + } + + // Save the execution to store + if _, err := grpcs.agent.Store.SetExecution(&execution); err != nil { + return nil, err } - } - // Save the execution to store - if _, err := grpcs.agent.Store.SetExecution(&execution); err != nil { - return nil, err + processed = true } if execution.Success {