implement distributed lock with redis cluster #284

Merged 17 commits on Nov 11, 2021
3 changes: 3 additions & 0 deletions cmd/layotto/main.go
@@ -262,6 +262,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
),
// Lock
runtime.WithLockFactory(
runtime_lock.NewFactory("redis_cluster", func() lock.LockStore {
return lock_redis.NewClusterRedisLock(log.DefaultLogger)
}),
runtime_lock.NewFactory("redis", func() lock.LockStore {
return lock_redis.NewStandaloneRedisLock(log.DefaultLogger)
}),
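For context, the factory name "redis_cluster" registered above is the key a user would reference from Layotto's lock configuration. Below is a hypothetical configuration sketch: the exact nesting is an assumption based on common Layotto examples, while the metadata keys redisHosts and redisPassword are the ones exercised by the unit test in this PR.

{
  "lock": {
    "redis_cluster": {
      "metadata": {
        "redisHosts": "127.0.0.1:6380,127.0.0.1:6381,127.0.0.1:6382,127.0.0.1:6383,127.0.0.1:6384",
        "redisPassword": ""
      }
    }
  }
}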
3 changes: 3 additions & 0 deletions components/go.mod
@@ -27,8 +27,11 @@ require (
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/server/v3 v3.5.0
golang.org/x/mod v0.5.1 // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5 // indirect
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 // indirect
golang.org/x/tools v0.1.7 // indirect
google.golang.org/grpc v1.38.0
mosn.io/api v0.0.0-20210714065837-5b4c2d66e70c
mosn.io/mosn v0.24.1-0.20210928035557-38b3b922b595
8 changes: 8 additions & 0 deletions components/go.sum
@@ -729,6 +729,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e h1:oIpIX9VKxSCFrfjsKpluGbNPBGq9iNnT9crH781j9wY=
github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
@@ -870,6 +871,8 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38=
golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180406214816-61147c48b25b/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -918,6 +921,7 @@ golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -1006,6 +1010,8 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e h1:WUoyKPm6nCo1BnNUvPGnFG3T5DUVem42yDJZZ4CNxMA=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -1083,6 +1089,8 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.7 h1:6j8CgantCy3yc8JGBqkDLMKWqZ0RDU2g1HVgacojGWQ=
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
216 changes: 216 additions & 0 deletions components/lock/redis/cluster_redis_lock.go
@@ -0,0 +1,216 @@
package redis

import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"mosn.io/layotto/components/lock"
"mosn.io/layotto/components/pkg/utils"
"mosn.io/pkg/log"
"strings"
"sync"
"time"
)

// ClusterRedisLock implements the RedLock algorithm across multiple Redis instances.
// At least 5 hosts are recommended.
type ClusterRedisLock struct {
clients []*redis.Client
metadata utils.RedisClusterMetadata
replicas int

features []lock.Feature
logger log.ErrorLogger

ctx context.Context
cancel context.CancelFunc
}

// NewClusterRedisLock returns a new redis lock store
func NewClusterRedisLock(logger log.ErrorLogger) *ClusterRedisLock {
s := &ClusterRedisLock{
features: make([]lock.Feature, 0),
logger: logger,
}

return s
}

type resultMsg struct {
error error
host string
lockStatus bool
unlockStatus lock.LockStatus
}

func (c *ClusterRedisLock) Init(metadata lock.Metadata) error {
m, err := utils.ParseRedisClusterMetadata(metadata.Properties)
if err != nil {
return err
}
c.metadata = m
c.clients = utils.NewClusterRedisClient(m)
c.ctx, c.cancel = context.WithCancel(context.Background())

for i, client := range c.clients {
if _, err = client.Ping(c.ctx).Result(); err != nil {
return fmt.Errorf("[ClusterRedisLock]: error connecting to redis at %s: %s", c.metadata.Hosts[i], err)
}
}
return err
}

func (c *ClusterRedisLock) Features() []lock.Feature {
return c.features
}

func (c *ClusterRedisLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
//try to acquire the lock on all redis nodes
intervalStart := time.Now().UnixNano() / 1e6
intervalLimit := int64(req.Expire) * 1000 / 10
wg := sync.WaitGroup{}
wg.Add(len(c.clients))

//resultChan collects the result of each lock attempt
resultChan := make(chan resultMsg, len(c.clients))

//acquire the lock on every node concurrently
for i := range c.clients {
go c.LockSingleRedis(i, req, &wg, resultChan)
}
wg.Wait()
intervalEnd := time.Now().UnixNano() / 1e6

//make sure the time spent acquiring locks is far less than the expire time
if intervalLimit < intervalEnd-intervalStart {
_, _ = c.UnlockAllRedis(&lock.UnlockRequest{
ResourceId: req.ResourceId,
LockOwner: req.LockOwner,
}, &wg)
return &lock.TryLockResponse{
Success: false,
}, fmt.Errorf("[ClusterRedisLock]: lock timeout. ResourceId: %s", req.ResourceId)
}
close(resultChan)

successCount := 0
errorStrs := make([]string, 0, len(c.clients))
for msg := range resultChan {
if msg.error != nil {
errorStrs = append(errorStrs, msg.error.Error())
continue
}
if msg.lockStatus {
successCount++
}
}
var err error
if len(errorStrs) > 0 {
err = fmt.Errorf("%s", strings.Join(errorStrs, "\n"))
}

//acquiring the lock on a majority of the redis nodes counts as overall success
if successCount*2 > len(c.clients) {
return &lock.TryLockResponse{
Success: true,
}, err
} else {
_, unlockErr := c.UnlockAllRedis(&lock.UnlockRequest{
ResourceId: req.ResourceId,
LockOwner: req.LockOwner,
}, &wg)
if unlockErr != nil {
errorStrs = append(errorStrs, unlockErr.Error())
err = fmt.Errorf("%s", strings.Join(errorStrs, "\n"))
}
return &lock.TryLockResponse{
Success: false,
}, err
}
}
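TryLock above encodes the two RedLock acceptance conditions: the time spent acquiring must stay far below the lock expiry (here, at most a tenth of it), and a strict majority of nodes must grant the lock. A minimal standalone sketch of those checks, for intuition only (redlockAccept is a hypothetical helper, not part of this diff):

package main

import "fmt"

// redlockAccept mirrors TryLock's two checks: elapsed time within a tenth
// of the expiry, and a strict majority of nodes granting the lock.
func redlockAccept(elapsedMs, expireSec int64, successCount, totalNodes int) bool {
	limitMs := expireSec * 1000 / 10 // a tenth of the expiry, as in TryLock
	return elapsedMs <= limitMs && successCount*2 > totalNodes
}

func main() {
	fmt.Println(redlockAccept(120, 10, 3, 5)) // true: 120ms <= 1000ms and 3 of 5 granted
	fmt.Println(redlockAccept(120, 10, 2, 5)) // false: only 2 of 5 granted, no majority
}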

func (c *ClusterRedisLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
wg := sync.WaitGroup{}
status, err := c.UnlockAllRedis(req, &wg)
if err != nil {
return newInternalErrorUnlockResponse(), err
}
return &lock.UnlockResponse{
Status: status,
}, nil
}

func (c *ClusterRedisLock) UnlockAllRedis(req *lock.UnlockRequest, wg *sync.WaitGroup) (lock.LockStatus, error) {
wg.Add(len(c.clients))
ch := make(chan resultMsg, len(c.clients))

//unlock concurrently
for i := range c.clients {
go c.UnlockSingleRedis(i, req, wg, ch)
}
wg.Wait()
close(ch)
errorStrs := make([]string, 0, len(c.clients))
status := lock.SUCCESS

//collect the results of unlocking
for msg := range ch {
if msg.unlockStatus == lock.INTERNAL_ERROR {
status = msg.unlockStatus
errorStrs = append(errorStrs, msg.error.Error())
}
}
if len(errorStrs) > 0 {
return status, fmt.Errorf("%s", strings.Join(errorStrs, "\n"))
}
return status, nil
}

func (c *ClusterRedisLock) LockSingleRedis(clientIndex int, req *lock.TryLockRequest, wg *sync.WaitGroup, ch chan resultMsg) {
defer wg.Done()
msg := resultMsg{
host: c.metadata.Hosts[clientIndex],
}
nx := c.clients[clientIndex].SetNX(c.ctx, req.ResourceId, req.LockOwner, time.Second*time.Duration(req.Expire))
if nx == nil {
msg.error = fmt.Errorf("[ClusterRedisLock]: SetNX returned nil. host: %s \n ResourceId: %s", c.metadata.Hosts[clientIndex], req.ResourceId)
ch <- msg
return
}
if nx.Err() != nil {
msg.error = fmt.Errorf("[ClusterRedisLock]: %s host: %s \n ResourceId: %s", nx.Err().Error(), c.metadata.Hosts[clientIndex], req.ResourceId)
}
msg.lockStatus = nx.Val()
ch <- msg
}
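Each per-node attempt reduces to Redis's atomic SET-if-not-exists with a TTL, which is exactly what the SetNX call above issues. A self-contained sketch of the same call with go-redis v8 (the address, key, and owner value are hypothetical):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

func main() {
	// hypothetical single node; ClusterRedisLock holds one such client per host
	client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	// SET resource_id owner_uuid NX EX 10: succeeds only if the key is absent
	ok, err := client.SetNX(context.Background(), "resource_id", "owner_uuid", 10*time.Second).Result()
	fmt.Println(ok, err) // true on first acquisition; false while another owner holds it
}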

func (c *ClusterRedisLock) UnlockSingleRedis(clientIndex int, req *lock.UnlockRequest, wg *sync.WaitGroup, ch chan resultMsg) {
defer wg.Done()
eval := c.clients[clientIndex].Eval(c.ctx, unlockScript, []string{req.ResourceId}, req.LockOwner)
msg := resultMsg{}
msg.unlockStatus = lock.INTERNAL_ERROR
if eval == nil {
msg.error = fmt.Errorf("[ClusterRedisLock]: Eval unlock script returned nil. host: %s \n ResourceId: %s", c.metadata.Hosts[clientIndex], req.ResourceId)
ch <- msg
return
}
if eval.Err() != nil {
msg.error = fmt.Errorf("[ClusterRedisLock]: %s host: %s \n ResourceId: %s", eval.Err().Error(), c.metadata.Hosts[clientIndex], req.ResourceId)
ch <- msg
return
}
i, err := eval.Int()
if err != nil {
msg.error = err
ch <- msg
return
}
if i >= 0 {
msg.unlockStatus = lock.SUCCESS
} else if i == -1 {
msg.unlockStatus = lock.LOCK_UNEXIST
} else if i == -2 {
msg.unlockStatus = lock.LOCK_BELONG_TO_OTHERS
}
ch <- msg
}
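UnlockSingleRedis executes an unlockScript constant that is defined elsewhere in this package and is not part of this diff. A hypothetical reconstruction, inferred only from the return codes mapped above (the actual script may differ):

// unlockScriptSketch is a hypothetical compare-and-delete script; return codes
// follow the mapping in UnlockSingleRedis: >= 0 success, -1 lock missing,
// -2 lock held by another owner.
const unlockScriptSketch = `
local v = redis.call("GET", KEYS[1])
if v == false then return -1 end
if v ~= ARGV[1] then return -2 end
return redis.call("DEL", KEYS[1])
`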
89 changes: 89 additions & 0 deletions components/lock/redis/cluster_redis_lock_test.go
@@ -0,0 +1,89 @@
package redis

import (
miniredis "github.com/alicebob/miniredis/v2"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"mosn.io/layotto/components/lock"
"mosn.io/pkg/log"
"strings"
"sync"
"testing"
)

const cResourceId = "resource_red_lock"

func TestClusterRedisLock_TryLock(t *testing.T) {
// start 5 miniredis instances
redisInstances := make([]*miniredis.Miniredis, 0, 5)
redisAddrs := make([]string, 0, 5)
var err error
for i := 0; i < 5; i++ {
redis, err := miniredis.Run()
assert.NoError(t, err)
redisInstances = append(redisInstances, redis)
redisAddrs = append(redisAddrs, redis.Addr())
}
// construct component
comp := NewClusterRedisLock(log.DefaultLogger)
cfg := lock.Metadata{
Properties: make(map[string]string),
}
cfg.Properties["redisHosts"] = strings.Join(redisAddrs, ",")
cfg.Properties["redisPassword"] = ""
// init
err = comp.Init(cfg)
assert.NoError(t, err)
// 1. client1 trylock
ownerId1 := uuid.New().String()
resp, err := comp.TryLock(&lock.TryLockRequest{
ResourceId: cResourceId,
LockOwner: ownerId1,
Expire: 10,
})
assert.NoError(t, err)
assert.True(t, resp.Success)
var wg sync.WaitGroup
wg.Add(1)
// 2. Client2 tryLock fail
go func() {
owner2 := uuid.New().String()
resp2, err2 := comp.TryLock(&lock.TryLockRequest{
ResourceId: cResourceId,
LockOwner: owner2,
Expire: 10,
})
assert.NoError(t, err2)
assert.False(t, resp2.Success)
wg.Done()
}()
wg.Wait()
// 3. client 1 unlock
unlockResp, err := comp.Unlock(&lock.UnlockRequest{
ResourceId: cResourceId,
LockOwner: ownerId1,
})
assert.NoError(t, err)
assert.True(t, unlockResp.Status == 0, "client1 failed to unlock!")
// 4. client 2 get lock
wg.Add(1)
go func() {
owner2 := uuid.New().String()
resp2, err2 := comp.TryLock(&lock.TryLockRequest{
ResourceId: cResourceId,
LockOwner: owner2,
Expire: 10,
})
assert.NoError(t, err2)
assert.True(t, resp2.Success, "client2 failed to get lock?!")
// 5. client2 unlock
unlockResp, err := comp.Unlock(&lock.UnlockRequest{
ResourceId: cResourceId,
LockOwner: owner2,
})
assert.NoError(t, err)
assert.True(t, unlockResp.Status == 0, "client2 failed to unlock!")
wg.Done()
}()
wg.Wait()
}