Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add sequencer api component by mysql #509

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c0768f5
feature: add sequencer api component by mysql
GimmeCyy Apr 29, 2022
05e6e7c
Merge branch 'main' into feature/mysql
seeflood May 1, 2022
70751ae
Merge branch 'main' into feature/mysql
seeflood May 2, 2022
70b0751
Merge branch 'main' into feature/mysql
seeflood May 5, 2022
37f7442
Merge branch 'main' into feature/mysql
seeflood May 5, 2022
4acc0ef
feature: add sequencer api component by mysql
GimmeCyy May 6, 2022
152a815
Merge remote-tracking branch 'origin/feature/mysql' into feature/mysql
GimmeCyy May 6, 2022
04e6bdc
feature: add sequencer api component by mysql
GimmeCyy May 6, 2022
810b6fc
feature: add sequencer api component by mysql
GimmeCyy Apr 29, 2022
90f272f
feature: add sequencer api component by mysql
GimmeCyy May 6, 2022
0eb449e
feature: add sequencer api component by mysql
GimmeCyy May 6, 2022
dae9d55
Merge branch 'main' into feature/mysql
seeflood May 10, 2022
47dc077
Merge branch 'main' into feature/mysql
seeflood May 10, 2022
f32884d
feature: add sequencer api component by mysql
GimmeCyy May 11, 2022
ba9bd33
Merge remote-tracking branch 'origin/feature/mysql' into feature/mysql
GimmeCyy May 11, 2022
ae98100
Merge branch 'main' into feature/mysql
seeflood May 11, 2022
4e37ef9
fix: sequencer api component by mysql
GimmeCyy May 11, 2022
9fc22e9
Merge remote-tracking branch 'origin/feature/mysql' into feature/mysql
GimmeCyy May 11, 2022
2bdeef7
fix: sequencer api component by mysql
GimmeCyy May 13, 2022
131a665
Merge branch 'main' into feature/mysql
seeflood May 14, 2022
77fba98
Merge branch 'main' into feature/mysql
seeflood May 19, 2022
681ed86
Merge branch 'main' into feature/mysql
seeflood May 23, 2022
d35ec1a
Merge branch 'main' into feature/mysql
seeflood May 24, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions components/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module mosn.io/layotto/components
go 1.14

require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/alicebob/miniredis/v2 v2.16.0
github.com/aliyun/aliyun-oss-go-sdk v2.1.8+incompatible
github.com/apache/dubbo-go-hessian2 v1.7.0
Expand All @@ -12,6 +13,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.16.0
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect
github.com/go-redis/redis/v8 v8.8.0
github.com/go-sql-driver/mysql v1.5.0
github.com/go-zookeeper/zk v1.0.2
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
Expand Down
3 changes: 3 additions & 0 deletions components/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a/go.mod h1:EXuID2Zs0p
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
Expand Down Expand Up @@ -263,6 +265,7 @@ github.com/go-redis/redis/v8 v8.8.0/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqW
github.com/go-resty/resty/v2 v2.6.0/go.mod h1:PwvJS6hvaPkjtjNg9ph+VrSD92bi5Zq73w/BIH7cC3Q=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
Expand Down
81 changes: 81 additions & 0 deletions components/pkg/utils/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//
// Copyright 2021 Layotto Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils

import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
)

var (
Db *sql.DB
)

const (
defaultTableName = "tableName"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change defaultTableName to another? such as layotto_sequencer

tableNameKey = "tableName"
connectionStringKey = "connectionString"
dataBaseName = "dataBaseName"
)

type MySQLMetadata struct {
TableName string
ConnectionString string
DataBaseName string
Db *sql.DB
}

func ParseMySQLMetadata(properties map[string]string, db *sql.DB) (MySQLMetadata, error) {
m := MySQLMetadata{}

if val, ok := properties[tableNameKey]; ok && val != "" {
m.TableName = val
}

if val, ok := properties[connectionStringKey]; ok && val != "" {
m.ConnectionString = val
}

if val, ok := properties[dataBaseName]; ok && val != "" {
m.DataBaseName = val
}
m.Db = db
return m, nil
}

func NewMySQLClient(meta MySQLMetadata) error {

val := meta

if val.TableName == "" {
val.TableName = defaultTableName
}

return tableExists(meta)
}

func tableExists(meta MySQLMetadata) error {
exists := ""

query := `SELECT EXISTS (
SELECT * FROM ? WHERE TABLE_NAME = ?
) AS 'exists'`
err := meta.Db.QueryRow(query, meta.DataBaseName, meta.TableName).Scan(&exists)

if exists == "1" {
return nil
} else {
return err
}
}
156 changes: 156 additions & 0 deletions components/sequencer/mysql/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
//
// Copyright 2021 Layotto Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mysql

