Skip to content

Commit

Permalink
drop namespace requirement for sink
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Oct 23, 2020
1 parent 1f3ccc9 commit 942fc69
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 113 deletions.
4 changes: 0 additions & 4 deletions command/agent/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ func (s *HTTPServer) eventSinkUpdate(resp http.ResponseWriter, req *http.Request
}
s.parseWriteRequest(req, &args.WriteRequest)

if args.Sink.Namespace == "" {
args.Sink.Namespace = args.WriteRequest.Namespace
}

var out structs.GenericResponse
if err := s.agent.RPC("Event.UpsertSink", &args, &out); err != nil {
return nil, err
Expand Down
12 changes: 6 additions & 6 deletions command/agent/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func TestHTTP_EventSinkList(t *testing.T) {
s1 := mock.EventSink()
s2 := mock.EventSink()

require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, structs.DefaultNamespace, s1))
require.NoError(t, s.Agent.server.State().UpsertEventSink(1001, structs.DefaultNamespace, s2))
require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1))
require.NoError(t, s.Agent.server.State().UpsertEventSink(1001, s2))

req, err := http.NewRequest("GET", "/v1/event/sinks", nil)
require.NoError(t, err)
Expand All @@ -51,7 +51,7 @@ func TestHTTP_EventSinkGet(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()

require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, structs.DefaultNamespace, s1))
require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1))

req, err := http.NewRequest("GET", "/v1/event/sink/"+s1.ID, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestHTTP_EventSinkUpsert(t *testing.T) {
require.NotEqual(t, "", respW.HeaderMap.Get("X-Nomad-Index"))

state := s.Agent.server.State()
out, err := state.EventSinkByID(nil, structs.DefaultNamespace, s1.ID)
out, err := state.EventSinkByID(nil, s1.ID)
require.NoError(t, err)
require.Equal(t, s1.Address, out.Address)
require.Equal(t, s1.ID, out.ID)
Expand All @@ -94,7 +94,7 @@ func TestHTTP_EventSinkDelete(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()

require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, structs.DefaultNamespace, s1))
require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1))

req, err := http.NewRequest("DELETE", "/v1/event/sink/"+s1.ID, nil)
require.NoError(t, err)
Expand All @@ -106,7 +106,7 @@ func TestHTTP_EventSinkDelete(t *testing.T) {
require.NotEqual(t, "", respW.HeaderMap.Get("X-Nomad-Index"))

state := s.Agent.server.State()
out, err := state.EventSinkByID(nil, structs.DefaultNamespace, s1.ID)
out, err := state.EventSinkByID(nil, s1.ID)
require.NoError(t, err)
require.Nil(t, out)
})
Expand Down
6 changes: 3 additions & 3 deletions nomad/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (e *Event) register() {
e.srv.streamingRpcs.Register("Event.Stream", e.stream)
}

