Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

event sink crud operation api #9155

Merged
merged 2 commits into from
Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions command/agent/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,103 @@ import (
"golang.org/x/sync/errgroup"
)

func (s *HTTPServer) EventSinksRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodGet {
return nil, CodedError(405, ErrInvalidMethod)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: there's https://golang.org/pkg/net/http/#pkg-constants we can use instead of raw numbers, e.g. http.StatusMethodNotAllowed

}

args := structs.EventSinkListRequest{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hyper nit: using := here, but var below for instantiating default value structs

if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}

var out structs.EventSinkListResponse
if err := s.agent.RPC("Event.ListSinks", &args, &out); err != nil {
return nil, err
}

if out.Sinks == nil {
out.Sinks = make([]*structs.EventSink, 0)
}
setMeta(resp, &out.QueryMeta)
return out.Sinks, nil
}

func (s *HTTPServer) EventSinkSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
name := strings.TrimPrefix(req.URL.Path, "/v1/event/sink/")
if len(name) == 0 {
return nil, CodedError(400, "Missing Policy Name")
}
switch req.Method {
case http.MethodGet:
return s.eventSinkGet(resp, req, name)
case http.MethodPost, http.MethodPut:
return s.eventSinkUpdate(resp, req, name)
case http.MethodDelete:
return s.eventSinkDelete(resp, req, name)
default:
return nil, CodedError(405, ErrInvalidMethod)
}
}

func (s *HTTPServer) eventSinkGet(resp http.ResponseWriter, req *http.Request, sink string) (interface{}, error) {
args := structs.EventSinkSpecificRequest{
ID: sink,
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}

var out structs.EventSinkResponse
if err := s.agent.RPC("Event.GetSink", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
if out.Sink == nil {
return nil, CodedError(404, "event sink not found")
}
return out.Sink, nil
}

func (s *HTTPServer) eventSinkUpdate(resp http.ResponseWriter, req *http.Request, sinkName string) (interface{}, error) {
var sink structs.EventSink
if err := decodeBody(req, &sink); err != nil {
return nil, CodedError(500, err.Error())
}

if sink.ID != sinkName {
return nil, CodedError(400, "Event sink name does not match request path")
}

args := structs.EventSinkUpsertRequest{
Sink: &sink,
}
s.parseWriteRequest(req, &args.WriteRequest)

var out structs.GenericResponse
if err := s.agent.RPC("Event.UpsertSink", &args, &out); err != nil {
return nil, err
}

setIndex(resp, out.Index)
return nil, nil
}

func (s *HTTPServer) eventSinkDelete(resp http.ResponseWriter, req *http.Request, sink string) (interface{}, error) {

args := structs.EventSinkDeleteRequest{
IDs: []string{sink},
}
s.parseWriteRequest(req, &args.WriteRequest)

var out structs.GenericResponse
if err := s.agent.RPC("Event.DeleteSink", &args, &out); err != nil {
return nil, err
}
setIndex(resp, out.Index)
return nil, nil
}

func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
query := req.URL.Query()

Expand Down
90 changes: 90 additions & 0 deletions command/agent/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"

"github.com/hashicorp/nomad/testutil"
Expand All @@ -22,6 +23,95 @@ type testEvent struct {
ID string
}

func TestHTTP_EventSinkList(t *testing.T) {
t.Parallel()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test is parallel but the others are not, can they all be?


httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()
s2 := mock.EventSink()

require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1))
require.NoError(t, s.Agent.server.State().UpsertEventSink(1001, s2))

req, err := http.NewRequest("GET", "/v1/event/sinks", nil)
require.NoError(t, err)

respW := httptest.NewRecorder()
obj, err := s.Server.EventSinksRequest(respW, req)
require.NoError(t, err)

require.Equal(t, "1001", respW.HeaderMap.Get("X-Nomad-Index"))

n := obj.([]*structs.EventSink)
require.Len(t, n, 2)
})
}

func TestHTTP_EventSinkGet(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()

require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1))

req, err := http.NewRequest("GET", "/v1/event/sink/"+s1.ID, nil)
require.NoError(t, err)

respW := httptest.NewRecorder()
obj, err := s.Server.EventSinkSpecificRequest(respW, req)
require.NoError(t, err)

require.Equal(t, "1000", respW.HeaderMap.Get("X-Nomad-Index"))

n := obj.(*structs.EventSink)
require.Equal(t, s1, n)
})
}

func TestHTTP_EventSinkUpsert(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()

buf := encodeReq(s1)

req, err := http.NewRequest("POST", "/v1/event/sink/"+s1.ID, buf)
require.NoError(t, err)

respW := httptest.NewRecorder()
_, err = s.Server.EventSinkSpecificRequest(respW, req)
require.NoError(t, err)

require.NotEqual(t, "", respW.HeaderMap.Get("X-Nomad-Index"))

state := s.Agent.server.State()
out, err := state.EventSinkByID(nil, s1.ID)
require.NoError(t, err)
require.Equal(t, s1.Address, out.Address)
require.Equal(t, s1.ID, out.ID)
})
}

