Merge pull request #393 from sgotti/etcd_api_v3
*: add support for etcd v3 api
sgotti committed Jan 8, 2018
2 parents 4fadc31 + b88a557 commit a5d6bb4
Showing 247 changed files with 93,537 additions and 834 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -53,7 +53,7 @@ Anyway it's quite easy to reset a cluster from scratch keeping the current maste
 ## Requirements
 
 * PostgreSQL 10 or 9 (9.4, 9.5, 9.6)
-* etcd >= 2.0 or consul >=0.6
+* etcd >= 2.0 or consul >= 0.6
 
 
 ## build
6 changes: 5 additions & 1 deletion cmd/common.go
@@ -34,7 +34,7 @@ type CommonConfig struct {
 }
 
 func AddCommonFlags(cmd *cobra.Command, cfg *CommonConfig) {
-	cmd.PersistentFlags().StringVar(&cfg.StoreBackend, "store-backend", "", "store backend type (etcd or consul)")
+	cmd.PersistentFlags().StringVar(&cfg.StoreBackend, "store-backend", "", "store backend type (etcdv2/etcd, etcdv3 or consul)")
 	cmd.PersistentFlags().StringVar(&cfg.StoreEndpoints, "store-endpoints", "", "a comma-delimited list of store endpoints (use https scheme for tls communication) (defaults: http://127.0.0.1:2379 for etcd, http://127.0.0.1:8500 for consul)")
 	cmd.PersistentFlags().StringVar(&cfg.StoreCertFile, "store-cert-file", "", "certificate file for client identification to the store")
 	cmd.PersistentFlags().StringVar(&cfg.StoreKeyFile, "store-key", "", "private key file for client identification to the store")
@@ -54,6 +54,10 @@ func CheckCommonConfig(cfg *CommonConfig) error {
 	switch cfg.StoreBackend {
 	case "consul":
 	case "etcd":
+		// etcd is old alias for etcdv2
+		cfg.StoreBackend = "etcdv2"
+	case "etcdv2":
+	case "etcdv3":
 	default:
 		return fmt.Errorf("Unknown store backend: %q", cfg.StoreBackend)
 	}
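In the new `CheckCommonConfig`, the legacy `etcd` value is accepted as an alias and rewritten to `etcdv2`, so existing command lines keep working while `etcdv2` and `etcdv3` select the etcd API version explicitly. A standalone sketch of that normalization logic (`checkBackend` is an illustrative helper, not a function from this commit):

```go
package main

import (
	"fmt"
	"log"
)

// checkBackend mirrors the switch in CheckCommonConfig above: "etcd"
// is an old alias that gets normalized to "etcdv2"; anything outside
// the known set is rejected.
func checkBackend(backend string) (string, error) {
	switch backend {
	case "consul", "etcdv2", "etcdv3":
		return backend, nil
	case "etcd":
		return "etcdv2", nil
	default:
		return "", fmt.Errorf("unknown store backend: %q", backend)
	}
}

func main() {
	b, err := checkBackend("etcd")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(b) // prints "etcdv2"
}
```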
12 changes: 6 additions & 6 deletions cmd/keeper/keeper.go
@@ -364,7 +364,7 @@ type PostgresKeeper struct {
 	sleepInterval  time.Duration
 	requestTimeout time.Duration
 
-	e   *store.StoreManager
+	e   *store.Store
 	pgm *postgresql.Manager
 	stop chan bool
 	end  chan error
@@ -381,7 +381,7 @@ type PostgresKeeper struct {
 func NewPostgresKeeper(cfg *config, stop chan bool, end chan error) (*PostgresKeeper, error) {
 	storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
 
-	kvstore, err := store.NewStore(store.Config{
+	kvstore, err := store.NewKVStore(store.Config{
 		Backend:   store.Backend(cfg.StoreBackend),
 		Endpoints: cfg.StoreEndpoints,
 		CertFile:  cfg.StoreCertFile,
@@ -392,7 +392,7 @@ func NewPostgresKeeper(cfg *config, stop chan bool, end chan error) (*PostgresKe
 	if err != nil {
 		return nil, fmt.Errorf("cannot create store: %v", err)
 	}
-	e := store.NewStoreManager(kvstore, storePath)
+	e := store.NewStore(kvstore, storePath)
 
 	// Clean and get absolute datadir path
 	dataDir, err := filepath.Abs(cfg.dataDir)
@@ -480,7 +480,7 @@ func (p *PostgresKeeper) updateKeeperInfo() error {
 
 	// The time to live is just to automatically remove old entries, it's
 	// not used to determine if the keeper info has been updated.
-	if err := p.e.SetKeeperInfo(keeperUID, keeperInfo, p.sleepInterval); err != nil {
+	if err := p.e.SetKeeperInfo(context.TODO(), keeperUID, keeperInfo, p.sleepInterval); err != nil {
 		return err
 	}
 	return nil
@@ -651,7 +651,7 @@ func (p *PostgresKeeper) Start() {
 
 	var err error
 	var cd *cluster.ClusterData
-	cd, _, err = p.e.GetClusterData()
+	cd, _, err = p.e.GetClusterData(context.TODO())
 	if err != nil {
 		log.Errorw("error retrieving cluster data", zap.Error(err))
 	} else if cd != nil {
@@ -841,7 +841,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
 	e := p.e
 	pgm := p.pgm
 
-	cd, _, err := e.GetClusterData()
+	cd, _, err := e.GetClusterData(pctx)
 	if err != nil {
 		log.Errorw("error retrieving cluster data", zap.Error(err))
 		return
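The pattern above repeats in the proxy, sentinel, and stolonctl below: store construction is split into `store.NewKVStore`, which builds the backend client from a `store.Config`, and `store.NewStore`, which wraps it with the cluster's base path; store methods then take a `context.Context`. A condensed sketch of the pattern as it appears in this diff (the backend and endpoint values are placeholders):

```go
// Two-step store construction introduced by this commit; context.TODO()
// marks call sites that don't have a caller context threaded through yet.
kvstore, err := store.NewKVStore(store.Config{
	Backend:   store.Backend("etcdv3"), // or "etcdv2"/"etcd", "consul"
	Endpoints: "http://127.0.0.1:2379", // placeholder endpoint
})
if err != nil {
	return nil, fmt.Errorf("cannot create store: %v", err)
}
e := store.NewStore(kvstore, storePath)

cd, _, err := e.GetClusterData(context.TODO())
```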
17 changes: 11 additions & 6 deletions cmd/proxy/proxy.go
@@ -15,6 +15,7 @@
 package main
 
 import (
+	"context"
 	"fmt"
 	"net"
 	"os"
@@ -100,7 +101,7 @@ type ClusterChecker struct {
 
 	listener *net.TCPListener
 	pp       *pollon.Proxy
-	e        *store.StoreManager
+	e        *store.Store
 	endPollonProxyCh chan error
 
 	pollonMutex sync.Mutex
@@ -109,7 +110,7 @@ type ClusterChecker struct {
 func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) {
 	storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
 
-	kvstore, err := store.NewStore(store.Config{
+	kvstore, err := store.NewKVStore(store.Config{
 		Backend:   store.Backend(cfg.StoreBackend),
 		Endpoints: cfg.StoreEndpoints,
 		CertFile:  cfg.StoreCertFile,
@@ -120,7 +121,7 @@ func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) {
 	if err != nil {
 		return nil, fmt.Errorf("cannot create store: %v", err)
 	}
-	e := store.NewStoreManager(kvstore, storePath)
+	e := store.NewStore(kvstore, storePath)
 
 	return &ClusterChecker{
 		uid: uid,
@@ -189,22 +190,22 @@ func (c *ClusterChecker) sendPollonConfData(confData pollon.ConfData) {
 	}
 }
 
-func (c *ClusterChecker) SetProxyInfo(e *store.StoreManager, generation int64, ttl time.Duration) error {
+func (c *ClusterChecker) SetProxyInfo(e *store.Store, generation int64, ttl time.Duration) error {
 	proxyInfo := &cluster.ProxyInfo{
 		UID:        c.uid,
 		Generation: generation,
 	}
 	log.Debugf("proxyInfo dump: %s", spew.Sdump(proxyInfo))
 
-	if err := c.e.SetProxyInfo(proxyInfo, ttl); err != nil {
+	if err := c.e.SetProxyInfo(context.TODO(), proxyInfo, ttl); err != nil {
 		return err
 	}
 	return nil
 }
 
 // Check reads the cluster data and applies the right pollon configuration.
 func (c *ClusterChecker) Check() error {
-	cd, _, err := c.e.GetClusterData()
+	cd, _, err := c.e.GetClusterData(context.TODO())
 	if err != nil {
 		return fmt.Errorf("cannot get cluster data: %v", err)
 	}
@@ -311,6 +312,10 @@ func (c *ClusterChecker) Start() error {
 	checkCh := make(chan error)
 	timerCh := time.NewTimer(0).C
 
+	// TODO(sgotti) TimeoutChecker is needed to forcefully close connections
+	// also if the Check method is blocked somewhere.
+	// The idiomatic/cleaner solution would be to use a context instead of this
+	// TimeoutChecker but we have to change the libkv stores to support contexts.
 	go c.TimeoutChecker(checkOkCh)
 
 	for true {
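The TODO above exists because the libkv-style stores did not accept a context, so a separate `TimeoutChecker` goroutine forcefully closes connections when `Check` blocks. Purely as an illustration of the cleaner direction the comment describes (hypothetical code, not part of this commit; `requestTimeout` is an assumed variable):

```go
// Hypothetical context-based variant: the timeout is enforced per store
// call instead of by a watchdog goroutine.
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
cd, _, err := c.e.GetClusterData(ctx)
if err != nil {
	return fmt.Errorf("cannot get cluster data: %v", err)
}
```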
45 changes: 19 additions & 26 deletions cmd/sentinel/sentinel.go
@@ -40,7 +40,6 @@ import (
 	"github.com/sorintlab/stolon/pkg/util"
 
 	"github.com/davecgh/go-spew/spew"
-	"github.com/docker/leadership"
 	"github.com/mitchellh/copystructure"
 	"github.com/spf13/cobra"
 	"go.uber.org/zap"
@@ -96,7 +95,7 @@ func die(format string, a ...interface{}) {
 func (s *Sentinel) electionLoop() {
 	for {
 		log.Infow("Trying to acquire sentinels leadership")
-		electedCh, errCh := s.candidate.RunForElection()
+		electedCh, errCh := s.election.RunForElection()
 		for {
 			select {
 			case elected := <-electedCh:
@@ -120,6 +119,7 @@ func (s *Sentinel) electionLoop() {
 				goto end
 			case <-s.stop:
 				log.Debugw("stopping election loop")
+				s.election.Stop()
 				return
 			}
 		}
@@ -136,22 +136,18 @@ func (s *Sentinel) syncRepl(spec *cluster.ClusterSpec) bool {
 	return *spec.SynchronousReplication && *spec.Role == cluster.ClusterRoleMaster
 }
 
-func (s *Sentinel) setSentinelInfo(ttl time.Duration) error {
+func (s *Sentinel) setSentinelInfo(ctx context.Context, ttl time.Duration) error {
 	sentinelInfo := &cluster.SentinelInfo{
 		UID: s.uid,
 	}
 	log.Debugw("sentinelInfo dump", "sentinelInfo", sentinelInfo)
 
-	if err := s.e.SetSentinelInfo(sentinelInfo, ttl); err != nil {
+	if err := s.e.SetSentinelInfo(ctx, sentinelInfo, ttl); err != nil {
 		return err
 	}
 	return nil
 }
 
-func (s *Sentinel) getKeepersInfo(ctx context.Context) (cluster.KeepersInfo, error) {
-	return s.e.GetKeepersInfo()
-}
-
 func (s *Sentinel) SetKeeperError(uid string) {
 	if _, ok := s.keeperErrorTimers[uid]; !ok {
 		s.keeperErrorTimers[uid] = timer.Now()
@@ -1443,11 +1439,11 @@ type DBConvergenceInfo struct {
 type Sentinel struct {
 	uid string
 	cfg *config
-	e   *store.StoreManager
+	e   *store.Store
 
-	candidate *leadership.Candidate
-	stop      chan bool
-	end       chan bool
+	election store.Election
+	stop     chan bool
+	end      chan bool
 
 	lastLeadershipCount uint
 
@@ -1493,7 +1489,7 @@ func NewSentinel(uid string, cfg *config, stop chan bool, end chan bool) (*Senti
 
 	storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
 
-	kvstore, err := store.NewStore(store.Config{
+	kvstore, err := store.NewKVStore(store.Config{
 		Backend:   store.Backend(cfg.StoreBackend),
 		Endpoints: cfg.StoreEndpoints,
 		CertFile:  cfg.StoreCertFile,
@@ -1504,15 +1500,15 @@ func NewSentinel(uid string, cfg *config, stop chan bool, end chan bool) (*Senti
 	if err != nil {
 		return nil, fmt.Errorf("cannot create store: %v", err)
 	}
-	e := store.NewStoreManager(kvstore, storePath)
+	e := store.NewStore(kvstore, storePath)
 
-	candidate := leadership.NewCandidate(kvstore, filepath.Join(storePath, common.SentinelLeaderKey), uid, store.MinTTL)
+	election := store.NewElection(kvstore, filepath.Join(storePath, common.SentinelLeaderKey), uid)
 
 	return &Sentinel{
 		uid: uid,
 		cfg: cfg,
 		e:   e,
-		candidate: candidate,
+		election: election,
 		leader: false,
 		initialClusterSpec: initialClusterSpec,
 		stop: stop,
@@ -1539,9 +1535,8 @@ func (s *Sentinel) Start() {
 	for true {
 		select {
 		case <-s.stop:
-			log.Debugw("stopping stolon sentinel")
+			log.Infow("stopping stolon sentinel")
 			cancel()
-			s.candidate.Stop()
 			s.end <- true
 			return
 		case <-timerCh:
@@ -1566,7 +1561,7 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) {
 	defer s.updateMutex.Unlock()
 	e := s.e
 
-	cd, prevCDPair, err := e.GetClusterData()
+	cd, prevCDPair, err := e.GetClusterData(pctx)
 	if err != nil {
 		log.Errorw("error retrieving cluster data", zap.Error(err))
 		return
@@ -1599,27 +1594,25 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) {
 		log.Infow("writing initial cluster data")
 		newcd := cluster.NewClusterData(c)
 		log.Debugf("newcd dump: %s", spew.Sdump(newcd))
-		if _, err = e.AtomicPutClusterData(newcd, nil); err != nil {
+		if _, err = e.AtomicPutClusterData(pctx, newcd, nil); err != nil {
 			log.Errorw("error saving cluster data", zap.Error(err))
 		}
 		return
 	}
 
-	if err = s.setSentinelInfo(2 * s.sleepInterval); err != nil {
+	if err = s.setSentinelInfo(pctx, 2*s.sleepInterval); err != nil {
 		log.Errorw("cannot update sentinel info", zap.Error(err))
 		return
 	}
 
-	ctx, cancel := context.WithTimeout(pctx, s.requestTimeout)
-	keepersInfo, err := s.getKeepersInfo(ctx)
-	cancel()
+	keepersInfo, err := s.e.GetKeepersInfo(pctx)
 	if err != nil {
 		log.Errorw("cannot get keepers info", zap.Error(err))
 		return
 	}
 	log.Debugf("keepersInfo dump: %s", spew.Sdump(keepersInfo))
 
-	proxiesInfo, err := s.e.GetProxiesInfo()
+	proxiesInfo, err := s.e.GetProxiesInfo(pctx)
 	if err != nil {
 		log.Errorw("failed to get proxies info", zap.Error(err))
 		return
@@ -1661,7 +1654,7 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) {
 	log.Debugf("newcd dump after updateCluster: %s", spew.Sdump(newcd))
 
 	if newcd != nil {
-		if _, err := e.AtomicPutClusterData(newcd, prevCDPair); err != nil {
+		if _, err := e.AtomicPutClusterData(pctx, newcd, prevCDPair); err != nil {
 			log.Errorw("error saving clusterdata", zap.Error(err))
 		}
 	}
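The sentinel no longer depends on `github.com/docker/leadership`: leader election moves behind the store package, and `store.NewElection` takes the kvstore, the leader key path, and the candidate UID (the TTL argument that `leadership.NewCandidate` required is gone). Judging only from the call sites in this diff, the election value exposes something like the following shape — an inferred sketch, not the committed definition:

```go
// Plausible shape of store.Election, inferred from electionLoop and
// Start above; the real definition in the commit may differ.
type Election interface {
	// RunForElection starts campaigning: the bool channel signals
	// elected/deposed transitions, the error channel reports failures.
	RunForElection() (<-chan bool, <-chan error)
	// Stop ends the campaign, releasing leadership if currently held.
	Stop()
}
```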
4 changes: 3 additions & 1 deletion cmd/stolonctl/clusterdata.go
@@ -39,11 +39,13 @@ func init() {
 }
 
 func clusterdata(cmd *cobra.Command, args []string) {
-	e, err := NewStore()
+	kvStore, err := NewKVStore()
 	if err != nil {
 		die("cannot create store: %v", err)
 	}
 
+	e := NewStore(kvStore)
+
 	cd, _, err := getClusterData(e)
 	if err != nil {
 		die("%v", err)
11 changes: 7 additions & 4 deletions cmd/stolonctl/init.go
@@ -15,6 +15,7 @@
 package main
 
 import (
+	"context"
 	"encoding/json"
 	"io/ioutil"
 	"os"
@@ -70,12 +71,14 @@ func initCluster(cmd *cobra.Command, args []string) {
 		}
 	}
 
-	e, err := NewStore()
+	kvStore, err := NewKVStore()
 	if err != nil {
 		die("cannot create store: %v", err)
 	}
 
-	cd, _, err := e.GetClusterData()
+	e := NewStore(kvStore)
+
+	cd, _, err := e.GetClusterData(context.TODO())
 	if err != nil {
 		die("cannot get cluster data: %v", err)
 	}
@@ -96,7 +99,7 @@ func initCluster(cmd *cobra.Command, args []string) {
 		os.Exit(0)
 	}
 
-	cd, _, err = e.GetClusterData()
+	cd, _, err = e.GetClusterData(context.TODO())
 	if err != nil {
 		die("cannot get cluster data: %v", err)
 	}
@@ -120,7 +123,7 @@ func initCluster(cmd *cobra.Command, args []string) {
 	cd = cluster.NewClusterData(c)
 
 	// We ignore if cd has been modified between reading and writing
-	if err := e.PutClusterData(cd); err != nil {
+	if err := e.PutClusterData(context.TODO(), cd); err != nil {
 		die("cannot update cluster data: %v", err)
 	}
 }
7 changes: 5 additions & 2 deletions cmd/stolonctl/promote.go
@@ -15,6 +15,7 @@
 package main
 
 import (
+	"context"
 	"os"
 
 	"github.com/sorintlab/stolon/pkg/cluster"
@@ -40,11 +41,13 @@ func promote(cmd *cobra.Command, args []string) {
 		die("too many arguments")
 	}
 
-	e, err := NewStore()
+	kvStore, err := NewKVStore()
 	if err != nil {
 		die("cannot create store: %v", err)
 	}
 
+	e := NewStore(kvStore)
+
 	accepted := true
 	if !initOpts.forceYes {
 		accepted, err = askConfirmation("Are you sure you want to continue? [yes/no] ")
@@ -82,7 +85,7 @@ func promote(cmd *cobra.Command, args []string) {
 	}
 
 	// retry if cd has been modified between reading and writing
-	_, err = e.AtomicPutClusterData(cd, pair)
+	_, err = e.AtomicPutClusterData(context.TODO(), cd, pair)
 	if err != nil {
 		if err == libkvstore.ErrKeyModified {
 			retry++
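`promote` keeps the optimistic-concurrency scheme used throughout stolonctl: read the cluster data together with its KV pair, modify it, write it back with `AtomicPutClusterData`, and retry when the store reports `libkvstore.ErrKeyModified` because someone else wrote first. A condensed sketch of that loop (`maxRetries` and `mutate` are illustrative names, not identifiers from the commit):

```go
// Read-modify-write with optimistic concurrency, as in promote above.
for retry := 0; retry < maxRetries; retry++ {
	cd, pair, err := e.GetClusterData(context.TODO())
	if err != nil {
		die("cannot get cluster data: %v", err)
	}
	mutate(cd) // apply the promotion changes to the cluster data

	if _, err := e.AtomicPutClusterData(context.TODO(), cd, pair); err != nil {
		if err == libkvstore.ErrKeyModified {
			continue // concurrent update: re-read and retry
		}
		die("cannot update cluster data: %v", err)
	}
	return
}
die("too many retries")
```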