Skip to content

Commit

Permalink
Enhance MYSQL function SHOW PROCESSLIST (#546)
Browse files Browse the repository at this point in the history
Signed-off-by: Charlie17Li <charlie1213800@gmail.com>

Signed-off-by: Charlie17Li <charlie1213800@gmail.com>
  • Loading branch information
Charlie17Li authored Nov 27, 2022
1 parent f2dfe76 commit 7590528
Showing 1 changed file with 54 additions and 3 deletions.
57 changes: 54 additions & 3 deletions pkg/runtime/plan/dal/show_process_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package dal

import (
"context"
"fmt"
"strconv"
"strings"
)

Expand All @@ -27,11 +29,18 @@ import (
)

import (
"github.com/arana-db/arana/pkg/dataset"
"github.com/arana-db/arana/pkg/mysql/rows"
"github.com/arana-db/arana/pkg/proto"
"github.com/arana-db/arana/pkg/resultx"
"github.com/arana-db/arana/pkg/runtime/ast"
"github.com/arana-db/arana/pkg/runtime/plan"
)

const (
sep = "_"
)

type ShowProcessListPlan struct {
plan.BasePlan
db string
Expand All @@ -52,17 +61,59 @@ func (s *ShowProcessListPlan) ExecIn(ctx context.Context, conn proto.VConn) (pro
var (
sb strings.Builder
indexes []int
err error
)

ctx, span := plan.Tracer.Start(ctx, "ShowProcessListPlan.ExecIn")
defer span.End()

if err = s.Stmt.Restore(ast.RestoreDefault, &sb, &indexes); err != nil {
if err := s.Stmt.Restore(ast.RestoreDefault, &sb, &indexes); err != nil {
return nil, errors.WithStack(err)
}

res, err := conn.Query(ctx, s.db, sb.String(), s.ToArgs(indexes)...)
if err != nil {
return nil, errors.WithStack(err)
}

ds, err := res.Dataset()
if err != nil {
return nil, errors.WithStack(err)
}

fields, err := ds.Fields()
if err != nil {
return nil, errors.WithStack(err)
}

return conn.Query(ctx, s.db, sb.String(), s.ToArgs(indexes)...)
strs := strings.Split(s.db, sep)
if len(strs) < 2 {
return nil, fmt.Errorf("can get the id of sub database")
}
groupId, err := strconv.ParseInt(strs[1], 10, 64)
if err != nil {
return nil, errors.WithStack(err)
}

ds = dataset.Pipe(ds,
dataset.Map(nil, func(next proto.Row) (proto.Row, error) {
dest := make([]proto.Value, len(fields))
if next.Scan(dest) != nil {
return next, nil
}

id := dest[0].(int64) << 16
if id <= 0 {
return nil, fmt.Errorf("integer operation result is out of range")
}
dest[0] = id + groupId

if next.IsBinary() {
return rows.NewBinaryVirtualRow(fields, dest), nil
}
return rows.NewTextVirtualRow(fields, dest), nil
}))

return resultx.New(resultx.WithDataset(ds)), nil
}

func (s *ShowProcessListPlan) SetDatabase(db string) {
Expand Down

0 comments on commit 7590528

Please sign in to comment.