Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add support for etcd v3 api #393

Merged
merged 3 commits into from
Jan 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Anyway it's quite easy to reset a cluster from scratch keeping the current maste
## Requirements

* PostgreSQL 10 or 9 (9.4, 9.5, 9.6)
* etcd >= 2.0 or consul >=0.6
* etcd >= 2.0 or consul >= 0.6


## build
Expand Down
6 changes: 5 additions & 1 deletion cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type CommonConfig struct {
}

func AddCommonFlags(cmd *cobra.Command, cfg *CommonConfig) {
cmd.PersistentFlags().StringVar(&cfg.StoreBackend, "store-backend", "", "store backend type (etcd or consul)")
cmd.PersistentFlags().StringVar(&cfg.StoreBackend, "store-backend", "", "store backend type (etcdv2/etcd, etcdv3 or consul)")
cmd.PersistentFlags().StringVar(&cfg.StoreEndpoints, "store-endpoints", "", "a comma-delimited list of store endpoints (use https scheme for tls communication) (defaults: http://127.0.0.1:2379 for etcd, http://127.0.0.1:8500 for consul)")
cmd.PersistentFlags().StringVar(&cfg.StoreCertFile, "store-cert-file", "", "certificate file for client identification to the store")
cmd.PersistentFlags().StringVar(&cfg.StoreKeyFile, "store-key", "", "private key file for client identification to the store")
Expand All @@ -54,6 +54,10 @@ func CheckCommonConfig(cfg *CommonConfig) error {
switch cfg.StoreBackend {
case "consul":
case "etcd":
// etcd is old alias for etcdv2
cfg.StoreBackend = "etcdv2"
case "etcdv2":
case "etcdv3":
default:
return fmt.Errorf("Unknown store backend: %q", cfg.StoreBackend)
}
Expand Down
12 changes: 6 additions & 6 deletions cmd/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ type PostgresKeeper struct {
sleepInterval time.Duration
requestTimeout time.Duration

e *store.StoreManager
e *store.Store
pgm *postgresql.Manager
stop chan bool
end chan error
Expand All @@ -381,7 +381,7 @@ type PostgresKeeper struct {
func NewPostgresKeeper(cfg *config, stop chan bool, end chan error) (*PostgresKeeper, error) {
storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)

kvstore, err := store.NewStore(store.Config{
kvstore, err := store.NewKVStore(store.Config{
Backend: store.Backend(cfg.StoreBackend),
Endpoints: cfg.StoreEndpoints,
CertFile: cfg.StoreCertFile,
Expand All @@ -392,7 +392,7 @@ func NewPostgresKeeper(cfg *config, stop chan bool, end chan error) (*PostgresKe
if err != nil {
return nil, fmt.Errorf("cannot create store: %v", err)
}
e := store.NewStoreManager(kvstore, storePath)
e := store.NewStore(kvstore, storePath)

// Clean and get absolute datadir path
dataDir, err := filepath.Abs(cfg.dataDir)
Expand Down Expand Up @@ -480,7 +480,7 @@ func (p *PostgresKeeper) updateKeeperInfo() error {

// The time to live is just to automatically remove old entries, it's
// not used to determine if the keeper info has been updated.
if err := p.e.SetKeeperInfo(keeperUID, keeperInfo, p.sleepInterval); err != nil {
if err := p.e.SetKeeperInfo(context.TODO(), keeperUID, keeperInfo, p.sleepInterval); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -651,7 +651,7 @@ func (p *PostgresKeeper) Start() {

var err error
var cd *cluster.ClusterData
cd, _, err = p.e.GetClusterData()
cd, _, err = p.e.GetClusterData(context.TODO())
if err != nil {
log.Errorw("error retrieving cluster data", zap.Error(err))
} else if cd != nil {
Expand Down Expand Up @@ -841,7 +841,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
e := p.e
pgm := p.pgm

cd, _, err := e.GetClusterData()
cd, _, err := e.GetClusterData(pctx)
if err != nil {
log.Errorw("error retrieving cluster data", zap.Error(err))
return
Expand Down
17 changes: 11 additions & 6 deletions cmd/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"context"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -100,7 +101,7 @@ type ClusterChecker struct {

listener *net.TCPListener
pp *pollon.Proxy
e *store.StoreManager
e *store.Store
endPollonProxyCh chan error

pollonMutex sync.Mutex
Expand All @@ -109,7 +110,7 @@ type ClusterChecker struct {
func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) {
storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)

kvstore, err := store.NewStore(store.Config{
kvstore, err := store.NewKVStore(store.Config{
Backend: store.Backend(cfg.StoreBackend),
Endpoints: cfg.StoreEndpoints,
CertFile: cfg.StoreCertFile,
Expand All @@ -120,7 +121,7 @@ func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) {
if err != nil {
return nil, fmt.Errorf("cannot create store: %v", err)
}
e := store.NewStoreManager(kvstore, storePath)
e := store.NewStore(kvstore, storePath)

return &ClusterChecker{
uid: uid,
Expand Down Expand Up @@ -189,22 +190,22 @@ func (c *ClusterChecker) sendPollonConfData(confData pollon.ConfData) {
}
}

func (c *ClusterChecker) SetProxyInfo(e *store.StoreManager, generation int64, ttl time.Duration) error {
func (c *ClusterChecker) SetProxyInfo(e *store.Store, generation int64, ttl time.Duration) error {
proxyInfo := &cluster.ProxyInfo{
UID: c.uid,
Generation: generation,
}
log.Debugf("proxyInfo dump: %s", spew.Sdump(proxyInfo))

if err := c.e.SetProxyInfo(proxyInfo, ttl); err != nil {
if err := c.e.SetProxyInfo(context.TODO(), proxyInfo, ttl); err != nil {
return err
}
return nil
}

// Check reads the cluster data and applies the right pollon configuration.
func (c *ClusterChecker) Check() error {
cd, _, err := c.e.GetClusterData()
cd, _, err := c.e.GetClusterData(context.TODO())
if err != nil {
return fmt.Errorf("cannot get cluster data: %v", err)
}
Expand Down Expand Up @@ -311,6 +312,10 @@ func (c *ClusterChecker) Start() error {
checkCh := make(chan error)
timerCh := time.NewTimer(0).C

// TODO(sgotti) TimeoutCecker is needed to forcefully close connection also
// if the Check method is blocked somewhere.
// The idomatic/cleaner solution will be to use a context instead of this
// TimeoutChecker but we have to change the libkv stores to support contexts.
go c.TimeoutChecker(checkOkCh)

for true {
Expand Down
45 changes: 19 additions & 26 deletions cmd/sentinel/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/sorintlab/stolon/pkg/util"

"github.com/davecgh/go-spew/spew"
"github.com/docker/leadership"
"github.com/mitchellh/copystructure"
"github.com/spf13/cobra"
"go.uber.org/zap"
Expand Down Expand Up @@ -96,7 +95,7 @@ func die(format string, a ...interface{}) {
func (s *Sentinel) electionLoop() {
for {
log.Infow("Trying to acquire sentinels leadership")
electedCh, errCh := s.candidate.RunForElection()
electedCh, errCh := s.election.RunForElection()
for {
select {
case elected := <-electedCh:
Expand All @@ -120,6 +119,7 @@ func (s *Sentinel) electionLoop() {
goto end
case <-s.stop:
log.Debugw("stopping election loop")
s.election.Stop()
return
}
}
Expand All @@ -136,22 +136,18 @@ func (s *Sentinel) syncRepl(spec *cluster.ClusterSpec) bool {
return *spec.SynchronousReplication && *spec.Role == cluster.ClusterRoleMaster
}

func (s *Sentinel) setSentinelInfo(ttl time.Duration) error {
func (s *Sentinel) setSentinelInfo(ctx context.Context, ttl time.Duration) error {
sentinelInfo := &cluster.SentinelInfo{
UID: s.uid,
}
log.Debugw("sentinelInfo dump", "sentinelInfo", sentinelInfo)

if err := s.e.SetSentinelInfo(sentinelInfo, ttl); err != nil {
if err := s.e.SetSentinelInfo(ctx, sentinelInfo, ttl); err != nil {
return err
}
return nil
}

func (s *Sentinel) getKeepersInfo(ctx context.Context) (cluster.KeepersInfo, error) {
return s.e.GetKeepersInfo()
}

func (s *Sentinel) SetKeeperError(uid string) {
if _, ok := s.keeperErrorTimers[uid]; !ok {
s.keeperErrorTimers[uid] = timer.Now()
Expand Down Expand Up @@ -1443,11 +1439,11 @@ type DBConvergenceInfo struct {
type Sentinel struct {
uid string
cfg *config
e *store.StoreManager
e *store.Store

candidate *leadership.Candidate
stop chan bool
end chan bool
election store.Election
stop chan bool
end chan bool

lastLeadershipCount uint

Expand Down Expand Up @@ -1493,7 +1489,7 @@ func NewSentinel(uid string, cfg *config, stop chan bool, end chan bool) (*Senti

storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)

kvstore, err := store.NewStore(store.Config{
kvstore, err := store.NewKVStore(store.Config{
Backend: store.Backend(cfg.StoreBackend),
Endpoints: cfg.StoreEndpoints,
CertFile: cfg.StoreCertFile,
Expand All @@ -1504,15 +1500,15 @@ func NewSentinel(uid string, cfg *config, stop chan bool, end chan bool) (*Senti
if err != nil {
return nil, fmt.Errorf("cannot create store: %v", err)
}
e := store.NewStoreManager(kvstore, storePath)
e := store.NewStore(kvstore, storePath)

candidate := leadership.NewCandidate(kvstore, filepath.Join(storePath, common.SentinelLeaderKey), uid, store.MinTTL)
election := store.NewElection(kvstore, filepath.Join(storePath, common.SentinelLeaderKey), uid)

return &Sentinel{
uid: uid,
cfg: cfg,
e: e,
candidate: candidate,
election: election,
leader: false,
initialClusterSpec: initialClusterSpec,
stop: stop,
Expand All @@ -1539,9 +1535,8 @@ func (s *Sentinel) Start() {
for true {
select {
case <-s.stop:
log.Debugw("stopping stolon sentinel")
log.Infow("stopping stolon sentinel")
cancel()
s.candidate.Stop()
s.end <- true
return
case <-timerCh:
Expand All @@ -1566,7 +1561,7 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) {
defer s.updateMutex.Unlock()
e := s.e

cd, prevCDPair, err := e.GetClusterData()
cd, prevCDPair, err := e.GetClusterData(pctx)
if err != nil {
log.Errorw("error retrieving cluster data", zap.Error(err))
return
Expand Down Expand Up @@ -1599,27 +1594,25 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) {
log.Infow("writing initial cluster data")
newcd := cluster.NewClusterData(c)
log.Debugf("newcd dump: %s", spew.Sdump(newcd))
if _, err = e.AtomicPutClusterData(newcd, nil); err != nil {
if _, err = e.AtomicPutClusterData(pctx, newcd, nil); err != nil {
log.Errorw("error saving cluster data", zap.Error(err))
}
return
}

if err = s.setSentinelInfo(2 * s.sleepInterval); err != nil {
if err = s.setSentinelInfo(pctx, 2*s.sleepInterval); err != nil {
log.Errorw("cannot update sentinel info", zap.Error(err))
return
}

ctx, cancel := context.WithTimeout(pctx, s.requestTimeout)
keepersInfo, err := s.getKeepersInfo(ctx)
cancel()
keepersInfo, err := s.e.GetKeepersInfo(pctx)
if err != nil {
log.Errorw("cannot get keepers info", zap.Error(err))
return
}
log.Debugf("keepersInfo dump: %s", spew.Sdump(keepersInfo))

proxiesInfo, err := s.e.GetProxiesInfo()
proxiesInfo, err := s.e.GetProxiesInfo(pctx)
if err != nil {
log.Errorw("failed to get proxies info", zap.Error(err))
return
Expand Down Expand Up @@ -1661,7 +1654,7 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) {
log.Debugf("newcd dump after updateCluster: %s", spew.Sdump(newcd))

if newcd != nil {
if _, err := e.AtomicPutClusterData(newcd, prevCDPair); err != nil {
if _, err := e.AtomicPutClusterData(pctx, newcd, prevCDPair); err != nil {
log.Errorw("error saving clusterdata", zap.Error(err))
}
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/stolonctl/clusterdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ func init() {
}

func clusterdata(cmd *cobra.Command, args []string) {
e, err := NewStore()
kvStore, err := NewKVStore()
if err != nil {
die("cannot create store: %v", err)
}

e := NewStore(kvStore)

cd, _, err := getClusterData(e)
if err != nil {
die("%v", err)
Expand Down
11 changes: 7 additions & 4 deletions cmd/stolonctl/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"context"
"encoding/json"
"io/ioutil"
"os"
Expand Down Expand Up @@ -70,12 +71,14 @@ func initCluster(cmd *cobra.Command, args []string) {
}
}

e, err := NewStore()
kvStore, err := NewKVStore()
if err != nil {
die("cannot create store: %v", err)
}

cd, _, err := e.GetClusterData()
e := NewStore(kvStore)

cd, _, err := e.GetClusterData(context.TODO())
if err != nil {
die("cannot get cluster data: %v", err)
}
Expand All @@ -96,7 +99,7 @@ func initCluster(cmd *cobra.Command, args []string) {
os.Exit(0)
}

cd, _, err = e.GetClusterData()
cd, _, err = e.GetClusterData(context.TODO())
if err != nil {
die("cannot get cluster data: %v", err)
}
Expand All @@ -120,7 +123,7 @@ func initCluster(cmd *cobra.Command, args []string) {
cd = cluster.NewClusterData(c)

// We ignore if cd has been modified between reading and writing
if err := e.PutClusterData(cd); err != nil {
if err := e.PutClusterData(context.TODO(), cd); err != nil {
die("cannot update cluster data: %v", err)
}
}
7 changes: 5 additions & 2 deletions cmd/stolonctl/promote.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"context"
"os"

"github.com/sorintlab/stolon/pkg/cluster"
Expand All @@ -40,11 +41,13 @@ func promote(cmd *cobra.Command, args []string) {
die("too many arguments")
}

e, err := NewStore()
kvStore, err := NewKVStore()
if err != nil {
die("cannot create store: %v", err)
}

e := NewStore(kvStore)

accepted := true
if !initOpts.forceYes {
accepted, err = askConfirmation("Are you sure you want to continue? [yes/no] ")
Expand Down Expand Up @@ -82,7 +85,7 @@ func promote(cmd *cobra.Command, args []string) {
}

// retry if cd has been modified between reading and writing
_, err = e.AtomicPutClusterData(cd, pair)
_, err = e.AtomicPutClusterData(context.TODO(), cd, pair)
if err != nil {
if err == libkvstore.ErrKeyModified {
retry++
Expand Down
Loading