Skip to content

Commit

Permalink
Merge branch 'master' into fix_cdc_compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
HuSharp authored Feb 2, 2023
2 parents d2329a7 + 6e56bf7 commit cbdcc1b
Show file tree
Hide file tree
Showing 18 changed files with 10,109 additions and 9,951 deletions.
16 changes: 13 additions & 3 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3034,7 +3034,7 @@ func SetDirectPlacementOpt(placementSettings *model.PlacementSettings, placement
}

// SetDirectResourceGroupUnit tries to set the ResourceGroupSettings.
func SetDirectResourceGroupUnit(resourceGroupSettings *model.ResourceGroupSettings, typ ast.ResourceUnitType, stringVal string, uintVal uint64) error {
func SetDirectResourceGroupUnit(resourceGroupSettings *model.ResourceGroupSettings, typ ast.ResourceUnitType, stringVal string, uintVal uint64, boolValue bool) error {
switch typ {
case ast.ResourceRURate:
resourceGroupSettings.RURate = uintVal
Expand All @@ -3044,6 +3044,16 @@ func SetDirectResourceGroupUnit(resourceGroupSettings *model.ResourceGroupSettin
resourceGroupSettings.IOReadBandwidth = stringVal
case ast.ResourceUnitIOWriteBandwidth:
resourceGroupSettings.IOWriteBandwidth = stringVal
case ast.ResourceBurstableOpiton:
// Some about BurstLimit(b):
// - If b == 0, that means the limiter is unlimited capacity. default use in resource controller (burst with a rate within a unlimited capacity).
// - If b < 0, that means the limiter is unlimited capacity and fillrate(r) is ignored, can be seen as r == Inf (burst with a inf rate within a unlimited capacity).
// - If b > 0, that means the limiter is limited capacity. (current not used).
limit := int64(0)
if boolValue {
limit = -1
}
resourceGroupSettings.BurstLimit = limit
default:
return errors.Trace(errors.New("unknown resource unit type"))
}
Expand Down Expand Up @@ -7610,7 +7620,7 @@ func (d *ddl) CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResour
groupName := stmt.ResourceGroupName
groupInfo.Name = groupName
for _, opt := range stmt.ResourceGroupOptionList {
err := SetDirectResourceGroupUnit(groupInfo.ResourceGroupSettings, opt.Tp, opt.StrValue, opt.UintValue)
err := SetDirectResourceGroupUnit(groupInfo.ResourceGroupSettings, opt.Tp, opt.StrValue, opt.UintValue, opt.BoolValue)
if err != nil {
return err
}
Expand Down Expand Up @@ -7697,7 +7707,7 @@ func (d *ddl) DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResourceGr
func buildResourceGroup(oldGroup *model.ResourceGroupInfo, options []*ast.ResourceGroupOption) (*model.ResourceGroupInfo, error) {
groupInfo := &model.ResourceGroupInfo{Name: oldGroup.Name, ID: oldGroup.ID, ResourceGroupSettings: &model.ResourceGroupSettings{}}
for _, opt := range options {
err := SetDirectResourceGroupUnit(groupInfo.ResourceGroupSettings, opt.Tp, opt.StrValue, opt.UintValue)
err := SetDirectResourceGroupUnit(groupInfo.ResourceGroupSettings, opt.Tp, opt.StrValue, opt.UintValue, opt.BoolValue)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion ddl/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ func TestResourceGroupBasic(t *testing.T) {

tk.MustGetErrCode("create resource group x RU_PER_SEC=1000 ", mysql.ErrResourceGroupExists)

tk.MustExec("alter resource group x RU_PER_SEC=2000")
tk.MustExec("alter resource group x RU_PER_SEC=2000 BURSTABLE")
g = testResourceGroupNameFromIS(t, tk.Session(), "x")
re.Equal(uint64(2000), g.RURate)
re.Equal(int64(-1), g.BurstLimit)

tk.MustExec("alter resource group if exists not_exists RU_PER_SEC=2000")
// Check warning message
Expand Down
12 changes: 8 additions & 4 deletions ddl/resourcegroup/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func NewGroupFromOptions(groupName string, options *model.ResourceGroupSettings)
group.RUSettings = &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: options.RURate,
FillRate: options.RURate,
BurstLimit: options.BurstLimit,
},
},
}
Expand Down Expand Up @@ -77,17 +78,20 @@ func NewGroupFromOptions(groupName string, options *model.ResourceGroupSettings)
group.RawResourceSettings = &rmpb.GroupRawResourceSettings{
Cpu: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: cpuRate,
FillRate: cpuRate,
BurstLimit: options.BurstLimit,
},
},
IoRead: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: ioReadRate,
FillRate: ioReadRate,
BurstLimit: options.BurstLimit,
},
},
IoWrite: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: ioWriteRate,
FillRate: ioWriteRate,
BurstLimit: options.BurstLimit,
},
},
}
Expand Down
5 changes: 3 additions & 2 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
Expand All @@ -38,11 +39,11 @@ import (
)

