Skip to content

Commit

Permalink
Avoid returning Already[Started/Stopped] if omitted after (#3035)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Apr 28, 2021
1 parent 983114f commit 8d6cf2a
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 50 deletions.
79 changes: 30 additions & 49 deletions receiver/opencensusreceiver/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/receiver/opencensusreceiver/ocmetrics"
Expand Down Expand Up @@ -100,12 +99,36 @@ func newOpenCensusReceiver(
// Start runs the trace receiver on the gRPC server. Currently
// it also enables the metrics receiver too.
func (ocr *ocReceiver) Start(_ context.Context, host component.Host) error {
return ocr.start(host)
hasConsumer := false
if ocr.traceConsumer != nil {
hasConsumer = true
if err := ocr.registerTraceConsumer(); err != nil {
return err
}
}

if ocr.metricsConsumer != nil {
hasConsumer = true
if err := ocr.registerMetricsConsumer(); err != nil {
return err
}
}

if !hasConsumer {
return errors.New("cannot start receiver: no consumers were specified")
}

if err := ocr.startServer(host); err != nil {
return err
}

// At this point we've successfully started all the services/receivers.
// Add other start routines here.
return nil
}

func (ocr *ocReceiver) registerTraceConsumer() error {
var err = componenterror.ErrAlreadyStarted

var err error
ocr.startTracesReceiverOnce.Do(func() {
ocr.traceReceiver, err = octrace.New(
ocr.instanceName, ocr.traceConsumer, ocr.traceReceiverOpts...)
Expand All @@ -119,8 +142,7 @@ func (ocr *ocReceiver) registerTraceConsumer() error {
}

func (ocr *ocReceiver) registerMetricsConsumer() error {
var err = componenterror.ErrAlreadyStarted

var err error
ocr.startMetricsReceiverOnce.Do(func() {
ocr.metricsReceiver, err = ocmetrics.New(
ocr.instanceName, ocr.metricsConsumer)
Expand All @@ -145,51 +167,11 @@ func (ocr *ocReceiver) grpcServer() *grpc.Server {

// Shutdown is a method to turn off receiving.
func (ocr *ocReceiver) Shutdown(context.Context) error {
if err := ocr.stop(); err != componenterror.ErrAlreadyStopped {
return err
}
return nil
}

// start runs all the receivers/services namely, Trace and Metrics services.
func (ocr *ocReceiver) start(host component.Host) error {
hasConsumer := false
if ocr.traceConsumer != nil {
hasConsumer = true
if err := ocr.registerTraceConsumer(); err != nil && err != componenterror.ErrAlreadyStarted {
return err
}
}

if ocr.metricsConsumer != nil {
hasConsumer = true
if err := ocr.registerMetricsConsumer(); err != nil && err != componenterror.ErrAlreadyStarted {
return err
}
}

if !hasConsumer {
return errors.New("cannot start receiver: no consumers were specified")
}

if err := ocr.startServer(host); err != nil && err != componenterror.ErrAlreadyStarted {
return err
}

// At this point we've successfully started all the services/receivers.
// Add other start routines here.
return nil
}

// stop stops the underlying gRPC server and all the services running on it.
func (ocr *ocReceiver) stop() error {
ocr.mu.Lock()
defer ocr.mu.Unlock()

err := componenterror.ErrAlreadyStopped
var err error
ocr.stopOnce.Do(func() {
err = nil

if ocr.serverHTTP != nil {
_ = ocr.serverHTTP.Close()
}
Expand Down Expand Up @@ -225,9 +207,8 @@ func (ocr *ocReceiver) httpServer() *http.Server {
}

func (ocr *ocReceiver) startServer(host component.Host) error {
err := componenterror.ErrAlreadyStarted
var err error
ocr.startServerOnce.Do(func() {
err = nil
// Register the grpc-gateway on the HTTP server mux
c := context.Background()
opts := []grpc.DialOption{grpc.WithInsecure()}
Expand Down
2 changes: 1 addition & 1 deletion receiver/opencensusreceiver/opencensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func TestStopWithoutStartNeverCrashes(t *testing.T) {
ocr, err := newOpenCensusReceiver(ocReceiverName, "tcp", addr, nil, nil)
require.NoError(t, err, "Failed to create an OpenCensus receiver: %v", err)
// Stop it before ever invoking Start*.
require.NoError(t, ocr.stop())
require.NoError(t, ocr.Shutdown(context.Background()))
}

func TestNewPortAlreadyUsed(t *testing.T) {
Expand Down

0 comments on commit 8d6cf2a

Please sign in to comment.