Skip to content

Commit

Permalink
Add support for custom events
Browse files Browse the repository at this point in the history
Add the ability to produce and consume custom events, including
testing of the custom/conformance.json from the spec.

This does not include validation against customSchemaUri yet,
that will be implemented as a separate feature as it applies to
both regular and custom events.

Signed-off-by: Andrea Frittoli <andrea.frittoli@gmail.com>
  • Loading branch information
afrittoli committed Jul 9, 2024
1 parent 9ac1c56 commit 4a594e9
Show file tree
Hide file tree
Showing 109 changed files with 1,295 additions and 342 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func main() {

See the [CloudEvents](https://github.com/cloudevents/sdk-go#send-your-first-cloudevent) docs as well.

## Documentation

More examples are available in the [docs](./docs) folder.
Online API Reference:
- [SDK Root](https://pkg.go.dev/github.com/cdevents/sdk-go/pkg/api)
- [v03 Specific](https://pkg.go.dev/github.com/cdevents/sdk-go/pkg/api/v03)
- [v04 Specific](https://pkg.go.dev/github.com/cdevents/sdk-go/pkg/api/v04)

## Contributing

If you would like to contribute, see our [development](DEVELOPMENT.md) guide.
Expand Down
117 changes: 117 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# CDEvents Go SDK Docs

This folder contains example of how to use this SDK.

## Create a Custom CDEvent

If a tool wants to emit events that are not supported by the CDEvents specification,
they can do so via [custom events](https://github.com/cdevents/spec/tree/main/custom).

Custom events are follow the CDEvents format and can be defined via the
`CustomTypeEvent` object, available since v0.4, as well as using the `CustomCDEventReader`
and `CustomCDEventWriter` interfaces.

Let's consider the following scenario: a tool called "MyRegistry" has a concept of "Quota"
which can be "exceeded" by users of the system. We want to use events to notify when that
happens, but CDEvents does not define any quota related subject.

```golang
type Quota struct {
User string `json:"user,omitempty"` // The use the quota applies ot
Limit string `json:"limit,omitempty"` // The limit enforced by the quota e.g. 100Gb
Current int `json:"current,omitempty"` // The current % of the quota used e.g. 90%
Threshold int `json:"threshold,omitempty"` // The threshold for warning event e.g. 85%
Level string `json:"level,omitempty"` // INFO: <threshold, WARNING: >threshold, <quota, CRITICAL: >quota
}
```
For this scenario we will need a few imports:

```golang
import (
"context"
"log"

cdevents "github.com/cdevents/sdk-go/pkg/api"
cdeventsv04 "github.com/cdevents/sdk-go/pkg/api/v04"
cloudevents "github.com/cloudevents/sdk-go/v2"
)
```

Let's define a custom event type for this scenario.
This is our first iteration, so the event will have version "0.1.0".

```golang
eventType := cdevents.CDEventType{
Subject: "quota",
Predicate: "exceeded",
Version: "0.1.0",
Custom: "myregistry",
}
```

With a `Quota` object, let's create a CDEvent for it:

```golang
quotaRule123 := Quota{
User: "heavy_user",
Limit: "50Tb",
Current: 90,
Threshold: 85,
Level: "WARNING",
}

// Create the base event
event, err := cdeventsv04.NewCustomTypeEvent()
if err != nil {
log.Fatalf("could not create a cdevent, %v", err)
}
event.SetEventType(eventType)

// Set the required context fields
event.SetSubjectId("quotaRule123")
event.SetSource("myregistry/region/staging")

// Set the required subject content
event.SetSubjectContent(quotaRule123)

// If we host a schema for the overall custom CDEvent, we can add it
// to the event so that the receiver may validate custom fields like
// the event type and subject content
event.SetSchemaUri("https://myregistry.dev/schemas/cdevents/quota-exceeded/0_1_0")
```

To see the event, let's render it as JSON and log it:

```golang
// Render the event as JSON
eventJson, err := cdevents.AsJsonString(event)
if err != nil {
log.Fatalf("failed to marshal the CDEvent, %v", err)
}
// Print the event
log.Printf("event: %v", eventJson)
```

To send the event, let's setup a test sink, for instance using [smee.io/](https://smee.io/).
Then let's render the event as CloudEvent and send it to the sink:

```golang
ce, err = cdevents.AsCloudEvent(event)
if err != nil {
log.Fatalf("failed to create cloudevent, %v", err)
}

// Set send options
ctx := cloudevents.ContextWithTarget(context.Background(), "https://smee.io/<you-channel-id>")
ctx = cloudevents.WithEncodingBinary(ctx)

c, err = cloudevents.NewClientHTTP()
if err != nil {
log.Fatalf("failed to create client, %v", err)
}

// Send the CloudEvent
if result := c.Send(ctx, *ce); cloudevents.IsUndelivered(result) {
log.Fatalf("failed to send, %v", result)
}
```
36 changes: 22 additions & 14 deletions pkg/api/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
"golang.org/x/mod/semver"
)

const SCHEMA_ID_REGEX = `^https://cdevents.dev/([0-9]\.[0-9])\.[0-9]/schema/([^ ]*)$`
const (
SCHEMA_ID_REGEX = `^https://cdevents.dev/([0-9]\.[0-9])\.[0-9]/schema/([^ ]*)$`
CustomEventMapKey = "custom"
)

var (
// Validation helper as singleton
Expand Down Expand Up @@ -172,24 +175,29 @@ func NewFromJsonBytesContext[CDEventType CDEvent](event []byte, cdeventsMap map[
eventAux := &struct {
Context Context `json:"context"`
}{}
var nilReturn CDEventType
var nilReturn, receiver CDEventType
var ok bool
err := json.Unmarshal(event, eventAux)
if err != nil {
return nilReturn, err
}
eventType := eventAux.Context.GetType()
receiver, ok := cdeventsMap[eventType.UnversionedString()]
if !ok {
// This should not happen as unmarshalling and validate checks if the type is known to the SDK
return nilReturn, fmt.Errorf("unknown event type %s", eventAux.Context.GetType())
}
// Check if the receiver is compatible. It must have the same subject and predicate
// and share the same major version.
// If the minor version is different and the message received as a version that is
// greater than the SDK one, some fields may be lost, as newer versions may add new
// fields to the event specification.
if !eventType.IsCompatible(receiver.GetType()) {
return nilReturn, fmt.Errorf("sdk event version %s not compatible with %s", receiver.GetType().Version, eventType.Version)
if eventType.Custom != "" {
receiver = cdeventsMap[CustomEventMapKey] // Custom type receiver does not have a predefined type
} else {
receiver, ok = cdeventsMap[eventType.UnversionedString()]
if !ok {
// This should not happen as unmarshalling and validate checks if the type is known to the SDK
return nilReturn, fmt.Errorf("unknown event type %s", eventAux.Context.GetType())
}
// Check if the receiver is compatible. It must have the same subject and predicate
// and share the same major version.
// If the minor version is different and the message received as a version that is
// greater than the SDK one, some fields may be lost, as newer versions may add new
// fields to the event specification.
if !eventType.IsCompatible(receiver.GetType()) {
return nilReturn, fmt.Errorf("sdk event version %s not compatible with %s", receiver.GetType().Version, eventType.Version)
}
}
err = json.Unmarshal(event, receiver)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/bindings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func TestInvalidEvent(t *testing.T) {
Context: api.ContextV04{
Context: api.Context{
Type: api.CDEventType{Subject: "not-a-valid-type"},
Version: api.CDEventsSpecVersion,
Version: testapi.SpecVersion,
},
},
Subject: testapi.FooSubjectBarPredicateSubject{
Expand Down
12 changes: 9 additions & 3 deletions pkg/api/schemas.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/api/spec-v0.4
110 changes: 95 additions & 15 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,22 @@ import (
)

const (
EventTypeRoot = "dev.cdevents"
CDEventsSpecVersion = "0.3.0"
CDEventsSchemaURLTemplate = "https://cdevents.dev/%s/schema/%s-%s-event"
CDEventsTypeRegex = "^dev\\.cdevents\\.(?P<subject>[a-z]+)\\.(?P<predicate>[a-z]+)\\.(?P<version>.*)$"
EventTypeRoot = "dev.cdevents"
CustomEventTypeRoot = "dev.cdeventsx"
CDEventsSchemaURLTemplate = "https://cdevents.dev/%s/schema/%s-%s-event"
CDEventsCustomSchemaURLTemplate = "https://cdevents.dev/%s/schema/custom"
CDEventsTypeRegex = "^dev\\.cdevents\\.(?P<subject>[a-z]+)\\.(?P<predicate>[a-z]+)\\.(?P<version>.*)$"
CDEventsCustomTypeRegex = "^dev\\.cdeventsx\\.(?P<tool>[a-z]+)-(?P<subject>[a-z]+)\\.(?P<predicate>[a-z]+)\\.(?P<version>.*)$"

LinkTypePath LinkType = "PATH"
LinkTypeEnd LinkType = "END"
LinkTypeRelation LinkType = "RELATION"
)

var (
CDEventsTypeCRegex = regexp.MustCompile(CDEventsTypeRegex)
LinkTypes = map[LinkType]interface{}{
CDEventsTypeCRegex = regexp.MustCompile(CDEventsTypeRegex)
CDEventsCustomTypeCRegex = regexp.MustCompile(CDEventsCustomTypeRegex)
LinkTypes = map[LinkType]interface{}{
LinkTypePath: "",
LinkTypeEnd: "",
LinkTypeRelation: "",
Expand Down Expand Up @@ -380,18 +383,59 @@ type CDEventType struct {

// Version is a semantic version in the form <major>.<minor>.<patch>
Version string

// Custom holds the tool name in case of custom events
Custom string
}

func (t CDEventType) Root() string {
root := EventTypeRoot
if t.Custom != "" {
root = CustomEventTypeRoot
}
return root
}

// FQSubject returns the fully qualified subject, which includes
// the tool name from t.Custom in case of custom events
func (t CDEventType) FQSubject() string {
s := t.Subject
if s == "" {
s = "<undefined-subject>"
}
if t.Custom != "" {
s = t.Custom + "-" + s
}
return s
}

func (t CDEventType) String() string {
return EventTypeRoot + "." + t.Subject + "." + t.Predicate + "." + t.Version
predicate := t.Predicate
if predicate == "" {
predicate = "<undefined-predicate>"
}
version := t.Version
if version == "" {
version = "<undefined-version>"
}
return t.Root() + "." + t.FQSubject() + "." + predicate + "." + version
}

func (t CDEventType) UnversionedString() string {
return EventTypeRoot + "." + t.Subject + "." + t.Predicate
predicate := t.Predicate
if predicate == "" {
predicate = "<undefined-predicate>"
}
return t.Root() + "." + t.FQSubject() + "." + predicate
}

func (t CDEventType) Short() string {
return t.Subject + "_" + t.Predicate
s := t.FQSubject()
p := t.Predicate
if s == "" || p == "" {
return ""
}
return t.FQSubject() + "_" + t.Predicate
}

// Two CDEventTypes are compatible if the subject and predicates
Expand Down Expand Up @@ -420,15 +464,32 @@ func (t CDEventType) MarshalJSON() ([]byte, error) {
}

func CDEventTypeFromString(cdeventType string) (*CDEventType, error) {
names := CDEventsTypeCRegex.SubexpNames()
parts := CDEventsTypeCRegex.FindStringSubmatch(cdeventType)
if len(parts) != 4 {
return nil, fmt.Errorf("cannot parse event type %s", cdeventType)
names = CDEventsCustomTypeCRegex.SubexpNames()
parts = CDEventsCustomTypeCRegex.FindStringSubmatch(cdeventType)
if len(parts) != 5 {
return nil, fmt.Errorf("cannot parse event type %s", cdeventType)
}
}
returnType := CDEventType{}
for i, matchName := range names {
if i == 0 {
continue
}
switch matchName {
case "subject":
returnType.Subject = parts[i]
case "predicate":
returnType.Predicate = parts[i]
case "version":
returnType.Version = parts[i]
case "tool":
returnType.Custom = parts[i]
}
}
return &CDEventType{
Subject: parts[1],
Predicate: parts[2],
Version: parts[3],
}, nil
return &returnType, nil
}

type CDEventReader interface {
Expand Down Expand Up @@ -459,6 +520,11 @@ type CDEventReader interface {
// for direct access to the content fields
GetSubject() Subject

// The event specific subject. It is possible to use a type assertion with
// the generic Subject to obtain an event specific implementation of Subject
// for direct access to the content fields
GetSubjectContent() interface{}

// The URL and content of the schema file associated to the event type
GetSchema() (string, *jsonschema.Schema, error)

Expand Down Expand Up @@ -531,6 +597,20 @@ type CDEventWriterV04 interface {
SetSchemaUri(schema string)
}

type CustomCDEventReader interface {
CDEventReaderV04
}

type CustomCDEventWriter interface {
CDEventWriterV04

// CustomCDEvent can represent different event types
SetEventType(eventType CDEventType)

// CustomCDEvent types can have different subject fields
SetSubjectContent(subjectContent interface{})
}

type CDEventCustomDataEncoding string

func (t CDEventCustomDataEncoding) String() string {
Expand Down
Loading

0 comments on commit 4a594e9

Please sign in to comment.