Skip to content

Commit

Permalink
Merge branch 'master' into improve-check
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 28, 2021
2 parents 4f19e80 + 080f833 commit 0cce5fc
Show file tree
Hide file tree
Showing 19 changed files with 149 additions and 110 deletions.
27 changes: 21 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,21 @@

# What is TiUP

`tiup` is a tool to download and install TiDB components.
`tiup` is a tool to download and install [TiDB](https://docs.pingcap.com/tidb/stable/overview) components.

## Documentation

- [English](https://docs.pingcap.com/tidb/stable/tiup-documentation-guide)
- [简体中文](https://docs.pingcap.com/zh/tidb/stable/tiup-documentation-guide)

## Blog

- [English](https://pingcap.com/blog/)
- [简体中文](https://pingcap.com/blog-cn/)

## TiDB Monthly

[TiDB Monthly](https://pingcap.com/weekly/)

## Installation

Expand Down Expand Up @@ -40,20 +54,21 @@ tiup uninstall tidb tikv pd
tiup update --all
```

# Usage
## Usage

After installing `tiup`, you can use it to install binaries of TiDB components and create clusters.

See our [doc](doc/user/README.md) for more information on how to use TiUp.

# Contributing to TiUp
## Contributing to TiUP

Contributions of code, tests, docs, and bug reports are welcome! To get started take a look at our [open issues](https://github.com/pingcap/tiup/issues).
Contributions of code, tests, docs, and bug reports are welcome! To get started, take a look at our [open issues](https://github.com/pingcap/tiup/issues).

For docs on how to build, test, and run TiUp, see our [dev docs](doc/dev/README.md).
For docs on how to build, test, and run TiUP, see our [dev docs](doc/dev/README.md).

See also the [Contribution Guide](https://github.com/pingcap/community/blob/master/contributors/README.md) in PingCAP's
[community](https://github.com/pingcap/community) repo.


## License

[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgit.luolix.top%2Fpingcap%2Ftiup.svg?type=large)](https://app.fossa.com/projects/git%2Bgit.luolix.top%2Fpingcap%2Ftiup?ref=badge_large)
12 changes: 6 additions & 6 deletions components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (p *Playground) removePumpWhenTombstone(c *api.BinlogClient, inst *instance
defer logIfErr(p.renderSDFile())

for {
tombstone, err := c.IsPumpTombstone(inst.Addr())
tombstone, err := c.IsPumpTombstone(context.TODO(), inst.Addr())
if err != nil {
fmt.Println(err)
}
Expand All @@ -200,7 +200,7 @@ func (p *Playground) removeDrainerWhenTombstone(c *api.BinlogClient, inst *insta
defer logIfErr(p.renderSDFile())

for {
tombstone, err := c.IsDrainerTombstone(inst.Addr())
tombstone, err := c.IsDrainerTombstone(context.TODO(), inst.Addr())
if err != nil {
fmt.Println(err)
}
Expand Down Expand Up @@ -326,7 +326,7 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error {
if err != nil {
return err
}
err = c.OfflinePump(inst.Addr())
err = c.OfflinePump(context.TODO(), inst.Addr())
if err != nil {
return err
}
Expand All @@ -345,7 +345,7 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error {
if err != nil {
return err
}
err = c.OfflineDrainer(inst.Addr())
err = c.OfflineDrainer(context.TODO(), inst.Addr())
if err != nil {
return err
}
Expand Down Expand Up @@ -452,7 +452,7 @@ func (p *Playground) handleScaleOut(w io.Writer, cmd *Command) error {
return err
}

err = p.startInstance(context.Background(), inst)
err = p.startInstance(context.TODO(), inst)
if err != nil {
return err
}
Expand Down Expand Up @@ -768,7 +768,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme

	// if there is no pump at all, tidb will quit right away.
if cid == "pump" && !anyPumpReady {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*120)
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*120)
err = ins.(*instance.Pump).Ready(ctx)
cancel()
if err != nil {
Expand Down
60 changes: 30 additions & 30 deletions doc/design/checkpoint.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# The checkpoint implementation for tiup-cluster and tiup-dm

When there is occasional error on `tiup cluster` or `tiup dm` command, some users may want to retry previews action from the fail point instead of from scratch.
When there is an occasional error on a `tiup cluster` or `tiup dm` command, some users may want to retry the previous action from the fail point instead of from scratch.

For example, the following tasks:

Expand All @@ -14,11 +14,11 @@ For example, the following tasks:
4. start service
```

If something wrong with the third task, retry from the first task is OK because TiUP provides guarantee that all commands are idempotent. However, for some large cluster, it may waste a lot of time on successed tasks (task 1 and stpe 2) and the user may want to restart the process from task 3.
If something goes wrong with the third task, retrying from the first task is OK because TiUP provides a guarantee that all commands are idempotent. However, for some large clusters, it may waste a lot of time on successful tasks (task 1 and task 2) and the user may want to restart the process from task 3.

## The audit log

tiup-cluster and tiup-dm will generate an audit log file in `${TIUP_HOME}/storage/{cluster,dm}/audit/`, you can view the audit list with the command `tiup cluster audit` or `tiup dm audit`. The list looks like:
tiup-cluster and tiup-dm will generate an audit log file in `${TIUP_HOME}/storage/{cluster,dm}/audit/`, you can view the audit list with the command `tiup cluster audit` or `tiup dm audit`. The list looks like this:

```
ID Time Command
Expand All @@ -35,11 +35,11 @@ The first column is the id of the audit, to view a specified audit log, use the
2021-01-21T18:36:09.805+0800 INFO SSHCommand {"host": "172.16.5.140", "port": "22", "cmd": "xxx command", "stdout": "xxxx", "stderr": ""}
```

The first line of the file is the command the user execute, the following lines are structure logs.
The first line of the file is the command the user executed; the following lines are structured logs.

## The checkpoint

In the implemention of checkpoint, we mix checkpoint in audit log, like this:
In the implementation of the checkpoint, we mix checkpoint in the audit log, like this:

```
/home/tidb/.tiup/components/cluster/v1.3.1/tiup-cluster display test
Expand All @@ -52,20 +52,20 @@ In the implemention of checkpoint, we mix checkpoint in audit log, like this:
2021-01-21T18:36:09.806+0800 INFO CheckPoint {"host": "172.16.5.140", "action": "task", "n": 3, "result": true}
```

If the user run tiup-cluster or tiup-dm in replay mode by giving an audit id, we will parse that audit log file and pick up all `CheckPoint` by order into a queue, then in corresponding functions we check if the checkpoint is in the queue, if hit, we dequeue the checkpoint and return the result directlly instead of do the real work. Example:
If the user runs tiup-cluster or tiup-dm in replay mode by giving an audit id, we will parse that audit log file and pick up all `CheckPoint` by order into a queue, then in corresponding functions, we check if the checkpoint is in the queue, if hit, we dequeue the checkpoint and return the result directly instead of doing the real work. Example:

```golang
func init() {
// Register checkpoint fields so that we know how to compare checkpoints
checkpoint.RegisterField(
checkpoint.Field("action", reflect.DeepEqual),
checkpoint.Field("host", reflect.DeepEqual),
checkpoint.Field("n", func(a, b interface{}) bool {
checkpoint.Field("host", reflect.DeepEqual),
checkpoint.Field("n", func(a, b interface{}) bool {
// the n is a int, however, it will be float after it write to json because json only has float number.
// so we just compare the string format.
return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b)
}),
)
return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b)
}),
)
}

func processCommand() {
Expand All @@ -84,25 +84,25 @@ func task(ctx context.Context, host string, n int) (result bool, err error) {
point := checkpoint.Acquire(ctx, map[string]interface{}{
"action": "task",
"host": host,
"n": n,
})
defer func() {
// we must call point.Release, otherwise there will be resource leak.
"n": n,
})
defer func() {
// we must call point.Release, otherwise there will be a resource leak.
// the release function will write the checkpoint into current audit log (not the one user specified)
// for latter replay.
point.Release(err,
point.Release(err,
zap.String("action", "task"),
zap.String("host", host),
zap.Int("n", n),
zap.Bool("result", result),
)
)
}()
// If the checkpoint not exist, point.Hit() will return nil, otherwise, the checkpoint with the type map[string]interface{}
if point.Hit() != nil {
return point.Hit()["result"].(bool), nil
// Then, if the checkpoint exists in the specified audit file, point.Hit() will return map[string]interface{}
if point.Hit() != nil {
return point.Hit()["result"].(bool), nil
}
// If the checkpoint not exist in specified audit file, we should do real work and return the result

// Last, if the checkpoint does not exist in the specified audit file, we should do real work and return the result
return do_real_work(host, n)
}
```
Expand Down Expand Up @@ -136,7 +136,7 @@ func task(ctx context.Context, host string, n int) (result bool, err error) {
}
```

The execute flow and return value will be:
The execution flow and return value will be:

```
task(1)[called by processCommand]: return true
Expand All @@ -154,7 +154,7 @@ the checkpoint in audit log will be:
... {"host": "...", "action": "task", "n": 0, "result": false}
```

There is three checkpoints, but when we try to replay the process, the `task(0)[called by task(1)]` will not be called at all since `task(1)` will return early with the cached result, so the execute flow will be:
There are three checkpoints, but when we try to replay the process, the `task(0)[called by task(1)]` will not be called at all since `task(1)` will return early with the cached result, so the execution flow will be:

```
task(1)[called by processCommand]: return true (cached by {"host": "...", "action": "task", "n": 1, "result": true})
Expand All @@ -164,11 +164,11 @@ task(0)[called by processCommand]: return true (cached by {"host": "...", "acti

Here comes the trouble: in the real case, `task(0)[called by processCommand]` returns false, but in the replay case it returns true because it takes the result of `task(0)[called by task(1)]` by mistake. The problem is that the `CheckPoint` of `task(0)[called by task(1)]` should not be recorded because its parent, `task(0)[called by processCommand]`, has recorded a `CheckPoint`.

So we implement a semaphore and insert it into the context passed to `checkpoint.Acquire`, the context or it's ancestor must be generated by `checkpoint.NewContext` where the semaphore is generated. When `checkpoint.Acquire` called, it will try to acquire the semaphore and record if it success in the returned value, when we call `Release` on the returned value, it will check if previews semaphore acuire success, if not, the `Release` will not writing checkpoint.
So we implement a semaphore and insert it into the context passed to `checkpoint.Acquire`; the context or its ancestor must be generated by `checkpoint.NewContext`, where the semaphore is created. When `checkpoint.Acquire` is called, it tries to acquire the semaphore and records whether it succeeded in the returned value. When we call `Release` on the returned value, it checks whether the previous semaphore acquire succeeded; if not, `Release` will not write the checkpoint.

## Parallel task

Because we use a semaphore in the context to trace if it's the first stack layer who want to write checkpoint, the context can't be shared between goroutines:
Because we use a semaphore in the context to trace if it's the first stack layer that wants to write checkpoint, the context can't be shared between goroutines:

```golang
func processCommand() {
Expand All @@ -188,7 +188,7 @@ There are three tasks, `task(1)`, `task(2)` and `task(3)`, they run parallelly.
```
task(1): start -------------------------------------------> return
task(2): start ------------------------> return
task(3): start ------------------------> return
task(3): start ------------------------> return
```

The checkpoint of `task(2)` and `task(3)` will not be recorded because they think they are called by `task(1)`. The solution is to add a semaphore for every goroutine:
Expand All @@ -206,7 +206,7 @@ func processCommand() {
}
```

What if the `processCommand` or its' ancestor has its' own checkpoint?
What if the `processCommand` or its ancestor has its own checkpoint?

```golang
func processTask(ctx context.Context) {
Expand All @@ -227,6 +227,6 @@ func processCommand(ctx context.Context) {
}
```

If `checkpoint.NewContext` just append a unacquired semaphore, the checkpoint of `processTask` and it's children(`task(1..3)`) will be all recored, that's not correct (we have talked this before).
If `checkpoint.NewContext` just appended an unacquired semaphore, the checkpoints of `processTask` and its children (`task(1..3)`) would all be recorded, which is not correct (as discussed before).

So the `checkpoint.NewContext` should check if there is alread a semaphore in current context, if there is, just copy it's value. By this way, if `processTask` has acquired the semaphore, the `task(1..3)` will get their own acuired semaphore, otherwise, they will get their own unacquired semaphore.
So `checkpoint.NewContext` should check whether there is already a semaphore in the current context, and if there is, just copy its value. In this way, if `processTask` has acquired the semaphore, `task(1..3)` will get their own acquired semaphores; otherwise, they will get their own unacquired semaphores.
18 changes: 14 additions & 4 deletions pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/logger/log"
"github.com/pingcap/tiup/pkg/queue"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -39,6 +40,11 @@ const (
var (
checkpoint *CheckPoint
checkfields []FieldSet

	// DebugCheckpoint is a switch used to debug whether:
	// - the context passed to checkpoint is generated by checkpoint.NewContext
	// - multiple context acquires applied to the same context belong to the same goroutine
DebugCheckpoint = os.Getenv("DEBUG_CHECKPOINT") == "1"
)

// SetCheckPoint set global checkpoint for executor
Expand All @@ -59,6 +65,14 @@ func SetCheckPoint(file string) error {

// Acquire wraps CheckPoint.Acquire
func Acquire(ctx context.Context, point map[string]interface{}) *Point {
if ctx.Value(goroutineKey) == nil || ctx.Value(semKey) == nil {
if DebugCheckpoint {
panic("the context passed to checkpoint.Acquire is not generated by checkpoint.NewContext")
}
log.Debugf("context missing for checkpoint, the result of replaying this operation may be unexpected!")
ctx = NewContext(ctx)
}

// Check goroutine if we are in test
gptr := ctx.Value(goroutineKey).(*goroutineLock)
g := atomic.LoadUint64((*uint64)(gptr))
Expand Down Expand Up @@ -123,10 +137,6 @@ func NewCheckPoint(r io.Reader) (*CheckPoint, error) {

// Acquire get point from checkpoints
func (c *CheckPoint) Acquire(ctx context.Context, point map[string]interface{}) *Point {
if ctx.Value(semKey) == nil {
panic("the value of the key " + semKey + " not found in context, use the ctx from checkpoint.NewContext please")
}

acquired := ctx.Value(semKey).(*semaphore.Weighted).TryAcquire(1)
if p := c.points.Get(point); p != nil {
return &Point{ctx, p.(map[string]interface{}), acquired}
Expand Down
9 changes: 9 additions & 0 deletions pkg/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
)

func setup() {
DebugCheckpoint = true

checkfields = nil
RegisterField(
Field("host", reflect.DeepEqual),
Expand Down Expand Up @@ -213,3 +215,10 @@ func TestCheckPointNil(t *testing.T) {
assert.True(p.acquired)
p.Release(nil)
}

// TestCheckPointNotInited checks that Acquire panics (in debug mode) when the
// supplied context was not created via checkpoint.NewContext.
func TestCheckPointNotInited(t *testing.T) {
	setup()

	require.Panics(t, func() {
		Acquire(context.Background(), map[string]interface{}{})
	})
}
9 changes: 2 additions & 7 deletions pkg/checkpoint/gotrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,22 @@ import (
"bytes"
"errors"
"fmt"
"os"
"runtime"
"strconv"
"sync"
)

// DebugGoroutines is a switch used to debug if multilple context acquire applied
// to the same context belone to the sampe goroutine
var DebugGoroutines = os.Getenv("DEBUG_CHECKPOINT_GOROUTINES") == "1"

type goroutineLock uint64

func newGoroutineLock() goroutineLock {
if !DebugGoroutines {
if !DebugCheckpoint {
return 0
}
return goroutineLock(curGoroutineID())
}

func (g goroutineLock) check() {
if !DebugGoroutines {
if !DebugCheckpoint {
return
}
if curGoroutineID() != uint64(g) {
Expand Down
Loading

0 comments on commit 0cce5fc

Please sign in to comment.