Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enh: log db support cachemode opts #30

Merged
merged 6 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion api/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewAdapterImporter(conf *config.Config) {
host: conf.TDengine.Host,
port: conf.TDengine.Port,
database: conf.Metrics.Database,
adapters: conf.TaosAdapter.Addrs,
adapters: conf.TaosAdapter.Address,
}
imp.setNextTime(time.Now())
go imp.work()
Expand Down Expand Up @@ -124,6 +124,8 @@ func (imp *AdapterImporter) queryMetrics() {
logger.Errorf("error reading body: %s", err)
continue
}

logger.Debug("## adapter metrics: ", string(body))
imp.lineWriteBody(body, addr)
_ = resp.Body.Close()
}
Expand Down
49 changes: 37 additions & 12 deletions api/report.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"bytes"
"context"
"fmt"
"strconv"
Expand Down Expand Up @@ -32,21 +33,23 @@ var createList = []string{
}

type Reporter struct {
username string
password string
host string
port int
dbname string
totalRep atomic.Value
username string
password string
host string
port int
dbname string
databaseOptions map[string]interface{}
totalRep atomic.Value
}

func NewReporter(conf *config.Config) *Reporter {
r := &Reporter{
username: conf.TDengine.Username,
password: conf.TDengine.Password,
host: conf.TDengine.Host,
port: conf.TDengine.Port,
dbname: conf.Metrics.Database,
username: conf.TDengine.Username,
password: conf.TDengine.Password,
host: conf.TDengine.Host,
port: conf.TDengine.Port,
dbname: conf.Metrics.Database,
databaseOptions: conf.Metrics.DatabaseOptions,
}
r.totalRep.Store(0)
return r
Expand Down Expand Up @@ -138,12 +141,34 @@ func (r *Reporter) createDatabase() {
conn := r.getConn()
defer r.closeConn(conn)

if _, err := conn.Exec(ctx, fmt.Sprintf("create database if not exists %s", r.dbname)); err != nil {
createDBSql := r.generateCreateDBSql()
logger.Warningf("create database sql: %s", createDBSql)

if _, err := conn.Exec(ctx, createDBSql); err != nil {
logger.WithError(err).Errorf("create database %s error %v", r.dbname, err)
panic(err)
}
}

func (r *Reporter) generateCreateDBSql() string {
var buf bytes.Buffer
buf.WriteString("create database if not exists ")
buf.WriteString(r.dbname)

for k, v := range r.databaseOptions {
buf.WriteString(" ")
buf.WriteString(k)
switch v := v.(type) {
case string:
buf.WriteString(fmt.Sprintf(" '%s'", v))
default:
buf.WriteString(fmt.Sprintf(" %v", v))
}
buf.WriteString(" ")
}
return buf.String()
}

func (r *Reporter) creatTables() {
ctx := context.Background()
conn, err := db.NewConnectorWithDb(r.username, r.password, r.host, r.port, r.dbname)
Expand Down
4 changes: 4 additions & 0 deletions config/keeper.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ database = "log"
# export some tables that are not super table
tables = []

# database options for db storing metrics data
[metrics.databaseoptions]
cachemodel = "none"

[environment]
# Whether running in cgroup.
incgroup = false
67 changes: 26 additions & 41 deletions infrastructure/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"encoding/json"
"fmt"
"os"

Expand All @@ -14,23 +15,23 @@ import (
)

type Config struct {
Cors web.CorsConfig
Debug bool
Port int
LogLevel string
GoPoolSize int
RotationInterval string
TDengine TDengineRestful
TaosAdapter TaosAdapter
Metrics MetricsConfig
Env Environment
Cors web.CorsConfig `toml:"cors"`
Debug bool `toml:"debug"`
Port int `toml:"port"`
LogLevel string `toml:"loglevel"`
GoPoolSize int `toml:"gopoolsize"`
RotationInterval string `toml:"RotationInterval"`
TDengine TDengineRestful `toml:"tdengine"`
TaosAdapter TaosAdapter `toml:"taosAdapter"`
Metrics MetricsConfig `toml:"metrics"`
Env Environment `toml:"environment"`
}

type TDengineRestful struct {
Host string
Port int
Username string
Password string
Host string `toml:"host"`
Port int `toml:"port"`
Username string `toml:"username"`
Password string `toml:"password"`
}

func InitConfig() *Config {
Expand Down Expand Up @@ -73,36 +74,20 @@ func InitConfig() *Config {
}
}

conf := &Config{
Debug: viper.GetBool("debug"),
Port: viper.GetInt("port"),
LogLevel: viper.GetString("logLevel"),
GoPoolSize: viper.GetInt("gopoolsize"),
RotationInterval: viper.GetString("RotationInterval"),
var conf Config
if err = viper.Unmarshal(&conf); err != nil {
panic(err)
}
if conf.Debug {
j, _ := json.Marshal(conf)
fmt.Println("config file:", string(j))
}

conf.Cors.Init()
pool.Init(conf.GoPoolSize)
log.Init(conf.LogLevel)
conf.TDengine = TDengineRestful{
Host: viper.GetString("tdengine.host"),
Port: viper.GetInt("tdengine.port"),
Username: viper.GetString("tdengine.username"),
Password: viper.GetString("tdengine.password"),
}
conf.TaosAdapter = TaosAdapter{
Addrs: viper.GetStringSlice("taosAdapter.address"),
}

conf.Metrics = MetricsConfig{
Prefix: viper.GetString("metrics.prefix"),
Database: viper.GetString("metrics.database"),
Tables: map[string]struct{}{},
Normals: viper.GetStringSlice("metrics.tables"),
}
conf.Env = Environment{
InCGroup: viper.GetBool("environment.incgroup"),
}
return conf
return &conf
}

func init() {
Expand Down Expand Up @@ -154,9 +139,9 @@ func init() {
_ = viper.BindEnv("metrics.database", "TAOS_KEEPER_METRICS_DATABASE")
pflag.String("metrics.database", "log", `database for storing metrics data. Env "TAOS_KEEPER_METRICS_DATABASE"`)

viper.SetDefault("metrics.tables", "")
viper.SetDefault("metrics.tables", []string{})
_ = viper.BindEnv("metrics.tables", "TAOS_KEEPER_METRICS_TABLES")
pflag.String("metrics.tables", "", `export some tables that are not super table, multiple values split with white space. Env "TAOS_KEEPER_METRICS_TABLES"`)
pflag.StringArray("metrics.tables", []string{}, `export some tables that are not super table, multiple values split with white space. Env "TAOS_KEEPER_METRICS_TABLES"`)

viper.SetDefault("environment.incgroup", false)
_ = viper.BindEnv("environment.incgroup", "TAOS_KEEPER_ENVIRONMENT_INCGROUP")
Expand Down
12 changes: 6 additions & 6 deletions infrastructure/config/metrics.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package config

type MetricsConfig struct {
Prefix string `toml:"prefix"`
Database string `toml:"database"`
Tables map[string]struct{}
Normals []string `toml:"tables"`
Prefix string `toml:"prefix"`
Database string `toml:"database"`
Tables []string `toml:"tables"`
DatabaseOptions map[string]interface{} `toml:"databaseoptions"`
}

type TaosAdapter struct {
Addrs []string `toml:"address"`
Address []string `toml:"address"`
}

type Metric struct {
Expand All @@ -20,5 +20,5 @@ type Metric struct {
}

type Environment struct {
InCGroup bool `toml:"whether running in cgroup"`
InCGroup bool `toml:"incgroup"`
}
18 changes: 9 additions & 9 deletions process/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,35 @@ import (

var builderLogger = log.GetLogger("builder")

func ExpandMetricsFromConfig(ctx context.Context, conn *db.Connector, cfg *config.MetricsConfig) error {

for _, name := range cfg.Normals {
func ExpandMetricsFromConfig(ctx context.Context, conn *db.Connector, cfg *config.MetricsConfig) (tables map[string]struct{}, err error) {
tables = make(map[string]struct{})
for _, name := range cfg.Tables {
builderLogger.Debug("normal table: ", name)

_, exist := cfg.Tables[name]
_, exist := tables[name]
if exist {
builderLogger.Debug(name, "is exist in config")
continue
}
cfg.Tables[name] = struct{}{}
tables[name] = struct{}{}
}

data, err := conn.Query(ctx, fmt.Sprintf("show %s.stables", cfg.Database))
if err != nil {
return err
return nil, err
}
builderLogger.Debug("show stables")

for _, info := range data.Data {
name := info[0].(string)
builderLogger.Debug("stable: ", info)

_, exist := cfg.Tables[name]
_, exist := tables[name]
if exist {
builderLogger.Debug(name, "is exist in config")
continue
}
cfg.Tables[name] = struct{}{}
tables[name] = struct{}{}
}
return err
return
}
4 changes: 2 additions & 2 deletions process/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func NewProcessor(conf *config.Config) *Processor {
panic(err)
}
ctx := context.Background()
err = ExpandMetricsFromConfig(ctx, conn, &conf.Metrics)
tables, err := ExpandMetricsFromConfig(ctx, conn, &conf.Metrics)
if err != nil {
panic(err)
}
Expand All @@ -176,7 +176,7 @@ func NewProcessor(conf *config.Config) *Processor {
exitChan: make(chan struct{}),
dbConn: conn,
summaryTable: map[string]*Table{"taosadapter_restful_http_request_summary_milliseconds": nil},
tables: conf.Metrics.Tables,
tables: tables,
}
p.Prepare()
p.process()
Expand Down