Skip to content

Commit

Permalink
Refactor validation to allow job create/update handler to return 4xx …
Browse files Browse the repository at this point in the history
…errors if validation failed.

Prevent job with non-existing parent from being stored.
  • Loading branch information
yvanoers committed Oct 5, 2019
1 parent e26811a commit 3e7018a
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 105 deletions.
28 changes: 12 additions & 16 deletions dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package dkron
import (
"fmt"
"net/http"
"regexp"

"github.com/dgraph-io/badger"
"github.com/gin-contrib/expvar"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
status "google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -147,21 +147,27 @@ func (h *HTTPTransport) jobCreateOrUpdateHandler(c *gin.Context) {

// Parse values from JSON
if err := c.BindJSON(&job); err != nil {
c.Writer.WriteString("Incorrect or unexpected parameters")
c.Writer.WriteString(fmt.Sprintf("Unable to parse payload: %s.", err))
log.Error(err)
return
}

// Validate job name
if b, chr := isSlug(job.Name); !b {
// Validate job
if err := job.Validate(); err != nil {
c.AbortWithStatus(http.StatusBadRequest)
c.Writer.WriteString(fmt.Sprintf("Name contains illegal character '%s'.", chr))
c.Writer.WriteString(fmt.Sprintf("Job contains invalid value: %s.", err))
return
}

// Call gRPC SetJob
if err := h.agent.GRPCClient.SetJob(&job); err != nil {
c.AbortWithError(422, err)
s := status.Convert(err)
if s.Message() == ErrParentJobNotFound.Error() {
c.AbortWithStatus(http.StatusNotFound)
} else {
c.AbortWithStatus(http.StatusInternalServerError)
}
c.Writer.WriteString(s.Message())
return
}

Expand Down Expand Up @@ -262,13 +268,3 @@ func (h *HTTPTransport) jobToggleHandler(c *gin.Context) {
c.Header("Location", c.Request.RequestURI)
renderJSON(c, http.StatusOK, job)
}

// isSlug determines whether the given string is a proper value to be used as
// key in the backend store (a "slug"). If false, the 2nd return value
// will contain the first illegal character found.
func isSlug(candidate string) (bool, string) {
// Allow only lower case letters (unicode), digits, underscore and dash.
illegalCharPattern, _ := regexp.Compile(`[^\p{Ll}0-9_-]`)
whyNot := illegalCharPattern.FindString(candidate)
return whyNot == "", whyNot
}
150 changes: 93 additions & 57 deletions dkron/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,111 +99,104 @@ func TestAPIJobCreateUpdate(t *testing.T) {
}

func TestAPIJobCreateUpdateParentJob_SameParent(t *testing.T) {
port := "8092"
baseURL := fmt.Sprintf("http://localhost:%s/v1", port)
dir, _ := setupAPITest(t, port)
defer os.RemoveAll(dir)

jsonStr := []byte(`{
resp := postJob(t, "8092", []byte(`{
"name": "test_job",
"schedule": "@every 1m",
"command": "date",
"owner": "mec",
"owner_email": "foo@bar.com",
"disabled": true,
"parent_job": "test_job"
}`)
}`))

resp, err := http.Post(baseURL+"/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
if err != nil {
t.Fatal(err)
}
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()

assert.Equal(t, 422, resp.StatusCode)
errJSON, err := json.Marshal(ErrSameParent.Error())
assert.Contains(t, string(errJSON)+"\n", string(body))

// Send a shutdown request
//a.Stop()
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
assert.Contains(t, string(body), ErrSameParent.Error())
}

func TestAPIJobCreateUpdateParentJob_NoParent(t *testing.T) {
port := "8093"
baseURL := fmt.Sprintf("http://localhost:%s/v1", port)
dir, a := setupAPITest(t, port)
defer os.RemoveAll(dir)
defer a.Stop()

jsonStr := []byte(`{
resp := postJob(t, "8093", []byte(`{
"name": "test_job",
"schedule": "@every 1m",
"command": "date",
"owner": "mec",
"owner_email": "foo@bar.com",
"disabled": true,
"parent_job": "parent_test_job"
}`)
}`))

resp, err := http.Post(baseURL+"/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
if err != nil {
t.Fatal(err)
}
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()

assert.Equal(t, 422, resp.StatusCode)
errJSON, err := json.Marshal(ErrParentJobNotFound.Error())
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
errJSON, _ := json.Marshal(ErrParentJobNotFound.Error())
assert.Contains(t, string(errJSON)+"\n", string(body))
}

func TestAPIJobCreateUpdateValidationBadName(t *testing.T) {
port := "8094"
baseURL := fmt.Sprintf("http://localhost:%s/v1", port)
dir, a := setupAPITest(t, port)
defer os.RemoveAll(dir)
defer a.Stop()

jsonStr := []byte(`{
resp := postJob(t, "8094", []byte(`{
"name": "BAD JOB NAME!",
"schedule": "@every 1m",
"executor": "shell",
"executor_config": {"command": "date"},
"disabled": true
}`)

resp, err := http.Post(baseURL+"/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
if err != nil {
t.Fatal(err)
}
}`))

assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
}

func TestAPIJobCreateUpdateValidationValidName(t *testing.T) {
port := "8095"
baseURL := fmt.Sprintf("http://localhost:%s/v1", port)
dir, a := setupAPITest(t, port)
defer os.RemoveAll(dir)
defer a.Stop()

jsonStr := []byte(`{
resp := postJob(t, "8095", []byte(`{
"name": "abcdefghijklmnopqrstuvwxyz0123456789-_ßñëäïüøüáéíóýćàèìòùâêîôûæšłç",
"schedule": "@every 1m",
"executor": "shell",
"executor_config": {"command": "date"},
"disabled": true
}`)

resp, err := http.Post(baseURL+"/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
if err != nil {
t.Fatal(err)
}
}`))

assert.Equal(t, http.StatusCreated, resp.StatusCode)
}

