diff --git a/cmd/layotto/main.go b/cmd/layotto/main.go index 14dfa1e27a..7f1d78ba43 100644 --- a/cmd/layotto/main.go +++ b/cmd/layotto/main.go @@ -125,6 +125,7 @@ import ( sequencer_etcd "mosn.io/layotto/components/sequencer/etcd" sequencer_inmemory "mosn.io/layotto/components/sequencer/in-memory" sequencer_mongo "mosn.io/layotto/components/sequencer/mongo" + sequencer_mysql "mosn.io/layotto/components/sequencer/mysql" sequencer_redis "mosn.io/layotto/components/sequencer/redis" sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper" @@ -397,6 +398,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp runtime_sequencer.NewFactory("in-memory", func() sequencer.Store { return sequencer_inmemory.NewInMemorySequencer() }), + runtime_sequencer.NewFactory("Mysql", func() sequencer.Store { + return sequencer_mysql.NewMySQLSequencer(log.DefaultLogger) + }), ), // secretstores runtime.WithSecretStoresFactory( diff --git a/components/go.mod b/components/go.mod index 4d52b353d1..12e7e70aff 100644 --- a/components/go.mod +++ b/components/go.mod @@ -3,6 +3,7 @@ module mosn.io/layotto/components go 1.14 require ( + github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/alicebob/miniredis/v2 v2.16.0 github.com/aliyun/aliyun-oss-go-sdk v2.1.8+incompatible github.com/apache/dubbo-go-hessian2 v1.7.0 @@ -12,6 +13,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.16.0 github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect github.com/go-redis/redis/v8 v8.8.0 + github.com/go-sql-driver/mysql v1.5.0 github.com/go-zookeeper/zk v1.0.2 github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 diff --git a/components/go.sum b/components/go.sum index cb0e47f59b..aa0f91dec7 100644 --- a/components/go.sum +++ b/components/go.sum @@ -38,6 +38,8 @@ gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a/go.mod h1:EXuID2Zs0p github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= +github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= @@ -263,6 +265,7 @@ github.com/go-redis/redis/v8 v8.8.0/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqW github.com/go-resty/resty/v2 v2.6.0/go.mod h1:PwvJS6hvaPkjtjNg9ph+VrSD92bi5Zq73w/BIH7cC3Q= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= diff --git a/components/pkg/utils/mysql.go b/components/pkg/utils/mysql.go new file mode 100644 index 0000000000..97b46b00e5 --- /dev/null +++ b/components/pkg/utils/mysql.go @@ -0,0 +1,91 @@ +// +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package utils + +import ( + "database/sql" + "fmt" + _ "github.com/go-sql-driver/mysql" +) + +var ( + Db *sql.DB +) + +const ( + defaultTableName = "tableName" + tableNameKey = "tableName" + connectionStringKey = "connectionString" + dataBaseName = "dataBaseName" +) + +type MySQLMetadata struct { + TableName string + ConnectionString string + DataBaseName string + Db *sql.DB +} + +func ParseMySQLMetadata(properties map[string]string) (MySQLMetadata, error) { + m := MySQLMetadata{} + + if val, ok := properties[tableNameKey]; ok && val != "" { + m.TableName = val + } + + if val, ok := properties[connectionStringKey]; ok && val != "" { + m.ConnectionString = val + } + + if val, ok := properties[dataBaseName]; ok && val != "" { + m.DataBaseName = val + } + return m, nil +} + +func NewMySQLClient(meta MySQLMetadata) error { + + val := meta + + if val.TableName == "" { + val.TableName = defaultTableName + } + + exists, err := tableExists(val) + if err != nil { + return err + } + if !exists { + createTable := fmt.Sprintf(`CREATE TABLE %s ( + sequencer_key VARCHAR(255), + sequencer_value INT);`, val.TableName) + _, err = meta.Db.Exec(createTable) + if err != nil { + return err + } + } + + return nil +} + +func tableExists(meta MySQLMetadata) (bool, error) { + exists := "" + + query := `SELECT EXISTS ( + SELECT * FROM ? WHERE TABLE_NAME = ? + ) AS 'exists'` + err := meta.Db.QueryRow(query, meta.DataBaseName, meta.TableName).Scan(&exists) + + return exists == "1", err +} diff --git a/components/sequencer/mysql/mysql.go b/components/sequencer/mysql/mysql.go new file mode 100644 index 0000000000..bbfc6f4a01 --- /dev/null +++ b/components/sequencer/mysql/mysql.go @@ -0,0 +1,158 @@ +// +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package mysql + +import ( + "database/sql" + "fmt" + "mosn.io/layotto/components/pkg/utils" + "mosn.io/layotto/components/sequencer" + "mosn.io/pkg/log" +) + +type MySQLSequencer struct { + metadata utils.MySQLMetadata + biggerThan map[string]int64 + logger log.ErrorLogger + db *sql.DB +} + +func NewMySQLSequencer(logger log.ErrorLogger) *MySQLSequencer { + s := &MySQLSequencer{ + logger: logger, + } + + return s +} + +func (e *MySQLSequencer) Init(config sequencer.Configuration) error { + + m, err := utils.ParseMySQLMetadata(config.Properties) + + if err != nil { + return err + } + e.metadata = m + e.metadata.Db = e.db + e.biggerThan = config.BiggerThan + + if err = utils.NewMySQLClient(e.metadata); err != nil { + return err + } + + if len(e.biggerThan) > 0 { + var Key string + var Value int64 + for k, bt := range e.biggerThan { + if bt <= 0 { + continue + } else { + m.Db.QueryRow(fmt.Sprintf("SELECT sequencer_key,sequencer_value FROM %s WHERE sequencer_key = ?", m.TableName), k).Scan(&Key, &Value) + if Value < bt { + return fmt.Errorf("MySQL sequencer error: can not satisfy biggerThan guarantee.key: %s,key in MySQL: %s", k, Key) + } + } + } + } + return nil +} + +func (e *MySQLSequencer) GetNextId(req *sequencer.GetNextIdRequest) (*sequencer.GetNextIdResponse, error) { + + metadata, err := utils.ParseMySQLMetadata(req.Metadata) + metadata.Db = e.db + var Key string + var Value int64 + if err != nil { + return nil, err + } + + begin, err := metadata.Db.Begin() + if err != nil { + return nil, err + } + + err = begin.QueryRow("SELECT sequencer_key,sequencer_value FROM ? WHERE sequencer_key = ?", metadata.TableName, req.Key).Scan(&Key, &Value) + + if err == sql.ErrNoRows { + Value = 1 + _, err := begin.Exec("INSERT INTO ?(sequencer_key, sequencer_value) VALUES(?,?)", metadata.TableName, req.Key, Value) + if err != nil { + return nil, err + } + } else { + Value += 1 + _, err := begin.Exec("UPDATE ? SET sequencer_value += 1 WHERE sequencer_key = ?", metadata.TableName, req.Key) + if err != nil { + return nil, err + } + } + + defer e.Close(metadata.Db) + + return &sequencer.GetNextIdResponse{ + NextId: Value, + }, nil +} + +func (e *MySQLSequencer) GetSegment(req *sequencer.GetSegmentRequest) (support bool, result *sequencer.GetSegmentResponse, err error) { + + if req.Size == 0 { + return true, nil, nil + } + + metadata, err := utils.ParseMySQLMetadata(req.Metadata) + + var Key string + var Value int64 + if err != nil { + return false, nil, err + } + + metadata.Db = e.db + + begin, err := metadata.Db.Begin() + if err != nil { + return false, nil, err + } + + err = begin.QueryRow("SELECT sequencer_key,sequencer_value FROM ? WHERE sequencer_key = ?", metadata.TableName, req.Key).Scan(&Key, &Value) + + if err == sql.ErrNoRows { + Value = int64(req.Size) + _, err := begin.Exec("INSERT INTO ?(sequencer_key, sequencer_value) VALUES(?,?)", metadata.TableName, req.Key, Value) + if err != nil { + return false, nil, err + } + + } else { + Value += int64(req.Size) + _, err1 := begin.Exec("UPDATE ? SET sequencer_value = ? WHERE sequencer_key = ?", metadata.TableName, Value, req.Key) + if err1 != nil { + return false, nil, err1 + } + } + + defer e.Close(metadata.Db) + + return false, &sequencer.GetSegmentResponse{ + From: Value - int64(req.Size) + 1, + To: Value, + }, nil +} + +func (e *MySQLSequencer) Close(db *sql.DB) error { + + return db.Close() +} diff --git a/components/sequencer/mysql/mysql_test.go b/components/sequencer/mysql/mysql_test.go new file mode 100644 index 0000000000..7365753441 --- /dev/null +++ b/components/sequencer/mysql/mysql_test.go @@ -0,0 +1,151 @@ +// +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package mysql + +import ( + "database/sql/driver" + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + "mosn.io/layotto/components/sequencer" + "mosn.io/pkg/log" + "testing" +) + +const ( + MySQLUrl = "localhost:xxxxx" + Value = 1 + Key = "sequenceKey" + Size = 50 +) + +func TestMySQLSequencer_Init(t *testing.T) { + + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + + comp := NewMySQLSequencer(log.DefaultLogger) + comp.db = db + + cfg := sequencer.Configuration{ + Properties: make(map[string]string), + BiggerThan: make(map[string]int64), + } + + rows := sqlmock.NewRows([]string{"exists"}).AddRow(0) + mock.ExpectQuery("SELECT EXISTS").WillReturnRows(rows) + mock.ExpectExec("CREATE TABLE").WillReturnResult(sqlmock.NewResult(1, 1)) + + cfg.Properties["tableNameKey"] = "tableName" + cfg.Properties["connectionString"] = "connectionString" + cfg.Properties["dataBaseName"] = "dataBaseName" + err = comp.Init(cfg) + + assert.Nil(t, err) +} + +func TestMySQLSequencer_GetNextId(t *testing.T) { + + comp := NewMySQLSequencer(log.DefaultLogger) + + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + + rows := sqlmock.NewRows([]string{"sequencer_key", "sequencer_value"}).AddRow([]driver.Value{Key, Value}...) + + mock.ExpectBegin() + mock.ExpectQuery("SELECT").WillReturnRows(rows) + mock.ExpectExec("UPDATE").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("INSERT INTO").WithArgs("tableName", "sequenceKey", 2).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + properties := make(map[string]string) + + properties["tableName"] = "tableName" + properties["connectionString"] = "connectionString" + properties["dataBaseName"] = "dataBaseName" + + req := &sequencer.GetNextIdRequest{Key: Key, Options: sequencer.SequencerOptions{AutoIncrement: sequencer.STRONG}, Metadata: properties} + + comp.db = db + _, err = comp.GetNextId(req) + if err != nil { + t.Errorf("error was not expected while updating stats: %s", err) + } + + assert.NoError(t, err) +} + +func TestMySQLSequencer_GetSegment(t *testing.T) { + + comp := NewMySQLSequencer(log.DefaultLogger) + + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + + rows := sqlmock.NewRows([]string{"sequencer_key", "sequencer_value"}).AddRow([]driver.Value{Key, Value}...) + + mock.ExpectBegin() + mock.ExpectQuery("SELECT").WillReturnRows(rows) + mock.ExpectExec("UPDATE").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("INSERT INTO").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + properties := make(map[string]string) + + properties["tableName"] = "tableName" + properties["connectionString"] = "connectionString" + properties["dataBaseName"] = "dataBaseName" + + req := &sequencer.GetSegmentRequest{Size: Size, Key: Key, Options: sequencer.SequencerOptions{AutoIncrement: sequencer.STRONG}, Metadata: properties} + + comp.db = db + _, _, err = comp.GetSegment(req) + if err != nil { + t.Errorf("error was not expected while updating stats: %s", err) + } + + assert.NoError(t, err) +} + +func TestMySQLSequencer_Close(t *testing.T) { + + var MySQLUrl = MySQLUrl + + db, _, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + + comp := NewMySQLSequencer(log.DefaultLogger) + + cfg := sequencer.Configuration{ + BiggerThan: nil, + Properties: make(map[string]string), + } + + cfg.Properties["MySQLHost"] = MySQLUrl + + comp.db = db + _ = comp.Init(cfg) + + comp.Close(db) +}