Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

events: WS protobuf messages should be binary #19232

Merged
merged 6 commits into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ require (
github.com/hashicorp/consul-template v0.29.5
github.com/hashicorp/consul/api v1.17.0
github.com/hashicorp/errwrap v1.1.0
github.com/hashicorp/eventlogger v0.1.0
github.com/hashicorp/eventlogger v0.1.1
github.com/hashicorp/go-cleanhttp v0.5.2
github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192
github.com/hashicorp/go-gcp-common v0.8.0
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -969,8 +969,8 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FK
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/eventlogger v0.1.0 h1:S6xc4gZVzewuDUP4R4Ngko419h/CGDuV/b4ADL3XLik=
github.com/hashicorp/eventlogger v0.1.0/go.mod h1:a3IXf1aEJfpCPzseTOrwKj4fVW/Qn3oEmpQeaIznzH0=
github.com/hashicorp/eventlogger v0.1.1 h1:zyCjxsy7KunFsMPZKU5PnwWEakSrp1zjj2vPFmrDaeo=
github.com/hashicorp/eventlogger v0.1.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand All @@ -995,7 +995,6 @@ github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJ
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-kms-wrapping/entropy/v2 v2.0.0 h1:pSjQfW3vPtrOTcasTUKgCTQT7OGPPTTMVRrOfU6FJD8=
github.com/hashicorp/go-kms-wrapping/entropy/v2 v2.0.0/go.mod h1:xvb32K2keAc+R8DSFG2IwDcydK9DBQE+fGA5fsw6hSk=
github.com/hashicorp/go-kms-wrapping/v2 v2.0.7/go.mod h1:sDQAfwJGv25uGPZA04x87ERglCG6avnRcBT9wYoMII8=
github.com/hashicorp/go-kms-wrapping/v2 v2.0.8 h1:9Q2lu1YbbmiAgvYZ7Pr31RdlVonUpX+mmDL7Z7qTA2U=
github.com/hashicorp/go-kms-wrapping/v2 v2.0.8/go.mod h1:qTCjxGig/kjuj3hk1z8pOUrzbse/GxB1tGfbrq8tGJg=
github.com/hashicorp/go-kms-wrapping/wrappers/aead/v2 v2.0.7-1 h1:ZV26VJYcITBom0QqYSUOIj4HOHCVPEFjLqjxyXV/AbA=
Expand Down
12 changes: 7 additions & 5 deletions http/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault"
"github.com/hashicorp/vault/vault/eventbus"
"google.golang.org/protobuf/encoding/protojson"
"nhooyr.io/websocket"
)

Expand Down Expand Up @@ -49,17 +48,20 @@ func handleEventsSubscribeWebsocket(args eventSubscribeArgs) (websocket.StatusCo
case message := <-ch:
logger.Debug("Sending message to websocket", "message", message)
var messageBytes []byte
var messageType websocket.MessageType
if args.json {
messageBytes, err = protojson.Marshal(message)
messageBytes = message.Formatted["cloudevents-json"]
messageType = websocket.MessageText
messageBytes = []byte(string(messageBytes) + "\n")
tomhjp marked this conversation as resolved.
Show resolved Hide resolved
} else {
messageBytes, err = proto.Marshal(message)
messageBytes, err = proto.Marshal(message.Payload.(*logical.EventReceived))
messageType = websocket.MessageBinary
}
if err != nil {
logger.Warn("Could not serialize websocket event", "error", err)
return 0, "", err
}
messageString := string(messageBytes) + "\n"
err = args.conn.Write(ctx, websocket.MessageText, []byte(messageString))
err = args.conn.Write(ctx, messageType, messageBytes)
if err != nil {
return 0, "", err
}
Expand Down
53 changes: 33 additions & 20 deletions http/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"context"
"fmt"
"net/http"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -64,27 +65,39 @@ func TestEventsSubscribe(t *testing.T) {

wsAddr := strings.Replace(addr, "http", "ws", 1)

// check that the connection fails if we don't have a token
_, _, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/"+eventType+"?json=true", nil)
if err == nil {
t.Error("Expected websocket error but got none")
} else if !strings.HasSuffix(err.Error(), "401") {
t.Errorf("Expected 401 websocket but got %v", err)
}
testCases := []struct {
json bool
}{{true}, {false}}

conn, _, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/"+eventType+"?json=true", &websocket.DialOptions{
HTTPHeader: http.Header{"x-vault-token": []string{token}},
})
if err != nil {
t.Fatal(err)
}
for _, testCase := range testCases {
url := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?json=%v", wsAddr, eventType, testCase.json)
// check that the connection fails if we don't have a token
_, _, err := websocket.Dial(ctx, url, nil)
if err == nil {
t.Error("Expected websocket error but got none")
} else if !strings.HasSuffix(err.Error(), "401") {
t.Errorf("Expected 401 websocket but got %v", err)
}

_, msg, err := conn.Read(ctx)
if err != nil {
t.Fatal(err)
}
msgJson := strings.TrimSpace(string(msg))
if !strings.HasPrefix(msgJson, "{") || !strings.HasSuffix(msgJson, "}") {
t.Errorf("Expected to get JSON event but got: %v", msgJson)
conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{
HTTPHeader: http.Header{"x-vault-token": []string{token}},
})
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
conn.Close(websocket.StatusNormalClosure, "")
})

