Skip to content

Commit

Permalink
feat: add sequencer of mongo (mosn#376)
Browse files Browse the repository at this point in the history
  • Loading branch information
LXPWing authored and seeflood committed Feb 10, 2022
1 parent f014696 commit db597e5
Show file tree
Hide file tree
Showing 17 changed files with 829 additions and 35 deletions.
4 changes: 4 additions & 0 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ import (

// Sequencer
sequencer_etcd "mosn.io/layotto/components/sequencer/etcd"
sequencer_mongo "mosn.io/layotto/components/sequencer/mongo"
sequencer_redis "mosn.io/layotto/components/sequencer/redis"
sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper"

Expand Down Expand Up @@ -334,6 +335,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime_sequencer.NewFactory("zookeeper", func() sequencer.Store {
return sequencer_zookeeper.NewZookeeperSequencer(log.DefaultLogger)
}),
runtime_sequencer.NewFactory("mongo", func() sequencer.Store {
return sequencer_mongo.NewMongoSequencer(log.DefaultLogger)
}),
))
// 4. check if unhealthy
if err != nil {
Expand Down
17 changes: 2 additions & 15 deletions components/lock/mongo/mongo_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,12 @@ func (e *MongoLock) Init(metadata lock.Metadata) error {
return err
}

wc, err := utils.GetWriteConcernObject(e.metadata.WriteConcern)
// Connections Collection
e.collection, err = utils.SetCollection(e.client, e.factory, e.metadata)
if err != nil {
return err
}

rc, err := utils.GetReadConcrenObject(e.metadata.ReadConcern)
if err != nil {
return err
}

// set mongo options of collection
opts := options.Collection().SetWriteConcern(wc).SetReadConcern(rc)

// create database
database := client.Database(e.metadata.DatabaseName)

// create collection
e.collection = e.factory.NewMongoCollection(database, e.metadata.CollectionName, opts)

// create exprie time index
indexModel := mongo.IndexModel{
Keys: bsonx.Doc{{"Expire", bsonx.Int64(1)}},
Expand Down
14 changes: 7 additions & 7 deletions components/lock/mongo/mongo_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"mosn.io/layotto/components/lock"
"mosn.io/layotto/components/pkg/utils"
"mosn.io/layotto/components/pkg/mock"
"mosn.io/pkg/log"
"sync"
"testing"
Expand Down Expand Up @@ -71,9 +71,9 @@ func TestMongoLock_TryLock(t *testing.T) {
insertOneResult := &mongo.InsertOneResult{}
singleResult := &mongo.SingleResult{}
result := make(map[string]bson.M)
mockMongoClient := utils.MockMongoClient{}
mockMongoSession := utils.NewMockMongoSession()
mockMongoCollection := utils.MockMongoCollection{
mockMongoClient := mock.MockMongoClient{}
mockMongoSession := mock.NewMockMongoSession()
mockMongoCollection := mock.MockMongoCollection{
InsertManyResult: insertManyResult,
InsertOneResult: insertOneResult,
SingleResult: singleResult,
Expand Down Expand Up @@ -148,9 +148,9 @@ func TestMongoLock_Unlock(t *testing.T) {
insertOneResult := &mongo.InsertOneResult{}
singleResult := &mongo.SingleResult{}
result := make(map[string]bson.M)
mockMongoClient := utils.MockMongoClient{}
mockMongoSession := utils.NewMockMongoSession()
mockMongoCollection := utils.MockMongoCollection{
mockMongoClient := mock.MockMongoClient{}
mockMongoSession := mock.NewMockMongoSession()
mockMongoCollection := mock.MockMongoCollection{
InsertManyResult: insertManyResult,
InsertOneResult: insertOneResult,
SingleResult: singleResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
package mock

import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"mosn.io/layotto/components/pkg/utils"
)

type MockMongoFactory struct{}
Expand Down Expand Up @@ -57,14 +58,18 @@ func NewMockMongoSession() *MockMongoSession {
return &MockMongoSession{}
}

func (f *MockMongoFactory) NewMongoClient(m MongoMetadata) (MongoClient, error) {
func (f *MockMongoFactory) NewMongoClient(m utils.MongoMetadata) (utils.MongoClient, error) {
return &MockMongoClient{}, nil
}

func (f *MockMongoFactory) NewMongoCollection(m *mongo.Database, collectionName string, opts *options.CollectionOptions) MongoCollection {
func (f *MockMongoFactory) NewMongoCollection(m *mongo.Database, collectionName string, opts *options.CollectionOptions) utils.MongoCollection {
return &MockMongoCollection{}
}

func (f *MockMongoFactory) NewSingleResult(sr *mongo.SingleResult) utils.MongoSingleResult {
return nil
}

func (mc *MockMongoCollection) FindOne(ctx context.Context, filter interface{}, opts ...*options.FindOneOptions) *mongo.SingleResult {
result := mongo.SingleResult{}
return &result
Expand Down Expand Up @@ -105,6 +110,14 @@ func (mc *MockMongoCollection) Indexes() mongo.IndexView {
return mongo.IndexView{}
}

func (mc *MockMongoCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
return nil, nil
}

func (mc *MockMongoCollection) FindOneAndUpdate(ctx context.Context, filter interface{}, update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult {
return nil
}

func (c *MockMongoClient) StartSession(opts ...*options.SessionOptions) (mongo.Session, error) {
return &MockMongoSession{}, nil
}
Expand Down
203 changes: 203 additions & 0 deletions components/pkg/mock/mongo_sequencer_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
//
// Copyright 2021 Layotto Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mock

import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"mosn.io/layotto/components/pkg/utils"
"reflect"
"unsafe"
)

var Result = make(map[string]bson.M)
var id string

type MockMongoSequencerFactory struct{}

// MockMongoClient is a mock of MongoClient interface
type MockMongoSequencerClient struct{}

// MockMongoSession is a mock of MongoSession interface
type MockMongoSequencerSession struct {
mongo.SessionContext
}

// MockMongoCollection is a mock of MongoCollection interface
type MockMongoSequencerCollection struct {
// '_id' document
InsertOneResult *mongo.InsertOneResult
Result map[string]bson.M
}

type MockMongoSequencerSingleResult struct{}

func NewMockMongoSequencerFactory() *MockMongoSequencerFactory {
return &MockMongoSequencerFactory{}
}

func NewMockMongoSequencerSession() *MockMongoSequencerSession {
return &MockMongoSequencerSession{}
}

func (f *MockMongoSequencerFactory) NewSingleResult(sr *mongo.SingleResult) utils.MongoSingleResult {
return &MockMongoSequencerSingleResult{}
}

func (f *MockMongoSequencerFactory) NewMongoClient(m utils.MongoMetadata) (utils.MongoClient, error) {
return &MockMongoSequencerClient{}, nil
}

func (f *MockMongoSequencerFactory) NewMongoCollection(m *mongo.Database, collectionName string, opts *options.CollectionOptions) utils.MongoCollection {
return &MockMongoSequencerCollection{}
}

func (mc *MockMongoSequencerCollection) FindOne(ctx context.Context, filter interface{}, opts ...*options.FindOneOptions) *mongo.SingleResult {
result := new(mongo.SingleResult)
value := reflect.ValueOf(result)
doc := filter.(bson.M)
id = doc["_id"].(string)
if value.Kind() == reflect.Ptr {
elem := value.Elem()
err := elem.FieldByName("err")
*(*error)(unsafe.Pointer(err.Addr().Pointer())) = nil

cur := elem.FieldByName("cur")
*(**mongo.Cursor)(unsafe.Pointer(cur.Addr().Pointer())) = &mongo.Cursor{}

rdr := elem.FieldByName("rdr")
*(*bson.Raw)(unsafe.Pointer(rdr.Addr().Pointer())) = bson.Raw{}

reg := elem.FieldByName("reg")
*(**bsoncodec.Registry)(unsafe.Pointer(reg.Addr().Pointer())) = &bsoncodec.Registry{}
}
return result
}

func (mc *MockMongoSequencerCollection) InsertOne(ctx context.Context, document interface{}, opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error) {
doc := document.(bson.M)
value := doc["_id"].(string)
if _, ok := Result[value]; ok {
return nil, nil
} else {
// insert cache
Result[value] = doc
mc.InsertOneResult.InsertedID = value
return mc.InsertOneResult, nil
}
}

func (mc *MockMongoSequencerCollection) DeleteOne(ctx context.Context, filter interface{}, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) {
return nil, nil
}

func (mc *MockMongoSequencerCollection) Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error) {
cursor := &mongo.Cursor{}
return cursor, nil
}

func (mc *MockMongoSequencerCollection) Indexes() mongo.IndexView {
return mongo.IndexView{}
}

func (mc *MockMongoSequencerCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
doc := filter.(bson.M)
value := doc["_id"].(string)
if res, ok := Result[value]; ok {
Result[value] = bson.M{"_id": res["_id"], "sequencer_value": res["sequencer_value"].(int) + 1}
return nil, nil
}
return nil, nil
}

func (mc *MockMongoSequencerCollection) FindOneAndUpdate(ctx context.Context, filter interface{},
update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult {
doc := filter.(bson.M)
id = doc["_id"].(string)
upDoc := update.(bson.M)
value := doc["_id"].(string)
up := upDoc["$inc"].(bson.M)
num := up["sequencer_value"].(int)
if res, ok := Result[value]; ok {
Result[value] = bson.M{"_id": res["_id"], "sequencer_value": res["sequencer_value"].(int) + num}
return nil
} else {
bm := bson.M{"_id": doc["_id"], "sequencer_value": num}
mc.InsertOne(ctx, bm)
}
return nil
}

func (c *MockMongoSequencerClient) StartSession(opts ...*options.SessionOptions) (mongo.Session, error) {
return &MockMongoSequencerSession{}, nil
}

func (c *MockMongoSequencerClient) Ping(ctx context.Context, rp *readpref.ReadPref) error {
return nil
}

func (c *MockMongoSequencerClient) Database(name string, opts ...*options.DatabaseOptions) *mongo.Database {
return nil
}

func (c *MockMongoSequencerClient) Disconnect(ctx context.Context) error {
return nil
}

func (s *MockMongoSequencerSession) AbortTransaction(context.Context) error {
return nil
}

func (s *MockMongoSequencerSession) CommitTransaction(context.Context) error {
return nil
}

func (s *MockMongoSequencerSession) WithTransaction(ctx context.Context, fn func(sessCtx mongo.SessionContext) (interface{}, error),
opts ...*options.TransactionOptions) (interface{}, error) {
res, err := fn(s)
return res, err
}

func (s *MockMongoSequencerSession) EndSession(context.Context) {}

func (d *MockMongoSequencerSingleResult) Decode(v interface{}) error {
b := Result[id]
id := b["_id"].(string)
value := b["sequencer_value"].(int)
ref := reflect.ValueOf(v)
if ref.Kind() == reflect.Ptr {
elem := ref.Elem()
Id := elem.FieldByName("Id")
*(*string)(unsafe.Pointer(Id.Addr().Pointer())) = id

v := elem.FieldByName("Sequencer_value")
*(*int)(unsafe.Pointer(v.Addr().Pointer())) = value
}
return nil
}

func (d *MockMongoSequencerSingleResult) Err() error {
if Result[id] == nil {
return mongo.ErrNoDocuments
}
return nil
}

func (d *MockMongoSequencerSingleResult) DecodeBytes() (bson.Raw, error) {
return nil, nil
}
Loading

0 comments on commit db597e5

Please sign in to comment.