From e741f412d56e8c9c219bc62e2d5b0dead1deb351 Mon Sep 17 00:00:00 2001 From: kindermoumoute Date: Tue, 24 Jan 2017 11:57:55 -0800 Subject: [PATCH] Add version 2 rest api Adds version 2 of the rest api in order to have more consistent responses, errors, and conform to REST better. - Add package v2/mock - Add medium tests for v2 - Implement API interface for v2 - Implement v2 routes --- mgmt/rest/rest_v2_test.go | 463 +++++++++++++++++++++++ mgmt/rest/server.go | 2 + mgmt/rest/v2/api.go | 114 ++++++ mgmt/rest/v2/config.go | 161 ++++++++ mgmt/rest/v2/error.go | 76 ++++ mgmt/rest/v2/metric.go | 156 ++++++++ mgmt/rest/v2/metric_test.go | 69 ++++ mgmt/rest/v2/mock/mock_config_manager.go | 88 +++++ mgmt/rest/v2/mock/mock_metric_manager.go | 258 +++++++++++++ mgmt/rest/v2/mock/mock_task_manager.go | 264 +++++++++++++ mgmt/rest/v2/plugin.go | 448 ++++++++++++++++++++++ mgmt/rest/v2/task.go | 222 +++++++++++ mgmt/rest/v2/watch.go | 206 ++++++++++ 13 files changed, 2527 insertions(+) create mode 100644 mgmt/rest/rest_v2_test.go create mode 100644 mgmt/rest/v2/api.go create mode 100644 mgmt/rest/v2/config.go create mode 100644 mgmt/rest/v2/error.go create mode 100644 mgmt/rest/v2/metric.go create mode 100644 mgmt/rest/v2/metric_test.go create mode 100644 mgmt/rest/v2/mock/mock_config_manager.go create mode 100644 mgmt/rest/v2/mock/mock_metric_manager.go create mode 100644 mgmt/rest/v2/mock/mock_task_manager.go create mode 100644 mgmt/rest/v2/plugin.go create mode 100644 mgmt/rest/v2/task.go create mode 100644 mgmt/rest/v2/watch.go diff --git a/mgmt/rest/rest_v2_test.go b/mgmt/rest/rest_v2_test.go new file mode 100644 index 000000000..b3b3d935e --- /dev/null +++ b/mgmt/rest/rest_v2_test.go @@ -0,0 +1,463 @@ +// +build medium + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "mime/multipart" + "net/http" + "net/url" + "os" + "strings" + "testing" + + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/snap/core/cdata" + "github.com/intelsdi-x/snap/core/ctypes" + "github.com/intelsdi-x/snap/mgmt/rest/v2/mock" + . "github.com/smartystreets/goconvey/convey" +) + +func startV2API(cfg *mockConfig, testType string) *restAPIInstance { + log.SetLevel(LOG_LEVEL) + r, _ := New(cfg.RestAPI) + switch testType { + case "plugin": + mockMetricManager := &mock.MockManagesMetrics{} + mockConfigManager := &mock.MockConfigManager{} + r.BindMetricManager(mockMetricManager) + r.BindConfigManager(mockConfigManager) + case "metric": + mockMetricManager := &mock.MockManagesMetrics{} + r.BindMetricManager(mockMetricManager) + case "task": + mockTaskManager := &mock.MockTaskManager{} + r.BindTaskManager(mockTaskManager) + } + go func(ch <-chan error) { + // Block on the error channel. Will return exit status 1 for an error or + // just return if the channel closes. + err, ok := <-ch + if !ok { + return + } + log.Fatal(err) + }(r.Err()) + r.SetAddress("127.0.0.1:0") + r.Start() + return &restAPIInstance{ + port: r.Port(), + server: r, + } +} + +func TestV2Plugin(t *testing.T) { + r := startV2API(getDefaultMockConfig(), "plugin") + Convey("Test Plugin REST API V2", t, func() { + + Convey("Post plugins - v2/plugins/:type:name", func(c C) { + f, err := os.Open(MOCK_PLUGIN_PATH1) + defer f.Close() + So(err, ShouldBeNil) + + // We create a pipe so that we can write the file in multipart + // format and read it in to the body of the http request + reader, writer := io.Pipe() + mwriter := multipart.NewWriter(writer) + bufin := bufio.NewReader(f) + + // A go routine is needed since we must write the multipart file + // to the pipe so we can read from it in the http call + go func() { + part, err := mwriter.CreateFormFile("snap-plugins", "mock") + c.So(err, ShouldBeNil) + bufin.WriteTo(part) + mwriter.Close() + writer.Close() + }() + + resp1, err1 := http.Post( + fmt.Sprintf("http://localhost:%d/v2/plugins", r.port), + mwriter.FormDataContentType(), reader) + So(err1, ShouldBeNil) + So(resp1.StatusCode, ShouldEqual, 201) + }) + + Convey("Get plugins - v2/plugins", func() { + resp, err := http.Get( + fmt.Sprintf("http://localhost:%d/v2/plugins", r.port)) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, 200) + body, err := ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + string(body), + ShouldResemble, + fmt.Sprintf(mock.GET_PLUGINS_RESPONSE, r.port, r.port, + r.port, r.port, r.port, r.port)) + }) + Convey("Get plugins - v2/plugins/:type", func() { + c := &http.Client{} + req, err := http.NewRequest("GET", + fmt.Sprintf("http://localhost:%d/v2/plugins", r.port), + bytes.NewReader([]byte{})) + So(err, ShouldBeNil) + q := req.URL.Query() + q.Add("type", "collector") + req.URL.RawQuery = q.Encode() + resp, err := c.Do(req) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, 200) + body, err := ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + string(body), + ShouldResemble, + fmt.Sprintf(mock.GET_PLUGINS_RESPONSE_TYPE, r.port, r.port)) + }) + Convey("Get plugins - v2/plugins/:type:name", func() { + c := &http.Client{} + req, err := http.NewRequest("GET", + fmt.Sprintf("http://localhost:%d/v2/plugins", r.port), + bytes.NewReader([]byte{})) + So(err, ShouldBeNil) + q := req.URL.Query() + q.Add("type", "publisher") + q.Add("name", "bar") + req.URL.RawQuery = q.Encode() + resp, err := c.Do(req) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, 200) + body, err := ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + string(body), + ShouldResemble, + fmt.Sprintf(mock.GET_PLUGINS_RESPONSE_TYPE_NAME, r.port)) + }) + Convey("Get plugin - v2/plugins/:type:name:version", func() { + resp, err := http.Get( + fmt.Sprintf("http://localhost:%d/v2/plugins/publisher/bar/3", r.port)) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, 200) + body, err := ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + string(body), + ShouldResemble, + fmt.Sprintf(mock.GET_PLUGINS_RESPONSE_TYPE_NAME_VERSION, r.port)) + }) + + Convey("Delete plugins - v2/plugins/:type:name:version", func() { + c := &http.Client{} + pluginName := "foo" + pluginType := "collector" + pluginVersion := 2 + req, err := http.NewRequest( + "DELETE", + fmt.Sprintf("http://localhost:%d/v2/plugins/%s/%s/%d", + r.port, + pluginType, + pluginName, + pluginVersion), + bytes.NewReader([]byte{})) + So(err, ShouldBeNil) + resp, err := c.Do(req) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusNoContent) + body, err := ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + string(body), + ShouldResemble, + fmt.Sprintf(mock.UNLOAD_PLUGIN_RESPONSE)) + }) + + Convey("Get plugin config items - v2/plugins/:type/:name/:version/config", func() { + resp, err := http.Get( + fmt.Sprintf("http://localhost:%d/v2/plugins/publisher/bar/3/config", r.port)) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, 200) + body, err := ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + string(body), + ShouldResemble, + fmt.Sprintf(mock.GET_PLUGIN_CONFIG_ITEM)) + }) + + Convey("Set plugin config item- v2/plugins/:type/:name/:version/config", func() { + c := &http.Client{} + pluginName := "foo" + pluginType := "collector" + pluginVersion := 2 + cd := cdata.NewNode() + cd.AddItem("user", ctypes.ConfigValueStr{Value: "Jane"}) + body, err := cd.MarshalJSON() + So(err, ShouldBeNil) + + req, err := http.NewRequest( + "PUT", + fmt.Sprintf("http://localhost:%d/v2/plugins/%s/%s/%d/config", + r.port, + pluginType, + pluginName, + pluginVersion), + bytes.NewReader(body)) + So(err, ShouldBeNil) + resp, err := c.Do(req) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusOK) + body, err = ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + string(body), + ShouldResemble, + fmt.Sprintf(mock.SET_PLUGIN_CONFIG_ITEM)) + + }) + + Convey("Delete plugin config item - /v2/plugins/:type/:name/:version/config", func() { + c := &http.Client{} + pluginName := "foo" + pluginType := "collector" + pluginVersion := 2 + cd := []string{"foo"} + body, err := json.Marshal(cd) + So(err, ShouldBeNil) + req, err := http.NewRequest( + "DELETE", + fmt.Sprintf("http://localhost:%d/v2/plugins/%s/%s/%d/config", + r.port, + pluginType, + pluginName, + pluginVersion), + bytes.NewReader(body)) + + So(err, ShouldBeNil) + resp, err := c.Do(req) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusOK) + body, err = ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + string(body), + ShouldResemble, + fmt.Sprintf(mock.DELETE_PLUGIN_CONFIG_ITEM)) + }) + }) +} + +func TestV2Task(t *testing.T) { + r := startV2API(getDefaultMockConfig(), "task") + Convey("Test Task REST API V2", t, func() { + + Convey("Add tasks - v2/tasks", func() { + reader := strings.NewReader(mock.TASK) + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v2/tasks", r.port), + http.DetectContentType([]byte(mock.TASK)), + reader) + So(err, ShouldBeNil) + So(resp, ShouldNotBeEmpty) + So(resp.StatusCode, ShouldEqual, 201) + body, err := ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + fmt.Sprintf(mock.ADD_TASK_RESPONSE, r.port), + ShouldResemble, + string(body)) + }) + + Convey("Get tasks - v2/tasks", func() { + resp, err := http.Get( + fmt.Sprintf("http://localhost:%d/v2/tasks", r.port)) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, 200) + body, err := ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + responses := []string{ + fmt.Sprintf(mock.GET_TASKS_RESPONSE, r.port, r.port), + fmt.Sprintf(mock.GET_TASKS_RESPONSE2, r.port, r.port), + } + // GetTasks returns an unordered map, + // thus there is more than one possible response + So( + responses, + ShouldContain, + string(body)) + }) + + Convey("Get task - v2/tasks/:id", func() { + taskID := "1234" + resp, err := http.Get( + fmt.Sprintf("http://localhost:%d/v2/tasks/:%s", r.port, taskID)) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, 200) + body, err := ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + fmt.Sprintf(mock.GET_TASK_RESPONSE, r.port), + ShouldResemble, + string(body)) + }) + + Convey("Watch tasks - v2/tasks/:id/watch", func() { + taskID := "1234" + resp, err := http.Get( + fmt.Sprintf("http://localhost:%d/v2/tasks/:%s/watch", r.port, taskID)) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusOK) + }) + + Convey("Start tasks - v2/tasks/:id", func() { + c := &http.Client{} + taskID := "MockTask1234" + cd := cdata.NewNode() + cd.AddItem("user", ctypes.ConfigValueStr{Value: "Kelly"}) + body, err := cd.MarshalJSON() + So(err, ShouldBeNil) + + req, err := http.NewRequest( + "PUT", + fmt.Sprintf("http://localhost:%d/v2/tasks/%s", r.port, taskID), + bytes.NewReader(body)) + So(err, ShouldBeNil) + q := req.URL.Query() + q.Add("action", "start") + req.URL.RawQuery = q.Encode() + resp, err := c.Do(req) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusNoContent) + body, err = ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + string(body), + ShouldResemble, + fmt.Sprintf(mock.START_TASK_RESPONSE_ID_START)) + }) + + Convey("Stop tasks - v2/tasks/:id", func() { + c := &http.Client{} + taskID := "MockTask1234" + cd := cdata.NewNode() + cd.AddItem("user", ctypes.ConfigValueStr{Value: "Kelly"}) + body, err := cd.MarshalJSON() + So(err, ShouldBeNil) + + req, err := http.NewRequest( + "PUT", + fmt.Sprintf("http://localhost:%d/v2/tasks/%s", r.port, taskID), + bytes.NewReader(body)) + So(err, ShouldBeNil) + q := req.URL.Query() + q.Add("action", "stop") + req.URL.RawQuery = q.Encode() + resp, err := c.Do(req) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusNoContent) + body, err = ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + string(body), + ShouldResemble, + fmt.Sprintf(mock.STOP_TASK_RESPONSE_ID_STOP)) + }) + + Convey("Enable tasks - v2/tasks/:id", func() { + c := &http.Client{} + taskID := "MockTask1234" + cd := cdata.NewNode() + cd.AddItem("user", ctypes.ConfigValueStr{Value: "Kelly"}) + body, err := cd.MarshalJSON() + So(err, ShouldBeNil) + + req, err := http.NewRequest( + "PUT", + fmt.Sprintf("http://localhost:%d/v2/tasks/%s", r.port, taskID), + bytes.NewReader(body)) + So(err, ShouldBeNil) + q := req.URL.Query() + q.Add("action", "enable") + req.URL.RawQuery = q.Encode() + resp, err := c.Do(req) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusNoContent) + body, err = ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + string(body), + ShouldResemble, + fmt.Sprintf(mock.ENABLE_TASK_RESPONSE_ID_ENABLE)) + }) + + Convey("Remove tasks - v2/tasks/:id", func() { + c := &http.Client{} + taskID := "MockTask1234" + cd := []string{"foo"} + body, err := json.Marshal(cd) + So(err, ShouldBeNil) + req, err := http.NewRequest( + "DELETE", + fmt.Sprintf("http://localhost:%d/v2/tasks/%s", + r.port, + taskID), + bytes.NewReader([]byte{})) + So(err, ShouldBeNil) + resp, err := c.Do(req) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusNoContent) + body, err = ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + So( + string(body), + ShouldResemble, + fmt.Sprintf(mock.REMOVE_TASK_RESPONSE_ID)) + }) + }) +} + +func TestV2Metric(t *testing.T) { + r := startV2API(getDefaultMockConfig(), "metric") + Convey("Test Metric REST API V2", t, func() { + + Convey("Get metrics - v2/metrics", func() { + resp, err := http.Get( + fmt.Sprintf("http://localhost:%d/v2/metrics", r.port)) + So(err, ShouldBeNil) + So(resp.StatusCode, ShouldEqual, 200) + body, err := ioutil.ReadAll(resp.Body) + So(err, ShouldBeNil) + resp1, err := url.QueryUnescape(string(body)) + So(err, ShouldBeNil) + So( + resp1, + ShouldResemble, + fmt.Sprintf(mock.GET_METRICS_RESPONSE, r.port)) + }) + }) +} diff --git a/mgmt/rest/server.go b/mgmt/rest/server.go index 66f581e59..326238a78 100644 --- a/mgmt/rest/server.go +++ b/mgmt/rest/server.go @@ -34,6 +34,7 @@ import ( "github.com/intelsdi-x/snap/mgmt/rest/api" "github.com/intelsdi-x/snap/mgmt/rest/v1" + "github.com/intelsdi-x/snap/mgmt/rest/v2" ) var ( @@ -82,6 +83,7 @@ func New(cfg *Config) (*Server, error) { s.apis = []api.API{ v1.New(&s.wg, s.killChan, protocolPrefix), + v2.New(&s.wg, s.killChan, protocolPrefix), } s.n = negroni.New( diff --git a/mgmt/rest/v2/api.go b/mgmt/rest/v2/api.go new file mode 100644 index 000000000..3fde05da8 --- /dev/null +++ b/mgmt/rest/v2/api.go @@ -0,0 +1,114 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2017 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v2 + +import ( + "encoding/json" + "sync" + + "net/http" + + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/snap/mgmt/rest/api" + "github.com/urfave/negroni" +) + +const ( + version = "v2" + prefix = "/" + version +) + +var ( + restLogger = log.WithField("_module", "_mgmt-rest-v2") + protocolPrefix = "http" +) + +type apiV2 struct { + metricManager api.Metrics + taskManager api.Tasks + configManager api.Config + + wg *sync.WaitGroup + killChan chan struct{} +} + +func New(wg *sync.WaitGroup, killChan chan struct{}, protocol string) *apiV2 { + protocolPrefix = protocol + return &apiV2{wg: wg, killChan: killChan} +} + +func (s *apiV2) GetRoutes() []api.Route { + routes := []api.Route{ + // plugin routes + api.Route{Method: "GET", Path: prefix + "/plugins", Handle: s.getPlugins}, + api.Route{Method: "GET", Path: prefix + "/plugins/:type/:name/:version", Handle: s.getPlugin}, + api.Route{Method: "POST", Path: prefix + "/plugins", Handle: s.loadPlugin}, + api.Route{Method: "DELETE", Path: prefix + "/plugins/:type/:name/:version", Handle: s.unloadPlugin}, + + api.Route{Method: "GET", Path: prefix + "/plugins/:type/:name/:version/config", Handle: s.getPluginConfigItem}, + api.Route{Method: "PUT", Path: prefix + "/plugins/:type/:name/:version/config", Handle: s.setPluginConfigItem}, + api.Route{Method: "DELETE", Path: prefix + "/plugins/:type/:name/:version/config", Handle: s.deletePluginConfigItem}, + + // metric routes + api.Route{Method: "GET", Path: prefix + "/metrics", Handle: s.getMetrics}, + + // task routes + api.Route{Method: "GET", Path: prefix + "/tasks", Handle: s.getTasks}, + api.Route{Method: "GET", Path: prefix + "/tasks/:id", Handle: s.getTask}, + api.Route{Method: "GET", Path: prefix + "/tasks/:id/watch", Handle: s.watchTask}, + api.Route{Method: "POST", Path: prefix + "/tasks", Handle: s.addTask}, + api.Route{Method: "PUT", Path: prefix + "/tasks/:id", Handle: s.updateTaskState}, + api.Route{Method: "DELETE", Path: prefix + "/tasks/:id", Handle: s.removeTask}, + } + return routes +} + +func (s *apiV2) BindMetricManager(metricManager api.Metrics) { + s.metricManager = metricManager +} + +func (s *apiV2) BindTaskManager(taskManager api.Tasks) { + s.taskManager = taskManager +} + +func (s *apiV2) BindTribeManager(tribeManager api.Tribe) {} + +func (s *apiV2) BindConfigManager(configManager api.Config) { + s.configManager = configManager +} + +func Write(code int, body interface{}, w http.ResponseWriter) { + w.Header().Set("Content-Type", "application/json; version=2; charset=utf-8") + w.Header().Set("Version", "beta") + + if !w.(negroni.ResponseWriter).Written() { + w.WriteHeader(code) + } + + if body != nil { + e := json.NewEncoder(w) + e.SetIndent("", " ") + e.SetEscapeHTML(false) + err := e.Encode(body) + if err != nil { + restLogger.Fatalln(err) + } + } +} diff --git a/mgmt/rest/v2/config.go b/mgmt/rest/v2/config.go new file mode 100644 index 000000000..134d6f5b1 --- /dev/null +++ b/mgmt/rest/v2/config.go @@ -0,0 +1,161 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2017 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v2 + +import ( + "net/http" + "strconv" + + "github.com/intelsdi-x/snap/control/plugin/cpolicy" + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/cdata" + "github.com/julienschmidt/httprouter" +) + +type PolicyTable cpolicy.RuleTable + +type PolicyTableSlice []cpolicy.RuleTable + +// cdata.ConfigDataNode implements it's own UnmarshalJSON +type PluginConfigItem struct { + cdata.ConfigDataNode +} + +func (s *apiV2) getPluginConfigItem(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + var err error + styp := p.ByName("type") + if styp == "" { + cdn := s.configManager.GetPluginConfigDataNodeAll() + item := &PluginConfigItem{ConfigDataNode: cdn} + Write(200, item, w) + return + } + + typ, err := getPluginType(styp) + if err != nil { + Write(400, FromError(err), w) + return + } + + name := p.ByName("name") + sver := p.ByName("version") + iver := -2 + if sver != "" { + if iver, err = strconv.Atoi(sver); err != nil { + Write(400, FromError(err), w) + return + } + } + + cdn := s.configManager.GetPluginConfigDataNode(typ, name, iver) + item := &PluginConfigItem{ConfigDataNode: cdn} + Write(200, item, w) +} + +func (s *apiV2) deletePluginConfigItem(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + var err error + var typ core.PluginType + styp := p.ByName("type") + if styp != "" { + typ, err = getPluginType(styp) + if err != nil { + Write(400, FromError(err), w) + return + } + } + + name := p.ByName("name") + sver := p.ByName("version") + iver := -2 + if sver != "" { + if iver, err = strconv.Atoi(sver); err != nil { + Write(400, FromError(err), w) + return + } + } + + src := []string{} + errCode, err := core.UnmarshalBody(&src, r.Body) + if errCode != 0 && err != nil { + Write(400, FromError(err), w) + return + } + + var res cdata.ConfigDataNode + if styp == "" { + res = s.configManager.DeletePluginConfigDataNodeFieldAll(src...) + } else { + res = s.configManager.DeletePluginConfigDataNodeField(typ, name, iver, src...) + } + + item := &PluginConfigItem{ConfigDataNode: res} + Write(200, item, w) +} + +func (s *apiV2) setPluginConfigItem(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + var err error + var typ core.PluginType + styp := p.ByName("type") + if styp != "" { + typ, err = getPluginType(styp) + if err != nil { + Write(400, FromError(err), w) + return + } + } + + name := p.ByName("name") + sver := p.ByName("version") + iver := -2 + if sver != "" { + if iver, err = strconv.Atoi(sver); err != nil { + Write(400, FromError(err), w) + return + } + } + + src := cdata.NewNode() + errCode, err := core.UnmarshalBody(src, r.Body) + if errCode != 0 && err != nil { + Write(400, FromError(err), w) + return + } + + var res cdata.ConfigDataNode + if styp == "" { + res = s.configManager.MergePluginConfigDataNodeAll(src) + } else { + res = s.configManager.MergePluginConfigDataNode(typ, name, iver, src) + } + + item := &PluginConfigItem{ConfigDataNode: res} + Write(200, item, w) +} + +func getPluginType(t string) (core.PluginType, error) { + if ityp, err := strconv.Atoi(t); err == nil { + return core.PluginType(ityp), nil + } + ityp, err := core.ToPluginType(t) + if err != nil { + return core.PluginType(-1), err + } + return ityp, nil +} diff --git a/mgmt/rest/v2/error.go b/mgmt/rest/v2/error.go new file mode 100644 index 000000000..61bd83eee --- /dev/null +++ b/mgmt/rest/v2/error.go @@ -0,0 +1,76 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v2 + +import ( + "fmt" + + "errors" + + "github.com/intelsdi-x/snap/core/serror" +) + +const ( + ErrPluginAlreadyLoaded = "plugin is already loaded" + ErrTaskNotFound = "task not found" + ErrTaskDisabledNotRunnable = "task is disabled" +) + +var ( + ErrPluginNotFound = errors.New("plugin not found") + ErrStreamingUnsupported = errors.New("streaming unsupported") + ErrNoActionSpecified = errors.New("no action was specified in the request") + ErrWrongAction = errors.New("wrong action requested") +) + +// Unsuccessful generic response to a failed API call +type Error struct { + ErrorMessage string `json:"message"` + Fields map[string]string `json:"fields"` +} + +func FromSnapError(pe serror.SnapError) *Error { + e := &Error{ErrorMessage: pe.Error(), Fields: make(map[string]string)} + // Convert into string format + for k, v := range pe.Fields() { + e.Fields[k] = fmt.Sprint(v) + } + return e +} + +func FromSnapErrors(errs []serror.SnapError) *Error { + fields := make(map[string]string) + var msg string + for i, err := range errs { + for k, v := range err.Fields() { + fields[fmt.Sprintf("%s_err_%d", k, i)] = fmt.Sprint(v) + } + msg = msg + fmt.Sprintf("error %d: %s ", i, err.Error()) + } + return &Error{ + ErrorMessage: msg, + Fields: fields, + } +} + +func FromError(err error) *Error { + e := &Error{ErrorMessage: err.Error(), Fields: make(map[string]string)} + return e +} diff --git a/mgmt/rest/v2/metric.go b/mgmt/rest/v2/metric.go new file mode 100644 index 000000000..6f334cd6a --- /dev/null +++ b/mgmt/rest/v2/metric.go @@ -0,0 +1,156 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2017 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v2 + +import ( + "fmt" + "net/http" + "sort" + "strconv" + "strings" + + "net/url" + + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/pkg/stringutils" + "github.com/julienschmidt/httprouter" +) + +type MetricsResonse struct { + Metrics Metrics `json:"metrics,omitempty"` +} + +type Metrics []Metric + +type Metric struct { + LastAdvertisedTimestamp int64 `json:"last_advertised_timestamp,omitempty"` + Namespace string `json:"namespace,omitempty"` + Version int `json:"version,omitempty"` + Dynamic bool `json:"dynamic"` + DynamicElements []DynamicElement `json:"dynamic_elements,omitempty"` + Description string `json:"description,omitempty"` + Unit string `json:"unit,omitempty"` + Policy PolicyTableSlice `json:"policy,omitempty"` + Href string `json:"href"` +} + +type DynamicElement struct { + Index int `json:"index,omitempty"` + Name string `json:"name,omitempty"` + Description string `json:"description,omitempty"` +} + +// Used to sort the metrics before marshalling the response +func (m Metrics) Len() int { + return len(m) +} + +func (m Metrics) Less(i, j int) bool { + return (fmt.Sprintf("%s:%d", m[i].Namespace, m[i].Version)) < (fmt.Sprintf("%s:%d", m[j].Namespace, m[j].Version)) +} + +func (m Metrics) Swap(i, j int) { + m[i], m[j] = m[j], m[i] +} + +func (s *apiV2) getMetrics(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + + // If we are provided a parameter with the name 'ns' we need to + // perform a query + q := r.URL.Query() + v := q.Get("ver") + ns_query := q.Get("ns") + if ns_query != "" { + ver := 0 // 0: get all versions + if v != "" { + var err error + ver, err = strconv.Atoi(v) + if err != nil { + Write(400, FromError(err), w) + return + } + } + // strip the leading char and split on the remaining. + fc := stringutils.GetFirstChar(ns_query) + ns := strings.Split(strings.TrimLeft(ns_query, fc), fc) + if ns[len(ns)-1] == "*" { + ns = ns[:len(ns)-1] + } + + mts, err := s.metricManager.FetchMetrics(core.NewNamespace(ns...), ver) + if err != nil { + Write(404, FromError(err), w) + return + } + respondWithMetrics(r.Host, mts, w) + return + } + + mts, err := s.metricManager.MetricCatalog() + if err != nil { + Write(500, FromError(err), w) + return + } + respondWithMetrics(r.Host, mts, w) +} + +func respondWithMetrics(host string, mts []core.CatalogedMetric, w http.ResponseWriter) { + b := MetricsResonse{Metrics: make(Metrics, 0)} + for _, m := range mts { + policies := PolicyTableSlice(m.Policy().RulesAsTable()) + dyn, indexes := m.Namespace().IsDynamic() + b.Metrics = append(b.Metrics, Metric{ + Namespace: m.Namespace().String(), + Version: m.Version(), + LastAdvertisedTimestamp: m.LastAdvertisedTime().Unix(), + Description: m.Description(), + Dynamic: dyn, + DynamicElements: getDynamicElements(m.Namespace(), indexes), + Unit: m.Unit(), + Policy: policies, + Href: catalogedMetricURI(host, m), + }) + } + sort.Sort(b.Metrics) + Write(200, b, w) +} + +func catalogedMetricURI(host string, mt core.CatalogedMetric) string { + return fmt.Sprintf("%s://%s/%s/metrics?ns=%s&ver=%d", protocolPrefix, host, version, url.QueryEscape(mt.Namespace().String()), mt.Version()) +} + +func getDynamicElements(ns core.Namespace, indexes []int) []DynamicElement { + elements := make([]DynamicElement, 0, len(indexes)) + for _, v := range indexes { + e := ns.Element(v) + elements = append(elements, DynamicElement{ + Index: v, + Name: e.Name, + Description: e.Description, + }) + } + return elements +} + +func parseNamespace(ns string) []string { + fc := stringutils.GetFirstChar(ns) + ns = strings.Trim(ns, fc) + return strings.Split(ns, fc) +} diff --git a/mgmt/rest/v2/metric_test.go b/mgmt/rest/v2/metric_test.go new file mode 100644 index 000000000..0474519ed --- /dev/null +++ b/mgmt/rest/v2/metric_test.go @@ -0,0 +1,69 @@ +// +build small + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v2 + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestParseNamespace(t *testing.T) { + tcs := getNsTestCases() + + Convey("Test parseNamespace", t, func() { + for _, c := range tcs { + Convey("Test parseNamespace "+c.input, func() { + So(c.output, ShouldResemble, parseNamespace(c.input)) + }) + } + }) +} + +type nsTestCase struct { + input string + output []string +} + +func getNsTestCases() []nsTestCase { + tcs := []nsTestCase{ + { + input: "小a小b小c", + output: []string{"a", "b", "c"}}, + { + input: "%a%b%c", + output: []string{"a", "b", "c"}}, + { + input: "-aヒ-b/-c|", + output: []string{"aヒ", "b/", "c|"}}, + { + input: ">a>b=>c=", + output: []string{"a", "b=", "c="}}, + { + input: ">a>b<>c<", + output: []string{"a", "b<", "c<"}}, + { + input: "㊽a㊽b%㊽c/|", + output: []string{"a", "b%", "c/|"}}, + } + return tcs +} diff --git a/mgmt/rest/v2/mock/mock_config_manager.go b/mgmt/rest/v2/mock/mock_config_manager.go new file mode 100644 index 000000000..138ad2658 --- /dev/null +++ b/mgmt/rest/v2/mock/mock_config_manager.go @@ -0,0 +1,88 @@ +// +build legacy small medium large + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + +Copyright 2016 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mock + +import ( + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/cdata" + "github.com/intelsdi-x/snap/core/ctypes" +) + +var mockConfig *cdata.ConfigDataNode + +func init() { + mockConfig = cdata.NewNode() + mockConfig.AddItem("User", ctypes.ConfigValueStr{Value: "KELLY"}) + mockConfig.AddItem("Port", ctypes.ConfigValueInt{Value: 2}) +} + +type MockConfigManager struct{} + +func (MockConfigManager) GetPluginConfigDataNode(core.PluginType, string, int) cdata.ConfigDataNode { + return *mockConfig +} +func (MockConfigManager) GetPluginConfigDataNodeAll() cdata.ConfigDataNode { + return *mockConfig +} +func (MockConfigManager) MergePluginConfigDataNode( + pluginType core.PluginType, name string, ver int, cdn *cdata.ConfigDataNode) cdata.ConfigDataNode { + return *cdn +} +func (MockConfigManager) MergePluginConfigDataNodeAll(cdn *cdata.ConfigDataNode) cdata.ConfigDataNode { + return cdata.ConfigDataNode{} +} +func (MockConfigManager) DeletePluginConfigDataNodeField( + pluginType core.PluginType, name string, ver int, fields ...string) cdata.ConfigDataNode { + for _, field := range fields { + mockConfig.DeleteItem(field) + + } + return *mockConfig +} + +func (MockConfigManager) DeletePluginConfigDataNodeFieldAll(fields ...string) cdata.ConfigDataNode { + for _, field := range fields { + mockConfig.DeleteItem(field) + + } + return *mockConfig +} + +// These constants are the expected plugin config responses from running +// rest_v2_test.go on the plugin config routes found in mgmt/rest/server.go +const ( + SET_PLUGIN_CONFIG_ITEM = `{ + "user": "Jane" +} +` + + GET_PLUGIN_CONFIG_ITEM = `{ + "Port": 2, + "User": "KELLY" +} +` + + DELETE_PLUGIN_CONFIG_ITEM = `{ + "Port": 2, + "User": "KELLY" +} +` +) diff --git a/mgmt/rest/v2/mock/mock_metric_manager.go b/mgmt/rest/v2/mock/mock_metric_manager.go new file mode 100644 index 000000000..701b46585 --- /dev/null +++ b/mgmt/rest/v2/mock/mock_metric_manager.go @@ -0,0 +1,258 @@ +// +build legacy small medium large + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + +Copyright 2016 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mock + +import ( + "errors" + "time" + + "github.com/intelsdi-x/snap/control/plugin/cpolicy" + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/serror" +) + +var pluginCatalog []core.CatalogedPlugin = []core.CatalogedPlugin{ + MockLoadedPlugin{MyName: "foo", MyType: "collector", MyVersion: 2}, + MockLoadedPlugin{MyName: "bar", MyType: "publisher", MyVersion: 3}, + MockLoadedPlugin{MyName: "foo", MyType: "collector", MyVersion: 4}, + MockLoadedPlugin{MyName: "baz", MyType: "publisher", MyVersion: 5}, + MockLoadedPlugin{MyName: "foo", MyType: "processor", MyVersion: 6}, + MockLoadedPlugin{MyName: "foobar", MyType: "processor", MyVersion: 1}, +} + +var metricCatalog []core.CatalogedMetric = []core.CatalogedMetric{ + MockCatalogedMetric{}, +} + +//////MockLoadedPlugin///// + +type MockLoadedPlugin struct { + MyName string + MyType string + MyVersion int +} + +func (m MockLoadedPlugin) Name() string { return m.MyName } +func (m MockLoadedPlugin) Port() string { return "" } +func (m MockLoadedPlugin) TypeName() string { return m.MyType } +func (m MockLoadedPlugin) Version() int { return m.MyVersion } +func (m MockLoadedPlugin) Plugin() string { return "" } +func (m MockLoadedPlugin) IsSigned() bool { return false } +func (m MockLoadedPlugin) Status() string { return "" } +func (m MockLoadedPlugin) PluginPath() string { return "" } +func (m MockLoadedPlugin) LoadedTimestamp() *time.Time { + t := time.Date(2016, time.September, 6, 0, 0, 0, 0, time.UTC) + return &t +} +func (m MockLoadedPlugin) Policy() *cpolicy.ConfigPolicy { return cpolicy.New() } +func (m MockLoadedPlugin) HitCount() int { return 0 } +func (m MockLoadedPlugin) LastHit() time.Time { return time.Now() } +func (m MockLoadedPlugin) ID() uint32 { return 0 } + +//////MockCatalogedMetric///// + +type MockCatalogedMetric struct{} + +func (m MockCatalogedMetric) Namespace() core.Namespace { + return core.NewNamespace("one", "two", "three") +} +func (m MockCatalogedMetric) Version() int { return 5 } +func (m MockCatalogedMetric) LastAdvertisedTime() time.Time { return time.Time{} } +func (m MockCatalogedMetric) Policy() *cpolicy.ConfigPolicyNode { return cpolicy.NewPolicyNode() } +func (m MockCatalogedMetric) Description() string { return "This Is A Description" } +func (m MockCatalogedMetric) Unit() string { return "" } + +//////MockManagesMetrics///// + +type MockManagesMetrics struct{} + +func (m MockManagesMetrics) MetricCatalog() ([]core.CatalogedMetric, error) { + return metricCatalog, nil +} +func (m MockManagesMetrics) FetchMetrics(core.Namespace, int) ([]core.CatalogedMetric, error) { + return metricCatalog, nil +} +func (m MockManagesMetrics) GetMetricVersions(core.Namespace) ([]core.CatalogedMetric, error) { + return metricCatalog, nil +} +func (m MockManagesMetrics) GetMetric(core.Namespace, int) (core.CatalogedMetric, error) { + return MockCatalogedMetric{}, nil +} +func (m MockManagesMetrics) Load(*core.RequestedPlugin) (core.CatalogedPlugin, serror.SnapError) { + return MockLoadedPlugin{"foo", "collector", 1}, nil +} +func (m MockManagesMetrics) Unload(plugin core.Plugin) (core.CatalogedPlugin, serror.SnapError) { + for _, pl := range pluginCatalog { + if plugin.Name() == pl.Name() && + plugin.Version() == pl.Version() && + plugin.TypeName() == pl.TypeName() { + return pl, nil + } + } + return nil, serror.New(errors.New("plugin not found")) +} + +func (m MockManagesMetrics) PluginCatalog() core.PluginCatalog { + return pluginCatalog +} +func (m MockManagesMetrics) AvailablePlugins() []core.AvailablePlugin { + return []core.AvailablePlugin{ + MockLoadedPlugin{MyName: "foo", MyType: "collector", MyVersion: 2}, + MockLoadedPlugin{MyName: "bar", MyType: "publisher", MyVersion: 3}, + MockLoadedPlugin{MyName: "foo", MyType: "collector", MyVersion: 4}, + MockLoadedPlugin{MyName: "baz", MyType: "publisher", MyVersion: 5}, + MockLoadedPlugin{MyName: "foo", MyType: "processor", MyVersion: 6}, + MockLoadedPlugin{MyName: "foobar", MyType: "processor", MyVersion: 1}, + } +} +func (m MockManagesMetrics) GetAutodiscoverPaths() []string { + return nil +} + +// These constants are the expected plugin responses from running +// rest_v2_test.go on the plugin routes found in mgmt/rest/server.go +const ( + GET_PLUGINS_RESPONSE = `{ + "plugins": [ + { + "name": "foo", + "version": 2, + "type": "collector", + "signed": false, + "status": "", + "loaded_timestamp": 1473120000, + "href": "http://localhost:%d/v2/plugins/collector/foo/2" + }, + { + "name": "bar", + "version": 3, + "type": "publisher", + "signed": false, + "status": "", + "loaded_timestamp": 1473120000, + "href": "http://localhost:%d/v2/plugins/publisher/bar/3" + }, + { + "name": "foo", + "version": 4, + "type": "collector", + "signed": false, + "status": "", + "loaded_timestamp": 1473120000, + "href": "http://localhost:%d/v2/plugins/collector/foo/4" + }, + { + "name": "baz", + "version": 5, + "type": "publisher", + "signed": false, + "status": "", + "loaded_timestamp": 1473120000, + "href": "http://localhost:%d/v2/plugins/publisher/baz/5" + }, + { + "name": "foo", + "version": 6, + "type": "processor", + "signed": false, + "status": "", + "loaded_timestamp": 1473120000, + "href": "http://localhost:%d/v2/plugins/processor/foo/6" + }, + { + "name": "foobar", + "version": 1, + "type": "processor", + "signed": false, + "status": "", + "loaded_timestamp": 1473120000, + "href": "http://localhost:%d/v2/plugins/processor/foobar/1" + } + ] +} +` + + GET_PLUGINS_RESPONSE_TYPE = `{ + "plugins": [ + { + "name": "foo", + "version": 2, + "type": "collector", + "signed": false, + "status": "", + "loaded_timestamp": 1473120000, + "href": "http://localhost:%d/v2/plugins/collector/foo/2" + }, + { + "name": "foo", + "version": 4, + "type": "collector", + "signed": false, + "status": "", + "loaded_timestamp": 1473120000, + "href": "http://localhost:%d/v2/plugins/collector/foo/4" + } + ] +} +` + + GET_PLUGINS_RESPONSE_TYPE_NAME = `{ + "plugins": [ + { + "name": "bar", + "version": 3, + "type": "publisher", + "signed": false, + "status": "", + "loaded_timestamp": 1473120000, + "href": "http://localhost:%d/v2/plugins/publisher/bar/3" + } + ] +} +` + + GET_PLUGINS_RESPONSE_TYPE_NAME_VERSION = `{ + "name": "bar", + "version": 3, + "type": "publisher", + "signed": false, + "status": "", + "loaded_timestamp": 1473120000, + "href": "http://localhost:%d/v2/plugins/publisher/bar/3" +} +` + + GET_METRICS_RESPONSE = `{ + "metrics": [ + { + "last_advertised_timestamp": -62135596800, + "namespace": "/one/two/three", + "version": 5, + "dynamic": false, + "description": "This Is A Description", + "href": "http://localhost:%d/v2/metrics?ns=/one/two/three&ver=5" + } + ] +} +` + + UNLOAD_PLUGIN_RESPONSE = `` +) diff --git a/mgmt/rest/v2/mock/mock_task_manager.go b/mgmt/rest/v2/mock/mock_task_manager.go new file mode 100644 index 000000000..4e75ff90f --- /dev/null +++ b/mgmt/rest/v2/mock/mock_task_manager.go @@ -0,0 +1,264 @@ +// +build legacy small medium large + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + +Copyright 2016 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mock + +import ( + "time" + + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/serror" + "github.com/intelsdi-x/snap/pkg/schedule" + "github.com/intelsdi-x/snap/scheduler/wmap" +) + +var taskCatalog map[string]core.Task = map[string]core.Task{ + "Task1": &mockTask{ + MyID: "qwertyuiop", + MyName: "TASK1.0", + MyDeadline: "4", + MyCreationTimestamp: time.Now().Unix(), + MyLastRunTimestamp: time.Now().Unix(), + MyHitCount: 44, + MyMissCount: 8, + MyState: "failed", + MyHref: "http://localhost:8181/v2/tasks/qwertyuiop"}, + "Task2": &mockTask{ + MyID: "asdfghjkl", + MyName: "TASK2.0", + MyDeadline: "4", + MyCreationTimestamp: time.Now().Unix(), + MyLastRunTimestamp: time.Now().Unix(), + MyHitCount: 33, + MyMissCount: 7, + MyState: "passed", + MyHref: "http://localhost:8181/v2/tasks/asdfghjkl"}} + +type mockTask struct { + MyID string `json:"id"` + MyName string `json:"name"` + MyDeadline string `json:"deadline"` + MyWorkflow *wmap.WorkflowMap `json:"workflow,omitempty"` + MySchedule *core.Schedule `json:"schedule,omitempty"` + MyCreationTimestamp int64 `json:"creation_timestamp,omitempty"` + MyLastRunTimestamp int64 `json:"last_run_timestamp,omitempty"` + MyHitCount int `json:"hit_count,omitempty"` + MyMissCount int `json:"miss_count,omitempty"` + MyFailedCount int `json:"failed_count,omitempty"` + MyLastFailureMessage string `json:"last_failure_message,omitempty"` + MyState string `json:"task_state"` + MyHref string `json:"href"` +} + +func (t *mockTask) ID() string { return t.MyID } +func (t *mockTask) State() core.TaskState { return core.TaskSpinning } +func (t *mockTask) HitCount() uint { return 0 } +func (t *mockTask) GetName() string { return t.MyName } +func (t *mockTask) SetName(string) { return } +func (t *mockTask) SetID(string) { return } +func (t *mockTask) MissedCount() uint { return 0 } +func (t *mockTask) FailedCount() uint { return 0 } +func (t *mockTask) LastFailureMessage() string { return "" } +func (t *mockTask) LastRunTime() *time.Time { return &time.Time{} } +func (t *mockTask) CreationTime() *time.Time { return &time.Time{} } +func (t *mockTask) DeadlineDuration() time.Duration { return 4 } +func (t *mockTask) SetDeadlineDuration(time.Duration) { return } +func (t *mockTask) SetTaskID(id string) { return } +func (t *mockTask) SetStopOnFailure(int) { return } +func (t *mockTask) GetStopOnFailure() int { return 0 } +func (t *mockTask) Option(...core.TaskOption) core.TaskOption { + return core.TaskDeadlineDuration(0) +} +func (t *mockTask) WMap() *wmap.WorkflowMap { + return wmap.NewWorkflowMap() +} +func (t *mockTask) Schedule() schedule.Schedule { + return schedule.NewSimpleSchedule(time.Second * 1) +} +func (t *mockTask) MaxFailures() int { return 10 } + +type MockTaskManager struct{} + +func (m *MockTaskManager) GetTask(id string) (core.Task, error) { + href := "http://localhost:8181/v2/tasks/" + id + return &mockTask{ + MyID: id, + MyName: "NewTaskCreated", + MyCreationTimestamp: time.Now().Unix(), + MyLastRunTimestamp: time.Now().Unix(), + MyHitCount: 22, + MyMissCount: 4, + MyState: "failed", + MyHref: href}, nil +} +func (m *MockTaskManager) CreateTask( + sch schedule.Schedule, + wmap *wmap.WorkflowMap, + start bool, + opts ...core.TaskOption) (core.Task, core.TaskErrors) { + return &mockTask{ + MyID: "MyTaskID", + MyName: "NewTaskCreated", + MySchedule: &core.Schedule{}, + MyCreationTimestamp: time.Now().Unix(), + MyLastRunTimestamp: time.Now().Unix(), + MyHitCount: 99, + MyMissCount: 5, + MyState: "failed", + MyHref: "http://localhost:8181/v2/tasks/MyTaskID"}, nil +} +func (m *MockTaskManager) GetTasks() map[string]core.Task { + return taskCatalog +} +func (m *MockTaskManager) StartTask(id string) []serror.SnapError { return nil } +func (m *MockTaskManager) StopTask(id string) []serror.SnapError { return nil } +func (m *MockTaskManager) RemoveTask(id string) error { return nil } +func (m *MockTaskManager) WatchTask(id string, handler core.TaskWatcherHandler) (core.TaskWatcherCloser, error) { + return nil, nil +} +func (m *MockTaskManager) EnableTask(id string) (core.Task, error) { + return &mockTask{ + MyID: "alskdjf", + MyName: "Task2", + MyCreationTimestamp: time.Now().Unix(), + MyLastRunTimestamp: time.Now().Unix(), + MyHitCount: 44, + MyMissCount: 8, + MyState: "failed", + MyHref: "http://localhost:8181/v2/tasks/alskdjf"}, nil +} + +// Mock task used in the 'Add tasks' test in rest_v2_test.go +const TASK = `{ + "version": 1, + "schedule": { + "type": "simple", + "interval": "1s" + }, + "max-failures": 10, + "workflow": { + "collect": { + "metrics": { + "/one/two/three": {} + } + } + } +} +` + +// These constants are the expected responses from running the task tests in +// rest_v2_test.go on the task routes found in mgmt/rest/server.go +const ( + GET_TASKS_RESPONSE = `{ + "tasks": [ + { + "id": "qwertyuiop", + "name": "TASK1.0", + "deadline": "4ns", + "creation_timestamp": -62135596800, + "last_run_timestamp": -1, + "task_state": "Running", + "href": "http://localhost:%d/v2/tasks/qwertyuiop" + }, + { + "id": "asdfghjkl", + "name": "TASK2.0", + "deadline": "4ns", + "creation_timestamp": -62135596800, + "last_run_timestamp": -1, + "task_state": "Running", + "href": "http://localhost:%d/v2/tasks/asdfghjkl" + } + ] +} +` + + GET_TASKS_RESPONSE2 = `{ + "tasks": [ + { + "id": "asdfghjkl", + "name": "TASK2.0", + "deadline": "4ns", + "creation_timestamp": -62135596800, + "last_run_timestamp": -1, + "task_state": "Running", + "href": "http://localhost:%d/v2/tasks/asdfghjkl" + }, + { + "id": "qwertyuiop", + "name": "TASK1.0", + "deadline": "4ns", + "creation_timestamp": -62135596800, + "last_run_timestamp": -1, + "task_state": "Running", + "href": "http://localhost:%d/v2/tasks/qwertyuiop" + } + ] +} +` + + GET_TASK_RESPONSE = `{ + "id": ":1234", + "name": "NewTaskCreated", + "deadline": "4ns", + "workflow": { + "collect": { + "metrics": {} + } + }, + "schedule": { + "type": "simple", + "interval": "1s" + }, + "creation_timestamp": -62135596800, + "last_run_timestamp": -1, + "task_state": "Running", + "href": "http://localhost:%d/v2/tasks/:1234" +} +` + + ADD_TASK_RESPONSE = `{ + "id": "MyTaskID", + "name": "NewTaskCreated", + "deadline": "4ns", + "workflow": { + "collect": { + "metrics": {} + } + }, + "schedule": { + "type": "simple", + "interval": "1s" + }, + "creation_timestamp": -62135596800, + "last_run_timestamp": -1, + "task_state": "Running", + "href": "http://localhost:%d/v2/tasks/MyTaskID" +} +` + + START_TASK_RESPONSE_ID_START = `` + + STOP_TASK_RESPONSE_ID_STOP = `` + + ENABLE_TASK_RESPONSE_ID_ENABLE = `` + + REMOVE_TASK_RESPONSE_ID = `` +) diff --git a/mgmt/rest/v2/plugin.go b/mgmt/rest/v2/plugin.go new file mode 100644 index 000000000..78680ef3b --- /dev/null +++ b/mgmt/rest/v2/plugin.go @@ -0,0 +1,448 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2017 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v2 + +import ( + "compress/gzip" + "crypto/sha256" + "errors" + "fmt" + "io" + "io/ioutil" + "mime" + "mime/multipart" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" + + "path" + "runtime" + + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/snap/control" + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/serror" + "github.com/julienschmidt/httprouter" +) + +type PluginsResponse struct { + RunningPlugins []RunningPlugin `json:"running_plugins,omitempty"` + Plugins []Plugin `json:"plugins,omitempty"` +} + +type Plugin struct { + Name string `json:"name"` + Version int `json:"version"` + Type string `json:"type"` + Signed bool `json:"signed"` + Status string `json:"status"` + LoadedTimestamp int64 `json:"loaded_timestamp"` + Href string `json:"href"` + ConfigPolicy []PolicyTable `json:"policy,omitempty"` +} + +type RunningPlugin struct { + Name string `json:"name"` + Version int `json:"version"` + Type string `json:"type"` + HitCount int `json:"hitcount"` + LastHitTimestamp int64 `json:"last_hit_timestamp"` + ID uint32 `json:"id"` + Href string `json:"href"` + PprofPort string `json:"pprof_port"` +} + +type plugin struct { + name string + version int + pluginType string +} + +func (p *plugin) Name() string { + return p.name +} + +func (p *plugin) Version() int { + return p.version +} + +func (p *plugin) TypeName() string { + return p.pluginType +} + +func (s *apiV2) loadPlugin(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) + if err != nil { + Write(415, FromError(err), w) + return + } + if strings.HasPrefix(mediaType, "multipart/") { + var pluginPath string + var signature []byte + var checkSum [sha256.Size]byte + mr := multipart.NewReader(r.Body, params["boundary"]) + var i int + for { + var b []byte + p, err := mr.NextPart() + if err == io.EOF { + break + } + if err != nil { + Write(500, FromError(err), w) + return + } + if r.Header.Get("Plugin-Compression") == "gzip" { + g, err := gzip.NewReader(p) + defer g.Close() + if err != nil { + Write(500, FromError(err), w) + return + } + b, err = ioutil.ReadAll(g) + if err != nil { + Write(500, FromError(err), w) + return + } + } else { + b, err = ioutil.ReadAll(p) + if err != nil { + Write(500, FromError(err), w) + return + } + } + + // A little sanity checking for files being passed into the API server. + // First file passed in should be the plugin. If the first file is a signature + // file, an error is returned. The signature file should be the second + // file passed to the API server. If the second file does not have the ".asc" + // extension, an error is returned. + // If we loop around more than twice before receiving io.EOF, then + // an error is returned. + + switch { + case i == 0: + if filepath.Ext(p.FileName()) == ".asc" { + e := errors.New("Error: first file passed to load plugin api can not be signature file") + Write(400, FromError(e), w) + return + } + if pluginPath, err = writeFile(p.FileName(), b); err != nil { + Write(500, FromError(err), w) + return + } + checkSum = sha256.Sum256(b) + case i == 1: + if filepath.Ext(p.FileName()) == ".asc" { + signature = b + } else { + e := errors.New("Error: second file passed was not a signature file") + Write(400, FromError(e), w) + return + } + case i == 2: + e := errors.New("Error: More than two files passed to the load plugin api") + Write(400, FromError(e), w) + return + } + i++ + } + rp, err := core.NewRequestedPlugin(pluginPath) + if err != nil { + Write(500, FromError(err), w) + return + } + rp.SetAutoLoaded(false) + // Sanity check, verify the checkSum on the file sent is the same + // as after it is written to disk. + if rp.CheckSum() != checkSum { + e := errors.New("Error: CheckSum mismatch on requested plugin to load") + Write(400, FromError(e), w) + return + } + rp.SetSignature(signature) + restLogger.Info("Loading plugin: ", rp.Path()) + pl, err := s.metricManager.Load(rp) + if err != nil { + var ec int + restLogger.Error(err) + restLogger.Debugf("Removing file (%s)", rp.Path()) + err2 := os.RemoveAll(filepath.Dir(rp.Path())) + if err2 != nil { + restLogger.Error(err2) + } + rb := FromError(err) + switch rb.ErrorMessage { + case ErrPluginAlreadyLoaded: + ec = 409 + default: + ec = 500 + } + Write(ec, rb, w) + return + } + Write(201, catalogedPluginBody(r.Host, pl), w) + } +} + +func writeFile(filename string, b []byte) (string, error) { + // Create temporary directory + dir, err := ioutil.TempDir("", "") + if err != nil { + return "", err + } + f, err := os.Create(path.Join(dir, filename)) + if err != nil { + return "", err + } + // Close before load + defer f.Close() + + n, err := f.Write(b) + log.Debugf("wrote %v to %v", n, f.Name()) + if err != nil { + return "", err + } + if runtime.GOOS != "windows" { + err = f.Chmod(0700) + if err != nil { + return "", err + } + } + return f.Name(), nil +} + +func pluginParameters(p httprouter.Params) (string, string, int, map[string]interface{}, serror.SnapError) { + plName := p.ByName("name") + plType := p.ByName("type") + plVersion, err := strconv.ParseInt(p.ByName("version"), 10, 0) + f := map[string]interface{}{ + "plugin-name": plName, + "plugin-version": plVersion, + "plugin-type": plType, + } + + if err != nil || plName == "" || plType == "" { + se := serror.New(errors.New("missing or invalid parameter(s)")) + se.SetFields(f) + return "", "", 0, nil, se + } + return plType, plName, int(plVersion), f, nil +} + +func (s *apiV2) unloadPlugin(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + plType, plName, plVersion, f, se := pluginParameters(p) + if se != nil { + Write(400, FromSnapError(se), w) + return + } + + _, se = s.metricManager.Unload(&plugin{ + name: plName, + version: plVersion, + pluginType: plType, + }) + + // 404 - plugin not found + // 409 - plugin state is not plugin loaded + // 500 - removing plugin from /tmp failed + if se != nil { + se.SetFields(f) + statusCode := 500 + switch se.Error() { + case control.ErrPluginNotFound.Error(): + statusCode = 404 + case control.ErrPluginNotInLoadedState.Error(): + statusCode = 409 + } + Write(statusCode, FromSnapError(se), w) + return + } + Write(204, nil, w) +} + +func (s *apiV2) getPlugins(w http.ResponseWriter, r *http.Request, params httprouter.Params) { + + // filter by plugin name or plugin type + q := r.URL.Query() + plName := q.Get("name") + plType := q.Get("type") + nbFilter := Btoi(plName != "") + Btoi(plType != "") + + if _, detail := r.URL.Query()["running"]; detail { + // get running plugins + plugins := runningPluginsBody(r.Host, s.metricManager.AvailablePlugins()) + filteredPlugins := []RunningPlugin{} + if nbFilter > 0 { + for _, p := range plugins { + if nbFilter == 1 && (p.Name == plName || p.Type == plType) || nbFilter == 2 && (p.Name == plName && p.Type == plType) { + filteredPlugins = append(filteredPlugins, p) + } + } + } else { + filteredPlugins = plugins + } + Write(200, PluginsResponse{RunningPlugins: filteredPlugins}, w) + } else { + // get plugins from the plugin catalog + plugins := pluginCatalogBody(r.Host, s.metricManager.PluginCatalog()) + filteredPlugins := []Plugin{} + + if nbFilter > 0 { + for _, p := range plugins { + if nbFilter == 1 && (p.Name == plName || p.Type == plType) || nbFilter == 2 && (p.Name == plName && p.Type == plType) { + filteredPlugins = append(filteredPlugins, p) + } + } + } else { + filteredPlugins = plugins + } + Write(200, PluginsResponse{Plugins: filteredPlugins}, w) + } +} + +func Btoi(b bool) int { + if b { + return 1 + } + return 0 +} + +func pluginCatalogBody(host string, c []core.CatalogedPlugin) []Plugin { + plugins := make([]Plugin, len(c)) + for i, p := range c { + plugins[i] = catalogedPluginBody(host, p) + } + return plugins +} + +func catalogedPluginBody(host string, c core.CatalogedPlugin) Plugin { + return Plugin{ + Name: c.Name(), + Version: c.Version(), + Type: c.TypeName(), + Signed: c.IsSigned(), + Status: c.Status(), + LoadedTimestamp: c.LoadedTimestamp().Unix(), + Href: pluginURI(host, c), + } +} + +func runningPluginsBody(host string, c []core.AvailablePlugin) []RunningPlugin { + plugins := make([]RunningPlugin, len(c)) + for i, p := range c { + plugins[i] = RunningPlugin{ + Name: p.Name(), + Version: p.Version(), + Type: p.TypeName(), + HitCount: p.HitCount(), + LastHitTimestamp: p.LastHit().Unix(), + ID: p.ID(), + Href: pluginURI(host, p), + PprofPort: p.Port(), + } + } + return plugins +} + +func pluginURI(host string, c core.Plugin) string { + return fmt.Sprintf("%s://%s/%s/plugins/%s/%s/%d", protocolPrefix, host, version, c.TypeName(), c.Name(), c.Version()) +} + +func (s *apiV2) getPlugin(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + plType, plName, plVersion, f, se := pluginParameters(p) + if se != nil { + Write(400, FromSnapError(se), w) + return + } + + pluginCatalog := s.metricManager.PluginCatalog() + var plugin core.CatalogedPlugin + for _, item := range pluginCatalog { + if item.Name() == plName && + item.Version() == int(plVersion) && + item.TypeName() == plType { + plugin = item + break + } + } + if plugin == nil { + se := serror.New(ErrPluginNotFound, f) + Write(404, FromSnapError(se), w) + return + } + + rd := r.FormValue("download") + d, _ := strconv.ParseBool(rd) + var configPolicy []PolicyTable + if plugin.TypeName() == "processor" || plugin.TypeName() == "publisher" { + rules := plugin.Policy().Get([]string{""}).RulesAsTable() + configPolicy = make([]PolicyTable, 0, len(rules)) + for _, r := range rules { + configPolicy = append(configPolicy, PolicyTable{ + Name: r.Name, + Type: r.Type, + Default: r.Default, + Required: r.Required, + Minimum: r.Minimum, + Maximum: r.Maximum, + }) + } + + } + + if d { + b, err := ioutil.ReadFile(plugin.PluginPath()) + if err != nil { + f["plugin-path"] = plugin.PluginPath() + se := serror.New(err, f) + Write(500, FromSnapError(se), w) + return + } + + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Encoding", "gzip") + gz := gzip.NewWriter(w) + defer gz.Close() + _, err = gz.Write(b) + if err != nil { + f["plugin-path"] = plugin.PluginPath() + se := serror.New(err, f) + Write(500, FromSnapError(se), w) + return + } + w.WriteHeader(200) + return + } else { + pluginRet := Plugin{ + Name: plugin.Name(), + Version: plugin.Version(), + Type: plugin.TypeName(), + Signed: plugin.IsSigned(), + Status: plugin.Status(), + LoadedTimestamp: plugin.LoadedTimestamp().Unix(), + Href: pluginURI(r.Host, plugin), + ConfigPolicy: configPolicy, + } + Write(200, pluginRet, w) + } +} diff --git a/mgmt/rest/v2/task.go b/mgmt/rest/v2/task.go new file mode 100644 index 000000000..77e13c0b2 --- /dev/null +++ b/mgmt/rest/v2/task.go @@ -0,0 +1,222 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2017 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v2 + +import ( + "fmt" + "net/http" + "sort" + "strings" + "time" + + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/serror" + "github.com/intelsdi-x/snap/pkg/schedule" + "github.com/intelsdi-x/snap/scheduler/wmap" + "github.com/julienschmidt/httprouter" +) + +type TasksResponse struct { + Tasks Tasks `json:"tasks"` +} + +type Task struct { + ID string `json:"id"` + Name string `json:"name"` + Deadline string `json:"deadline"` + Workflow *wmap.WorkflowMap `json:"workflow,omitempty"` + Schedule *core.Schedule `json:"schedule,omitempty"` + CreationTimestamp int64 `json:"creation_timestamp,omitempty"` + LastRunTimestamp int64 `json:"last_run_timestamp,omitempty"` + HitCount int `json:"hit_count,omitempty"` + MissCount int `json:"miss_count,omitempty"` + FailedCount int `json:"failed_count,omitempty"` + LastFailureMessage string `json:"last_failure_message,omitempty"` + State string `json:"task_state"` + Href string `json:"href"` +} + +type Tasks []Task + +func (s Tasks) Len() int { + return len(s) +} + +func (s Tasks) Less(i, j int) bool { + return s[j].CreationTime().After(s[i].CreationTime()) +} + +func (s Tasks) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s *Task) CreationTime() time.Time { + return time.Unix(s.CreationTimestamp, 0) +} + +func (s *apiV2) addTask(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + task, err := core.CreateTaskFromContent(r.Body, nil, s.taskManager.CreateTask) + if err != nil { + Write(500, FromError(err), w) + return + } + taskB := AddSchedulerTaskFromTask(task) + taskB.Href = taskURI(r.Host, task) + Write(201, taskB, w) +} + +func (s *apiV2) getTasks(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + // get tasks from the task manager + sts := s.taskManager.GetTasks() + + // create the task list response + tasks := make(Tasks, len(sts)) + i := 0 + for _, t := range sts { + tasks[i] = SchedulerTaskFromTask(t) + tasks[i].Href = taskURI(r.Host, t) + i++ + } + sort.Sort(tasks) + + Write(200, TasksResponse{Tasks: tasks}, w) +} + +func (s *apiV2) getTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + id := p.ByName("id") + t, err := s.taskManager.GetTask(id) + if err != nil { + Write(404, FromError(err), w) + return + } + task := AddSchedulerTaskFromTask(t) + task.Href = taskURI(r.Host, t) + Write(200, task, w) +} + +func (s *apiV2) updateTaskState(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + errs := make([]serror.SnapError, 0, 1) + id := p.ByName("id") + action, exist := r.URL.Query()["action"] + if !exist && len(action) > 0 { + errs = append(errs, serror.New(ErrNoActionSpecified)) + } else { + switch action[0] { + case "enable": + _, err := s.taskManager.EnableTask(id) + if err != nil { + errs = append(errs, serror.New(err)) + } + case "start": + errs = s.taskManager.StartTask(id) + case "stop": + errs = s.taskManager.StopTask(id) + default: + errs = append(errs, serror.New(ErrWrongAction)) + } + } + + if len(errs) > 0 { + statusCode := 500 + switch errs[0].Error() { + case ErrNoActionSpecified.Error(): + statusCode = 400 + case ErrWrongAction.Error(): + statusCode = 400 + case ErrTaskNotFound: + statusCode = 404 + case ErrTaskDisabledNotRunnable: + statusCode = 409 + } + Write(statusCode, FromSnapErrors(errs), w) + return + } + Write(204, nil, w) +} + +func (s *apiV2) removeTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + id := p.ByName("id") + err := s.taskManager.RemoveTask(id) + if err != nil { + if strings.Contains(err.Error(), ErrTaskNotFound) { + Write(404, FromError(err), w) + return + } + Write(500, FromError(err), w) + return + } + Write(204, nil, w) +} + +func taskURI(host string, t core.Task) string { + return fmt.Sprintf("%s://%s/%s/tasks/%s", protocolPrefix, host, version, t.ID()) +} + +// functions to convert a core.Task to a Task +func AddSchedulerTaskFromTask(t core.Task) Task { + st := SchedulerTaskFromTask(t) + (&st).assertSchedule(t.Schedule()) + st.Workflow = t.WMap() + return st +} + +func SchedulerTaskFromTask(t core.Task) Task { + st := Task{ + ID: t.ID(), + Name: t.GetName(), + Deadline: t.DeadlineDuration().String(), + CreationTimestamp: t.CreationTime().Unix(), + LastRunTimestamp: t.LastRunTime().Unix(), + HitCount: int(t.HitCount()), + MissCount: int(t.MissedCount()), + FailedCount: int(t.FailedCount()), + LastFailureMessage: t.LastFailureMessage(), + State: t.State().String(), + } + if st.LastRunTimestamp < 0 { + st.LastRunTimestamp = -1 + } + return st +} + +func (t *Task) assertSchedule(s schedule.Schedule) { + switch v := s.(type) { + case *schedule.SimpleSchedule: + t.Schedule = &core.Schedule{ + Type: "simple", + Interval: v.Interval.String(), + } + return + case *schedule.WindowedSchedule: + t.Schedule = &core.Schedule{ + Type: "windowed", + Interval: v.Interval.String(), + StartTimestamp: v.StartTime, + StopTimestamp: v.StopTime, + } + return + case *schedule.CronSchedule: + t.Schedule = &core.Schedule{ + Type: "cron", + Interval: v.Entry(), + } + return + } +} diff --git a/mgmt/rest/v2/watch.go b/mgmt/rest/v2/watch.go new file mode 100644 index 000000000..671e0f6a2 --- /dev/null +++ b/mgmt/rest/v2/watch.go @@ -0,0 +1,206 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2017 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v2 + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "github.com/intelsdi-x/snap/core" + "github.com/julienschmidt/httprouter" +) + +const ( + // Event types for task watcher streaming + TaskWatchStreamOpen = "stream-open" + TaskWatchMetricEvent = "metric-event" + TaskWatchTaskDisabled = "task-disabled" + TaskWatchTaskStarted = "task-started" + TaskWatchTaskStopped = "task-stopped" +) + +// The amount of time to buffer streaming events before flushing in seconds +var StreamingBufferWindow = 0.1 + +func (s *apiV2) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + s.wg.Add(1) + defer s.wg.Done() + + id := p.ByName("id") + + tw := &TaskWatchHandler{ + alive: true, + mChan: make(chan StreamedTaskEvent), + } + tc, err1 := s.taskManager.WatchTask(id, tw) + if err1 != nil { + if strings.Contains(err1.Error(), ErrTaskNotFound) { + Write(404, FromError(err1), w) + return + } + Write(500, FromError(err1), w) + return + } + + // Make this Server Sent Events compatible + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + // get a flusher type + flusher, ok := w.(http.Flusher) + if !ok { + // This only works on ResponseWriters that support streaming + Write(500, FromError(ErrStreamingUnsupported), w) + return + } + // send initial stream open event + so := StreamedTaskEvent{ + EventType: TaskWatchStreamOpen, + Message: "Stream opened", + } + fmt.Fprintf(w, "data: %s\n\n", so.ToJSON()) + flusher.Flush() + + // Get a channel for if the client notifies us it is closing the connection + n := w.(http.CloseNotifier).CloseNotify() + t := time.Now() + for { + // Write to the ResponseWriter + select { + case e := <-tw.mChan: + switch e.EventType { + case TaskWatchMetricEvent, TaskWatchTaskStarted: + // The client can decide to stop receiving on the stream on Task Stopped. + // We write the event to the buffer + fmt.Fprintf(w, "data: %s\n\n", e.ToJSON()) + case TaskWatchTaskDisabled, TaskWatchTaskStopped: + // A disabled task should end the streaming and close the connection + fmt.Fprintf(w, "data: %s\n\n", e.ToJSON()) + // Flush since we are sending nothing new + flusher.Flush() + // Close out watcher removing it from the scheduler + tc.Close() + // exit since this client is no longer listening + Write(204, nil, w) + } + // If we are at least above our minimum buffer time we flush to send + if time.Now().Sub(t).Seconds() > StreamingBufferWindow { + flusher.Flush() + t = time.Now() + } + case <-n: + // Flush since we are sending nothing new + flusher.Flush() + // Close out watcher removing it from the scheduler + tc.Close() + // exit since this client is no longer listening + Write(204, nil, w) + return + case <-s.killChan: + // Flush since we are sending nothing new + flusher.Flush() + // Close out watcher removing it from the scheduler + tc.Close() + // exit since this client is no longer listening + Write(204, nil, w) + return + } + } +} + +type TaskWatchHandler struct { + streamCount int + alive bool + mChan chan StreamedTaskEvent +} + +func (t *TaskWatchHandler) CatchCollection(m []core.Metric) { + sm := make([]StreamedMetric, len(m)) + for i := range m { + sm[i] = StreamedMetric{ + Namespace: m[i].Namespace().String(), + Data: m[i].Data(), + Timestamp: m[i].Timestamp(), + Tags: m[i].Tags(), + } + } + t.mChan <- StreamedTaskEvent{ + EventType: TaskWatchMetricEvent, + Message: "", + Event: sm, + } +} + +func (t *TaskWatchHandler) CatchTaskStarted() { + t.mChan <- StreamedTaskEvent{ + EventType: TaskWatchTaskStarted, + } +} + +func (t *TaskWatchHandler) CatchTaskStopped() { + t.mChan <- StreamedTaskEvent{ + EventType: TaskWatchTaskStopped, + } +} + +func (t *TaskWatchHandler) CatchTaskDisabled(why string) { + t.mChan <- StreamedTaskEvent{ + EventType: TaskWatchTaskDisabled, + Message: why, + } +} + +type StreamedTaskEvent struct { + // Used to describe the event + EventType string `json:"type"` + Message string `json:"message"` + Event StreamedMetrics `json:"event,omitempty"` +} + +func (s *StreamedTaskEvent) ToJSON() string { + j, _ := json.Marshal(s) + return string(j) +} + +type StreamedMetric struct { + Namespace string `json:"namespace"` + Data interface{} `json:"data"` + Timestamp time.Time `json:"timestamp"` + Tags map[string]string `json:"tags"` +} + +type StreamedMetrics []StreamedMetric + +func (s StreamedMetrics) Len() int { + return len(s) +} + +func (s StreamedMetrics) Less(i, j int) bool { + return fmt.Sprintf("%s", s[i].Namespace) < fmt.Sprintf("%s", s[j].Namespace) +} + +func (s StreamedMetrics) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +}