Skip to content

Commit

Permalink
metrics renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
OS-M committed Jan 6, 2023
1 parent 85f2742 commit 0a6ddb4
Show file tree
Hide file tree
Showing 12 changed files with 27 additions and 27 deletions.
8 changes: 4 additions & 4 deletions cmd/dev/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@ package main

import (
"context"
"github.com/proxima-one/indexer-utils-go/pkg/consumer_metrics"
"github.com/proxima-one/indexer-utils-go/pkg/status_server"
"github.com/proxima-one/indexer-utils-go/pkg/consume_status"
"github.com/proxima-one/indexer-utils-go/pkg/prometheus_metrics"
"time"
)

func main() {
go func() {
serv := status_server.NewStatusServer()
serv := consume_status.NewConsumeStatusServer()
serv.Start(context.Background(), 27000, 8080)
serv.UpdateNetworkIndexingStatus("test", time.Now(), "1")
for {
time.Sleep(time.Second)
}
}()
go func() {
metrics := consumer_metrics.NewConsumerMetricsServer().EnableConsumerMetrics(context.Background())
metrics := prometheus_metrics.NewConsumerMetricsServer().EnableConsumerMetrics(context.Background())
go metrics.Start(12228)
for {
metrics.EventProcessed("net1", time.Unix(time.Now().Unix()-100, 0))
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package status_server
package consume_status

import (
"context"
"fmt"
pb "github.com/proxima-one/indexer-utils-go/pkg/consume_status/internal/proto"
"github.com/proxima-one/indexer-utils-go/pkg/grpc_gateway"
pb "github.com/proxima-one/indexer-utils-go/pkg/status_server/internal/proto"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -19,17 +19,17 @@ type networkIndexingStatus struct {
BlockNumber string
}

type StatusServer struct {
type ConsumeStatusServer struct {
states map[string]*networkIndexingStatus
}

func NewStatusServer() *StatusServer {
return &StatusServer{
func NewConsumeStatusServer() *ConsumeStatusServer {
return &ConsumeStatusServer{
states: make(map[string]*networkIndexingStatus),
}
}

func (s *StatusServer) Start(ctx context.Context, grpcPort, httpPort int) {
func (s *ConsumeStatusServer) Start(ctx context.Context, grpcPort, httpPort int) {
grpcServer := grpc.NewServer()
pb.RegisterStatusServiceServer(grpcServer, s)

Expand All @@ -53,7 +53,7 @@ func (s *StatusServer) Start(ctx context.Context, grpcPort, httpPort int) {
}()
}

func (s *StatusServer) UpdateNetworkIndexingStatus(network string, timestamp time.Time, blockNumber string) {
func (s *ConsumeStatusServer) UpdateNetworkIndexingStatus(network string, timestamp time.Time, blockNumber string) {
if _, ok := s.states[network]; !ok {
s.states[network] = &networkIndexingStatus{}
}
Expand All @@ -62,7 +62,7 @@ func (s *StatusServer) UpdateNetworkIndexingStatus(network string, timestamp tim
s.states[network].BlockNumber = blockNumber
}

func (s *StatusServer) GetStatus(_ context.Context, _ *emptypb.Empty) (*pb.GetStatusResponse, error) {
func (s *ConsumeStatusServer) GetStatus(_ context.Context, _ *emptypb.Empty) (*pb.GetStatusResponse, error) {
res := &pb.GetStatusResponse{Networks: make([]*pb.NetworkIndexingStatus, 0)}

for _, state := range s.states {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package consumer_metrics
package prometheus_metrics

import (
"context"
Expand All @@ -17,16 +17,16 @@ type eventProcessedEvent struct {
timestamp time.Time
}

type IndexingServiceMetricsServer struct {
type PrometheusMetricsServer struct {
processedEvents chan eventProcessedEvent
}

func NewConsumerMetricsServer() *IndexingServiceMetricsServer {
return new(IndexingServiceMetricsServer)
func NewConsumerMetricsServer() *PrometheusMetricsServer {
return new(PrometheusMetricsServer)
}

func (cm *IndexingServiceMetricsServer) EnableConsumerMetrics(ctx context.Context) *IndexingServiceMetricsServer {
cm.processedEvents = make(chan eventProcessedEvent, 100)
func (s *PrometheusMetricsServer) EnableConsumerMetrics(ctx context.Context) *PrometheusMetricsServer {
s.processedEvents = make(chan eventProcessedEvent, 100)
processingDelay := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "",
Name: "index_processing_delay",
Expand Down Expand Up @@ -86,7 +86,7 @@ func (cm *IndexingServiceMetricsServer) EnableConsumerMetrics(ctx context.Contex

lastUpdateTime = time.Now()

case event := <-cm.processedEvents:
case event := <-s.processedEvents:
if streams[event.streamId] == nil {
streams[event.streamId] = new(streamData)
}
Expand All @@ -99,25 +99,25 @@ func (cm *IndexingServiceMetricsServer) EnableConsumerMetrics(ctx context.Contex
}
}
}()
return cm
return s
}

func (cm *IndexingServiceMetricsServer) EnableServerMetrics(server *grpc.Server) *IndexingServiceMetricsServer {
func (s *PrometheusMetricsServer) EnableServerMetrics(server *grpc.Server) *PrometheusMetricsServer {
grpcPrometheus.EnableHandlingTimeHistogram()
grpcPrometheus.Register(server)
return cm
return s
}

func (cm *IndexingServiceMetricsServer) Start(port int) error {
func (s *PrometheusMetricsServer) Start(port int) error {
http.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}

func (cm *IndexingServiceMetricsServer) EventProcessed(stream string, timestamp time.Time) {
if cm.processedEvents == nil {
panic("cannot use consumer metrics with server-only IndexingServiceMetricsServer")
func (s *PrometheusMetricsServer) EventProcessed(stream string, timestamp time.Time) {
if s.processedEvents == nil {
panic("cannot use consumer metrics with server-only PrometheusMetricsServer")
}
cm.processedEvents <- eventProcessedEvent{
s.processedEvents <- eventProcessedEvent{
streamId: stream,
eventTimestamp: timestamp,
timestamp: time.Now(),
Expand Down

0 comments on commit 0a6ddb4

Please sign in to comment.