Skip to content

Commit

Permalink
#7 kafka-connect-cli - add missing method update
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed May 17, 2017
1 parent 557e175 commit 6c2d49c
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 37 deletions.
97 changes: 62 additions & 35 deletions cmd/kafkaconnectcli/kafkaconnectcli.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ func main() {
CreateArgParser := NewArgParser("CreateArgParser")
CreateArgParser.withCommonArgs().withConfigArg()

UpdateArgParser := NewArgParser("CreateArgParser")
UpdateArgParser.withCommonArgs().withConnectorArg().withConfigArg()

ScaleArgParser := NewArgParser("ScaleArgParser")
ScaleArgParser.withCommonArgs().withConnectorArg().withTasksMaxArg()

Expand All @@ -196,6 +199,8 @@ func main() {
commandArgParser = CommonArgParser
case "create":
commandArgParser = CreateArgParser
case "update":
commandArgParser = UpdateArgParser
case "scale":
commandArgParser = ScaleArgParser
case "help":
Expand All @@ -211,6 +216,8 @@ func main() {
CreateArgParser.Flag.PrintDefaults()
case "scale":
ScaleArgParser.Flag.PrintDefaults()
case "update":
UpdateArgParser.Flag.PrintDefaults()
case "list":
ListArgParser.Flag.PrintDefaults()
case "delete-all", "plugins", "version":
Expand Down Expand Up @@ -243,6 +250,10 @@ func main() {
result, err = handleCreateCommand(client, args)
}

if UpdateArgParser.Flag.Parsed() {
result, err = handleUpdateCommand(client, args)
}

if ScaleArgParser.Flag.Parsed() {
result, err = handleScaleCommand(client, *args.connector, *args.tasks)
}
Expand Down Expand Up @@ -321,39 +332,14 @@ func handleListCommand(client connect.ConnectRestClient, state string) (result i

// handleCreateCommand executes "create" command.
func handleCreateCommand(client connect.ConnectRestClient, args CommandArgs) (result interface{}, e error) {
var jsonConfig []byte
result, e = client.Create(readConnectorConfig(args))
return
}

jsonString := *args.json
if jsonString != "" {
jsonConfig = []byte(jsonString)
}
jsonFile := *args.jsonFile
if jsonFile != "" {
file, err := ioutil.ReadFile(jsonFile)
if err != nil {
fmt.Fprintf(os.Stderr, "Error while reading config file '%s': %v\n", jsonFile, err)
os.Exit(1)
}
jsonConfig = file
}
propsFile := *args.propsFile
if propsFile != "" {
config, err := utils.ReadProps(propsFile)
if err != nil {
fmt.Fprintf(os.Stderr, "Error while reading config file '%s': %v\n", propsFile, err)
os.Exit(1)
}
name := config["name"]
delete(config, "name")
jsonConfig, _ = json.Marshal(connect.ConnectorConfig{Name: name, Config: config})
}
var config connect.ConnectorConfig
err := json.Unmarshal(jsonConfig, &config)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid configuration - error: %v\n", err)
os.Exit(1)
}
result, e = client.Create(config)
// handleUpdateCommand executes "update" command.
func handleUpdateCommand(client connect.ConnectRestClient, args CommandArgs) (result interface{}, e error) {
config := readConnectorConfig(args)
result, e = client.Update(connect.ConnectorConfig{Name: *args.connector, Config: config.Config})
return
}

Expand All @@ -364,9 +350,7 @@ func handleScaleCommand(client connect.ConnectRestClient, connector string, task
return nil, err
}
config.Config["tasks.max"] = strconv.Itoa(tasks)
jsonConfig, _ := json.Marshal(config.Config)

result, e = client.Update(connector, string(jsonConfig))
result, e = client.Update(connect.ConnectorConfig{Name: connector, Config: config.Config})
return
}

Expand All @@ -391,6 +375,49 @@ func handleCommonsCommand(command string, client connect.ConnectRestClient) (res
return
}

// readConnectorConfig reads a connector configuration from the specified arguments.
// Returns the configuration as map value-pairs.
func readConnectorConfig(args CommandArgs) (config connect.ConnectorConfig) {
jsonString := *args.json
if jsonString != "" {
e := json.Unmarshal([]byte(jsonString), &config)
if e != nil {
fmt.Fprintf(os.Stderr, "Invalid configuration - error: %v\n", e)
os.Exit(1)
}
}
jsonFile := *args.jsonFile
if jsonFile != "" {
file, err := ioutil.ReadFile(jsonFile)
if err != nil {
fmt.Fprintf(os.Stderr, "Error while reading config file '%s': %v\n", jsonFile, err)
os.Exit(1)
}
e := json.Unmarshal(file, &config)
if e != nil {
fmt.Fprintf(os.Stderr, "Invalid configuration - error: %v\n", e)
os.Exit(1)
}
}
propsFile := *args.propsFile
if propsFile != "" {
res, err := utils.ReadProps(propsFile)
if err != nil {
fmt.Fprintf(os.Stderr, "Error while reading config file '%s': %v\n", propsFile, err)
os.Exit(1)
}
name := res["name"]
if name != "" {
delete(res, "name")
} else {
fmt.Fprint(os.Stderr, "Missing required configuration field : 'name'")
os.Exit(1)
}
config = connect.ConnectorConfig{Name: name, Config: res}
}
return
}

func deleteConnector(client connect.ConnectRestClient, connector string) (e error) {
connectorTasks, e := client.GetConfig(connector)
if e == nil {
Expand Down
6 changes: 4 additions & 2 deletions connect/kafkaconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,10 @@ func (client *ConnectRestClient) Create(config ConnectorConfig) (r string, e err

// Update modifies the configuration for the specified connector name.
// Return a JSON string describing the new connector configuration.
func (client *ConnectRestClient) Update(connector string, config string) (r string, e error) {
response, e := requestAndGetResponse("PUT", client.connectEndPoint()+connector+"/config", &config)
func (client *ConnectRestClient) Update(config ConnectorConfig) (r string, e error) {
bytes, _ := json.Marshal(config.Config)
body := string(bytes)
response, e := requestAndGetResponse("PUT", client.connectEndPoint()+config.Name+"/config", &body)
if e == nil {
r = string(response)
}
Expand Down

0 comments on commit 6c2d49c

Please sign in to comment.