diff --git a/config/config.go b/config/config.go index ffa0012e5..bb70d43eb 100644 --- a/config/config.go +++ b/config/config.go @@ -120,6 +120,8 @@ type Config struct { LogTopologyLevel int `mapstructure:"log-topology-level" toml:"log-topology-level" json:"logTopologyLevel"` LogProxy bool `mapstructure:"log-proxy" toml:"log-proxy" json:"logProxy"` LogProxyLevel int `mapstructure:"log-proxy-level" toml:"log-proxy-level" json:"logProxyLevel"` + LogGraphite bool `mapstructure:"log-graphite" toml:"log-graphite" json:"logGraphite"` + LogGraphiteLevel int `mapstructure:"log-graphite-level" toml:"log-graphite-level" json:"logGraphiteLevel"` User string `mapstructure:"db-servers-credential" toml:"db-servers-credential" json:"dbServersCredential"` Hosts string `mapstructure:"db-servers-hosts" toml:"db-servers-hosts" json:"dbServersHosts"` HostsDelayed string `mapstructure:"replication-delayed-hosts" toml:"replication-delayed-hosts" json:"replicationDelayedHosts"` @@ -901,6 +903,7 @@ const ( ConstLogModHAProxy = 12 ConstLogModProxyJanitor = 13 ConstLogModMaxscale = 14 + ConstLogModGraphite = 15 ) func (conf *Config) GetSecrets() map[string]Secret { @@ -1880,6 +1883,11 @@ func (conf *Config) IsEligibleForPrinting(module int, level string) bool { return conf.MxsLogLevel >= lvl } break + case module == ConstLogModGraphite: + if conf.LogGraphite { + return conf.LogGraphiteLevel >= lvl + } + break } } @@ -1889,3 +1897,16 @@ func (conf *Config) IsEligibleForPrinting(module int, level string) bool { func (conf *Config) SetLogOutput(out io.Writer) { log.SetOutput(out) } + +func (conf *Config) ToLogrusLevel(l int) log.Level { + switch l { + case 2: + return log.WarnLevel + case 3: + return log.InfoLevel + case 4: + return log.DebugLevel + } + //Always return at least error level to make sure Logger not exit + return log.ErrorLevel +} diff --git a/graphite/cache/cache.go b/graphite/cache/cache.go index b7c56af12..c487f61b5 100644 --- a/graphite/cache/cache.go +++ b/graphite/cache/cache.go @@ -1,4 +1,3 @@ - package cache /* @@ -14,6 +13,7 @@ import ( "github.com/signal18/replication-manager/graphite/helper" "github.com/signal18/replication-manager/graphite/points" + "github.com/sirupsen/logrus" ) type WriteStrategy int @@ -40,7 +40,8 @@ type Cache struct { writeoutQueue *WriteoutQueue - xlog atomic.Value // io.Writer + xlog atomic.Value // io.Writer + logger *logrus.Logger stat struct { size int32 // changing via atomic @@ -61,9 +62,10 @@ type Shard struct { } // Creates a new cache instance -func New() *Cache { +func New(logger *logrus.Logger) *Cache { c := &Cache{ data: make([]*Shard, shardCount), + logger: logger, writeStrategy: Noop, maxSize: 1000000, } diff --git a/graphite/cache/writeout_queue.go b/graphite/cache/writeout_queue.go index 9aa0153a9..d2e3ffdee 100644 --- a/graphite/cache/writeout_queue.go +++ b/graphite/cache/writeout_queue.go @@ -1,11 +1,9 @@ - package cache import ( "sync" "time" - "github.com/sirupsen/logrus" "github.com/signal18/replication-manager/graphite/points" ) @@ -37,10 +35,10 @@ func (q *WriteoutQueue) makeRebuildCallback(nextRebuildTime time.Time) func(chan // next rebuild nextRebuildOnce.Do(func() { now := time.Now() - logrus.Debugf("nextRebuildOnce.Do: %#v %#v", now.String(), nextRebuildTime.String()) + q.cache.logger.Debugf("nextRebuildOnce.Do: %#v %#v", now.String(), nextRebuildTime.String()) if now.Before(nextRebuildTime) { sleepTime := nextRebuildTime.Sub(now) - logrus.Debugf("sleep %s before rebuild", sleepTime.String()) + q.cache.logger.Debugf("sleep %s before rebuild", sleepTime.String()) select { case <-time.After(sleepTime): diff --git a/graphite/carbon/app.go b/graphite/carbon/app.go index 34e86c601..591d38722 100644 --- a/graphite/carbon/app.go +++ b/graphite/carbon/app.go @@ -30,14 +30,16 @@ type App struct { Persister *persister.Whisper Carbonserver *carbonserver.CarbonserverListener Collector *Collector // (!!!) Should be re-created on every change config/modules + Logger *logrus.Logger exit chan bool } // New App instance -func New(configFilename string) *App { +func New(configFilename string, logger *logrus.Logger) *App { app := &App{ ConfigFilename: configFilename, Config: NewConfig(), + Logger: logger, exit: make(chan bool), } return app @@ -143,31 +145,31 @@ func (app *App) stopListeners() { if app.TCP != nil { app.TCP.Stop() app.TCP = nil - Log.Debug("[tcp] finished") + app.Logger.Debug("[tcp] finished") } if app.Pickle != nil { app.Pickle.Stop() app.Pickle = nil - Log.Debug("[pickle] finished") + app.Logger.Debug("[pickle] finished") } if app.UDP != nil { app.UDP.Stop() app.UDP = nil - Log.Debug("[udp] finished") + app.Logger.Debug("[udp] finished") } if app.CarbonLink != nil { app.CarbonLink.Stop() app.CarbonLink = nil - Log.Debug("[carbonlink] finished") + app.Logger.Debug("[carbonlink] finished") } if app.Carbonserver != nil { app.Carbonserver.Stop() app.Carbonserver = nil - Log.Debug("[carbonserver] finished") + app.Logger.Debug("[carbonserver] finished") } } @@ -177,25 +179,25 @@ func (app *App) stopAll() { if app.Persister != nil { app.Persister.Stop() app.Persister = nil - Log.Debug("[persister] finished") + app.Logger.Debug("[persister] finished") } if app.Cache != nil { app.Cache.Stop() app.Cache = nil - Log.Debug("[cache] finished") + app.Logger.Debug("[cache] finished") } if app.Collector != nil { app.Collector.Stop() app.Collector = nil - Log.Debug("[stat] finished") + app.Logger.Debug("[stat] finished") } if app.exit != nil { close(app.exit) app.exit = nil - Log.Debug("[app] close(exit)") + app.Logger.Debug("[app] close(exit)") } } @@ -214,6 +216,7 @@ func (app *App) startPersister() { app.Config.Whisper.Aggregation, app.Cache.WriteoutQueue().GetNotConfirmed, app.Cache.Confirm, + app.Logger, ) p.SetMaxUpdatesPerSecond(app.Config.Whisper.MaxUpdatesPerSecond) p.SetSparse(app.Config.Whisper.Sparse) @@ -240,7 +243,7 @@ func (app *App) Start() (err error) { runtime.GOMAXPROCS(conf.Common.MaxCPU) - core := cache.New() + core := cache.New(app.Logger) core.SetMaxSize(conf.Cache.MaxSize) core.SetWriteStrategy(conf.Cache.WriteStrategy) @@ -300,7 +303,7 @@ func (app *App) Start() (err error) { return } - carbonserver.Log = Log + carbonserver.Log = app.Logger carbonserver := carbonserver.NewCarbonserverListener(core.Get) carbonserver.SetWhisperData(conf.Whisper.DataDir) carbonserver.SetMaxGlobs(conf.Carbonserver.MaxGlobs) diff --git a/graphite/carbonapi.go b/graphite/carbonapi.go index b754bb5ef..c1135ea8d 100644 --- a/graphite/carbonapi.go +++ b/graphite/carbonapi.go @@ -1,4 +1,3 @@ - package graphite import ( @@ -17,6 +16,7 @@ import ( "strings" "time" + "github.com/signal18/replication-manager/config" pb "github.com/signal18/replication-manager/graphite/carbonzipper/carbonzipperpb" "github.com/signal18/replication-manager/graphite/carbonzipper/mlog" "github.com/signal18/replication-manager/graphite/carbonzipper/mstats" @@ -658,7 +658,16 @@ func usageHandler(w http.ResponseWriter, r *http.Request) { w.Write(usageMsg) } -func RunCarbonApi(z string, port int, l int, cacheType string, mc string, memsize int, cpus int, tz string, logdir string) { +func RunCarbonApi(conf *config.Config) { + var z string = "http://0.0.0.0:" + strconv.Itoa(conf.GraphiteCarbonServerPort) + var port int = conf.GraphiteCarbonApiPort + var l int = 20 + var cacheType string = "mem" + var mc string = "" + var memsize int = 200 + var cpus int = 0 + var tz string = "" + var logdir string = conf.WorkingDir interval := 60 * time.Second graphiteHost := "" diff --git a/graphite/carbonserver/carbonserver.go b/graphite/carbonserver/carbonserver.go index 64131e2a2..48a545f10 100644 --- a/graphite/carbonserver/carbonserver.go +++ b/graphite/carbonserver/carbonserver.go @@ -2,7 +2,7 @@ * Copyright 2013-2016 Fabian Groffen, Damian Gryski, Vladimir Smirnov * * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. + * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 diff --git a/graphite/graphite.go b/graphite/graphite.go index 384169c7a..6b83c40fd 100644 --- a/graphite/graphite.go +++ b/graphite/graphite.go @@ -14,6 +14,7 @@ import ( "syscall" "time" + "github.com/signal18/replication-manager/config" "github.com/signal18/replication-manager/graphite/carbon" logging "github.com/signal18/replication-manager/graphite/logging" "github.com/sirupsen/logrus" @@ -208,73 +209,83 @@ func httpServe(addr string) (func(), error) { return func() { listener.Close() }, nil } -func RunCarbon(ShareDir string, DataDir string, GraphiteCarbonPort int, GraphiteCarbonLinkPort int, GraphiteCarbonPicklePort int, GraphiteCarbonPprofPort int, GraphiteCarbonServerPort int) error { +func RunCarbon(conf *config.Config) error { var err error + var loglevel logrus.Level + if conf.LogGraphite { + //Log based on repman config + loglevel = conf.ToLogrusLevel(conf.LogGraphiteLevel) + } else { + //Only log errors + loglevel = logrus.ErrorLevel + } + + log.SetLevel(loglevel) logging.Log = log - input, err := ioutil.ReadFile(ShareDir + "/carbon.conf.template") + input, err := ioutil.ReadFile(conf.ShareDir + "/carbon.conf.template") if err != nil { return err } - output := bytes.Replace(input, []byte("{{.schemas}}"), []byte(ShareDir+"/schemas.conf"), -1) - fullpath, err := filepath.Abs(DataDir + "/graphite") + output := bytes.Replace(input, []byte("{{.schemas}}"), []byte(conf.ShareDir+"/schemas.conf"), -1) + fullpath, err := filepath.Abs(conf.WorkingDir + "/graphite") output2 := bytes.Replace(output, []byte("{{.datadir}}"), []byte(fullpath), -1) - output3 := bytes.Replace(output2, []byte("{{.graphitecarbonport}}"), []byte(strconv.Itoa(GraphiteCarbonPort)), -1) - output4 := bytes.Replace(output3, []byte("{{.graphitecarbonlinkport}}"), []byte(strconv.Itoa(GraphiteCarbonLinkPort)), -1) - output5 := bytes.Replace(output4, []byte("{{.graphitecarbonpickleport}}"), []byte(strconv.Itoa(GraphiteCarbonPicklePort)), -1) - output6 := bytes.Replace(output5, []byte("{{.graphitecarbonpprofport}}"), []byte(strconv.Itoa(GraphiteCarbonPprofPort)), -1) - output7 := bytes.Replace(output6, []byte("{{.graphitecarbonserverport}}"), []byte(strconv.Itoa(GraphiteCarbonServerPort)), -1) + output3 := bytes.Replace(output2, []byte("{{.graphitecarbonport}}"), []byte(strconv.Itoa(conf.GraphiteCarbonPort)), -1) + output4 := bytes.Replace(output3, []byte("{{.graphitecarbonlinkport}}"), []byte(strconv.Itoa(conf.GraphiteCarbonLinkPort)), -1) + output5 := bytes.Replace(output4, []byte("{{.graphitecarbonpickleport}}"), []byte(strconv.Itoa(conf.GraphiteCarbonPicklePort)), -1) + output6 := bytes.Replace(output5, []byte("{{.graphitecarbonpprofport}}"), []byte(strconv.Itoa(conf.GraphiteCarbonPprofPort)), -1) + output7 := bytes.Replace(output6, []byte("{{.graphitecarbonserverport}}"), []byte(strconv.Itoa(conf.GraphiteCarbonServerPort)), -1) - if err = ioutil.WriteFile(DataDir+"/carbon.conf", output7, 0666); err != nil { + if err = ioutil.WriteFile(conf.WorkingDir+"/carbon.conf", output7, 0666); err != nil { fmt.Println(err) os.Exit(1) } - carbon.Log = log - app := carbon.New(DataDir + "/carbon.conf") + // carbon.Log = log + app := carbon.New(conf.WorkingDir+"/carbon.conf", log) if err = app.ParseConfig(); err != nil { return err } - app.Config.Common.Logfile = DataDir + "/carbon.log" - // log.Fatal(app.Config.Whisper.SchemasFilename) + app.Config.Common.Logfile = conf.WorkingDir + "/carbon.log" + // graphite.Log.Fatal(app.Config.Whisper.SchemasFilename) cfg := app.Config var runAsUser *user.User if cfg.Common.User != "" { runAsUser, err = user.Lookup(cfg.Common.User) if err != nil { - log.Fatal(err) + logging.Log.Fatal(err) } } if err := logging.SetLevel(cfg.Common.LogLevel); err != nil { - log.Fatal(err) + logging.Log.Fatal(err) } if err := logging.PrepareFile(cfg.Common.Logfile, runAsUser); err != nil { - log.Fatal(err) + logging.Log.Fatal(err) } if err := logging.SetFile(cfg.Common.Logfile); err != nil { - log.Fatal(err) + logging.Log.Fatal(err) } if cfg.Pprof.Enabled { _, err = httpServe(cfg.Pprof.Listen) if err != nil { - log.Fatal(err) + logging.Log.Fatal(err) } } if err = app.Start(); err != nil { - log.Fatal(err) + logging.Log.Fatal(err) } else { - log.Info("started") + logging.Log.Info("started") } go func() { @@ -291,17 +302,17 @@ func RunCarbon(ShareDir string, DataDir string, GraphiteCarbonPort int, Graphite signal.Notify(c, syscall.SIGHUP) for { <-c - log.Info("HUP received. Reload config") + logging.Log.Info("HUP received. Reload config") if err := app.ReloadConfig(); err != nil { - log.Errorf("Config reload failed: %s", err.Error()) + logging.Log.Errorf("Config reload failed: %s", err.Error()) } else { - log.Info("Config successfully reloaded") + logging.Log.Info("Config successfully reloaded") } } }() app.Loop() - log.Info("stopped") + logging.Log.Info("stopped") return nil } diff --git a/graphite/persister/whisper.go b/graphite/persister/whisper.go index e8dd146b1..afa9d3214 100644 --- a/graphite/persister/whisper.go +++ b/graphite/persister/whisper.go @@ -1,4 +1,3 @@ - package persister import ( @@ -8,8 +7,8 @@ import ( "sync" "sync/atomic" - "github.com/sirupsen/logrus" whisper "github.com/signal18/replication-manager/graphite/whisper" + "github.com/sirupsen/logrus" "github.com/signal18/replication-manager/graphite/helper" "github.com/signal18/replication-manager/graphite/points" @@ -36,6 +35,7 @@ type Whisper struct { throttleTicker *ThrottleTicker storeMutex [storeMutexCount]sync.Mutex mockStore func() (StoreFunc, func()) + logger *logrus.Logger // blockThrottleNs uint64 // sum ns counter // blockQueueGetNs uint64 // sum ns counter // blockAvoidConcurrentNs uint64 // sum ns counter @@ -48,7 +48,8 @@ func NewWhisper( schemas WhisperSchemas, aggregation *WhisperAggregation, recv func(chan bool) *points.Points, - confirm func(*points.Points)) *Whisper { + confirm func(*points.Points), + logger *logrus.Logger) *Whisper { return &Whisper{ recv: recv, @@ -58,6 +59,7 @@ func NewWhisper( workersCount: 1, rootPath: rootPath, maxUpdatesPerSecond: 0, + logger: logger, } } @@ -114,23 +116,23 @@ func store(p *Whisper, values *points.Points) { if err != nil { // create new whisper if file not exists if !os.IsNotExist(err) { - logrus.Errorf("[persister] Failed to open whisper file %s: %s", path, err.Error()) + p.logger.Errorf("[persister] Failed to open whisper file %s: %s", path, err.Error()) return } schema, ok := p.schemas.Match(values.Metric) if !ok { - logrus.Errorf("[persister] No storage schema defined for %s", values.Metric) + p.logger.Errorf("[persister] No storage schema defined for %s", values.Metric) return } aggr := p.aggregation.match(values.Metric) if aggr == nil { - logrus.Errorf("[persister] No storage aggregation defined for %s", values.Metric) + p.logger.Errorf("[persister] No storage aggregation defined for %s", values.Metric) return } - logrus.WithFields(logrus.Fields{ + p.logger.WithFields(logrus.Fields{ "retention": schema.RetentionStr, "schema": schema.Name, "aggregation": aggr.name, @@ -139,7 +141,7 @@ func store(p *Whisper, values *points.Points) { }).Debugf("[persister] Creating %s", path) if err = os.MkdirAll(filepath.Dir(path), os.ModeDir|os.ModePerm); err != nil { - logrus.Error(err) + p.logger.Error(err) return } @@ -147,7 +149,7 @@ func store(p *Whisper, values *points.Points) { Sparse: p.sparse, }) if err != nil { - logrus.Errorf("[persister] Failed to create new whisper file %s: %s", path, err.Error()) + p.logger.Errorf("[persister] Failed to create new whisper file %s: %s", path, err.Error()) return } @@ -166,7 +168,7 @@ func store(p *Whisper, values *points.Points) { defer func() { if r := recover(); r != nil { - logrus.Errorf("[persister] UpdateMany %s recovered: %s", path, r) + p.logger.Errorf("[persister] UpdateMany %s recovered: %s", path, r) } }() diff --git a/server/server.go b/server/server.go index 09716871c..800a65077 100644 --- a/server/server.go +++ b/server/server.go @@ -272,11 +272,14 @@ func (repman *ReplicationManager) AddFlags(flags *pflag.FlagSet, conf *config.Co flags.BoolVar(&conf.LogSQLInMonitoring, "log-sql-in-monitoring", false, "Log SQL queries send to servers in monitoring") flags.BoolVar(&conf.LogHeartbeat, "log-heartbeat", false, "Log Heartbeat") - flags.IntVar(&conf.LogHeartbeatLevel, "log-heartbeat-level", 1, "Log Hearbeat Level") + flags.IntVar(&conf.LogHeartbeatLevel, "log-heartbeat-level", 1, "Log Heartbeat Level") flags.BoolVar(&conf.LogFailedElection, "log-failed-election", true, "Log failed election") flags.IntVar(&conf.LogFailedElectionLevel, "log-failed-election-level", 1, "Log failed election Level") + flags.BoolVar(&conf.LogGraphite, "log-graphite", true, "Log Graphite") + flags.IntVar(&conf.LogGraphiteLevel, "log-graphite-level", 2, "Log Graphite Level") + // SST flags.IntVar(&conf.SSTSendBuffer, "sst-send-buffer", 16384, "SST send buffer size") flags.BoolVar(&conf.LogSST, "log-sst", true, "Log open and close SST transfert") @@ -1420,9 +1423,9 @@ func (repman *ReplicationManager) Run() error { } } - if repman.Conf.LogLevel > 1 { - log.SetLevel(log.DebugLevel) - } + // if repman.Conf.LogLevel > 1 { + log.SetLevel(log.DebugLevel) + // } if repman.Conf.LogFile != "" { log.WithField("version", repman.Version).Info("Log to file: " + repman.Conf.LogFile) @@ -1503,13 +1506,13 @@ func (repman *ReplicationManager) Run() error { // Initialize go-carbon if repman.Conf.GraphiteEmbedded { - go graphite.RunCarbon(repman.Conf.ShareDir, repman.Conf.WorkingDir, repman.Conf.GraphiteCarbonPort, repman.Conf.GraphiteCarbonLinkPort, repman.Conf.GraphiteCarbonPicklePort, repman.Conf.GraphiteCarbonPprofPort, repman.Conf.GraphiteCarbonServerPort) + go graphite.RunCarbon(&repman.Conf) log.WithFields(log.Fields{ "metricport": repman.Conf.GraphiteCarbonPort, "httpport": repman.Conf.GraphiteCarbonServerPort, }).Info("Carbon server started") time.Sleep(2 * time.Second) - go graphite.RunCarbonApi("http://0.0.0.0:"+strconv.Itoa(repman.Conf.GraphiteCarbonServerPort), repman.Conf.GraphiteCarbonApiPort, 20, "mem", "", 200, 0, "", repman.Conf.WorkingDir) + go graphite.RunCarbonApi(&repman.Conf) log.WithField("apiport", repman.Conf.GraphiteCarbonApiPort).Info("Carbon server API started") }