Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: rename NextChunk to Next #6214

Merged
merged 4 commits into from
Apr 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ type RecordSet interface {
// Fields gets result fields.
Fields() []*ResultField

// NextChunk reads records into chunk.
NextChunk(ctx context.Context, chk *chunk.Chunk) error
// Next reads records into chunk.
Next(ctx context.Context, chk *chunk.Chunk) error

// NewChunk creates a new chunk with initial capacity.
NewChunk() *chunk.Chunk
Expand Down
2 changes: 1 addition & 1 deletion cmd/benchdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (ut *benchDB) mustExec(sql string) {
rs := rss[0]
chk := rs.NewChunk()
for {
err := rs.NextChunk(ctx, chk)
err := rs.Next(ctx, chk)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func LoadDeleteRanges(ctx sessionctx.Context, safePoint uint64) (ranges []DelRan
chk := rs.NewChunk()
it := chunk.NewIterator4Chunk(chk)
for {
err = rs.NextChunk(context.TODO(), chk)
err = rs.Next(context.TODO(), chk)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
27 changes: 4 additions & 23 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,10 @@ var (

// SelectResult is an iterator of coprocessor partial results.
type SelectResult interface {
// Next gets the next partial result.
Next(context.Context) (PartialResult, error)
// NextRaw gets the next raw result.
NextRaw(context.Context) ([]byte, error)
// NextChunk reads the data into chunk.
NextChunk(context.Context, *chunk.Chunk) error
// Next reads the data into chunk.
Next(context.Context, *chunk.Chunk) error
// Close closes the iterator.
Close() error
// Fetch fetches partial results from client.
Expand Down Expand Up @@ -121,23 +119,6 @@ func (r *selectResult) fetch(ctx context.Context) {
}
}

// Next returns the next row.
func (r *selectResult) Next(ctx context.Context) (PartialResult, error) {
re := <-r.results
if re.err != nil {
return nil, errors.Trace(re.err)
}
if re.result == nil {
return nil, nil
}
pr := &partialResult{}
pr.rowLen = r.rowLen
err := pr.unmarshal(re.result.GetData())
r.feedback.Update(re.result.GetStartKey(), pr.resp.OutputCounts)
r.partialCount++
return pr, errors.Trace(err)
}

// NextRaw returns the next raw partial result.
func (r *selectResult) NextRaw(ctx context.Context) ([]byte, error) {
re := <-r.results
Expand All @@ -149,8 +130,8 @@ func (r *selectResult) NextRaw(ctx context.Context) ([]byte, error) {
return re.result.GetData(), nil
}

// NextChunk reads data to the chunk.
func (r *selectResult) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next reads data to the chunk.
func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
for chk.NumRows() < r.ctx.GetSessionVars().MaxChunkSize {
if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) {
Expand Down
12 changes: 6 additions & 6 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *testSuite) TestSelectNormal(c *C) {
colTypes = append(colTypes, colTypes[0])
colTypes = append(colTypes, colTypes[0])

// Test NextChunk.
// Test Next.
response, err := Select(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))
c.Assert(err, IsNil)
result, ok := response.(*selectResult)
Expand All @@ -64,11 +64,11 @@ func (s *testSuite) TestSelectNormal(c *C) {

response.Fetch(context.TODO())

// Test NextChunk.
// Test Next.
chk := chunk.NewChunk(colTypes)
numAllRows := 0
for {
err = response.NextChunk(context.TODO(), chk)
err = response.Next(context.TODO(), chk)
c.Assert(err, IsNil)
numAllRows += chk.NumRows()
if chk.NumRows() == 0 {
Expand Down Expand Up @@ -108,7 +108,7 @@ func (s *testSuite) TestSelectStreaming(c *C) {

s.sctx.GetSessionVars().EnableStreaming = true

// Test NextChunk.
// Test Next.
response, err := Select(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))
c.Assert(err, IsNil)
result, ok := response.(*streamResult)
Expand All @@ -117,11 +117,11 @@ func (s *testSuite) TestSelectStreaming(c *C) {

response.Fetch(context.TODO())

// Test NextChunk.
// Test Next.
chk := chunk.NewChunk(colTypes)
numAllRows := 0
for {
err = response.NextChunk(context.TODO(), chk)
err = response.Next(context.TODO(), chk)
c.Assert(err, IsNil)
numAllRows += chk.NumRows()
if chk.NumRows() == 0 {
Expand Down
12 changes: 1 addition & 11 deletions distsql/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,7 @@ type streamResult struct {

func (r *streamResult) Fetch(context.Context) {}

func (r *streamResult) Next(ctx context.Context) (PartialResult, error) {
var ret streamPartialResult
ret.rowLen = r.rowLen
finished, err := r.readDataFromResponse(ctx, r.resp, &ret.Chunk)
if err != nil || finished {
return nil, errors.Trace(err)
}
return &ret, nil
}

func (r *streamResult) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
func (r *streamResult) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize
for chk.NumRows() < maxChunkSize {
Expand Down
8 changes: 4 additions & 4 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ func schema2ResultFields(schema *expression.Schema, defaultDB string) (rfs []*as
return rfs
}

// NextChunk use uses recordSet's executor to get next available chunk for later usage.
// Next use uses recordSet's executor to get next available chunk for later usage.
// If chunk does not contain any rows, then we update last query found rows in session variable as current found rows.
// The reason we need update is that chunk with 0 rows indicating we already finished current query, we need prepare for
// next query.
// If stmt is not nil and chunk with some rows inside, we simply update last query found rows by the number of row in chunk.
func (a *recordSet) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
err := a.executor.NextChunk(ctx, chk)
func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
err := a.executor.Next(ctx, chk)
if err != nil {
a.lastErr = err
return errors.Trace(err)
Expand Down Expand Up @@ -262,7 +262,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
a.logSlowQuery(txnTS, err == nil)
}()

err = e.NextChunk(ctx, e.newChunk())
err = e.Next(ctx, e.newChunk())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
18 changes: 9 additions & 9 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ type CheckIndexRangeExec struct {
cols []*model.ColumnInfo
}

// NextChunk implements the Executor NextChunk interface.
func (e *CheckIndexRangeExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *CheckIndexRangeExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
handleIdx := e.schema.Len() - 1
for {
err := e.result.NextChunk(ctx, e.srcChunk)
err := e.result.Next(ctx, e.srcChunk)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -318,7 +318,7 @@ func (e *RecoverIndexExec) fetchRecoverRows(ctx context.Context, srcResult dists
result.scanRowCount = 0

for {
err := srcResult.NextChunk(ctx, e.srcChunk)
err := srcResult.Next(ctx, e.srcChunk)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -428,8 +428,8 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
return result, nil
}

// NextChunk implements the Executor NextChunk interface.
func (e *RecoverIndexExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *RecoverIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
Expand Down Expand Up @@ -537,7 +537,7 @@ func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) e

sc := e.ctx.GetSessionVars().StmtCtx
for {
err := result.NextChunk(ctx, e.idxChunk)
err := result.Next(ctx, e.idxChunk)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -563,8 +563,8 @@ func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) e
}
}

// NextChunk implements the Executor NextChunk interface.
func (e *CleanupIndexExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *CleanupIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
Expand Down
14 changes: 7 additions & 7 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func (e *HashAggExec) Open(ctx context.Context) error {
return nil
}

// NextChunk implements the Executor NextChunk interface.
func (e *HashAggExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
// In this stage we consider all data from src as a single group.
if !e.executed {
err := e.execute(ctx)
Expand Down Expand Up @@ -110,11 +110,11 @@ func (e *HashAggExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
}
}

// innerNextChunk fetches Chunks from src and update each aggregate function for each row in Chunk.
// execute fetches Chunks from src and update each aggregate function for each row in Chunk.
func (e *HashAggExec) execute(ctx context.Context) (err error) {
inputIter := chunk.NewIterator4Chunk(e.childrenResults[0])
for {
err := e.children[0].NextChunk(ctx, e.childrenResults[0])
err := e.children[0].Next(ctx, e.childrenResults[0])
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -217,8 +217,8 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
return nil
}

// NextChunk implements the Executor NextChunk interface.
func (e *StreamAggExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()

for !e.executed && chk.NumRows() < e.maxChunkSize {
Expand Down Expand Up @@ -264,7 +264,7 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch
return nil
}

err := e.children[0].NextChunk(ctx, e.childrenResults[0])
err := e.children[0].Next(ctx, e.childrenResults[0])
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ const (
defaultCMSketchWidth = 2048
)

// NextChunk implements the Executor NextChunk interface.
func (e *AnalyzeExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *AnalyzeExec) Next(ctx context.Context, chk *chunk.Chunk) error {
concurrency, err := getBuildStatsConcurrency(e.ctx)
if err != nil {
return errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions executor/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func (e *ChecksumTableExec) Open(ctx context.Context) error {
return nil
}

// NextChunk implements the Executor NextChunk interface.
func (e *ChecksumTableExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *ChecksumTableExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
Expand Down
4 changes: 2 additions & 2 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type DDLExec struct {
done bool
}

// NextChunk implements the Executor NextChunk interface.
func (e *DDLExec) NextChunk(ctx context.Context, chk *chunk.Chunk) (err error) {
// Next implements the Executor Next interface.
func (e *DDLExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.done {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *testSuite) TestCreateTable(c *C) {
chk := rs.NewChunk()
it := chunk.NewIterator4Chunk(chk)
for {
err1 := rs.NextChunk(ctx, chk)
err1 := rs.Next(ctx, chk)
c.Assert(err1, IsNil)
if chk.NumRows() == 0 {
break
Expand All @@ -82,7 +82,7 @@ func (s *testSuite) TestCreateTable(c *C) {
chk = rs.NewChunk()
it = chunk.NewIterator4Chunk(chk)
for {
err1 := rs.NextChunk(ctx, chk)
err1 := rs.Next(ctx, chk)
c.Assert(err1, IsNil)
if chk.NumRows() == 0 {
break
Expand Down Expand Up @@ -156,7 +156,7 @@ func (s *testSuite) TestAlterTableAddColumn(c *C) {
r, err := tk.Exec("select c2 from alter_test")
c.Assert(err, IsNil)
chk := r.NewChunk()
err = r.NextChunk(context.Background(), chk)
err = r.Next(context.Background(), chk)
c.Assert(err, IsNil)
row := chk.GetRow(0)
c.Assert(row.Len(), Equals, 1)
Expand Down
Loading