Skip to content

Commit

Permalink
improve error handling (#805)
Browse files Browse the repository at this point in the history
  • Loading branch information
FoseFx authored Sep 8, 2024
1 parent e88a2b3 commit 9bbe537
Show file tree
Hide file tree
Showing 16 changed files with 71 additions and 93 deletions.
7 changes: 4 additions & 3 deletions libs/common/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package common
import (
"context"
"errors"
"fmt"
"github.com/coreos/go-oidc"
"github.com/google/uuid"
zlog "github.com/rs/zerolog/log"
Expand Down Expand Up @@ -131,18 +132,18 @@ func VerifyIDToken(ctx context.Context, token string) (*IDTokenClaims, error) {
// and still exposes .Claims() for us to access non-standard ID token claims
idToken, err := getIDTokenVerifier(ctx).Verify(context.Background(), token)
if err != nil {
return nil, err
return nil, fmt.Errorf("getIDTokenVerifier: verify failed: %w", err)
}

// now get the claims
claims := IDTokenClaims{}
if err = idToken.Claims(&claims); err != nil {
return nil, err
return nil, fmt.Errorf("getIDTokenVerifier: could not get claims: %w", err)
}

// and check that they are in the expected format
if err = claims.AsExpected(); err != nil {
return nil, err
return nil, fmt.Errorf("getIDTokenVerifier: claims are not as expected: %w", err)
}

return &claims, nil
Expand Down
5 changes: 3 additions & 2 deletions libs/common/dapr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"context"
"fmt"
daprc "github.com/dapr/go-sdk/client"
daprcmn "github.com/dapr/go-sdk/service/common"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -31,7 +32,7 @@ func PublishMessage(ctx context.Context, client daprc.Client, pubsub string, top
Error().
Err(err).
Msgf("could not marshal message for topic %s", topic)
return err
return fmt.Errorf("PublishMessage: could not marshal message: %w", err)
}

err = client.PublishEvent(ctx, pubsub, topic, bytes)
Expand All @@ -40,7 +41,7 @@ func PublishMessage(ctx context.Context, client daprc.Client, pubsub string, top
Error().
Err(err).
Msgf("could not publish message for topic %s", topic)
return err
return fmt.Errorf("PublishMessage: could not publish message: %w", err)
}
return nil
}
Expand Down
15 changes: 8 additions & 7 deletions libs/common/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/base64"
"errors"
"fmt"
"github.com/dapr/dapr/pkg/proto/runtime/v1"
daprd "github.com/dapr/go-sdk/service/grpc"
"github.com/go-playground/validator/v10"
Expand Down Expand Up @@ -136,12 +137,12 @@ func authUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.Unary

ctx, err := authFunc(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("authUnaryInterceptor: authFn failed: %w", err)
}

ctx, err = handleOrganizationIDForAuthFunc(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("authUnaryInterceptor: cant handle organization: %w", err)
}

return next(ctx, req)
Expand Down Expand Up @@ -249,7 +250,7 @@ func handleOrganizationIDForAuthFunc(ctx context.Context) (context.Context, erro

claims, err := GetAuthClaims(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("handleOrganizationIDForAuthFunc: could not get auth claims: %w", err)
}

// If InsecureFakeTokenEnable is true,
Expand All @@ -273,16 +274,16 @@ func VerifyFakeToken(ctx context.Context, token string) (*IDTokenClaims, error)

plainToken, err := base64.StdEncoding.DecodeString(token)
if err != nil {
return nil, err
return nil, fmt.Errorf("VerifyFakeToken: cant decode fake token: %w", err)
}

claims := IDTokenClaims{}
if err := hwutil.ParseValidJson(plainToken, &claims); err != nil {
return nil, err
return nil, fmt.Errorf("VerifyFakeToken: cant parse json: %w", err)
}

if err = claims.AsExpected(); err != nil {
return nil, err
return nil, fmt.Errorf("VerifyFakeToken: claims not as expected: %w", err)
}

log.Warn().Interface("claims", claims).Msg("fake token was verified")
Expand Down Expand Up @@ -449,7 +450,7 @@ func errorQualityControlInterceptor(ctx context.Context, req interface{}, info *
log.Error().
Err(err2).
Msg("there was an error while creating the generic fallback statusError")
return res, err
return res, err // respond with original error
}
}

Expand Down
2 changes: 1 addition & 1 deletion libs/common/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func setupOTelSDK(ctx context.Context, serviceName, serviceVersion string) (shut
}
}
shutdownFuncs = nil
return err
return fmt.Errorf("setupOTelSDK: shutdown error: %w", err)
}

