Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New coordinator API - AclEventLog #83

Merged
merged 6 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions acleventlog/acleventlog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
//go:generate mockgen -destination mock_eventlog/mock_eventlog.go github.com/anyproto/any-sync-coordinator/acleventlog AclEventLog
package acleventlog

import (
"context"
"errors"

"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/anyproto/any-sync-coordinator/db"
)

const CName = "coordinator.aclEventLog"

var log = logger.NewNamed(CName)

const (
collName = "aclEventLog"
defaultLimit = 1000
)

var (
ErrNoIdentity = errors.New("no identity")
)

func New() AclEventLog {
return new(aclEventLog)
}

type EventLogEntryType uint8

const (
EntryTypeSpaceReceipt EventLogEntryType = 0
EntryTypeSpaceShared EventLogEntryType = 1
EntryTypeSpaceUnshared EventLogEntryType = 2
EntryTypeSpaceAclAddRecord EventLogEntryType = 3
)

type AclEventLogEntry struct {
Id *primitive.ObjectID `bson:"_id,omitempty"`
SpaceId string `bson:"spaceId"`
PeerId string `bson:"peerId"`
Identity string `bson:"identity"`
Timestamp int64 `bson:"timestamp"`

EntryType EventLogEntryType `bson:"entryType"`
// only for EntryTypeSpaceAclAddRecord
AclChangeId string `bson:"aclChangeId"`
}

type findIdGt struct {
Identity string `bson:"identity"`

Id struct {
Gt primitive.ObjectID `bson:"$gt"`
} `bson:"_id"`
}

type findIdentity struct {
Identity string `bson:"identity"`
}

var sortById = bson.D{{"_id", 1}}

type AclEventLog interface {
AddLog(ctx context.Context, event AclEventLogEntry) (err error)
GetAfter(ctx context.Context, identity, afterId string, limit uint32) (records []AclEventLogEntry, hasMore bool, err error)

app.ComponentRunnable
}

type aclEventLog struct {
coll *mongo.Collection
}

func (d *aclEventLog) Init(a *app.App) (err error) {
d.coll = a.MustComponent(db.CName).(db.Database).Db().Collection(collName)
return
}

func (d *aclEventLog) Name() (name string) {
return CName
}

func (d *aclEventLog) Run(ctx context.Context) error {
// create collection if doesn't exist
_ = d.coll.Database().CreateCollection(ctx, collName)
return nil
}

func (d *aclEventLog) Close(_ context.Context) (err error) {
return nil
}

func (d *aclEventLog) GetAfter(ctx context.Context, identity string, afterId string, limit uint32) (records []AclEventLogEntry, hasMore bool, err error) {
// if no identity provided, return error
if identity == "" {
err = ErrNoIdentity
return
}

if limit == 0 || limit > defaultLimit {
limit = defaultLimit
}
// fetch one more item to detect a hasMore
limit += 1

var q any

if afterId != "" {
var qGt findIdGt
if qGt.Id.Gt, err = primitive.ObjectIDFromHex(afterId); err != nil {
return
}
qGt.Identity = identity

q = qGt
} else {
var qId findIdentity
qId.Identity = identity

q = qId
}

it, err := d.coll.Find(ctx, q, options.Find().SetSort(sortById).SetLimit(int64(limit)))
if err != nil {
return
}
defer func() {
_ = it.Close(ctx)
}()
records = make([]AclEventLogEntry, 0, limit)
for it.Next(ctx) {
var rec AclEventLogEntry
if err = it.Decode(&rec); err != nil {
return
}
records = append(records, rec)
}
if len(records) == int(limit) {
records = records[:len(records)-1]
hasMore = true
}
return
}

func (d *aclEventLog) AddLog(ctx context.Context, event AclEventLogEntry) (err error) {
_, err = d.coll.InsertOne(ctx, event)
if err != nil {
return
}
return nil
}
128 changes: 128 additions & 0 deletions acleventlog/acleventlog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package acleventlog

import (
"context"
"testing"
"time"

"github.com/anyproto/any-sync/app"
"github.com/stretchr/testify/require"
"github.com/zeebo/assert"
"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/anyproto/any-sync-coordinator/db"
)

var ctx = context.Background()

func TestEventLog_Add(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)

id := primitive.NewObjectID()

err := fx.AddLog(ctx, AclEventLogEntry{
Id: &id,
SpaceId: "space1",
PeerId: "peer1",
Identity: "identity1",
Timestamp: time.Now().Unix(),
EntryType: EntryTypeSpaceReceipt,
})

require.NoError(t, err)
}

func TestEventLog_GetAfter(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)

for i := 0; i < 10; i++ {
id := primitive.NewObjectID()

err := fx.AddLog(ctx, AclEventLogEntry{
Id: &id,
SpaceId: "space1",
PeerId: "peerA",
Identity: "identity1",
Timestamp: time.Now().Unix(),
EntryType: EntryTypeSpaceReceipt,
})

require.NoError(t, err)
}

t.Run("no identity", func(t *testing.T) {
// should return error
_, _, err := fx.GetAfter(ctx, "", "", 0)
require.Equal(t, ErrNoIdentity, err)
})

t.Run("success", func(t *testing.T) {
res, hasMore, err := fx.GetAfter(ctx, "identity1", "", 0)

require.NoError(t, err)
require.Len(t, res, 10)
assert.False(t, hasMore)
})

t.Run("hasMore for last item", func(t *testing.T) {
res, hasMore, err := fx.GetAfter(ctx, "identity1", "", 9)
require.NoError(t, err)
require.Len(t, res, 9)
assert.True(t, hasMore)
})

t.Run("afterId", func(t *testing.T) {
res, hasMore, err := fx.GetAfter(ctx, "identity1", "", 5)
require.NoError(t, err)
require.Len(t, res, 5)
assert.True(t, hasMore)
lastId := res[4].Id.Hex()

res2, hasMore2, err2 := fx.GetAfter(ctx, "identity1", lastId, 0)
require.NoError(t, err2)
require.Len(t, res2, 5)
assert.False(t, hasMore2)
})
}

func newFixture(t *testing.T) *fixture {
fx := &fixture{
AclEventLog: New(),
db: db.New(),
a: new(app.App),
}

fx.a.Register(config{}).Register(fx.db).Register(fx.AclEventLog)

require.NoError(t, fx.a.Start(ctx))
_ = fx.db.Db().Collection(collName).Drop(ctx)

time.Sleep(time.Second / 2)
return fx
}

type fixture struct {
AclEventLog

a *app.App
db db.Database
}

func (fx *fixture) finish(t *testing.T) {
require.NoError(t, fx.a.Close(ctx))
}

type config struct {
}

func (c config) Init(a *app.App) (err error) { return }
func (c config) Name() string { return "config" }

func (c config) GetMongo() db.Mongo {
return db.Mongo{
Connect: "mongodb://localhost:27017",
Database: "coordinator_unittest_eventlog",
}
}
Loading
Loading