Skip to content

Commit

Permalink
Add webhook url, fix race condition, fix token.
Browse files Browse the repository at this point in the history
- Add a required url for the webhook url.  The original address that
  appeared to be the right url was the address of the requester, not the
  target.
- Fix a race condition where sending an update to the goroutine can
  cause a deadlock.  Instead of sending the signal inside a mutex, send
  it via a separate go routine that can recover from a panic if the
  channel happens to be closed while it is waiting.
- Fix the token interface so it's just that; using the Token interface
  not the literal token{}.
  • Loading branch information
schmidtw committed Aug 30, 2023
1 parent f33a8de commit 96f92b7
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 91 deletions.
4 changes: 2 additions & 2 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ func ExampleBasicAuth() { // nolint: govet

// Create the listener.
r := webhook.Registration{
Address: server.URL, // Replace this with a real address.
Duration: webhook.CustomDuration(5 * time.Minute),
}

whl, err := listener.New(&r,
url := server.URL // replace with the URL of the webhook provider
whl, err := listener.New(&r, url,
listener.AuthBasic("username", "password"),
listener.AcceptSHA1(),
listener.AcceptedSecrets("foobar", "carport"),
Expand Down
174 changes: 118 additions & 56 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,17 @@ func TestNormalUsage(t *testing.T) {
defer server.Close()

// Create the listener.
whl, err := New(&webhook.Registration{
Address: server.URL,
Events: []string{
"foo",
},
Config: webhook.DeliveryConfig{
Secret: "secret1",
whl, err := New(
&webhook.Registration{
Events: []string{
"foo",
},
Config: webhook.DeliveryConfig{
Secret: "secret1",
},
Duration: webhook.CustomDuration(5 * time.Minute),
},
Duration: webhook.CustomDuration(5 * time.Minute),
},
server.URL,
Interval(1*time.Millisecond),
AuthBasic("user", "pass"),
)
Expand Down Expand Up @@ -148,16 +149,17 @@ func TestSingleShotUsage(t *testing.T) {
defer server.Close()

// Create the listener.
whl, err := New(&webhook.Registration{
Address: server.URL,
Events: []string{
"foo",
},
Config: webhook.DeliveryConfig{
Secret: "secret1",
whl, err := New(
&webhook.Registration{
Events: []string{
"foo",
},
Config: webhook.DeliveryConfig{
Secret: "secret1",
},
Duration: webhook.CustomDuration(5 * time.Minute),
},
Duration: webhook.CustomDuration(5 * time.Minute),
},
server.URL,
Once(),
)
require.NotNil(whl)
Expand All @@ -172,7 +174,7 @@ func TestSingleShotUsage(t *testing.T) {
assert.NoError(err)

// Wait a bit then roll the secret..
time.Sleep(time.Millisecond)
time.Sleep(10 * time.Millisecond)
m.Lock()
expectSecret = append(expectSecret, "secret2", "secret3", "secret4")
m.Unlock()
Expand All @@ -187,13 +189,13 @@ func TestSingleShotUsage(t *testing.T) {
assert.NoError(err)

// Wait a bit then remove the prior secret from the list of accepted secrets.
time.Sleep(time.Millisecond)
time.Sleep(10 * time.Millisecond)
m.Lock()
expectSecret = []string{"secret5"}
m.Unlock()

// Wait a bit then unregister.
time.Sleep(time.Millisecond)
time.Sleep(10 * time.Millisecond)
whl.Stop()

// Re-stop because it could happen.
Expand All @@ -214,16 +216,17 @@ func TestFailedHTTPCall(t *testing.T) {
defer server.Close()

// Create the listener.
whl, err := New(&webhook.Registration{
Address: server.URL,
Events: []string{
"foo",
},
Config: webhook.DeliveryConfig{
Secret: "secret1",
whl, err := New(
&webhook.Registration{
Events: []string{
"foo",
},
Config: webhook.DeliveryConfig{
Secret: "secret1",
},
Duration: webhook.CustomDuration(5 * time.Minute),
},
Duration: webhook.CustomDuration(5 * time.Minute),
},
server.URL,
Once(),
)

Expand All @@ -240,15 +243,17 @@ func TestFailedAuthCheck(t *testing.T) {
require := require.New(t)

// Create the listener.
whl, err := New(&webhook.Registration{
Events: []string{
"foo",
},
Config: webhook.DeliveryConfig{
Secret: "secret1",
whl, err := New(
&webhook.Registration{
Events: []string{
"foo",
},
Config: webhook.DeliveryConfig{
Secret: "secret1",
},
Duration: webhook.CustomDuration(5 * time.Minute),
},
Duration: webhook.CustomDuration(5 * time.Minute),
},
"http://example.com",
AuthBearerFunc(func() (string, error) {
return "", fmt.Errorf("nope")
}),
Expand All @@ -267,16 +272,18 @@ func TestFailedNewRequest(t *testing.T) {
require := require.New(t)

// Create the listener.
whl, err := New(&webhook.Registration{
Address: "//invalid::localhost/:99999",
Events: []string{
"foo",
},
Config: webhook.DeliveryConfig{
Secret: "secret1",
whl, err := New(
&webhook.Registration{
Events: []string{
"foo",
},
Config: webhook.DeliveryConfig{
Secret: "secret1",
},
Duration: webhook.CustomDuration(5 * time.Minute),
},
Duration: webhook.CustomDuration(5 * time.Minute),
})
"//invalid::localhost/:99999",
)

require.NotNil(whl)
require.NoError(err)
Expand All @@ -300,16 +307,17 @@ func TestFailedConnect(t *testing.T) {
defer server.Close()

// Create the listener.
whl, err := New(&webhook.Registration{
Address: server.URL,
Events: []string{
"foo",
},
Config: webhook.DeliveryConfig{
Secret: "secret1",
whl, err := New(
&webhook.Registration{
Events: []string{
"foo",
},
Config: webhook.DeliveryConfig{
Secret: "secret1",
},
Duration: webhook.CustomDuration(5 * time.Minute),
},
Duration: webhook.CustomDuration(5 * time.Minute),
},
server.URL,
HTTPClient(&http.Client{Timeout: 1 * time.Millisecond}),
Once(),
)
Expand All @@ -321,3 +329,57 @@ func TestFailedConnect(t *testing.T) {
err = whl.Register()
assert.ErrorIs(err, ErrRegistrationFailed)
}

func TestFailsAfterABit(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

var m sync.Mutex
var count int

server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
_, _ = io.ReadAll(r.Body)
r.Body.Close()

m.Lock()
if count == 0 {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusBadRequest)
}
count++
m.Unlock()
},
),
)
defer server.Close()

// Create the listener.
whl, err := New(
&webhook.Registration{
Events: []string{
"foo",
},
Config: webhook.DeliveryConfig{
Secret: "secret1",
},
Duration: webhook.CustomDuration(5 * time.Minute),
},
server.URL,
Interval(1*time.Millisecond),
AuthBasic("user", "pass"),
)
require.NotNil(whl)
require.NoError(err)

// Register the webhook before has started
err = whl.Register()
assert.NoError(err)

// Wait a bit then roll the secret..
time.Sleep(10 * time.Millisecond)

whl.Stop()
}
34 changes: 29 additions & 5 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
type Listener struct {
m sync.RWMutex
registration *webhook.Registration
webhookURL string
registrationOpts []webhook.Option
interval time.Duration
client *http.Client
Expand All @@ -57,9 +58,19 @@ type Option interface {
}

// New creates a new webhook listener with the given registration and options.
func New(r *webhook.Registration, opts ...Option) (*Listener, error) {
func New(r *webhook.Registration, url string, opts ...Option) (*Listener, error) {
if r == nil {
return nil, fmt.Errorf("%w: registration is required", ErrInput)
}

url = strings.TrimSpace(url)
if url == "" {
return nil, fmt.Errorf("%w: webhook url is required", ErrInput)
}

l := Listener{
registration: r,
webhookURL: url,
registrationOpts: make([]webhook.Option, 0),
logger: zap.NewNop(),
client: http.DefaultClient,
Expand Down Expand Up @@ -150,7 +161,14 @@ func (l *Listener) use(secret string) error {
}

Check warning on line 161 in listener.go

View check run for this annotation

Codecov / codecov/patch

listener.go#L160-L161

Added lines #L160 - L161 were not covered by tests

if l.update != nil {
l.update <- struct{}{}
go func() {
defer func() {
// If the update channel is closed, the listener is shutting down.
// Ignore the panic.
_ = recover()
}()
l.update <- struct{}{}
}()
}

return nil
Expand Down Expand Up @@ -183,6 +201,7 @@ func (l *Listener) run() {
select {
case <-l.ctx.Done():
l.m.Lock()
close(l.update)
l.update = nil
l.m.Unlock()
return
Expand Down Expand Up @@ -224,7 +243,7 @@ func (l *Listener) register(locked bool) error {
}

fn := l.getAuth
address := l.registration.Address
address := l.webhookURL
body := l.body

if !locked {
Expand Down Expand Up @@ -273,7 +292,7 @@ func (l *Listener) register(locked bool) error {

// Tokenize parses the token from the request header. If the token is not found
// or is invalid, an error is returned.
func (l *Listener) Tokenize(r *http.Request) (*Token, error) {
func (l *Listener) Tokenize(r *http.Request) (*token, error) {
headers := r.Header.Values(xmidtHeader)
if len(headers) != 0 {
l.metrics.TokenHeaderUsed.inc(xmidtHeader)
Expand Down Expand Up @@ -325,12 +344,17 @@ func (l *Listener) Tokenize(r *http.Request) (*Token, error) {
l.metrics.TokenAlgorithmUsed.inc(best)
l.metrics.TokenOutcome.incValid()

return NewToken(best, choices[best]), nil
return newToken(best, choices[best]), nil
}

// Authorize validates that the request body matches the hash and secret provided
// in the token.
func (l *Listener) Authorize(r *http.Request, t Token) error {
if t == nil {
l.metrics.Authorization.incInvalidToken()
return fmt.Errorf("%w: invalid token", ErrInput)
}

secret, err := hex.DecodeString(t.Principal())
if err != nil {
l.metrics.Authorization.incInvalidSignature()
Expand Down
Loading

0 comments on commit 96f92b7

Please sign in to comment.