diff --git a/cmd/layotto/main.go b/cmd/layotto/main.go index 4282946e43..1ea8f2f805 100644 --- a/cmd/layotto/main.go +++ b/cmd/layotto/main.go @@ -106,6 +106,7 @@ import ( // Sequencer sequencer_etcd "mosn.io/layotto/components/sequencer/etcd" + sequencer_mongo "mosn.io/layotto/components/sequencer/mongo" sequencer_redis "mosn.io/layotto/components/sequencer/redis" sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper" @@ -334,6 +335,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp runtime_sequencer.NewFactory("zookeeper", func() sequencer.Store { return sequencer_zookeeper.NewZookeeperSequencer(log.DefaultLogger) }), + runtime_sequencer.NewFactory("mongo", func() sequencer.Store { + return sequencer_mongo.NewMongoSequencer(log.DefaultLogger) + }), )) // 4. check if unhealthy if err != nil { diff --git a/components/lock/mongo/mongo_lock.go b/components/lock/mongo/mongo_lock.go index 4baa7f4208..b315b62618 100644 --- a/components/lock/mongo/mongo_lock.go +++ b/components/lock/mongo/mongo_lock.go @@ -84,25 +84,12 @@ func (e *MongoLock) Init(metadata lock.Metadata) error { return err } - wc, err := utils.GetWriteConcernObject(e.metadata.WriteConcern) + // Connections Collection + e.collection, err = utils.SetCollection(e.client, e.factory, e.metadata) if err != nil { return err } - rc, err := utils.GetReadConcrenObject(e.metadata.ReadConcern) - if err != nil { - return err - } - - // set mongo options of collection - opts := options.Collection().SetWriteConcern(wc).SetReadConcern(rc) - - // create database - database := client.Database(e.metadata.DatabaseName) - - // create collection - e.collection = e.factory.NewMongoCollection(database, e.metadata.CollectionName, opts) - // create exprie time index indexModel := mongo.IndexModel{ Keys: bsonx.Doc{{"Expire", bsonx.Int64(1)}}, diff --git a/components/lock/mongo/mongo_lock_test.go b/components/lock/mongo/mongo_lock_test.go index ca806a8ab5..c9c9b44e1d 100644 --- a/components/lock/mongo/mongo_lock_test.go +++ b/components/lock/mongo/mongo_lock_test.go @@ -19,7 +19,7 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "mosn.io/layotto/components/lock" - "mosn.io/layotto/components/pkg/utils" + "mosn.io/layotto/components/pkg/mock" "mosn.io/pkg/log" "sync" "testing" @@ -71,9 +71,9 @@ func TestMongoLock_TryLock(t *testing.T) { insertOneResult := &mongo.InsertOneResult{} singleResult := &mongo.SingleResult{} result := make(map[string]bson.M) - mockMongoClient := utils.MockMongoClient{} - mockMongoSession := utils.NewMockMongoSession() - mockMongoCollection := utils.MockMongoCollection{ + mockMongoClient := mock.MockMongoClient{} + mockMongoSession := mock.NewMockMongoSession() + mockMongoCollection := mock.MockMongoCollection{ InsertManyResult: insertManyResult, InsertOneResult: insertOneResult, SingleResult: singleResult, @@ -148,9 +148,9 @@ func TestMongoLock_Unlock(t *testing.T) { insertOneResult := &mongo.InsertOneResult{} singleResult := &mongo.SingleResult{} result := make(map[string]bson.M) - mockMongoClient := utils.MockMongoClient{} - mockMongoSession := utils.NewMockMongoSession() - mockMongoCollection := utils.MockMongoCollection{ + mockMongoClient := mock.MockMongoClient{} + mockMongoSession := mock.NewMockMongoSession() + mockMongoCollection := mock.MockMongoCollection{ InsertManyResult: insertManyResult, InsertOneResult: insertOneResult, SingleResult: singleResult, diff --git a/components/pkg/utils/mongo_lock_mock.go b/components/pkg/mock/mongo_lock_mock.go similarity index 84% rename from components/pkg/utils/mongo_lock_mock.go rename to components/pkg/mock/mongo_lock_mock.go index 6e8ad842c2..cd8ad87186 100644 --- a/components/pkg/utils/mongo_lock_mock.go +++ b/components/pkg/mock/mongo_lock_mock.go @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package utils +package mock import ( "context" @@ -19,6 +19,7 @@ import ( "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readpref" + "mosn.io/layotto/components/pkg/utils" ) type MockMongoFactory struct{} @@ -57,14 +58,18 @@ func NewMockMongoSession() *MockMongoSession { return &MockMongoSession{} } -func (f *MockMongoFactory) NewMongoClient(m MongoMetadata) (MongoClient, error) { +func (f *MockMongoFactory) NewMongoClient(m utils.MongoMetadata) (utils.MongoClient, error) { return &MockMongoClient{}, nil } -func (f *MockMongoFactory) NewMongoCollection(m *mongo.Database, collectionName string, opts *options.CollectionOptions) MongoCollection { +func (f *MockMongoFactory) NewMongoCollection(m *mongo.Database, collectionName string, opts *options.CollectionOptions) utils.MongoCollection { return &MockMongoCollection{} } +func (f *MockMongoFactory) NewSingleResult(sr *mongo.SingleResult) utils.MongoSingleResult { + return nil +} + func (mc *MockMongoCollection) FindOne(ctx context.Context, filter interface{}, opts ...*options.FindOneOptions) *mongo.SingleResult { result := mongo.SingleResult{} return &result @@ -105,6 +110,14 @@ func (mc *MockMongoCollection) Indexes() mongo.IndexView { return mongo.IndexView{} } +func (mc *MockMongoCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) { + return nil, nil +} + +func (mc *MockMongoCollection) FindOneAndUpdate(ctx context.Context, filter interface{}, update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult { + return nil +} + func (c *MockMongoClient) StartSession(opts ...*options.SessionOptions) (mongo.Session, error) { return &MockMongoSession{}, nil } diff --git a/components/pkg/mock/mongo_sequencer_mock.go b/components/pkg/mock/mongo_sequencer_mock.go new file mode 100644 index 0000000000..55e3151bbc --- /dev/null +++ b/components/pkg/mock/mongo_sequencer_mock.go @@ -0,0 +1,203 @@ +// +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package mock + +import ( + "context" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/bsoncodec" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" + "mosn.io/layotto/components/pkg/utils" + "reflect" + "unsafe" +) + +var Result = make(map[string]bson.M) +var id string + +type MockMongoSequencerFactory struct{} + +// MockMongoClient is a mock of MongoClient interface +type MockMongoSequencerClient struct{} + +// MockMongoSession is a mock of MongoSession interface +type MockMongoSequencerSession struct { + mongo.SessionContext +} + +// MockMongoCollection is a mock of MongoCollection interface +type MockMongoSequencerCollection struct { + // '_id' document + InsertOneResult *mongo.InsertOneResult + Result map[string]bson.M +} + +type MockMongoSequencerSingleResult struct{} + +func NewMockMongoSequencerFactory() *MockMongoSequencerFactory { + return &MockMongoSequencerFactory{} +} + +func NewMockMongoSequencerSession() *MockMongoSequencerSession { + return &MockMongoSequencerSession{} +} + +func (f *MockMongoSequencerFactory) NewSingleResult(sr *mongo.SingleResult) utils.MongoSingleResult { + return &MockMongoSequencerSingleResult{} +} + +func (f *MockMongoSequencerFactory) NewMongoClient(m utils.MongoMetadata) (utils.MongoClient, error) { + return &MockMongoSequencerClient{}, nil +} + +func (f *MockMongoSequencerFactory) NewMongoCollection(m *mongo.Database, collectionName string, opts *options.CollectionOptions) utils.MongoCollection { + return &MockMongoSequencerCollection{} +} + +func (mc *MockMongoSequencerCollection) FindOne(ctx context.Context, filter interface{}, opts ...*options.FindOneOptions) *mongo.SingleResult { + result := new(mongo.SingleResult) + value := reflect.ValueOf(result) + doc := filter.(bson.M) + id = doc["_id"].(string) + if value.Kind() == reflect.Ptr { + elem := value.Elem() + err := elem.FieldByName("err") + *(*error)(unsafe.Pointer(err.Addr().Pointer())) = nil + + cur := elem.FieldByName("cur") + *(**mongo.Cursor)(unsafe.Pointer(cur.Addr().Pointer())) = &mongo.Cursor{} + + rdr := elem.FieldByName("rdr") + *(*bson.Raw)(unsafe.Pointer(rdr.Addr().Pointer())) = bson.Raw{} + + reg := elem.FieldByName("reg") + *(**bsoncodec.Registry)(unsafe.Pointer(reg.Addr().Pointer())) = &bsoncodec.Registry{} + } + return result +} + +func (mc *MockMongoSequencerCollection) InsertOne(ctx context.Context, document interface{}, opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error) { + doc := document.(bson.M) + value := doc["_id"].(string) + if _, ok := Result[value]; ok { + return nil, nil + } else { + // insert cache + Result[value] = doc + mc.InsertOneResult.InsertedID = value + return mc.InsertOneResult, nil + } +} + +func (mc *MockMongoSequencerCollection) DeleteOne(ctx context.Context, filter interface{}, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) { + return nil, nil +} + +func (mc *MockMongoSequencerCollection) Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error) { + cursor := &mongo.Cursor{} + return cursor, nil +} + +func (mc *MockMongoSequencerCollection) Indexes() mongo.IndexView { + return mongo.IndexView{} +} + +func (mc *MockMongoSequencerCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) { + doc := filter.(bson.M) + value := doc["_id"].(string) + if res, ok := Result[value]; ok { + Result[value] = bson.M{"_id": res["_id"], "sequencer_value": res["sequencer_value"].(int) + 1} + return nil, nil + } + return nil, nil +} + +func (mc *MockMongoSequencerCollection) FindOneAndUpdate(ctx context.Context, filter interface{}, + update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult { + doc := filter.(bson.M) + id = doc["_id"].(string) + upDoc := update.(bson.M) + value := doc["_id"].(string) + up := upDoc["$inc"].(bson.M) + num := up["sequencer_value"].(int) + if res, ok := Result[value]; ok { + Result[value] = bson.M{"_id": res["_id"], "sequencer_value": res["sequencer_value"].(int) + num} + return nil + } else { + bm := bson.M{"_id": doc["_id"], "sequencer_value": num} + mc.InsertOne(ctx, bm) + } + return nil +} + +func (c *MockMongoSequencerClient) StartSession(opts ...*options.SessionOptions) (mongo.Session, error) { + return &MockMongoSequencerSession{}, nil +} + +func (c *MockMongoSequencerClient) Ping(ctx context.Context, rp *readpref.ReadPref) error { + return nil +} + +func (c *MockMongoSequencerClient) Database(name string, opts ...*options.DatabaseOptions) *mongo.Database { + return nil +} + +func (c *MockMongoSequencerClient) Disconnect(ctx context.Context) error { + return nil +} + +func (s *MockMongoSequencerSession) AbortTransaction(context.Context) error { + return nil +} + +func (s *MockMongoSequencerSession) CommitTransaction(context.Context) error { + return nil +} + +func (s *MockMongoSequencerSession) WithTransaction(ctx context.Context, fn func(sessCtx mongo.SessionContext) (interface{}, error), + opts ...*options.TransactionOptions) (interface{}, error) { + res, err := fn(s) + return res, err +} + +func (s *MockMongoSequencerSession) EndSession(context.Context) {} + +func (d *MockMongoSequencerSingleResult) Decode(v interface{}) error { + b := Result[id] + id := b["_id"].(string) + value := b["sequencer_value"].(int) + ref := reflect.ValueOf(v) + if ref.Kind() == reflect.Ptr { + elem := ref.Elem() + Id := elem.FieldByName("Id") + *(*string)(unsafe.Pointer(Id.Addr().Pointer())) = id + + v := elem.FieldByName("Sequencer_value") + *(*int)(unsafe.Pointer(v.Addr().Pointer())) = value + } + return nil +} + +func (d *MockMongoSequencerSingleResult) Err() error { + if Result[id] == nil { + return mongo.ErrNoDocuments + } + return nil +} + +func (d *MockMongoSequencerSingleResult) DecodeBytes() (bson.Raw, error) { + return nil, nil +} diff --git a/components/pkg/utils/mongo.go b/components/pkg/utils/mongo.go index d51cfd0ffd..dd7e360562 100644 --- a/components/pkg/utils/mongo.go +++ b/components/pkg/utils/mongo.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readconcern" @@ -75,6 +76,7 @@ type Item struct { type MongoFactory interface { NewMongoClient(m MongoMetadata) (MongoClient, error) NewMongoCollection(m *mongo.Database, collectionName string, opts *options.CollectionOptions) MongoCollection + NewSingleResult(sr *mongo.SingleResult) MongoSingleResult } type MongoClient interface { @@ -98,10 +100,24 @@ type MongoCollection interface { DeleteOne(ctx context.Context, filter interface{}, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error) Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error) Indexes() mongo.IndexView + UpdateOne(ctx context.Context, filter interface{}, update interface{}, + opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) + FindOneAndUpdate(ctx context.Context, filter interface{}, + update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult +} + +type MongoSingleResult interface { + Decode(v interface{}) error + Err() error + DecodeBytes() (bson.Raw, error) } type MongoFactoryImpl struct{} +func (c *MongoFactoryImpl) NewSingleResult(sr *mongo.SingleResult) MongoSingleResult { + return sr +} + func (c *MongoFactoryImpl) NewMongoCollection(m *mongo.Database, collectionName string, opts *options.CollectionOptions) MongoCollection { collection := m.Collection(collectionName, opts) return collection @@ -230,3 +246,29 @@ func GetReadConcrenObject(cn string) (*readconcern.ReadConcern, error) { } return nil, nil } + +func SetConcern(m MongoMetadata) (*options.CollectionOptions, error) { + + wc, err := GetWriteConcernObject(m.WriteConcern) + if err != nil { + return nil, err + } + + rc, err := GetReadConcrenObject(m.ReadConcern) + if err != nil { + return nil, err + } + + // set mongo options of collection + opts := options.Collection().SetWriteConcern(wc).SetReadConcern(rc) + + return opts, err +} + +func SetCollection(c MongoClient, f MongoFactory, m MongoMetadata) (MongoCollection, error) { + opts, err := SetConcern(m) + if err != nil { + return nil, err + } + return f.NewMongoCollection(c.Database(m.DatabaseName), m.CollectionName, opts), err +} diff --git a/components/sequencer/mongo/mongo_sequencer.go b/components/sequencer/mongo/mongo_sequencer.go new file mode 100644 index 0000000000..b90340dc5d --- /dev/null +++ b/components/sequencer/mongo/mongo_sequencer.go @@ -0,0 +1,220 @@ +// +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package mongo + +import ( + "context" + "fmt" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readconcern" + "go.mongodb.org/mongo-driver/mongo/writeconcern" + "mosn.io/layotto/components/pkg/utils" + "mosn.io/layotto/components/sequencer" + "mosn.io/pkg/log" +) + +type MongoSequencer struct { + factory utils.MongoFactory + + client utils.MongoClient + session utils.MongoSession + collection utils.MongoCollection + singResult utils.MongoSingleResult + metadata utils.MongoMetadata + biggerThan map[string]int64 + + logger log.ErrorLogger + + ctx context.Context + cancel context.CancelFunc +} + +type SequencerDocument struct { + Id string `bson:"_id"` + Sequencer_value int64 `bson:"sequencer_value"` +} + +// MongoSequencer returns a new mongo sequencer +func NewMongoSequencer(logger log.ErrorLogger) *MongoSequencer { + m := &MongoSequencer{ + logger: logger, + } + + return m +} + +func (e *MongoSequencer) Init(config sequencer.Configuration) error { + var document SequencerDocument + // 1.parse config + m, err := utils.ParseMongoMetadata(config.Properties) + if err != nil { + return err + } + e.metadata = m + e.biggerThan = config.BiggerThan + + e.factory = &utils.MongoFactoryImpl{} + + // 2. construct client + e.ctx, e.cancel = context.WithCancel(context.Background()) + + if e.client, err = e.factory.NewMongoClient(m); err != nil { + return err + } + + if err := e.client.Ping(e.ctx, nil); err != nil { + return err + } + + // Connections Collection + e.collection, err = utils.SetCollection(e.client, e.factory, e.metadata) + if err != nil { + return err + } + + if len(e.biggerThan) > 0 { + for k, bt := range e.biggerThan { + if bt <= 0 { + continue + } + // find key of biggerThan + cursor, err := e.collection.Find(e.ctx, bson.M{"_id": k}) + if err != nil { + return err + } + if cursor != nil && cursor.RemainingBatchLength() > 0 { + cursor.Decode(&document) + } + // check biggerThan's value + if document.Sequencer_value < bt { + return fmt.Errorf("mongo sequencer error: can not satisfy biggerThan guarantee.key: %s,current id:%v", k, document.Sequencer_value) + } + } + } + + return err +} + +func (e *MongoSequencer) GetNextId(req *sequencer.GetNextIdRequest) (*sequencer.GetNextIdResponse, error) { + var err error + var document SequencerDocument + // create mongo session + e.session, err = e.client.StartSession() + txnOpts := options.Transaction().SetReadConcern(readconcern.Snapshot()). + SetWriteConcern(writeconcern.New(writeconcern.WMajority())) + + // check session + if err != nil { + return nil, fmt.Errorf("[mongoSequencer]: Create Session return error: %s key: %s", err, req.Key) + } + + // close mongo session + defer e.session.EndSession(e.ctx) + + status, err := e.session.WithTransaction(e.ctx, func(sessionContext mongo.SessionContext) (interface{}, error) { + var err error + after := options.After + upsert := true + opt := options.FindOneAndUpdateOptions{ + ReturnDocument: &after, + Upsert: &upsert, + } + + e.singResult = e.factory.NewSingleResult(e.collection.FindOneAndUpdate(e.ctx, bson.M{"_id": req.Key}, bson.M{"$inc": bson.M{"sequencer_value": 1}}, &opt)) + + // rollback + if e.singResult.Err() != nil { + _ = sessionContext.AbortTransaction(sessionContext) + return nil, err + } + + // commit + if err = sessionContext.CommitTransaction(sessionContext); err != nil { + return nil, err + } + e.singResult.Decode(&document) + return document, nil + }, txnOpts) + if err != nil || status == nil { + return nil, err + } + + return &sequencer.GetNextIdResponse{ + NextId: document.Sequencer_value, + }, nil +} + +func (e *MongoSequencer) GetSegment(req *sequencer.GetSegmentRequest) (support bool, result *sequencer.GetSegmentResponse, err error) { + var document SequencerDocument + + // size=0 only check support + if req.Size == 0 { + return true, nil, nil + } + + // create mongo session + e.session, err = e.client.StartSession() + txnOpts := options.Transaction().SetReadConcern(readconcern.Snapshot()). + SetWriteConcern(writeconcern.New(writeconcern.WMajority())) + + // check session + if err != nil { + return true, nil, fmt.Errorf("[mongoSequencer]: Create Session return error: %s key: %s", err, req.Key) + } + + // close mongo session + defer e.session.EndSession(e.ctx) + + status, err := e.session.WithTransaction(e.ctx, func(sessionContext mongo.SessionContext) (interface{}, error) { + var err error + after := options.After + upsert := true + opt := options.FindOneAndUpdateOptions{ + ReturnDocument: &after, + Upsert: &upsert, + } + + // find and upsert + e.singResult = e.factory.NewSingleResult(e.collection.FindOneAndUpdate(e.ctx, bson.M{"_id": req.Key}, bson.M{"$inc": bson.M{"sequencer_value": req.Size}}, &opt)) + + // rollback + if e.singResult.Err() != nil { + _ = sessionContext.AbortTransaction(sessionContext) + return nil, err + } + + // commit + if err = sessionContext.CommitTransaction(sessionContext); err != nil { + return nil, err + } + e.singResult.Decode(&document) + return document, nil + }, txnOpts) + if err != nil || status == nil { + return true, nil, err + } + + return true, &sequencer.GetSegmentResponse{ + From: document.Sequencer_value - int64(req.Size) + 1, + To: document.Sequencer_value, + }, nil +} + +func (e *MongoSequencer) Close() error { + e.cancel() + + return e.client.Disconnect(e.ctx) +} diff --git a/components/sequencer/mongo/mongo_sequencer_test.go b/components/sequencer/mongo/mongo_sequencer_test.go new file mode 100644 index 0000000000..3fe078047c --- /dev/null +++ b/components/sequencer/mongo/mongo_sequencer_test.go @@ -0,0 +1,156 @@ +// +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package mongo + +import ( + "github.com/stretchr/testify/assert" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "mosn.io/layotto/components/pkg/mock" + "mosn.io/layotto/components/sequencer" + "mosn.io/pkg/log" + "testing" +) + +const key = "resource_xxx" + +func TestMongoSequencer_Init(t *testing.T) { + var mongoUrl = "localhost:xxxxx" + comp := NewMongoSequencer(log.DefaultLogger) + + cfg := sequencer.Configuration{ + BiggerThan: nil, + Properties: make(map[string]string), + } + + cfg.Properties["mongoHost"] = mongoUrl + err := comp.Init(cfg) + + assert.Error(t, err) +} + +func TestMongoSequencer_GetNextId(t *testing.T) { + var mongoUrl = "localhost:27017" + + comp := NewMongoSequencer(log.DefaultLogger) + + cfg := sequencer.Configuration{ + BiggerThan: nil, + Properties: make(map[string]string), + } + + cfg.Properties["mongoHost"] = mongoUrl + _ = comp.Init(cfg) + + // mock + mockMongoClient := mock.MockMongoSequencerClient{} + mockMongoSession := mock.NewMockMongoSequencerSession() + mockMongoFactory := mock.NewMockMongoSequencerFactory() + insertOneResult := &mongo.InsertOneResult{} + mockMongoCollection := mock.MockMongoSequencerCollection{ + InsertOneResult: insertOneResult, + } + + comp.session = mockMongoSession + comp.collection = &mockMongoCollection + comp.client = &mockMongoClient + comp.factory = mockMongoFactory + + v1, err1 := comp.GetNextId(&sequencer.GetNextIdRequest{ + Key: key, + }) + var expected1 int64 = 1 + assert.Equal(t, expected1, v1.NextId) + assert.NoError(t, err1) + + v2, err2 := comp.GetNextId(&sequencer.GetNextIdRequest{ + Key: key, + }) + delete(mock.Result, key) + var expected2 int64 = 2 + assert.Equal(t, expected2, v2.NextId) + assert.NoError(t, err2) +} + +func TestMongoSequencer_Close(t *testing.T) { + var mongoUrl = "localhost:xxxxx" + + comp := NewMongoSequencer(log.DefaultLogger) + + cfg := sequencer.Configuration{ + BiggerThan: nil, + Properties: make(map[string]string), + } + + cfg.Properties["mongoHost"] = mongoUrl + _ = comp.Init(cfg) + + // mock + result := make(map[string]bson.M) + mockMongoClient := mock.MockMongoSequencerClient{} + mockMongoSession := mock.NewMockMongoSequencerSession() + mockMongoFactory := mock.NewMockMongoSequencerFactory() + insertOneResult := &mongo.InsertOneResult{} + mockMongoCollection := mock.MockMongoSequencerCollection{ + InsertOneResult: insertOneResult, + Result: result, + } + + comp.session = mockMongoSession + comp.collection = &mockMongoCollection + comp.client = &mockMongoClient + comp.factory = mockMongoFactory + + err := comp.Close() + assert.NoError(t, err) +} + +func TestMongoSequencer_GetSegment(t *testing.T) { + var mongoUrl = "localhost:xxxxx" + + comp := NewMongoSequencer(log.DefaultLogger) + + cfg := sequencer.Configuration{ + BiggerThan: nil, + Properties: make(map[string]string), + } + + cfg.Properties["mongoHost"] = mongoUrl + _ = comp.Init(cfg) + + // mock + mockMongoClient := mock.MockMongoSequencerClient{} + mockMongoSession := mock.NewMockMongoSequencerSession() + mockMongoFactory := mock.NewMockMongoSequencerFactory() + insertOneResult := &mongo.InsertOneResult{} + mockMongoCollection := mock.MockMongoSequencerCollection{ + InsertOneResult: insertOneResult, + } + + comp.session = mockMongoSession + comp.collection = &mockMongoCollection + comp.client = &mockMongoClient + comp.factory = mockMongoFactory + + support, res, err := comp.GetSegment(&sequencer.GetSegmentRequest{ + Size: 20, + Key: key, + }) + var expected1 int64 = 1 + var expected2 int64 = 20 + assert.Equal(t, support, true) + assert.Equal(t, expected1, res.From) + assert.Equal(t, expected2, res.To) + assert.NoError(t, err) +} diff --git a/configs/config_lock_mongo.json b/configs/config_lock_mongo.json index 6fbb329b46..eadf0d992a 100644 --- a/configs/config_lock_mongo.json +++ b/configs/config_lock_mongo.json @@ -29,9 +29,11 @@ "lock": { "mongo": { "metadata": { - "host": "localhost:27017", + "mongoHost": "localhost:27017", + "mongoPassword": "", "username": "", - "password": "", + "databaseName ": "", + "collecttionName": "", "params": "" } } @@ -74,4 +76,4 @@ ] } ] -} \ No newline at end of file +} diff --git a/configs/config_sequencer_mongo.json b/configs/config_sequencer_mongo.json new file mode 100644 index 0000000000..9e20e4e1bc --- /dev/null +++ b/configs/config_sequencer_mongo.json @@ -0,0 +1,79 @@ +{ + "servers": [ + { + "default_log_path": "stdout", + "default_log_level": "DEBUG", + "routers": [ + { + "router_config_name": "actuator_dont_need_router" + } + ], + "listeners": [ + { + "name": "grpc", + "address": "127.0.0.1:34904", + "bind_port": true, + "filter_chains": [ + { + "filters": [ + { + "type": "grpc", + "config": { + "server_name": "runtime", + "grpc_config": { + "hellos": { + "helloworld": { + "hello": "greeting" + } + }, + "sequencer": { + "mongo": { + "metadata": { + "mongoHost": "localhost:27017", + "mongoPassword": "", + "username": "", + "databaseName ": "", + "collecttionName": "", + "params": "" + } + } + }, + "app": { + "app_id": "app1", + "grpc_callback_port": 9999 + } + } + } + } + ] + } + ] + }, + { + "name": "actuator", + "address": "127.0.0.1:34999", + "bind_port": true, + "filter_chains": [ + { + "filters": [ + { + "type": "proxy", + "config": { + "downstream_protocol": "Http1", + "upstream_protocol": "Http1", + "router_config_name": "actuator_dont_need_router" + } + } + ] + } + ], + "stream_filters": [ + { + "type": "actuator_filter" + } + ] + } + ] + } + ] +} diff --git a/demo/sequencer/mongo/client.go b/demo/sequencer/mongo/client.go new file mode 100644 index 0000000000..4a6fecd530 --- /dev/null +++ b/demo/sequencer/mongo/client.go @@ -0,0 +1,38 @@ +package main + +import ( + "context" + "fmt" + client "mosn.io/layotto/sdk/go-sdk/client" + runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" +) + +const ( + key = "key666" + storeName = "mongo" +) + +func main() { + + cli, err := client.NewClient() + if err != nil { + panic(err) + } + defer cli.Close() + ctx := context.Background() + fmt.Printf("Try to get next id.Key:%s \n", key) + for i := 0; i < 10; i++ { + id, err := cli.GetNextId(ctx, &runtimev1pb.GetNextIdRequest{ + StoreName: storeName, + Key: key, + Options: nil, + Metadata: nil, + }) + if err != nil { + fmt.Print(err) + return + } + fmt.Printf("Next id:%v \n", id) + } + fmt.Println("Demo success!") +} diff --git a/docs/_sidebar.md b/docs/_sidebar.md index d81e29dd35..19d92a8c85 100644 --- a/docs/_sidebar.md +++ b/docs/_sidebar.md @@ -60,6 +60,7 @@ - [Etcd](en/component_specs/sequencer/etcd.md) - [Redis](en/component_specs/sequencer/redis.md) - [Zookeeper](en/component_specs/sequencer/zookeeper.md) + - [MongoDB](en/component_specs/sequencer/mongo.md) - Design documents - [Actuator design doc](en/design/actuator/actuator-design-doc.md) - [Configuration API with Apollo](en/design/configuration/configuration-api-with-apollo.md) diff --git a/docs/en/component_specs/lock/mongo.md b/docs/en/component_specs/lock/mongo.md index ce4f246822..15ae848dd8 100644 --- a/docs/en/component_specs/lock/mongo.md +++ b/docs/en/component_specs/lock/mongo.md @@ -6,9 +6,11 @@ Example:configs/config_lock_mongo.json | Field | Required | Description | | --- | --- | --- | -| host | Y | MongoDB server address, such as localhost:27017 | -| username | N |specify username username | -| password | N | specify password | +| mongoHost | Y | MongoDB server address, such as localhost:27017 | +| username | N | specify username | +| mongoPassword | N | specify password | +| databaseName | N | MongoDB database name | +| collecttionName | N | MongoDB collection name | | params | N | custom params | @@ -18,4 +20,4 @@ If you want to run the mongoDB demo, you need to start a mongoDB server with Doc ```shell docker run --name mongoDB -d -p 27017:27017 mongo -``` \ No newline at end of file +``` diff --git a/docs/en/component_specs/sequencer/mongo.md b/docs/en/component_specs/sequencer/mongo.md new file mode 100644 index 0000000000..68a433ba4c --- /dev/null +++ b/docs/en/component_specs/sequencer/mongo.md @@ -0,0 +1,23 @@ +# MongoDB + +## metadata fields + +Example:configs/config_sequencer_mongo.json + +| Field | Required | Description | +| --- | --- | --- | +| mongoHost | Y | MongoDB server address, such as localhost:27017 | +| username | N | specify username | +| mongoPassword | N | specify password | +| databaseName | N | MongoDB database name | +| collecttionName | N | MongoDB collection name | +| params | N | custom params | + + +## How to start MongoDB + +If you want to run the mongoDB demo, you need to start a mongoDB server with Docker first. + +```shell +docker run --name mongoDB -d -p 27017:27017 mongo +``` diff --git a/docs/zh/_sidebar.md b/docs/zh/_sidebar.md index cf9acfc208..2100ff536f 100644 --- a/docs/zh/_sidebar.md +++ b/docs/zh/_sidebar.md @@ -53,6 +53,7 @@ - [Etcd](zh/component_specs/lock/etcd.md) - [Zookeeper](zh/component_specs/lock/zookeeper.md) - [Consul](zh/component_specs/lock/consul.md) + - [MongoDB](zh/component_specs/lock/mongo.md) - Configuration - [Etcd](zh/component_specs/configuration/etcd.md) - [Apollo](zh/component_specs/configuration/apollo.md) @@ -62,6 +63,7 @@ - [Etcd](zh/component_specs/sequencer/etcd.md) - [Redis](zh/component_specs/sequencer/redis.md) - [Zookeeper](zh/component_specs/sequencer/zookeeper.md) + - [MongoDB](zh/component_specs/sequencer/mongo.md) - 设计文档 - [Actuator设计文档](zh/design/actuator/actuator-design-doc.md) - [gRPC框架设计文档](zh/design/actuator/grpc-design-doc.md) diff --git a/docs/zh/component_specs/lock/mongo.md b/docs/zh/component_specs/lock/mongo.md index ec051ef4e2..930f673a90 100644 --- a/docs/zh/component_specs/lock/mongo.md +++ b/docs/zh/component_specs/lock/mongo.md @@ -6,10 +6,11 @@ | 字段 | 必填 | 说明 | | --- | --- | --- | -| host | Y | MongoDB的服务地址,例如localhost:27017 | +| mongoHost | Y | MongoDB的服务地址,例如localhost:27017 | | username | N | MongoDB用户名 | -| password | N | MongoDB密码 | -| database | N | MongoDB数据库 | +| mongoPassword | N | MongoDB密码 | +| databaseName | N | MongoDB数据库名称 | +| collecttionName | N | MongoDB集合名称 | | params | N | 自定义参数 | diff --git a/docs/zh/component_specs/sequencer/mongo.md b/docs/zh/component_specs/sequencer/mongo.md new file mode 100644 index 0000000000..1310ed743d --- /dev/null +++ b/docs/zh/component_specs/sequencer/mongo.md @@ -0,0 +1,21 @@ +# MongoDB + +## 配置项说明 +示例:configs/config_sequencer_mongo.json + +| 字段 | 必填 | 说明 | +| --- | --- | --- | +| mongoHost | Y | MongoDB的服务地址,例如localhost:27017 | +| username | N | MongoDB用户名 | +| mongoPassword | N | MongoDB密码 | +| databaseName | N | MongoDB数据库名称 | +| collecttionName | N | MongoDB集合名称 | +| params | N | 自定义参数 | + +## 怎么启动 MongoDB + +如果想启动MongoDB的demo,需要先用Docker启动一个MongoDB 命令: + +```shell +docker run --name mongoDB -d -p 27017:27017 mongo +```