From 896118c575a2f2f9bdaf6f0dc08eeaed1da3846f Mon Sep 17 00:00:00 2001 From: tengattack Date: Thu, 7 Mar 2019 22:01:15 +0800 Subject: [PATCH] Update executor support error log instead of fail only --- builtin/bins/dkron-executor-http/http.go | 12 +- builtin/bins/dkron-executor-http/http_test.go | 4 +- builtin/bins/dkron-executor-shell/shell.go | 18 ++- dkron/executor.go | 2 +- dkron/executor.pb.go | 145 ++++++++++++------ dkron/grpc.go | 1 + dkron/invoke.go | 10 +- plugin/executor.go | 11 +- proto/executor.proto | 1 + 9 files changed, 134 insertions(+), 70 deletions(-) diff --git a/builtin/bins/dkron-executor-http/http.go b/builtin/bins/dkron-executor-http/http.go index 9a97ab6fa..40ca81335 100644 --- a/builtin/bins/dkron-executor-http/http.go +++ b/builtin/bins/dkron-executor-http/http.go @@ -42,7 +42,17 @@ type HTTP struct { // "expectBody": "", // Expect response body, support regexp, such as /success/ // "debug": "true" // Debug option, will log everything when this option is not empty // } -func (s *HTTP) Execute(args *dkron.ExecuteRequest) ([]byte, error) { +func (s *HTTP) Execute(args *dkron.ExecuteRequest) (*dkron.ExecuteResponse, error) { + out, err := s.ExecuteImpl(args) + resp := &dkron.ExecuteResponse{Output: out} + if err != nil { + resp.Error = err.Error() + } + return resp, nil +} + +// ExecuteImpl do http request +func (s *HTTP) ExecuteImpl(args *dkron.ExecuteRequest) ([]byte, error) { output, _ := circbuf.NewBuffer(maxBufSize) var debug bool if args.Config["debug"] != "" { diff --git a/builtin/bins/dkron-executor-http/http_test.go b/builtin/bins/dkron-executor-http/http_test.go index 743c79c0f..1b850d18e 100644 --- a/builtin/bins/dkron-executor-http/http_test.go +++ b/builtin/bins/dkron-executor-http/http_test.go @@ -19,7 +19,7 @@ func TestExecute(t *testing.T) { } http := &HTTP{} output, err := http.Execute(pa) - fmt.Println(string(output)) + fmt.Println(string(output.Output)) fmt.Println(err) if err != nil { t.Fatal(err) @@ -41,7 +41,7 @@ func TestExecutePost(t *testing.T) { } http := &HTTP{} output, err := http.Execute(pa) - fmt.Println(string(output)) + fmt.Println(string(output.Output)) fmt.Println(err) if err != nil { t.Fatal(err) diff --git a/builtin/bins/dkron-executor-shell/shell.go b/builtin/bins/dkron-executor-shell/shell.go index 93af2a5b6..52ee4bb35 100644 --- a/builtin/bins/dkron-executor-shell/shell.go +++ b/builtin/bins/dkron-executor-shell/shell.go @@ -28,7 +28,17 @@ const ( type Shell struct{} // Execute method of the plugin -func (s *Shell) Execute(args *dkron.ExecuteRequest) ([]byte, error) { +func (s *Shell) Execute(args *dkron.ExecuteRequest) (*dkron.ExecuteResponse, error) { + out, err := s.ExecuteImpl(args) + resp := &dkron.ExecuteResponse{Output: out} + if err != nil { + resp.Error = err.Error() + } + return resp, nil +} + +// ExecuteImpl do execute command +func (s *Shell) ExecuteImpl(args *dkron.ExecuteRequest) ([]byte, error) { output, _ := circbuf.NewBuffer(maxBufSize) shell, err := strconv.ParseBool(args.Config["shell"]) @@ -87,11 +97,7 @@ func (s *Shell) Execute(args *dkron.ExecuteRequest) ([]byte, error) { // Always log output log.Printf("shell: Command output %s", output) - if err != nil { - return nil, err - } - - return output.Bytes(), nil + return output.Bytes(), err } // Determine the shell invocation based on OS diff --git a/dkron/executor.go b/dkron/executor.go index 7879299ab..bdfc3ed90 100644 --- a/dkron/executor.go +++ b/dkron/executor.go @@ -2,7 +2,7 @@ package dkron // Executor is the interface that we're exposing as a plugin. type Executor interface { - Execute(args *ExecuteRequest) ([]byte, error) + Execute(args *ExecuteRequest) (*ExecuteResponse, error) } // ExecutorPluginConfig is the plugin config diff --git a/dkron/executor.pb.go b/dkron/executor.pb.go index 1aeadac9d..23eb31dd9 100644 --- a/dkron/executor.pb.go +++ b/dkron/executor.pb.go @@ -1,25 +1,14 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // source: executor.proto -/* -Package dkron is a generated protocol buffer package. - -It is generated from these files: - executor.proto - -It has these top-level messages: - ExecuteRequest - ExecuteResponse -*/ package dkron -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" - import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" context "golang.org/x/net/context" grpc "google.golang.org/grpc" + math "math" ) // Reference imports to suppress errors if they are not otherwise used. @@ -34,14 +23,37 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type ExecuteRequest struct { - JobName string `protobuf:"bytes,1,opt,name=job_name,json=jobName" json:"job_name,omitempty"` - Config map[string]string `protobuf:"bytes,2,rep,name=config" json:"config,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + JobName string `protobuf:"bytes,1,opt,name=job_name,json=jobName,proto3" json:"job_name,omitempty"` + Config map[string]string `protobuf:"bytes,2,rep,name=config,proto3" json:"config,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } -func (m *ExecuteRequest) Reset() { *m = ExecuteRequest{} } -func (m *ExecuteRequest) String() string { return proto.CompactTextString(m) } -func (*ExecuteRequest) ProtoMessage() {} -func (*ExecuteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (m *ExecuteRequest) Reset() { *m = ExecuteRequest{} } +func (m *ExecuteRequest) String() string { return proto.CompactTextString(m) } +func (*ExecuteRequest) ProtoMessage() {} +func (*ExecuteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_12d1cdcda51e000f, []int{0} +} + +func (m *ExecuteRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ExecuteRequest.Unmarshal(m, b) +} +func (m *ExecuteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ExecuteRequest.Marshal(b, m, deterministic) +} +func (m *ExecuteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ExecuteRequest.Merge(m, src) +} +func (m *ExecuteRequest) XXX_Size() int { + return xxx_messageInfo_ExecuteRequest.Size(m) +} +func (m *ExecuteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ExecuteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ExecuteRequest proto.InternalMessageInfo func (m *ExecuteRequest) GetJobName() string { if m != nil { @@ -58,13 +70,37 @@ func (m *ExecuteRequest) GetConfig() map[string]string { } type ExecuteResponse struct { - Output []byte `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` + Output []byte `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ExecuteResponse) Reset() { *m = ExecuteResponse{} } +func (m *ExecuteResponse) String() string { return proto.CompactTextString(m) } +func (*ExecuteResponse) ProtoMessage() {} +func (*ExecuteResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_12d1cdcda51e000f, []int{1} } -func (m *ExecuteResponse) Reset() { *m = ExecuteResponse{} } -func (m *ExecuteResponse) String() string { return proto.CompactTextString(m) } -func (*ExecuteResponse) ProtoMessage() {} -func (*ExecuteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (m *ExecuteResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ExecuteResponse.Unmarshal(m, b) +} +func (m *ExecuteResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ExecuteResponse.Marshal(b, m, deterministic) +} +func (m *ExecuteResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ExecuteResponse.Merge(m, src) +} +func (m *ExecuteResponse) XXX_Size() int { + return xxx_messageInfo_ExecuteResponse.Size(m) +} +func (m *ExecuteResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ExecuteResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ExecuteResponse proto.InternalMessageInfo func (m *ExecuteResponse) GetOutput() []byte { if m != nil { @@ -73,11 +109,40 @@ func (m *ExecuteResponse) GetOutput() []byte { return nil } +func (m *ExecuteResponse) GetError() string { + if m != nil { + return m.Error + } + return "" +} + func init() { proto.RegisterType((*ExecuteRequest)(nil), "dkron.ExecuteRequest") + proto.RegisterMapType((map[string]string)(nil), "dkron.ExecuteRequest.ConfigEntry") proto.RegisterType((*ExecuteResponse)(nil), "dkron.ExecuteResponse") } +func init() { proto.RegisterFile("executor.proto", fileDescriptor_12d1cdcda51e000f) } + +var fileDescriptor_12d1cdcda51e000f = []byte{ + // 230 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4b, 0xad, 0x48, 0x4d, + 0x2e, 0x2d, 0xc9, 0x2f, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x4d, 0xc9, 0x2e, 0xca, + 0xcf, 0x53, 0x5a, 0xc8, 0xc8, 0xc5, 0xe7, 0x0a, 0x96, 0x49, 0x0d, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, + 0x2e, 0x11, 0x92, 0xe4, 0xe2, 0xc8, 0xca, 0x4f, 0x8a, 0xcf, 0x4b, 0xcc, 0x4d, 0x95, 0x60, 0x54, + 0x60, 0xd4, 0xe0, 0x0c, 0x62, 0xcf, 0xca, 0x4f, 0xf2, 0x4b, 0xcc, 0x4d, 0x15, 0xb2, 0xe4, 0x62, + 0x4b, 0xce, 0xcf, 0x4b, 0xcb, 0x4c, 0x97, 0x60, 0x52, 0x60, 0xd6, 0xe0, 0x36, 0x52, 0xd4, 0x03, + 0x9b, 0xa2, 0x87, 0x6a, 0x82, 0x9e, 0x33, 0x58, 0x8d, 0x6b, 0x5e, 0x49, 0x51, 0x65, 0x10, 0x54, + 0x83, 0x94, 0x25, 0x17, 0x37, 0x92, 0xb0, 0x90, 0x00, 0x17, 0x73, 0x76, 0x6a, 0x25, 0xd4, 0x7c, + 0x10, 0x53, 0x48, 0x84, 0x8b, 0xb5, 0x2c, 0x31, 0xa7, 0x34, 0x55, 0x82, 0x09, 0x2c, 0x06, 0xe1, + 0x58, 0x31, 0x59, 0x30, 0x2a, 0xd9, 0x73, 0xf1, 0xc3, 0x2d, 0x28, 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, + 0x15, 0x12, 0xe3, 0x62, 0xcb, 0x2f, 0x2d, 0x29, 0x28, 0x2d, 0x01, 0x9b, 0xc0, 0x13, 0x04, 0xe5, + 0x81, 0x0c, 0x49, 0x2d, 0x2a, 0xca, 0x2f, 0x82, 0x19, 0x02, 0xe6, 0x18, 0xb9, 0x70, 0x71, 0xb8, + 0x42, 0x7d, 0x2f, 0x64, 0xc1, 0xc5, 0x0e, 0x35, 0x4c, 0x48, 0x14, 0xab, 0xeb, 0xa5, 0xc4, 0xd0, + 0x85, 0x21, 0x76, 0x26, 0xb1, 0x81, 0x03, 0xce, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x67, 0xda, + 0xb8, 0x46, 0x4a, 0x01, 0x00, 0x00, +} + // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConn @@ -86,8 +151,9 @@ var _ grpc.ClientConn // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion4 -// Client API for Executor service - +// ExecutorClient is the client API for Executor service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type ExecutorClient interface { Execute(ctx context.Context, in *ExecuteRequest, opts ...grpc.CallOption) (*ExecuteResponse, error) } @@ -102,15 +168,14 @@ func NewExecutorClient(cc *grpc.ClientConn) ExecutorClient { func (c *executorClient) Execute(ctx context.Context, in *ExecuteRequest, opts ...grpc.CallOption) (*ExecuteResponse, error) { out := new(ExecuteResponse) - err := grpc.Invoke(ctx, "/dkron.Executor/Execute", in, out, c.cc, opts...) + err := c.cc.Invoke(ctx, "/dkron.Executor/Execute", in, out, opts...) if err != nil { return nil, err } return out, nil } -// Server API for Executor service - +// ExecutorServer is the server API for Executor service. type ExecutorServer interface { Execute(context.Context, *ExecuteRequest) (*ExecuteResponse, error) } @@ -149,23 +214,3 @@ var _Executor_serviceDesc = grpc.ServiceDesc{ Streams: []grpc.StreamDesc{}, Metadata: "executor.proto", } - -func init() { proto.RegisterFile("executor.proto", fileDescriptor0) } - -var fileDescriptor0 = []byte{ - // 219 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4b, 0xad, 0x48, 0x4d, - 0x2e, 0x2d, 0xc9, 0x2f, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x4d, 0xc9, 0x2e, 0xca, - 0xcf, 0x53, 0x5a, 0xc8, 0xc8, 0xc5, 0xe7, 0x0a, 0x96, 0x49, 0x0d, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, - 0x2e, 0x11, 0x92, 0xe4, 0xe2, 0xc8, 0xca, 0x4f, 0x8a, 0xcf, 0x4b, 0xcc, 0x4d, 0x95, 0x60, 0x54, - 0x60, 0xd4, 0xe0, 0x0c, 0x62, 0xcf, 0xca, 0x4f, 0xf2, 0x4b, 0xcc, 0x4d, 0x15, 0xb2, 0xe4, 0x62, - 0x4b, 0xce, 0xcf, 0x4b, 0xcb, 0x4c, 0x97, 0x60, 0x52, 0x60, 0xd6, 0xe0, 0x36, 0x52, 0xd4, 0x03, - 0x9b, 0xa2, 0x87, 0x6a, 0x82, 0x9e, 0x33, 0x58, 0x8d, 0x6b, 0x5e, 0x49, 0x51, 0x65, 0x10, 0x54, - 0x83, 0x94, 0x25, 0x17, 0x37, 0x92, 0xb0, 0x90, 0x00, 0x17, 0x73, 0x76, 0x6a, 0x25, 0xd4, 0x7c, - 0x10, 0x53, 0x48, 0x84, 0x8b, 0xb5, 0x2c, 0x31, 0xa7, 0x34, 0x55, 0x82, 0x09, 0x2c, 0x06, 0xe1, - 0x58, 0x31, 0x59, 0x30, 0x2a, 0x69, 0x72, 0xf1, 0xc3, 0x2d, 0x28, 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, - 0x15, 0x12, 0xe3, 0x62, 0xcb, 0x2f, 0x2d, 0x29, 0x28, 0x2d, 0x01, 0x9b, 0xc0, 0x13, 0x04, 0xe5, - 0x19, 0xb9, 0x70, 0x71, 0xb8, 0x42, 0xfd, 0x29, 0x64, 0xc1, 0xc5, 0x0e, 0xd5, 0x26, 0x24, 0x8a, - 0xd5, 0x9d, 0x52, 0x62, 0xe8, 0xc2, 0x10, 0xd3, 0x93, 0xd8, 0xc0, 0x41, 0x64, 0x0c, 0x08, 0x00, - 0x00, 0xff, 0xff, 0x77, 0x47, 0xe0, 0x21, 0x34, 0x01, 0x00, 0x00, -} diff --git a/dkron/grpc.go b/dkron/grpc.go index ff9f3f6da..d9971775b 100644 --- a/dkron/grpc.go +++ b/dkron/grpc.go @@ -84,6 +84,7 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E log.WithFields(logrus.Fields{ "group": execDoneReq.Group, "job": execDoneReq.JobName, + "from": execDoneReq.NodeName, }).Debug("grpc: Received execution done") var execution Execution diff --git a/dkron/invoke.go b/dkron/invoke.go index f3a5433c6..bf4b2786b 100644 --- a/dkron/invoke.go +++ b/dkron/invoke.go @@ -38,15 +38,21 @@ func (a *Agent) invokeJob(job *Job, execution *Execution) error { JobName: job.Name, Config: exc, }) + + if err == nil && out.Error != "" { + err = errors.New(out.Error) + } if err != nil { log.WithError(err).WithField("job", job.Name).WithField("plugin", executor).Error("invoke: command error output") success = false - output.Write([]byte(err.Error())) + output.Write([]byte(err.Error() + "\n")) } else { success = true } - output.Write(out) + if out != nil { + output.Write(out.Output) + } } else { log.WithField("executor", jex).Error("invoke: Specified executor is not present") } diff --git a/plugin/executor.go b/plugin/executor.go index d5e402235..5b70d2420 100644 --- a/plugin/executor.go +++ b/plugin/executor.go @@ -32,13 +32,9 @@ type ExecutorClient struct { client dkron.ExecutorClient } -func (m *ExecutorClient) Execute(args *dkron.ExecuteRequest) ([]byte, error) { +func (m *ExecutorClient) Execute(args *dkron.ExecuteRequest) (*dkron.ExecuteResponse, error) { // This is where the magic conversion to Proto happens - out, err := m.client.Execute(context.Background(), args) - if err != nil { - return nil, err - } - return out.Output, nil + return m.client.Execute(context.Background(), args) } // Here is the gRPC server that GRPCClient talks to. @@ -49,6 +45,5 @@ type ExecutorServer struct { // Execute is where the magic happens func (m ExecutorServer) Execute(ctx context.Context, req *dkron.ExecuteRequest) (*dkron.ExecuteResponse, error) { - out, err := m.Impl.Execute(req) - return &dkron.ExecuteResponse{Output: out}, err + return m.Impl.Execute(req) } diff --git a/proto/executor.proto b/proto/executor.proto index 0486cb25e..e2fdd4aed 100644 --- a/proto/executor.proto +++ b/proto/executor.proto @@ -10,6 +10,7 @@ message ExecuteRequest { message ExecuteResponse { bytes output = 1; + string error = 2; } service Executor {