diff --git a/go.mod b/go.mod index c4d44bfb2f4d..2285d45e1c89 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,7 @@ require ( github.com/hashicorp/consul-template v0.29.5 github.com/hashicorp/consul/api v1.17.0 github.com/hashicorp/errwrap v1.1.0 - github.com/hashicorp/eventlogger v0.1.0 + github.com/hashicorp/eventlogger v0.1.1 github.com/hashicorp/go-cleanhttp v0.5.2 github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192 github.com/hashicorp/go-gcp-common v0.8.0 diff --git a/go.sum b/go.sum index 66f4a030e59c..7e0585f4e792 100644 --- a/go.sum +++ b/go.sum @@ -969,8 +969,8 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FK github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/eventlogger v0.1.0 h1:S6xc4gZVzewuDUP4R4Ngko419h/CGDuV/b4ADL3XLik= -github.com/hashicorp/eventlogger v0.1.0/go.mod h1:a3IXf1aEJfpCPzseTOrwKj4fVW/Qn3oEmpQeaIznzH0= +github.com/hashicorp/eventlogger v0.1.1 h1:zyCjxsy7KunFsMPZKU5PnwWEakSrp1zjj2vPFmrDaeo= +github.com/hashicorp/eventlogger v0.1.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= @@ -995,7 +995,6 @@ github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJ github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-kms-wrapping/entropy/v2 v2.0.0 h1:pSjQfW3vPtrOTcasTUKgCTQT7OGPPTTMVRrOfU6FJD8= github.com/hashicorp/go-kms-wrapping/entropy/v2 v2.0.0/go.mod h1:xvb32K2keAc+R8DSFG2IwDcydK9DBQE+fGA5fsw6hSk= -github.com/hashicorp/go-kms-wrapping/v2 v2.0.7/go.mod h1:sDQAfwJGv25uGPZA04x87ERglCG6avnRcBT9wYoMII8= github.com/hashicorp/go-kms-wrapping/v2 v2.0.8 h1:9Q2lu1YbbmiAgvYZ7Pr31RdlVonUpX+mmDL7Z7qTA2U= github.com/hashicorp/go-kms-wrapping/v2 v2.0.8/go.mod h1:qTCjxGig/kjuj3hk1z8pOUrzbse/GxB1tGfbrq8tGJg= github.com/hashicorp/go-kms-wrapping/wrappers/aead/v2 v2.0.7-1 h1:ZV26VJYcITBom0QqYSUOIj4HOHCVPEFjLqjxyXV/AbA= diff --git a/http/events.go b/http/events.go index e957cd7a3798..f516b3ccbc33 100644 --- a/http/events.go +++ b/http/events.go @@ -15,7 +15,6 @@ import ( "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/vault" "github.com/hashicorp/vault/vault/eventbus" - "google.golang.org/protobuf/encoding/protojson" "nhooyr.io/websocket" ) @@ -47,19 +46,26 @@ func handleEventsSubscribeWebsocket(args eventSubscribeArgs) (websocket.StatusCo logger.Info("Websocket context is done, closing the connection") return websocket.StatusNormalClosure, "", nil case message := <-ch: - logger.Debug("Sending message to websocket", "message", message) + logger.Debug("Sending message to websocket", "message", message.Payload) var messageBytes []byte + var messageType websocket.MessageType if args.json { - messageBytes, err = protojson.Marshal(message) + var ok bool + messageBytes, ok = message.Format("cloudevents-json") + if !ok { + logger.Warn("Could not get cloudevents JSON format") + return 0, "", errors.New("could not get cloudevents JSON format") + } + messageType = websocket.MessageText } else { - messageBytes, err = proto.Marshal(message) + messageBytes, err = proto.Marshal(message.Payload.(*logical.EventReceived)) + messageType = websocket.MessageBinary } if err != nil { logger.Warn("Could not serialize websocket event", "error", err) return 0, "", err } - messageString := string(messageBytes) + "\n" - err = args.conn.Write(ctx, websocket.MessageText, []byte(messageString)) + err = args.conn.Write(ctx, messageType, messageBytes) if err != nil { return 0, "", err } diff --git a/http/events_test.go b/http/events_test.go index 645ecd403a4a..3fe5d68e96c1 100644 --- a/http/events_test.go +++ b/http/events_test.go @@ -2,6 +2,7 @@ package http import ( "context" + "fmt" "net/http" "strings" "sync/atomic" @@ -64,27 +65,39 @@ func TestEventsSubscribe(t *testing.T) { wsAddr := strings.Replace(addr, "http", "ws", 1) - // check that the connection fails if we don't have a token - _, _, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/"+eventType+"?json=true", nil) - if err == nil { - t.Error("Expected websocket error but got none") - } else if !strings.HasSuffix(err.Error(), "401") { - t.Errorf("Expected 401 websocket but got %v", err) - } + testCases := []struct { + json bool + }{{true}, {false}} - conn, _, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/"+eventType+"?json=true", &websocket.DialOptions{ - HTTPHeader: http.Header{"x-vault-token": []string{token}}, - }) - if err != nil { - t.Fatal(err) - } + for _, testCase := range testCases { + url := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?json=%v", wsAddr, eventType, testCase.json) + // check that the connection fails if we don't have a token + _, _, err := websocket.Dial(ctx, url, nil) + if err == nil { + t.Error("Expected websocket error but got none") + } else if !strings.HasSuffix(err.Error(), "401") { + t.Errorf("Expected 401 websocket but got %v", err) + } - _, msg, err := conn.Read(ctx) - if err != nil { - t.Fatal(err) - } - msgJson := strings.TrimSpace(string(msg)) - if !strings.HasPrefix(msgJson, "{") || !strings.HasSuffix(msgJson, "}") { - t.Errorf("Expected to get JSON event but got: %v", msgJson) + conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{ + HTTPHeader: http.Header{"x-vault-token": []string{token}}, + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + conn.Close(websocket.StatusNormalClosure, "") + }) + + _, msg, err := conn.Read(ctx) + if err != nil { + t.Fatal(err) + } + if testCase.json { + msgJson := strings.TrimSpace(string(msg)) + if !strings.HasPrefix(msgJson, "{") || !strings.HasSuffix(msgJson, "}") { + t.Errorf("Expected to get JSON event but got: %v", msgJson) + } + } } } diff --git a/sdk/logical/event.pb.go b/sdk/logical/event.pb.go index d09020236414..1925c6ae9870 100644 --- a/sdk/logical/event.pb.go +++ b/sdk/logical/event.pb.go @@ -10,7 +10,6 @@ import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" structpb "google.golang.org/protobuf/types/known/structpb" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" ) @@ -207,10 +206,9 @@ type EventReceived struct { Event *EventData `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"` // namespace path - Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` - EventType string `protobuf:"bytes,3,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` - PluginInfo *EventPluginInfo `protobuf:"bytes,4,opt,name=plugin_info,json=pluginInfo,proto3" json:"plugin_info,omitempty"` - Timestamp *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` + EventType string `protobuf:"bytes,3,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` + PluginInfo *EventPluginInfo `protobuf:"bytes,4,opt,name=plugin_info,json=pluginInfo,proto3" json:"plugin_info,omitempty"` } func (x *EventReceived) Reset() { @@ -273,13 +271,6 @@ func (x *EventReceived) GetPluginInfo() *EventPluginInfo { return nil } -func (x *EventReceived) GetTimestamp() *timestamppb.Timestamp { - if x != nil { - return x.Timestamp - } - return nil -} - var File_sdk_logical_event_proto protoreflect.FileDescriptor var file_sdk_logical_event_proto_rawDesc = []byte{ @@ -287,48 +278,42 @@ var file_sdk_logical_event_proto_rawDesc = []byte{ 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x22, 0xd1, 0x01, 0x0a, 0x0f, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67, 0x69, - 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x63, - 0x6c, 0x61, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, 0x6f, 0x75, 0x6e, - 0x74, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, - 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, - 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x41, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x12, 0x1d, 0x0a, - 0x0a, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x09, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a, 0x06, - 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, 0x6c, - 0x75, 0x67, 0x69, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x76, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70, 0x6c, - 0x75, 0x67, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x76, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, - 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x83, 0x01, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44, - 0x61, 0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x02, 0x69, 0x64, 0x12, 0x33, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x08, - 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6e, 0x74, 0x69, - 0x74, 0x79, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x65, 0x6e, - 0x74, 0x69, 0x74, 0x79, 0x49, 0x64, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x22, 0xeb, 0x01, 0x0a, 0x0d, - 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x12, 0x28, 0x0a, - 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, - 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44, 0x61, 0x74, 0x61, - 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, - 0x70, 0x61, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, - 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, - 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x0b, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x69, - 0x6e, 0x66, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6c, 0x6f, 0x67, 0x69, - 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, - 0x6e, 0x66, 0x6f, 0x52, 0x0a, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, - 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x05, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, - 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x28, 0x5a, 0x26, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, - 0x70, 0x2f, 0x76, 0x61, 0x75, 0x6c, 0x74, 0x2f, 0x73, 0x64, 0x6b, 0x2f, 0x6c, 0x6f, 0x67, 0x69, - 0x63, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x22, 0xd1, 0x01, 0x0a, 0x0f, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, + 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x63, 0x6c, + 0x61, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, 0x6f, 0x75, 0x6e, 0x74, + 0x43, 0x6c, 0x61, 0x73, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x61, + 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x6d, + 0x6f, 0x75, 0x6e, 0x74, 0x41, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x12, 0x1d, 0x0a, 0x0a, + 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x09, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a, 0x06, 0x70, + 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, 0x6c, 0x75, + 0x67, 0x69, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70, 0x6c, 0x75, + 0x67, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x83, 0x01, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44, 0x61, + 0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x69, 0x64, 0x12, 0x33, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x08, 0x6d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6e, 0x74, 0x69, 0x74, + 0x79, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x65, 0x6e, 0x74, + 0x69, 0x74, 0x79, 0x49, 0x64, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x22, 0xb1, 0x01, 0x0a, 0x0d, 0x45, + 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x12, 0x28, 0x0a, 0x05, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, 0x6f, + 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52, + 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, + 0x61, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, + 0x70, 0x61, 0x63, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x79, + 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, + 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x0b, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x69, 0x6e, + 0x66, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6c, 0x6f, 0x67, 0x69, 0x63, + 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, + 0x66, 0x6f, 0x52, 0x0a, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x42, 0x28, + 0x5a, 0x26, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, + 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x76, 0x61, 0x75, 0x6c, 0x74, 0x2f, 0x73, 0x64, 0x6b, + 0x2f, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -345,22 +330,20 @@ func file_sdk_logical_event_proto_rawDescGZIP() []byte { var file_sdk_logical_event_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_sdk_logical_event_proto_goTypes = []interface{}{ - (*EventPluginInfo)(nil), // 0: logical.EventPluginInfo - (*EventData)(nil), // 1: logical.EventData - (*EventReceived)(nil), // 2: logical.EventReceived - (*structpb.Struct)(nil), // 3: google.protobuf.Struct - (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp + (*EventPluginInfo)(nil), // 0: logical.EventPluginInfo + (*EventData)(nil), // 1: logical.EventData + (*EventReceived)(nil), // 2: logical.EventReceived + (*structpb.Struct)(nil), // 3: google.protobuf.Struct } var file_sdk_logical_event_proto_depIdxs = []int32{ 3, // 0: logical.EventData.metadata:type_name -> google.protobuf.Struct 1, // 1: logical.EventReceived.event:type_name -> logical.EventData 0, // 2: logical.EventReceived.plugin_info:type_name -> logical.EventPluginInfo - 4, // 3: logical.EventReceived.timestamp:type_name -> google.protobuf.Timestamp - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_sdk_logical_event_proto_init() } diff --git a/sdk/logical/event.proto b/sdk/logical/event.proto index 789f1b8d4db1..594bcf1dde09 100644 --- a/sdk/logical/event.proto +++ b/sdk/logical/event.proto @@ -5,7 +5,6 @@ option go_package = "github.com/hashicorp/vault/sdk/logical"; package logical; import "google/protobuf/struct.proto"; -import "google/protobuf/timestamp.proto"; // EventPluginInfo contains data related to the plugin that generated an event. message EventPluginInfo { @@ -49,5 +48,4 @@ message EventReceived { string namespace = 2; string event_type = 3; EventPluginInfo plugin_info = 4; - google.protobuf.Timestamp timestamp = 5; } diff --git a/sdk/logical/events.go b/sdk/logical/events.go index 9840a9ca47c2..e96e6d709005 100644 --- a/sdk/logical/events.go +++ b/sdk/logical/events.go @@ -7,8 +7,8 @@ import ( ) // ID is an alias to GetId() for CloudEvents compatibility. -func (x *EventData) ID() string { - return x.GetId() +func (x *EventReceived) ID() string { + return x.Event.GetId() } // NewEvent returns an event with a new, random EID. diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go index cc954435fc93..917638018ece 100644 --- a/vault/eventbus/bus.go +++ b/vault/eventbus/bus.go @@ -17,7 +17,6 @@ import ( "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/sdk/logical" "github.com/ryanuber/go-glob" - "google.golang.org/protobuf/types/known/timestamppb" ) const ( @@ -53,7 +52,7 @@ type pluginEventBus struct { type asyncChanNode struct { // TODO: add bounded deque buffer of *EventReceived ctx context.Context - ch chan *logical.EventReceived + ch chan *eventlogger.Event logger hclog.Logger // used to close the connection @@ -95,7 +94,6 @@ func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace, Namespace: ns.Path, EventType: string(eventType), PluginInfo: pluginInfo, - Timestamp: timestamppb.New(time.Now()), } bus.logger.Info("Sending event", "event", eventReceived) @@ -169,7 +167,7 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) { }, nil } -func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *logical.EventReceived, context.CancelFunc, error) { +func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) { // subscriptions are still stored even if the bus has not been started pipelineID, err := uuid.GenerateUUID() if err != nil { @@ -193,7 +191,7 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat } ctx, cancel := context.WithCancel(ctx) - asyncNode := newAsyncNode(ctx, ns, bus.logger) + asyncNode := newAsyncNode(ctx, bus.logger) err = bus.broker.RegisterNode(eventlogger.NodeID(sinkNodeID), asyncNode) if err != nil { defer cancel() @@ -247,10 +245,10 @@ func newFilterNode(ns *namespace.Namespace, pattern string) *eventlogger.Filter } } -func newAsyncNode(ctx context.Context, namespace *namespace.Namespace, logger hclog.Logger) *asyncChanNode { +func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode { return &asyncChanNode{ ctx: ctx, - ch: make(chan *logical.EventReceived), + ch: make(chan *eventlogger.Event), logger: logger, } } @@ -272,17 +270,16 @@ func (node *asyncChanNode) Close() { func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) { // sends to the channel async in another goroutine go func() { - eventRecv := e.Payload.(*logical.EventReceived) var timeout bool select { - case node.ch <- eventRecv: + case node.ch <- e: case <-ctx.Done(): timeout = errors.Is(ctx.Err(), context.DeadlineExceeded) case <-node.ctx.Done(): timeout = errors.Is(node.ctx.Err(), context.DeadlineExceeded) } if timeout { - node.logger.Info("Subscriber took too long to process event, closing", "ID", eventRecv.Event.ID()) + node.logger.Info("Subscriber took too long to process event, closing", "ID", e.Payload.(*logical.EventReceived).Event.Id) node.Close() } }() diff --git a/vault/eventbus/bus_test.go b/vault/eventbus/bus_test.go index cf7d26e318eb..e0ac46c83fc2 100644 --- a/vault/eventbus/bus_test.go +++ b/vault/eventbus/bus_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/hashicorp/eventlogger" "github.com/hashicorp/go-secure-stdlib/strutil" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/sdk/logical" @@ -58,7 +59,7 @@ func TestBusBasics(t *testing.T) { timeout := time.After(1 * time.Second) select { case message := <-ch: - if message.Event.ID() != event.ID() { + if message.Payload.(*logical.EventReceived).Event.Id != event.Id { t.Errorf("Got unexpected message: %+v", message) } case <-timeout: @@ -117,7 +118,7 @@ func TestNamespaceFiltering(t *testing.T) { timeout = time.After(1 * time.Second) select { case message := <-ch: - if message.Event.ID() != event.ID() { + if message.Payload.(*logical.EventReceived).Event.Id != event.Id { t.Errorf("Got unexpected message %+v but was waiting for %+v", message, event) } @@ -171,7 +172,7 @@ func TestBus2Subscriptions(t *testing.T) { timeout := time.After(1 * time.Second) select { case message := <-ch1: - if message.Event.ID() != event1.ID() { + if message.Payload.(*logical.EventReceived).Event.Id != event1.Id { t.Errorf("Got unexpected message: %v", message) } case <-timeout: @@ -179,7 +180,7 @@ func TestBus2Subscriptions(t *testing.T) { } select { case message := <-ch2: - if message.Event.ID() != event2.ID() { + if message.Payload.(*logical.EventReceived).Event.Id != event2.Id { t.Errorf("Got unexpected message: %v", message) } case <-timeout: @@ -216,7 +217,7 @@ func TestBusSubscriptionsCancel(t *testing.T) { eventType := logical.EventType("someType") - var channels []<-chan *logical.EventReceived + var channels []<-chan *eventlogger.Event var cancels []context.CancelFunc stopped := atomic.Int32{} @@ -348,7 +349,7 @@ func TestBusWildcardSubscriptions(t *testing.T) { for i := 0; i < 2; i++ { select { case message := <-ch1: - ch1Seen = append(ch1Seen, message.Event.ID()) + ch1Seen = append(ch1Seen, message.Payload.(*logical.EventReceived).Event.Id) case <-timeout: t.Error("Timeout waiting for event1") } @@ -356,17 +357,17 @@ func TestBusWildcardSubscriptions(t *testing.T) { if len(ch1Seen) != 2 { t.Errorf("Expected 2 events but got: %v", ch1Seen) } else { - if !strutil.StrListContains(ch1Seen, event1.ID()) { - t.Errorf("Did not find %s event1 ID in ch1seen", event1.ID()) + if !strutil.StrListContains(ch1Seen, event1.Id) { + t.Errorf("Did not find %s event1 ID in ch1seen", event1.Id) } - if !strutil.StrListContains(ch1Seen, event2.ID()) { - t.Errorf("Did not find %s event2 ID in ch1seen", event2.ID()) + if !strutil.StrListContains(ch1Seen, event2.Id) { + t.Errorf("Did not find %s event2 ID in ch1seen", event2.Id) } } // Expect to receive just kv/bar on ch2, which subscribed to */bar select { case message := <-ch2: - if message.Event.ID() != event2.ID() { + if message.Payload.(*logical.EventReceived).Event.Id != event2.Id { t.Errorf("Got unexpected message: %v", message) } case <-timeout: diff --git a/vault/events_test.go b/vault/events_test.go index 107be1913efb..97d1968e0015 100644 --- a/vault/events_test.go +++ b/vault/events_test.go @@ -37,7 +37,8 @@ func TestCanSendEventsFromBuiltinPlugin(t *testing.T) { // check that the event is routed to the subscription select { - case received := <-ch: + case receivedEvent := <-ch: + received := receivedEvent.Payload.(*logical.EventReceived) if event.Id != received.Event.Id { t.Errorf("Got wrong event: %+v, expected %+v", received, event) }