Skip to content

Commit

Permalink
events: WS protobuf messages should be binary (#19232)
Browse files Browse the repository at this point in the history
The [WebSockets spec](https://www.rfc-editor.org/rfc/rfc6455) states
that text messages must be valid UTF-8 encoded strings, which protobuf
messages virtually never are. This now correctly sends the protobuf events
as binary messages.

We change the format to correspond to CloudEvents, as originally intended,
and remove a redundant timestamp and newline.

We also bump the eventlogger to fix a race condition that this code triggers.
  • Loading branch information
Christopher Swenson committed Feb 17, 2023
1 parent 1ec3397 commit 491e218
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 120 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ require (
github.com/hashicorp/consul-template v0.29.5
github.com/hashicorp/consul/api v1.17.0
github.com/hashicorp/errwrap v1.1.0
github.com/hashicorp/eventlogger v0.1.0
github.com/hashicorp/eventlogger v0.1.1
github.com/hashicorp/go-cleanhttp v0.5.2
github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192
github.com/hashicorp/go-gcp-common v0.8.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -969,8 +969,8 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FK
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/eventlogger v0.1.0 h1:S6xc4gZVzewuDUP4R4Ngko419h/CGDuV/b4ADL3XLik=
github.com/hashicorp/eventlogger v0.1.0/go.mod h1:a3IXf1aEJfpCPzseTOrwKj4fVW/Qn3oEmpQeaIznzH0=
github.com/hashicorp/eventlogger v0.1.1 h1:zyCjxsy7KunFsMPZKU5PnwWEakSrp1zjj2vPFmrDaeo=
github.com/hashicorp/eventlogger v0.1.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand Down
18 changes: 12 additions & 6 deletions http/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault"
"github.com/hashicorp/vault/vault/eventbus"
"google.golang.org/protobuf/encoding/protojson"
"nhooyr.io/websocket"
)

Expand Down Expand Up @@ -47,19 +46,26 @@ func handleEventsSubscribeWebsocket(args eventSubscribeArgs) (websocket.StatusCo
logger.Info("Websocket context is done, closing the connection")
return websocket.StatusNormalClosure, "", nil
case message := <-ch:
logger.Debug("Sending message to websocket", "message", message)
logger.Debug("Sending message to websocket", "message", message.Payload)
var messageBytes []byte
var messageType websocket.MessageType
if args.json {
messageBytes, err = protojson.Marshal(message)
var ok bool
messageBytes, ok = message.Format("cloudevents-json")
if !ok {
logger.Warn("Could not get cloudevents JSON format")
return 0, "", errors.New("could not get cloudevents JSON format")
}
messageType = websocket.MessageText
} else {
messageBytes, err = proto.Marshal(message)
messageBytes, err = proto.Marshal(message.Payload.(*logical.EventReceived))
messageType = websocket.MessageBinary
}
if err != nil {
logger.Warn("Could not serialize websocket event", "error", err)
return 0, "", err
}
messageString := string(messageBytes) + "\n"
err = args.conn.Write(ctx, websocket.MessageText, []byte(messageString))
err = args.conn.Write(ctx, messageType, messageBytes)
if err != nil {
return 0, "", err
}
Expand Down
53 changes: 33 additions & 20 deletions http/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"context"
"fmt"
"net/http"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -64,27 +65,39 @@ func TestEventsSubscribe(t *testing.T) {

wsAddr := strings.Replace(addr, "http", "ws", 1)

// check that the connection fails if we don't have a token
_, _, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/"+eventType+"?json=true", nil)
if err == nil {
t.Error("Expected websocket error but got none")
} else if !strings.HasSuffix(err.Error(), "401") {
t.Errorf("Expected 401 websocket but got %v", err)
}
testCases := []struct {
json bool
}{{true}, {false}}

conn, _, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/"+eventType+"?json=true", &websocket.DialOptions{
HTTPHeader: http.Header{"x-vault-token": []string{token}},
})
if err != nil {
t.Fatal(err)
}
for _, testCase := range testCases {
url := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?json=%v", wsAddr, eventType, testCase.json)
// check that the connection fails if we don't have a token
_, _, err := websocket.Dial(ctx, url, nil)
if err == nil {
t.Error("Expected websocket error but got none")
} else if !strings.HasSuffix(err.Error(), "401") {
t.Errorf("Expected 401 websocket but got %v", err)
}

_, msg, err := conn.Read(ctx)
if err != nil {
t.Fatal(err)
}
msgJson := strings.TrimSpace(string(msg))
if !strings.HasPrefix(msgJson, "{") || !strings.HasSuffix(msgJson, "}") {
t.Errorf("Expected to get JSON event but got: %v", msgJson)
conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{
HTTPHeader: http.Header{"x-vault-token": []string{token}},
})
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
conn.Close(websocket.StatusNormalClosure, "")
})

_, msg, err := conn.Read(ctx)
if err != nil {
t.Fatal(err)
}
if testCase.json {
msgJson := strings.TrimSpace(string(msg))
if !strings.HasPrefix(msgJson, "{") || !strings.HasSuffix(msgJson, "}") {
t.Errorf("Expected to get JSON event but got: %v", msgJson)
}
}
}
}
113 changes: 48 additions & 65 deletions sdk/logical/event.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions sdk/logical/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ option go_package = "github.com/hashicorp/vault/sdk/logical";
package logical;

