Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Run task count times (available for simple and window schedule)
Browse files Browse the repository at this point in the history
Removed the code of simple schedule
  • Loading branch information
IzabellaRaulin committed Mar 20, 2017
1 parent a292a0a commit b3e990f
Show file tree
Hide file tree
Showing 35 changed files with 1,514 additions and 842 deletions.
1 change: 1 addition & 0 deletions cmd/snaptel/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
flTaskManifest,
flWorkfowManifest,
flTaskSchedInterval,
flTaskSchedCount,
flTaskSchedStartDate,
flTaskSchedStartTime,
flTaskSchedStopDate,
Expand Down
6 changes: 4 additions & 2 deletions cmd/snaptel/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ var (
Name: "interval, i",
Usage: "Interval for the task schedule [ex (simple schedule): 250ms, 1s, 30m (cron schedule): \"0 * * * * *\"]",
}

flTaskSchedStartTime = cli.StringFlag{
Name: "start-time",
Usage: "Start time for the task schedule [defaults to now]",
Expand All @@ -113,7 +112,6 @@ var (
Name: "stop-time",
Usage: "Start time for the task schedule [defaults to now]",
}

flTaskSchedStartDate = cli.StringFlag{
Name: "start-date",
Usage: "Start date for the task schedule [defaults to today]",
Expand All @@ -122,6 +120,10 @@ var (
Name: "stop-date",
Usage: "Stop date for the task schedule [defaults to today]",
}
flTaskSchedCount = cli.StringFlag{
Name: "count",
Usage: "The count of runs for the task schedule [defaults to 0 what means no limit, e.g. set to 1 determines a single run task]",
}
flTaskSchedDuration = cli.StringFlag{
Name: "duration, d",
Usage: "The amount of time to run the task [appends to start or creates a start time before a stop]",
Expand Down
28 changes: 26 additions & 2 deletions cmd/snaptel/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ func createTask(ctx *cli.Context) error {
return err
}

// stringValToInt parses the input (string) as an integer value (and returns that integer value
// to the caller or an error if the input value cannot be parsed as an integer)
func stringValToInt(val string) (int, error) {
// parse the input (string) as an integer value (and return that integer value
// to the caller or an error if the input value cannot be parsed as an integer)
parsedField, err := strconv.Atoi(val)
if err != nil {
splitErr := strings.Split(err.Error(), ": ")
Expand All @@ -115,6 +115,20 @@ func stringValToInt(val string) (int, error) {
return parsedField, nil
}

// stringValToUint parses the input (string) as an unsigned integer value (and returns that uint value
// to the caller or an error if the input value cannot be parsed as an unsigned integer)
func stringValToUint(val string) (uint, error) {
parsedField, err := strconv.ParseUint(val, 10, 64)
if err != nil {
splitErr := strings.Split(err.Error(), ": ")
errStr := splitErr[len(splitErr)-1]
// return a value of zero and the error encountered during string parsing
return 0, fmt.Errorf("Value '%v' cannot be parsed as an unsigned integer (%v)", val, errStr)
}
// return the unsigned integer equivalent of the input string and a nil error (indicating success)
return uint(parsedField), nil
}

// Parses the command-line parameters (if any) and uses them to override the underlying
// schedule for this task or to set a schedule for that task (if one is not already defined,
// as is the case when we're building a new task from a workflow manifest).
Expand Down Expand Up @@ -234,6 +248,16 @@ func (t *task) setScheduleFromCliOptions(ctx *cli.Context) error {
if !ctx.IsSet("interval") && interval == "" && t.Schedule.Interval == "" {
return fmt.Errorf("Usage error (missing interval value); when constructing a new task schedule an interval must be provided")
}

countValStr := ctx.String("count")
if ctx.IsSet("count") || countValStr != "" {
count, err := stringValToUint(countValStr)
if err != nil {
return fmt.Errorf("Usage error (bad count format); %v", err)
}
t.Schedule.Count = count

}
// if a start, stop, or duration value was provided, or if the existing schedule for this task
// is 'windowed', then it's a 'windowed' schedule
isWindowed := (start != nil || stop != nil || duration != nil || t.Schedule.Type == "windowed")
Expand Down
2 changes: 1 addition & 1 deletion control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (a *availablePlugin) Kill(r string) error {
}).Debug("deleting available plugin package")
os.RemoveAll(filepath.Dir(a.execPath))
}
// If it's a stremaing plugin, we need to signal the scheduler that
// If it's a streaming plugin, we need to signal the scheduler that
// this plugin is being killed.
if c, ok := a.client.(client.PluginStreamCollectorClient); ok {
c.Killed()
Expand Down
33 changes: 10 additions & 23 deletions core/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,18 @@ type Schedule struct {
Interval string `json:"interval,omitempty"`
StartTimestamp *time.Time `json:"start_timestamp,omitempty"`
StopTimestamp *time.Time `json:"stop_timestamp,omitempty"`
Count uint `json:"count,omitempty"`
}

var (
ErrMissingScheduleInterval = errors.New("missing `interval` in configuration of schedule")
)

func makeSchedule(s Schedule) (schedule.Schedule, error) {
switch s.Type {
case "simple":
case "simple", "windowed":
if s.Interval == "" {
return nil, errors.New("missing `interval` in configuration of simple schedule")
}

d, err := time.ParseDuration(s.Interval)
if err != nil {
return nil, err
}
sch := schedule.NewSimpleSchedule(d)

err = sch.Validate()
if err != nil {
return nil, err
}
return sch, nil
case "windowed":
if s.StartTimestamp == nil || s.StopTimestamp == nil || s.Interval == "" {
errmsg := fmt.Sprintf("missing parameter/parameters in configuration of windowed schedule,"+
"start_timestamp: %s, stop_timestamp: %s, interval: %s",
s.StartTimestamp, s.StopTimestamp, s.Interval)
return nil, errors.New(errmsg)
return nil, ErrMissingScheduleInterval
}

d, err := time.ParseDuration(s.Interval)
Expand All @@ -69,6 +55,7 @@ func makeSchedule(s Schedule) (schedule.Schedule, error) {
d,
s.StartTimestamp,
s.StopTimestamp,
s.Count,
)

err = sch.Validate()
Expand All @@ -78,7 +65,7 @@ func makeSchedule(s Schedule) (schedule.Schedule, error) {
return sch, nil
case "cron":
if s.Interval == "" {
return nil, errors.New("missing `interval` in configuration of cron schedule")
return nil, ErrMissingScheduleInterval
}
sch := schedule.NewCronSchedule(s.Interval)

Expand All @@ -90,6 +77,6 @@ func makeSchedule(s Schedule) (schedule.Schedule, error) {
case "streaming":
return schedule.NewStreamingSchedule(), nil
default:
return nil, errors.New("unknown schedule type " + s.Type)
return nil, fmt.Errorf("unknown schedule type `%s`", s.Type)
}
}
64 changes: 48 additions & 16 deletions core/schedule_small_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ func TestMakeSchedule(t *testing.T) {
rsched, err := makeSchedule(*sched1)
So(rsched, ShouldBeNil)
So(err, ShouldNotBeNil)
So(err.Error(), ShouldEqual, fmt.Sprintf("unknown schedule type %s", DUMMY_TYPE))
So(err.Error(), ShouldEqual, fmt.Sprintf("unknown schedule type `%s`", DUMMY_TYPE))
})

Convey("Simple schedule with missing interval in configuration", t, func() {
sched1 := &Schedule{Type: "simple"}
rsched, err := makeSchedule(*sched1)
So(rsched, ShouldBeNil)
So(err, ShouldNotBeNil)
So(err.Error(), ShouldEqual, "missing `interval` in configuration of simple schedule")
So(err, ShouldEqual, ErrMissingScheduleInterval)
})

Convey("Simple schedule with bad duration", t, func() {
Expand All @@ -75,12 +75,20 @@ func TestMakeSchedule(t *testing.T) {
So(rsched.GetState(), ShouldEqual, 0)
})

Convey("Simple schedule with determined count", t, func() {
sched1 := &Schedule{Type: "simple", Interval: "1s", Count: 1}
rsched, err := makeSchedule(*sched1)
So(err, ShouldBeNil)
So(rsched, ShouldNotBeNil)
So(rsched.GetState(), ShouldEqual, 0)
})

Convey("Windowed schedule with missing interval", t, func() {
sched1 := &Schedule{Type: "windowed"}
rsched, err := makeSchedule(*sched1)
So(rsched, ShouldBeNil)
So(err, ShouldNotBeNil)
So(err.Error(), ShouldStartWith, "missing parameter/parameters in configuration of windowed schedule")
So(err, ShouldEqual, ErrMissingScheduleInterval)
})

Convey("Windowed schedule with bad duration", t, func() {
Expand All @@ -102,22 +110,46 @@ func TestMakeSchedule(t *testing.T) {
So(err.Error(), ShouldEqual, "Interval must be greater than 0")
})

Convey("Windowed schedule with missing start_timestamp", t, func() {
now := time.Now()
sched1 := &Schedule{Type: "windowed", Interval: "1s", StopTimestamp: &now}
Convey("Windowed schedule with determined start_timestamp and count", t, func() {
startTime := time.Now().Add(time.Minute)
sched1 := &Schedule{Type: "simple", Interval: "1s", StartTimestamp: &startTime, Count: 1}
rsched, err := makeSchedule(*sched1)
So(rsched, ShouldBeNil)
So(err, ShouldNotBeNil)
So(err.Error(), ShouldStartWith, "missing parameter/parameters in configuration of windowed schedule")
So(err, ShouldBeNil)
So(rsched, ShouldNotBeNil)
So(rsched.GetState(), ShouldEqual, 0)
})

Convey("Windowed schedule with missing stop_timestamp", t, func() {
now := time.Now()
sched1 := &Schedule{Type: "windowed", Interval: "1s", StartTimestamp: &now}
Convey("Windowed schedule without determined start_timestamp", t, func() {
stopTime := time.Now().Add(time.Second)
sched1 := &Schedule{Type: "windowed", Interval: "1s", StopTimestamp: &stopTime}
rsched, err := makeSchedule(*sched1)
So(rsched, ShouldBeNil)
So(err, ShouldNotBeNil)
So(err.Error(), ShouldStartWith, "missing parameter/parameters in configuration of windowed schedule")
So(err, ShouldBeNil)
So(rsched, ShouldNotBeNil)
})

Convey("Windowed schedule without determined stop_timestamp", t, func() {
startTime := time.Now().Add(time.Second)
sched1 := &Schedule{Type: "windowed", Interval: "1s", StartTimestamp: &startTime}
rsched, err := makeSchedule(*sched1)
So(err, ShouldBeNil)
So(rsched, ShouldNotBeNil)
})

Convey("Windowed schedule without determined start and stop", t, func() {
sched1 := &Schedule{Type: "windowed", Interval: "1s"}
rsched, err := makeSchedule(*sched1)
So(err, ShouldBeNil)
So(rsched, ShouldNotBeNil)
})

Convey("Windowed schedule with start in the past", t, func() {
startTime := time.Now().Add(-2 * time.Minute)
stopTime := time.Now().Add(1 * time.Minute)
sched1 := &Schedule{Type: "windowed", Interval: "1s",
StartTimestamp: &startTime, StopTimestamp: &stopTime}
rsched, err := makeSchedule(*sched1)
So(err, ShouldBeNil)
So(rsched, ShouldNotBeNil)
})

Convey("Windowed schedule with stop in the past", t, func() {
Expand Down Expand Up @@ -147,7 +179,7 @@ func TestMakeSchedule(t *testing.T) {
rsched, err := makeSchedule(*sched1)
So(rsched, ShouldBeNil)
So(err, ShouldNotBeNil)
So(err.Error(), ShouldEqual, "missing `interval` in configuration of cron schedule")
So(err, ShouldEqual, ErrMissingScheduleInterval)
})

Convey("Cron schedule with invalid duration", t, func() {
Expand Down
9 changes: 6 additions & 3 deletions docs/SNAPTEL.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ create There are two ways to create a task.
--task-manifest value, -t value File path for task manifest to use for task creation.
--workflow-manifest value, -w value File path for workflow manifest to use for task creation
--interval value, -i value Interval for the task schedule [ex (simple schedule): 250ms, 1s, 30m (cron schedule): "0 * * * * *"]
--count value The count of runs for the task schedule [defaults to 0 what means no limit, e.g. set to 1 determines a single run task]
--start-date value Start date for the task schedule [defaults to today]
--start-time value Start time for the task schedule [defaults to now]
--stop-date value Stop date for the task schedule [defaults to today]
Expand Down Expand Up @@ -186,16 +187,18 @@ and then:
3. load a publishing plugin
4. list the plugins
5. start a task with a task manifest
6. start a task with a workflow manifest
7. list the tasks
8. unload the plugins
6. start a single run task with a task manifest
7. start a task with a workflow manifest
8. list the tasks
9. unload the plugins

```
$ snaptel plugin load /opt/snap/plugins/snap-plugin-collector-mock1
$ snaptel plugin load /opt/snap/plugins/snap-plugin-processor-passthru
$ snaptel plugin load /opt/snap/plugins/snap-plugin-publisher-mock-file
$ snaptel plugin list
$ snaptel task create -t mock-file.json
$ snaptel task create -t mock-file.json --count 1
$ snaptel task create -w workflow.json -i 1s -d 10s
$ snaptel task list
$ snaptel plugin unload collector mock <version>
Expand Down
Loading

0 comments on commit b3e990f

Please sign in to comment.