// ListSinks is used to list the event sinks registered in each namespace.
// ListSinks is used to list the event sinks registered in Nomad
func (e *Event) ListSinks(args *structs.EventSinkListRequest, reply *structs.EventSinkListResponse) error {
if done, err := e.srv.forward("Event.ListSinks", args, args, reply); done {
return err
Expand All @@ -42,7 +42,7 @@ func (e *Event) ListSinks(args *structs.EventSinkListRequest, reply *structs.Eve
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
iter, err := state.EventSinksByNamespace(ws, args.RequestNamespace())
iter, err := state.EventSinks(ws)
if err != nil {
return err
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func (e *Event) GetSink(args *structs.EventSinkSpecificRequest, reply *structs.E
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
s, err := state.EventSinkByID(ws, args.RequestNamespace(), args.ID)
s, err := state.EventSinkByID(ws, args.ID)
if err != nil {
return nil
}
Expand Down
14 changes: 6 additions & 8 deletions nomad/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ func TestEvent_UpsertSink(t *testing.T) {
// Check for the sink in the FSM

state := s1.fsm.State()
out, err := state.EventSinkByID(nil, structs.DefaultNamespace, sink.ID)
out, err := state.EventSinkByID(nil, sink.ID)
require.NoError(t, err)

// set the index so we can compare values
Expand Down Expand Up @@ -566,9 +566,7 @@ func TestEvent_UpsertSink_Invalid(t *testing.T) {
require.Error(t, err)

require.Contains(t, err.Error(), "Missing sink ID")
require.Contains(t, err.Error(), "Sink must be in a namespace")
require.Contains(t, err.Error(), "Webhook sink requires a valid Address")

}

func TestEvent_GetSink(t *testing.T) {
Expand All @@ -582,7 +580,7 @@ func TestEvent_GetSink(t *testing.T) {

sink := mock.EventSink()

require.NoError(t, s1.fsm.State().UpsertEventSink(1000, structs.DefaultNamespace, sink))
require.NoError(t, s1.fsm.State().UpsertEventSink(1000, sink))

get := &structs.EventSinkSpecificRequest{
ID: sink.ID,
Expand Down Expand Up @@ -615,7 +613,7 @@ func TestEvent_DeleteSink(t *testing.T) {

sink := mock.EventSink()

require.NoError(t, s1.fsm.State().UpsertEventSink(1000, structs.DefaultNamespace, sink))
require.NoError(t, s1.fsm.State().UpsertEventSink(1000, sink))

get := &structs.EventSinkDeleteRequest{
IDs: []string{sink.ID},
Expand All @@ -629,7 +627,7 @@ func TestEvent_DeleteSink(t *testing.T) {
require.NotEqual(t, uint64(0), resp.Index)

state := s1.fsm.State()
out, err := state.EventSinkByID(nil, structs.DefaultNamespace, sink.ID)
out, err := state.EventSinkByID(nil, sink.ID)
require.NoError(t, err)
require.Nil(t, out)
}
Expand All @@ -646,8 +644,8 @@ func TestEvent_ListSinks(t *testing.T) {
sink := mock.EventSink()
sink2 := mock.EventSink()

require.NoError(t, s1.fsm.State().UpsertEventSink(1000, structs.DefaultNamespace, sink))
require.NoError(t, s1.fsm.State().UpsertEventSink(1001, structs.DefaultNamespace, sink2))
require.NoError(t, s1.fsm.State().UpsertEventSink(1000, sink))
require.NoError(t, s1.fsm.State().UpsertEventSink(1001, sink2))

get := &structs.EventSinkListRequest{
QueryOptions: structs.QueryOptions{
Expand Down
31 changes: 29 additions & 2 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,7 @@ func (n *nomadFSM) applyUpsertEventSink(buf []byte, index uint64) interface{} {
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_upsert_event_sink"}, time.Now())

if err := n.state.UpsertEventSink(index, req.RequestNamespace(), req.Sink); err != nil {
if err := n.state.UpsertEventSink(index, req.Sink); err != nil {
n.logger.Error("UpsertEventSink failed", "error", err)
return err
}
Expand All @@ -1275,7 +1275,7 @@ func (n *nomadFSM) applyDeleteEventSink(buf []byte, index uint64) interface{} {
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_delete_event_sink"}, time.Now())

if err := n.state.DeleteEventSinks(index, req.RequestNamespace(), req.IDs); err != nil {
if err := n.state.DeleteEventSinks(index, req.IDs); err != nil {
n.logger.Error("DeleteEventSink failed", "error", err)
return err
}
Expand Down Expand Up @@ -1872,6 +1872,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistEventSinks(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}

Expand Down Expand Up @@ -2375,6 +2379,29 @@ func (s *nomadSnapshot) persistCSIVolumes(sink raft.SnapshotSink,
return nil
}

func (s *nomadSnapshot) persistEventSinks(sink raft.SnapshotSink,
encoder *codec.Encoder) error {

sinks, err := s.snap.EventSinks(nil)
if err != nil {
return err
}

for {
raw := sinks.Next()
if raw == nil {
break
}

es := raw.(*structs.EventSink)
sink.Write([]byte{byte(EventSinkSnapshot)})
if err := encoder.Encode(es); err != nil {
return err
}
}
return nil
}

// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.
Expand Down
7 changes: 3 additions & 4 deletions nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -1508,9 +1508,8 @@ func AllocNetworkStatus() *structs.AllocNetworkStatus {

func EventSink() *structs.EventSink {
return &structs.EventSink{
ID: fmt.Sprintf("webhook-sink-%s", uuid.Generate()[0:8]),
Namespace: structs.DefaultNamespace,
Type: structs.SinkWebhook,
Address: "http://127.0.0.1/",
ID: fmt.Sprintf("webhook-sink-%s", uuid.Generate()[0:8]),
Type: structs.SinkWebhook,
Address: "http://127.0.0.1/",
}
}
15 changes: 3 additions & 12 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,18 +914,9 @@ func eventSinkTableSchema() *memdb.TableSchema {
AllowMissing: false,
Unique: true,

// Use a compound index so the tuple of (Namespace, ID) is
// uniquely identifying
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},

&memdb.StringFieldIndex{
Field: "ID",
},
},
// Sink ID is uniquely identifying
Indexer: &memdb.StringFieldIndex{
Field: "ID",
},
},
},
Expand Down
44 changes: 22 additions & 22 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5638,43 +5638,43 @@ func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[
return nil, nil
}

func (s *StateStore) EventSinkByID(ws memdb.WatchSet, namespace, id string) (*structs.EventSink, error) {
func (s *StateStore) EventSinks(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
return s.eventSinkByIDTxn(ws, namespace, id, txn)
}

func (s *StateStore) eventSinkByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn) (*structs.EventSink, error) {
watchCh, existing, err := txn.FirstWatch("event_sink", "id", namespace, id)
// Walk the entire event sink table
iter, err := txn.Get("event_sink", "id")
if err != nil {
return nil, fmt.Errorf("event sink lookup failed: %w", err)
return nil, err
}
ws.Add(watchCh)

if existing != nil {
return existing.(*structs.EventSink), nil
}
return nil, nil
ws.Add(iter.WatchCh())

return iter, nil
}

func (s *StateStore) EventSinksByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
func (s *StateStore) EventSinkByID(ws memdb.WatchSet, id string) (*structs.EventSink, error) {
txn := s.db.ReadTxn()
return s.eventSinkByIDTxn(ws, id, txn)
}

// Walk the entire event sink table
iter, err := txn.Get("event_sink", "id_prefix", namespace, "")
func (s *StateStore) eventSinkByIDTxn(ws memdb.WatchSet, id string, txn Txn) (*structs.EventSink, error) {
watchCh, existing, err := txn.FirstWatch("event_sink", "id", id)
if err != nil {
return nil, err
return nil, fmt.Errorf("event sink lookup failed: %w", err)
}
ws.Add(watchCh)

ws.Add(iter.WatchCh())

return iter, nil
if existing != nil {
return existing.(*structs.EventSink), nil
}
return nil, nil
}

func (s *StateStore) UpsertEventSink(idx uint64, namespace string, sink *structs.EventSink) error {
func (s *StateStore) UpsertEventSink(idx uint64, sink *structs.EventSink) error {
txn := s.db.WriteTxn(idx)
defer txn.Abort()

existing, err := txn.First("event_sink", "id", namespace, sink.ID)
existing, err := txn.First("event_sink", "id", sink.ID)
if err != nil {
return fmt.Errorf("event sink lookup failed: %w", err)
}
Expand All @@ -5698,12 +5698,12 @@ func (s *StateStore) UpsertEventSink(idx uint64, namespace string, sink *structs
return txn.Commit()
}

func (s *StateStore) DeleteEventSinks(idx uint64, namespace string, sinks []string) error {
func (s *StateStore) DeleteEventSinks(idx uint64, sinks []string) error {
txn := s.db.WriteTxn(idx)
defer txn.Abort()

for _, id := range sinks {
if _, err := txn.DeleteAll("event_sink", "id", namespace, id); err != nil {
if _, err := txn.DeleteAll("event_sink", "id", id); err != nil {
return fmt.Errorf("deleting event sink failed: %v", err)
}
}
Expand Down
Loading

0 comments on commit 942fc69

Please sign in to comment.