Skip to content

Commit

Permalink
feat: add support for verifying the event format in the eventshub rec…
Browse files Browse the repository at this point in the history
…eiver (#750)

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 authored Aug 12, 2024
1 parent 0ff820e commit dfa4862
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
15 changes: 15 additions & 0 deletions pkg/eventshub/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,21 @@ func OIDCReceiverAudience(aud string) EventsHubOption {
return compose(envOption(OIDCReceiverAudienceEnv, aud), envOIDCEnabled())
}

// VerifyEventFormat has the receiver verify the event format when receiving events
func VerifyEventFormat(format string) EventsHubOption {
return compose(envOption("EVENT_FORMAT", format))
}

// VerifyEventFormat has the receiver verify the event format is structured when receiving events
func VerifyEventFormatStructured() EventsHubOption {
return VerifyEventFormat("json")
}

// VerifyEventFormat has the receiver verify the event format is binary when receiving events
func VerifyEventFormatBinary() EventsHubOption {
return VerifyEventFormat("binary")
}

// --- Sender options

// InitialSenderDelay defines how much the sender has to wait (in millisecond), when started, before start sending events.
Expand Down
32 changes: 30 additions & 2 deletions pkg/eventshub/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Receiver struct {
skipResponseBody string
EnforceTLS bool
oidcAudience string
expectedFormat *string

kubeclient kubernetes.Interface
}
Expand All @@ -66,6 +67,9 @@ type envConfig struct {
// ReceiverName is used to identify this instance of the receiver.
ReceiverName string `envconfig:"POD_NAME" default:"receiver-default" required:"true"`

// EventFormat is used to verify the format of the received events
EventFormat string `envconfig:"EVENT_FORMAT" default:"" required:"false"`

// EnforceTLS is used to enforce TLS.
EnforceTLS bool `envconfig:"ENFORCE_TLS" default:"false"`

Expand Down Expand Up @@ -142,6 +146,11 @@ func NewFromEnv(ctx context.Context, eventLogs *eventshub.EventLogs) *Receiver {
responseWaitTime = time.Duration(env.ResponseWaitTime) * time.Second
}

var expectedEventFormat *string
if env.EventFormat != "" {
expectedEventFormat = &env.EventFormat
}

return &Receiver{
Name: env.ReceiverName,
EnforceTLS: env.EnforceTLS,
Expand All @@ -155,6 +164,7 @@ func NewFromEnv(ctx context.Context, eventLogs *eventshub.EventLogs) *Receiver {
skipResponseHeaders: env.SkipResponseHeaders,
oidcAudience: env.OIDCAudience,
kubeclient: kubeclient.Get(ctx),
expectedFormat: expectedEventFormat,
}
}

Expand Down Expand Up @@ -238,6 +248,8 @@ func (o *Receiver) ServeHTTP(writer http.ResponseWriter, request *http.Request)
m := cloudeventshttp.NewMessageFromHttpRequest(request)
defer m.Finish(nil)

encoding := m.ReadEncoding()

event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m)
headers := make(http.Header)
for k, v := range request.Header {
Expand All @@ -260,7 +272,7 @@ func (o *Receiver) ServeHTTP(writer http.ResponseWriter, request *http.Request)
shouldSkip := o.counter.Skip()
var s uint64
var kind eventshub.EventKind
if shouldSkip || rejectErr != nil {
if shouldSkip || rejectErr != nil || !verifyFormat(o.expectedFormat, encoding) {
kind = eventshub.EventRejected
s = atomic.AddUint64(&o.dropSeq, 1)
} else {
Expand Down Expand Up @@ -295,7 +307,7 @@ func (o *Receiver) ServeHTTP(writer http.ResponseWriter, request *http.Request)
time.Sleep(o.responseWaitTime)
}

if rejectErr != nil {
if rejectErr != nil || !verifyFormat(o.expectedFormat, encoding) {
for headerKey, headerValue := range o.skipResponseHeaders {
writer.Header().Set(headerKey, headerValue)
}
Expand Down Expand Up @@ -361,3 +373,19 @@ func (o *Receiver) verifyJWT(request *http.Request) (*authv1.UserInfo, error) {
func isTLS(request *http.Request) bool {
return request.TLS != nil && request.TLS.HandshakeComplete && !eventshub.IsInsecureCipherSuite(request.TLS)
}

func verifyFormat(expected *string, actual cloudeventsbindings.Encoding) bool {
// we don't care what format is, always return true
if expected == nil {
return true
}

switch *expected {
case "binary":
return actual == cloudeventsbindings.EncodingBinary
case "json":
return actual == cloudeventsbindings.EncodingStructured
default:
return false
}
}

0 comments on commit dfa4862

Please sign in to comment.