Skip to content

Commit

Permalink
cleaned up telemetry code
Browse files Browse the repository at this point in the history
  • Loading branch information
deelawn committed Dec 8, 2023
1 parent 9797e8a commit 64d447a
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 149 deletions.
53 changes: 35 additions & 18 deletions gno.land/cmd/gnoland/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,28 +176,45 @@ func (c *startCfg) RegisterFlags(fs *flag.FlagSet) {
)
}

func execStart(c *startCfg, io commands.IO) error {
logger := log.NewTMLogger(log.NewSyncWriter(io.Out()))
dataDir := c.dataDir

if strings.ToLower(os.Getenv("TELEMETRY_ENABLED")) == "true" {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var port uint64
var err error
portStr := os.Getenv("TELEMETRY_METRICS_PORT")
if portStr != "" {
port, err = strconv.ParseUint(portStr, 10, 0)
}
func initTelemetry(ctx context.Context) error {
var options []telemetry.Option

if os.Getenv("TELEM_METRICS_ENABLED") == "true" {
options = append(options, telemetry.WithOptionMetricsEnabled())
}
if os.Getenv("TELEM_TRACES_ENABLED") == "true" {
options = append(options, telemetry.WithOptionTracesEnabled())
}
if portString := os.Getenv("TELEM_PORT"); portString != "" {
port, err := strconv.ParseUint(portString, 10, 64)
if err != nil {
panic("invalid TELEMETRY_METRICS_PORT: " + portStr)
return fmt.Errorf("invalid port: %w", err)
}

if err = telemetry.Init(ctx, port); err != nil {
panic("error initialzing telemetry: " + err.Error())
}
options = append(options, telemetry.WithOptionPort(port))
}
if os.Getenv("TELEM_USE_FAKE_METRICS") == "true" {
options = append(options, telemetry.WithOptionFakeMetrics())
}

// The string options can be added by default. Their absence would yield the same result
// as if the option were excluded altogether.
options = append(options, telemetry.WithOptionMeterName(os.Getenv("TELEM_METER_NAME")))
options = append(options, telemetry.WithOptionExporterEndpoint(os.Getenv("TELEM_EXPORTER_ENDPOINT")))
options = append(options, telemetry.WithOptionServiceName(os.Getenv("TELEM_SERVICE_NAME")))

return telemetry.Init(ctx, options...)
}

func execStart(c *startCfg, io commands.IO) error {
logger := log.NewTMLogger(log.NewSyncWriter(io.Out()))
dataDir := c.dataDir

// Attempt to initialize telemetry. If the enviroment variables required to initialize
// telemetry are not set, then the initialization will do nothing.
ctx := context.Background()
if err := initTelemetry(ctx); err != nil {
return fmt.Errorf("error initializing telemetry: %w", err)
}

var (
Expand Down
8 changes: 4 additions & 4 deletions gno.land/pkg/sdk/vm/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewHandler(vm *VMKeeper) vmHandler {
}

func (vh vmHandler) Process(ctx sdk.Context, msg std.Msg) sdk.Result {
if telemetry.IsEnabled() {
if telemetry.TracesEnabled() {
// This is the trace's entry point for the VM namespace, so initialize it with the context.
traces.InitNamespace(ctx.Context(), traces.NamespaceVMProcess)
spanEnder := traces.StartSpan(
Expand All @@ -49,7 +49,7 @@ func (vh vmHandler) Process(ctx sdk.Context, msg std.Msg) sdk.Result {

// Handle MsgAddPackage.
func (vh vmHandler) handleMsgAddPackage(ctx sdk.Context, msg MsgAddPackage) sdk.Result {
if telemetry.IsEnabled() {
if telemetry.TracesEnabled() {
spanEnder := traces.StartSpan(
traces.NamespaceVM,
"vmHandler.handleMsgAddPackage",
Expand Down Expand Up @@ -77,7 +77,7 @@ func (vh vmHandler) handleMsgAddPackage(ctx sdk.Context, msg MsgAddPackage) sdk.

// Handle MsgCall.
func (vh vmHandler) handleMsgCall(ctx sdk.Context, msg MsgCall) (res sdk.Result) {
if telemetry.IsEnabled() {
if telemetry.TracesEnabled() {
spanEnder := traces.StartSpan(
traces.NamespaceVM,
"vmHandler.handleMsgCall",
Expand Down Expand Up @@ -128,7 +128,7 @@ const (
)

func (vh vmHandler) Query(ctx sdk.Context, req abci.RequestQuery) (res abci.ResponseQuery) {
if telemetry.IsEnabled() {
if telemetry.TracesEnabled() {
traces.InitNamespace(ctx.Context(), traces.NamespaceVMQuery)
spanEnder := traces.StartSpan(
traces.NamespaceVMQuery,
Expand Down
6 changes: 3 additions & 3 deletions gno.land/pkg/sdk/vm/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (vm *VMKeeper) Initialize(ms store.MultiStore) {
panic("should not happen")
}

if telemetry.IsEnabled() {
if telemetry.TracesEnabled() {
traces.InitNamespace(nil, traces.NamespaceVMInit)
spanEnder := traces.StartSpan(
traces.NamespaceVMInit,
Expand Down Expand Up @@ -110,7 +110,7 @@ func (vm *VMKeeper) getGnoStore(ctx sdk.Context) gno.Store {
panic("VMKeeper must first be initialized")
}

if telemetry.IsEnabled() {
if telemetry.TracesEnabled() {
spanEnder := traces.StartSpan(
traces.NamespaceVM,
"VMKeeper.getGnoStore",
Expand Down Expand Up @@ -216,7 +216,7 @@ func (vm *VMKeeper) AddPackage(ctx sdk.Context, msg MsgAddPackage) error {

// Calls calls a public Gno function (for delivertx).
func (vm *VMKeeper) Call(ctx sdk.Context, msg MsgCall) (res string, err error) {
if telemetry.IsEnabled() {
if telemetry.TracesEnabled() {
spanEnder := traces.StartSpan(
traces.NamespaceVM,
"VMKeeper.Call",
Expand Down
6 changes: 3 additions & 3 deletions gnovm/pkg/gnolang/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ func (m *Machine) RunMain() {
// Input must not have been preprocessed, that is,
// it should not be the child of any parent.
func (m *Machine) Eval(x Expr) []TypedValue {
if telemetry.IsEnabled() {
if telemetry.TracesEnabled() {
spanEnder := traces.StartSpan(
traces.NamespaceVM,
"Machine.Eval",
Expand Down Expand Up @@ -1031,7 +1031,7 @@ const (

func (m *Machine) Run() {
var spanEnder *traces.SpanEnder
if telemetry.IsEnabled() {
if telemetry.TracesEnabled() {
// Ensure that spanEnder.End() is called on panic.
defer func() {
if r := recover(); r != nil {
Expand All @@ -1044,7 +1044,7 @@ func (m *Machine) Run() {
for {
op := m.PopOp()

if telemetry.IsEnabled() {
if telemetry.TracesEnabled() {
spanEnder.End()

spanEnder = traces.StartSpan(
Expand Down
2 changes: 1 addition & 1 deletion gnovm/pkg/gnolang/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func Preprocess(store Store, ctx BlockNode, n Node) Node {
}()
}

if telemetry.IsEnabled() {
if telemetry.TracesEnabled() {
spanEnder := traces.StartSpan(
traces.NamespaceVM,
"Preprocess",
Expand Down
5 changes: 5 additions & 0 deletions telemetry/exporter/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package exporter

import "errors"

var ErrEndpointNotSet = errors.New("telemetry exporter endpoint not set")
98 changes: 28 additions & 70 deletions telemetry/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,95 +5,53 @@ package telemetry

import (
"context"
"fmt"
"log"
"net"
"net/http"
"os"
"strconv"
"time"

"github.com/gnolang/gno/telemetry/metrics"
"github.com/gnolang/gno/telemetry/options"
"github.com/gnolang/gno/telemetry/traces"
"github.com/prometheus/client_golang/prometheus/promhttp"

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/sdk/metric"
)

const (
meterName = "gno.land"
defaultPort uint64 = 4591
defaultMeterName = "gno.land"
defaultServiceName = "gno.land"
defaultPort uint64 = 4591
)

var enabled bool
var config options.Config

func IsEnabled() bool {
return enabled
// MetricsEnabled returns true if metrics have been initialized.
func MetricsEnabled() bool {
return config.MetricsEnabled
}

func Init(ctx context.Context, port uint64) error {
enabled = true

if port == 0 {
port = defaultPort
}

// The exporter embeds a default OpenTelemetry Reader and
// implements prometheus.Collector, allowing it to be used as
// both a Reader and Collector.
// exporter, err := prometheus.New()
// if err != nil {
// log.Fatal(err)
// }

// provider := metric.NewMeterProvider(metric.WithReader(exporter))
// meter := provider.Meter(meterName)
// TracesEnabled returns true if traces have been initialized.
func TracesEnabled() bool {
return config.TracesEnabled
}

// Start the prometheus HTTP server and pass the exporter Collector to it
// go serveMetrics(ctx, port)
// Init can indicate both, either, or none of metrics and tracing depending on the options provided.
func Init(ctx context.Context, options ...Option) error {

// Use oltp metric exporter
exporter, err := otlpmetricgrpc.New(
ctx,
otlpmetricgrpc.WithEndpoint(os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT")),
otlpmetricgrpc.WithInsecure(),
)
if err != nil {
log.Fatal(err)
config.Port = defaultPort
config.MeterName = defaultMeterName
config.ServiceName = defaultServiceName
for _, opt := range options {
opt(&config)
}

provider := metric.NewMeterProvider(metric.WithReader(metric.NewPeriodicReader(exporter)))
meter := provider.Meter(meterName)

// otel.SetMeterProvider(meterProvider)

// Initialize metrics to be collected.
if err := metrics.Init(ctx, meter); err != nil {
return err
if config.MetricsEnabled {
if err := metrics.Init(ctx, config); err != nil {
return err
}
}

// Tracing initialization.
_ = traces.Init()

return nil
}

func serveMetrics(ctx context.Context, port uint64) {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
server := http.Server{
Addr: ":" + strconv.FormatUint(uint64(port), 10),
Handler: mux,
// Nothing should need a connection for longer than a few seconds when scraping metrics.
BaseContext: func(net.Listener) context.Context {
boundedCtx, _ := context.WithTimeout(ctx, time.Second*10)
return boundedCtx
},
if config.TracesEnabled {
if err := traces.Init(config); err != nil {
return err
}
}

if err := server.ListenAndServe(); err != nil {
fmt.Printf("error serving metrics over http: %v", err)
return
}
return nil
}
35 changes: 24 additions & 11 deletions telemetry/metrics/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package metrics
import (
"context"
"math/rand"
"os"

"github.com/gnolang/gno/telemetry/exporter"
"github.com/gnolang/gno/telemetry/options"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/metric"
sdkMetric "go.opentelemetry.io/otel/sdk/metric"
)

var (
Expand All @@ -16,18 +20,27 @@ var (
BuildBlockTimer Int64Collector
)

func Init(setCtx context.Context, meter metric.Meter) error {
func Init(setCtx context.Context, config options.Config) error {
if config.ExporterEndpoint == "" {
return exporter.ErrEndpointNotSet
}

ctx = setCtx

// Setting fake metrics results in choosing random values in a given range, disregarding
// th evalues passed to Collect().
//
// DBTODO: clean up the fake metrics code to make it easier to compose future metric types.
var useFakeMetrics bool
if value := os.Getenv("FAKE_METRICS"); value == "true" {
useFakeMetrics = true
// Use oltp metric exporter
exporter, err := otlpmetricgrpc.New(
ctx,
otlpmetricgrpc.WithEndpoint(config.ExporterEndpoint),
otlpmetricgrpc.WithInsecure(),
)
if err != nil {
return err
}

provider := sdkMetric.NewMeterProvider(sdkMetric.WithReader(sdkMetric.NewPeriodicReader(exporter)))
otel.SetMeterProvider(provider)
meter := provider.Meter(config.MeterName)

broadcastTxTimer, err := meter.Int64Histogram(
"broadcast_tx_hist",
metric.WithDescription("broadcast tx duration"),
Expand All @@ -39,7 +52,7 @@ func Init(setCtx context.Context, meter metric.Meter) error {
}
BroadcastTxTimer = Int64Histogram{
Int64Histogram: broadcastTxTimer,
useFakeMetrics: useFakeMetrics,
useFakeMetrics: config.UseFakeMetrics,
fakeRangeStart: 5,
fakeRangeEnd: 250,
}
Expand All @@ -55,7 +68,7 @@ func Init(setCtx context.Context, meter metric.Meter) error {
}
BuildBlockTimer = Int64Histogram{
Int64Histogram: buildBlockTimer,
useFakeMetrics: useFakeMetrics,
useFakeMetrics: config.UseFakeMetrics,
fakeRangeStart: 0,
fakeRangeEnd: 150,
}
Expand Down
Loading

0 comments on commit 64d447a

Please sign in to comment.