Skip to content

Commit

Permalink
Improve server logging in json format (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
agbpatro committed Apr 4, 2024
1 parent 9e2f2d0 commit d192938
Show file tree
Hide file tree
Showing 30 changed files with 353 additions and 120 deletions.
6 changes: 3 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"fmt"
"os"

"github.com/sirupsen/logrus"
_ "go.uber.org/automaxprocs"

"github.com/teslamotors/fleet-telemetry/config"
logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/server/monitoring"
"github.com/teslamotors/fleet-telemetry/server/streaming"
)
Expand All @@ -23,7 +23,7 @@ func main() {

if config.Monitoring != nil && config.Monitoring.ProfilingPath != "" {
if config.Monitoring.ProfilerFile, err = os.Create(config.Monitoring.ProfilingPath); err != nil {
logger.Errorf("profiling_file_error %v", err)
logger.ErrorLog("profiling_file_error", err, nil)
config.Monitoring.ProfilingPath = ""
}

Expand All @@ -37,7 +37,7 @@ func main() {
}

func startServer(config *config.Config, logger *logrus.Logger) (err error) {
logger.Infoln("starting")
logger.ActivityLog("starting_server", nil)
registry := streaming.NewSocketRegistry()

if config.StatusPort > 0 {
Expand Down
17 changes: 7 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ import (
"time"

"cloud.google.com/go/pubsub"
"github.com/sirupsen/logrus"

confluent "github.com/confluentinc/confluent-kafka-go/v2/kafka"
githublogrus "github.com/sirupsen/logrus"

"github.com/teslamotors/fleet-telemetry/datastore/googlepubsub"
"github.com/teslamotors/fleet-telemetry/datastore/kafka"
"github.com/teslamotors/fleet-telemetry/datastore/kinesis"
"github.com/teslamotors/fleet-telemetry/datastore/simple"
"github.com/teslamotors/fleet-telemetry/datastore/zmq"
logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/telemetry"
)
Expand Down Expand Up @@ -164,7 +165,7 @@ func (c *Config) ExtractServiceTLSConfig(logger *logrus.Logger) (*tls.Config, er
if !ok {
return nil, fmt.Errorf("custom ca not properly loaded: %s", c.TLS.CAFile)
}
logger.Infof("appending custom CA file :%s", c.TLS.CAFile)
logger.ActivityLog("custom_ca_file_appened", logrus.LogInfo{"ca_file_path": c.TLS.CAFile})
}

return &tls.Config{
Expand All @@ -174,17 +175,13 @@ func (c *Config) ExtractServiceTLSConfig(logger *logrus.Logger) (*tls.Config, er
}

func (c *Config) configureLogger(logger *logrus.Logger) {
level, err := logrus.ParseLevel(c.LogLevel)
level, err := githublogrus.ParseLevel(c.LogLevel)
if err != nil {
logger.Errorf("Invalid level: %s\n", err)
logger.ErrorLog("invalid_level", err, nil)
} else {
logrus.SetLevel(level)
}

// Log as JSON instead of the default ASCII formatter.
if c.JSONLogEnable {
logrus.SetFormatter(&logrus.JSONFormatter{TimestampFormat: time.RFC3339Nano})
githublogrus.SetLevel(level)
}
logger.SetJSONFormatter(c.JSONLogEnable)
}

func (c *Config) configureMetricsCollector(logger *logrus.Logger) {
Expand Down
10 changes: 6 additions & 4 deletions config/config_initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,23 @@ import (
"flag"
"os"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"

logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/telemetry"
)

// LoadApplicationConfiguration loads the configuration from args and config files
func LoadApplicationConfiguration() (config *Config, logger *logrus.Logger, err error) {
logger = logrus.New()

logger = logrus.NewBasicLogrusLogger("fleet-telemetry")

configFilePath := loadConfigFlags()

config, err = loadApplicationConfig(configFilePath)
if err != nil {
logger.Errorf("read_application_configuration_error %s", err)
logger.ErrorLog("read_application_configuration_error", err, nil)
return nil, nil, err
}

Expand All @@ -38,7 +39,8 @@ func loadApplicationConfig(configFilePath string) (*Config, error) {
config := &Config{}
err = json.NewDecoder(configFile).Decode(&config)

logger, _ := test.NewNullLogger()
log, _ := test.NewNullLogger()
logger := logrus.NewLogrusLogger("null_logger", map[string]interface{}{}, log.WithField("context", "metrics"))
config.MetricCollector = metrics.NewCollector(config.Monitoring, logger)

config.AckChan = make(chan *telemetry.Record)
Expand Down
25 changes: 12 additions & 13 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import (
. "github.com/onsi/gomega"

confluent "github.com/confluentinc/confluent-kafka-go/v2/kafka"
githublogrus "github.com/sirupsen/logrus"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"

logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/telemetry"
)
Expand All @@ -25,7 +24,7 @@ var _ = Describe("Test full application config", func() {
)

BeforeEach(func() {
log, _ = test.NewNullLogger()
log, _ = logrus.NoOpLogger()
config = &Config{
Host: "127.0.0.1",
Port: 443,
Expand Down Expand Up @@ -147,7 +146,7 @@ var _ = Describe("Test full application config", func() {

Context("configure kinesis", func() {
It("returns an error if kinesis isn't included", func() {
log, _ := test.NewNullLogger()
log, _ := logrus.NoOpLogger()
config.Records = map[string][]telemetry.Dispatcher{"FS": {"kinesis"}}

var err error
Expand Down Expand Up @@ -186,15 +185,15 @@ var _ = Describe("Test full application config", func() {
})

It("pubsub does not work when both the environment variables are set", func() {
log, _ := test.NewNullLogger()
log, _ := logrus.NoOpLogger()
_ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url")
_ = os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", "some_service_account_path")
_, err := pubsubConfig.ConfigureProducers(log)
Expect(err).To(MatchError("pubsub_connect_error pubsub cannot initialize with both emulator and GCP resource"))
})

It("pubsub config works", func() {
log, _ := test.NewNullLogger()
log, _ := logrus.NoOpLogger()
_ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url")
var err error
producers, err = pubsubConfig.ConfigureProducers(log)
Expand All @@ -213,7 +212,7 @@ var _ = Describe("Test full application config", func() {
})

It("returns an error if zmq isn't included", func() {
log, _ := test.NewNullLogger()
log, _ := logrus.NoOpLogger()
config.Records = map[string][]telemetry.Dispatcher{"FS": {"zmq"}}
var err error
producers, err = config.ConfigureProducers(log)
Expand All @@ -226,7 +225,7 @@ var _ = Describe("Test full application config", func() {
It("zmq config works", func() {
// ZMQ close is async, this removes the need to sync between tests.
zmqConfig.ZMQ.Addr = "tcp://127.0.0.1:5285"
log, _ := test.NewNullLogger()
log, _ := logrus.NoOpLogger()
var err error
producers, err = zmqConfig.ConfigureProducers(log)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -236,26 +235,26 @@ var _ = Describe("Test full application config", func() {

Context("configureMetricsCollector", func() {
It("does not fail when TLS is nil ", func() {
log, _ := test.NewNullLogger()
log, _ := logrus.NoOpLogger()
config = &Config{}
config.configureMetricsCollector(log)

Expect(config.Monitoring).To(BeNil())
})

It("fails if not reachable", func() {
log, _ := test.NewNullLogger()
log, _ := logrus.NoOpLogger()
config.configureMetricsCollector(log)
Expect(config.MetricCollector).NotTo(BeNil())
})
})

Context("configureLogger", func() {
It("Should properly configure logger", func() {
log, _ := test.NewNullLogger()
log, _ := logrus.NoOpLogger()
config.configureLogger(log)

Expect(log.Level).To(Equal(logrus.InfoLevel))
Expect(githublogrus.GetLevel().String()).To(Equal("info"))
})
})
})
20 changes: 12 additions & 8 deletions datastore/googlepubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"time"

"cloud.google.com/go/pubsub"
"github.com/sirupsen/logrus"

logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/metrics/adapter"
"github.com/teslamotors/fleet-telemetry/telemetry"
Expand Down Expand Up @@ -58,31 +58,35 @@ func NewProducer(ctx context.Context, prometheusEnabled bool, projectID string,
if err != nil {
return nil, fmt.Errorf("pubsub_connect_error %s", err)
}
logger.Infof("registered pubsub for project: %s, namespace: %s", projectID, namespace)
return &Producer{

p := &Producer{
projectID: projectID,
namespace: namespace,
pubsubClient: pubsubClient,
prometheusEnabled: prometheusEnabled,
metricsCollector: metricsCollector,
logger: logger,
}, nil
}
p.logger.ActivityLog("pubsub_registerd", logrus.LogInfo{"project": projectID, "namespace": namespace})
return p, nil
}

// Produce sends the record payload to pubsub
func (p *Producer) Produce(entry *telemetry.Record) {
ctx := context.Background()

pubsubTopic, err := p.createTopicIfNotExists(ctx, telemetry.BuildTopicName(p.namespace, entry.TxType))
topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)
logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid}
pubsubTopic, err := p.createTopicIfNotExists(ctx, topicName)

if err != nil {
p.logger.Errorf("error creating topic %v", err)
p.logger.ErrorLog("pubsub_topic_creation_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{})
return
}

if exists, err := pubsubTopic.Exists(ctx); !exists || err != nil {
p.logger.Errorf("error checking existing topic %v", err)
p.logger.ErrorLog("pubsub_topic_check_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{})
return
}
Expand All @@ -93,7 +97,7 @@ func (p *Producer) Produce(entry *telemetry.Record) {
Attributes: entry.Metadata(),
})
if _, err = result.Get(ctx); err != nil {
p.logger.Errorf("pubsub_err err: %v", err)
p.logger.ErrorLog("pubsub_err", err, logInfo)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType})
return
}
Expand Down
8 changes: 4 additions & 4 deletions datastore/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/sirupsen/logrus"

logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/metrics/adapter"
"github.com/teslamotors/fleet-telemetry/telemetry"
Expand Down Expand Up @@ -55,7 +55,7 @@ func NewProducer(config *kafka.ConfigMap, namespace string, reliableAckWorkers i
for i := 0; i < reliableAckWorkers; i++ {
go producer.handleProducerEvents(ackChan)
}
producer.logger.Infof("registered kafka for namespace: %s", namespace)
producer.logger.ActivityLog("kafka_registered", logrus.LogInfo{"namespace": namespace})
return producer, nil
}

Expand Down Expand Up @@ -105,13 +105,13 @@ func (p *Producer) handleProducerEvents(ackChan chan (*telemetry.Record)) {
ackChan <- record
}
default:
p.logger.Info("ignored kafka producer event")
p.logger.ActivityLog("kafka_event_ignored", logrus.LogInfo{"event": ev.String()})
}
}
}

func (p *Producer) logError(err error) {
p.logger.Errorf("kafka_err err: %v", err)
p.logger.ErrorLog("kafka_err", err, nil)
metricsRegistry.errorCount.Inc(map[string]string{})
}

Expand Down
9 changes: 4 additions & 5 deletions datastore/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/sirupsen/logrus"

logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/metrics/adapter"
"github.com/teslamotors/fleet-telemetry/telemetry"
Expand Down Expand Up @@ -74,7 +74,7 @@ func (p *Producer) Produce(entry *telemetry.Record) {
entry.ProduceTime = time.Now()
stream, ok := p.streams[entry.TxType]
if !ok {
p.logger.Errorf("kinesis_produce_stream_not_configured: %s", entry.TxType)
p.logger.ErrorLog("kinesis_produce_stream_not_configured", nil, logrus.LogInfo{"record_type": entry.TxType})
return
}
kinesisRecord := &kinesis.PutRecordInput{
Expand All @@ -85,13 +85,12 @@ func (p *Producer) Produce(entry *telemetry.Record) {

kinesisRecordOutput, err := p.kinesis.PutRecord(kinesisRecord)
if err != nil {
p.logger.Errorf("kinesis_err: %v", err)
p.logger.ErrorLog("kinesis_err", err, nil)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType})
return
}

p.logger.Debugf("kinesis_publish vin=%s,type=%s,txid=%s,shard_id=%s,sequence_number=%s", entry.Vin, entry.TxType, entry.Txid, *kinesisRecordOutput.ShardId, *kinesisRecordOutput.SequenceNumber)

p.logger.Log(logrus.DEBUG, "kinesis_err", logrus.LogInfo{"vin": entry.Vin, "record_type": entry.TxType, "txid": entry.Txid, "shard_id": *kinesisRecordOutput.ShardId, "sequence_number": *kinesisRecordOutput.SequenceNumber})
metricsRegistry.publishCount.Inc(map[string]string{"record_type": entry.TxType})
metricsRegistry.byteTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType})
}
Expand Down
7 changes: 3 additions & 4 deletions datastore/simple/logger.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package simple

import (
"github.com/sirupsen/logrus"

logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/telemetry"
)

Expand All @@ -20,8 +19,8 @@ func NewProtoLogger(logger *logrus.Logger) telemetry.Producer {
func (p *ProtoLogger) Produce(entry *telemetry.Record) {
data, err := entry.GetJSONPayload()
if err != nil {
p.logger.Errorf("json_unmarshal_error %s %v %s\n", entry.Vin, entry.Metadata(), err.Error())
p.logger.ErrorLog("json_unmarshal_error", err, logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata()})
return
}
p.logger.Infof("logger_json_unmarshal %s %v %s\n", entry.Vin, entry.Metadata(), string(data))
p.logger.ActivityLog("logger_json_unmarshal", logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata(), "data": string(data)})
}
9 changes: 4 additions & 5 deletions datastore/zmq/zmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"

"github.com/pebbe/zmq4"
"github.com/sirupsen/logrus"
logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/metrics/adapter"
"github.com/teslamotors/fleet-telemetry/telemetry"
Expand Down Expand Up @@ -71,7 +71,7 @@ func (p *ZMQProducer) Produce(rec *telemetry.Record) {
}
if nBytes, err := p.sock.SendMessage(telemetry.BuildTopicName(p.namespace, rec.TxType), rec.Payload()); err != nil {
metricsRegistry.errorCount.Inc(map[string]string{"record_type": rec.TxType})
p.logger.Errorf("Failed sending log on zmq socket: %s", err.Error())
p.logger.ErrorLog("zmq_dispatch_error", err, nil)
} else {
metricsRegistry.byteTotal.Add(int64(nBytes), map[string]string{"record_type": rec.TxType})
metricsRegistry.publishCount.Inc(map[string]string{"record_type": rec.TxType})
Expand Down Expand Up @@ -175,11 +175,10 @@ func logSocketInBackground(target *zmq4.Socket, logger *logrus.Logger, addr stri

eventType, addr, value, err := monitor.RecvEvent(0)
if err != nil {
logger.Errorf("Failed to receive event on zmq socket: %s", err)
logger.ErrorLog("zmq_event_receive_error", err, nil)
continue
}

logger.Debugf("ZMQ socket event: %v %v %v", eventType, addr, value)
logger.Log(logrus.DEBUG, "zmq_socket_event", logrus.LogInfo{"event_type": eventType, "addr": addr, "value": value})
}
}()

Expand Down
Loading

0 comments on commit d192938

Please sign in to comment.