Skip to content

Commit

Permalink
implement watcher that gets trigged by db changes
Browse files Browse the repository at this point in the history
  • Loading branch information
victorguarana committed Apr 30, 2024
1 parent f15e6bb commit ef0232a
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 1 deletion.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ require (
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/r3labs/diff v1.1.0
github.com/shirou/gopsutil/v3 v3.24.3 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
Expand All @@ -64,7 +65,7 @@ require (
go.opentelemetry.io/otel/metric v1.25.0 // indirect
go.opentelemetry.io/otel/trace v1.25.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f
golang.org/x/net v0.24.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU=
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/r3labs/diff v1.1.0 h1:V53xhrbTHrWFWq3gI4b94AjgEJOerO1+1l0xyHOBi8M=
github.com/r3labs/diff v1.1.0/go.mod h1:7WjXasNzi0vJetRcB/RqNl5dlIsmXcTTLmF5IoH6Xig=
github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg=
github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o=
github.com/shirou/gopsutil/v3 v3.24.3 h1:eoUGJSmdfLzJ3mxIhmOAhgKEKgQkeOwKpz1NbhVnuPE=
Expand All @@ -114,6 +116,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down Expand Up @@ -238,6 +241,7 @@ google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
57 changes: 57 additions & 0 deletions gomongo/changestream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package gomongo

import (
"context"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/exp/slices"
)

type namespace struct {
Database string `bson:"db"`
Collection string `bson:"coll"`
}

type updateDescription struct {
RemovedFields bson.A `bson:"removedFields"`
UpdatedFields bson.M `bson:"updatedFields"`
}

type event struct {
NS namespace `bson:"ns"`
ClusterTime time.Time `bson:"clusterTime"`
FullDocument bson.M `bson:"fullDocument"`
DocumentKey bson.M `bson:"documentKey"`
UpdateDescription updateDescription `bson:"updateDescription"`
OperationType string `bson:"operationType"`
}

func watch(ctx context.Context, mongoDatabase *mongo.Database, handleEvent func(ctx context.Context, e event) error, collectionNamesToWatch []string) error {
cs, err := mongoDatabase.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
if err != nil {
return err
}

defer cs.Close(ctx)
for cs.Next(ctx) {
var e event
if err := cs.Decode(&e); err != nil {
return err
}

if collectionBellongToWatch(e.NS.Collection, collectionNamesToWatch) {
err := handleEvent(ctx, e)
if err != nil {
return err
}
}
}
return nil
}

func collectionBellongToWatch(collectionName string, collections []string) bool {
return slices.Contains(collections, collectionName)
}
32 changes: 32 additions & 0 deletions gomongo/history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package gomongo

import (
"context"
"time"
)

type History interface {
All(ctx context.Context) ([]Document, error)
Count(ctx context.Context) (int, error)
First(ctx context.Context) (Document, error)
Last(ctx context.Context) (Document, error)
Where(ctx context.Context, filter any) ([]Document, error)

Drop(ctx context.Context) error

Name() string
}

type Document struct {
CreatedAt time.Time
CollectionName string
ObjectID ID
Modified map[string]any
UpdatedFields map[string]UpdatedField
Action string
}

type UpdatedField struct {
Old any
New any
}
99 changes: 99 additions & 0 deletions gomongo/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package gomongo

import (
"context"
"errors"
"fmt"
"strings"

"github.com/r3labs/diff"
)

type Watcher interface {
Watch(ctx context.Context, collections ...string) error
}

type watcher struct {
database *Database
historyCollection *collection[Document]
}

func NewWatcher(database *Database, historyCollectionName string) (Watcher, History, error) {
if err := validateDatabase(database); err != nil {
return nil, nil, err
}

w := watcher{
database: database,
historyCollection: &collection[Document]{database.mongoDatabase.Collection(historyCollectionName)},
}

return w, w.historyCollection, nil
}

func (w watcher) Watch(ctx context.Context, collectionNamesToWatch ...string) error {
err := watch(ctx, w.database.mongoDatabase, w.handleEvents, collectionNamesToWatch)
if err != nil {
return err
}

return nil
}

func (w watcher) handleEvents(ctx context.Context, e event) error {
id, ok := e.DocumentKey["_id"].(ID)
if !ok {
return fmt.Errorf("could not get id from event document")
}

last, err := lastInsertedByObjectID(ctx, w.historyCollection, id)
if err != nil && err != ErrDocumentNotFound {
return fmt.Errorf("failed to get last entry")
}

updatedFields, err := updatedFields(last.Modified, e.FullDocument)
if err != nil {
return err
}

doc := Document{
CreatedAt: e.ClusterTime,
CollectionName: e.NS.Collection,
ObjectID: id,
Modified: e.FullDocument,
Action: e.OperationType,
UpdatedFields: updatedFields,
}

_, err = w.historyCollection.Create(ctx, doc)
if err != nil && !errors.Is(err, ErrDuplicateKey) {
return err
}

return nil
}

func lastInsertedByObjectID[T any](ctx context.Context, c *collection[T], objectID ID) (T, error) {
filter := map[string]any{"objectid": objectID}
return c.LastInserted(ctx, filter)
}

func updatedFields(last any, history any) (map[string]UpdatedField, error) {
changes, err := diff.Diff(last, history)
if err != nil {
return nil, fmt.Errorf("failed to get diff between entries")
}

updatedFields := make(map[string]UpdatedField, len(changes))
for _, change := range changes {
field := strings.Join(change.Path, ".")
if field != "_id" {
updatedFields[field] = UpdatedField{
Old: change.From,
New: change.To,
}
}
}

return updatedFields, nil
}

0 comments on commit ef0232a

Please sign in to comment.