From 4944581a9c47f99f318f0dd687d368dbf91334e4 Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Fri, 17 Feb 2023 11:38:03 -0800 Subject: [PATCH] events: WS protobuf messages should be binary (#19232) The [WebSockets spec](https://www.rfc-editor.org/rfc/rfc6455) states that text messages must be valid UTF-8 encoded strings, which protobuf messages virtually never are. This now correctly sends the protobuf events as binary messages. We change the format to correspond to CloudEvents, as originally intended, and remove a redundant timestamp and newline. We also bump the eventlogger to fix a race condition that this code triggers. --- go.mod | 2 +- go.sum | 5 +- http/events.go | 18 ++++-- http/events_test.go | 53 ++++++++++------- sdk/logical/event.pb.go | 113 ++++++++++++++++--------------------- sdk/logical/event.proto | 2 - sdk/logical/events.go | 4 +- vault/eventbus/bus.go | 17 +++--- vault/eventbus/bus_test.go | 23 ++++---- vault/events_test.go | 3 +- 10 files changed, 119 insertions(+), 121 deletions(-) diff --git a/go.mod b/go.mod index c4d44bfb2f4d..2285d45e1c89 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,7 @@ require ( github.com/hashicorp/consul-template v0.29.5 github.com/hashicorp/consul/api v1.17.0 github.com/hashicorp/errwrap v1.1.0 - github.com/hashicorp/eventlogger v0.1.0 + github.com/hashicorp/eventlogger v0.1.1 github.com/hashicorp/go-cleanhttp v0.5.2 github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192 github.com/hashicorp/go-gcp-common v0.8.0 diff --git a/go.sum b/go.sum index 66f4a030e59c..7e0585f4e792 100644 --- a/go.sum +++ b/go.sum @@ -969,8 +969,8 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FK github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/eventlogger v0.1.0 h1:S6xc4gZVzewuDUP4R4Ngko419h/CGDuV/b4ADL3XLik= -github.com/hashicorp/eventlogger v0.1.0/go.mod h1:a3IXf1aEJfpCPzseTOrwKj4fVW/Qn3oEmpQeaIznzH0= +github.com/hashicorp/eventlogger v0.1.1 h1:zyCjxsy7KunFsMPZKU5PnwWEakSrp1zjj2vPFmrDaeo= +github.com/hashicorp/eventlogger v0.1.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= @@ -995,7 +995,6 @@ github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJ github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-kms-wrapping/entropy/v2 v2.0.0 h1:pSjQfW3vPtrOTcasTUKgCTQT7OGPPTTMVRrOfU6FJD8= github.com/hashicorp/go-kms-wrapping/entropy/v2 v2.0.0/go.mod h1:xvb32K2keAc+R8DSFG2IwDcydK9DBQE+fGA5fsw6hSk= -github.com/hashicorp/go-kms-wrapping/v2 v2.0.7/go.mod h1:sDQAfwJGv25uGPZA04x87ERglCG6avnRcBT9wYoMII8= github.com/hashicorp/go-kms-wrapping/v2 v2.0.8 h1:9Q2lu1YbbmiAgvYZ7Pr31RdlVonUpX+mmDL7Z7qTA2U= github.com/hashicorp/go-kms-wrapping/v2 v2.0.8/go.mod h1:qTCjxGig/kjuj3hk1z8pOUrzbse/GxB1tGfbrq8tGJg= github.com/hashicorp/go-kms-wrapping/wrappers/aead/v2 v2.0.7-1 h1:ZV26VJYcITBom0QqYSUOIj4HOHCVPEFjLqjxyXV/AbA= diff --git a/http/events.go b/http/events.go index e957cd7a3798..f516b3ccbc33 100644 --- a/http/events.go +++ b/http/events.go @@ -15,7 +15,6 @@ import ( "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/vault" "github.com/hashicorp/vault/vault/eventbus" - "google.golang.org/protobuf/encoding/protojson" "nhooyr.io/websocket" ) @@ -47,19 +46,26 @@ func handleEventsSubscribeWebsocket(args eventSubscribeArgs) (websocket.StatusCo logger.Info("Websocket context is done, closing the connection") return websocket.StatusNormalClosure, "", nil case message := <-ch: - logger.Debug("Sending message to websocket", "message", message) + logger.Debug("Sending message to websocket", "message", message.Payload) var messageBytes []byte + var messageType websocket.MessageType if args.json { - messageBytes, err = protojson.Marshal(message) + var ok bool + messageBytes, ok = message.Format("cloudevents-json") + if !ok { + logger.Warn("Could not get cloudevents JSON format") + return 0, "", errors.New("could not get cloudevents JSON format") + } + messageType = websocket.MessageText } else { - messageBytes, err = proto.Marshal(message) + messageBytes, err = proto.Marshal(message.Payload.(*logical.EventReceived)) + messageType = websocket.MessageBinary } if err != nil { logger.Warn("Could not serialize websocket event", "error", err) return 0, "", err } - messageString := string(messageBytes) + "\n" - err = args.conn.Write(ctx, websocket.MessageText, []byte(messageString)) + err = args.conn.Write(ctx, messageType, messageBytes) if err != nil { return 0, "", err } diff --git a/http/events_test.go b/http/events_test.go index 645ecd403a4a..3fe5d68e96c1 100644 --- a/http/events_test.go +++ b/http/events_test.go @@ -2,6 +2,7 @@ package http import ( "context" + "fmt" "net/http" "strings" "sync/atomic" @@ -64,27 +65,39 @@ func TestEventsSubscribe(t *testing.T) { wsAddr := strings.Replace(addr, "http", "ws", 1) - // check that the connection fails if we don't have a token - _, _, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/"+eventType+"?json=true", nil) - if err == nil { - t.Error("Expected websocket error but got none") - } else if !strings.HasSuffix(err.Error(), "401") { - t.Errorf("Expected 401 websocket but got %v", err) - } + testCases := []struct { + json bool + }{{true}, {false}} - conn, _, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/"+eventType+"?json=true", &websocket.DialOptions{ - HTTPHeader: http.Header{"x-vault-token": []string{token}}, - }) - if err != nil { - t.Fatal(err) - } + for _, testCase := range testCases { + url := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?json=%v", wsAddr, eventType, testCase.json) + // check that the connection fails if we don't have a token + _, _, err := websocket.Dial(ctx, url, nil) + if err == nil { + t.Error("Expected websocket error but got none") + } else if !strings.HasSuffix(err.Error(), "401") { + t.Errorf("Expected 401 websocket but got %v", err) + } - _, msg, err := conn.Read(ctx) - if err != nil { - t.Fatal(err) - } - msgJson := strings.TrimSpace(string(msg)) - if !strings.HasPrefix(msgJson, "{") || !strings.HasSuffix(msgJson, "}") { - t.Errorf("Expected to get JSON event but got: %v", msgJson) + conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{ + HTTPHeader: http.Header{"x-vault-token": []string{token}}, + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + conn.Close(websocket.StatusNormalClosure, "") + }) + + _, msg, err := conn.Read(ctx) + if err != nil { + t.Fatal(err) + } + if testCase.json { + msgJson := strings.TrimSpace(string(msg)) + if !strings.HasPrefix(msgJson, "{") || !strings.HasSuffix(msgJson, "}") { + t.Errorf("Expected to get JSON event but got: %v", msgJson) + } + } } } diff --git a/sdk/logical/event.pb.go b/sdk/logical/event.pb.go index d09020236414..1925c6ae9870 100644 --- a/sdk/logical/event.pb.go +++ b/sdk/logical/event.pb.go @@ -10,7 +10,6 @@ import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" structpb "google.golang.org/protobuf/types/known/structpb" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" ) @@ -207,10 +206,9 @@ type EventReceived struct { Event *EventData `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"` // namespace path - Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` - EventType string `protobuf:"bytes,3,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` - PluginInfo *EventPluginInfo `protobuf:"bytes,4,opt,name=plugin_info,json=pluginInfo,proto3" json:"plugin_info,omitempty"` - Timestamp *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` + EventType string `protobuf:"bytes,3,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` + PluginInfo *EventPluginInfo `protobuf:"bytes,4,opt,name=plugin_info,json=pluginInfo,proto3" json:"plugin_info,omitempty"` } func (x *EventReceived) Reset() { @@ -273,13 +271,6 @@ func (x *EventReceived) GetPluginInfo() *EventPluginInfo { return nil } -func (x *EventReceived) GetTimestamp() *timestamppb.Timestamp { - if x != nil { - return x.Timestamp - } - return nil -} - var File_sdk_logical_event_proto protoreflect.FileDescriptor var file_sdk_logical_event_proto_rawDesc = []byte{ @@ -287,48 +278,42 @@ var file_sdk_logical_event_proto_rawDesc = []byte{ 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x22, 0xd1, 0x01, 0x0a, 0x0f, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67, 0x69, - 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x63, - 0x6c, 0x61, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, 0x6f, 0x75, 0x6e, - 0x74, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, - 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, - 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x41, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x12, 0x1d, 0x0a, - 0x0a, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x09, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a, 0x06, - 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, 0x6c, - 0x75, 0x67, 0x69, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x76, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70, 0x6c, - 0x75, 0x67, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x76, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, - 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x83, 0x01, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44, - 0x61, 0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x02, 0x69, 0x64, 0x12, 0x33, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x08, - 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6e, 0x74, 0x69, - 0x74, 0x79, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x65, 0x6e, - 0x74, 0x69, 0x74, 0x79, 0x49, 0x64, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x22, 0xeb, 0x01, 0x0a, 0x0d, - 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x12, 0x28, 0x0a, - 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, - 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44, 0x61, 0x74, 0x61, - 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, - 0x70, 0x61, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, - 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, - 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x0b, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x69, - 0x6e, 0x66, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6c, 0x6f, 0x67, 0x69, - 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, - 0x6e, 0x66, 0x6f, 0x52, 0x0a, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, - 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x05, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, - 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x28, 0x5a, 0x26, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, - 0x70, 0x2f, 0x76, 0x61, 0x75, 0x6c, 0x74, 0x2f, 0x73, 0x64, 0x6b, 0x2f, 0x6c, 0x6f, 0x67, 0x69, - 0x63, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x22, 0xd1, 0x01, 0x0a, 0x0f, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, + 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x63, 0x6c, + 0x61, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, 0x6f, 0x75, 0x6e, 0x74, + 0x43, 0x6c, 0x61, 0x73, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x61, + 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x6d, + 0x6f, 0x75, 0x6e, 0x74, 0x41, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x12, 0x1d, 0x0a, 0x0a, + 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x09, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a, 0x06, 0x70, + 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, 0x6c, 0x75, + 0x67, 0x69, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70, 0x6c, 0x75, + 0x67, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x83, 0x01, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44, 0x61, + 0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x69, 0x64, 0x12, 0x33, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x08, 0x6d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6e, 0x74, 0x69, 0x74, + 0x79, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x65, 0x6e, 0x74, + 0x69, 0x74, 0x79, 0x49, 0x64, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x22, 0xb1, 0x01, 0x0a, 0x0d, 0x45, + 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x12, 0x28, 0x0a, 0x05, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, 0x6f, + 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52, + 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, + 0x61, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, + 0x70, 0x61, 0x63, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x79, + 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, + 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x0b, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x69, 0x6e, + 0x66, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6c, 0x6f, 0x67, 0x69, 0x63, + 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, + 0x66, 0x6f, 0x52, 0x0a, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x42, 0x28, + 0x5a, 0x26, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, + 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x76, 0x61, 0x75, 0x6c, 0x74, 0x2f, 0x73, 0x64, 0x6b, + 0x2f, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -345,22 +330,20 @@ func file_sdk_logical_event_proto_rawDescGZIP() []byte { var file_sdk_logical_event_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_sdk_logical_event_proto_goTypes = []interface{}{ - (*EventPluginInfo)(nil), // 0: logical.EventPluginInfo - (*EventData)(nil), // 1: logical.EventData - (*EventReceived)(nil), // 2: logical.EventReceived - (*structpb.Struct)(nil), // 3: google.protobuf.Struct - (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp + (*EventPluginInfo)(nil), // 0: logical.EventPluginInfo + (*EventData)(nil), // 1: logical.EventData + (*EventReceived)(nil), // 2: logical.EventReceived + (*structpb.Struct)(nil), // 3: google.protobuf.Struct } var file_sdk_logical_event_proto_depIdxs = []int32{ 3, // 0: logical.EventData.metadata:type_name -> google.protobuf.Struct 1, // 1: logical.EventReceived.event:type_name -> logical.EventData 0, // 2: logical.EventReceived.plugin_info:type_name -> logical.EventPluginInfo - 4, // 3: logical.EventReceived.timestamp:type_name -> google.protobuf.Timestamp - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_sdk_logical_event_proto_init() } diff --git a/sdk/logical/event.proto b/sdk/logical/event.proto index 789f1b8d4db1..594bcf1dde09 100644 --- a/sdk/logical/event.proto +++ b/sdk/logical/event.proto @@ -5,7 +5,6 @@ option go_package = "github.com/hashicorp/vault/sdk/logical"; package logical; import "google/protobuf/struct.proto"; -import "google/protobuf/timestamp.proto"; // EventPluginInfo contains data related to the plugin that generated an event. message EventPluginInfo { @@ -49,5 +48,4 @@ message EventReceived { string namespace = 2; string event_type = 3; EventPluginInfo plugin_info = 4; - google.protobuf.Timestamp timestamp = 5; } diff --git a/sdk/logical/events.go b/sdk/logical/events.go index 9840a9ca47c2..e96e6d709005 100644 --- a/sdk/logical/events.go +++ b/sdk/logical/events.go @@ -7,8 +7,8 @@ import ( ) // ID is an alias to GetId() for CloudEvents compatibility. -func (x *EventData) ID() string { - return x.GetId() +func (x *EventReceived) ID() string { + return x.Event.GetId() } // NewEvent returns an event with a new, random EID. diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go index cc954435fc93..917638018ece 100644 --- a/vault/eventbus/bus.go +++ b/vault/eventbus/bus.go @@ -17,7 +17,6 @@ import ( "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/sdk/logical" "github.com/ryanuber/go-glob" - "google.golang.org/protobuf/types/known/timestamppb" ) const ( @@ -53,7 +52,7 @@ type pluginEventBus struct { type asyncChanNode struct { // TODO: add bounded deque buffer of *EventReceived ctx context.Context - ch chan *logical.EventReceived + ch chan *eventlogger.Event logger hclog.Logger // used to close the connection @@ -95,7 +94,6 @@ func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace, Namespace: ns.Path, EventType: string(eventType), PluginInfo: pluginInfo, - Timestamp: timestamppb.New(time.Now()), } bus.logger.Info("Sending event", "event", eventReceived) @@ -169,7 +167,7 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) { }, nil } -func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *logical.EventReceived, context.CancelFunc, error) { +func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) { // subscriptions are still stored even if the bus has not been started pipelineID, err := uuid.GenerateUUID() if err != nil { @@ -193,7 +191,7 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat } ctx, cancel := context.WithCancel(ctx) - asyncNode := newAsyncNode(ctx, ns, bus.logger) + asyncNode := newAsyncNode(ctx, bus.logger) err = bus.broker.RegisterNode(eventlogger.NodeID(sinkNodeID), asyncNode) if err != nil { defer cancel() @@ -247,10 +245,10 @@ func newFilterNode(ns *namespace.Namespace, pattern string) *eventlogger.Filter } } -func newAsyncNode(ctx context.Context, namespace *namespace.Namespace, logger hclog.Logger) *asyncChanNode { +func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode { return &asyncChanNode{ ctx: ctx, - ch: make(chan *logical.EventReceived), + ch: make(chan *eventlogger.Event), logger: logger, } } @@ -272,17 +270,16 @@ func (node *asyncChanNode) Close() { func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) { // sends to the channel async in another goroutine go func() { - eventRecv := e.Payload.(*logical.EventReceived) var timeout bool select { - case node.ch <- eventRecv: + case node.ch <- e: case <-ctx.Done(): timeout = errors.Is(ctx.Err(), context.DeadlineExceeded) case <-node.ctx.Done(): timeout = errors.Is(node.ctx.Err(), context.DeadlineExceeded) } if timeout { - node.logger.Info("Subscriber took too long to process event, closing", "ID", eventRecv.Event.ID()) + node.logger.Info("Subscriber took too long to process event, closing", "ID", e.Payload.(*logical.EventReceived).Event.Id) node.Close() } }() diff --git a/vault/eventbus/bus_test.go b/vault/eventbus/bus_test.go index cf7d26e318eb..e0ac46c83fc2 100644 --- a/vault/eventbus/bus_test.go +++ b/vault/eventbus/bus_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/hashicorp/eventlogger" "github.com/hashicorp/go-secure-stdlib/strutil" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/sdk/logical" @@ -58,7 +59,7 @@ func TestBusBasics(t *testing.T) { timeout := time.After(1 * time.Second) select { case message := <-ch: - if message.Event.ID() != event.ID() { + if message.Payload.(*logical.EventReceived).Event.Id != event.Id { t.Errorf("Got unexpected message: %+v", message) } case <-timeout: @@ -117,7 +118,7 @@ func TestNamespaceFiltering(t *testing.T) { timeout = time.After(1 * time.Second) select { case message := <-ch: - if message.Event.ID() != event.ID() { + if message.Payload.(*logical.EventReceived).Event.Id != event.Id { t.Errorf("Got unexpected message %+v but was waiting for %+v", message, event) } @@ -171,7 +172,7 @@ func TestBus2Subscriptions(t *testing.T) { timeout := time.After(1 * time.Second) select { case message := <-ch1: - if message.Event.ID() != event1.ID() { + if message.Payload.(*logical.EventReceived).Event.Id != event1.Id { t.Errorf("Got unexpected message: %v", message) } case <-timeout: @@ -179,7 +180,7 @@ func TestBus2Subscriptions(t *testing.T) { } select { case message := <-ch2: - if message.Event.ID() != event2.ID() { + if message.Payload.(*logical.EventReceived).Event.Id != event2.Id { t.Errorf("Got unexpected message: %v", message) } case <-timeout: @@ -216,7 +217,7 @@ func TestBusSubscriptionsCancel(t *testing.T) { eventType := logical.EventType("someType") - var channels []<-chan *logical.EventReceived + var channels []<-chan *eventlogger.Event var cancels []context.CancelFunc stopped := atomic.Int32{} @@ -348,7 +349,7 @@ func TestBusWildcardSubscriptions(t *testing.T) { for i := 0; i < 2; i++ { select { case message := <-ch1: - ch1Seen = append(ch1Seen, message.Event.ID()) + ch1Seen = append(ch1Seen, message.Payload.(*logical.EventReceived).Event.Id) case <-timeout: t.Error("Timeout waiting for event1") } @@ -356,17 +357,17 @@ func TestBusWildcardSubscriptions(t *testing.T) { if len(ch1Seen) != 2 { t.Errorf("Expected 2 events but got: %v", ch1Seen) } else { - if !strutil.StrListContains(ch1Seen, event1.ID()) { - t.Errorf("Did not find %s event1 ID in ch1seen", event1.ID()) + if !strutil.StrListContains(ch1Seen, event1.Id) { + t.Errorf("Did not find %s event1 ID in ch1seen", event1.Id) } - if !strutil.StrListContains(ch1Seen, event2.ID()) { - t.Errorf("Did not find %s event2 ID in ch1seen", event2.ID()) + if !strutil.StrListContains(ch1Seen, event2.Id) { + t.Errorf("Did not find %s event2 ID in ch1seen", event2.Id) } } // Expect to receive just kv/bar on ch2, which subscribed to */bar select { case message := <-ch2: - if message.Event.ID() != event2.ID() { + if message.Payload.(*logical.EventReceived).Event.Id != event2.Id { t.Errorf("Got unexpected message: %v", message) } case <-timeout: diff --git a/vault/events_test.go b/vault/events_test.go index 107be1913efb..97d1968e0015 100644 --- a/vault/events_test.go +++ b/vault/events_test.go @@ -37,7 +37,8 @@ func TestCanSendEventsFromBuiltinPlugin(t *testing.T) { // check that the event is routed to the subscription select { - case received := <-ch: + case receivedEvent := <-ch: + received := receivedEvent.Payload.(*logical.EventReceived) if event.Id != received.Event.Id { t.Errorf("Got wrong event: %+v, expected %+v", received, event) }