Skip to content

Commit

Permalink
Add support for custom events
Browse files Browse the repository at this point in the history
Add the ability to produce and consume custom events, including
testing of the custom/conformance.json from the spec.

This does not include validation against customSchemaUri yet,
that will be implemented as a separate feature as it applies to
both regular and custom events.

Signed-off-by: Andrea Frittoli <andrea.frittoli@gmail.com>
  • Loading branch information
afrittoli committed Jul 9, 2024
1 parent 9ac1c56 commit 78dafb2
Show file tree
Hide file tree
Showing 110 changed files with 1,435 additions and 342 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func main() {

See the [CloudEvents](https://github.com/cloudevents/sdk-go#send-your-first-cloudevent) docs as well.

## Documentation

More examples are available in the [docs](./docs) folder.
Online API Reference:
- [SDK Root](https://pkg.go.dev/github.com/cdevents/sdk-go/pkg/api)
- [v03 Specific](https://pkg.go.dev/github.com/cdevents/sdk-go/pkg/api/v03)
- [v04 Specific](https://pkg.go.dev/github.com/cdevents/sdk-go/pkg/api/v04)

## Contributing

If you would like to contribute, see our [development](DEVELOPMENT.md) guide.
Expand Down
152 changes: 152 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# CDEvents Go SDK Docs

This folder contains example of how to use this SDK.

## Create a Custom CDEvent

If a tool wants to emit events that are not supported by the CDEvents specification,
they can do so via [custom events](https://github.com/cdevents/spec/tree/main/custom).

Custom events are follow the CDEvents format and can be defined via the
`CustomTypeEvent` object, available since v0.4, as well as using the `CustomCDEventReader`
and `CustomCDEventWriter` interfaces.

Let's consider the following scenario: a tool called "MyRegistry" has a concept of "Quota"
which can be "exceeded" by users of the system. We want to use events to notify when that
happens, but CDEvents does not define any quota related subject.

```golang
type Quota struct {
User string `json:"user,omitempty"` // The use the quota applies ot
Limit string `json:"limit,omitempty"` // The limit enforced by the quota e.g. 100Gb
Current int `json:"current,omitempty"` // The current % of the quota used e.g. 90%
Threshold int `json:"threshold,omitempty"` // The threshold for warning event e.g. 85%
Level string `json:"level,omitempty"` // INFO: <threshold, WARNING: >threshold, <quota, CRITICAL: >quota
}
```
For this scenario we will need a few imports:

```golang
import (
"context"
"fmt"
"log"

cdevents "github.com/cdevents/sdk-go/pkg/api"
cdeventsv04 "github.com/cdevents/sdk-go/pkg/api/v04"
cloudevents "github.com/cloudevents/sdk-go/v2"
)
```

Let's define a custom event type for this scenario.
This is our first iteration, so the event will have version "0.1.0".

```golang
eventType := cdevents.CDEventType{
Subject: "quota",
Predicate: "exceeded",
Version: "0.1.0",
Custom: "myregistry",
}
```

With a `Quota` object, let's create a CDEvent for it:

```golang
quotaRule123 := Quota{
User: "heavy_user",
Limit: "50Tb",
Current: 90,
Threshold: 85,
Level: "WARNING",
}

// Create the base event
event, err := cdeventsv04.NewCustomTypeEvent()
if err != nil {
log.Fatalf("could not create a cdevent, %v", err)
}
event.SetEventType(eventType)

// Set the required context fields
event.SetSubjectId("quotaRule123")
event.SetSource("myregistry/region/staging")

// Set the required subject content
event.SetSubjectContent(quotaRule123)

// If we host a schema for the overall custom CDEvent, we can add it
// to the event so that the receiver may validate custom fields like
// the event type and subject content
event.SetSchemaUri("https://myregistry.dev/schemas/cdevents/quota-exceeded/0_1_0")
```

To see the event, let's render it as JSON and log it:

```golang
// Render the event as JSON
eventJson, err := cdevents.AsJsonString(event)
if err != nil {
log.Fatalf("failed to marshal the CDEvent, %v", err)
}
// Print the event
fmt.Printf("%s", eventJson)
```

The resulting CDEvents will look like:

```json
{"context":{"version":"0.4.1","id":"37fc85d9-187f-4ceb-a11d-9df30f809624","source":"my/first/cdevent/program","type":"dev.cdeventsx.myregistry-quota.exceeded.0.1.0","timestamp":"2024-07-09T14:00:54.375172+01:00","schemaUri":"https://myregistry.dev/schemas/cdevents/quota-exceeded/0_1_0"},"subject":{"id":"quotaRule123","source":"my/first/cdevent/program","type":"myregistry-quota","content":{"user":"heavy_user","limit":"50Tb","current":90,"threshold":85,"level":"WARNING"}}}
```

To send the event, let's setup a test sink, for instance using [smee.io/](https://smee.io/).
Then let's render the event as CloudEvent and send it to the sink:

```golang
ce, err = cdevents.AsCloudEvent(event)
if err != nil {
log.Fatalf("failed to create cloudevent, %v", err)
}

// Set send options
ctx := cloudevents.ContextWithTarget(context.Background(), "https://smee.io/<you-channel-id>")
ctx = cloudevents.WithEncodingBinary(ctx)

c, err = cloudevents.NewClientHTTP()
if err != nil {
log.Fatalf("failed to create client, %v", err)
}

// Send the CloudEvent
if result := c.Send(ctx, *ce); cloudevents.IsUndelivered(result) {
log.Fatalf("failed to send, %v", result)
}
```

The whole code of is available under [`examples/custom.go`](./examples/custom.go):

```shell
➜ go run custom.go | jq .
{
"context": {
"version": "0.4.1",
"id": "f7be8a13-8bd7-4a3a-881f-ed49cc0ebf8f",
"source": "my/first/cdevent/program",
"type": "dev.cdeventsx.myregistry-quota.exceeded.0.1.0",
"timestamp": "2024-07-09T14:01:00.449264+01:00",
"schemaUri": "https://myregistry.dev/schemas/cdevents/quota-exceeded/0_1_0"
},
"subject": {
"id": "quotaRule123",
"source": "my/first/cdevent/program",
"type": "myregistry-quota",
"content": {
"user": "heavy_user",
"limit": "50Tb",
"current": 90,
"threshold": 85,
"level": "WARNING"
}
}
}
```
105 changes: 105 additions & 0 deletions docs/examples/custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"context"
"fmt"
"log"
"net/http"

cdevents "github.com/cdevents/sdk-go/pkg/api"
cdeventsv04 "github.com/cdevents/sdk-go/pkg/api/v04"
cloudevents "github.com/cloudevents/sdk-go/v2"
)

type Quota struct {
User string `json:"user,omitempty"` // The use the quota applies ot
Limit string `json:"limit,omitempty"` // The limit enforced by the quota e.g. 100Gb
Current int `json:"current,omitempty"` // The current % of the quota used e.g. 90%
Threshold int `json:"threshold,omitempty"` // The threshold for warning event e.g. 85%
Level string `json:"level,omitempty"` // INFO: <threshold, WARNING: >threshold, <quota, CRITICAL: >quota
}

// Copied from https://github.com/eswdd/go-smee/blob/33b0bac1f1ef3abef04c518ddf7552b04edbadd2/smee.go#L54C1-L67C2
func CreateSmeeChannel() (*string, error) {
httpClient := http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
resp, err := httpClient.Head("https://smee.io/new")
if err != nil {
return nil, err
}

loc := resp.Header.Get("Location")
return &loc, nil
}

func main() {
var ce *cloudevents.Event
var c cloudevents.Client

// Define the event type
eventType := cdevents.CDEventType{
Subject: "quota",
Predicate: "exceeded",
Version: "0.1.0",
Custom: "myregistry",
}

// Define the content
quotaRule123 := Quota{
User: "heavy_user",
Limit: "50Tb",
Current: 90,
Threshold: 85,
Level: "WARNING",
}

// Create the base event
event, err := cdeventsv04.NewCustomTypeEvent()
if err != nil {
log.Fatalf("could not create a cdevent, %v", err)
}
event.SetEventType(eventType)

// Set the required context fields
event.SetSubjectId("quotaRule123")
event.SetSource("my/first/cdevent/program")

// Set the required subject fields
event.SetSubjectContent(quotaRule123)

event.SetSchemaUri("https://myregistry.dev/schemas/cdevents/quota-exceeded/0_1_0")

// Print the event
eventJson, err := cdevents.AsJsonString(event)
if err != nil {
log.Fatalf("failed to marshal the CDEvent, %v", err)
}
fmt.Printf("%s", eventJson)

ce, err = cdevents.AsCloudEvent(event)
if err != nil {
log.Fatalf("failed to create cloudevent, %v", err)
}

// Set send options
source, err := CreateSmeeChannel()
if err != nil {
log.Fatalf("failed to create a smee channel: %v", err)
}
ctx := cloudevents.ContextWithTarget(context.Background(), *source)
ctx = cloudevents.WithEncodingBinary(ctx)

c, err = cloudevents.NewClientHTTP()
if err != nil {
log.Fatalf("failed to create client, %v", err)
}

// Send the CloudEvent
// c is a CloudEvent client
if result := c.Send(ctx, *ce); cloudevents.IsUndelivered(result) {
log.Fatalf("failed to send, %v", result)
}
}
36 changes: 22 additions & 14 deletions pkg/api/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
"golang.org/x/mod/semver"
)

const SCHEMA_ID_REGEX = `^https://cdevents.dev/([0-9]\.[0-9])\.[0-9]/schema/([^ ]*)$`
const (
SCHEMA_ID_REGEX = `^https://cdevents.dev/([0-9]\.[0-9])\.[0-9]/schema/([^ ]*)$`
CustomEventMapKey = "custom"
)

var (
// Validation helper as singleton
Expand Down Expand Up @@ -172,24 +175,29 @@ func NewFromJsonBytesContext[CDEventType CDEvent](event []byte, cdeventsMap map[
eventAux := &struct {
Context Context `json:"context"`
}{}
var nilReturn CDEventType
var nilReturn, receiver CDEventType
var ok bool
err := json.Unmarshal(event, eventAux)
if err != nil {
return nilReturn, err
}
eventType := eventAux.Context.GetType()
receiver, ok := cdeventsMap[eventType.UnversionedString()]
if !ok {
// This should not happen as unmarshalling and validate checks if the type is known to the SDK
return nilReturn, fmt.Errorf("unknown event type %s", eventAux.Context.GetType())
}
// Check if the receiver is compatible. It must have the same subject and predicate
// and share the same major version.
// If the minor version is different and the message received as a version that is
// greater than the SDK one, some fields may be lost, as newer versions may add new
// fields to the event specification.
if !eventType.IsCompatible(receiver.GetType()) {
return nilReturn, fmt.Errorf("sdk event version %s not compatible with %s", receiver.GetType().Version, eventType.Version)
if eventType.Custom != "" {
receiver = cdeventsMap[CustomEventMapKey] // Custom type receiver does not have a predefined type
} else {
receiver, ok = cdeventsMap[eventType.UnversionedString()]
if !ok {
// This should not happen as unmarshalling and validate checks if the type is known to the SDK
return nilReturn, fmt.Errorf("unknown event type %s", eventAux.Context.GetType())
}
// Check if the receiver is compatible. It must have the same subject and predicate
// and share the same major version.
// If the minor version is different and the message received as a version that is
// greater than the SDK one, some fields may be lost, as newer versions may add new
// fields to the event specification.
if !eventType.IsCompatible(receiver.GetType()) {
return nilReturn, fmt.Errorf("sdk event version %s not compatible with %s", receiver.GetType().Version, eventType.Version)
}
}
err = json.Unmarshal(event, receiver)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/bindings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func TestInvalidEvent(t *testing.T) {
Context: api.ContextV04{
Context: api.Context{
Type: api.CDEventType{Subject: "not-a-valid-type"},
Version: api.CDEventsSpecVersion,
Version: testapi.SpecVersion,
},
},
Subject: testapi.FooSubjectBarPredicateSubject{
Expand Down
12 changes: 9 additions & 3 deletions pkg/api/schemas.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/api/spec-v0.4
Loading

0 comments on commit 78dafb2

Please sign in to comment.