Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement task command #78

Merged
merged 1 commit into from
Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/connectctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main

import (
"fmt"
"github.com/90poe/connectctl/internal/ctl/tasks"
"github.com/90poe/connectctl/pkg/client/connect"
"os"
"strings"

Expand Down Expand Up @@ -60,6 +62,12 @@ func main() {
rootCmd.AddCommand(plugins.Command())
rootCmd.AddCommand(version.Command())

rootCmd.AddCommand(tasks.Command(&tasks.GenericOptions{
CreateClient: func(clusterURL string) (client tasks.Client, err error) {
return connect.NewClient(clusterURL)
},
}))

cobra.OnInitialize(initConfig)

if err := rootCmd.Execute(); err != nil {
Expand Down
78 changes: 78 additions & 0 deletions internal/ctl/tasks/get.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package tasks

import (
"fmt"
"github.com/90poe/connectctl/pkg/client/connect"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"io"
"strconv"
)

type GetOptions struct {
*GenericOptions

TaskID int
Output string
}

func NewGetCommand(options *GenericOptions) *cobra.Command {
var opts = GetOptions{
GenericOptions: options,
}

var cmd = cobra.Command{
Use: "get",
Short: "Displays a single task currently running for the connector.",
RunE: func(cmd *cobra.Command, args []string) error {
if err := opts.Validate(); err != nil {
return err
}
return opts.Run(cmd.OutOrStdout())
},
}

cmd.PersistentFlags().StringVarP(&opts.Output, "output", "o", "json", "The output format. Possible values are 'json' and 'table'")

cmd.PersistentFlags().IntVarP(&opts.TaskID, "id", "", -1, "The ID of the task to get")
_ = cmd.MarkPersistentFlagRequired("id")

return &cmd
}

func (o *GetOptions) Run(out io.Writer) error {
client, err := o.CreateClient(o.ClusterURL)
if err != nil {
return errors.Wrap(err, "failed to create http client")
}
tasks, _, err := client.GetConnectorTasks(o.ConnectorName)
if err != nil {
return errors.Wrap(err, "failed to retrieve tasks")
}
task, ok := findTaskByID(tasks, o.TaskID)
if !ok {
return errors.New("no task found by id=" + strconv.Itoa(o.TaskID))
}
return o.writeOutput(task, out)
}

func (o *GetOptions) writeOutput(task connect.Task, out io.Writer) error {
var outputFn TaskListOutputFn
var outputType = OutputType(o.Output)
switch outputType {
case OutputTypeJSON:
outputFn = OutputTaskListAsJSON
case OutputTypeTable:
outputFn = OutputTaskListAsTable
default:
return fmt.Errorf("output type '%s' is not supported", o.Output)
}
output, err := outputFn([]connect.Task{task})
if err != nil {
return fmt.Errorf("failed to form output for '%s' type", outputType)
}
if _, err := out.Write(output); err != nil {
return errors.Wrap(err, "failed to write output")
}
return nil
}
74 changes: 74 additions & 0 deletions internal/ctl/tasks/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package tasks

import (
"fmt"
"github.com/90poe/connectctl/pkg/client/connect"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"io"
)

type ListOptions struct {
// Use pointer here to make reference to the 'tasks' options.
// Otherwise, the local copy of the config will be created an no
// value from the flags will be set. It is caused by the global
// scope nature of viper and multiple overrides of cluster flag
// from the other commands.
*GenericOptions

Output string
}

func NewListCommand(options *GenericOptions) *cobra.Command {
var opts = ListOptions{
GenericOptions: options,
}

var cmd = cobra.Command{
Use: "list",
Short: "Displays a list of tasks currently running for the connector.",
RunE: func(cmd *cobra.Command, args []string) error {
if err := opts.Validate(); err != nil {
return err
}
return opts.Run(cmd.OutOrStdout())
},
}

cmd.PersistentFlags().StringVarP(&opts.Output, "output", "o", "json", "The output format. Possible values are 'json' and 'table'")

return &cmd
}

func (o *ListOptions) Run(out io.Writer) error {
client, err := o.CreateClient(o.ClusterURL)
if err != nil {
return errors.Wrap(err, "failed to create http client")
}
tasks, _, err := client.GetConnectorTasks(o.ConnectorName)
if err != nil {
return errors.Wrap(err, "failed to retrieve tasks")
}
return o.writeOutput(tasks, out)
}

func (o *ListOptions) writeOutput(tasks []connect.Task, out io.Writer) error {
var outputFn TaskListOutputFn
var outputType = OutputType(o.Output)
switch outputType {
case OutputTypeJSON:
outputFn = OutputTaskListAsJSON
case OutputTypeTable:
outputFn = OutputTaskListAsTable
default:
return fmt.Errorf("output type '%s' is not supported", o.Output)
}
output, err := outputFn(tasks)
if err != nil {
return fmt.Errorf("failed to form output for '%s' type", outputType)
}
if _, err := out.Write(output); err != nil {
return errors.Wrap(err, "failed to write output")
}
return nil
}
67 changes: 67 additions & 0 deletions internal/ctl/tasks/output.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package tasks

import (
"bytes"
"encoding/json"
"github.com/90poe/connectctl/pkg/client/connect"
"github.com/jedib0t/go-pretty/table"
"github.com/pkg/errors"
)

type OutputType string

const (
OutputTypeJSON OutputType = "json"
OutputTypeTable OutputType = "table"
)

type TaskListOutputFn func([]connect.Task) ([]byte, error)

func OutputTaskListAsJSON(tasks []connect.Task) ([]byte, error) {
b, err := DefaultMarshalIndent(tasks)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal tasks")
}
return b, nil
}

func OutputTaskListAsTable(tasks []connect.Task) ([]byte, error) {
var buff bytes.Buffer
t := table.NewWriter()
t.Style().Options.SeparateRows = true
t.SetOutputMirror(&buff)
t.AppendHeader(table.Row{"Connector Name", "ID", "Config"})
for _, task := range tasks {
configBytes, err := DefaultMarshalIndent(task.Config)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal config %v", task.Config)
}
t.AppendRow(table.Row{task.ID.ConnectorName, task.ID.ID, string(configBytes)})
}
t.Render()
return buff.Bytes(), nil
}

func DefaultMarshalIndent(value interface{}) ([]byte, error) {
return json.MarshalIndent(value, "", " ")
}

type TaskStateOutputFn func(*connect.TaskState) ([]byte, error)

func OutputTaskStateAsJSON(taskState *connect.TaskState) ([]byte, error) {
b, err := DefaultMarshalIndent(taskState)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal task state")
}
return b, nil
}