// in dev environments we might not need a tracing setup, so we allow to skip its setup
Expand Down
2 changes: 1 addition & 1 deletion libs/hwauthz/spicedb/spicedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *SpiceDBAuthZ) Write(ctx context.Context, writes []hwauthz.Relationship,

res, err := s.client.WriteRelationships(ctx, req)
if err != nil {
return "", err
return "", fmt.Errorf("SpiceDBAuthZ.Write: write relationship failed: %w", err)
}

return res.WrittenAt.Token, nil
Expand Down
4 changes: 2 additions & 2 deletions libs/hwdb/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func openDatabasePool(ctx context.Context, dsn string) (*pgxpool.Pool, error) {
pgxConfig, err := pgxpool.ParseConfig(dsn)
if err != nil {
log.Error().Err(err).Msg("Unable to parse database config")
return nil, err
return nil, fmt.Errorf("openDatabasePool: could not parse dsn: %w", err)
}

// google's uuid <-> pgtype.uuid interop
Expand All @@ -101,7 +101,7 @@ func openDatabasePool(ctx context.Context, dsn string) (*pgxpool.Pool, error) {
dbpool, err := pgxpool.NewWithConfig(ctx, pgxConfig)
if err != nil {
log.Error().Err(err).Msg("Unable to create connection pool")
return nil, err
return nil, fmt.Errorf("openDatabasePool: unable to create connection pool: %w", err)
}
return dbpool, nil
}
Expand Down
6 changes: 3 additions & 3 deletions libs/hwes/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (a *AggregateBase) Load(events []Event) error {
}

if err := a.HandleEvent(event); err != nil {
return err
return fmt.Errorf("AggregateBase.Load: event handler failed: %w", err)
}

a.appliedEvents = append(a.appliedEvents, event)
Expand All @@ -169,7 +169,7 @@ func (a *AggregateBase) Apply(event Event) error {
}

if err := a.HandleEvent(event); err != nil {
return err
return fmt.Errorf("AggregateBase.Apply: event handler failed: %w", err)
}

a.version++
Expand All @@ -191,7 +191,7 @@ func (a *AggregateBase) Progress(event Event) error {
}

if err := a.HandleEvent(event); err != nil {
return err
return fmt.Errorf("AggregateBase.Progress: event handler failed: %w", err)
}

a.version = event.GetVersion()
Expand Down
15 changes: 7 additions & 8 deletions libs/hwes/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,17 @@ func resolveAggregateIDAndTypeFromStreamID(streamID string) (aggregateID uuid.UU
func NewEventFromRecordedEvent(esdbEvent *esdb.RecordedEvent) (Event, error) {
id, err := uuid.Parse(esdbEvent.EventID.String())
if err != nil {
return Event{}, err
return Event{}, fmt.Errorf("NewEventFromRecordedEvent: event id is not a uuid: %w", err)
}

aggregateID, aggregateType, err := resolveAggregateIDAndTypeFromStreamID(esdbEvent.StreamID)
if err != nil {
return Event{}, err
return Event{}, fmt.Errorf("NewEventFromRecordedEvent: could not resove AggregateID and type: %w", err)
}

md := metadata{}
if err := json.Unmarshal(esdbEvent.UserMetadata, &md); err != nil {
return Event{}, err
return Event{}, fmt.Errorf("NewEventFromRecordedEvent: UserMetadata is not json: %w", err)
}

event := Event{
Expand Down Expand Up @@ -186,7 +186,7 @@ func (e *Event) ToEventData() (esdb.EventData, error) {

mdBytes, err := json.Marshal(md)
if err != nil {
return esdb.EventData{}, err
return esdb.EventData{}, fmt.Errorf("ToEventData: failed to encode md as json: %w", err)
}

return esdb.EventData{
Expand All @@ -213,16 +213,15 @@ func (e *Event) SetJsonData(data interface{}) error {
}

if err != nil {
return err
return fmt.Errorf("SetJsonData: %w", err)
}
e.Data = dataBytes
return nil
}

func (e *Event) GetJsonData(data interface{}) error {
if jsonable, ok := data.(hwutil.JSONAble); ok {
err := jsonable.FromJSON(e.Data)
return err
return jsonable.FromJSON(e.Data)
}
return json.Unmarshal(e.Data, data)
}
Expand All @@ -243,7 +242,7 @@ func (e *Event) SetCommitterFromCtx(ctx context.Context) error {

// Just to make sure we are actually dealing with a valid UUID
if _, err := uuid.Parse(e.CommitterUserID.String()); err != nil {
return err
return fmt.Errorf("SetCommitterFromCtx: cant parse comitter uid: %w", err)
}

telemetry.SetSpanStr(ctx, "committerUserID", e.CommitterUserID.String())
Expand Down
52 changes: 13 additions & 39 deletions libs/hwes/eventstoredb/aggregate_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventstoredb
import (
"context"
"errors"
"fmt"
"github.com/EventStore/EventStore-Client-Go/v4/esdb"
zlog "github.com/rs/zerolog/log"
"hwes"
Expand All @@ -23,33 +24,6 @@ func NewAggregateStore(es *esdb.Client) *AggregateStore {
return &AggregateStore{es: es}
}

// getExpectedRevisionByReadTEST implements a strategy for our getExpectedRevision strategy pattern.
// This function resolves the version by returning the version of the last event in
// the event stream of EventStore of our aggregate.
// NOT FOR PRODUCTION
//
// nolint:unused
func (a *AggregateStore) getExpectedRevisionByReadTEST(ctx context.Context, aggregate hwes.Aggregate) (esdb.ExpectedRevision, error) {
readOpts := esdb.ReadStreamOptions{Direction: esdb.Backwards, From: esdb.End{}}
stream, err := a.es.ReadStream(
ctx,
aggregate.GetTypeID(),
readOpts,
1,
)
if err != nil {
return nil, err
}
defer stream.Close()

lastEvent, err := stream.Recv()
if err != nil {
return nil, err
}

return esdb.Revision(lastEvent.OriginalEvent().EventNumber), nil
}

// getExpectedRevisionByPreviousRead implements a strategy for our getExpectedRevision strategy pattern.
// This function resolves the version by returning the version of the last applied event of our aggregate.
func (a *AggregateStore) getExpectedRevisionByPreviousRead(ctx context.Context, aggregate hwes.Aggregate) (esdb.ExpectedRevision, error) {
Expand All @@ -70,7 +44,7 @@ func (a *AggregateStore) doSave(ctx context.Context, aggregate hwes.Aggregate, g
return event.ToEventData()
})
if err != nil {
return err
return fmt.Errorf("AggregateStore.doSave: could not convert one uncomitted event to event data: %w", err)
}

// If AppliedEvents are empty, we imply that this entity was not loaded from an event store and therefore non-existing.
Expand All @@ -85,7 +59,7 @@ func (a *AggregateStore) doSave(ctx context.Context, aggregate hwes.Aggregate, g
eventsData...,
)
if err != nil {
return err
return fmt.Errorf("AggregateStore.doSave: could not append event to stream: %w", err)
}

return nil
Expand All @@ -94,7 +68,7 @@ func (a *AggregateStore) doSave(ctx context.Context, aggregate hwes.Aggregate, g
// We resolve the expectedRevision by the passed strategy of the caller
expectedRevision, err := getExpectedRevision(ctx, aggregate)
if err != nil {
return err
return fmt.Errorf("AggregateStore.doSave: could not resolve expected revision: %w", err)
}

appendOpts := esdb.AppendToStreamOptions{ExpectedRevision: expectedRevision}
Expand All @@ -105,7 +79,7 @@ func (a *AggregateStore) doSave(ctx context.Context, aggregate hwes.Aggregate, g
eventsData...,
)
if err != nil {
return err
return fmt.Errorf("AggregateStore.doSave: could not append event to stream: %w", err)
}

aggregate.ClearUncommittedEvents()
Expand All @@ -117,7 +91,7 @@ func (a *AggregateStore) doSave(ctx context.Context, aggregate hwes.Aggregate, g
func (a *AggregateStore) Load(ctx context.Context, aggregate hwes.Aggregate) error {
stream, err := a.es.ReadStream(ctx, aggregate.GetTypeID(), esdb.ReadStreamOptions{}, math.MaxUint64) // MaxUint64 for "all" events
if err != nil {
return err
return fmt.Errorf("AggregateStore.Load: could not open stream: %w", err)
}
defer stream.Close()

Expand All @@ -127,16 +101,16 @@ func (a *AggregateStore) Load(ctx context.Context, aggregate hwes.Aggregate) err
// exit condition for for-loop
break
} else if err != nil {
return err
return fmt.Errorf("AggregateStore.Load: could not read from stream: %w", err)
}

event, err := hwes.NewEventFromRecordedEvent(esdbEvent.Event)
if err != nil {
return err
return fmt.Errorf("AggregateStore.Load: %w", err)
}

if err := aggregate.Progress(event); err != nil {
return err
return fmt.Errorf("AggregateStore.Load: Progress failed: %w", err)
}
}

Expand All @@ -159,12 +133,12 @@ func (a *AggregateStore) Exists(ctx context.Context, aggregate hwes.Aggregate) (
var esErr *esdb.Error
if !errors.As(err, &esErr) {
log.Warn().Err(err).Msg("non esdb.Error returned")
return false, err
return false, fmt.Errorf("AggregateStore.Exists: ReadStream failed with unexpected error type: %w", err)
}
if esErr.IsErrorCode(esdb.ErrorCodeResourceNotFound) {
return false, nil
} else {
return false, err
return false, fmt.Errorf("AggregateStore.Exists: ReadStream failed: %w", err)
}
}
defer stream.Close()
Expand All @@ -174,12 +148,12 @@ func (a *AggregateStore) Exists(ctx context.Context, aggregate hwes.Aggregate) (
var esErr *esdb.Error
if !errors.As(err, &esErr) {
log.Warn().Err(err).Msg("non esdb.Error returned")
return false, err
return false, fmt.Errorf("AggregateStore.Exists: Recv failed with unexpected error type: %w", err)
}
if esErr.IsErrorCode(esdb.ErrorCodeResourceNotFound) {
return false, nil
} else {
return false, err
return false, fmt.Errorf("AggregateStore.Exists: Recv failed: %w", err)
}
}

Expand Down
Loading

0 comments on commit 9bbe537

Please sign in to comment.