From a5331ec5cb392a5361207de52ac86e03a8502d24 Mon Sep 17 00:00:00 2001 From: chris-vest Date: Wed, 20 May 2020 12:19:44 +0200 Subject: [PATCH] Add RestartTask function --- connectors.go | 10 +++++- connectors_test.go | 84 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) diff --git a/connectors.go b/connectors.go index 12b99af..b6e9a07 100644 --- a/connectors.go +++ b/connectors.go @@ -175,10 +175,18 @@ func (c *Client) ResumeConnector(name string) (*http.Response, error) { return c.doRequest("PUT", path, nil, nil) } -// RestartConnector restarts a connector and its tasks. +// RestartConnector restarts a connector // // See http://docs.confluent.io/current/connect/userguide.html#post--connectors-(string-name)-restart func (c *Client) RestartConnector(name string) (*http.Response, error) { path := fmt.Sprintf("connectors/%v/restart", name) return c.doRequest("POST", path, nil, nil) } + +// RestartTask restarts a connector's task by task id +// +// See https://docs.confluent.io/current/connect/references/restapi.html#post--connectors-(string-name)-tasks-(int-taskid)-restart +func (c *Client) RestartTask(name string, id int) (*http.Response, error) { + path := fmt.Sprintf("connectors/%v/tasks/%v/restart", name, id) + return c.doRequest("POST", path, nil, nil) +} diff --git a/connectors_test.go b/connectors_test.go index 8c9a484..fd044f5 100644 --- a/connectors_test.go +++ b/connectors_test.go @@ -560,4 +560,88 @@ var _ = Describe("Connector Lifecycle", func() { }) }) }) + + Describe("RestartTask", func() { + var statusCode int + taskID := 0 + + BeforeEach(func() { + server.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyHeader(jsonAcceptHeader), + ghttp.RespondWithPtr(&statusCode, nil), + ), + ) + }) + + Context("when existing connector name is given", func() { + BeforeEach(func() { + statusCode = http.StatusOK + server.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("POST", "/connectors/local-file-source/tasks/0/restart"), + ), + ) + }) + + It("restarts connector", func() { + resp, err := client.RestartTask("local-file-source", taskID) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + }) + + Context("when rebalance is in process", func() { + BeforeEach(func() { + statusCode = http.StatusConflict + server.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("POST", "/connectors/local-file-source/tasks/0/restart"), + ), + ) + }) + + It("returns error with a conflict response", func() { + resp, err := client.RestartTask("local-file-source", taskID) + Expect(err).To(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusConflict)) + }) + }) + }) + + Context("when nonexisting connector name is given", func() { + BeforeEach(func() { + // The API actually throws a 500 on POST to nonexistent + statusCode = http.StatusInternalServerError + server.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("POST", "/connectors/local-file-source/tasks/0/restart"), + ), + ) + }) + + It("returns error with a server error response", func() { + resp, err := client.RestartTask("local-file-source", taskID) + Expect(err).To(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError)) + }) + }) + + Context("when nonexisting task id is given", func() { + BeforeEach(func() { + // The API actually throws a 500 on POST to nonexistent + statusCode = http.StatusInternalServerError + server.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("POST", "/connectors/local-file-source/tasks/5/restart"), + ), + ) + }) + + It("returns error with a server error response", func() { + resp, err := client.RestartTask("local-file-source", 5) + Expect(err).Should(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError)) + }) + }) + }) })