diff --git a/cmd/connectctl/main.go b/cmd/connectctl/main.go index c1114d9..2ab2025 100644 --- a/cmd/connectctl/main.go +++ b/cmd/connectctl/main.go @@ -3,6 +3,8 @@ package main import ( "fmt" + "github.com/90poe/connectctl/internal/ctl/tasks" + "github.com/90poe/connectctl/pkg/client/connect" "os" "strings" @@ -60,6 +62,12 @@ func main() { rootCmd.AddCommand(plugins.Command()) rootCmd.AddCommand(version.Command()) + rootCmd.AddCommand(tasks.Command(&tasks.GenericOptions{ + CreateClient: func(clusterURL string) (client tasks.Client, err error) { + return connect.NewClient(clusterURL) + }, + })) + cobra.OnInitialize(initConfig) if err := rootCmd.Execute(); err != nil { diff --git a/internal/ctl/tasks/get.go b/internal/ctl/tasks/get.go new file mode 100644 index 0000000..d80746c --- /dev/null +++ b/internal/ctl/tasks/get.go @@ -0,0 +1,78 @@ +package tasks + +import ( + "fmt" + "github.com/90poe/connectctl/pkg/client/connect" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "io" + "strconv" +) + +type GetOptions struct { + *GenericOptions + + TaskID int + Output string +} + +func NewGetCommand(options *GenericOptions) *cobra.Command { + var opts = GetOptions{ + GenericOptions: options, + } + + var cmd = cobra.Command{ + Use: "get", + Short: "Displays a single task currently running for the connector.", + RunE: func(cmd *cobra.Command, args []string) error { + if err := opts.Validate(); err != nil { + return err + } + return opts.Run(cmd.OutOrStdout()) + }, + } + + cmd.PersistentFlags().StringVarP(&opts.Output, "output", "o", "json", "The output format. Possible values are 'json' and 'table'") + + cmd.PersistentFlags().IntVarP(&opts.TaskID, "id", "", -1, "The ID of the task to get") + _ = cmd.MarkPersistentFlagRequired("id") + + return &cmd +} + +func (o *GetOptions) Run(out io.Writer) error { + client, err := o.CreateClient(o.ClusterURL) + if err != nil { + return errors.Wrap(err, "failed to create http client") + } + tasks, _, err := client.GetConnectorTasks(o.ConnectorName) + if err != nil { + return errors.Wrap(err, "failed to retrieve tasks") + } + task, ok := findTaskByID(tasks, o.TaskID) + if !ok { + return errors.New("no task found by id=" + strconv.Itoa(o.TaskID)) + } + return o.writeOutput(task, out) +} + +func (o *GetOptions) writeOutput(task connect.Task, out io.Writer) error { + var outputFn TaskListOutputFn + var outputType = OutputType(o.Output) + switch outputType { + case OutputTypeJSON: + outputFn = OutputTaskListAsJSON + case OutputTypeTable: + outputFn = OutputTaskListAsTable + default: + return fmt.Errorf("output type '%s' is not supported", o.Output) + } + output, err := outputFn([]connect.Task{task}) + if err != nil { + return fmt.Errorf("failed to form output for '%s' type", outputType) + } + if _, err := out.Write(output); err != nil { + return errors.Wrap(err, "failed to write output") + } + return nil +} diff --git a/internal/ctl/tasks/list.go b/internal/ctl/tasks/list.go new file mode 100644 index 0000000..4e3ab63 --- /dev/null +++ b/internal/ctl/tasks/list.go @@ -0,0 +1,74 @@ +package tasks + +import ( + "fmt" + "github.com/90poe/connectctl/pkg/client/connect" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "io" +) + +type ListOptions struct { + // Use pointer here to make reference to the 'tasks' options. + // Otherwise, the local copy of the config will be created an no + // value from the flags will be set. It is caused by the global + // scope nature of viper and multiple overrides of cluster flag + // from the other commands. + *GenericOptions + + Output string +} + +func NewListCommand(options *GenericOptions) *cobra.Command { + var opts = ListOptions{ + GenericOptions: options, + } + + var cmd = cobra.Command{ + Use: "list", + Short: "Displays a list of tasks currently running for the connector.", + RunE: func(cmd *cobra.Command, args []string) error { + if err := opts.Validate(); err != nil { + return err + } + return opts.Run(cmd.OutOrStdout()) + }, + } + + cmd.PersistentFlags().StringVarP(&opts.Output, "output", "o", "json", "The output format. Possible values are 'json' and 'table'") + + return &cmd +} + +func (o *ListOptions) Run(out io.Writer) error { + client, err := o.CreateClient(o.ClusterURL) + if err != nil { + return errors.Wrap(err, "failed to create http client") + } + tasks, _, err := client.GetConnectorTasks(o.ConnectorName) + if err != nil { + return errors.Wrap(err, "failed to retrieve tasks") + } + return o.writeOutput(tasks, out) +} + +func (o *ListOptions) writeOutput(tasks []connect.Task, out io.Writer) error { + var outputFn TaskListOutputFn + var outputType = OutputType(o.Output) + switch outputType { + case OutputTypeJSON: + outputFn = OutputTaskListAsJSON + case OutputTypeTable: + outputFn = OutputTaskListAsTable + default: + return fmt.Errorf("output type '%s' is not supported", o.Output) + } + output, err := outputFn(tasks) + if err != nil { + return fmt.Errorf("failed to form output for '%s' type", outputType) + } + if _, err := out.Write(output); err != nil { + return errors.Wrap(err, "failed to write output") + } + return nil +} diff --git a/internal/ctl/tasks/output.go b/internal/ctl/tasks/output.go new file mode 100644 index 0000000..964c312 --- /dev/null +++ b/internal/ctl/tasks/output.go @@ -0,0 +1,67 @@ +package tasks + +import ( + "bytes" + "encoding/json" + "github.com/90poe/connectctl/pkg/client/connect" + "github.com/jedib0t/go-pretty/table" + "github.com/pkg/errors" +) + +type OutputType string + +const ( + OutputTypeJSON OutputType = "json" + OutputTypeTable OutputType = "table" +) + +type TaskListOutputFn func([]connect.Task) ([]byte, error) + +func OutputTaskListAsJSON(tasks []connect.Task) ([]byte, error) { + b, err := DefaultMarshalIndent(tasks) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal tasks") + } + return b, nil +} + +func OutputTaskListAsTable(tasks []connect.Task) ([]byte, error) { + var buff bytes.Buffer + t := table.NewWriter() + t.Style().Options.SeparateRows = true + t.SetOutputMirror(&buff) + t.AppendHeader(table.Row{"Connector Name", "ID", "Config"}) + for _, task := range tasks { + configBytes, err := DefaultMarshalIndent(task.Config) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal config %v", task.Config) + } + t.AppendRow(table.Row{task.ID.ConnectorName, task.ID.ID, string(configBytes)}) + } + t.Render() + return buff.Bytes(), nil +} + +func DefaultMarshalIndent(value interface{}) ([]byte, error) { + return json.MarshalIndent(value, "", " ") +} + +type TaskStateOutputFn func(*connect.TaskState) ([]byte, error) + +func OutputTaskStateAsJSON(taskState *connect.TaskState) ([]byte, error) { + b, err := DefaultMarshalIndent(taskState) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal task state") + } + return b, nil +} + +func OutputTaskStateAsTable(taskState *connect.TaskState) ([]byte, error) { + var buff bytes.Buffer + t := table.NewWriter() + t.SetOutputMirror(&buff) + t.AppendHeader(table.Row{"ID", "WorkerID", "State", "Trace"}) + t.AppendRow(table.Row{taskState.ID, taskState.WorkerID, taskState.State, taskState.Trace}) + t.Render() + return buff.Bytes(), nil +} diff --git a/internal/ctl/tasks/restart.go b/internal/ctl/tasks/restart.go new file mode 100644 index 0000000..1570ab8 --- /dev/null +++ b/internal/ctl/tasks/restart.go @@ -0,0 +1,46 @@ +package tasks + +import ( + "github.com/pkg/errors" + "github.com/spf13/cobra" +) + +type RestartOptions struct { + *GenericOptions + + TaskID int +} + +func NewRestartCommand(options *GenericOptions) *cobra.Command { + var opts = RestartOptions{ + GenericOptions: options, + } + + var cmd = cobra.Command{ + Use: "restart", + Short: "Restart an individual task for the specified connector.", + RunE: func(cmd *cobra.Command, args []string) error { + if err := opts.Validate(); err != nil { + return err + } + return opts.Run() + }, + } + + cmd.PersistentFlags().IntVarP(&opts.TaskID, "id", "", -1, "The ID of the task to restart") + _ = cmd.MarkPersistentFlagRequired("id") + + return &cmd +} + +func (o *RestartOptions) Run() error { + client, err := o.CreateClient(o.ClusterURL) + if err != nil { + return errors.Wrap(err, "failed to create http client") + } + _, err = client.RestartConnectorTask(o.ConnectorName, o.TaskID) + if err != nil { + return errors.Wrapf(err, "failed to restart task '%d' for connector '%s'", o.TaskID, o.ConnectorName) + } + return nil +} diff --git a/internal/ctl/tasks/status.go b/internal/ctl/tasks/status.go new file mode 100644 index 0000000..c63b20e --- /dev/null +++ b/internal/ctl/tasks/status.go @@ -0,0 +1,76 @@ +package tasks + +import ( + "fmt" + "github.com/90poe/connectctl/pkg/client/connect" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "io" +) + +type StatusOptions struct { + *GenericOptions + + TaskID int + Output string +} + +func NewStatusCommand(options *GenericOptions) *cobra.Command { + var opts = StatusOptions{ + GenericOptions: options, + } + + var cmd = cobra.Command{ + Use: "status", + Short: "Displays a status by individual task currently running for the connector.", + RunE: func(cmd *cobra.Command, args []string) error { + if err := opts.Validate(); err != nil { + return err + } + return opts.Run(cmd.OutOrStdout()) + }, + } + + cmd.PersistentFlags().StringVarP(&opts.Output, "output", "o", "json", "The output format. Possible values are 'json' and 'table'") + + cmd.PersistentFlags().IntVarP(&opts.TaskID, "id", "", -1, "The ID of the task to get status for") + _ = cmd.MarkPersistentFlagRequired("id") + + return &cmd +} + +func (o *StatusOptions) Run(out io.Writer) error { + client, err := o.CreateClient(o.ClusterURL) + if err != nil { + return errors.Wrap(err, "failed to create http client") + } + taskState, _, err := client.GetConnectorTaskStatus(o.ConnectorName, o.TaskID) + if err != nil { + return errors.Wrap(err, "failed to get task status") + } + if taskState == nil { + return errors.New("task state response is nil") + } + return o.writeOutput(taskState, out) +} + +func (o *StatusOptions) writeOutput(taskState *connect.TaskState, out io.Writer) error { + var outputFn TaskStateOutputFn + var outputType = OutputType(o.Output) + switch outputType { + case OutputTypeJSON: + outputFn = OutputTaskStateAsJSON + case OutputTypeTable: + outputFn = OutputTaskStateAsTable + default: + return fmt.Errorf("output type '%s' is not supported", o.Output) + } + output, err := outputFn(taskState) + if err != nil { + return fmt.Errorf("failed to form output for '%s' type", outputType) + } + if _, err := out.Write(output); err != nil { + return errors.Wrap(err, "failed to write output") + } + return nil +} diff --git a/internal/ctl/tasks/tasks.go b/internal/ctl/tasks/tasks.go new file mode 100644 index 0000000..f433e6e --- /dev/null +++ b/internal/ctl/tasks/tasks.go @@ -0,0 +1,73 @@ +package tasks + +import ( + "github.com/90poe/connectctl/pkg/client/connect" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "net/http" + "net/url" + "strings" +) + +type ClientFn func(string) (Client, error) + +type Client interface { + GetConnectorTasks(name string) ([]connect.Task, *http.Response, error) + GetConnectorTaskStatus(name string, taskID int) (*connect.TaskState, *http.Response, error) + RestartConnectorTask(name string, taskID int) (*http.Response, error) +} + +type GenericOptions struct { + // Function for creating API client + CreateClient ClientFn + + ClusterURL string + ConnectorName string +} + +func Command(opts *GenericOptions) *cobra.Command { + var cmd = cobra.Command{ + Use: "tasks", + Short: "Commands related to kafka connector tasks", + RunE: func(cmd *cobra.Command, args []string) error { + if err := opts.Validate(); err != nil { + return err + } + // return 'help' by default + return cmd.Help() + }, + } + + // ClusterURL will be used in sub-commands + cmd.PersistentFlags().StringVarP(&opts.ClusterURL, "cluster", "c", "", "the URL of the connect cluster to manage (required)") + _ = cmd.MarkPersistentFlagRequired("cluster") + + // ConnectorName will be used in sub-commands + cmd.PersistentFlags().StringVar(&opts.ConnectorName, "connector", "", "Connector name to get tasks for") + _ = cmd.MarkPersistentFlagRequired("connector") + + // connectctl tasks list --cluster=... --connector=... + cmd.AddCommand(NewListCommand(opts)) + + // connectctl task get --cluster=... --connector=... --id=... + cmd.AddCommand(NewGetCommand(opts)) + + // connectctl task restart --cluster=... --connector=... --id=... + cmd.AddCommand(NewRestartCommand(opts)) + + // connectctl task status --cluster=... --connector=... --id=... + cmd.AddCommand(NewStatusCommand(opts)) + + return &cmd +} + +func (o *GenericOptions) Validate() error { + _, err := url.ParseRequestURI(o.ClusterURL) + if err != nil { + return errors.Wrap(err, "--cluster is not a valid URI") + } + if len(strings.TrimSpace(o.ConnectorName)) == 0 { + return errors.New("--connector name is empty") + } + return nil +} diff --git a/internal/ctl/tasks/tasks_test.go b/internal/ctl/tasks/tasks_test.go new file mode 100644 index 0000000..1cc4a11 --- /dev/null +++ b/internal/ctl/tasks/tasks_test.go @@ -0,0 +1,71 @@ +package tasks + +import ( + "github.com/pkg/errors" + "testing" +) + +func TestGenericOptions_Validate(t *testing.T) { + f := func(opts GenericOptions, expectedErr error) { + err := opts.Validate() + if err != nil && errors.Cause(err) == expectedErr { + t.Fatalf("expected %#v, got %#v", expectedErr, err) + } + } + + const ( + clusterURL = "http://localhost:8083" + connectorName = "connector" + ) + + var ( + clusterErr = errors.New("--cluster is not a valid URI") + connectorErr = errors.New("--connector name is empty") + ) + + f( + GenericOptions{ + ClusterURL: "", + }, + clusterErr, + ) + f( + GenericOptions{ + ConnectorName: "", + }, + clusterErr, + ) + f( + GenericOptions{ + ClusterURL: "simple:string", + }, + clusterErr, + ) + f( + GenericOptions{ + ClusterURL: clusterURL, + }, + connectorErr, + ) + f( + GenericOptions{ + ClusterURL: clusterURL, + ConnectorName: connectorName, + }, + nil, + ) + f( + GenericOptions{ + ClusterURL: "www.google.com", + ConnectorName: connectorName, + }, + clusterErr, + ) + f( + GenericOptions{ + ClusterURL: "https://www.google.com", + ConnectorName: connectorName, + }, + nil, + ) +} diff --git a/internal/ctl/tasks/utils.go b/internal/ctl/tasks/utils.go new file mode 100644 index 0000000..3b6b90d --- /dev/null +++ b/internal/ctl/tasks/utils.go @@ -0,0 +1,14 @@ +package tasks + +import ( + "github.com/90poe/connectctl/pkg/client/connect" +) + +func findTaskByID(tasks []connect.Task, id int) (connect.Task, bool) { + for _, t := range tasks { + if t.ID.ID == id { + return t, true + } + } + return connect.Task{}, false +} diff --git a/internal/ctl/tasks/utils_test.go b/internal/ctl/tasks/utils_test.go new file mode 100644 index 0000000..0613963 --- /dev/null +++ b/internal/ctl/tasks/utils_test.go @@ -0,0 +1,87 @@ +package tasks + +import ( + "github.com/90poe/connectctl/pkg/client/connect" + "reflect" + "testing" +) + +func TestFindTaskByID(t *testing.T) { + f := func(tasks []connect.Task, id int, expected connect.Task, found bool) { + t.Helper() + task, ok := findTaskByID(tasks, id) + if ok != found { + t.Fatalf("expected '%t', got '%t'", found, ok) + } + if !reflect.DeepEqual(task, expected) { + t.Fatalf("expected %#v, got %#v", expected, task) + } + } + + empty := connect.Task{} + + tasks := []connect.Task{ + { + ID: connect.TaskID{ + ConnectorName: "a", + ID: 1, + }, + Config: nil, + }, + { + ID: connect.TaskID{ + ConnectorName: "b", + ID: 3, + }, + Config: nil, + }, + { + ID: connect.TaskID{ + ConnectorName: "c", + ID: 2, + }, + Config: nil, + }, + } + + f(nil, 1, empty, false) + f([]connect.Task{}, 1, empty, false) + f( + tasks, + 1, + connect.Task{ + ID: connect.TaskID{ + ConnectorName: "a", + ID: 1, + }, + Config: nil, + }, + true, + ) + f( + tasks, + 3, + connect.Task{ + ID: connect.TaskID{ + ConnectorName: "b", + ID: 3, + }, + Config: nil, + }, + true, + ) + f( + tasks, + 2, + connect.Task{ + ID: connect.TaskID{ + ConnectorName: "c", + ID: 2, + }, + Config: nil, + }, + true, + ) + f(tasks, -1, empty, false) + f(tasks, 4, empty, false) +}