Skip to content

Commit

Permalink
Merge pull request #82 from 90poe/issue-12/connectors-status-cmd
Browse files Browse the repository at this point in the history
feat: Connectors status cmd
  • Loading branch information
andrzejWilde authored Apr 20, 2020
2 parents 293da1d + c1379a2 commit d75ad86
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/cli/connectctl_connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ None, all options are at the subcommand level
### SEE ALSO

* [connectctl](connectctl.md) - connectctl: work with Kafka Connect easily
* [connectctl connectors status](connectctl_connectors_status.md) - Connectors status
* [connectctl connectors add](connectctl_connectors_add.md) - Add connectors
* [connectctl connectors remove](connectctl_connectors_remove.md) - Remove connectors
* [connectctl connectors list](connectctl_connectors_list.md) - List connectors
Expand Down
37 changes: 37 additions & 0 deletions docs/cli/connectctl_connectors_status.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
## connectctl connectors status

Status of connectors

### Synopsis


Display status of selected connectors.
If some tasks or connectors are failing, command will exit with code 1.


```
connectctl connectors status [flags]
```

### Options

```
-h, --help help for add
-c, --clusterURL the url of the kafka connect cluster
-n, --connectors the names of the connectors. Multiple connector names
can be specified either by comma separating conn1,conn2
or by repeating the flag --n conn1 --n conn2. If no name is
supplied status of ALL connectors will be displayed.
-o, --output specify the output format (valid options: json, table) (default "json")
-q, --quiet disable output logging
```
### Options inherited from parent commands

```
-l, --loglevel loglevel Specify the loglevel for the program (default info)
--logfile Specify a file to output logs to
```

### SEE ALSO

