Skip to content

Commit

Permalink
cherry pick pingcap#30773 to release-5.1
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
fzhedu authored and ti-srebot committed Dec 20, 2021
1 parent 0c172eb commit 6e0839e
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 1 deletion.
117 changes: 117 additions & 0 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1374,8 +1374,125 @@ func (s *testSuiteAgg) TestIssue20658(c *C) {
}
}

<<<<<<< HEAD
func (s *testSerialSuite) TestRandomPanicAggConsume(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
=======
func TestIssue23277(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t;")

tk.MustExec("create table t(a tinyint(1));")
tk.MustExec("insert into t values (-120), (127);")
tk.MustQuery("select avg(a) from t group by a").Sort().Check(testkit.Rows("-120.0000", "127.0000"))
tk.MustExec("drop table t;")

tk.MustExec("create table t(a smallint(1));")
tk.MustExec("insert into t values (-120), (127);")
tk.MustQuery("select avg(a) from t group by a").Sort().Check(testkit.Rows("-120.0000", "127.0000"))
tk.MustExec("drop table t;")

tk.MustExec("create table t(a mediumint(1));")
tk.MustExec("insert into t values (-120), (127);")
tk.MustQuery("select avg(a) from t group by a").Sort().Check(testkit.Rows("-120.0000", "127.0000"))
tk.MustExec("drop table t;")

tk.MustExec("create table t(a int(1));")
tk.MustExec("insert into t values (-120), (127);")
tk.MustQuery("select avg(a) from t group by a").Sort().Check(testkit.Rows("-120.0000", "127.0000"))
tk.MustExec("drop table t;")

tk.MustExec("create table t(a bigint(1));")
tk.MustExec("insert into t values (-120), (127);")
tk.MustQuery("select avg(a) from t group by a").Sort().Check(testkit.Rows("-120.0000", "127.0000"))
tk.MustExec("drop table t;")
}

func TestAvgDecimal(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists td;")
tk.MustExec("create table td (col_bigint bigint(20), col_smallint smallint(6));")
tk.MustExec("insert into td values (null, 22876);")
tk.MustExec("insert into td values (9220557287087669248, 32767);")
tk.MustExec("insert into td values (28030, 32767);")
tk.MustExec("insert into td values (-3309864251140603904,32767);")
tk.MustExec("insert into td values (4,0);")
tk.MustExec("insert into td values (null,0);")
tk.MustExec("insert into td values (4,-23828);")
tk.MustExec("insert into td values (54720,32767);")
tk.MustExec("insert into td values (0,29815);")
tk.MustExec("insert into td values (10017,-32661);")
tk.MustQuery(" SELECT AVG( col_bigint / col_smallint) AS field1 FROM td;").Sort().Check(testkit.Rows("25769363061037.62077260"))
tk.MustQuery(" SELECT AVG(col_bigint) OVER (PARTITION BY col_smallint) as field2 FROM td where col_smallint = -23828;").Sort().Check(testkit.Rows("4.0000"))
tk.MustExec("drop table td;")
}

// https://github.com/pingcap/tidb/issues/23314
func TestIssue23314(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(col1 time(2) NOT NULL)")
tk.MustExec("insert into t1 values(\"16:40:20.01\")")
res := tk.MustQuery("select col1 from t1 group by col1")
res.Check(testkit.Rows("16:40:20.01"))
}

func TestAggInDisk(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set tidb_hashagg_final_concurrency = 1;")
tk.MustExec("set tidb_hashagg_partial_concurrency = 1;")
tk.MustExec("set tidb_mem_quota_query = 4194304")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t(a int)")
sql := "insert into t values (0)"
for i := 1; i <= 200; i++ {
sql += fmt.Sprintf(",(%v)", i)
}
sql += ";"
tk.MustExec(sql)
rows := tk.MustQuery("desc analyze select /*+ HASH_AGG() */ avg(t1.a) from t t1 join t t2 group by t1.a, t2.a;").Rows()
for _, row := range rows {
length := len(row)
line := fmt.Sprintf("%v", row)
disk := fmt.Sprintf("%v", row[length-1])
if strings.Contains(line, "HashAgg") {
require.False(t, strings.Contains(disk, "0 Bytes"))
require.True(t, strings.Contains(disk, "MB") ||
strings.Contains(disk, "KB") ||
strings.Contains(disk, "Bytes"))
}
}

// Add code cover
// Test spill chunk. Add a line to avoid tmp spill chunk is always full.
tk.MustExec("insert into t values(0)")
tk.MustQuery("select sum(tt.b) from ( select /*+ HASH_AGG() */ avg(t1.a) as b from t t1 join t t2 group by t1.a, t2.a) as tt").Check(
testkit.Rows("4040100.0000"))
// Test no groupby and no data.
tk.MustExec("drop table t;")
tk.MustExec("create table t(c int, c1 int);")
tk.MustQuery("select /*+ HASH_AGG() */ count(c) from t;").Check(testkit.Rows("0"))
tk.MustQuery("select /*+ HASH_AGG() */ count(c) from t group by c1;").Check(testkit.Rows())
}

func TestRandomPanicAggConsume(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
>>>>>>> e3c56b75e... executor: buildWindow cannot call typeInfer twice (#30773)
tk.MustExec("set @@tidb_max_chunk_size=32")
tk.MustExec("set @@tidb_init_chunk_size=1")
tk.MustExec("use test")
Expand Down
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3883,7 +3883,7 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) Executor {
partialResults := make([]aggfuncs.PartialResult, 0, len(v.WindowFuncDescs))
resultColIdx := v.Schema().Len() - len(v.WindowFuncDescs)
for _, desc := range v.WindowFuncDescs {
aggDesc, err := aggregation.NewAggFuncDesc(b.ctx, desc.Name, desc.Args, false)
aggDesc, err := aggregation.NewAggFuncDescForWindowFunc(b.ctx, desc, false)
if err != nil {
b.err = err
return nil
Expand Down
68 changes: 68 additions & 0 deletions executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,74 @@ func (s *tiflashTestSuite) TestMppUnionAll(c *C) {

}

<<<<<<< HEAD
=======
func (s *tiflashTestSuite) TestUnionWithEmptyDualTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t (a int not null, b int, c varchar(20))")
tk.MustExec("create table t1 (a int, b int not null, c double)")
tk.MustExec("alter table t set tiflash replica 1")
tk.MustExec("alter table t1 set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tb = testGetTableByName(c, tk.Se, "test", "t1")
err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t values(1,2,3)")
tk.MustExec("insert into t1 values(1,2,3)")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
tk.MustQuery("select count(*) from (select a , b from t union all select a , c from t1 where false) tt").Check(testkit.Rows("1"))
}

func (s *tiflashTestSuite) TestAvgOverflow(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
// avg int
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a decimal(1,0))")
tk.MustExec("alter table t set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t values(9)")
for i := 0; i < 16; i++ {
tk.MustExec("insert into t select * from t")
}
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
tk.MustQuery("select avg(a) from t group by a").Check(testkit.Rows("9.0000"))
tk.MustExec("drop table if exists t")

// avg decimal
tk.MustExec("drop table if exists td;")
tk.MustExec("create table td (col_bigint bigint(20), col_smallint smallint(6));")
tk.MustExec("alter table td set tiflash replica 1")
tb = testGetTableByName(c, tk.Se, "test", "td")
err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into td values (null, 22876);")
tk.MustExec("insert into td values (9220557287087669248, 32767);")
tk.MustExec("insert into td values (28030, 32767);")
tk.MustExec("insert into td values (-3309864251140603904,32767);")
tk.MustExec("insert into td values (4,0);")
tk.MustExec("insert into td values (null,0);")
tk.MustExec("insert into td values (4,-23828);")
tk.MustExec("insert into td values (54720,32767);")
tk.MustExec("insert into td values (0,29815);")
tk.MustExec("insert into td values (10017,-32661);")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
tk.MustQuery(" SELECT AVG( col_bigint / col_smallint) AS field1 FROM td;").Sort().Check(testkit.Rows("25769363061037.62077260"))
tk.MustQuery(" SELECT AVG(col_bigint) OVER (PARTITION BY col_smallint) as field2 FROM td where col_smallint = -23828;").Sort().Check(testkit.Rows("4.0000"))
tk.MustExec("drop table if exists td;")
}

>>>>>>> e3c56b75e... executor: buildWindow cannot call typeInfer twice (#30773)
func (s *tiflashTestSuite) TestMppApply(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
9 changes: 9 additions & 0 deletions expression/aggregation/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type AggFuncDesc struct {
}

// NewAggFuncDesc creates an aggregation function signature descriptor.
// this func cannot be called twice as the TypeInfer has changed the type of args in the first time.
func NewAggFuncDesc(ctx sessionctx.Context, name string, args []expression.Expression, hasDistinct bool) (*AggFuncDesc, error) {
b, err := newBaseFuncDesc(ctx, name, args)
if err != nil {
Expand All @@ -49,6 +50,14 @@ func NewAggFuncDesc(ctx sessionctx.Context, name string, args []expression.Expre
return &AggFuncDesc{baseFuncDesc: b, HasDistinct: hasDistinct}, nil
}

// NewAggFuncDescForWindowFunc creates an aggregation function from window functions, where baseFuncDesc may be ready.
func NewAggFuncDescForWindowFunc(ctx sessionctx.Context, Desc *WindowFuncDesc, hasDistinct bool) (*AggFuncDesc, error) {
if Desc.RetTp == nil { // safety check
return NewAggFuncDesc(ctx, Desc.Name, Desc.Args, hasDistinct)
}
return &AggFuncDesc{baseFuncDesc: baseFuncDesc{Desc.Name, Desc.Args, Desc.RetTp}, HasDistinct: hasDistinct}, nil
}

// String implements the fmt.Stringer interface.
func (a *AggFuncDesc) String() string {
buffer := bytes.NewBufferString(a.Name)
Expand Down

0 comments on commit 6e0839e

Please sign in to comment.