Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fixes #1324: Properly closes connections in tribe
Browse files Browse the repository at this point in the history
Properly closes some http.Respons's that were not previously closed.
  • Loading branch information
croseborough committed Nov 15, 2016
1 parent d9b23fa commit 7b23de9
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 41 deletions.
4 changes: 4 additions & 0 deletions mgmt/rest/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (c *Client) do(method, path string, ct contentType, body ...[]byte) (*rbody
}
return nil, fmt.Errorf("URL target is not available. %v", err)
}
defer rsp.Body.Close()
case "PUT":
var b *bytes.Reader
if len(body) == 0 {
Expand All @@ -193,6 +194,7 @@ func (c *Client) do(method, path string, ct contentType, body ...[]byte) (*rbody
}
return nil, fmt.Errorf("URL target is not available. %v", err)
}
defer rsp.Body.Close()
case "DELETE":
var b *bytes.Reader
if len(body) == 0 {
Expand All @@ -214,6 +216,7 @@ func (c *Client) do(method, path string, ct contentType, body ...[]byte) (*rbody
}
return nil, fmt.Errorf("URL target is not available. %v", err)
}
defer rsp.Body.Close()
case "POST":
var b *bytes.Reader
if len(body) == 0 {
Expand All @@ -234,6 +237,7 @@ func (c *Client) do(method, path string, ct contentType, body ...[]byte) (*rbody
}
return nil, fmt.Errorf("URL target is not available. %v", err)
}
defer rsp.Body.Close()
}

return httpRespToAPIResp(rsp)
Expand Down
94 changes: 53 additions & 41 deletions mgmt/tribe/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,55 +324,67 @@ func (w worker) loadPlugin(plugin core.Plugin) error {
}).Info("unable to create client")
continue
}
resp, err := c.TribeRequest()
f, err := w.downloadPlugin(c, plugin)
// If we can't download from this member, try the next
if err != nil {
logger.WithFields(log.Fields{
"err": err,
"url": url,
}).Info("plugin not found")
logger.Error(err)
continue
}
if resp.StatusCode == 200 {
if resp.Header.Get("Content-Type") != "application/x-gzip" {
logger.WithField("content-type", resp.Header.Get("Content-Type")).Error("Expected application/x-gzip")
}
dir, err := ioutil.TempDir("", "")
if err != nil {
logger.Error(err)
return err
}
f, err := os.Create(path.Join(dir, fmt.Sprintf("%s-%s-%d", plugin.TypeName(), plugin.Name(), plugin.Version())))
if err != nil {
logger.Error(err)
f.Close()
return err
}
io.Copy(f, resp.Body)
f.Close()
err = os.Chmod(f.Name(), 0700)
if err != nil {
logger.Error(err)
return err
}
rp, err := core.NewRequestedPlugin(f.Name())
if err != nil {
logger.Error(err)
return err
}
_, err = w.pluginManager.Load(rp)
if err != nil {
logger.Error(err)
return err
}
if w.isPluginLoaded(plugin.Name(), plugin.TypeName(), plugin.Version()) {
return nil
}
return errors.New("failed to load plugin")
rp, err := core.NewRequestedPlugin(f.Name())
if err != nil {
logger.Error(err)
return err
}
_, err = w.pluginManager.Load(rp)
if err != nil {
logger.Error(err)
return err
}
if w.isPluginLoaded(plugin.Name(), plugin.TypeName(), plugin.Version()) {
return nil
}
return errors.New("failed to load plugin")
}
return errors.New("failed to find a member with the plugin")
}

func (w worker) downloadPlugin(c *client.Client, plugin core.Plugin) (*os.File, error) {
logger := w.logger.WithFields(log.Fields{
"plugin-name": plugin.Name(),
"plugin-version": plugin.Version(),
"plugin-type": plugin.TypeName(),
"url": c.URL,
"_block": "download-plugin",
})
resp, err := c.TribeRequest()
if err != nil {
logger.WithFields(log.Fields{
"err": err,
}).Info("plugin not found")
return nil, fmt.Errorf("Plugin not found at %s: %s", c.URL, err.Error())
}
defer resp.Body.Close()
if resp.StatusCode == 200 {
if resp.Header.Get("Content-Type") != "application/x-gzip" {
logger.WithField("content-type", resp.Header.Get("Content-Type")).Error("Expected application/x-gzip")
}
dir, err := ioutil.TempDir("", "")
if err != nil {
logger.Error(err)
return nil, err
}
fpath := path.Join(dir, fmt.Sprintf("%s-%s-%d", plugin.TypeName(), plugin.Name(), plugin.Version()))
f, err := os.OpenFile(fpath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0700)
if err != nil {
logger.Error(err)
return nil, err
}
io.Copy(f, resp.Body)
f.Close()
return f, nil
}
return nil, fmt.Errorf("Status code not 200 was %v: %s", resp.StatusCode, c.URL)
}
func (w worker) createTask(taskID string, startOnCreate bool) {
logger := w.logger.WithFields(log.Fields{
"task-id": taskID,
Expand Down

0 comments on commit 7b23de9

Please sign in to comment.