func TestHTTP_EventSinkDelete(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()

require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1))

req, err := http.NewRequest("DELETE", "/v1/event/sink/"+s1.ID, nil)
require.NoError(t, err)

respW := httptest.NewRecorder()
_, err = s.Server.EventSinkSpecificRequest(respW, req)
require.NoError(t, err)

require.NotEqual(t, "", respW.HeaderMap.Get("X-Nomad-Index"))

state := s.Agent.server.State()
out, err := state.EventSinkByID(nil, s1.ID)
require.NoError(t, err)
require.Nil(t, out)
})
}

func TestEventStream(t *testing.T) {
t.Parallel()

Expand Down
2 changes: 2 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/operator/scheduler/configuration", s.wrap(s.OperatorSchedulerConfiguration))

s.mux.HandleFunc("/v1/event/stream", s.wrap(s.EventStream))
s.mux.HandleFunc("/v1/event/sinks", s.wrap(s.EventSinksRequest))
s.mux.HandleFunc("/v1/event/sink/", s.wrap(s.EventSinkSpecificRequest))

s.mux.HandleFunc("/v1/namespaces", s.wrap(s.NamespacesRequest))
s.mux.HandleFunc("/v1/namespace", s.wrap(s.NamespaceCreateRequest))
Expand Down
2 changes: 2 additions & 0 deletions helper/raftutil/msgtypes.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

147 changes: 147 additions & 0 deletions nomad/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"io/ioutil"
"time"

metrics "github.com/armon/go-metrics"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand All @@ -22,6 +25,150 @@ func (e *Event) register() {
e.srv.streamingRpcs.Register("Event.Stream", e.stream)
}

// ListSinks is used to list the event sinks registered in Nomad
func (e *Event) ListSinks(args *structs.EventSinkListRequest, reply *structs.EventSinkListResponse) error {
if done, err := e.srv.forward("Event.ListSinks", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "event", "list_sinks"}, time.Now())

if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowOperatorRead() {
return structs.ErrPermissionDenied
}

opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
iter, err := state.EventSinks(ws)
if err != nil {
return err
}

var sinks []*structs.EventSink
for {
raw := iter.Next()
if raw == nil {
break
}

sink := raw.(*structs.EventSink)
sinks = append(sinks, sink)
}
reply.Sinks = sinks

index, err := state.Index("event_sink")
if err != nil {
return err
}

// Ensure we never set the index to zero, otherwise a blocking query cannot be used.
// We floor the index at one, since realistically the first write must have a higher index.
if index == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be even more sure with index <= 0

index = 1
}

reply.Index = index
return nil
},
}

return e.srv.blockingRPC(&opts)
}

// UpsertSink is used to create or update an event sink
func (e *Event) UpsertSink(args *structs.EventSinkUpsertRequest, reply *structs.GenericResponse) error {
if done, err := e.srv.forward("Event.UpsertSink", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "event", "upsert_sink"}, time.Now())

if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.IsManagement() {
return structs.ErrPermissionDenied
}

if err := args.Sink.Validate(); err != nil {
return err
}

// Update via Raft
_, index, err := e.srv.raftApply(structs.EventSinkUpsertRequestType, args)
if err != nil {
return err
}

reply.Index = index
return nil
}

// GetSink returns the requested event sink
func (e *Event) GetSink(args *structs.EventSinkSpecificRequest, reply *structs.EventSinkResponse) error {
if done, err := e.srv.forward("Event.GetSink", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "event", "get_sink"}, time.Now())

if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowOperatorRead() {
return structs.ErrPermissionDenied
}

opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
s, err := state.EventSinkByID(ws, args.ID)
if err != nil {
return nil
}

reply.Sink = s

index, err := state.Index("event_sink")
if err != nil {
return err
}

if index == 0 {
index = 1
}

reply.Index = index
return nil
},
}

return e.srv.blockingRPC(&opts)
}

// DeleteSink deletes an event sink
func (e *Event) DeleteSink(args *structs.EventSinkDeleteRequest, reply *structs.GenericResponse) error {
if done, err := e.srv.forward("Event.DeleteSink", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "event", "delete_sink"}, time.Now())

if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.IsManagement() {
return structs.ErrPermissionDenied
}

// Update via Raft
_, index, err := e.srv.raftApply(structs.EventSinkDeleteRequestType, args)
if err != nil {
return err
}

reply.Index = index
return nil
}

func (e *Event) stream(conn io.ReadWriteCloser) {
defer conn.Close()

Expand Down
Loading