func TestAPIJobCreateUpdateValidationBadSchedule(t *testing.T) {
resp := postJob(t, "8097", []byte(`{
"name": "testjob",
"schedule": "@at badtime",
"executor": "shell",
"executor_config": {"command": "date"},
"disabled": true
}`))

assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
}

func TestAPIJobCreateUpdateValidationBadConcurrency(t *testing.T) {
resp := postJob(t, "8098", []byte(`{
"name": "testjob",
"schedule": "@every 1m",
"executor": "shell",
"executor_config": {"command": "date"},
"concurrency": "badvalue"
"disabled": true
}`))

assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
}

func TestAPIJobCreateUpdateValidationBadTimezone(t *testing.T) {
resp := postJob(t, "8099", []byte(`{
"name": "testjob",
"schedule": "@every 1m",
"executor": "shell",
"executor_config": {"command": "date"},
"disabled": true,
"timezone": "notatimezone"
}`))

assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
}

func TestAPIGetNonExistentJobReturnsNotFound(t *testing.T) {
port := "8096"
baseURL := fmt.Sprintf("http://localhost:%s/v1", port)
Expand All @@ -215,3 +208,46 @@ func TestAPIGetNonExistentJobReturnsNotFound(t *testing.T) {

assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

func TestAPIJobCreateUpdateJobWithInvalidParentIsNotCreated(t *testing.T) {
port := "8100"
baseURL := fmt.Sprintf("http://localhost:%s/v1", port)
dir, a := setupAPITest(t, port)
defer os.RemoveAll(dir)
defer a.Stop()

jsonStr := []byte(`{
"name": "test_job",
"schedule": "@every 1m",
"command": "date",
"owner": "mec",
"owner_email": "foo@bar.com",
"disabled": true,
"parent_job": "parent_test_job"
}`)

resp, err := http.Post(baseURL+"/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
require.NoError(t, err, err)

assert.Equal(t, http.StatusNotFound, resp.StatusCode)
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
assert.Equal(t, ErrParentJobNotFound.Error(), string(body))

resp, err = http.Get(baseURL + "/jobs/test_job")
require.NoError(t, err, err)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

// postJob POSTs the given json to the jobs endpoint and returns the response
func postJob(t *testing.T, port string, jsonStr []byte) *http.Response {
baseURL := fmt.Sprintf("http://localhost:%s/v1", port)
dir, a := setupAPITest(t, port)
defer os.RemoveAll(dir)
defer a.Stop()

resp, err := http.Post(baseURL+"/jobs", "encoding/json", bytes.NewBuffer(jsonStr))
require.NoError(t, err, err)

return resp
}
40 changes: 40 additions & 0 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dkron
import (
"errors"
"fmt"
"regexp"
"time"

"github.com/dgraph-io/badger"
Expand Down Expand Up @@ -353,3 +354,42 @@ func (j *Job) isRunnable() bool {

return true
}

// Validate validates whether all values in the job are acceptable.
func (j *Job) Validate() error {
if valid, chr := isSlug(j.Name); !valid {
return fmt.Errorf("name contains illegal character '%s'", chr)
}

if j.ParentJob == j.Name {
return ErrSameParent
}

// Validate schedule, allow empty schedule if parent job set.
if j.Schedule != "" || j.ParentJob == "" {
if _, err := cron.Parse(j.Schedule); err != nil {
return fmt.Errorf("%s: %s", ErrScheduleParse.Error(), err)
}
}

if j.Concurrency != ConcurrencyAllow && j.Concurrency != ConcurrencyForbid && j.Concurrency != "" {
return ErrWrongConcurrency
}

// An empty string is a valid timezone for LoadLocation
if _, err := time.LoadLocation(j.Timezone); err != nil {
return err
}

return nil
}

// isSlug determines whether the given string is a proper value to be used as
// key in the backend store (a "slug"). If false, the 2nd return value
// will contain the first illegal character found.
func isSlug(candidate string) (bool, string) {
// Allow only lower case letters (unicode), digits, underscore and dash.
illegalCharPattern, _ := regexp.Compile(`[^\p{Ll}0-9_-]`)
whyNot := illegalCharPattern.FindString(candidate)
return whyNot == "", whyNot
}
40 changes: 8 additions & 32 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/dgraph-io/badger"
"github.com/distribworks/dkron/v2/cron"
dkronpb "github.com/distribworks/dkron/v2/proto"
"github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -118,10 +117,17 @@ func (s *Store) SetJob(job *Job, copyDependentJobs bool) error {
// Init the job agent
job.Agent = s.agent

if err := s.validateJob(job); err != nil {
if err := job.Validate(); err != nil {
return err
}

// Abort if parent not found before committing job to the store
if job.ParentJob != "" {
if j, _ := s.GetJob(job.ParentJob, nil); j == nil {
return ErrParentJobNotFound
}
}

err := s.db.Update(func(txn *badger.Txn) error {
// Get if the requested job already exist
err := s.getJobTxnFunc(job.Name, &pbej)(txn)
Expand Down Expand Up @@ -229,14 +235,6 @@ func (s *Store) addToParent(child *Job) error {
return nil
}

func (s *Store) validateTimeZone(timezone string) error {
if timezone == "" {
return nil
}
_, err := time.LoadLocation(timezone)
return err
}

// SetExecutionDone saves the execution and updates the job with the corresponding
// results
func (s *Store) SetExecutionDone(execution *Execution) (bool, error) {
Expand Down Expand Up @@ -284,28 +282,6 @@ func (s *Store) SetExecutionDone(execution *Execution) (bool, error) {
return true, nil
}

func (s *Store) validateJob(job *Job) error {
if job.ParentJob == job.Name {
return ErrSameParent
}

// Only validate the schedule if it doesn't have a parent
if job.ParentJob == "" {
if _, err := cron.Parse(job.Schedule); err != nil {
return fmt.Errorf("%s: %s", ErrScheduleParse.Error(), err)
}
}

if job.Concurrency != ConcurrencyAllow && job.Concurrency != ConcurrencyForbid && job.Concurrency != "" {
return ErrWrongConcurrency
}
if err := s.validateTimeZone(job.Timezone); err != nil {
return err
}

return nil
}

func (s *Store) jobHasMetadata(job *Job, metadata map[string]string) bool {
if job == nil || job.Metadata == nil || len(job.Metadata) == 0 {
return false
Expand Down

0 comments on commit 3e7018a

Please sign in to comment.