Skip to content

Commit

Permalink
*: Register DM-worker instance into DM-master (pingcap#431)
Browse files Browse the repository at this point in the history
* register in server

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch authored Dec 17, 2019
1 parent ac14f45 commit bdf102a
Show file tree
Hide file tree
Showing 15 changed files with 6,045 additions and 3,784 deletions.
6 changes: 5 additions & 1 deletion cmd/dm-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,17 @@ func main() {
syscall.SIGQUIT)

s := worker.NewServer(cfg)
err = s.JoinMaster(worker.GetJoinURLs(cfg.Join))
if err != nil {
log.L().Info("join the cluster meet error %v", zap.Error(err))
os.Exit(2)
}

go func() {
sig := <-sc
log.L().Info("got signal to exit", zap.Stringer("signal", sig))
s.Close()
}()

err = s.Start()
if err != nil {
log.L().Error("fail to start dm-worker", zap.Error(err))
Expand Down
3 changes: 2 additions & 1 deletion dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ type Config struct {

MasterAddr string `toml:"master-addr" json:"master-addr"`

Deploy []*DeployMapper `toml:"deploy" json:"-"`
Deploy []*DeployMapper `toml:"deploy" json:"-"`
// TODO: remove
DeployMap map[string]string `json:"deploy"`

ConfigFile string `json:"config-file"`
Expand Down
155 changes: 155 additions & 0 deletions dm/master/coordinator/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package coordinator

import (
"context"
"strings"
"sync"

"github.com/pingcap/dm/dm/master/workerrpc"
"github.com/pingcap/dm/pkg/log"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)

// workerKeepAlivePath is the etcd key prefix that ObserveWorkers watches for
// dm-worker keepalive events.
var workerKeepAlivePath = "/dm-worker/a"

// Coordinator coordinates workers and upstreams.
type Coordinator struct {
// mu is held by the Coordinator methods in this file whenever they touch
// the workers map.
mu sync.RWMutex
// address -> worker
workers map[string]*Worker
// upstream(source-id) -> worker
upstreams map[string]*Worker
}

// NewCoordinator returns a Coordinator whose lookup maps are both allocated,
// so that later writes to either the workers or the upstreams map cannot
// panic on a nil map.
func NewCoordinator() *Coordinator {
	return &Coordinator{
		workers:   make(map[string]*Worker),
		upstreams: make(map[string]*Worker),
	}
}

// AddWorker registers a dm-worker with the coordinator, keyed by its address.
// An existing entry for the same address is replaced.
func (c *Coordinator) AddWorker(name string, address string) {
	worker := NewWorker(name, address)

	c.mu.Lock()
	defer c.mu.Unlock()
	c.workers[address] = worker
}

// GetWorkerByAddress looks up the worker registered under address.
// It returns nil when no worker is registered for that address.
func (c *Coordinator) GetWorkerByAddress(address string) *Worker {
	c.mu.RLock()
	worker := c.workers[address]
	c.mu.RUnlock()
	return worker
}

// GetWorkerClientByAddress gets the RPC client of the worker registered under
// address.
//
// It returns nil (after logging the reason) when no worker is registered for
// the address, when the worker is in the WorkerClosed state, or when the
// client cannot be obtained.
func (c *Coordinator) GetWorkerClientByAddress(address string) workerrpc.Client {
	c.mu.RLock()
	defer c.mu.RUnlock()

	w, ok := c.workers[address]
	if !ok || w == nil {
		// Do not hand a nil *Worker to zap.Stringer: its String method
		// dereferences the receiver.
		log.L().Error("worker is not found", zap.String("address", address))
		return nil
	}
	if w.State() == WorkerClosed {
		log.L().Error("worker is not health", zap.Stringer("worker", w))
		return nil
	}

	client, err := w.GetClient()
	if err != nil {
		// Include the underlying error so the failure can be diagnosed.
		log.L().Error("cannot get client", zap.String("worker-name", w.Name()), zap.Error(err))
		return nil
	}
	return client
}

// GetAllWorkers gets all workers, keyed by address.
//
// The returned map is a shallow copy taken under the read lock, so callers can
// iterate over it without racing against AddWorker. The *Worker values are
// shared with the coordinator.
func (c *Coordinator) GetAllWorkers() map[string]*Worker {
	c.mu.RLock()
	defer c.mu.RUnlock()

	workers := make(map[string]*Worker, len(c.workers))
	for addr, w := range c.workers {
		workers[addr] = w
	}
	return workers
}

// GetWorkerBySourceID gets the worker bound to the given upstream source id.
// It returns nil when no worker is recorded for that source, which is always
// the case until something populates the upstreams map.
func (c *Coordinator) GetWorkerBySourceID(source string) *Worker {
	c.mu.RLock()
	defer c.mu.RUnlock()
	// Reading a missing key (or a nil map) yields a nil *Worker.
	return c.upstreams[source]
}

// GetWorkersByStatus collects every registered worker whose current state
// equals s. The result is an empty (non-nil) slice when nothing matches.
func (c *Coordinator) GetWorkersByStatus(s WorkerState) []*Worker {
	c.mu.RLock()
	defer c.mu.RUnlock()

	matched := make([]*Worker, 0, len(c.workers))
	for addr := range c.workers {
		worker := c.workers[addr]
		if worker.State() != s {
			continue
		}
		matched = append(matched, worker)
	}
	return matched
}

// ObserveWorkers watches the keepalive prefix and maintains the status of the
// registered workers: a PUT on a worker's keepalive key marks the worker
// WorkerFree, a DELETE marks it WorkerClosed.
//
// It blocks until ctx is done, the watch is canceled, or the watch channel is
// closed. The watcher created here is closed before returning.
func (c *Coordinator) ObserveWorkers(ctx context.Context, client *clientv3.Client) {
	watcher := clientv3.NewWatcher(client)
	// Release the watcher's resources once we stop observing.
	defer func() {
		if err := watcher.Close(); err != nil {
			log.L().Warn("fail to close worker keepalive watcher", zap.Error(err))
		}
	}()
	ch := watcher.Watch(ctx, workerKeepAlivePath, clientv3.WithPrefix())

	for {
		select {
		case wresp, ok := <-ch:
			if !ok {
				// A closed channel would otherwise keep yielding zero-value
				// responses and spin this loop.
				log.L().Info("worker keepalive watch channel is closed")
				return
			}
			if wresp.Canceled {
				log.L().Error("leader watcher is canceled with", zap.Error(wresp.Err()))
				return
			}

			for _, ev := range wresp.Events {
				key := string(ev.Kv.Key)
				switch ev.Type {
				case mvccpb.PUT:
					log.L().Info("putkv", zap.String("kv", key))
					c.applyKeepAliveEvent(key, true)
				case mvccpb.DELETE:
					log.L().Info("deletekv", zap.String("kv", key))
					c.applyKeepAliveEvent(key, false)
				}
			}
		case <-ctx.Done():
			log.L().Info("coordinate exict due to context canceled")
			return
		}
	}
}

// applyKeepAliveEvent updates the state of the worker identified by a
// keepalive key: WorkerFree when online is true, WorkerClosed otherwise.
// Keys that do not split into at least three comma-separated fields, and keys
// that do not match a registered worker's address and name, are ignored.
func (c *Coordinator) applyKeepAliveEvent(key string, online bool) {
	// The second and third comma-separated fields carry address and name.
	parts := strings.Split(key, ",")
	if len(parts) < 3 {
		log.L().Warn("ignore malformed worker keepalive key", zap.String("kv", key))
		return
	}
	addr, name := parts[1], parts[2]

	c.mu.Lock()
	defer c.mu.Unlock()

	w, ok := c.workers[addr]
	if !ok || name != w.Name() {
		return
	}
	if online {
		log.L().Info("worker became online, state: free", zap.String("name", w.Name()), zap.String("address", w.Address()))
		w.setStatus(WorkerFree)
	} else {
		log.L().Info("worker became offline, state: closed", zap.String("name", w.Name()), zap.String("address", w.Address()))
		w.setStatus(WorkerClosed)
	}
}

// Schedule schedules a free worker to an upstream.
// It is currently a no-op placeholder.
// TODO: bind the worker to the upstreams and set the status to Bound.
func (c *Coordinator) Schedule() {

}
86 changes: 86 additions & 0 deletions dm/master/coordinator/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package coordinator

import (
"fmt"
"sync/atomic"

"github.com/pingcap/dm/dm/master/workerrpc"
)

// WorkerState is the status of a worker.
type WorkerState int

// The possible states of a worker. Valid states start at 1; the zero value
// is not a named state.
const (
	_ WorkerState = iota
	WorkerClosed
	WorkerFree
	WorkerBound
)

// Worker is the processor that lets upstream and downstream synchronize.
type Worker struct {
name string
address string
// client is created on the first GetClient call and reused afterwards.
client workerrpc.Client
// status stores a WorkerState; read via State, written via setStatus.
status atomic.Value
}

// NewWorker builds a worker with the given name and address. A new worker
// starts in the WorkerClosed state.
func NewWorker(name, address string) *Worker {
	worker := new(Worker)
	worker.name = name
	worker.address = address
	worker.status.Store(WorkerClosed)
	return worker
}

// String formats the worker as "<name> address:<address>".
// It is safe to call on a nil *Worker, in which case it returns "<nil>"; this
// matters because loggers may invoke String on a worker that was not found.
func (w *Worker) String() string {
	if w == nil {
		return "<nil>"
	}
	return fmt.Sprintf("%s address:%s", w.name, w.address)
}

// GetClient returns the RPC client of the worker, creating it from the
// worker's address on first use and reusing it on later calls. If creation
// fails, the error is returned and nothing is cached.
func (w *Worker) GetClient() (workerrpc.Client, error) {
	if w.client != nil {
		return w.client, nil
	}

	created, err := workerrpc.NewGRPCClient(w.address)
	if err != nil {
		return nil, err
	}
	w.client = created
	return w.client, nil
}

// Name returns the name the worker was created with.
func (w *Worker) Name() string {
return w.name
}

// Address returns the address the worker was created with.
func (w *Worker) Address() string {
return w.address
}

// State reports the worker's current state as last stored in its status
// field.
func (w *Worker) State() WorkerState {
	// TODO: add more judgement.
	current := w.status.Load()
	return current.(WorkerState)
}

// setStatus atomically stores s as the worker's current state.
func (w *Worker) setStatus(s WorkerState) {
w.status.Store(s)
}
Loading

0 comments on commit bdf102a

Please sign in to comment.