Skip to content

Commit

Permalink
planner: return the complete error info when Parallel Apply meets pro…
Browse files Browse the repository at this point in the history
…blem (#50335)

close #50256
  • Loading branch information
hawkingrei authored Jan 12, 2024
1 parent 7bfb7a3 commit 637aaa5
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 2 deletions.
16 changes: 16 additions & 0 deletions pkg/planner/core/casetest/parallelapply/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "parallelapply_test",
timeout = "short",
srcs = [
"main_test.go",
"parallel_apply_test.go",
],
flaky = True,
deps = [
"//pkg/testkit",
"//pkg/testkit/testsetup",
"@org_uber_go_goleak//:goleak",
],
)
36 changes: 36 additions & 0 deletions pkg/planner/core/casetest/parallelapply/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package parallelapply

import (
"testing"

"github.com/pingcap/tidb/pkg/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testsetup.SetupForCommonTest()
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/bazelbuild/rules_go/go/tools/bzltestutil.RegisterTimeoutHandler.func1"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
33 changes: 33 additions & 0 deletions pkg/planner/core/casetest/parallelapply/parallel_apply_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package parallelapply

import (
"testing"

"github.com/pingcap/tidb/pkg/testkit"
)

func TestParallelApplyWarnning(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (a int, b int, c int);")
tk.MustExec("create table t2 (a int, b int, c int, key(a));")
tk.MustExec("create table t3(a int, b int, c int, key(a));")
tk.MustExec("set tidb_enable_parallel_apply=on;")
tk.MustQuery("select (select 1 from t2, t3 where t2.a=t3.a and t2.b > t1.b) from t1;")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 Some apply operators can not be executed in parallel: *core.PhysicalIndexHashJoin doesn't support cloning"))
}
7 changes: 5 additions & 2 deletions pkg/planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,9 +1116,12 @@ func enableParallelApply(sctx sessionctx.Context, plan PhysicalPlan) PhysicalPla
if noOrder && supportClone {
apply.Concurrency = sctx.GetSessionVars().ExecutorConcurrency
} else {
sctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("Some apply operators can not be executed in parallel"))
if err != nil {
sctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("Some apply operators can not be executed in parallel: %v", err))
} else {
sctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackError("Some apply operators can not be executed in parallel"))
}
}

// because of the limitation 3, we cannot parallelize Apply operators in this Apply's inner size,
// so we only invoke recursively for its outer child.
apply.SetChild(outerIdx, enableParallelApply(sctx, apply.Children()[outerIdx]))
Expand Down

0 comments on commit 637aaa5

Please sign in to comment.