diff --git a/api/adapter.go b/api/adapter.go index a6e240b..0a8085c 100644 --- a/api/adapter.go +++ b/api/adapter.go @@ -56,7 +56,7 @@ func NewAdapterImporter(conf *config.Config) { host: conf.TDengine.Host, port: conf.TDengine.Port, database: conf.Metrics.Database, - adapters: conf.TaosAdapter.Addrs, + adapters: conf.TaosAdapter.Address, } imp.setNextTime(time.Now()) go imp.work() @@ -124,6 +124,8 @@ func (imp *AdapterImporter) queryMetrics() { logger.Errorf("error reading body: %s", err) continue } + + logger.Debug("## adapter metrics: ", string(body)) imp.lineWriteBody(body, addr) _ = resp.Body.Close() } diff --git a/api/report.go b/api/report.go index 3c81999..716f77d 100644 --- a/api/report.go +++ b/api/report.go @@ -1,6 +1,7 @@ package api import ( + "bytes" "context" "fmt" "strconv" @@ -32,21 +33,23 @@ var createList = []string{ } type Reporter struct { - username string - password string - host string - port int - dbname string - totalRep atomic.Value + username string + password string + host string + port int + dbname string + databaseOptions map[string]interface{} + totalRep atomic.Value } func NewReporter(conf *config.Config) *Reporter { r := &Reporter{ - username: conf.TDengine.Username, - password: conf.TDengine.Password, - host: conf.TDengine.Host, - port: conf.TDengine.Port, - dbname: conf.Metrics.Database, + username: conf.TDengine.Username, + password: conf.TDengine.Password, + host: conf.TDengine.Host, + port: conf.TDengine.Port, + dbname: conf.Metrics.Database, + databaseOptions: conf.Metrics.DatabaseOptions, } r.totalRep.Store(0) return r @@ -138,12 +141,34 @@ func (r *Reporter) createDatabase() { conn := r.getConn() defer r.closeConn(conn) - if _, err := conn.Exec(ctx, fmt.Sprintf("create database if not exists %s", r.dbname)); err != nil { + createDBSql := r.generateCreateDBSql() + logger.Warningf("create database sql: %s", createDBSql) + + if _, err := conn.Exec(ctx, createDBSql); err != nil { logger.WithError(err).Errorf("create database %s error %v", r.dbname, err) panic(err) } } +func (r *Reporter) generateCreateDBSql() string { + var buf bytes.Buffer + buf.WriteString("create database if not exists ") + buf.WriteString(r.dbname) + + for k, v := range r.databaseOptions { + buf.WriteString(" ") + buf.WriteString(k) + switch v := v.(type) { + case string: + buf.WriteString(fmt.Sprintf(" '%s'", v)) + default: + buf.WriteString(fmt.Sprintf(" %v", v)) + } + buf.WriteString(" ") + } + return buf.String() +} + func (r *Reporter) creatTables() { ctx := context.Background() conn, err := db.NewConnectorWithDb(r.username, r.password, r.host, r.port, r.dbname) diff --git a/config/keeper.toml b/config/keeper.toml index 5028da6..587f60b 100644 --- a/config/keeper.toml +++ b/config/keeper.toml @@ -33,6 +33,10 @@ database = "log" # export some tables that are not super table tables = [] +# database options for db storing metrics data +[metrics.databaseoptions] +cachemodel = "none" + [environment] # Whether running in cgroup. incgroup = false diff --git a/infrastructure/config/config.go b/infrastructure/config/config.go index f2802aa..b9ff936 100644 --- a/infrastructure/config/config.go +++ b/infrastructure/config/config.go @@ -1,6 +1,7 @@ package config import ( + "encoding/json" "fmt" "os" @@ -14,23 +15,23 @@ import ( ) type Config struct { - Cors web.CorsConfig - Debug bool - Port int - LogLevel string - GoPoolSize int - RotationInterval string - TDengine TDengineRestful - TaosAdapter TaosAdapter - Metrics MetricsConfig - Env Environment + Cors web.CorsConfig `toml:"cors"` + Debug bool `toml:"debug"` + Port int `toml:"port"` + LogLevel string `toml:"loglevel"` + GoPoolSize int `toml:"gopoolsize"` + RotationInterval string `toml:"RotationInterval"` + TDengine TDengineRestful `toml:"tdengine"` + TaosAdapter TaosAdapter `toml:"taosAdapter"` + Metrics MetricsConfig `toml:"metrics"` + Env Environment `toml:"environment"` } type TDengineRestful struct { - Host string - Port int - Username string - Password string + Host string `toml:"host"` + Port int `toml:"port"` + Username string `toml:"username"` + Password string `toml:"password"` } func InitConfig() *Config { @@ -73,36 +74,20 @@ func InitConfig() *Config { } } - conf := &Config{ - Debug: viper.GetBool("debug"), - Port: viper.GetInt("port"), - LogLevel: viper.GetString("logLevel"), - GoPoolSize: viper.GetInt("gopoolsize"), - RotationInterval: viper.GetString("RotationInterval"), + var conf Config + if err = viper.Unmarshal(&conf); err != nil { + panic(err) + } + if conf.Debug { + j, _ := json.Marshal(conf) + fmt.Println("config file:", string(j)) } + conf.Cors.Init() pool.Init(conf.GoPoolSize) log.Init(conf.LogLevel) - conf.TDengine = TDengineRestful{ - Host: viper.GetString("tdengine.host"), - Port: viper.GetInt("tdengine.port"), - Username: viper.GetString("tdengine.username"), - Password: viper.GetString("tdengine.password"), - } - conf.TaosAdapter = TaosAdapter{ - Addrs: viper.GetStringSlice("taosAdapter.address"), - } - conf.Metrics = MetricsConfig{ - Prefix: viper.GetString("metrics.prefix"), - Database: viper.GetString("metrics.database"), - Tables: map[string]struct{}{}, - Normals: viper.GetStringSlice("metrics.tables"), - } - conf.Env = Environment{ - InCGroup: viper.GetBool("environment.incgroup"), - } - return conf + return &conf } func init() { @@ -154,9 +139,9 @@ func init() { _ = viper.BindEnv("metrics.database", "TAOS_KEEPER_METRICS_DATABASE") pflag.String("metrics.database", "log", `database for storing metrics data. Env "TAOS_KEEPER_METRICS_DATABASE"`) - viper.SetDefault("metrics.tables", "") + viper.SetDefault("metrics.tables", []string{}) _ = viper.BindEnv("metrics.tables", "TAOS_KEEPER_METRICS_TABLES") - pflag.String("metrics.tables", "", `export some tables that are not super table, multiple values split with white space. Env "TAOS_KEEPER_METRICS_TABLES"`) + pflag.StringArray("metrics.tables", []string{}, `export some tables that are not super table, multiple values split with white space. Env "TAOS_KEEPER_METRICS_TABLES"`) viper.SetDefault("environment.incgroup", false) _ = viper.BindEnv("environment.incgroup", "TAOS_KEEPER_ENVIRONMENT_INCGROUP") diff --git a/infrastructure/config/metrics.go b/infrastructure/config/metrics.go index 3b88b3d..7335d8f 100644 --- a/infrastructure/config/metrics.go +++ b/infrastructure/config/metrics.go @@ -1,14 +1,14 @@ package config type MetricsConfig struct { - Prefix string `toml:"prefix"` - Database string `toml:"database"` - Tables map[string]struct{} - Normals []string `toml:"tables"` + Prefix string `toml:"prefix"` + Database string `toml:"database"` + Tables []string `toml:"tables"` + DatabaseOptions map[string]interface{} `toml:"databaseoptions"` } type TaosAdapter struct { - Addrs []string `toml:"address"` + Address []string `toml:"address"` } type Metric struct { @@ -20,5 +20,5 @@ type Metric struct { } type Environment struct { - InCGroup bool `toml:"whether running in cgroup"` + InCGroup bool `toml:"incgroup"` } diff --git a/process/builder.go b/process/builder.go index b58282c..d0920b4 100644 --- a/process/builder.go +++ b/process/builder.go @@ -11,22 +11,22 @@ import ( var builderLogger = log.GetLogger("builder") -func ExpandMetricsFromConfig(ctx context.Context, conn *db.Connector, cfg *config.MetricsConfig) error { - - for _, name := range cfg.Normals { +func ExpandMetricsFromConfig(ctx context.Context, conn *db.Connector, cfg *config.MetricsConfig) (tables map[string]struct{}, err error) { + tables = make(map[string]struct{}) + for _, name := range cfg.Tables { builderLogger.Debug("normal table: ", name) - _, exist := cfg.Tables[name] + _, exist := tables[name] if exist { builderLogger.Debug(name, "is exist in config") continue } - cfg.Tables[name] = struct{}{} + tables[name] = struct{}{} } data, err := conn.Query(ctx, fmt.Sprintf("show %s.stables", cfg.Database)) if err != nil { - return err + return nil, err } builderLogger.Debug("show stables") @@ -34,12 +34,12 @@ func ExpandMetricsFromConfig(ctx context.Context, conn *db.Connector, cfg *confi name := info[0].(string) builderLogger.Debug("stable: ", info) - _, exist := cfg.Tables[name] + _, exist := tables[name] if exist { builderLogger.Debug(name, "is exist in config") continue } - cfg.Tables[name] = struct{}{} + tables[name] = struct{}{} } - return err + return } diff --git a/process/handle.go b/process/handle.go index ddc4306..a0dffb4 100644 --- a/process/handle.go +++ b/process/handle.go @@ -163,7 +163,7 @@ func NewProcessor(conf *config.Config) *Processor { panic(err) } ctx := context.Background() - err = ExpandMetricsFromConfig(ctx, conn, &conf.Metrics) + tables, err := ExpandMetricsFromConfig(ctx, conn, &conf.Metrics) if err != nil { panic(err) } @@ -176,7 +176,7 @@ func NewProcessor(conf *config.Config) *Processor { exitChan: make(chan struct{}), dbConn: conn, summaryTable: map[string]*Table{"taosadapter_restful_http_request_summary_milliseconds": nil}, - tables: conf.Metrics.Tables, + tables: tables, } p.Prepare() p.process()