Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make refinery run from minimal config #724

Merged
merged 10 commits into from
Jun 21, 2023
2 changes: 1 addition & 1 deletion app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func newStartedApp(

a := App{}

lgr := &logger.LogrusLogger{
lgr := &logger.StdoutLogger{
Config: c,
}
lgr.SetLevel("error")
Expand Down
67 changes: 46 additions & 21 deletions cmd/refinery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func main() {

c, err := config.NewConfig(opts, func(err error) {
if a.Logger != nil {
a.Logger.Error().WithField("error", err).Logf("error reloading config")
a.Logger.Error().WithField("error", err).Logf("error loading config")
}
})
if err != nil {
Expand All @@ -95,6 +95,11 @@ func main() {
fmt.Println("Config and Rules validated successfully.")
os.Exit(0)
}
c.RegisterReloadCallback(func() {
if a.Logger != nil {
a.Logger.Info().Logf("configuration change was detected and the configuration was reloaded")
}
})

// get desired implementation for each dependency to inject
lgr := logger.GetLoggerImplementation(c)
Expand Down Expand Up @@ -142,21 +147,6 @@ func main() {
upstreamMetricsRecorder := metrics.NewMetricsPrefixer("libhoney_upstream")
peerMetricsRecorder := metrics.NewMetricsPrefixer("libhoney_peer")

// these are the metrics that libhoney will emit; we preregister them so that they always appear
libhoneyMetricsName := map[string]string{
"queue_length": "gauge",
"queue_overflow": "counter",
"send_errors": "counter",
"send_retries": "counter",
"batches_sent": "counter",
"messages_sent": "counter",
"response_decode_errors": "counter",
}
for name, typ := range libhoneyMetricsName {
upstreamMetricsRecorder.Register(name, typ)
peerMetricsRecorder.Register(name, typ)
}

userAgentAddition := "refinery/" + version
upstreamClient, err := libhoney.NewClient(libhoney.ClientConfig{
Transmission: &transmission.Honeycomb{
Expand Down Expand Up @@ -195,6 +185,8 @@ func main() {
}

stressRelief := &collect.StressRelief{Done: done}
upstreamTransmission := transmit.NewDefaultTransmission(upstreamClient, upstreamMetricsRecorder, "upstream")
peerTransmission := transmit.NewDefaultTransmission(peerClient, peerMetricsRecorder, "peer")

var g inject.Graph
if opts.Debug {
Expand All @@ -206,8 +198,8 @@ func main() {
{Value: lgr},
{Value: upstreamTransport, Name: "upstreamTransport"},
{Value: peerTransport, Name: "peerTransport"},
{Value: transmit.NewDefaultTransmission(upstreamClient, upstreamMetricsRecorder, "upstream"), Name: "upstreamTransmission"},
{Value: transmit.NewDefaultTransmission(peerClient, peerMetricsRecorder, "peer"), Name: "peerTransmission"},
{Value: upstreamTransmission, Name: "upstreamTransmission"},
{Value: peerTransmission, Name: "peerTransmission"},
{Value: shrdr},
{Value: collector},
{Value: metricsSingleton, Name: "metrics"},
Expand Down Expand Up @@ -242,20 +234,53 @@ func main() {
os.Exit(1)
}

// We need to remove the upstream and peer transmissions from the list of
// objects to startstop. For reasons I haven't been able to figure out,
// startstop sometimes decides to start them before the rest of the objects
// are ready, which causes a panic. We have to do it this way because
// startstop deliberately randomizes the graph order.
var nonxmitObjects []*inject.Object
for _, obj := range g.Objects() {
if obj.Name == "upstreamTransmission" || obj.Name == "peerTransmission" {
continue
}
nonxmitObjects = append(nonxmitObjects, obj)
}

// the logger provided to startstop must be valid before any service is
// started, meaning it can't rely on injected configs. make a custom logger
// just for this step
ststLogger := logrus.New()
level, _ := logrus.ParseLevel(logLevel)
ststLogger.SetLevel(level)
// level, _ := logrus.ParseLevel(logLevel)
ststLogger.SetLevel(logrus.DebugLevel)

// we can stop all the objects in one call, but we need to start the
// transmissions manually.
defer startstop.Stop(g.Objects(), ststLogger)
if err := startstop.Start(g.Objects(), ststLogger); err != nil {
if err := startstop.Start(nonxmitObjects, ststLogger); err != nil {
fmt.Printf("failed to start injected dependencies. error: %+v\n", err)
os.Exit(1)
}
// now start these manually
upstreamTransmission.Start()
peerTransmission.Start()

// these have to be done after the injection (of metrics)
// these are the metrics that libhoney will emit; we preregister them so that they always appear
libhoneyMetricsName := map[string]string{
"queue_length": "gauge",
"queue_overflow": "counter",
"send_errors": "counter",
"send_retries": "counter",
"batches_sent": "counter",
"messages_sent": "counter",
"response_decode_errors": "counter",
}
for name, typ := range libhoneyMetricsName {
upstreamMetricsRecorder.Register(name, typ)
peerMetricsRecorder.Register(name, typ)
}

metricsSingleton.Store("UPSTREAM_BUFFER_SIZE", float64(c.GetUpstreamBufferSize()))
metricsSingleton.Store("PEER_BUFFER_SIZE", float64(c.GetPeerBufferSize()))

Expand Down
Loading