Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support subscription in mo table size/rows. #20785

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions pkg/frontend/back_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,11 +1140,12 @@ func (sh *SqlHelper) GetSubscriptionMeta(dbName string) (*plan.SubscriptionMeta,
return sh.ses.txnCompileCtx.GetSubscriptionMeta(dbName, nil)
}

// Made for sequence func. nextval, setval.
func (sh *SqlHelper) ExecSql(sql string) (ret [][]interface{}, err error) {
func (sh *SqlHelper) execSql(
ctx context.Context,
sql string,
) (ret [][]interface{}, err error) {
var erArray []ExecResult

ctx := sh.ses.txnCompileCtx.execCtx.reqCtx
/*
if we run the transaction statement (BEGIN, ect) here , it creates an independent transaction.
if we do not run the transaction statement (BEGIN, ect) here, it runs the sql in the share transaction
Expand All @@ -1171,3 +1172,13 @@ func (sh *SqlHelper) ExecSql(sql string) (ret [][]interface{}, err error) {

return erArray[0].(*MysqlResultSet).Data, nil
}

// Made for sequence func. nextval, setval.
func (sh *SqlHelper) ExecSql(sql string) (ret [][]interface{}, err error) {
ctx := sh.ses.txnCompileCtx.execCtx.reqCtx
return sh.execSql(ctx, sql)
}

func (sh *SqlHelper) ExecSqlWithCtx(ctx context.Context, sql string) ([][]interface{}, error) {
return sh.execSql(ctx, sql)
}
182 changes: 173 additions & 9 deletions pkg/sql/plan/function/func_mo.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,113 @@ type GetMoTableSizeRowsFuncType = func() func(
var GetMoTableSizeFunc atomic.Pointer[GetMoTableSizeRowsFuncType]
var GetMoTableRowsFunc atomic.Pointer[GetMoTableSizeRowsFuncType]

type subscription struct {
valid bool

oriAccId uint64
oriDatabaseId uint64
oriTableId uint64

oriTableName string
oriDatabaseName string
}

func (s subscription) String() string {
return fmt.Sprintf("valid: %v, oriAcc(%d), oriDatabase(%d-%s), oriTable(%d-%s)",
s.valid,
s.oriAccId,
s.oriDatabaseId,
s.oriDatabaseName,
s.oriTableId,
s.oriTableName)
}

func isSubscribedTable(
proc *process.Process,
reqAcc uint32,
db engine.Database,
dbName, tblName string,
) (sub subscription, err error) {

var (
sql string
ret [][]interface{}
meta *plan.SubscriptionMeta
)

if db.IsSubscription(proc.Ctx) {
defer func() {
if err != nil {
sub.valid = false

metaInfo := ""
if meta != nil {
metaInfo = fmt.Sprintf("ACC(%s,%d)-DB(%s)-TBLS(%s)",
meta.AccountName, meta.AccountId, meta.DbName, meta.Tables)
}

logutil.Error("MO_TABLE_SIZE/ROWS",
zap.String("source", "isSubscribedTable"),
zap.Error(err),
zap.String("sub meta", metaInfo),
zap.Uint32("request acc", reqAcc),
zap.String("db name", dbName),
zap.String("tbl name", tblName),
zap.String("subscription", sub.String()),
zap.String("sql", sql),
)
}
}()

meta, err = proc.GetSessionInfo().SqlHelper.GetSubscriptionMeta(dbName)
if err != nil {
return sub,
moerr.NewInternalErrorNoCtx(fmt.Sprintf("get subscription meta failed, err: %v", err))
}

if meta.Tables != pubsub.TableAll && !strings.Contains(meta.Tables, tblName) {
return sub, moerr.NewInternalErrorNoCtx("no such subscribed table")
}

// check passed, get acc, db, tbl info
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
ctx = defines.AttachAccountId(ctx, uint32(sysAccountID))
defer cancel()

sql = fmt.Sprintf(`
select
reldatabase_id, rel_id
from
mo_catalog.mo_tables
where
account_id = %d and reldatabase = '%s' and relname = '%s';`,
meta.AccountId, meta.DbName, tblName)

ret, err = proc.GetSessionInfo().SqlHelper.ExecSqlWithCtx(ctx, sql)
if err != nil {
return sub,
moerr.NewInternalErrorNoCtx(fmt.Sprintf("exec get subscribed tbl info sql failed, err: %v", err))
}

if len(ret) != 1 {
return sub,
moerr.NewInternalErrorNoCtx(fmt.Sprintf("get the subscribed tbl info empty: %s", tblName))
}

sub.valid = true
sub.oriAccId = uint64(meta.AccountId)
sub.oriDatabaseId = ret[0][0].(uint64)
sub.oriTableId = ret[0][1].(uint64)
sub.oriTableName = tblName
sub.oriDatabaseName = meta.DbName

return sub, nil
}

sub.valid = false
return sub, nil
}

func MoTableSizeRowsHelper(
iVecs []*vector.Vector,
result vector.FunctionResultWrapper,
Expand Down Expand Up @@ -283,17 +390,28 @@ func MoTableSizeRowsHelper(
return err
}

if rel, err = db.Relation(proc.Ctx, tblName, nil); err != nil {
if moerr.IsMoErrCode(err, moerr.OkExpectedEOB) {
return moerr.NewInternalErrorNoCtxf("tbl not exist: %s-%s(%s)",
dbName, tblName, "OkExpectedEOB")
}
var sub subscription
if sub, err = isSubscribedTable(
proc, accountId, db, dbName, tblName); err != nil {
return err
}
} else if sub.valid {
// is subscription
accIds = append(accIds, sub.oriAccId)
dbIds = append(dbIds, sub.oriDatabaseId)
tblIds = append(tblIds, sub.oriTableId)
} else {
if rel, err = db.Relation(proc.Ctx, tblName, nil); err != nil {
if moerr.IsMoErrCode(err, moerr.OkExpectedEOB) {
return moerr.NewInternalErrorNoCtxf("tbl not exist: %s-%s(%s)",
dbName, tblName, "OkExpectedEOB")
}
return err
}

accIds = append(accIds, uint64(accountId))
dbIds = append(dbIds, uint64(rel.GetDBID(proc.Ctx)))
tblIds = append(tblIds, uint64(rel.GetTableID(proc.Ctx)))
accIds = append(accIds, uint64(accountId))
dbIds = append(dbIds, uint64(rel.GetDBID(proc.Ctx)))
tblIds = append(tblIds, uint64(rel.GetTableID(proc.Ctx)))
}
}

ret, err = (*executor.Load())()(
Expand Down Expand Up @@ -399,6 +517,29 @@ func MoTableRowsOld(ivecs []*vector.Vector, result vector.FunctionResultWrapper,
}
return err
}

var accId uint32
accId, err = defines.GetAccountId(foolCtx)
if err != nil {
return err
}

var sub subscription
if sub, err = isSubscribedTable(
proc, accId, dbo, dbStr, tblStr); err != nil {
logutil.Error("MoTableRowsOld",
zap.String("source", "isSubscribeTable"),
zap.Error(err))
return err
} else if sub.valid {
// subscription
foolCtx = defines.AttachAccountId(foolCtx, uint32(sub.oriAccId))
dbo, err = e.Database(foolCtx, sub.oriDatabaseName, txn)
if err != nil {
return err
}
}

rel, err = dbo.Relation(foolCtx, tblStr, nil)
if err != nil {
return err
Expand Down Expand Up @@ -518,6 +659,29 @@ func MoTableSizeOld(ivecs []*vector.Vector, result vector.FunctionResultWrapper,
}
return err
}

var accId uint32
accId, err = defines.GetAccountId(foolCtx)
if err != nil {
return err
}

var sub subscription
if sub, err = isSubscribedTable(
proc, accId, dbo, dbStr, tblStr); err != nil {
logutil.Error("MoTableSizeOld",
zap.String("source", "isSubscribeTable"),
zap.Error(err))
return err
} else if sub.valid {
// subscription
foolCtx = defines.AttachAccountId(foolCtx, uint32(sub.oriAccId))
dbo, err = e.Database(foolCtx, sub.oriDatabaseName, txn)
if err != nil {
return err
}
}

rel, err = dbo.Relation(foolCtx, tblStr, nil)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/process/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ type Process struct {
type sqlHelper interface {
GetCompilerContext() any
ExecSql(string) ([][]interface{}, error)
ExecSqlWithCtx(context.Context, string) ([][]interface{}, error)
GetSubscriptionMeta(string) (sub *plan.SubscriptionMeta, err error)
}

Expand Down
67 changes: 67 additions & 0 deletions test/distributed/cases/function/func_mo.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
drop database if exists testdb;
create database testdb;
use testdb;
create account acc admin_name "root" identified by "111";
create publication pub1 database testdb account all;
create table t1 (a int);
create table t2 (a int);
create table t3 (a int);
insert into t1 select * from generate_series(1, 1000)g;
insert into t2 select * from generate_series(1, 1000)g;
insert into t3 select * from generate_series(1, 1000)g;
drop database if exists testdb_sub;
create database testdb_sub from sys publication pub1;
drop database if exists testdb_nor;
create database testdb_nor;
use testdb_nor;
create table t1 (a int);
create table t2 (a int);
create table t3 (a int);
insert into t1 select * from generate_series(1, 1001)g;
insert into t2 select * from generate_series(1, 1001)g;
insert into t3 select * from generate_series(1, 1001)g;
create table tmp(dbName varchar, tblName varchar);
insert into tmp values ("testdb_nor", "t1"), ("testdb_nor", "t2"), ("testdb_nor", "t3");
insert into tmp values ("testdb_sub", "t1"), ("testdb_sub", "t2"), ("testdb_sub", "t3");
set mo_table_stats.force_update = yes;
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
mo_table_rows(dbName, tblName)
1001
1001
1001
1000
1000
1000
insert into tmp values ("testdb_sub", "t4");
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
internal error: get the subscribed tbl info empty: t4
set mo_table_stats.force_update = no;
delete from tmp where dbName = "testdb_sub" and tblName = "t4";
set mo_table_stats.use_old_impl = yes;
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
mo_table_rows(dbName, tblName)
1001
1001
1001
1000
1000
1000
select mo_table_size(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
mo_table_size(dbName, tblName)
36036
36036
36036
36000
36000
36000
insert into tmp values ("testdb_sub", "t4");
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
internal error: get the subscribed tbl info empty: t4
select mo_table_size(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
internal error: get the subscribed tbl info empty: t4
set mo_table_stats.use_old_impl = no;
drop database testdb_nor;
drop database testdb_sub;
drop account acc;
drop publication pub1;
drop database testdb;
64 changes: 64 additions & 0 deletions test/distributed/cases/function/func_mo.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
drop database if exists testdb;
create database testdb;
use testdb;

create account acc admin_name "root" identified by "111";
create publication pub1 database testdb account all;

create table t1 (a int);
create table t2 (a int);
create table t3 (a int);

insert into t1 select * from generate_series(1, 1000)g;
insert into t2 select * from generate_series(1, 1000)g;
insert into t3 select * from generate_series(1, 1000)g;

-- @session:id=2&user=acc:root&password=111
drop database if exists testdb_sub;
create database testdb_sub from sys publication pub1;

drop database if exists testdb_nor;
create database testdb_nor;
use testdb_nor;

create table t1 (a int);
create table t2 (a int);
create table t3 (a int);

insert into t1 select * from generate_series(1, 1001)g;
insert into t2 select * from generate_series(1, 1001)g;
insert into t3 select * from generate_series(1, 1001)g;

create table tmp(dbName varchar, tblName varchar);
insert into tmp values ("testdb_nor", "t1"), ("testdb_nor", "t2"), ("testdb_nor", "t3");
insert into tmp values ("testdb_sub", "t1"), ("testdb_sub", "t2"), ("testdb_sub", "t3");

set mo_table_stats.force_update = yes;
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);

insert into tmp values ("testdb_sub", "t4");
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);

set mo_table_stats.force_update = no;
delete from tmp where dbName = "testdb_sub" and tblName = "t4";

set mo_table_stats.use_old_impl = yes;
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
-- @ignore:0
select mo_table_size(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);

insert into tmp values ("testdb_sub", "t4");
select mo_table_rows(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);
-- @ignore:0
select mo_table_size(dbName, tblName) from (select * from testdb_nor.tmp order by dbName, tblName asc);

set mo_table_stats.use_old_impl = no;

drop database testdb_nor;
drop database testdb_sub;

-- @session

drop account acc;
drop publication pub1;
drop database testdb;
Loading