Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement sequencer api with mysql #605

Merged
merged 37 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
be03853
feature: add sequencer api component by mysql
GimmeCyy May 25, 2022
3c81d7f
Merge branch 'mosn:main' into feature/mysql
GimmeCyy May 26, 2022
77b27ff
feature: add sequencer api component by mysql
GimmeCyy May 26, 2022
57535a8
Merge branch 'main' into feature/mysql
seeflood May 27, 2022
b4b3cd7
Merge branch 'mosn:main' into feature/mysql
GimmeCyy May 31, 2022
96ecaa5
feature: implement sequencer api with mysql
GimmeCyy Jun 1, 2022
20c100b
feature: implement sequencer api with mysql
GimmeCyy Jun 1, 2022
03df7be
feature: implement sequencer api with mysql
GimmeCyy Jun 1, 2022
639da05
feature: implement sequencer api with mysql
GimmeCyy Jun 1, 2022
0535b6d
feature: implement sequencer api with mysql
GimmeCyy Jun 1, 2022
2c31bed
Merge branch 'main' into feature/mysql
seeflood Jun 3, 2022
afd181d
feature: implement sequencer api with mysql
GimmeCyy Jun 3, 2022
2df82bb
feature: implement sequencer api with mysql
GimmeCyy Jun 6, 2022
f9bcb32
feature: implement sequencer api with mysql
GimmeCyy Jun 6, 2022
62e26eb
Merge branch 'main' into feature/mysql
GimmeCyy Jun 7, 2022
796670a
Merge branch 'main' into feature/mysql
seeflood Jun 7, 2022
88f4403
Merge branch 'main' into feature/mysql
seeflood Jun 8, 2022
9636a79
feature: implement sequencer api with mysql
GimmeCyy Jun 8, 2022
eb24fc2
Merge remote-tracking branch 'origin/feature/mysql' into feature/mysql
GimmeCyy Jun 8, 2022
edfc806
Merge branch 'main' into feature/mysql
GimmeCyy Jun 9, 2022
b43389e
Merge branch 'main' into feature/mysql
seeflood Jun 11, 2022
7ab7d6d
feature: implement sequencer api with mysql
GimmeCyy Jun 12, 2022
fde577b
feature: implement sequencer api with mysql
GimmeCyy Jun 12, 2022
067506b
feature: implement sequencer api with mysql
GimmeCyy Jun 13, 2022
1face1c
feature: implement sequencer api with mysql
GimmeCyy Jun 13, 2022
b9a1d20
Merge branch 'main' into feature/mysql
GimmeCyy Jun 13, 2022
6e7b80c
Merge branch 'main' into feature/mysql
seeflood Jun 14, 2022
7a120e4
Merge branch 'main' into feature/mysql
wenxuwan Jun 19, 2022
161ad95
Merge branch 'main' into feature/mysql
seeflood Jun 22, 2022
f1994c4
Merge branch 'main' into feature/mysql
seeflood Jun 22, 2022
534f006
Merge branch 'main' into feature/mysql
seeflood Jun 29, 2022
2c3fff6
Merge branch 'main' into feature/mysql
seeflood Jul 1, 2022
667c39c
feature: implement sequencer api with mysql
GimmeCyy Jul 4, 2022
64fd667
Merge branch 'main' into feature/mysql
Xunzhuo Jul 4, 2022
84e6958
Merge branch 'main' into feature/mysql
Xunzhuo Jul 4, 2022
3cb9006
Merge branch 'main' into feature/mysql
seeflood Jul 7, 2022
070be20
Merge remote-tracking branch 'upstream/main' into feature/mysql
seeflood Jul 18, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ import (
sequencer_etcd "mosn.io/layotto/components/sequencer/etcd"
sequencer_inmemory "mosn.io/layotto/components/sequencer/in-memory"
sequencer_mongo "mosn.io/layotto/components/sequencer/mongo"
sequencer_mysql "mosn.io/layotto/components/sequencer/mysql"
sequencer_redis "mosn.io/layotto/components/sequencer/redis"
sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper"

Expand Down Expand Up @@ -397,6 +398,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime_sequencer.NewFactory("in-memory", func() sequencer.Store {
return sequencer_inmemory.NewInMemorySequencer()
}),
runtime_sequencer.NewFactory("mysql", func() sequencer.Store {
seeflood marked this conversation as resolved.
Show resolved Hide resolved
return sequencer_mysql.NewMySQLSequencer(log.DefaultLogger)
}),
),
// secretstores
runtime.WithSecretStoresFactory(
Expand Down
1 change: 1 addition & 0 deletions components/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module mosn.io/layotto/components
go 1.14

require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/alicebob/miniredis/v2 v2.16.0
github.com/aliyun/aliyun-oss-go-sdk v2.1.8+incompatible
github.com/apache/dubbo-go-hessian2 v1.7.0
Expand Down
2 changes: 2 additions & 0 deletions components/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a/go.mod h1:EXuID2Zs0p
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
Expand Down
90 changes: 90 additions & 0 deletions components/pkg/utils/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//
// Copyright 2021 Layotto Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils

import (
"database/sql"
"fmt"
)

var (
Db *sql.DB
)

const (
defaultTableName = "layotto_sequencer"
defaultTableNameKey = "tableName"
connectionStringKey = "connectionString"
dataBaseName = "dataBaseName"
)

type MySQLMetadata struct {
TableName string
ConnectionString string
DataBaseName string
Db *sql.DB
}

func ParseMySQLMetadata(properties map[string]string) (MySQLMetadata, error) {
m := MySQLMetadata{}

if val, ok := properties[defaultTableNameKey]; ok && val != "" {
m.TableName = val
}

if val, ok := properties[connectionStringKey]; ok && val != "" {
m.ConnectionString = val
}

if val, ok := properties[dataBaseName]; ok && val != "" {
m.DataBaseName = val
}
return m, nil
}

func NewMySQLClient(meta MySQLMetadata) error {

val := meta

if val.TableName == "" {
val.TableName = defaultTableName
}

exists, err := tableExists(val)
if err != nil {
return err
}
if !exists {
createTable := fmt.Sprintf(`CREATE TABLE %s (
sequencer_key VARCHAR(255),
sequencer_value INT);`, val.TableName)
_, err = meta.Db.Exec(createTable)
if err != nil {
return err
}
}

return nil
}

func tableExists(meta MySQLMetadata) (bool, error) {
exists := ""

query := `SELECT EXISTS (
SELECT * FROM ? WHERE TABLE_NAME = ?
) AS 'exists'`
err := meta.Db.QueryRow(query, meta.DataBaseName, meta.TableName).Scan(&exists)

return exists == "1", err
}
169 changes: 169 additions & 0 deletions components/sequencer/mysql/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
//
// Copyright 2021 Layotto Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql

import (
"database/sql"
"fmt"
"sync"

"mosn.io/pkg/log"

"mosn.io/layotto/components/pkg/utils"
"mosn.io/layotto/components/sequencer"
)

var lock sync.Mutex

type MySQLSequencer struct {
metadata utils.MySQLMetadata
biggerThan map[string]int64
logger log.ErrorLogger
db *sql.DB
}

func NewMySQLSequencer(logger log.ErrorLogger) *MySQLSequencer {
s := &MySQLSequencer{
logger: logger,
}

return s
}

func (e *MySQLSequencer) Init(config sequencer.Configuration) error {

m, err := utils.ParseMySQLMetadata(config.Properties)

if err != nil {
return err
}
e.metadata = m
e.metadata.Db = e.db
e.biggerThan = config.BiggerThan

if err = utils.NewMySQLClient(e.metadata); err != nil {
return err
}

if len(e.biggerThan) > 0 {
var Key string
var Value int64
for k, bt := range e.biggerThan {
if bt > 0 {
m.Db.QueryRow(fmt.Sprintf("SELECT sequencer_key,sequencer_value FROM %s WHERE sequencer_key = ?", m.TableName), k).Scan(&Key, &Value)
if Value < bt {
return fmt.Errorf("mysql sequencer error: can not satisfy biggerThan guarantee.key: %s,key in MySQL: %s", k, Key)
}
}
}
}
return nil
}

func (e *MySQLSequencer) GetNextId(req *sequencer.GetNextIdRequest) (*sequencer.GetNextIdResponse, error) {

lock.Lock()
defer lock.Unlock()

metadata, err := utils.ParseMySQLMetadata(req.Metadata)
metadata.Db = e.db
var Key string
var Value int64
var Version int64
var oldVersion int64
if err != nil {
return nil, err
}
begin, err := metadata.Db.Begin()
if err != nil {
return nil, err
}
err = begin.QueryRow("SELECT sequencer_key, sequencer_value, version FROM ? WHERE sequencer_key = ?", metadata.TableName, req.Key).Scan(&Key, &Value, &oldVersion)

if err == sql.ErrNoRows {
Value = 1
Version = 1
_, err := begin.Exec("INSERT INTO ?(sequencer_key, sequencer_value, version) VALUES(?,?,?)", metadata.TableName, req.Key, Value, Version)
GimmeCyy marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
} else {
_, err := begin.Exec("UPDATE ? SET sequencer_value +=1, version += 1 WHERE sequencer_key = ? and version = ?", metadata.TableName, req.Key, oldVersion)
if err != nil {
return nil, err
}
}

defer e.Close(metadata.Db)

return &sequencer.GetNextIdResponse{
NextId: Value,
}, nil
}

func (e *MySQLSequencer) GetSegment(req *sequencer.GetSegmentRequest) (support bool, result *sequencer.GetSegmentResponse, err error) {

lock.Lock()
defer lock.Unlock()

if req.Size == 0 {
return true, nil, nil
}

metadata, err := utils.ParseMySQLMetadata(req.Metadata)

var Key string
var Value int64
var Version int64
var oldVersion int64
if err != nil {
return false, nil, err
}

metadata.Db = e.db

begin, err := metadata.Db.Begin()
if err != nil {
return false, nil, err
}

err = begin.QueryRow("SELECT sequencer_key, sequencer_value, version FROM ? WHERE sequencer_key == ?", metadata.TableName, req.Key).Scan(&Key, &Value, &oldVersion)
if err == sql.ErrNoRows {
Value = int64(req.Size)
Version = 1
_, err := begin.Exec("INSERT INTO ?(sequencer_key, sequencer_value, version) VALUES(?,?,?)", metadata.TableName, req.Key, Value, Version)
if err != nil {
return false, nil, err
}

} else {
Value += int64(req.Size)
_, err1 := begin.Exec("UPDATE ? SET sequencer_value = ?, version += 1 WHERE sequencer_key = ? AND version = ?", metadata.TableName, Value, req.Key, oldVersion)
if err1 != nil {
return false, nil, err1
}
}

defer e.Close(metadata.Db)

return false, &sequencer.GetSegmentResponse{
From: Value - int64(req.Size) + 1,
To: Value,
}, nil
}

func (e *MySQLSequencer) Close(db *sql.DB) error {

return db.Close()
}
Loading