func OutputTaskStateAsTable(taskState *connect.TaskState) ([]byte, error) {
var buff bytes.Buffer
t := table.NewWriter()
t.SetOutputMirror(&buff)
t.AppendHeader(table.Row{"ID", "WorkerID", "State", "Trace"})
t.AppendRow(table.Row{taskState.ID, taskState.WorkerID, taskState.State, taskState.Trace})
t.Render()
return buff.Bytes(), nil
}
46 changes: 46 additions & 0 deletions internal/ctl/tasks/restart.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package tasks

import (
"github.com/pkg/errors"
"github.com/spf13/cobra"
)

type RestartOptions struct {
*GenericOptions

TaskID int
}

func NewRestartCommand(options *GenericOptions) *cobra.Command {
var opts = RestartOptions{
GenericOptions: options,
}

var cmd = cobra.Command{
Use: "restart",
Short: "Restart an individual task for the specified connector.",
RunE: func(cmd *cobra.Command, args []string) error {
if err := opts.Validate(); err != nil {
return err
}
return opts.Run()
},
}

cmd.PersistentFlags().IntVarP(&opts.TaskID, "id", "", -1, "The ID of the task to restart")
_ = cmd.MarkPersistentFlagRequired("id")

return &cmd
}

func (o *RestartOptions) Run() error {
client, err := o.CreateClient(o.ClusterURL)
if err != nil {
return errors.Wrap(err, "failed to create http client")
}
_, err = client.RestartConnectorTask(o.ConnectorName, o.TaskID)
if err != nil {
return errors.Wrapf(err, "failed to restart task '%d' for connector '%s'", o.TaskID, o.ConnectorName)
}
return nil
}
76 changes: 76 additions & 0 deletions internal/ctl/tasks/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package tasks

import (
"fmt"
"github.com/90poe/connectctl/pkg/client/connect"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"io"
)

type StatusOptions struct {
*GenericOptions

TaskID int
Output string
}

func NewStatusCommand(options *GenericOptions) *cobra.Command {
var opts = StatusOptions{
GenericOptions: options,
}

var cmd = cobra.Command{
Use: "status",
Short: "Displays a status by individual task currently running for the connector.",
RunE: func(cmd *cobra.Command, args []string) error {
if err := opts.Validate(); err != nil {
return err
}
return opts.Run(cmd.OutOrStdout())
},
}

cmd.PersistentFlags().StringVarP(&opts.Output, "output", "o", "json", "The output format. Possible values are 'json' and 'table'")

cmd.PersistentFlags().IntVarP(&opts.TaskID, "id", "", -1, "The ID of the task to get status for")
_ = cmd.MarkPersistentFlagRequired("id")

return &cmd
}

func (o *StatusOptions) Run(out io.Writer) error {
client, err := o.CreateClient(o.ClusterURL)
if err != nil {
return errors.Wrap(err, "failed to create http client")
}
taskState, _, err := client.GetConnectorTaskStatus(o.ConnectorName, o.TaskID)
if err != nil {
return errors.Wrap(err, "failed to get task status")
}
if taskState == nil {
return errors.New("task state response is nil")
}
return o.writeOutput(taskState, out)
}

func (o *StatusOptions) writeOutput(taskState *connect.TaskState, out io.Writer) error {
var outputFn TaskStateOutputFn
var outputType = OutputType(o.Output)
switch outputType {
case OutputTypeJSON:
outputFn = OutputTaskStateAsJSON
case OutputTypeTable:
outputFn = OutputTaskStateAsTable
default:
return fmt.Errorf("output type '%s' is not supported", o.Output)
}
output, err := outputFn(taskState)
if err != nil {
return fmt.Errorf("failed to form output for '%s' type", outputType)
}
if _, err := out.Write(output); err != nil {
return errors.Wrap(err, "failed to write output")
}
return nil
}
Loading