Skip to content

Commit

Permalink
*: rename NextChunk to Next (#6214)
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu authored and coocood committed Apr 3, 2018
1 parent 17fb7d7 commit ccf6da1
Show file tree
Hide file tree
Showing 54 changed files with 233 additions and 271 deletions.
4 changes: 2 additions & 2 deletions ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ type RecordSet interface {
// Fields gets result fields.
Fields() []*ResultField

// NextChunk reads records into chunk.
NextChunk(ctx context.Context, chk *chunk.Chunk) error
// Next reads records into chunk.
Next(ctx context.Context, chk *chunk.Chunk) error

// NewChunk creates a new chunk with initial capacity.
NewChunk() *chunk.Chunk
Expand Down
2 changes: 1 addition & 1 deletion cmd/benchdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (ut *benchDB) mustExec(sql string) {
rs := rss[0]
chk := rs.NewChunk()
for {
err := rs.NextChunk(ctx, chk)
err := rs.Next(ctx, chk)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func LoadDeleteRanges(ctx sessionctx.Context, safePoint uint64) (ranges []DelRan
chk := rs.NewChunk()
it := chunk.NewIterator4Chunk(chk)
for {
err = rs.NextChunk(context.TODO(), chk)
err = rs.Next(context.TODO(), chk)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
27 changes: 4 additions & 23 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,10 @@ var (

// SelectResult is an iterator of coprocessor partial results.
type SelectResult interface {
// Next gets the next partial result.
Next(context.Context) (PartialResult, error)
// NextRaw gets the next raw result.
NextRaw(context.Context) ([]byte, error)
// NextChunk reads the data into chunk.
NextChunk(context.Context, *chunk.Chunk) error
// Next reads the data into chunk.
Next(context.Context, *chunk.Chunk) error
// Close closes the iterator.
Close() error
// Fetch fetches partial results from client.
Expand Down Expand Up @@ -121,23 +119,6 @@ func (r *selectResult) fetch(ctx context.Context) {
}
}

// Next returns the next row.
func (r *selectResult) Next(ctx context.Context) (PartialResult, error) {
re := <-r.results
if re.err != nil {
return nil, errors.Trace(re.err)
}
if re.result == nil {
return nil, nil
}
pr := &partialResult{}
pr.rowLen = r.rowLen
err := pr.unmarshal(re.result.GetData())
r.feedback.Update(re.result.GetStartKey(), pr.resp.OutputCounts)
r.partialCount++
return pr, errors.Trace(err)
}

// NextRaw returns the next raw partial result.
func (r *selectResult) NextRaw(ctx context.Context) ([]byte, error) {
re := <-r.results
Expand All @@ -149,8 +130,8 @@ func (r *selectResult) NextRaw(ctx context.Context) ([]byte, error) {
return re.result.GetData(), nil
}

// NextChunk reads data to the chunk.
func (r *selectResult) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next reads data to the chunk.
func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
for chk.NumRows() < r.ctx.GetSessionVars().MaxChunkSize {
if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) {
Expand Down
12 changes: 6 additions & 6 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *testSuite) TestSelectNormal(c *C) {
colTypes = append(colTypes, colTypes[0])
colTypes = append(colTypes, colTypes[0])

// Test NextChunk.
// Test Next.
response, err := Select(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))
c.Assert(err, IsNil)
result, ok := response.(*selectResult)
Expand All @@ -64,11 +64,11 @@ func (s *testSuite) TestSelectNormal(c *C) {

response.Fetch(context.TODO())

// Test NextChunk.
// Test Next.
chk := chunk.NewChunk(colTypes)
numAllRows := 0
for {
err = response.NextChunk(context.TODO(), chk)
err = response.Next(context.TODO(), chk)
c.Assert(err, IsNil)
numAllRows += chk.NumRows()
if chk.NumRows() == 0 {
Expand Down Expand Up @@ -108,7 +108,7 @@ func (s *testSuite) TestSelectStreaming(c *C) {

s.sctx.GetSessionVars().EnableStreaming = true

// Test NextChunk.
// Test Next.
response, err := Select(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))
c.Assert(err, IsNil)
result, ok := response.(*streamResult)
Expand All @@ -117,11 +117,11 @@ func (s *testSuite) TestSelectStreaming(c *C) {

response.Fetch(context.TODO())

// Test NextChunk.
// Test Next.
chk := chunk.NewChunk(colTypes)
numAllRows := 0
for {
err = response.NextChunk(context.TODO(), chk)
err = response.Next(context.TODO(), chk)
c.Assert(err, IsNil)
numAllRows += chk.NumRows()
if chk.NumRows() == 0 {
Expand Down
12 changes: 1 addition & 11 deletions distsql/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,7 @@ type streamResult struct {

func (r *streamResult) Fetch(context.Context) {}

func (r *streamResult) Next(ctx context.Context) (PartialResult, error) {
var ret streamPartialResult
ret.rowLen = r.rowLen
finished, err := r.readDataFromResponse(ctx, r.resp, &ret.Chunk)
if err != nil || finished {
return nil, errors.Trace(err)
}
return &ret, nil
}

func (r *streamResult) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
func (r *streamResult) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize
for chk.NumRows() < maxChunkSize {
Expand Down
8 changes: 4 additions & 4 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ func schema2ResultFields(schema *expression.Schema, defaultDB string) (rfs []*as
return rfs
}

// NextChunk use uses recordSet's executor to get next available chunk for later usage.
// Next use uses recordSet's executor to get next available chunk for later usage.
// If chunk does not contain any rows, then we update last query found rows in session variable as current found rows.
// The reason we need update is that chunk with 0 rows indicating we already finished current query, we need prepare for
// next query.
// If stmt is not nil and chunk with some rows inside, we simply update last query found rows by the number of row in chunk.
func (a *recordSet) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
err := a.executor.NextChunk(ctx, chk)
func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
err := a.executor.Next(ctx, chk)
if err != nil {
a.lastErr = err
return errors.Trace(err)
Expand Down Expand Up @@ -262,7 +262,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
a.logSlowQuery(txnTS, err == nil)
}()

err = e.NextChunk(ctx, e.newChunk())
err = e.Next(ctx, e.newChunk())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
18 changes: 9 additions & 9 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ type CheckIndexRangeExec struct {
cols []*model.ColumnInfo
}

// NextChunk implements the Executor NextChunk interface.
func (e *CheckIndexRangeExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *CheckIndexRangeExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
handleIdx := e.schema.Len() - 1
for {
err := e.result.NextChunk(ctx, e.srcChunk)
err := e.result.Next(ctx, e.srcChunk)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -318,7 +318,7 @@ func (e *RecoverIndexExec) fetchRecoverRows(ctx context.Context, srcResult dists
result.scanRowCount = 0

for {
err := srcResult.NextChunk(ctx, e.srcChunk)
err := srcResult.Next(ctx, e.srcChunk)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -428,8 +428,8 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
return result, nil
}

// NextChunk implements the Executor NextChunk interface.
func (e *RecoverIndexExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *RecoverIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
Expand Down Expand Up @@ -537,7 +537,7 @@ func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) e

sc := e.ctx.GetSessionVars().StmtCtx
for {
err := result.NextChunk(ctx, e.idxChunk)
err := result.Next(ctx, e.idxChunk)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -563,8 +563,8 @@ func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) e
}
}

// NextChunk implements the Executor NextChunk interface.
func (e *CleanupIndexExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *CleanupIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
Expand Down
14 changes: 7 additions & 7 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func (e *HashAggExec) Open(ctx context.Context) error {
return nil
}

// NextChunk implements the Executor NextChunk interface.
func (e *HashAggExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
// In this stage we consider all data from src as a single group.
if !e.executed {
err := e.execute(ctx)
Expand Down Expand Up @@ -110,11 +110,11 @@ func (e *HashAggExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
}
}

// innerNextChunk fetches Chunks from src and update each aggregate function for each row in Chunk.
// execute fetches Chunks from src and update each aggregate function for each row in Chunk.
func (e *HashAggExec) execute(ctx context.Context) (err error) {
inputIter := chunk.NewIterator4Chunk(e.childrenResults[0])
for {
err := e.children[0].NextChunk(ctx, e.childrenResults[0])
err := e.children[0].Next(ctx, e.childrenResults[0])
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -217,8 +217,8 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
return nil
}

// NextChunk implements the Executor NextChunk interface.
func (e *StreamAggExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()

for !e.executed && chk.NumRows() < e.maxChunkSize {
Expand Down Expand Up @@ -264,7 +264,7 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch
return nil
}

err := e.children[0].NextChunk(ctx, e.childrenResults[0])
err := e.children[0].Next(ctx, e.childrenResults[0])
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ const (
defaultCMSketchWidth = 2048
)

// NextChunk implements the Executor NextChunk interface.
func (e *AnalyzeExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *AnalyzeExec) Next(ctx context.Context, chk *chunk.Chunk) error {
concurrency, err := getBuildStatsConcurrency(e.ctx)
if err != nil {
return errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions executor/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func (e *ChecksumTableExec) Open(ctx context.Context) error {
return nil
}

// NextChunk implements the Executor NextChunk interface.
func (e *ChecksumTableExec) NextChunk(ctx context.Context, chk *chunk.Chunk) error {
// Next implements the Executor Next interface.
func (e *ChecksumTableExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
Expand Down
4 changes: 2 additions & 2 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type DDLExec struct {
done bool
}

// NextChunk implements the Executor NextChunk interface.
func (e *DDLExec) NextChunk(ctx context.Context, chk *chunk.Chunk) (err error) {
// Next implements the Executor Next interface.
func (e *DDLExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.done {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *testSuite) TestCreateTable(c *C) {
chk := rs.NewChunk()
it := chunk.NewIterator4Chunk(chk)
for {
err1 := rs.NextChunk(ctx, chk)
err1 := rs.Next(ctx, chk)
c.Assert(err1, IsNil)
if chk.NumRows() == 0 {
break
Expand All @@ -82,7 +82,7 @@ func (s *testSuite) TestCreateTable(c *C) {
chk = rs.NewChunk()
it = chunk.NewIterator4Chunk(chk)
for {
err1 := rs.NextChunk(ctx, chk)
err1 := rs.Next(ctx, chk)
c.Assert(err1, IsNil)
if chk.NumRows() == 0 {
break
Expand Down Expand Up @@ -156,7 +156,7 @@ func (s *testSuite) TestAlterTableAddColumn(c *C) {
r, err := tk.Exec("select c2 from alter_test")
c.Assert(err, IsNil)
chk := r.NewChunk()
err = r.NextChunk(context.Background(), chk)
err = r.Next(context.Background(), chk)
c.Assert(err, IsNil)
row := chk.GetRow(0)
c.Assert(row.Len(), Equals, 1)
Expand Down
Loading

0 comments on commit ccf6da1

Please sign in to comment.