Skip to content

Commit

Permalink
Implement consul datasource (#116)
Browse files Browse the repository at this point in the history
Add consul datasource implementation
  • Loading branch information
gorexlv committed Jul 3, 2020
1 parent fd908a5 commit d3f4832
Show file tree
Hide file tree
Showing 7 changed files with 449 additions and 2 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ dist: xenial
sudo: required

go:
- 1.12.x
- 1.13.x
- 1.14.x
env:
Expand Down
140 changes: 140 additions & 0 deletions ext/datasource/consul/consul.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package consul

import (
"context"
"errors"
"time"

"github.com/alibaba/sentinel-golang/ext/datasource"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/hashicorp/consul/api"
)

type consulDataSource struct {
datasource.Base
*options

propertyKey string
kvQuerier KVQuerier
isInitialized util.AtomicBool
cancel context.CancelFunc
queryOptions api.QueryOptions
}

var (
ErrNilConsulClient = errors.New("nil consul client")
ErrInvalidConsulConfig = errors.New("invalid consul config")

logger = logging.GetDefaultLogger()
)

// NewDatasource returns new consul datasource instance
func NewDatasource(propertyKey string, opts ...Option) (datasource.DataSource, error) {
var options = evaluateOptions(opts)
// if not consul client is specified, initialize from the configuration
if options.consulClient == nil {
if options.consulConfig == nil {
return nil, ErrInvalidConsulConfig
}
client, err := api.NewClient(options.consulConfig)
if err != nil {
return nil, err
}
options.consulClient = client
}

// still no consul client, throw error
if options.consulClient == nil {
return nil, ErrNilConsulClient
}
return newConsulDataSource(propertyKey, options), nil
}

func newConsulDataSource(propertyKey string, options *options) *consulDataSource {
ctx, cancel := context.WithCancel(options.queryOptions.Context())
ds := &consulDataSource{
propertyKey: propertyKey,
options: options,
kvQuerier: options.consulClient.KV(),
cancel: cancel,
queryOptions: *options.queryOptions.WithContext(ctx),
}

for _, h := range options.propertyHandlers {
ds.AddPropertyHandler(h)
}
return ds
}

// ReadSource implement datasource.DataSource interface
func (c *consulDataSource) ReadSource() ([]byte, error) {
pair, meta, err := c.kvQuerier.Get(c.propertyKey, &c.queryOptions)

if err != nil {
return nil, err
}

c.queryOptions.WaitIndex = meta.LastIndex
if pair == nil {
return []byte{}, nil
}
return pair.Value, nil
}

// Initialize implement datasource.DataSource interface
func (c *consulDataSource) Initialize() error {
if !c.isInitialized.CompareAndSet(false, true) {
return errors.New("duplicate initialize consul datasource")
}
if err := c.doReadAndUpdate(); err != nil {
logger.Errorf("[consul] doReadAndUpdate failed: %s", err.Error())
return err
}

if !c.disableWatch {
go util.RunWithRecover(c.watch, logger)
}
return nil
}

func (c *consulDataSource) watch() {
for {
if err := c.doReadAndUpdate(); err != nil {
if errors.Is(err, context.Canceled) {
return
}

if api.IsRetryableError(err) {
logger.Warnf("[consul] doUpdate failed with retryable error: %s", err.Error())
time.Sleep(time.Second)
continue
}

logger.Errorf("[consul] doUpdate failed: %s", err.Error())
}
}
}

func (c *consulDataSource) doUpdate(src []byte) (err error) {
if len(src) == 0 {
return c.Handle(nil)
}
return c.Handle(src)
}

func (c *consulDataSource) doReadAndUpdate() (err error) {
src, err := c.ReadSource()
if err != nil {
return err
}
return c.doUpdate(src)
}

func (c *consulDataSource) Close() error {
if c.cancel != nil {
c.cancel()
}
logger.Info("[consul] close consul datasource")
return nil
}
61 changes: 61 additions & 0 deletions ext/datasource/consul/consul_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package consul

import (
"time"

"github.com/hashicorp/consul/api"
)

func Example_consulDatasource_CustomizeClient() {
client, err := api.NewClient(&api.Config{
Address: "127.0.0.1:8500",
})
if err != nil {
// todo something
}
ds, err := NewDatasource("property_key",
// customize consul client
WithConsulClient(client),
// disable dynamic datasource watch
WithDisableWatch(true),
// preset property handlers
WithPropertyHandlers(),
// reset queryOptions, defaultQueryOptions as default
WithQueryOptions(&api.QueryOptions{}),
)

if err != nil {
// todo something
}

if err := ds.Initialize(); err != nil {
// todo something
}
}

func Example_consulDatasource_CustomizeConfig() {
ds, err := NewDatasource("property_key",
// customize consul config
WithConsulConfig(&api.Config{
Address: "127.0.0.1:8500",
}),
// disable dynamic datasource watch
WithDisableWatch(true),
// preset property handlers
WithPropertyHandlers(),
// reset queryOptions, defaultQueryOptions as default
WithQueryOptions(&api.QueryOptions{
WaitIndex: 0,
// override default WaitTime(5min)
WaitTime: time.Second * 90,
}),
)

if err != nil {
// todo something
}

if err := ds.Initialize(); err != nil {
// todo something
}
}
117 changes: 117 additions & 0 deletions ext/datasource/consul/consul_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package consul

import (
"sync"
"testing"
"time"

"github.com/alibaba/sentinel-golang/core/system"
"github.com/alibaba/sentinel-golang/ext/datasource"
"github.com/hashicorp/consul/api"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

const (
TestSystemRules = `[
{
"id": 0,
"metricType": 0,
"adaptiveStrategy": 0
},
{
"id": 1,
"metricType": 0,
"adaptiveStrategy": 0
},
{
"id": 2,
"metricType": 0,
"adaptiveStrategy": 0
}
]`
)

var (
SystemRules = []*system.SystemRule{
{MetricType: 0, Strategy: 0},
{MetricType: 0, Strategy: 0},
{MetricType: 0, Strategy: 0},
}
)

type consulClientMock struct {
mock.Mock
pair *api.KVPair
lock sync.Mutex
}

func (c *consulClientMock) Get(key string, q *api.QueryOptions) (*api.KVPair, *api.QueryMeta, error) {
c.lock.Lock()
defer c.lock.Unlock()
return c.pair, &api.QueryMeta{
LastIndex: c.pair.ModifyIndex,
LastContentHash: "",
LastContact: 0,
KnownLeader: false,
RequestTime: 0,
AddressTranslationEnabled: false,
CacheHit: false,
CacheAge: 0,
}, nil
}

func (c consulClientMock) List(prefix string, q *api.QueryOptions) (api.KVPairs, *api.QueryMeta, error) {
panic("implement me")
}

func newQuerierMock() *consulClientMock {
return &consulClientMock{
pair: &api.KVPair{
Key: "property_key",
CreateIndex: 0,
ModifyIndex: 0,
LockIndex: 0,
Flags: 0,
Value: []byte(TestSystemRules),
Session: "",
},
}
}

func (c *consulClientMock) resetPair(pair *api.KVPair) {
c.lock.Lock()
defer c.lock.Unlock()
c.pair = pair
}

func TestConsulDatasource(t *testing.T) {
mock := newQuerierMock()
ds := newConsulDataSource("property_key", evaluateOptions([]Option{}))
ds.kvQuerier = mock

ds.AddPropertyHandler(datasource.NewDefaultPropertyHandler(
datasource.SystemRulesJsonConverter,
func(rule interface{}) error {
assert.NotNil(t, rule)
assert.EqualValues(t, SystemRules, rule)
return nil
},
))

assert.Nil(t, ds.Initialize())
assert.EqualError(t, ds.Initialize(), "duplicate initialize consul datasource")

t.Run("WatchSourceChange", func(t *testing.T) {
mock.resetPair(&api.KVPair{
Key: "property_key",
CreateIndex: 0,
ModifyIndex: 1,
LockIndex: 0,
Flags: 0,
Value: []byte(TestSystemRules),
Session: "",
})
})
time.Sleep(time.Second)
}
Loading

0 comments on commit d3f4832

Please sign in to comment.