// DispatchMPPTasks dispatches all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64, mppQueryID kv.MPPQueryID) (SelectResult, error) {
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64, mppQueryID kv.MPPQueryID, memTracker *memory.Tracker) (SelectResult, error) {
ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
ctx = SetTiFlashMaxThreadsInContext(ctx, sctx)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs, mppQueryID, sctx.GetSessionVars().ChooseMppVersion())
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs, mppQueryID, sctx.GetSessionVars().ChooseMppVersion(), memTracker)
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand Down
2 changes: 2 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3408,7 +3408,9 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe
originalPlan: v.GetTablePlan(),
startTS: startTs,
mppQueryID: kv.MPPQueryID{QueryTs: getMPPQueryTS(b.ctx), LocalQueryID: getMPPQueryID(b.ctx), ServerID: domain.GetDomain(b.ctx).ServerID()},
memTracker: memory.NewTracker(v.ID(), -1),
}
gather.memTracker.AttachTo(b.ctx.GetSessionVars().StmtCtx.MemTracker)
return gather
}

Expand Down
5 changes: 4 additions & 1 deletion executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -63,6 +64,8 @@ type MPPGather struct {
mppReqs []*kv.MPPDispatchRequest

respIter distsql.SelectResult

memTracker *memory.Tracker
}

func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
Expand Down Expand Up @@ -146,7 +149,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
failpoint.Return(errors.Errorf("The number of tasks is not right, expect %d tasks but actually there are %d tasks", val.(int), len(e.mppReqs)))
}
})
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS, e.mppQueryID)
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS, e.mppQueryID, e.memTracker)
if err != nil {
return errors.Trace(err)
}
Expand Down
23 changes: 23 additions & 0 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1402,3 +1402,26 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) {
" └─TableFullScan_20 10000.00 mpp[tiflash] table:t1, partition:p2 keep order:false, stats:pseudo"))
// tk.MustQuery("select * from t1 where c1 < 2").Check(testkit.Rows("1 1"))
}

func TestMPPMemoryTracker(t *testing.T) {
store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("set tidb_mem_quota_query = 1 << 30")
tk.MustExec("set global tidb_mem_oom_action = 'CANCEL'")
tk.MustExec("use test")
tk.MustExec("create table t(a int);")
tk.MustExec("insert into t values (1);")
tk.MustExec("alter table t set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set tidb_enforce_mpp = on;")
tk.MustQuery("select * from t").Check(testkit.Rows("1"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/testMPPOOMPanic", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/testMPPOOMPanic"))
}()
err = tk.QueryToErr("select * from t")
require.NotNil(t, err)
require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!"))
}
3 changes: 2 additions & 1 deletion kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tipb/go-tipb"
)

Expand Down Expand Up @@ -151,7 +152,7 @@ type MPPClient interface {
// TODO:: This interface will be refined after we support more executors.
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error)
// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID, mppVersion MppVersion) Response
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID, mppVersion MppVersion, memTracker *memory.Tracker) Response
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
Expand Down
6 changes: 6 additions & 0 deletions parser/ast/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2103,6 +2103,7 @@ type ResourceGroupOption struct {
Tp ResourceUnitType
StrValue string
UintValue uint64
BoolValue bool
}

type ResourceUnitType int
Expand All @@ -2114,6 +2115,9 @@ const (
ResourceUnitCPU
ResourceUnitIOReadBandwidth
ResourceUnitIOWriteBandwidth

// Options
ResourceBurstableOpiton
)

func (n *ResourceGroupOption) Restore(ctx *format.RestoreCtx) error {
Expand All @@ -2138,6 +2142,8 @@ func (n *ResourceGroupOption) Restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord("IO_WRITE_BANDWIDTH ")
ctx.WritePlain("= ")
ctx.WriteString(n.StrValue)
case ResourceBurstableOpiton:
ctx.WriteKeyWord("BURSTABLE")
default:
return errors.Errorf("invalid PlacementOption: %d", n.Tp)
}
Expand Down
1 change: 1 addition & 0 deletions parser/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ var tokenMap = map[string]int{
"BTREE": btree,
"BUCKETS": buckets,
"BUILTINS": builtins,
"BURSTABLE": burstable,
"BY": by,
"BYTE": byteType,
"CACHE": cache,
Expand Down
5 changes: 5 additions & 0 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1850,6 +1850,7 @@ type ResourceGroupSettings struct {
CPULimiter string `json:"cpu_limit"`
IOReadBandwidth string `json:"io_read_bandwidth"`
IOWriteBandwidth string `json:"io_write_bandwidth"`
BurstLimit int64 `json:"burst_limit"`
}

func (p *ResourceGroupSettings) String() string {
Expand All @@ -1866,6 +1867,10 @@ func (p *ResourceGroupSettings) String() string {
if len(p.IOWriteBandwidth) > 0 {
writeSettingStringToBuilder(sb, "IO_WRITE_BANDWIDTH", p.IOWriteBandwidth)
}
// Once burst limit is negative, meaning allow burst with unlimit.
if p.BurstLimit < 0 {
writeSettingItemToBuilder(sb, "BURSTABLE")
}
return sb.String()
}

Expand Down
Loading

0 comments on commit cbdcc1b

Please sign in to comment.