Skip to content

Commit

Permalink
domain: fast new a etcd session when the session is stale in the sche…
Browse files Browse the repository at this point in the history
…maVersionSyncer #7774 (#7810)
  • Loading branch information
winkyao authored Oct 8, 2018
1 parent c7c2f3c commit 0c8f98e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 9 deletions.
26 changes: 19 additions & 7 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"math"
"strconv"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
Expand Down Expand Up @@ -87,7 +89,7 @@ type SchemaSyncer interface {
type schemaVersionSyncer struct {
selfSchemaVerPath string
etcdCli *clientv3.Client
session *concurrency.Session
session unsafe.Pointer
mu struct {
sync.RWMutex
globalVerCh clientv3.WatchChan
Expand Down Expand Up @@ -138,23 +140,33 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error {
return errors.Trace(err)
}
logPrefix := fmt.Sprintf("[%s] %s", ddlPrompt, s.selfSchemaVerPath)
s.session, err = owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL)
session, err := owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL)
if err != nil {
return errors.Trace(err)
}
s.storeSession(session)

s.mu.Lock()
s.mu.globalVerCh = s.etcdCli.Watch(ctx, DDLGlobalSchemaVersion)
s.mu.Unlock()

err = s.putKV(ctx, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))
clientv3.WithLease(s.loadSession().Lease()))

return errors.Trace(err)
}

func (s *schemaVersionSyncer) loadSession() *concurrency.Session {
return (*concurrency.Session)(atomic.LoadPointer(&s.session))
}

func (s *schemaVersionSyncer) storeSession(session *concurrency.Session) {
atomic.StorePointer(&s.session, (unsafe.Pointer)(session))
}

// Done implements SchemaSyncer.Done interface.
func (s *schemaVersionSyncer) Done() <-chan struct{} {
return s.session.Done()
return s.loadSession().Done()
}

// Restart implements SchemaSyncer.Restart interface.
Expand All @@ -171,12 +183,12 @@ func (s *schemaVersionSyncer) Restart(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
s.session = session
s.storeSession(session)

childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
defer cancel()
err = s.putKV(childCtx, putKeyRetryUnlimited, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))
clientv3.WithLease(s.loadSession().Lease()))

return errors.Trace(err)
}
Expand Down Expand Up @@ -214,7 +226,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, version int
startTime := time.Now()
ver := strconv.FormatInt(version, 10)
err := s.putKV(ctx, putKeyNoRetry, s.selfSchemaVerPath, ver,
clientv3.WithLease(s.session.Lease()))
clientv3.WithLease(s.loadSession().Lease()))

metrics.UpdateSelfVersionHistogram.WithLabelValues(metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return errors.Trace(err)
Expand Down
10 changes: 8 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

// Domain represents a storage space. Different domains can use the same database name.
Expand Down Expand Up @@ -351,13 +352,12 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
case <-syncer.Done():
// The schema syncer stops, we need stop the schema validator to synchronize the schema version.
log.Info("[ddl] reload schema in loop, schema syncer need restart")
do.SchemaValidator.Stop()
err := do.mustRestartSyncer()
if err != nil {
log.Errorf("[ddl] reload schema in loop, schema syncer restart err %v", errors.ErrorStack(err))
break
}
do.SchemaValidator.Restart()
log.Info("[ddl] schema syncer restarted.")
case <-do.exit:
return
}
Expand Down Expand Up @@ -471,6 +471,12 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
grpc.WithBackoffMaxDelay(time.Second * 3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(10) * time.Second,
Timeout: time.Duration(3) * time.Second,
PermitWithoutStream: true,
}),
},
TLS: ebd.TLSConfig(),
})
Expand Down

0 comments on commit 0c8f98e

Please sign in to comment.