_, msg, err := conn.Read(ctx)
if err != nil {
t.Fatal(err)
}
if testCase.json {
msgJson := strings.TrimSpace(string(msg))
if !strings.HasPrefix(msgJson, "{") || !strings.HasSuffix(msgJson, "}") {
t.Errorf("Expected to get JSON event but got: %v", msgJson)
}
}
}
}
4 changes: 2 additions & 2 deletions sdk/logical/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
)

// ID is an alias to GetId() for CloudEvents compatibility.
func (x *EventData) ID() string {
return x.GetId()
func (x *EventReceived) ID() string {
return x.Event.GetId()
}

// NewEvent returns an event with a new, random EID.
Expand Down
15 changes: 7 additions & 8 deletions vault/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type pluginEventBus struct {
type asyncChanNode struct {
// TODO: add bounded deque buffer of *EventReceived
ctx context.Context
ch chan *logical.EventReceived
ch chan *eventlogger.Event
logger hclog.Logger

// used to close the connection
Expand Down Expand Up @@ -169,7 +169,7 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) {
}, nil
}

func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *logical.EventReceived, context.CancelFunc, error) {
func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
// subscriptions are still stored even if the bus has not been started
pipelineID, err := uuid.GenerateUUID()
if err != nil {
Expand All @@ -193,7 +193,7 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat
}

ctx, cancel := context.WithCancel(ctx)
asyncNode := newAsyncNode(ctx, ns, bus.logger)
asyncNode := newAsyncNode(ctx, bus.logger)
err = bus.broker.RegisterNode(eventlogger.NodeID(sinkNodeID), asyncNode)
if err != nil {
defer cancel()
Expand Down Expand Up @@ -247,10 +247,10 @@ func newFilterNode(ns *namespace.Namespace, pattern string) *eventlogger.Filter
}
}