import (
"context"
"database/sql"
"fmt"
"mosn.io/layotto/components/pkg/utils"
"mosn.io/layotto/components/sequencer"
"mosn.io/pkg/log"
)

type MySQLSequencer struct {
metadata utils.MySQLMetadata
biggerThan map[string]int64
logger log.ErrorLogger
ctx context.Context
cancel context.CancelFunc
GimmeCyy marked this conversation as resolved.
Show resolved Hide resolved
}

func NewMySQLSequencer(logger log.ErrorLogger) *MySQLSequencer {
GimmeCyy marked this conversation as resolved.
Show resolved Hide resolved
s := &MySQLSequencer{
logger: logger,
}

return s
}

func (e *MySQLSequencer) Init(config sequencer.Configuration, db *sql.DB) error {
GimmeCyy marked this conversation as resolved.
Show resolved Hide resolved
GimmeCyy marked this conversation as resolved.
Show resolved Hide resolved

m, err := utils.ParseMySQLMetadata(config.Properties, db)

if err != nil {
return err
}
e.metadata = m
e.biggerThan = config.BiggerThan

if err = utils.NewMySQLClient(e.metadata); err != nil {
return err
}
e.ctx, e.cancel = context.WithCancel(context.Background())

if len(e.biggerThan) > 0 {
var Key string
var Value int64
for k, bt := range e.biggerThan {
if bt <= 0 {
continue
} else {
m.Db.QueryRow(fmt.Sprintf("SELECT * FROM %s WHERE sequencer_key = ?", m.TableName), k).Scan(&Key, &Value)
GimmeCyy marked this conversation as resolved.
Show resolved Hide resolved
if Value < bt {
return fmt.Errorf("MySQL sequencer error: can not satisfy biggerThan guarantee.key: %s,key in MySQL: %s", k, Key)
}
}
}
}
return nil
}

func (e *MySQLSequencer) GetNextId(req *sequencer.GetNextIdRequest, db *sql.DB) (*sequencer.GetNextIdResponse, error) {
GimmeCyy marked this conversation as resolved.
Show resolved Hide resolved

metadata, err := utils.ParseMySQLMetadata(req.Metadata, db)

var Key string
var Value int64
if err != nil {
return nil, err
}

begin, err := metadata.Db.Begin()
if err != nil {
return nil, err
}

err = begin.QueryRow("SELECT sequencer_key,sequencer_value FROM ? WHERE sequencer_key = ?", metadata.TableName, req.Key).Scan(&Key, &Value)
if err != nil {
return nil, err
}

Value += 1
_, err1 := begin.Exec("UPDATE ? SET sequencer_value = ? WHERE sequencer_key = ?", metadata.TableName, Value, req.Key)
GimmeCyy marked this conversation as resolved.
Show resolved Hide resolved
if err1 != nil {
return nil, err1
}

Value += 1
GimmeCyy marked this conversation as resolved.
Show resolved Hide resolved
if _, err2 := begin.Exec("INSERT INTO ?(sequencer_key, sequencer_value) VALUES(?,?)", metadata.TableName, req.Key, Value); err2 != nil {
return nil, err2
}

e.Close(metadata.Db)
GimmeCyy marked this conversation as resolved.
Show resolved Hide resolved

return &sequencer.GetNextIdResponse{
NextId: Value,
}, nil
}

func (e *MySQLSequencer) GetSegment(req *sequencer.GetSegmentRequest, db *sql.DB) (support bool, result *sequencer.GetSegmentResponse, err error) {
GimmeCyy marked this conversation as resolved.
Show resolved Hide resolved

if req.Size == 0 {
return true, nil, nil
}

metadata, err := utils.ParseMySQLMetadata(req.Metadata, db)

var Key string
var Value int64
if err != nil {
return false, nil, err
}

begin, err := metadata.Db.Begin()
if err != nil {
return false, nil, err
}

err = begin.QueryRow("SELECT sequencer_key,sequencer_value FROM ? WHERE sequencer_key = ?", metadata.TableName, req.Key).Scan(&Key, &Value)
if err != nil {
return false, nil, err
}
Value += int64(req.Size)

_, err1 := begin.Exec("UPDATE ? SET sequencer_value = ? WHERE sequencer_key = ?", metadata.TableName, Value, req.Key)
if err1 != nil {
return false, nil, err1
}

if _, err2 := begin.Exec("INSERT INTO ?(sequencer_key, sequencer_value) VALUES(?,?)", metadata.TableName, req.Key, Value+1); err2 != nil {
return false, nil, err2
}

e.Close(metadata.Db)
GimmeCyy marked this conversation as resolved.
Show resolved Hide resolved

return false, &sequencer.GetSegmentResponse{
From: Value - int64(req.Size) + 1,
To: Value,
}, nil
}

func (e *MySQLSequencer) Close(db *sql.DB) error {

return db.Close()
}
Loading