Skip to content

Commit

Permalink
*: compatible upgrade with previous versions (pingcap#46276)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Sep 15, 2023
1 parent 6c16a2a commit 1eaa416
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 48 deletions.
25 changes: 15 additions & 10 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,43 +143,48 @@ func NewMockStateSyncer() syncer.StateSyncer {
return &MockStateSyncer{}
}

// clusterState mocks cluster state.
// We move it from MockStateSyncer to here. Because we want to make it unaffected by ddl close.
var clusterState *atomicutil.Pointer[syncer.StateInfo]

// MockStateSyncer is a mock state syncer, it is exported for testing.
type MockStateSyncer struct {
clusterState *atomicutil.Pointer[syncer.StateInfo]
globalVerCh chan clientv3.WatchResponse
mockSession chan struct{}
globalVerCh chan clientv3.WatchResponse
mockSession chan struct{}
}

// Init implements StateSyncer.Init interface.
func (s *MockStateSyncer) Init(context.Context) error {
s.globalVerCh = make(chan clientv3.WatchResponse, 1)
s.mockSession = make(chan struct{}, 1)
state := syncer.NewStateInfo(syncer.StateNormalRunning)
s.clusterState = atomicutil.NewPointer(state)
if clusterState == nil {
clusterState = atomicutil.NewPointer(state)
}
return nil
}

// UpdateGlobalState implements StateSyncer.UpdateGlobalState interface.
func (s *MockStateSyncer) UpdateGlobalState(_ context.Context, stateInfo *syncer.StateInfo) error {
failpoint.Inject("mockUpgradingState", func(val failpoint.Value) {
if val.(bool) {
s.clusterState.Store(stateInfo)
clusterState.Store(stateInfo)
failpoint.Return(nil)
}
})
s.globalVerCh <- clientv3.WatchResponse{}
s.clusterState.Store(stateInfo)
clusterState.Store(stateInfo)
return nil
}

// GetGlobalState implements StateSyncer.GetGlobalState interface.
func (s *MockStateSyncer) GetGlobalState(context.Context) (*syncer.StateInfo, error) {
return s.clusterState.Load(), nil
func (*MockStateSyncer) GetGlobalState(context.Context) (*syncer.StateInfo, error) {
return clusterState.Load(), nil
}

// IsUpgradingState implements StateSyncer.IsUpgradingState interface.
func (s *MockStateSyncer) IsUpgradingState() bool {
return s.clusterState.Load().State == syncer.StateUpgrading
func (*MockStateSyncer) IsUpgradingState() bool {
return clusterState.Load().State == syncer.StateUpgrading
}

// WatchChan implements StateSyncer.WatchChan interface.
Expand Down
10 changes: 6 additions & 4 deletions server/handler/upgrade_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func (h ClusterUpgradeHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
op := req.FormValue("op")
switch op {
case "start":
hasDone, err = h.startUpgrade()
hasDone, err = h.StartUpgrade()
case "finish":
hasDone, err = h.finishUpgrade()
hasDone, err = h.FinishUpgrade()
default:
WriteError(w, errors.Errorf("wrong operation:%s", op))
return
Expand All @@ -71,7 +71,8 @@ func (h ClusterUpgradeHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
zap.String("category", "upgrading"), zap.String("op", req.FormValue("op")), zap.Bool("hasDone", hasDone))
}

func (h ClusterUpgradeHandler) startUpgrade() (hasDone bool, err error) {
// StartUpgrade is used to start the upgrade.
func (h ClusterUpgradeHandler) StartUpgrade() (hasDone bool, err error) {
se, err := session.CreateSession(h.store)
if err != nil {
return false, err
Expand All @@ -90,7 +91,8 @@ func (h ClusterUpgradeHandler) startUpgrade() (hasDone bool, err error) {
return false, err
}

func (h ClusterUpgradeHandler) finishUpgrade() (hasDone bool, err error) {
// FinishUpgrade is used to finish the upgrade.
func (h ClusterUpgradeHandler) FinishUpgrade() (hasDone bool, err error) {
se, err := session.CreateSession(h.store)
if err != nil {
return false, err
Expand Down
50 changes: 42 additions & 8 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,8 +1072,14 @@ func getTiDBVar(s Session, name string) (sVal string, isNull bool, e error) {
return row.GetString(0), false, nil
}

// SupportUpgradeStateVer is exported for testing.
var SupportUpgradeStateVer = version145
var (
// SupportUpgradeStateVer is exported for testing.
// The minimum version that can be upgraded by paused user DDL.
SupportUpgradeStateVer int64 = version145
// SupportUpgradeHTTPOpVer is exported for testing.
// The minimum version of the upgrade can be notified through the HTTP API.
SupportUpgradeHTTPOpVer int64 = version172
)

// upgrade function will do some upgrade works, when the system is bootstrapped by low version TiDB server
// For example, add new system variables into mysql.global_variables table.
Expand All @@ -1084,6 +1090,10 @@ func upgrade(s Session) {
// It is already bootstrapped/upgraded by a higher version TiDB server.
return
}
if ver >= SupportUpgradeStateVer {
checkOrSyncUpgrade(s, ver)
}

// Only upgrade from under version92 and this TiDB is not owner set.
// The owner in older tidb does not support concurrent DDL, we should add the internal DDL to job queue.
if ver < version92 {
Expand All @@ -1103,9 +1113,6 @@ func upgrade(s Session) {
logutil.BgLogger().Fatal("[upgrade] init metadata lock failed", zap.Error(err))
}

if ver >= int64(SupportUpgradeStateVer) {
terror.MustNil(SyncUpgradeState(s))
}
if isNull {
upgradeToVer99Before(s)
}
Expand All @@ -1118,9 +1125,6 @@ func upgrade(s Session) {
if isNull {
upgradeToVer99After(s)
}
if ver >= int64(SupportUpgradeStateVer) {
terror.MustNil(SyncNormalRunning(s))
}

variable.DDLForce2Queue.Store(false)
updateBootstrapVer(s)
Expand Down Expand Up @@ -1225,6 +1229,36 @@ func IsUpgradingClusterState(s Session) (bool, error) {
return stateInfo.State == syncer.StateUpgrading, nil
}

func checkOrSyncUpgrade(s Session, ver int64) {
if ver < SupportUpgradeHTTPOpVer {
terror.MustNil(SyncUpgradeState(s))
return
}

interval := 200 * time.Millisecond
retryTimes := int(time.Duration(internalSQLTimeout) * time.Second / interval)
for i := 0; i < retryTimes; i++ {
isUpgrading, err := IsUpgradingClusterState(s)
if err == nil {
if isUpgrading {
break
}
logutil.BgLogger().Fatal("global state isn't upgrading, please send a request to start the upgrade first",
zap.String("category", "upgrading"), zap.Error(err))
}

if i == retryTimes-1 {
logutil.BgLogger().Fatal("get global state failed", zap.String("category", "upgrading"), zap.Error(err))
}
if i%10 == 0 {
logutil.BgLogger().Warn("get global state failed", zap.String("category", "upgrading"), zap.Error(err))
}
time.Sleep(interval)
}
logutil.BgLogger().Info("global state is upgrading", zap.String("category", "upgrading"),
zap.Int64("old version", ver), zap.Int64("latest version", currentBootstrapVersion))
}

// checkOwnerVersion is used to wait the DDL owner to be elected in the cluster and check it is the same version as this TiDB.
func checkOwnerVersion(ctx context.Context, dom *domain.Domain) (bool, error) {
ticker := time.NewTicker(100 * time.Millisecond)
Expand Down
3 changes: 2 additions & 1 deletion session/bootstraptest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 9,
shard_count = 10,
deps = [
"//config",
"//ddl",
Expand All @@ -17,6 +17,7 @@ go_test(
"//meta",
"//parser/model",
"//parser/terror",
"//server/handler",
"//session", #keep
"//sessionctx",
"//testkit", #keep
Expand Down
74 changes: 74 additions & 0 deletions session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/server/handler"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -255,10 +256,13 @@ func TestUpgradeVersionMockLatest(t *testing.T) {
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion-1, ver)
dom.Close()
startUpgrade(store, session.CurrentBootstrapVersion-1)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()

finishUpgrade(store)

seLatestV := session.CreateSessionAndSetID(t, store)
ver, err = session.GetBootstrapVersion(seLatestV)
require.NoError(t, err)
Expand Down Expand Up @@ -294,6 +298,54 @@ func TestUpgradeVersionMockLatest(t *testing.T) {
" PARTITION `p4` VALUES LESS THAN (7096))"))
}

// TestUpgradeVersionForUpgradeHTTPOp tests SupportUpgradeHTTPOpVer upgrade SupportUpgradeHTTPOpVer++.
func TestUpgradeVersionForUpgradeHTTPOp(t *testing.T) {
*session.WithMockUpgrade = true
session.MockUpgradeToVerLatestKind = session.MockSimpleUpgradeToVerLatest

store, dom := session.CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()

seV := session.CreateSessionAndSetID(t, store)
txn, err := store.Begin()
require.NoError(t, err)
m := meta.NewMeta(txn)
err = m.FinishBootstrap(session.SupportUpgradeHTTPOpVer)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.SupportUpgradeHTTPOpVer))
session.UnsetStoreBootstrapped(store.UUID())
ver, err := session.GetBootstrapVersion(seV)
require.NoError(t, err)
require.Equal(t, session.SupportUpgradeHTTPOpVer, ver)
dom.Close()

// Start the upgrade test.
// Current cluster state is normal.
isUpgrading, err := session.IsUpgradingClusterState(seV)
require.NoError(t, err)
require.Equal(t, false, isUpgrading)
upgradeHandler := handler.NewClusterUpgradeHandler(store)
upgradeHandler.StartUpgrade()
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
seLatestV := session.CreateSessionAndSetID(t, store)
ver, err = session.GetBootstrapVersion(seLatestV)
require.NoError(t, err)
require.Equal(t, session.SupportUpgradeHTTPOpVer+1, ver)
// Current cluster state is upgrading.
isUpgrading, err = session.IsUpgradingClusterState(seLatestV)
require.NoError(t, err)
require.Equal(t, true, isUpgrading)
upgradeHandler.FinishUpgrade()
// Upgrading is finished and current cluster state is normal.
isUpgrading, err = session.IsUpgradingClusterState(seV)
require.NoError(t, err)
require.Equal(t, false, isUpgrading)
}

func TestUpgradeVersionForPausedJob(t *testing.T) {
store, dom := session.CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()
Expand Down Expand Up @@ -333,6 +385,7 @@ func TestUpgradeVersionForPausedJob(t *testing.T) {
<-ch
dom.Close()
// Make sure upgrade is successful.
startUpgrade(store, session.CurrentBootstrapVersion-1)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
Expand All @@ -341,6 +394,8 @@ func TestUpgradeVersionForPausedJob(t *testing.T) {
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion, ver)

finishUpgrade(store)

// Resume the DDL job, then add index operation can be executed successfully.
session.MustExec(t, seLatestV, fmt.Sprintf("admin resume ddl jobs %d", jobID))
checkDDLJobExecSucc(t, seLatestV, jobID)
Expand Down Expand Up @@ -434,6 +489,7 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) {
<-ch
dom.Close()
// Make sure upgrade is successful.
startUpgrade(store, session.CurrentBootstrapVersion-1)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
Expand All @@ -442,6 +498,8 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) {
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion+1, ver)

finishUpgrade(store)

checkDDLJobExecSucc(t, seLatestV, jobID)
}

Expand All @@ -463,6 +521,20 @@ func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.R
return rows, nil
}

func startUpgrade(store kv.Storage, currVer int64) {
// It's used for compatible tests upgraded from previous versions of SupportUpgradeHTTPOpVer.
if currVer < session.SupportUpgradeHTTPOpVer {
return
}
upgradeHandler := handler.NewClusterUpgradeHandler(store)
upgradeHandler.StartUpgrade()
}

func finishUpgrade(store kv.Storage) {
upgradeHandler := handler.NewClusterUpgradeHandler(store)
upgradeHandler.FinishUpgrade()
}

// TestUpgradeWithPauseDDL adds a user and a system DB's DDL operations, before every test bootstrap(DDL operation). It tests:
//
// 1.Before and after each test bootstrap, the DDL of the user DB is paused, but the DDL of the system DB is not paused.
Expand Down Expand Up @@ -574,6 +646,7 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion-1, ver)
dom.Close()
startUpgrade(store, session.CurrentBootstrapVersion-1)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
Expand All @@ -584,6 +657,7 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
require.Equal(t, session.CurrentBootstrapVersion+1, ver)

wg.Wait()
finishUpgrade(store)

tk := testkit.NewTestKit(t, store)
var rows []chunk.Row
Expand Down
30 changes: 6 additions & 24 deletions session/mock_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ import (
"flag"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util/logutil"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -130,29 +126,15 @@ func mockSimpleUpgradeToVerLatest(s Session, ver int64) {
// TestHook is exported for testing.
var TestHook = TestCallback{}

// modifyBootstrapVersionForTest is used to get the bootstrap version from the SQL, i.e. skipping the mBootstrapKey method.
// This makes it easy to modify the bootstrap version through SQL for easy testing.
func modifyBootstrapVersionForTest(store kv.Storage, ver int64) int64 {
// modifyBootstrapVersionForTest is used to test SupportUpgradeHTTPOpVer upgrade SupportUpgradeHTTPOpVer++.
func modifyBootstrapVersionForTest(ver int64) {
if !*WithMockUpgrade {
return ver
}

s, err := createSession(store)
var tmpVer int64
if err == nil {
tmpVer, err = getBootstrapVersion(s)
}
if err == nil {
return tmpVer
return
}

originErr := errors.Cause(err)
tErr, ok := originErr.(*terror.Error)
// If the error is ErrTableNotExists(mysql.global_variables), we can't replace the bootstrap version.
if !ok || tErr.Code() != mysql.ErrNoSuchTable {
logutil.BgLogger().Fatal("mock upgrade, check bootstrapped failed", zap.Error(err))
if ver == SupportUpgradeHTTPOpVer && currentBootstrapVersion == SupportUpgradeHTTPOpVer {
currentBootstrapVersion = mockLatestVer
}
return ver
}

const (
Expand All @@ -175,7 +157,7 @@ func addMockBootstrapVersionForTest(s Session) {
} else {
bootstrapVersion = append(bootstrapVersion, mockSimpleUpgradeToVerLatest)
}
currentBootstrapVersion++
currentBootstrapVersion = mockLatestVer
}

// Callback is used for Test.
Expand Down
Loading

0 comments on commit 1eaa416

Please sign in to comment.