Skip to content

Commit

Permalink
Add Metrics (#41)
Browse files Browse the repository at this point in the history
klogs exports the following metrics now:

- "klogs_input_records_total": The number of received records from
  Fluent Bit / Kafka.
- "klogs_errors_total": The number of errors when writing logs to
  ClickHouse or receiving messages from Kafka.
- "klogs_batch_size": The number of logs written to ClickHouse in one
  write.
- "klogs_flush_time_seconds": The time in seconds needed to write logs
  to ClickHouse.

The metrics server is listening on ":2021" by default. This can be
changed via the "Metrics_Server_Address" setting or the
"--metrics-server.address" flag in the ingester.
  • Loading branch information
ricoberger committed Nov 23, 2022
1 parent d993716 commit 45f2b4d
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 160 deletions.
2 changes: 1 addition & 1 deletion cluster/fluent-bit/ingester/ingester.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ spec:
- --log.format=json
- --log.level=trace
ports:
- containerPort: 8080
- containerPort: 2021
name: http
livenessProbe:
httpGet:
Expand Down
1 change: 1 addition & 0 deletions cmd/ingester/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ An example Deployment for the ClickHouse ingester can be found in the [ingester.

| Command-Line Flag | Environment Variable | Description | Default |
| ----------------- | -------------------- | ----------- | ------- |
| `--metrics-server.address` | `METRICS_SERVER_ADDRESS` | The address, where the metrics server should listen on. | `:2021` |
| `--clickhouse.address` | `CLICKHOUSE_ADDRESS` | ClickHouse address to connect to. | |
| `--clickhouse.database` | `CLICKHOUSE_DATABASE` | ClickHouse database name. | `logs` |
| `--clickhouse.username` | `CLICKHOUSE_USERNAME` | ClickHouse username for the connection. | |
Expand Down
32 changes: 14 additions & 18 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package main

import (
"context"
"fmt"
"net/http"
"os"
"strconv"
"strings"
Expand All @@ -12,15 +10,16 @@ import (
"github.com/kobsio/klogs/pkg/clickhouse"
"github.com/kobsio/klogs/pkg/kafka"
"github.com/kobsio/klogs/pkg/log"
"github.com/kobsio/klogs/pkg/metrics"
"github.com/kobsio/klogs/pkg/version"

"github.com/prometheus/client_golang/prometheus/promhttp"
flag "github.com/spf13/pflag"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var (
metricsServerAddress string
clickhouseAddress string
clickhouseUsername string
clickhousePassword string
Expand Down Expand Up @@ -48,6 +47,11 @@ var (
// init is used to set the defaults for all configuration parameters and to set all flags and environment variables, for
// the ClickHouse, Kafka and logging configuration.
func init() {
defaultMetricsServerAddress := ":2021"
if os.Getenv("METRICS_SERVER_ADDRESS") != "" {
defaultMetricsServerAddress = os.Getenv("METRICS_SERVER_ADDRESS")
}

defaultCickhouseAddress := ""
if os.Getenv("CLICKHOUSE_ADDRESS") != "" {
defaultCickhouseAddress = os.Getenv("CLICKHOUSE_ADDRESS")
Expand Down Expand Up @@ -182,6 +186,8 @@ func init() {
defaultLogLevel = os.Getenv("LOG_LEVEL")
}

flag.StringVar(&metricsServerAddress, "metrics-server.address", defaultMetricsServerAddress, "The address, where the metrics server should listen on.")

flag.StringVar(&clickhouseAddress, "clickhouse.address", defaultCickhouseAddress, "ClickHouse address to connect to.")
flag.StringVar(&clickhouseUsername, "clickhouse.username", defaultClickHouseUsername, "ClickHouse username for the connection.")
flag.StringVar(&clickhousePassword, "clickhouse.password", defaultClickHousePassword, "ClickHouse password for the connection.")
Expand Down Expand Up @@ -261,20 +267,10 @@ func main() {
log.Info(nil, "Clickhouse configuration", zap.String("clickhouseAddress", clickhouseAddress), zap.String("clickhouseUsername", clickhouseUsername), zap.String("clickhousePassword", "*****"), zap.String("clickhouseDatabase", clickhouseDatabase), zap.String("clickhouseDialTimeout", clickhouseDialTimeout), zap.String("clickhouseConnMaxLifetime", clickhouseConnMaxLifetime), zap.Int("clickhouseMaxIdleConns", clickhouseMaxIdleConns), zap.Int("clickhouseMaxOpenConns", clickhouseMaxOpenConns), zap.Int64("clickhouseBatchSize", clickhouseBatchSize), zap.Duration("clickhouseFlushInterval", clickhouseFlushInterval))
log.Info(nil, "Kafka configuration", zap.String("kafkaBrokers", kafkaBrokers), zap.String("kafkaGroup", kafkaGroup), zap.String("kafkaVersion", kafkaVersion), zap.String("kafkaTopics", kafkaTopics))

// Create a http server, which can be used for the liveness and readiness probe in Kubernetes. The server also
// serves our Prometheus metrics.
router := http.NewServeMux()
router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "OK")
})
router.Handle("/metrics", promhttp.Handler())

server := http.Server{
Addr: ":8080",
Handler: router,
}

go server.ListenAndServe()
// Create the metrics server, which serves the Prometheus metrics for the ingester. The server can also be used to
// define a readiness and liveness probe.
metricsServer := metrics.New(metricsServerAddress)
go metricsServer.Start()

// Create a new client for the configured ClickHouse instance. Then pass the ClickHouse client to the Run function
// of the Kafka package, which listens for messages in the configured Kafka instance. These messages are then written
Expand All @@ -285,5 +281,5 @@ func main() {
}

kafka.Run(kafkaBrokers, kafkaGroup, kafkaVersion, kafkaTopics, kafkaTimestampKey, clickhouseBatchSize, clickhouseFlushInterval, clickhouseForceNumberFields, clickhouseForceUnderscores, client)
server.Shutdown(context.Background())
metricsServer.Stop()
}
1 change: 1 addition & 0 deletions cmd/plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ An example configuration file can be found in the [fluent-bit-cm.yaml](../../clu

| Option | Description | Default |
| ------ | ----------- | ------- |
| `Metrics_Server_Address` | The address, where the metrics server should listen on. | `:2021` |
| `Address` | The address, where ClickHouse is listening on, e.g. `clickhouse-clickhouse.clickhouse.svc.cluster.local:9000`. | |
| `Database` | The name of the database for the logs. | `logs` |
| `Username` | The username, to authenticate to ClickHouse. | |
Expand Down
45 changes: 35 additions & 10 deletions cmd/plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
import "github.com/kobsio/klogs/pkg/metrics"

const (
defaultDatabase string = "logs"
defaultDialTimeout string = "10s"
defaultConnMaxLifetime string = "1h"
defaultMaxIdleConns int = 1
defaultMaxOpenConns int = 1
defaultBatchSize int64 = 10000
defaultFlushInterval time.Duration = 60 * time.Second
defaultForceUnderscores bool = false
defaultMetricsServerAddress string = ":2021"
defaultDatabase string = "logs"
defaultDialTimeout string = "10s"
defaultConnMaxLifetime string = "1h"
defaultMaxIdleConns int = 1
defaultMaxOpenConns int = 1
defaultBatchSize int64 = 10000
defaultFlushInterval time.Duration = 60 * time.Second
defaultForceUnderscores bool = false
)

var (
Expand All @@ -42,6 +44,7 @@ var (
forceUnderscores bool
lastFlush = time.Now()
client *clickhouse.Client
metricsServer metrics.Server
)

//export FLBPluginRegister
Expand Down Expand Up @@ -93,6 +96,21 @@ func FLBPluginInit(plugin unsafe.Pointer) int {
log.Info(nil, "Version information", version.Info()...)
log.Info(nil, "Build context", version.BuildContext()...)

// Read the configuration for the address where the metrics server should listen on. Then create a new metrics
// server and start the server in a new Go routine via the `Start` method.
//
// When the plugin exits the metrics server should be stopped via the `Stop` method.
metricsServerAddress := output.FLBPluginConfigKey(plugin, "metrics_server_address")
if metricsServerAddress == "" {
metricsServerAddress = defaultMetricsServerAddress
}

metricsServer = metrics.New(metricsServerAddress)
go metricsServer.Start()

// Read all configuration values required for the ClickHouse client. Once we have all configuration values we
// create a new ClickHouse client, which can then be used to write the logs from Fluent Bit into ClickHouse when the
// FLBPluginFlushCtx function is called.
address := output.FLBPluginConfigKey(plugin, "address")

database = output.FLBPluginConfigKey(plugin, "database")
Expand Down Expand Up @@ -193,6 +211,8 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
break
}

metrics.InputRecordsTotalMetric.Inc()

var timestamp time.Time
switch t := ts.(type) {
case output.FLBTime:
Expand Down Expand Up @@ -312,18 +332,22 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
}

startFlushTime := time.Now()
if client.BufferLen() < int(batchSize) && lastFlush.Add(flushInterval).After(startFlushTime) {
currentBatchSize := client.BufferLen()
if currentBatchSize < int(batchSize) && lastFlush.Add(flushInterval).After(startFlushTime) {
return output.FLB_OK
}

log.Info(nil, "Start flushing", zap.Int("batchSize", client.BufferLen()), zap.Duration("flushInterval", startFlushTime.Sub(lastFlush)))
log.Info(nil, "Start flushing", zap.Int("batchSize", currentBatchSize), zap.Duration("flushInterval", startFlushTime.Sub(lastFlush)))
err := client.BufferWrite()
if err != nil {
metrics.ErrorsTotalMetric.Inc()
log.Error(nil, "Error while writing buffer", zap.Error(err))
return output.FLB_ERROR
}

lastFlush = time.Now()
metrics.BatchSizeMetric.Observe(float64(currentBatchSize))
metrics.FlushTimeSecondsMetric.Observe(lastFlush.Sub(startFlushTime).Seconds())
log.Info(nil, "End flushing", zap.Duration("flushTime", lastFlush.Sub(startFlushTime)))

return output.FLB_OK
Expand All @@ -337,6 +361,7 @@ func FLBPluginExit() int {

// FLBPluginExitCtx calls Stop on the package-level metricsServer and then
// returns output.FLB_OK. The ctx argument is not used by this function.
//
//export FLBPluginExitCtx
func FLBPluginExitCtx(ctx unsafe.Pointer) int {
metricsServer.Stop()
return output.FLB_OK
}

Expand Down
Loading

0 comments on commit 45f2b4d

Please sign in to comment.