func newAsyncNode(ctx context.Context, namespace *namespace.Namespace, logger hclog.Logger) *asyncChanNode {
func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode {
return &asyncChanNode{
ctx: ctx,
ch: make(chan *logical.EventReceived),
ch: make(chan *eventlogger.Event),
logger: logger,
}
}
Expand All @@ -272,17 +272,16 @@ func (node *asyncChanNode) Close() {
func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
// sends to the channel async in another goroutine
go func() {
eventRecv := e.Payload.(*logical.EventReceived)
var timeout bool
select {
case node.ch <- eventRecv:
case node.ch <- e:
case <-ctx.Done():
timeout = errors.Is(ctx.Err(), context.DeadlineExceeded)
case <-node.ctx.Done():
timeout = errors.Is(node.ctx.Err(), context.DeadlineExceeded)
}
if timeout {
node.logger.Info("Subscriber took too long to process event, closing", "ID", eventRecv.Event.ID())
node.logger.Info("Subscriber took too long to process event, closing", "ID", e.Payload.(*logical.EventReceived).Event.Id)
node.Close()
}
}()
Expand Down
23 changes: 12 additions & 11 deletions vault/eventbus/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/hashicorp/eventlogger"
"github.com/hashicorp/go-secure-stdlib/strutil"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/logical"
Expand Down Expand Up @@ -58,7 +59,7 @@ func TestBusBasics(t *testing.T) {
timeout := time.After(1 * time.Second)
select {
case message := <-ch:
if message.Event.ID() != event.ID() {
if message.Payload.(*logical.EventReceived).Event.Id != event.Id {
t.Errorf("Got unexpected message: %+v", message)
}
case <-timeout:
Expand Down Expand Up @@ -117,7 +118,7 @@ func TestNamespaceFiltering(t *testing.T) {
timeout = time.After(1 * time.Second)
select {
case message := <-ch:
if message.Event.ID() != event.ID() {
if message.Payload.(*logical.EventReceived).Event.Id != event.Id {
t.Errorf("Got unexpected message %+v but was waiting for %+v", message, event)
}

Expand Down Expand Up @@ -171,15 +172,15 @@ func TestBus2Subscriptions(t *testing.T) {
timeout := time.After(1 * time.Second)
select {
case message := <-ch1:
if message.Event.ID() != event1.ID() {
if message.Payload.(*logical.EventReceived).Event.Id != event1.Id {
t.Errorf("Got unexpected message: %v", message)
}
case <-timeout:
t.Error("Timeout waiting for event1")
}
select {
case message := <-ch2:
if message.Event.ID() != event2.ID() {
if message.Payload.(*logical.EventReceived).Event.Id != event2.Id {
t.Errorf("Got unexpected message: %v", message)
}
case <-timeout:
Expand Down Expand Up @@ -216,7 +217,7 @@ func TestBusSubscriptionsCancel(t *testing.T) {

eventType := logical.EventType("someType")

var channels []<-chan *logical.EventReceived
var channels []<-chan *eventlogger.Event
var cancels []context.CancelFunc
stopped := atomic.Int32{}

Expand Down Expand Up @@ -348,25 +349,25 @@ func TestBusWildcardSubscriptions(t *testing.T) {
for i := 0; i < 2; i++ {
select {
case message := <-ch1:
ch1Seen = append(ch1Seen, message.Event.ID())
ch1Seen = append(ch1Seen, message.Payload.(*logical.EventReceived).Event.Id)
case <-timeout:
t.Error("Timeout waiting for event1")
}
}
if len(ch1Seen) != 2 {
t.Errorf("Expected 2 events but got: %v", ch1Seen)
} else {
if !strutil.StrListContains(ch1Seen, event1.ID()) {
t.Errorf("Did not find %s event1 ID in ch1seen", event1.ID())
if !strutil.StrListContains(ch1Seen, event1.Id) {
t.Errorf("Did not find %s event1 ID in ch1seen", event1.Id)
}
if !strutil.StrListContains(ch1Seen, event2.ID()) {
t.Errorf("Did not find %s event2 ID in ch1seen", event2.ID())
if !strutil.StrListContains(ch1Seen, event2.Id) {
t.Errorf("Did not find %s event2 ID in ch1seen", event2.Id)
}
}
// Expect to receive just kv/bar on ch2, which subscribed to */bar
select {
case message := <-ch2:
if message.Event.ID() != event2.ID() {
if message.Payload.(*logical.EventReceived).Event.Id != event2.Id {
t.Errorf("Got unexpected message: %v", message)
}
case <-timeout:
Expand Down
3 changes: 2 additions & 1 deletion vault/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func TestCanSendEventsFromBuiltinPlugin(t *testing.T) {

// check that the event is routed to the subscription
select {
case received := <-ch:
case receivedEvent := <-ch:
received := receivedEvent.Payload.(*logical.EventReceived)
if event.Id != received.Event.Id {
t.Errorf("Got wrong event: %+v, expected %+v", received, event)
}
Expand Down