Skip to content

Commit

Permalink
feat: implement tasks command (#78)
Browse files Browse the repository at this point in the history
- Supports getting a list of all tasks by the connector name;
- Supports getting task by id;
- Supports getting status of currently running task individually;
- Supports restarting tasks by id.

Signed-off-by: oleksmir <olexiy.miroshnik@gmail.com>
  • Loading branch information
oleksmir authored Apr 2, 2020
1 parent 2dfcc1b commit 29ef9cb
Show file tree
Hide file tree
Showing 10 changed files with 594 additions and 0 deletions.
8 changes: 8 additions & 0 deletions cmd/connectctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main

import (
"fmt"
"github.com/90poe/connectctl/internal/ctl/tasks"
"github.com/90poe/connectctl/pkg/client/connect"
"os"
"strings"

Expand Down Expand Up @@ -60,6 +62,12 @@ func main() {
rootCmd.AddCommand(plugins.Command())
rootCmd.AddCommand(version.Command())

rootCmd.AddCommand(tasks.Command(&tasks.GenericOptions{
CreateClient: func(clusterURL string) (client tasks.Client, err error) {
return connect.NewClient(clusterURL)
},
}))

cobra.OnInitialize(initConfig)

if err := rootCmd.Execute(); err != nil {
Expand Down
78 changes: 78 additions & 0 deletions internal/ctl/tasks/get.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package tasks

import (
"fmt"
"github.com/90poe/connectctl/pkg/client/connect"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"io"
"strconv"
)

type GetOptions struct {
*GenericOptions

TaskID int
Output string
}

func NewGetCommand(options *GenericOptions) *cobra.Command {
var opts = GetOptions{
GenericOptions: options,
}

var cmd = cobra.Command{
Use: "get",
Short: "Displays a single task currently running for the connector.",
RunE: func(cmd *cobra.Command, args []string) error {
if err := opts.Validate(); err != nil {
return err
}
return opts.Run(cmd.OutOrStdout())
},
}

cmd.PersistentFlags().StringVarP(&opts.Output, "output", "o", "json", "The output format. Possible values are 'json' and 'table'")

cmd.PersistentFlags().IntVarP(&opts.TaskID, "id", "", -1, "The ID of the task to get")
_ = cmd.MarkPersistentFlagRequired("id")

return &cmd
}

func (o *GetOptions) Run(out io.Writer) error {
client, err := o.CreateClient(o.ClusterURL)
if err != nil {
return errors.Wrap(err, "failed to create http client")
}
tasks, _, err := client.GetConnectorTasks(o.ConnectorName)
if err != nil {
return errors.Wrap(err, "failed to retrieve tasks")
}
task, ok := findTaskByID(tasks, o.TaskID)
if !ok {
return errors.New("no task found by id=" + strconv.Itoa(o.TaskID))
}
return o.writeOutput(task, out)
}

func (o *GetOptions) writeOutput(task connect.Task, out io.Writer) error {
var outputFn TaskListOutputFn
var outputType = OutputType(o.Output)
switch outputType {
case OutputTypeJSON:
outputFn = OutputTaskListAsJSON
case OutputTypeTable:
outputFn = OutputTaskListAsTable
default:
return fmt.Errorf("output type '%s' is not supported", o.Output)
}
output, err := outputFn([]connect.Task{task})
if err != nil {
return fmt.Errorf("failed to form output for '%s' type", outputType)
}
if _, err := out.Write(output); err != nil {
return errors.Wrap(err, "failed to write output")
}
return nil
}
74 changes: 74 additions & 0 deletions internal/ctl/tasks/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package tasks

import (
"fmt"
"github.com/90poe/connectctl/pkg/client/connect"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"io"
)

type ListOptions struct {
// Use pointer here to make reference to the 'tasks' options.
// Otherwise, the local copy of the config will be created an no
// value from the flags will be set. It is caused by the global
// scope nature of viper and multiple overrides of cluster flag
// from the other commands.
*GenericOptions

Output string
}

func NewListCommand(options *GenericOptions) *cobra.Command {
var opts = ListOptions{
GenericOptions: options,
}

var cmd = cobra.Command{
Use: "list",
Short: "Displays a list of tasks currently running for the connector.",
RunE: func(cmd *cobra.Command, args []string) error {
if err := opts.Validate(); err != nil {
return err
}
return opts.Run(cmd.OutOrStdout())
},
}

cmd.PersistentFlags().StringVarP(&opts.Output, "output", "o", "json", "The output format. Possible values are 'json' and 'table'")

return &cmd
}

func (o *ListOptions) Run(out io.Writer) error {
client, err := o.CreateClient(o.ClusterURL)
if err != nil {
return errors.Wrap(err, "failed to create http client")
}
tasks, _, err := client.GetConnectorTasks(o.ConnectorName)
if err != nil {
return errors.Wrap(err, "failed to retrieve tasks")
}
return o.writeOutput(tasks, out)
}

func (o *ListOptions) writeOutput(tasks []connect.Task, out io.Writer) error {
var outputFn TaskListOutputFn
var outputType = OutputType(o.Output)
switch outputType {
case OutputTypeJSON:
outputFn = OutputTaskListAsJSON
case OutputTypeTable:
outputFn = OutputTaskListAsTable
default:
return fmt.Errorf("output type '%s' is not supported", o.Output)
}
output, err := outputFn(tasks)
if err != nil {
return fmt.Errorf("failed to form output for '%s' type", outputType)
}
if _, err := out.Write(output); err != nil {
return errors.Wrap(err, "failed to write output")
}
return nil
}
67 changes: 67 additions & 0 deletions internal/ctl/tasks/output.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package tasks

import (
"bytes"
"encoding/json"
"github.com/90poe/connectctl/pkg/client/connect"
"github.com/jedib0t/go-pretty/table"
"github.com/pkg/errors"
)

type OutputType string

const (
OutputTypeJSON OutputType = "json"
OutputTypeTable OutputType = "table"
)

type TaskListOutputFn func([]connect.Task) ([]byte, error)

func OutputTaskListAsJSON(tasks []connect.Task) ([]byte, error) {
b, err := DefaultMarshalIndent(tasks)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal tasks")
}
return b, nil
}

func OutputTaskListAsTable(tasks []connect.Task) ([]byte, error) {
var buff bytes.Buffer
t := table.NewWriter()
t.Style().Options.SeparateRows = true
t.SetOutputMirror(&buff)
t.AppendHeader(table.Row{"Connector Name", "ID", "Config"})
for _, task := range tasks {
configBytes, err := DefaultMarshalIndent(task.Config)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal config %v", task.Config)
}
t.AppendRow(table.Row{task.ID.ConnectorName, task.ID.ID, string(configBytes)})
}
t.Render()
return buff.Bytes(), nil
}

func DefaultMarshalIndent(value interface{}) ([]byte, error) {
return json.MarshalIndent(value, "", " ")
}

type TaskStateOutputFn func(*connect.TaskState) ([]byte, error)

func OutputTaskStateAsJSON(taskState *connect.TaskState) ([]byte, error) {
b, err := DefaultMarshalIndent(taskState)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal task state")
}
return b, nil
}

func OutputTaskStateAsTable(taskState *connect.TaskState) ([]byte, error) {
var buff bytes.Buffer
t := table.NewWriter()
t.SetOutputMirror(&buff)
t.AppendHeader(table.Row{"ID", "WorkerID", "State", "Trace"})
t.AppendRow(table.Row{taskState.ID, taskState.WorkerID, taskState.State, taskState.Trace})
t.Render()
return buff.Bytes(), nil
}
46 changes: 46 additions & 0 deletions internal/ctl/tasks/restart.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package tasks

import (
"github.com/pkg/errors"
"github.com/spf13/cobra"
)

type RestartOptions struct {
*GenericOptions

TaskID int
}

func NewRestartCommand(options *GenericOptions) *cobra.Command {
var opts = RestartOptions{
GenericOptions: options,
}

var cmd = cobra.Command{
Use: "restart",
Short: "Restart an individual task for the specified connector.",
RunE: func(cmd *cobra.Command, args []string) error {
if err := opts.Validate(); err != nil {
return err
}
return opts.Run()
},
}

cmd.PersistentFlags().IntVarP(&opts.TaskID, "id", "", -1, "The ID of the task to restart")
_ = cmd.MarkPersistentFlagRequired("id")

return &cmd
}

func (o *RestartOptions) Run() error {
client, err := o.CreateClient(o.ClusterURL)
if err != nil {
return errors.Wrap(err, "failed to create http client")
}
_, err = client.RestartConnectorTask(o.ConnectorName, o.TaskID)
if err != nil {
return errors.Wrapf(err, "failed to restart task '%d' for connector '%s'", o.TaskID, o.ConnectorName)
}
return nil
}
76 changes: 76 additions & 0 deletions internal/ctl/tasks/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package tasks

import (
"fmt"
"github.com/90poe/connectctl/pkg/client/connect"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"io"
)

type StatusOptions struct {
*GenericOptions

TaskID int
Output string
}

func NewStatusCommand(options *GenericOptions) *cobra.Command {
var opts = StatusOptions{
GenericOptions: options,
}

var cmd = cobra.Command{
Use: "status",
Short: "Displays a status by individual task currently running for the connector.",
RunE: func(cmd *cobra.Command, args []string) error {
if err := opts.Validate(); err != nil {
return err
}
return opts.Run(cmd.OutOrStdout())
},
}

cmd.PersistentFlags().StringVarP(&opts.Output, "output", "o", "json", "The output format. Possible values are 'json' and 'table'")

cmd.PersistentFlags().IntVarP(&opts.TaskID, "id", "", -1, "The ID of the task to get status for")
_ = cmd.MarkPersistentFlagRequired("id")

return &cmd
}

func (o *StatusOptions) Run(out io.Writer) error {
client, err := o.CreateClient(o.ClusterURL)
if err != nil {
return errors.Wrap(err, "failed to create http client")
}
taskState, _, err := client.GetConnectorTaskStatus(o.ConnectorName, o.TaskID)
if err != nil {
return errors.Wrap(err, "failed to get task status")
}
if taskState == nil {
return errors.New("task state response is nil")
}
return o.writeOutput(taskState, out)
}

func (o *StatusOptions) writeOutput(taskState *connect.TaskState, out io.Writer) error {
var outputFn TaskStateOutputFn
var outputType = OutputType(o.Output)
switch outputType {
case OutputTypeJSON:
outputFn = OutputTaskStateAsJSON
case OutputTypeTable:
outputFn = OutputTaskStateAsTable
default:
return fmt.Errorf("output type '%s' is not supported", o.Output)
}
output, err := outputFn(taskState)
if err != nil {
return fmt.Errorf("failed to form output for '%s' type", outputType)
}
if _, err := out.Write(output); err != nil {
return errors.Wrap(err, "failed to write output")
}
return nil
}
Loading

0 comments on commit 29ef9cb

Please sign in to comment.