
Commit

Merge pull request #2 from qiuyesuifeng/qiueysuifeng/hackathon
*: add mock data for stream select
qiuyesuifeng authored Dec 1, 2018
2 parents d0b1b13 + 6e556c0 commit fb5ff81
Showing 7 changed files with 262 additions and 56 deletions.
51 changes: 49 additions & 2 deletions ddl/ddl_api.go
@@ -118,6 +118,44 @@ func checkTooLongIndex(index model.CIStr) error {
return nil
}

func checkStreamType(args map[string]string) error {
key, ok := args["type"]
if !ok {
return errors.New("Cannot find stream table type")
}

tp := strings.ToLower(key)
if tp != "kafka" && tp != "pulsar" && tp != "binlog" && tp != "log" {
return errors.New("Invalid stream table type")
}

if tp == "kafka" {
_, ok := args["topic"]
if !ok {
return errors.New("Cannot find kafka stream table topic")
}
}

if tp == "pulsar" {
_, ok := args["topic"]
if !ok {
return errors.New("Cannot find pulsar stream table topic")
}
}

return nil
}
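
// Example (illustrative only; "trades" is a made-up topic name):
//
//	checkStreamType(map[string]string{"type": "kafka", "topic": "trades"}) // nil
//	checkStreamType(map[string]string{"type": "kafka"})                    // error: cannot find kafka stream table topic
//	checkStreamType(map[string]string{"type": "redis"})                    // error: invalid stream table type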

func checkStreamTimestampField(cols []*model.ColumnInfo) error {
for _, col := range cols {
if col.Tp == mysql.TypeTimestamp {
return nil
}
}

return errors.New("Cannot find stream table timestamp type")
}

func setColumnFlagWithConstraint(colMap map[string]*table.Column, v *ast.Constraint) {
switch v.Tp {
case ast.ConstraintPrimaryKey:
@@ -917,7 +955,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
}

tbInfo, err := buildTableInfo(ctx, d, ident.Name, cols, newConstraints)
tbInfo.IsStream = false;
tbInfo.IsStream = false
if err != nil {
return errors.Trace(err)
}
@@ -993,7 +1031,6 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
return errors.Trace(err)
}


func (d *ddl) CreateStream(ctx sessionctx.Context, s *ast.CreateStreamStmt) (err error) {
ident := ast.Ident{Schema: s.StreamName.Schema, Name: s.StreamName.Name}
colDefs := s.Cols
@@ -1042,6 +1079,16 @@ func (d *ddl) CreateStream(ctx sessionctx.Context, s *ast.CreateStreamStmt) (err
tbInfo.StreamProperties[p.K] = p.V
}

err = checkStreamType(tbInfo.StreamProperties)
if err != nil {
return errors.Trace(err)
}

err = checkStreamTimestampField(tbInfo.Columns)
if err != nil {
return errors.Trace(err)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tbInfo.ID,
24 changes: 15 additions & 9 deletions executor/builder.go
@@ -1087,8 +1087,12 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
e.FinalAggFuncs = append(e.FinalAggFuncs, finalAggFunc)
if partialAggDesc.Name == ast.AggFuncGroupConcat {
// For group_concat, finalAggFunc and partialAggFunc need to share the `truncated` flag so truncation is handled consistently across both.
finalAggFunc.(interface{ SetTruncated(t *int32) }).SetTruncated(
partialAggFunc.(interface{ GetTruncated() *int32 }).GetTruncated(),
finalAggFunc.(interface {
SetTruncated(t *int32)
}).SetTruncated(
partialAggFunc.(interface {
GetTruncated() *int32
}).GetTruncated(),
)
}
}
@@ -1100,20 +1104,20 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor

if v.StreamWindow != nil {
return &StreamWindowHashAggExec{
baseExecutor: e.baseExecutor,
prepared: e.prepared,
sc: e.sc,
PartialAggFuncs: e.PartialAggFuncs,
baseExecutor: e.baseExecutor,
prepared: e.prepared,
sc: e.sc,
PartialAggFuncs: e.PartialAggFuncs,
FinalAggFuncs: e.FinalAggFuncs,
partialResultMap: e.partialResultMap,
groupSet: e.groupSet,
groupKeys: e.groupKeys,
cursor4GroupKey: e.cursor4GroupKey,
GroupByItems: e.GroupByItems,
GroupByItems: e.GroupByItems,
groupKeyBuffer: e.groupKeyBuffer,
groupValDatums: e.groupValDatums,

defaultVal: e.defaultVal,
defaultVal: e.defaultVal,

childResult: e.childResult,
}
@@ -1729,7 +1733,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) *

func (b *executorBuilder) buildStreamReader(v *plannercore.PhysicalStreamReader) *StreamReaderExecutor {
return &StreamReaderExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
Table: v.Table,
Columns: v.Columns,
}
}

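A note on the group_concat change in buildHashAgg above: it relies on Go's anonymous-interface type assertions so the final and partial aggregate functions can share a single *int32 truncation flag. Below is a minimal, self-contained sketch of that pattern; the types are illustrative stand-ins, not the real TiDB aggregate functions.

package main

import "fmt"

// partialFunc and finalFunc stand in for partialAggFunc and finalAggFunc.
type partialFunc struct{ truncated *int32 }

func (p *partialFunc) GetTruncated() *int32 { return p.truncated }

type finalFunc struct{ truncated *int32 }

func (f *finalFunc) SetTruncated(t *int32) { f.truncated = t }

func main() {
	flag := int32(0)
	var partial interface{} = &partialFunc{truncated: &flag}
	var final interface{} = &finalFunc{truncated: new(int32)}

	// The same assertion shape as in buildHashAgg: no named interface is needed,
	// only the methods the call site cares about.
	final.(interface{ SetTruncated(t *int32) }).SetTruncated(
		partial.(interface{ GetTruncated() *int32 }).GetTruncated(),
	)

	flag = 1                                   // flip the shared flag through the partial side
	fmt.Println(*final.(*finalFunc).truncated) // prints 1: both sides now see the same flag
}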
166 changes: 125 additions & 41 deletions executor/stream_reader.go
@@ -14,72 +14,156 @@
package executor

import (

"github.com/cznic/mathutil"
"github.com/pingcap/errors"

//"github.com/pingcap/parser/model"
//plannercore "github.com/pingcap/tidb/planner/core"
//"github.com/pingcap/tidb/statistics"
//"github.com/pingcap/tidb/table"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/chunk"
//"github.com/pingcap/tidb/util/ranger"
//tipb "github.com/pingcap/tipb/go-tipb"
"github.com/pingcap/tidb/util/mock"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

// make sure `TableReaderExecutor` implements `Executor`.
// make sure `StreamReaderExecutor` implements `Executor`.
var _ Executor = &StreamReaderExecutor{}

// StreamReaderExecutor sends DAG request and reads data from a stream.
var batchFetchCnt = 10
var maxFetchCnt = 10000
var streamCursor = 0

// streamTableMaps: {type|topic|table} => pos (last consumed position)
var streamTableMaps = make(map[string]int64)
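// For example, a hypothetical key "kafka|demo-topic|t1" would map to the last
// position consumed from that stream table.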

// StreamReaderExecutor reads data from a stream.
type StreamReaderExecutor struct {
baseExecutor
Table *model.TableInfo
Columns []*model.ColumnInfo

/*
table table.Table
physicalTableID int64
keepOrder bool
desc bool
ranges []*ranger.Range
dagPB *tipb.DAGRequest
// columns are only required by union scan.
columns []*model.ColumnInfo
// resultHandler handles the order of the result. Since (MAXInt64, MAXUint64] stores before [0, MaxInt64] physically
// for unsigned int.
resultHandler *tableResultHandler
streaming bool
feedback *statistics.QueryFeedback
// corColInFilter tells whether there's correlated column in filter.
corColInFilter bool
// corColInAccess tells whether there's correlated column in access conditions.
corColInAccess bool
plans []plannercore.PhysicalPlan
*/
rowCnt int
isInit bool
result *chunk.Chunk
cursor int
}

func (e *StreamReaderExecutor) init() error {
if e.isInit {
return nil
}

e.isInit = true
return nil
}

// Open initializes necessary variables for using this executor.
func (e *StreamReaderExecutor) Open(ctx context.Context) error {
err := e.init()
if err != nil {
return errors.Trace(err)
}

return nil
}

// Next fills data into the chunk passed by its caller.
func (e *StreamReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
for {
if e.rowCnt < 10 && chk.NumRows() < e.ctx.GetSessionVars().MaxChunkSize {
chk.AppendInt64(0, int64(e.rowCnt))
e.rowCnt += 1
} else {
break
log.Warnf("[qiuyesuifeng]%v:%v", e.Table, e.cursor)

chk.GrowAndReset(e.maxChunkSize)
if e.result == nil {
e.result = e.newFirstChunk()
err := e.fetchAll()
if err != nil {
return errors.Trace(err)
}
iter := chunk.NewIterator4Chunk(e.result)
for colIdx := 0; colIdx < e.Schema().Len(); colIdx++ {
retType := e.Schema().Columns[colIdx].RetType
if !types.IsTypeVarchar(retType.Tp) {
continue
}
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
if valLen := len(row.GetString(colIdx)); retType.Flen < valLen {
retType.Flen = valLen
}
}
}
}
if e.cursor >= e.result.NumRows() {
return nil
}
return errors.Trace(nil)
numCurBatch := mathutil.Min(chk.Capacity(), e.result.NumRows()-e.cursor)
chk.Append(e.result, e.cursor, e.cursor+numCurBatch)
e.cursor += numCurBatch
streamCursor += numCurBatch
return nil
}

// Close implements the Executor Close interface.
func (e *StreamReaderExecutor) Close() error {
e.isInit = false
return nil
}

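// fetchAll loads the next batch of stream rows into e.result; in this commit it only pulls mock data.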
func (e *StreamReaderExecutor) fetchAll() error {
err := e.fetchMockData()
if err != nil {
return errors.Trace(err)
}

return nil
}

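// fetchMockData appends up to batchFetchCnt rows from the shared mock data set into e.result,
// starting at the package-level streamCursor.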
func (e *StreamReaderExecutor) fetchMockData() error {
log.Warnf("[qiuyesuifeng][fetch mock data]%v", e.cursor)

for i := streamCursor; i < maxFetchCnt && i < streamCursor+batchFetchCnt; i++ {
row := []interface{}{mock.MockStreamData[i].ID, mock.MockStreamData[i].Content, mock.MockStreamData[i].CreateTime}
e.appendRow(e.result, row)
}

return nil
}

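// appendRow appends one row of Go values to chk, choosing the chunk append method
// from each value's dynamic type and falling back to NULL for unknown types.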
func (e *StreamReaderExecutor) appendRow(chk *chunk.Chunk, row []interface{}) {
for i, col := range row {
if col == nil {
chk.AppendNull(i)
continue
}
switch x := col.(type) {
case nil:
chk.AppendNull(i)
case int:
chk.AppendInt64(i, int64(x))
case int64:
chk.AppendInt64(i, x)
case uint64:
chk.AppendUint64(i, x)
case float64:
chk.AppendFloat64(i, x)
case float32:
chk.AppendFloat32(i, x)
case string:
chk.AppendString(i, x)
case []byte:
chk.AppendBytes(i, x)
case types.BinaryLiteral:
chk.AppendBytes(i, x)
case *types.MyDecimal:
chk.AppendMyDecimal(i, x)
case types.Time:
chk.AppendTime(i, x)
case json.BinaryJSON:
chk.AppendJSON(i, x)
case types.Duration:
chk.AppendDuration(i, x)
case types.Enum:
chk.AppendEnum(i, x)
case types.Set:
chk.AppendSet(i, x)
default:
chk.AppendNull(i)
}
}
}
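For reference, a minimal sketch of the shape mock.MockStreamData presumably has, inferred only from how fetchMockData indexes it above; the actual definition lives in util/mock and is not part of this page, and the field types here are assumptions.

package mock

import "github.com/pingcap/tidb/types"

// StreamRow is an illustrative element type; the real one may differ.
type StreamRow struct {
	ID         int64      // appended via chk.AppendInt64 in appendRow
	Content    string     // appended via chk.AppendString
	CreateTime types.Time // appended via chk.AppendTime
}

// MockStreamData holds pre-generated rows for the demo stream table.
var MockStreamData []StreamRow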
22 changes: 21 additions & 1 deletion planner/core/find_best_task.go
@@ -205,12 +205,32 @@ func (ds *DataSource) tryToGetDualTask() (task, error) {
// It will enumerate all the available indices and choose a plan with least cost.
func (ds *DataSource) findBestTask(prop *property.PhysicalProperty) (t task, err error) {
if ds.tableInfo.IsStream == true {
sr := PhysicalStreamReader{}.Init(ds.ctx)
sr := PhysicalStreamReader{
Table: ds.tableInfo,
Columns: ds.Columns,
TableAsName: ds.TableAsName,
DBName: ds.DBName,
}.Init(ds.ctx)
sr.SetSchema(ds.schema)
sr.stats = property.NewSimpleStats(10)
t := rootTask{p: sr}

// Wrap the table-level filter conditions (the WHERE clause) in a PhysicalSelection on top of the stream reader.
for _, path := range ds.possibleAccessPaths {
if path.isTablePath {
tableConds := path.tableFilters
if tableConds != nil {
tableSel := PhysicalSelection{Conditions: tableConds}.
Init(ds.ctx, ds.stats.ScaleByExpectCnt(0))
tableSel.SetChildren(t.p)
t.p = tableSel
}
}
}
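// With a filter such as `WHERE id > 0` (hypothetical example), the resulting plan is
// PhysicalSelection(id > 0) -> PhysicalStreamReader.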

return &t, nil
}

// If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself,
// and set inner child prop nil, so here we do nothing.
if prop == nil {
4 changes: 2 additions & 2 deletions planner/core/initialize.go
@@ -81,7 +81,7 @@
TypeTableReader = "TableReader"
// TypeIndexReader is the type of IndexReader.
TypeIndexReader = "IndexReader"
// TypeStreamReader is the type of TableReader.
// TypeStreamReader is the type of StreamReader.
TypeStreamReader = "StreamReader"
)

@@ -366,7 +366,7 @@ func (p PhysicalTableReader) Init(ctx sessionctx.Context) *PhysicalTableReader {
}

func (p PhysicalStreamReader) Init(ctx sessionctx.Context) *PhysicalStreamReader {
p.basePhysicalPlan = newBasePhysicalPlan(ctx, TypeTableReader, &p)
p.basePhysicalPlan = newBasePhysicalPlan(ctx, TypeStreamReader, &p)
return &p
}

7 changes: 6 additions & 1 deletion planner/core/physical_plans.go
@@ -50,9 +50,14 @@ var (
_ PhysicalPlan = &PhysicalStreamReader{}
)

// PhysicalStreamReader is the table reader in tidb.
// PhysicalStreamReader is the stream reader in tidb.
type PhysicalStreamReader struct {
physicalSchemaProducer

Table *model.TableInfo
Columns []*model.ColumnInfo
TableAsName *model.CIStr
DBName model.CIStr
}

// PhysicalTableReader is the table reader in tidb.
(The remaining changed file is not rendered in this view.)