import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

// EventPluginInfo contains data related to the plugin that generated an event.
message EventPluginInfo {
Expand Down Expand Up @@ -49,5 +48,4 @@ message EventReceived {
string namespace = 2;
string event_type = 3;
EventPluginInfo plugin_info = 4;
google.protobuf.Timestamp timestamp = 5;
}
4 changes: 2 additions & 2 deletions sdk/logical/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
)

// ID is an alias to GetId() for CloudEvents compatibility.
func (x *EventData) ID() string {
return x.GetId()
func (x *EventReceived) ID() string {
return x.Event.GetId()
}

// NewEvent returns an event with a new, random EID.
Expand Down
17 changes: 7 additions & 10 deletions vault/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/logical"
"github.com/ryanuber/go-glob"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
Expand Down Expand Up @@ -53,7 +52,7 @@ type pluginEventBus struct {
type asyncChanNode struct {
// TODO: add bounded deque buffer of *EventReceived
ctx context.Context
ch chan *logical.EventReceived
ch chan *eventlogger.Event
logger hclog.Logger

// used to close the connection
Expand Down Expand Up @@ -95,7 +94,6 @@ func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace,
Namespace: ns.Path,
EventType: string(eventType),
PluginInfo: pluginInfo,
Timestamp: timestamppb.New(time.Now()),
}
bus.logger.Info("Sending event", "event", eventReceived)

Expand Down Expand Up @@ -169,7 +167,7 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) {
}, nil
}

func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *logical.EventReceived, context.CancelFunc, error) {
func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
// subscriptions are still stored even if the bus has not been started
pipelineID, err := uuid.GenerateUUID()
if err != nil {
Expand All @@ -193,7 +191,7 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat
}

ctx, cancel := context.WithCancel(ctx)
asyncNode := newAsyncNode(ctx, ns, bus.logger)
asyncNode := newAsyncNode(ctx, bus.logger)
err = bus.broker.RegisterNode(eventlogger.NodeID(sinkNodeID), asyncNode)
if err != nil {
defer cancel()
Expand Down Expand Up @@ -247,10 +245,10 @@ func newFilterNode(ns *namespace.Namespace, pattern string) *eventlogger.Filter
}
}

func newAsyncNode(ctx context.Context, namespace *namespace.Namespace, logger hclog.Logger) *asyncChanNode {
func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode {
return &asyncChanNode{
ctx: ctx,
ch: make(chan *logical.EventReceived),
ch: make(chan *eventlogger.Event),
logger: logger,
}
}
Expand All @@ -272,17 +270,16 @@ func (node *asyncChanNode) Close() {
func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
// sends to the channel async in another goroutine
go func() {
eventRecv := e.Payload.(*logical.EventReceived)
var timeout bool
select {
case node.ch <- eventRecv:
case node.ch <- e:
case <-ctx.Done():
timeout = errors.Is(ctx.Err(), context.DeadlineExceeded)
case <-node.ctx.Done():
timeout = errors.Is(node.ctx.Err(), context.DeadlineExceeded)
}
if timeout {
node.logger.Info("Subscriber took too long to process event, closing", "ID", eventRecv.Event.ID())
node.logger.Info("Subscriber took too long to process event, closing", "ID", e.Payload.(*logical.EventReceived).Event.Id)
node.Close()
}
}()
Expand Down
Loading

0 comments on commit 491e218

Please sign in to comment.