Skip to content

Commit

Permalink
feat: implement sequencer api with mysql (#605)
Browse files Browse the repository at this point in the history
Co-authored-by: seeflood <zhou.qunli@foxmail.com>
Co-authored-by: wenxuwan <wangwx_junction@163.com>
Co-authored-by: Xunzhuo <mixdeers@gmail.com>
  • Loading branch information
4 people authored Jul 18, 2022
1 parent 917485d commit 2604d62
Show file tree
Hide file tree
Showing 11 changed files with 639 additions and 1 deletion.
4 changes: 4 additions & 0 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ import (
sequencer_etcd "mosn.io/layotto/components/sequencer/etcd"
sequencer_inmemory "mosn.io/layotto/components/sequencer/in-memory"
sequencer_mongo "mosn.io/layotto/components/sequencer/mongo"
sequencer_mysql "mosn.io/layotto/components/sequencer/mysql"
sequencer_redis "mosn.io/layotto/components/sequencer/redis"
sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper"

Expand Down Expand Up @@ -425,6 +426,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime_sequencer.NewFactory("in-memory", func() sequencer.Store {
return sequencer_inmemory.NewInMemorySequencer()
}),
runtime_sequencer.NewFactory("mysql", func() sequencer.Store {
return sequencer_mysql.NewMySQLSequencer(log.DefaultLogger)
}),
),
// secretstores
runtime.WithSecretStoresFactory(
Expand Down
4 changes: 4 additions & 0 deletions cmd/layotto_multiple_api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ import (
sequencer_etcd "mosn.io/layotto/components/sequencer/etcd"
sequencer_inmemory "mosn.io/layotto/components/sequencer/in-memory"
sequencer_mongo "mosn.io/layotto/components/sequencer/mongo"
sequencer_mysql "mosn.io/layotto/components/sequencer/mysql"
sequencer_redis "mosn.io/layotto/components/sequencer/redis"
sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper"

Expand Down Expand Up @@ -436,6 +437,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime_sequencer.NewFactory("in-memory", func() sequencer.Store {
return sequencer_inmemory.NewInMemorySequencer()
}),
runtime_sequencer.NewFactory("mysql", func() sequencer.Store {
return sequencer_mysql.NewMySQLSequencer(log.DefaultLogger)
}),
),
// secretstores
runtime.WithSecretStoresFactory(
Expand Down
1 change: 1 addition & 0 deletions components/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module mosn.io/layotto/components
go 1.14

require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/alicebob/miniredis/v2 v2.16.0
github.com/aliyun/aliyun-oss-go-sdk v2.1.8+incompatible
github.com/apache/dubbo-go-hessian2 v1.10.2
Expand Down
2 changes: 2 additions & 0 deletions components/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a/go.mod h1:EXuID2Zs0p
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
Expand Down
81 changes: 81 additions & 0 deletions components/pkg/utils/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//
// Copyright 2021 Layotto Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils

import (
"database/sql"
"fmt"
)

var (
Db *sql.DB
)

const (
defaultTableName = "layotto_sequencer"
defaultTableNameKey = "tableName"
dataBaseName = "dataBaseName"
userName = "userName"
defaultPassword = "password"
mysqlUrl = "mysqlUrl"
)

type MySQLMetadata struct {
TableName string
DataBaseName string
UserName string
Password string
MysqlUrl string
Db *sql.DB
}

func ParseMySQLMetadata(properties map[string]string) (MySQLMetadata, error) {
m := MySQLMetadata{}

if val, ok := properties[defaultTableNameKey]; ok && val != "" {
m.TableName = val
}
if val, ok := properties[dataBaseName]; ok && val != "" {
m.DataBaseName = val
}
if val, ok := properties[userName]; ok && val != "" {
m.UserName = val
}
if val, ok := properties[defaultPassword]; ok && val != "" {
m.Password = val
}
if val, ok := properties[mysqlUrl]; ok && val != "" {
m.MysqlUrl = val
}
return m, nil
}

func NewMySQLClient(meta MySQLMetadata) error {

val := meta
if val.TableName == "" {
val.TableName = defaultTableName
}
meta.Db.Begin()
createTable := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
sequencer_key VARCHAR(255),
sequencer_value INT,
UNIQUE INDEX (sequencer_key));`, val.TableName)
_, err := meta.Db.Exec(createTable)
defer meta.Db.Close()
if err != nil {
return err
}
return nil
}
162 changes: 162 additions & 0 deletions components/sequencer/mysql/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
//
// Copyright 2021 Layotto Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql

import (
"database/sql"
"fmt"

"mosn.io/pkg/log"

"mosn.io/layotto/components/pkg/utils"
"mosn.io/layotto/components/sequencer"
)

type MySQLSequencer struct {
metadata utils.MySQLMetadata
biggerThan map[string]int64
logger log.ErrorLogger
db *sql.DB
}

func NewMySQLSequencer(logger log.ErrorLogger) *MySQLSequencer {
s := &MySQLSequencer{
logger: logger,
}

return s
}

func (e *MySQLSequencer) Init(config sequencer.Configuration) error {

m, err := utils.ParseMySQLMetadata(config.Properties)

if err != nil {
return err
}
e.metadata = m
e.metadata.Db = e.db
e.biggerThan = config.BiggerThan

if err = utils.NewMySQLClient(e.metadata); err != nil {
return err
}

if len(e.biggerThan) > 0 {
var Key string
var Value int64
for k, bt := range e.biggerThan {
if bt > 0 {
m.Db.QueryRow(fmt.Sprintf("SELECT sequencer_key,sequencer_value FROM %s WHERE sequencer_key = ?", m.TableName), k).Scan(&Key, &Value)
if Value < bt {
return fmt.Errorf("mysql sequencer error: can not satisfy biggerThan guarantee.key: %s,key in MySQL: %s", k, Key)
}
}
}
}
return nil
}

func (e *MySQLSequencer) GetNextId(req *sequencer.GetNextIdRequest) (*sequencer.GetNextIdResponse, error) {

metadata, err := utils.ParseMySQLMetadata(req.Metadata)
metadata.Db = e.db
var Key string
var Value int64
var Version int64
var oldVersion int64
if err != nil {
return nil, err
}
begin, err := metadata.Db.Begin()
if err != nil {
return nil, err
}
err = begin.QueryRow("SELECT sequencer_key, sequencer_value, version FROM ? WHERE sequencer_key = ?", metadata.TableName, req.Key).Scan(&Key, &Value, &oldVersion)

if err == sql.ErrNoRows {
Value = 1
Version = 1
_, err := begin.Exec("INSERT INTO ?(sequencer_key, sequencer_value, version) VALUES(?,?,?)", metadata.TableName, req.Key, Value, Version)
if err != nil {
begin.Rollback()
return nil, err
}

} else {
_, err := begin.Exec("UPDATE ? SET sequencer_value +=1, version += 1 WHERE sequencer_key = ? and version = ?", metadata.TableName, req.Key, oldVersion)
if err != nil {
begin.Rollback()
return nil, err
}
}
defer e.Close(metadata.Db)

return &sequencer.GetNextIdResponse{
NextId: Value,
}, nil
}

func (e *MySQLSequencer) GetSegment(req *sequencer.GetSegmentRequest) (support bool, result *sequencer.GetSegmentResponse, err error) {

if req.Size == 0 {
return true, nil, nil
}

metadata, err := utils.ParseMySQLMetadata(req.Metadata)

var Key string
var Value int64
var Version int64
var oldVersion int64
if err != nil {
return false, nil, err
}

metadata.Db = e.db

begin, err := metadata.Db.Begin()
if err != nil {
return false, nil, err
}
err = begin.QueryRow("SELECT sequencer_key, sequencer_value, version FROM ? WHERE sequencer_key == ?", metadata.TableName, req.Key).Scan(&Key, &Value, &oldVersion)
if err == sql.ErrNoRows {
Value = int64(req.Size)
Version = 1
_, err := begin.Exec("INSERT INTO ?(sequencer_key, sequencer_value, version) VALUES(?,?,?)", metadata.TableName, req.Key, Value, Version)
if err != nil {
begin.Rollback()
return false, nil, err
}

} else {
Value += int64(req.Size)
_, err1 := begin.Exec("UPDATE ? SET sequencer_value = ?, version += 1 WHERE sequencer_key = ? AND version = ?", metadata.TableName, Value, req.Key, oldVersion)
if err1 != nil {
begin.Rollback()
return false, nil, err1
}
}
defer e.Close(metadata.Db)

return false, &sequencer.GetSegmentResponse{
From: Value - int64(req.Size) + 1,
To: Value,
}, nil
}

func (e *MySQLSequencer) Close(db *sql.DB) error {

return db.Close()
}
Loading

0 comments on commit 2604d62

Please sign in to comment.