Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: integrate plugin framework with TiDB #9006

Merged
merged 7 commits into from
Jan 14, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH)))
export PATH := $(path_to_add):$(PATH)

GO := GO111MODULE=on go
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
GOBUILD := CGO_ENABLED=1 $(GO) build $(BUILD_FLAG)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tennix PTAL if it will affect the docker image in k8s.

GOTEST := CGO_ENABLED=1 $(GO) test -p 3
OVERALLS := CGO_ENABLED=1 GO111MODULE=on overalls

Expand Down
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type Config struct {
TiKVClient TiKVClient `toml:"tikv-client" json:"tikv-client"`
Binlog Binlog `toml:"binlog" json:"binlog"`
CompatibleKillQuery bool `toml:"compatible-kill-query" json:"compatible-kill-query"`
Plugin Plugin `toml:"plugin" json:"plugin"`
}

// Log is the log section of config.
Expand Down Expand Up @@ -255,6 +256,12 @@ type Binlog struct {
BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"`
}

// Plugin is the config for plugin
type Plugin struct {
Dir string `toml:"dir" json:"dir"`
Load string `toml:"load" json:"load"`
}

var defaultConf = Config{
Host: "0.0.0.0",
AdvertiseAddress: "",
Expand Down
7 changes: 5 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,13 @@ func (do *Domain) topNSlowQueryLoop() {
do.slowQuery.Append(info)
case msg := <-do.slowQuery.msgCh:
req := msg.request
if req.Tp == ast.ShowSlowTop {
switch req.Tp {
case ast.ShowSlowTop:
msg.result = do.slowQuery.QueryTop(int(req.Count), req.Kind)
} else if req.Tp == ast.ShowSlowRecent {
case ast.ShowSlowRecent:
msg.result = do.slowQuery.QueryRecent(int(req.Count))
default:
msg.result = do.slowQuery.QueryAll()
}
msg.Done()
}
Expand Down
4 changes: 4 additions & 0 deletions domain/topn_slow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ func (q *topNSlowQueries) Append(info *SlowQueryInfo) {
}
}

func (q *topNSlowQueries) QueryAll() []*SlowQueryInfo {
return q.recent.data
}

func (q *topNSlowQueries) RemoveExpired(now time.Time) {
q.user.RemoveExpired(now, q.period)
q.internal.RemoveExpired(now, q.period)
Expand Down
7 changes: 7 additions & 0 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/infoschema"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -875,6 +876,12 @@ func (e *ShowExec) fetchShowProcedureStatus() error {
}

func (e *ShowExec) fetchShowPlugins() error {
tiPlugins := plugin.GetAll()
for _, ps := range tiPlugins {
for _, p := range ps {
e.appendRow([]interface{}{p.Name, p.State.String(), p.Kind.String(), p.Path, p.License, strconv.Itoa(int(p.Version))})
}
}
return nil
}

Expand Down
10 changes: 9 additions & 1 deletion infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,5 +358,13 @@ func initInfoSchemaDB() {

// IsMemoryDB checks if the db is in memory.
func IsMemoryDB(dbName string) bool {
return dbName == "information_schema" || dbName == "performance_schema"
if dbName == "information_schema" {
return true
}
for _, driver := range drivers {
if driver.DBInfo.Name.L == dbName {
return true
}
}
return false
}
13 changes: 13 additions & 0 deletions infoschema/perfschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,20 @@ type perfSchemaTable struct {
cols []*table.Column
}

var pluginTable = make(map[string]func(autoid.Allocator, *model.TableInfo) (table.Table, error))

// RegisterTable registers a new table into TiDB.
func RegisterTable(tableName, sql string,
tableFromMeta func(autoid.Allocator, *model.TableInfo) (table.Table, error)) {
perfSchemaTables = append(perfSchemaTables, sql)
pluginTable[tableName] = tableFromMeta
}

func tableFromMeta(alloc autoid.Allocator, meta *model.TableInfo) (table.Table, error) {
if f, ok := pluginTable[meta.Name.L]; ok {
ret, err := f(alloc, meta)
return ret, err
}
return createPerfSchemaTable(meta), nil
}

Expand Down
4 changes: 2 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1712,9 +1712,9 @@ func buildShowSchema(s *ast.ShowStmt) (schema *expression.Schema) {
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeLonglong,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar}
case ast.ShowPlugins:
names = []string{"Name", "Status", "Type", "Library", "License"}
names = []string{"Name", "Status", "Type", "Library", "License", "Version"}
ftypes = []byte{
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar,
}
case ast.ShowProcessList:
names = []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"}
Expand Down
95 changes: 66 additions & 29 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/privilege/privileges"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -1276,6 +1278,21 @@ func loadSystemTZ(se *session) (string, error) {

// BootstrapSession runs the first time when the TiDB server start.
func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
cfg := config.GetGlobalConfig()
if len(cfg.Plugin.Load) > 0 {
err := plugin.Init(context.Background(), plugin.Config{
Plugins: strings.Split(cfg.Plugin.Load, ","),
PluginDir: cfg.Plugin.Dir,
GlobalSysVar: &variable.SysVars,
PluginVarNames: &variable.PluginVarNames,
})
if err != nil {
return nil, err
}
}

initLoadCommonGlobalVarsSQL()

ver := getStoreBootstrapVersion(store)
if ver == notBootstrapped {
runInBootstrapSession(store, bootstrap)
Expand Down Expand Up @@ -1446,39 +1463,59 @@ func finishBootstrap(store kv.Storage) {
}

const quoteCommaQuote = "', '"
const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" +
variable.AutocommitVar + quoteCommaQuote +
variable.SQLModeVar + quoteCommaQuote +
variable.MaxAllowedPacket + quoteCommaQuote +
variable.TimeZone + quoteCommaQuote +
variable.BlockEncryptionMode + quoteCommaQuote +
variable.WaitTimeout + quoteCommaQuote +
variable.InteractiveTimeout + quoteCommaQuote +
variable.MaxPreparedStmtCount + quoteCommaQuote +

var builtinGlobalVariable = []string{
variable.AutocommitVar,
variable.SQLModeVar,
variable.MaxAllowedPacket,
variable.TimeZone,
variable.BlockEncryptionMode,
variable.WaitTimeout,
variable.InteractiveTimeout,
variable.MaxPreparedStmtCount,
/* TiDB specific global variables: */
variable.TiDBSkipUTF8Check + quoteCommaQuote +
variable.TiDBIndexJoinBatchSize + quoteCommaQuote +
variable.TiDBIndexLookupSize + quoteCommaQuote +
variable.TiDBIndexLookupConcurrency + quoteCommaQuote +
variable.TiDBIndexLookupJoinConcurrency + quoteCommaQuote +
variable.TiDBIndexSerialScanConcurrency + quoteCommaQuote +
variable.TiDBHashJoinConcurrency + quoteCommaQuote +
variable.TiDBProjectionConcurrency + quoteCommaQuote +
variable.TiDBHashAggPartialConcurrency + quoteCommaQuote +
variable.TiDBHashAggFinalConcurrency + quoteCommaQuote +
variable.TiDBBackoffLockFast + quoteCommaQuote +
variable.TiDBConstraintCheckInPlace + quoteCommaQuote +
variable.TiDBOptInSubqToJoinAndAgg + quoteCommaQuote +
variable.TiDBDistSQLScanConcurrency + quoteCommaQuote +
variable.TiDBInitChunkSize + quoteCommaQuote +
variable.TiDBMaxChunkSize + quoteCommaQuote +
variable.TiDBEnableCascadesPlanner + quoteCommaQuote +
variable.TiDBRetryLimit + quoteCommaQuote +
variable.TiDBDisableTxnAutoRetry + quoteCommaQuote +
variable.TiDBEnableWindowFunction + "')"
variable.TiDBSkipUTF8Check,
variable.TiDBIndexJoinBatchSize,
variable.TiDBIndexLookupSize,
variable.TiDBIndexLookupConcurrency,
variable.TiDBIndexLookupJoinConcurrency,
variable.TiDBIndexSerialScanConcurrency,
variable.TiDBHashJoinConcurrency,
variable.TiDBProjectionConcurrency,
variable.TiDBHashAggPartialConcurrency,
variable.TiDBHashAggFinalConcurrency,
variable.TiDBBackoffLockFast,
variable.TiDBConstraintCheckInPlace,
variable.TiDBDDLReorgWorkerCount,
variable.TiDBDDLReorgBatchSize,
variable.TiDBOptInSubqToJoinAndAgg,
variable.TiDBDistSQLScanConcurrency,
variable.TiDBInitChunkSize,
variable.TiDBMaxChunkSize,
variable.TiDBEnableCascadesPlanner,
variable.TiDBRetryLimit,
variable.TiDBDisableTxnAutoRetry,
variable.TiDBEnableWindowFunction,
}

var (
loadCommonGlobalVarsSQLOnce sync.Once
loadCommonGlobalVarsSQL string
)

func initLoadCommonGlobalVarsSQL() {
loadCommonGlobalVarsSQLOnce.Do(func() {
vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...)
if len(variable.PluginVarNames) > 0 {
vars = append(vars, variable.PluginVarNames...)
}
loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(vars, quoteCommaQuote) + "')"
})
}

// loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
func (s *session) loadCommonGlobalVariablesIfNeeded() error {
initLoadCommonGlobalVarsSQL()
vars := s.sessionVars
if vars.CommonGlobalLoaded {
return nil
Expand Down
10 changes: 9 additions & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func GetSysVar(name string) *SysVar {
return SysVars[name]
}

// PluginVarNames is global plugin var names set.
var PluginVarNames []string

// Variable error codes.
const (
CodeUnknownStatusVar terror.ErrCode = 1
Expand Down Expand Up @@ -315,7 +318,8 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, "sort_buffer_size", "262144"},
{ScopeGlobal, "innodb_flush_neighbors", "1"},
{ScopeNone, "innodb_use_sys_malloc", "ON"},
{ScopeNone, "plugin_dir", "/usr/local/mysql/lib/plugin/"},
{ScopeSession, PluginLoad, ""},
{ScopeSession, PluginDir, "/data/deploy/plugin"},
{ScopeNone, "performance_schema_max_socket_classes", "10"},
{ScopeNone, "performance_schema_max_stage_classes", "150"},
{ScopeGlobal, "innodb_purge_batch_size", "300"},
Expand Down Expand Up @@ -789,6 +793,10 @@ const (
ValidatePasswordNumberCount = "validate_password_number_count"
// ValidatePasswordLength is the name of 'validate_password_length' system variable.
ValidatePasswordLength = "validate_password_length"
// PluginDir is the name of 'plugin_dir' system variable.
PluginDir = "plugin_dir"
// PluginLoad is the name of 'plugin_load' system variable.
PluginLoad = "plugin_load"
)

// GlobalVarAccessor is the interface for accessing global scope system and status variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func GetSessionOnlySysVars(s *SessionVars, key string) (string, bool, error) {
return strconv.FormatUint(atomic.LoadUint64(&config.GetGlobalConfig().Log.SlowThreshold), 10), true, nil
case TiDBQueryLogMaxLen:
return strconv.FormatUint(atomic.LoadUint64(&config.GetGlobalConfig().Log.QueryLogMaxLen), 10), true, nil
case PluginDir:
return config.GetGlobalConfig().Plugin.Dir, true, nil
case PluginLoad:
return config.GetGlobalConfig().Plugin.Load, true, nil
}
sVal, ok := s.systems[key]
if ok {
Expand Down
10 changes: 10 additions & 0 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ const (
nmMetricsInterval = "metrics-interval"
nmDdlLease = "lease"
nmTokenLimit = "token-limit"
nmPluginDir = "plugin-dir"
nmPluginLoad = "plugin-load"

nmProxyProtocolNetworks = "proxy-protocol-networks"
nmProxyProtocolHeaderTimeout = "proxy-protocol-header-timeout"
Expand All @@ -101,6 +103,8 @@ var (
runDDL = flagBoolean(nmRunDDL, true, "run ddl worker on this tidb-server")
ddlLease = flag.String(nmDdlLease, "45s", "schema lease duration, very dangerous to change only if you know what you do")
tokenLimit = flag.Int(nmTokenLimit, 1000, "the limit of concurrent executed sessions")
pluginDir = flag.String(nmPluginDir, "/data/deploy/plugin", "the folder that hold plugin")
pluginLoad = flag.String(nmPluginLoad, "", "wait load plugin name(seperated by comma)")

// Log
logLevel = flag.String(nmLogLevel, "info", "log level: info, debug, warn, error, fatal")
Expand Down Expand Up @@ -323,6 +327,12 @@ func overrideConfig() {
if actualFlags[nmTokenLimit] {
cfg.TokenLimit = uint(*tokenLimit)
}
if actualFlags[nmPluginLoad] {
cfg.Plugin.Load = *pluginLoad
}
if actualFlags[nmPluginDir] {
cfg.Plugin.Dir = *pluginDir
}

// Log
if actualFlags[nmLogLevel] {
Expand Down