* [connectctl connectors](connectctl_connectors.md) - Manage connectors
6 changes: 6 additions & 0 deletions docs/cli/connectctl_version.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ Prints version information of connectctl
connectctl version
```

### Options

```
-h, --help help for version
-c, --clusterURL the url of the kafka connect cluster
```
### SEE ALSO

* [connectctl](connectctl.md) - connectctl: work with Kafka Connect easily
1 change: 1 addition & 0 deletions internal/ctl/connectors/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func Command() *cobra.Command {
connectorsCmd.AddCommand(removeConnectorCmd())
connectorsCmd.AddCommand(pauseConnectorsCmd())
connectorsCmd.AddCommand(resumeConnectorsCmd())
connectorsCmd.AddCommand(connectorsStatusCmd())

return connectorsCmd
}
149 changes: 149 additions & 0 deletions internal/ctl/connectors/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package connectors

import (
"encoding/json"
"fmt"
"os"

"github.com/90poe/connectctl/internal/ctl"
"github.com/90poe/connectctl/internal/version"
"github.com/90poe/connectctl/pkg/client/connect"
"github.com/90poe/connectctl/pkg/manager"
"github.com/jedib0t/go-pretty/table"
"github.com/pkg/errors"

"github.com/spf13/cobra"
)

type connectorsStatusCmdParams struct {
ClusterURL string
Connectors []string
Output string
Quiet bool
}

func connectorsStatusCmd() *cobra.Command {
params := &connectorsStatusCmdParams{}

statusCmd := &cobra.Command{
Use: "status",
Short: "Get status for connectors in a cluster",
Long: "",
RunE: func(cmd *cobra.Command, _ []string) error {
return doConnectorsStatus(cmd, params)
},
}

ctl.AddCommonConnectorsFlags(statusCmd, &params.ClusterURL)
ctl.AddConnectorNamesFlags(statusCmd, &params.Connectors)
ctl.AddOutputFlags(statusCmd, &params.Output)
ctl.AddQuietFlag(statusCmd, &params.Quiet)

return statusCmd
}

func doConnectorsStatus(_ *cobra.Command, params *connectorsStatusCmdParams) error {
config := &manager.Config{
ClusterURL: params.ClusterURL,
Version: version.Version,
}

userAgent := fmt.Sprintf("90poe.io/connectctl/%s", version.Version)

client, err := connect.NewClient(params.ClusterURL, connect.WithUserAgent(userAgent))
if err != nil {
return errors.Wrap(err, "error creating connect client")
}

mngr, err := manager.NewConnectorsManager(client, config)
if err != nil {
return errors.Wrap(err, "error creating connectors manager")
}

statusList, err := mngr.Status(params.Connectors)
if err != nil {
return errors.Wrap(err, "error getting connectors status")
}

if !params.Quiet {
switch params.Output {
case "json":
if err = printAsJSON(statusList); err != nil {
return errors.Wrap(err, "error printing connectors status as JSON")
}

case "table":
printAsTable(statusList)

default:
return fmt.Errorf("invalid output format specified: %s", params.Output)
}
}

failingConnectors, failingTasks := countFailing(statusList)

if failingConnectors != 0 || failingTasks != 0 {
return fmt.Errorf("%d connectors are failng, %d tasks are failing", failingConnectors, failingTasks)
}

return nil
}

func countFailing(statusList []*connect.ConnectorStatus) (int, int) {
connectorCount := 0
taskCount := 0

for _, status := range statusList {
if status.Connector.State == "FAILED" {
connectorCount++
}

taskCount += countFailingTasks(&status.Tasks)
}

return connectorCount, taskCount
}

func countFailingTasks(tasks *[]connect.TaskState) int {
count := 0

for _, task := range *tasks {
if task.State == "FAILED" {
count++
}
}

return count
}

func printAsJSON(statusList []*connect.ConnectorStatus) error {
b, err := json.MarshalIndent(statusList, "", " ")
if err != nil {
return err
}

os.Stdout.Write(b)
return nil
}

func printAsTable(statusList []*connect.ConnectorStatus) {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"Name", "State", "WorkerId", "Tasks"})

for _, status := range statusList {
tasks := ""
for _, task := range status.Tasks {
tasks += fmt.Sprintf("%d(%s): %s\n", task.ID, task.WorkerID, task.State)
}

t.AppendRow(table.Row{
status.Name,
status.Connector.State,
status.Connector.WorkerID,
tasks,
})
}

t.Render()
}
45 changes: 45 additions & 0 deletions internal/ctl/connectors/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package connectors

import (
"testing"

"github.com/90poe/connectctl/pkg/client/connect"
"github.com/stretchr/testify/require"
)

func TestCountFailing(t *testing.T) {
statusList := []*connect.ConnectorStatus{
{
Connector: connect.ConnectorState{
State: "RUNNING",
},
Tasks: []connect.TaskState{
{
State: "RUNNING",
},
{
State: "FAILED",
},
},
},
{
Connector: connect.ConnectorState{
State: "FAILED",
},
Tasks: []connect.TaskState{
{
State: "FAILED",
},
{
State: "FAILED",
},
},
},
}

connectorsFailing, tasksFailing := countFailing(statusList)

require.Equal(t, 1, connectorsFailing)
require.Equal(t, 3, tasksFailing)

}
14 changes: 12 additions & 2 deletions internal/ctl/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func AddClusterFlag(cmd *cobra.Command, required bool, clusterURL *string) {
description := "the URL of the connect cluster to manage"
description := "the URL of the connect cluster"

if required {
description = requiredDescription(&description)
Expand All @@ -27,14 +27,18 @@ func AddOutputFlags(cmd *cobra.Command, output *string) {
BindStringVarP(cmd.Flags(), output, "json", "output", "o", "specify the output format (valid options: json, table)")
}

func AddQuietFlag(cmd *cobra.Command, quiet *bool) {
BindBoolVarP(cmd.Flags(), quiet, false, "quiet", "q", "disable output logging")
}

func AddDefinitionFilesFlags(cmd *cobra.Command, files *[]string, directory *string, env *string) {
BindStringArrayVarP(cmd.Flags(), files, []string{}, "files", "f", "the connector definitions files (Required if --directory or --env-var not specified)")
BindStringVarP(cmd.Flags(), directory, "", "directory", "d", "the directory containing the connector definitions files (Required if --file or --env-vars not specified)")
BindStringVarP(cmd.Flags(), env, "", "env-var", "e", "an environmental variable whose value is a singular or array of connectors serialised as JSON (Required if --files or --directory not specified)")
}

func AddConnectorNamesFlags(cmd *cobra.Command, names *[]string) {
BindStringArrayVarP(cmd.Flags(), names, []string{}, "connectors", "n", "The connect names to restart (if not specified all connectors will be restarted)")
BindStringArrayVarP(cmd.Flags(), names, []string{}, "connectors", "n", "The connect names to perform action on (if not specified action will be performed on all connectors)")
}

func BindDurationVarP(f *pflag.FlagSet, p *time.Duration, value time.Duration, long, short, description string) {
Expand All @@ -55,6 +59,12 @@ func BindBoolVar(f *pflag.FlagSet, p *bool, value bool, long, description string
viper.SetDefault(long, value)
}

func BindBoolVarP(f *pflag.FlagSet, p *bool, value bool, long, short, description string) {
f.BoolVarP(p, long, short, value, description)
_ = viper.BindPFlag(long, f.Lookup(long))
viper.SetDefault(long, value)
}

func BindStringVarP(f *pflag.FlagSet, p *string, value, long, short, description string) {
f.StringVarP(p, long, short, value, description)
_ = viper.BindPFlag(long, f.Lookup(long))
Expand Down
39 changes: 39 additions & 0 deletions pkg/manager/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package manager

import (
"github.com/90poe/connectctl/pkg/client/connect"
"github.com/pkg/errors"
)

// Status - gets status of specified (or all) connectors
func (c *ConnectorManager) Status(connectors []string) ([]*connect.ConnectorStatus, error) {
if len(connectors) == 0 {
return c.allConnectorsStatus()
}

return c.specifiedConnectorsStatus(connectors)
}

func (c *ConnectorManager) allConnectorsStatus() ([]*connect.ConnectorStatus, error) {
existing, _, err := c.client.ListConnectors()
if err != nil {
return nil, errors.Wrap(err, "error listing connectors")
}

return c.specifiedConnectorsStatus(existing)
}

func (c *ConnectorManager) specifiedConnectorsStatus(connectors []string) ([]*connect.ConnectorStatus, error) {
statusList := make([]*connect.ConnectorStatus, len(connectors))

for idx, connectorName := range connectors {
status, _, err := c.client.GetConnectorStatus(connectorName)
if err != nil {
return nil, errors.Wrapf(err, "error getting connector status for %s", connectorName)
}

statusList[idx] = status
}

return statusList, nil
}

0 comments on commit d75ad86

Please sign in to comment.