Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Add ability to stop routines on partial error.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
  • Loading branch information
Harkishen-Singh committed Mar 4, 2022
1 parent d3fc7be commit 36e8907
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ We use the following categories for changes:
### Fixed
- Register `promscale_ingest_channel_len_bucket` metric and make it a gauge [#1177]
- Log warning when failing to write response to remote read requests [#1180]
- Fix Promscale continuing to run even when some component fails to start [#1217]

## [0.10.0] - 2022-02-17

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/jackc/pgtype v1.10.0
github.com/jackc/pgx/v4 v4.15.1-0.20220219175125-b6b24f9e8a5d
github.com/jaegertracing/jaeger v1.31.0
github.com/oklog/run v1.1.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.46.0
github.com/opentracing-contrib/go-stdlib v1.0.0
github.com/opentracing/opentracing-go v1.2.0
Expand Down
108 changes: 64 additions & 44 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"fmt"
"net"
"net/http"
"sync"
"os"
"os/signal"
"time"

_ "github.com/jackc/pgx/v4/stdlib"
"github.com/oklog/run"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/otel"
"google.golang.org/grpc"
Expand Down Expand Up @@ -126,9 +128,7 @@ func Run(cfg *Config) error {
return fmt.Errorf("error registering metrics for telemetry: %w", err)
}

log.Info("msg", "Started Prometheus HTTP server", "listening-port", cfg.ListenAddr)

var wg sync.WaitGroup
var group run.Group
if len(cfg.ThanosStoreAPIListenAddr) > 0 {
srv := thanos.NewStorage(client.Queryable())
options := make([]grpc.ServerOption, 0)
Expand All @@ -143,22 +143,20 @@ func Run(cfg *Config) error {
grpcServer := grpc.NewServer(options...)
storepb.RegisterStoreServer(grpcServer, srv)

wg.Add(1)
go func() {
listener, err := net.Listen("tcp", cfg.ThanosStoreAPIListenAddr)
if err != nil {
wg.Done()
log.Error("msg", "Listening for Thanos StoreAPI failed", "err", err)
return
}

log.Info("msg", "Started Thanos StoreAPI GRPC server", "listening-port", cfg.ThanosStoreAPIListenAddr)
wg.Done()
if err := grpcServer.Serve(listener); err != nil {
log.Error("msg", "Starting the Thanos store failed", "err", err)
return
}
}()
group.Add(
func() error {
listener, err := net.Listen("tcp", cfg.ThanosStoreAPIListenAddr)
if err != nil {
log.Error("msg", "Listening for Thanos StoreAPI failed", "err", err)
return err
}
log.Info("msg", "Started Thanos StoreAPI GRPC server", "listening-port", cfg.ThanosStoreAPIListenAddr)
return grpcServer.Serve(listener)
}, func(error) {
log.Info("msg", "Stopping Thanos StoreAPI GRPC server")
grpcServer.Stop()
},
)
}

options := []grpc.ServerOption{
Expand Down Expand Up @@ -194,40 +192,62 @@ func Run(cfg *Config) error {
return err
}

wg.Add(1)
go func() {
group.Add(
func() error {
listener, err := net.Listen("tcp", cfg.OTLPGRPCListenAddr)
if err != nil {
wg.Done()
log.Error("msg", "Listening for OTLP GRPC server failed", "err", err)
return
log.Error("msg", "Listening for OpenTelemetry OTLP GRPC server failed", "err", err)
return err
}

log.Info("msg", "Started OpenTelemetry OTLP GRPC server", "listening-port", cfg.OTLPGRPCListenAddr)
wg.Done()
if err := grpcServer.Serve(listener); err != nil {
log.Error("msg", "Starting the OTLP GRPC server failed", "err", err)
return
}
}()
}
return grpcServer.Serve(listener)
}, func(error) {
log.Info("msg", "Stopping OpenTelemetry OTLP GRPC server")
grpcServer.Stop()
},
)

mux := http.NewServeMux()
mux.Handle("/", router)

wg.Wait()
log.Info("msg", "All components are ready!")
server := http.Server{
Addr: cfg.ListenAddr,
Handler: mux,
}
group.Add(
func() error {
var err error
log.Info("msg", "Started Prometheus remote-storage HTTP server", "listening-port", cfg.ListenAddr)
if cfg.TLSCertFile != "" {
err = server.ListenAndServeTLS(cfg.TLSCertFile, cfg.TLSKeyFile)
} else {
err = server.ListenAndServe()
}
return err
}, func(error) {
log.Info("msg", "Stopping Prometheus remote-storage HTTP server")
err = server.Shutdown(context.Background())
if err != nil {
log.Error("msg", "unable to shutdown Prometheus remote-storage HTTP server", "err", err.Error())
}
},
)

if cfg.TLSCertFile != "" {
err = http.ListenAndServeTLS(cfg.ListenAddr, cfg.TLSCertFile, cfg.TLSKeyFile, mux)
} else {
err = http.ListenAndServe(cfg.ListenAddr, mux)
}
// Listen to OS interrupt signals.
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
group.Add(
func() error {
<-c
return nil
}, func(err error) {
close(c)
},
)

err = group.Run()
if err != nil {
log.Error("msg", "Listen failure", "err", err)
return startupError
log.Error("msg", "Execution failure, stopping Promscale", "err", err)
}

return nil
return err
}

0 comments on commit 36e8907

Please sign in to comment.