Skip to content

Commit

Permalink
feature: implement sequencer api with mysql
Browse files Browse the repository at this point in the history
  • Loading branch information
GimmeCyy committed Jul 4, 2022
1 parent 2c3fff6 commit 667c39c
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 62 deletions.
58 changes: 24 additions & 34 deletions components/pkg/utils/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@ var (
const (
defaultTableName = "layotto_sequencer"
defaultTableNameKey = "tableName"
connectionStringKey = "connectionString"
dataBaseName = "dataBaseName"
userName = "userName"
defaultPassword = "password"
mysqlUrl = "mysqlUrl"
)

type MySQLMetadata struct {
TableName string
ConnectionString string
DataBaseName string
Db *sql.DB
TableName string
DataBaseName string
UserName string
Password string
MysqlUrl string
Db *sql.DB
}

func ParseMySQLMetadata(properties map[string]string) (MySQLMetadata, error) {
Expand All @@ -42,50 +46,36 @@ func ParseMySQLMetadata(properties map[string]string) (MySQLMetadata, error) {
if val, ok := properties[defaultTableNameKey]; ok && val != "" {
m.TableName = val
}

if val, ok := properties[connectionStringKey]; ok && val != "" {
m.ConnectionString = val
}

if val, ok := properties[dataBaseName]; ok && val != "" {
m.DataBaseName = val
}
if val, ok := properties[userName]; ok && val != "" {
m.UserName = val
}
if val, ok := properties[defaultPassword]; ok && val != "" {
m.Password = val
}
if val, ok := properties[mysqlUrl]; ok && val != "" {
m.MysqlUrl = val
}
return m, nil
}

func NewMySQLClient(meta MySQLMetadata) error {

val := meta

if val.TableName == "" {
val.TableName = defaultTableName
}

exists, err := tableExists(val)
if err != nil {
return err
}
if !exists {
createTable := fmt.Sprintf(`CREATE TABLE %s (
meta.Db.Begin()
createTable := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
sequencer_key VARCHAR(255),
sequencer_value INT,
UNIQUE INDEX (sequencer_key));`, val.TableName)
_, err = meta.Db.Exec(createTable)
if err != nil {
return err
}
_, err := meta.Db.Exec(createTable)
defer meta.Db.Close()
if err != nil {
return err
}

return nil
}

func tableExists(meta MySQLMetadata) (bool, error) {
exists := ""

query := `SELECT EXISTS (
SELECT * FROM ? WHERE TABLE_NAME = ?
) AS 'exists'`
err := meta.Db.QueryRow(query, meta.DataBaseName, meta.TableName).Scan(&exists)

return exists == "1", err
}
73 changes: 45 additions & 28 deletions components/sequencer/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ import (
)

const (
MySQLUrl = "localhost:xxxxx"
Value = 1
Key = "sequenceKey"
Size = 50
Version = 1
tableName = "layotto_sequencer"
connectionString = "connectionString"
dataBaseName = "layotto"
MySQLUrl = "localhost:3306"
Value = 1
Key = "sequenceKey"
Size = 50
Version = 1
tableName = "layotto_sequencer"
dataBaseName = "layotto"
userName = "root"
password = "123456"
)

func TestMySQLSequencer_Init(t *testing.T) {
Expand All @@ -43,7 +44,6 @@ func TestMySQLSequencer_Init(t *testing.T) {
if err != nil {
t.Fatalf("an error '%s' was not expected when opening a stub database connection", err)
}
defer db.Close()

comp := NewMySQLSequencer(log.DefaultLogger)
comp.db = db
Expand All @@ -52,14 +52,15 @@ func TestMySQLSequencer_Init(t *testing.T) {
Properties: make(map[string]string),
BiggerThan: make(map[string]int64),
}
mock.ExpectBegin()
mock.ExpectExec("CREATE TABLE IF NOT EXISTS").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

rows := sqlmock.NewRows([]string{"exists"}).AddRow(0)
mock.ExpectQuery("SELECT EXISTS").WillReturnRows(rows)
mock.ExpectExec("CREATE TABLE").WillReturnResult(sqlmock.NewResult(1, 1))

cfg.Properties["tableNameKey"] = tableName
cfg.Properties["connectionString"] = connectionString
cfg.Properties["tableName"] = tableName
cfg.Properties["dataBaseName"] = dataBaseName
cfg.Properties["userName"] = userName
cfg.Properties["password"] = password
cfg.Properties["mysqlUrl"] = MySQLUrl
err = comp.Init(cfg)

assert.Nil(t, err)
Expand All @@ -79,14 +80,16 @@ func TestMySQLSequencer_GetNextId(t *testing.T) {
mock.ExpectBegin()
mock.ExpectQuery("SELECT").WillReturnRows(rows)
mock.ExpectExec("UPDATE").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("INSERT INTO").WithArgs("layotto_sequencer", "sequenceKey", Value, Version).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("INSERT INTO").WithArgs(tableName, Key, Value, Version).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

properties := make(map[string]string)

properties["tableName"] = tableName
properties["connectionString"] = connectionString
properties["dataBaseName"] = dataBaseName
properties["userName"] = userName
properties["password"] = password
properties["mysqlUrl"] = MySQLUrl

req := &sequencer.GetNextIdRequest{Key: Key, Options: sequencer.SequencerOptions{AutoIncrement: sequencer.STRONG}, Metadata: properties}

Expand Down Expand Up @@ -114,14 +117,16 @@ func TestMySQLSequencer_GetSegment(t *testing.T) {
mock.ExpectBegin()
mock.ExpectQuery("SELECT").WillReturnRows(rows)
mock.ExpectExec("UPDATE").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("INSERT INTO").WithArgs("layotto_sequencer", Key, Size, Version).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("INSERT INTO").WithArgs(tableName, Key, Size, Version).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

properties := make(map[string]string)

properties["tableName"] = tableName
properties["connectionString"] = connectionString
properties["dataBaseName"] = dataBaseName
properties["userName"] = userName
properties["password"] = password
properties["mysqlUrl"] = MySQLUrl

req := &sequencer.GetSegmentRequest{Size: Size, Key: Key, Options: sequencer.SequencerOptions{AutoIncrement: sequencer.STRONG}, Metadata: properties}

Expand Down Expand Up @@ -169,14 +174,16 @@ func TestMySQLSequencer_Segment_Insert(t *testing.T) {

mock.ExpectBegin()
mock.ExpectQuery("SELECT").WillReturnError(sql.ErrNoRows)
mock.ExpectExec("INSERT INTO").WithArgs("layotto_sequencer", Key, Size, Version).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("INSERT INTO").WithArgs(tableName, Key, Size, Version).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

properties := make(map[string]string)

properties["tableName"] = tableName
properties["connectionString"] = connectionString
properties["dataBaseName"] = dataBaseName
properties["userName"] = userName
properties["password"] = password
properties["mysqlUrl"] = MySQLUrl

segmentReq := &sequencer.GetSegmentRequest{Size: Size, Key: Key, Options: sequencer.SequencerOptions{AutoIncrement: sequencer.STRONG}, Metadata: properties}
comp.db = db
Expand All @@ -198,14 +205,16 @@ func TestMySQLSequencer_GetNextId_Insert(t *testing.T) {

mock.ExpectBegin()
mock.ExpectQuery("SELECT").WillReturnError(sql.ErrNoRows)
mock.ExpectExec("INSERT INTO").WithArgs("layotto_sequencer", "sequenceKey", Value, Version).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("INSERT INTO").WithArgs(tableName, Key, Value, Version).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

properties := make(map[string]string)

properties["tableName"] = tableName
properties["connectionString"] = connectionString
properties["dataBaseName"] = dataBaseName
properties["userName"] = userName
properties["password"] = password
properties["mysqlUrl"] = MySQLUrl

req := &sequencer.GetNextIdRequest{Key: Key, Options: sequencer.SequencerOptions{AutoIncrement: sequencer.STRONG}, Metadata: properties}

Expand All @@ -231,8 +240,10 @@ func TestMySQLSequencer_GetNextId_InsertError(t *testing.T) {
properties := make(map[string]string)

properties["tableName"] = tableName
properties["connectionString"] = connectionString
properties["dataBaseName"] = dataBaseName
properties["userName"] = userName
properties["password"] = password
properties["mysqlUrl"] = MySQLUrl

req := &sequencer.GetNextIdRequest{Key: Key, Options: sequencer.SequencerOptions{AutoIncrement: sequencer.STRONG}, Metadata: properties}

Expand All @@ -253,14 +264,16 @@ func TestMySQLSequencer_Segment_InsertError(t *testing.T) {

mock.ExpectBegin()
mock.ExpectQuery("SELECT").WillReturnError(sql.ErrNoRows)
mock.ExpectExec("INSERT INTO").WithArgs("layotto_sequencer", Key, Size).WillReturnError(errors.New("insert error"))
mock.ExpectExec("INSERT INTO").WithArgs(tableName, Key, Size).WillReturnError(errors.New("insert error"))
mock.ExpectCommit()

properties := make(map[string]string)

properties["tableName"] = tableName
properties["connectionString"] = connectionString
properties["dataBaseName"] = dataBaseName
properties["userName"] = userName
properties["password"] = password
properties["mysqlUrl"] = MySQLUrl

segmentReq := &sequencer.GetSegmentRequest{Size: Size, Key: Key, Options: sequencer.SequencerOptions{AutoIncrement: sequencer.STRONG}, Metadata: properties}
comp.db = db
Expand All @@ -287,8 +300,10 @@ func TestMySQLSequencer_GetNextId_UpdateError(t *testing.T) {
properties := make(map[string]string)

properties["tableName"] = tableName
properties["connectionString"] = connectionString
properties["dataBaseName"] = dataBaseName
properties["userName"] = userName
properties["password"] = password
properties["mysqlUrl"] = MySQLUrl

req := &sequencer.GetNextIdRequest{Key: Key, Options: sequencer.SequencerOptions{AutoIncrement: sequencer.STRONG}, Metadata: properties}

Expand Down Expand Up @@ -317,8 +332,10 @@ func TestMySQLSequencer_Segment_UpdateError(t *testing.T) {
properties := make(map[string]string)

properties["tableName"] = tableName
properties["connectionString"] = connectionString
properties["dataBaseName"] = dataBaseName
properties["userName"] = userName
properties["password"] = password
properties["mysqlUrl"] = MySQLUrl

req := &sequencer.GetSegmentRequest{Size: Size, Key: Key, Options: sequencer.SequencerOptions{AutoIncrement: sequencer.STRONG}, Metadata: properties}

Expand Down

0 comments on commit 667c39c

Please sign in to comment.