Skip to content

Commit

Permalink
Add supporting for "halting" events (#26)
Browse files Browse the repository at this point in the history
* snapshot: wire in HaltEvent hooks for receivers and transformations

* allow dispatcher to throw non-fatal errors

* add docs wrt/ halting
  • Loading branch information
thisisaaronland committed Nov 30, 2022
1 parent a2b7b2c commit 6f629b4
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 62 deletions.
82 changes: 42 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ In between (receivers and dispatchers) are an optional chain of transformations

## Install

You will need to have both `Go` (specifically version [1.16](https://golang.org/dl) or higher) and the `make` programs installed on your computer. Assuming you do just type:
You will need to have both `Go` (specifically version [1.18](https://golang.org/dl) or higher) and the `make` programs installed on your computer. Assuming you do just type:

```
$ > make cli
$> make cli
go build -mod vendor -o bin/webhookd cmd/webhookd/main.go
go build -mod vendor -o bin/webhookd-generate-hook cmd/webhookd-generate-hook/main.go
go build -mod vendor -o bin/webhookd-flatten-config cmd/webhookd-flatten-config/main.go
Expand All @@ -28,44 +28,6 @@ go build -mod vendor -o bin/webhookd-inflate-config cmd/webhookd-inflate-config/

All of this package's dependencies are bundled with the code in the `vendor` directory.

## Important

`whosonfirst/go-webhookd/v3` does not introduce any _new_ functionality relative to `whosonfirst/go-webhookd/v2` but no longer comes with support for external platforms (GitHub, Slack, etc.) enabled by default. This functionality has been moved in to a number of separate `go-webhookd-{PLATFORM}` packages. This was done to make developing and adding custom receivers, transformations and dispatchers easier and modular.

## Upgrading from `whosonfirst/go-webhookd/v2`

You will need to add the relevant packages to your `cmd/webhookd/main.go` program. For example if your `webhookd` config file defines a GitHub receiver, a GitHub transformation and an AWS dispatcher you would need to import the [go-webhookd-github](https://github.com/whosonfirst/go-webhookd-github) and [go-webhookd-aws](https://github.com/whosonfirst/go-webhookd-aws) packages. Here's an abbreviated example in code, with error handling removed for the sake of brevity:

```
package main
import (
"context"
"github.com/sfomuseum/go-flags/flagset"
"github.com/whosonfirst/go-webhookd/v3/config"
"github.com/whosonfirst/go-webhookd/v3/daemon"
_ "github.com/whosonfirst/go-webhookd-aws"
_ "github.com/whosonfirst/go-webhookd-github"
"log"
"os"
)
func main() {
fs := flagset.NewFlagSet("webhooks")
config_uri := fs.String("config-uri", "", "A valid Go Cloud runtimevar URI representing your webhookd config.")
flagset.Parse(fs)
ctx := context.Background()
cfg, _ := config.NewConfigFromURI(ctx, *config_uri)
wh_daemon, _ := daemon.NewWebhookDaemonFromConfig(ctx, cfg)
wh_daemon.Start(ctx)
}
```

## Usage

### webhookd
Expand Down Expand Up @@ -382,6 +344,10 @@ The `Null` dispatcher will send messages in to the vortex, never to be seen agai
null://
```

## Halting a `webhookd` processing flow

As of `go-webhookd` v3.2.0 it is possible to "halt" a processing flow in mid-stream. This occurs is a receiver or transformation returns a `webhookd.WebhookError` with `Code` property whose value is `webhookd.HaltEvent`. These errors are treated as non-fatal but are treated as a signal to end processing and return immediately. Support for `webhookd.HaltEvent` in dispatchers is also enabled but they do not stop processing since dispatchers are invoked asynchronously.

## Testing

In advance of proper tests. In a terminal start `webhookd` like this:
Expand Down Expand Up @@ -422,6 +388,42 @@ e3a18d4de60a5e50ca78ca1733238735ddfaef4c,sfomuseum-data-flights-2020-05,data/171
* [Restrict access to receivers by host/IP](https://github.com/whosonfirst/go-webhookd/issues/6)
* Better logging

## Upgrading from `whosonfirst/go-webhookd/v2`

`whosonfirst/go-webhookd/v3` does not introduce any _new_ functionality relative to `whosonfirst/go-webhookd/v2` but no longer comes with support for external platforms (GitHub, Slack, etc.) enabled by default. This functionality has been moved in to a number of separate `go-webhookd-{PLATFORM}` packages. This was done to make developing and adding custom receivers, transformations and dispatchers easier and modular.

You will need to add the relevant packages to your `cmd/webhookd/main.go` program. For example if your `webhookd` config file defines a GitHub receiver, a GitHub transformation and an AWS dispatcher you would need to import the [go-webhookd-github](https://github.com/whosonfirst/go-webhookd-github) and [go-webhookd-aws](https://github.com/whosonfirst/go-webhookd-aws) packages. Here's an abbreviated example in code, with error handling removed for the sake of brevity:

```
package main
import (
"context"
"github.com/sfomuseum/go-flags/flagset"
"github.com/whosonfirst/go-webhookd/v3/config"
"github.com/whosonfirst/go-webhookd/v3/daemon"
_ "github.com/whosonfirst/go-webhookd-aws"
_ "github.com/whosonfirst/go-webhookd-github"
"log"
"os"
)
func main() {
fs := flagset.NewFlagSet("webhooks")
config_uri := fs.String("config-uri", "", "A valid Go Cloud runtimevar URI representing your webhookd config.")
flagset.Parse(fs)
ctx := context.Background()
cfg, _ := config.NewConfigFromURI(ctx, *config_uri)
wh_daemon, _ := daemon.NewWebhookDaemonFromConfig(ctx, cfg)
wh_daemon.Start(ctx)
}
```

## See also

* https://github.com/whosonfirst/go-webhookd-aws
Expand Down
2 changes: 1 addition & 1 deletion cmd/webhookd-generate-hook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
fmt.Fprintf(os.Stderr, "Usage:\n\t %s [options]\n", os.Args[0])
flag.PrintDefaults()
}

flag.Parse()

fmt.Println(RandomString(*length))
Expand Down
2 changes: 1 addition & 1 deletion cmd/webhookd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func main() {
fmt.Fprintf(os.Stderr, "Usage:\n\t %s [options]\n", os.Args[0])
fs.PrintDefaults()
}

flagset.Parse(fs)

err := flagset.SetFlagsFromEnvVarsWithFeedback(fs, "WEBHOOKD", false)
Expand Down
47 changes: 33 additions & 14 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,17 @@ func (d *WebhookDaemon) HandlerFuncWithLogger(logger *log.Logger) (http.HandlerF
// not an error, for example when github sends a ping message
// (20190212/thisisaaronland)

if err != nil && err.Code == -1 {
return
}

if err != nil {
http.Error(rsp, err.Error(), err.Code)
return

switch err.Code {
case webhookd.UnhandledEvent, webhookd.HaltEvent:
logger.Printf("Receiver step (%T) returned non-fatal error and exiting, %v", rcvr, err)
return
default:
logger.Printf("Receiver step (%T) failed, %v", rcvr, err)
http.Error(rsp, err.Error(), err.Code)
return
}
}

tb = time.Since(ta)
Expand All @@ -263,13 +267,21 @@ func (d *WebhookDaemon) HandlerFuncWithLogger(logger *log.Logger) (http.HandlerF

ta = time.Now()

for _, step := range wh.Transformations() {
for idx, step := range wh.Transformations() {

body, err = step.Transform(ctx, body)

if err != nil {
http.Error(rsp, err.Error(), err.Code)
return

switch err.Code {
case webhookd.UnhandledEvent, webhookd.HaltEvent:
logger.Printf("Transformation step (%T) at offset %d returned non-fatal error and exiting, %v", step, idx, err)
return
default:
logger.Printf("Transformation step (%T) at offset %d failed, %v", step, idx, err)
http.Error(rsp, err.Error(), err.Code)
return
}
}

// check to see if there is anything left the transformation
Expand All @@ -287,22 +299,29 @@ func (d *WebhookDaemon) HandlerFuncWithLogger(logger *log.Logger) (http.HandlerF
wg := new(sync.WaitGroup)
ch := make(chan *webhookd.WebhookError)

for _, d := range wh.Dispatchers() {
for idx, d := range wh.Dispatchers() {

wg.Add(1)

go func(d webhookd.WebhookDispatcher, body []byte) {
go func(idx int, d webhookd.WebhookDispatcher, body []byte) {

defer wg.Done()

err = d.Dispatch(ctx, body)

if err != nil {
logger.Printf("FAILED TO DISPATCH W/ %T, %v\n", d, err)
ch <- err

switch err.Code {
case webhookd.UnhandledEvent, webhookd.HaltEvent:
logger.Printf("Dispatch step (%T) at offset %d returned non-fatal error and exiting, %v", d, idx, err)
return
default:
logger.Printf("Dispatch step (%T) at offset %d failed, %v", d, idx, err)
ch <- err
}
}

}(d, body)
}(idx, d, body)
}

// https://github.com/whosonfirst/go-webhookd/issues/14
Expand Down
2 changes: 1 addition & 1 deletion dispatcher/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type LogDispatcher struct {

// NewLogDispatcher returns a new `LogDispatcher` instance configured by 'uri' in the form of:
//
// log://
// log://
//
// Messasges are dispatched to the default `log.Default()` instance.
func NewLogDispatcher(ctx context.Context, uri string) (webhookd.WebhookDispatcher, error) {
Expand Down
9 changes: 9 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"fmt"
)

const UnhandledEvent int = -1

const HaltEvent int = -2

// WebhookError implements the `error` interface for wrapping webhookd error codes and messages.
type WebhookError struct {
error
Expand All @@ -17,3 +21,8 @@ type WebhookError struct {
func (e WebhookError) Error() string {
return fmt.Sprintf("%d %s", e.Code, e.Message)
}

// String returns a string representation of 'e'.
func (e WebhookError) String() string {
return e.Error()
}
2 changes: 1 addition & 1 deletion receiver/insecure.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type InsecureReceiver struct {

// NewInsecureReceiver returns a new `InsecureReceiver` instance configured by 'uri' in the form of:
//
// insecure://
// insecure://
func NewInsecureReceiver(ctx context.Context, uri string) (webhookd.WebhookReceiver, error) {

wh := InsecureReceiver{}
Expand Down
2 changes: 1 addition & 1 deletion transformation/chicken.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type ChickenTransformation struct {

// NewInsecureTransformation returns a new `ChickenTransformation` instance configured by 'uri' in the form of:
//
// chicken://{LANGUAGE_TAG}?{PARAMETERS}
// chicken://{LANGUAGE_TAG}?{PARAMETERS}
//
// Where {LANGUAGE_TAG} is any valid language tag supported by the `aaronland/go-chicken` package. Valid {PARAMETERS} are:
// * `clucking={BOOLEAN}` A boolean flag to indicate whether messages should be transformed in the form of chicken noises.
Expand Down
2 changes: 1 addition & 1 deletion transformation/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type NullTransformation struct {

// NewInsecureTransformation returns a new `NullTransformation` instance configured by 'uri' in the form of:
//
// null://
// null://
func NewNullTransformation(ctx context.Context, uri string) (webhookd.WebhookTransformation, error) {

p := NullTransformation{}
Expand Down
4 changes: 2 additions & 2 deletions webhookd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ type WebhookReceiver interface {
Receive(context.Context, *http.Request) ([]byte, *WebhookError)
}

//WebhookTransformation is an interface that defines methods for altering (transforming) the body of a (webhook) message after receipt.
// WebhookTransformation is an interface that defines methods for altering (transforming) the body of a (webhook) message after receipt.
type WebhookTransformation interface {
// Transforms() alters the body of a (webhook) message (according to rules defined by the package implementing the `WebhookTransformation` interface).
Transform(context.Context, []byte) ([]byte, *WebhookError)
}

//WebhookDispatcher is an interface that defines methods for relaying the body of a (webhook) message after it has been transformed.
// WebhookDispatcher is an interface that defines methods for relaying the body of a (webhook) message after it has been transformed.
type WebhookDispatcher interface {
// Dispatch() relays the body of a message (according to rules defined defined by the package implementing the `WebhookDispatcher` interface).
Dispatch(context.Context, []byte) *WebhookError
Expand Down

0 comments on commit 6f629b4

Please sign in to comment.