Skip to content

Commit

Permalink
#3 display connector configuration before deleting
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Apr 30, 2017
1 parent 1019193 commit 04bacb4
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
17 changes: 13 additions & 4 deletions cmd/kafkaconnectcli/kafkaconnectcli.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func main() {
case "status":
utils.PrintJson(client.Status(conn), *args.pretty)
case "delete":
client.Delete(conn)
deleteConnector(client, conn)
case "resume":
client.Resume(conn)
case "pause":
Expand Down Expand Up @@ -293,9 +293,9 @@ func main() {
}
name := config["name"]
delete(config, "name")
jsonConfig, _ = json.Marshal(connect.Config{Name: name, Config: config})
jsonConfig, _ = json.Marshal(connect.ConnectorConfig{Name: name, Config: config})
}
var config connect.Config
var config connect.ConnectorConfig
fmt.Println(string(jsonConfig))
err := json.Unmarshal(jsonConfig, &config)
if err != nil {
Expand All @@ -321,13 +321,22 @@ func main() {
case "delete-all":
matchConnectors := findMatchingConnectors(client, func(_ string) bool { return true })
for _, conn := range matchConnectors {
client.Delete(conn)
deleteConnector(client, conn)
}
}
}
os.Exit(0)
}

func deleteConnector(client connect.ConnectRestClient, connector string) {
fmt.Fprintf(os.Stdin, "\nCurrent configuration for connector %s\n\n", connector)
connectorTasks := client.GetConfig(connector)
config, _ := json.Marshal(connect.ConnectorConfig{Name: connectorTasks.Name, Config: connectorTasks.Config})
utils.PrintJson(string(config), true)
fmt.Fprint(os.Stdin, "\nSave this to use as the `-config.json` option during rollback connector\n\n")
client.Delete(connector)
}

func findMatchingConnectors(client connect.ConnectRestClient, fn func(string) bool) []string {
var matchConnectors []string
for _, conn := range client.List() {
Expand Down
21 changes: 13 additions & 8 deletions connect/kafkaconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// ConnectorStatus describes the configuration of a connector and its tasks.
type ConnectorConfig struct {
type ConnectorTasksConfig struct {
Name string `json:"name"`
Config map[string]string `json:"config"`
Tasks []struct {
Expand All @@ -48,7 +48,7 @@ type ConnectorStatus struct {
} `json:"tasks"`
}

type Config struct {
type ConnectorConfig struct {
Name string `json:"name"`
Config map[string]string `json:"config"`
}
Expand Down Expand Up @@ -126,9 +126,9 @@ func (client *ConnectRestClient) Tasks(connector string) string {

// GetConfig retrieves the configuration for the specified connector.
// Return a new ConnectorConfig struct.
func (client *ConnectRestClient) GetConfig(connector string) ConnectorConfig {
func (client *ConnectRestClient) GetConfig(connector string) ConnectorTasksConfig {
response := sendGetResponse("GET", client.connectEndPoint()+connector, "")
var config ConnectorConfig
var config ConnectorTasksConfig
err := json.Unmarshal([]byte(response), &config)
if err != nil {
panic(err)
Expand All @@ -144,8 +144,10 @@ func (client *ConnectRestClient) Pause(connector string) {

// Delete deletes all tasks for the specified connector name.
func (client *ConnectRestClient) Delete(connector string) {
fmt.Fprintf(os.Stdin, "Deleting connector %s \n", connector)
send("DELETE", client.connectEndPoint()+connector)
statusCode := send("DELETE", client.connectEndPoint()+connector)
if statusCode == 204 {
fmt.Fprintf(os.Stdin, "Successfully deleted connector %s \n", connector)
}
}

// Resume resumes all tasks for the specified connector name.
Expand All @@ -162,7 +164,7 @@ func (client *ConnectRestClient) Restart(connector string, id int) {

// Create submit a new connector configuration.
// Return a JSON string describing the new connector configuration.
func (client *ConnectRestClient) Create(config Config) string {
func (client *ConnectRestClient) Create(config ConnectorConfig) string {
body, _ := json.Marshal(config)
return sendGetResponse("POST", client.connectEndPoint(), string(body))
}
Expand All @@ -186,7 +188,7 @@ func sendGetResponse(method string, url string, content string) string {
return string(body)
}

func send(method string, url string) {
func send(method string, url string) (statusCode int) {
req, err := http.NewRequest(method, url, bytes.NewBuffer(nil))

client := &http.Client{}
Expand All @@ -195,4 +197,7 @@ func send(method string, url string) {
panic(err)
}
defer resp.Body.Close()

statusCode = resp.StatusCode
return
}

0 comments on commit 04bacb4

Please sign in to comment.