diff --git a/receiver/opencensusreceiver/opencensus.go b/receiver/opencensusreceiver/opencensus.go index 6cb00c409ef5..0b7781549ae5 100644 --- a/receiver/opencensusreceiver/opencensus.go +++ b/receiver/opencensusreceiver/opencensus.go @@ -30,7 +30,6 @@ import ( "google.golang.org/grpc" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/receiver/opencensusreceiver/ocmetrics" @@ -100,12 +99,36 @@ func newOpenCensusReceiver( // Start runs the trace receiver on the gRPC server. Currently // it also enables the metrics receiver too. func (ocr *ocReceiver) Start(_ context.Context, host component.Host) error { - return ocr.start(host) + hasConsumer := false + if ocr.traceConsumer != nil { + hasConsumer = true + if err := ocr.registerTraceConsumer(); err != nil { + return err + } + } + + if ocr.metricsConsumer != nil { + hasConsumer = true + if err := ocr.registerMetricsConsumer(); err != nil { + return err + } + } + + if !hasConsumer { + return errors.New("cannot start receiver: no consumers were specified") + } + + if err := ocr.startServer(host); err != nil { + return err + } + + // At this point we've successfully started all the services/receivers. + // Add other start routines here. + return nil } func (ocr *ocReceiver) registerTraceConsumer() error { - var err = componenterror.ErrAlreadyStarted - + var err error ocr.startTracesReceiverOnce.Do(func() { ocr.traceReceiver, err = octrace.New( ocr.instanceName, ocr.traceConsumer, ocr.traceReceiverOpts...) @@ -119,8 +142,7 @@ func (ocr *ocReceiver) registerTraceConsumer() error { } func (ocr *ocReceiver) registerMetricsConsumer() error { - var err = componenterror.ErrAlreadyStarted - + var err error ocr.startMetricsReceiverOnce.Do(func() { ocr.metricsReceiver, err = ocmetrics.New( ocr.instanceName, ocr.metricsConsumer) @@ -145,51 +167,11 @@ func (ocr *ocReceiver) grpcServer() *grpc.Server { // Shutdown is a method to turn off receiving. func (ocr *ocReceiver) Shutdown(context.Context) error { - if err := ocr.stop(); err != componenterror.ErrAlreadyStopped { - return err - } - return nil -} - -// start runs all the receivers/services namely, Trace and Metrics services. -func (ocr *ocReceiver) start(host component.Host) error { - hasConsumer := false - if ocr.traceConsumer != nil { - hasConsumer = true - if err := ocr.registerTraceConsumer(); err != nil && err != componenterror.ErrAlreadyStarted { - return err - } - } - - if ocr.metricsConsumer != nil { - hasConsumer = true - if err := ocr.registerMetricsConsumer(); err != nil && err != componenterror.ErrAlreadyStarted { - return err - } - } - - if !hasConsumer { - return errors.New("cannot start receiver: no consumers were specified") - } - - if err := ocr.startServer(host); err != nil && err != componenterror.ErrAlreadyStarted { - return err - } - - // At this point we've successfully started all the services/receivers. - // Add other start routines here. - return nil -} - -// stop stops the underlying gRPC server and all the services running on it. -func (ocr *ocReceiver) stop() error { ocr.mu.Lock() defer ocr.mu.Unlock() - err := componenterror.ErrAlreadyStopped + var err error ocr.stopOnce.Do(func() { - err = nil - if ocr.serverHTTP != nil { _ = ocr.serverHTTP.Close() } @@ -225,9 +207,8 @@ func (ocr *ocReceiver) httpServer() *http.Server { } func (ocr *ocReceiver) startServer(host component.Host) error { - err := componenterror.ErrAlreadyStarted + var err error ocr.startServerOnce.Do(func() { - err = nil // Register the grpc-gateway on the HTTP server mux c := context.Background() opts := []grpc.DialOption{grpc.WithInsecure()} diff --git a/receiver/opencensusreceiver/opencensus_test.go b/receiver/opencensusreceiver/opencensus_test.go index 2fd4fd85fe44..e3060ecdcc43 100644 --- a/receiver/opencensusreceiver/opencensus_test.go +++ b/receiver/opencensusreceiver/opencensus_test.go @@ -241,7 +241,7 @@ func TestStopWithoutStartNeverCrashes(t *testing.T) { ocr, err := newOpenCensusReceiver(ocReceiverName, "tcp", addr, nil, nil) require.NoError(t, err, "Failed to create an OpenCensus receiver: %v", err) // Stop it before ever invoking Start*. - require.NoError(t, ocr.stop()) + require.NoError(t, ocr.Shutdown(context.Background())) } func TestNewPortAlreadyUsed(t *testing.T) {