Skip to content

Commit

Permalink
Merge branch 'v2.x' into v2.x
Browse files Browse the repository at this point in the history
  • Loading branch information
yvanoers authored Jun 23, 2019
2 parents 6f5b013 + 6d84f9c commit 4b4e9b8
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 148 deletions.
30 changes: 19 additions & 11 deletions dkron/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dkron
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"testing"
Expand All @@ -12,15 +13,16 @@ import (
"github.com/stretchr/testify/assert"
)

func setupAPITest(t *testing.T) (a *Agent) {
func setupAPITest(port string) (a *Agent) {
c := DefaultConfig()
c.BindAddr = testutil.GetBindAddr().String()
c.HTTPAddr = "127.0.0.1:8090"
c.HTTPAddr = fmt.Sprintf("127.0.0.1:%s", port)
c.NodeName = "test"
c.Server = true
c.LogLevel = logLevel
c.BootstrapExpect = 1
c.DevMode = true
c.DataDir = "dkron-test-" + port + ".data"

a = NewAgent(c, nil)
a.Start()
Expand All @@ -37,7 +39,9 @@ func setupAPITest(t *testing.T) (a *Agent) {
}

func TestAPIJobCreateUpdate(t *testing.T) {
a := setupAPITest(t)
port := "8091"
baseURL := fmt.Sprintf("http://localhost:%s/v1", port)
setupAPITest(port)

jsonStr := []byte(`{
"name": "test_job",
Expand All @@ -49,7 +53,7 @@ func TestAPIJobCreateUpdate(t *testing.T) {
"disabled": true
}`)

resp, err := http.Post("http://localhost:8090/v1/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
resp, err := http.Post(baseURL+"/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
if err != nil {
t.Fatal(err)
}
Expand All @@ -69,7 +73,7 @@ func TestAPIJobCreateUpdate(t *testing.T) {
"executor_config": {"command": "test"},
"disabled": false
}`)
resp, err = http.Post("http://localhost:8090/v1/jobs", "encoding/json", bytes.NewBuffer(jsonStr1))
resp, err = http.Post(baseURL+"/jobs", "encoding/json", bytes.NewBuffer(jsonStr1))
if err != nil {
t.Fatal(err)
}
Expand All @@ -88,11 +92,13 @@ func TestAPIJobCreateUpdate(t *testing.T) {
assert.Equal(t, "test", overwriteJob.ExecutorConfig["command"])

// Send a shutdown request
a.Stop()
//a.Stop()
}

func TestAPIJobCreateUpdateParentJob_SameParent(t *testing.T) {
a := setupAPITest(t)
port := "8092"
baseURL := fmt.Sprintf("http://localhost:%s/v1", port)
setupAPITest(port)

jsonStr := []byte(`{
"name": "test_job",
Expand All @@ -104,7 +110,7 @@ func TestAPIJobCreateUpdateParentJob_SameParent(t *testing.T) {
"parent_job": "test_job"
}`)

resp, err := http.Post("http://localhost:8090/v1/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
resp, err := http.Post(baseURL+"/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
if err != nil {
t.Fatal(err)
}
Expand All @@ -116,11 +122,13 @@ func TestAPIJobCreateUpdateParentJob_SameParent(t *testing.T) {
assert.Contains(t, string(errJSON)+"\n", string(body))

// Send a shutdown request
a.Stop()
//a.Stop()
}

func TestAPIJobCreateUpdateParentJob_NoParent(t *testing.T) {
a := setupAPITest(t)
port := "8093"
baseURL := fmt.Sprintf("http://localhost:%s/v1", port)
a := setupAPITest(port)

jsonStr := []byte(`{
"name": "test_job",
Expand All @@ -132,7 +140,7 @@ func TestAPIJobCreateUpdateParentJob_NoParent(t *testing.T) {
"parent_job": "parent_test_job"
}`)

resp, err := http.Post("http://localhost:8090/v1/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
resp, err := http.Post(baseURL+"/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions dkron/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E
"execution": execution,
}).Debug("grpc: Retrying execution")

grpcs.agent.RunQuery(&execution)
grpcs.agent.RunQuery(job, &execution)
return nil, nil
}

Expand Down Expand Up @@ -243,7 +243,7 @@ func (grpcs *GRPCServer) RunJob(ctx context.Context, req *proto.RunJobRequest) (
}

ex := NewExecution(job.Name)
grpcs.agent.RunQuery(ex)
grpcs.agent.RunQuery(job, ex)

jpb := job.ToProto()

Expand Down
2 changes: 1 addition & 1 deletion dkron/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestGRPCExecutionDone(t *testing.T) {

c := DefaultConfig()
c.BindAddr = aAddr
c.NodeName = "test1"
c.NodeName = "test-grpc"
c.Server = true
c.LogLevel = logLevel
c.BootstrapExpect = 1
Expand Down
23 changes: 7 additions & 16 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/dgraph-io/badger"
"github.com/golang/protobuf/ptypes"
"github.com/sirupsen/logrus"
"github.com/victorcoder/dkron/cron"
"github.com/victorcoder/dkron/proto"
)

Expand Down Expand Up @@ -117,9 +116,11 @@ type Job struct {
Next time.Time `json:"next"`
}

// NewJobFromProto create a new Job from a PB Job struct
func NewJobFromProto(in *proto.Job) *Job {
lastSuccess, _ := ptypes.Timestamp(in.GetLastSuccess())
lastError, _ := ptypes.Timestamp(in.GetLastError())
next, _ := ptypes.Timestamp(in.GetNext())
return &Job{
Name: in.Name,
Timezone: in.Timezone,
Expand All @@ -140,13 +141,15 @@ func NewJobFromProto(in *proto.Job) *Job {
Metadata: in.Metadata,
LastSuccess: lastSuccess,
LastError: lastError,
Next: next,
}
}

// ToProto return the corresponding proto type
// ToProto return the corresponding representation of this Job in proto struct
func (j *Job) ToProto() *proto.Job {
lastSuccess, _ := ptypes.TimestampProto(j.LastSuccess)
lastError, _ := ptypes.TimestampProto(j.LastError)
next, _ := ptypes.TimestampProto(j.Next)
return &proto.Job{
Name: j.Name,
Timezone: j.Timezone,
Expand All @@ -167,6 +170,7 @@ func (j *Job) ToProto() *proto.Job {
Metadata: j.Metadata,
LastSuccess: lastSuccess,
LastError: lastError,
Next: next,
}
}

Expand All @@ -188,7 +192,7 @@ func (j *Job) Run() {

// Simple execution wrapper
ex := NewExecution(j.Name)
j.Agent.RunQuery(ex)
j.Agent.RunQuery(j, ex)
}
}
}
Expand Down Expand Up @@ -261,19 +265,6 @@ func (j *Job) GetParent() (*Job, error) {
return parentJob, nil
}

// GetNext returns the job's next schedule
func (j *Job) GetNext() (time.Time, error) {
if j.Schedule != "" {
s, err := cron.Parse(j.Schedule)
if err != nil {
return time.Time{}, err
}
return s.Next(j.getLast()), nil
}

return time.Time{}, nil
}

func (j *Job) getLast() time.Time {
if j.LastSuccess.After(j.LastError) {
return j.LastSuccess
Expand Down
14 changes: 0 additions & 14 deletions dkron/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dkron

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -61,16 +60,3 @@ func TestJobGetParent(t *testing.T) {
assert.NoError(t, err)
assert.Nil(t, ptj.DependentJobs)
}

func TestJobGetNext(t *testing.T) {
j := Job{
Schedule: "@daily",
}

td := time.Now()
tonight := time.Date(td.Year(), td.Month(), td.Day()+1, 0, 0, 0, 0, td.Location())
n, err := j.GetNext()

assert.NoError(t, err)
assert.Equal(t, tonight, n)
}
26 changes: 11 additions & 15 deletions dkron/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"time"

"github.com/dgraph-io/badger"
"github.com/hashicorp/serf/serf"
"github.com/sirupsen/logrus"
)
Expand All @@ -25,30 +24,27 @@ type RunQueryParam struct {

// Send a serf run query to the cluster, this is used to ask a node or nodes
// to run a Job.
func (a *Agent) RunQuery(ex *Execution) {
func (a *Agent) RunQuery(job *Job, ex *Execution) {
var params *serf.QueryParam

job, err := a.Store.GetJob(ex.JobName, nil)

if err != nil {
//Job can be removed and the QuerySchedulerRestart not yet received.
//In this case, the job will not be found in the store.
if err == badger.ErrKeyNotFound {
log.Warning("agent: Job not found, cancelling this execution")
return
}
log.WithError(err).Fatal("agent: Getting job error")
return
e := a.sched.GetEntry(job)
if e != nil {
job.Next = e.Next
}
if err := a.GRPCClient.CallSetJob(job); err != nil {
log.WithError(err).WithFields(logrus.Fields{
"job": job.Name,
"method": "RunQuery",
}).Fatal("agent: Error storing job before running")
}

// In the first execution attempt we build and filter the target nodes
// but we use the existing node target in case of retry.
if ex.Attempt <= 1 {
filterNodes, filterTags, err := a.processFilteredNodes(job)
if err != nil {
log.WithFields(logrus.Fields{
log.WithError(err).WithFields(logrus.Fields{
"job": job.Name,
"err": err.Error(),
}).Fatal("agent: Error processing filtered nodes")
}
log.Debug("agent: Filtered nodes to run: ", filterNodes)
Expand Down
11 changes: 0 additions & 11 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,6 @@ func (s *Store) GetJobs(options *JobOptions) ([]*Job, error) {
}
}

n, err := job.GetNext()
if err != nil {
return err
}
job.Next = n

jobs = append(jobs, job)
}
return nil
Expand Down Expand Up @@ -355,11 +349,6 @@ func (s *Store) GetJob(name string, options *JobOptions) (*Job, error) {
job.Status = job.GetStatus()
}

n, err := job.GetNext()
if err != nil {
return err
}
job.Next = n
return nil
})

Expand Down
6 changes: 4 additions & 2 deletions dkron/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
)

func TestStore(t *testing.T) {
s, err := NewStore(nil, "test.db")
s, err := NewStore(nil, "test1.data")
defer s.Shutdown()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -76,7 +77,8 @@ func TestStore(t *testing.T) {
}

func TestStore_GetLastExecutionGroup(t *testing.T) {
s, err := NewStore(nil, "test.db")
s, err := NewStore(nil, "test2.data")
defer s.Shutdown()
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 4b4e9b8

Please sign in to comment.