
Metric creation slowed down by unreachable collector with gRPC #3925

Closed
fracasula opened this issue Mar 23, 2023 · 8 comments · Fixed by #4395
Assignees
Labels
area:metrics Part of OpenTelemetry Metrics bug Something isn't working pkg:SDK Related to an SDK package

Comments

@fracasula

Description

I noticed that, while the agent/collector is down, creating an instrument can easily take more than 10 seconds. I'm testing this by passing an invalid host (e.g. unreachable:4317) to my application, and I see each of these calls take more than 10 seconds:

switch any(m).(type) {
case map[string]instrument.Int64Counter:
	value, err = meter.Int64Counter(name, castOptions[instrument.Int64Option](opts...)...)
case map[string]instrument.Int64Histogram:
	value, err = meter.Int64Histogram(name, castOptions[instrument.Int64Option](opts...)...)
case map[string]instrument.Float64Histogram:
	value, err = meter.Float64Histogram(name, castOptions[instrument.Float64Option](opts...)...)
default:
	panic(fmt.Errorf("unknown instrument type %T", m))
}

That is a small switch in my own instruments factory, which works as an adapter to your library.

[screenshot omitted]

To me it looks like these operations all hold a mutex (i.e. e.clientMu):

[screenshot omitted]

I think Aggregation and Temporality are called when an instrument is created, but the same mutex is also held for the entire duration of Export, so maybe that is where the problem lies. I would expect Export to hold the lock just long enough to copy the data it needs to send, then unlock and take its time sending the data over the gRPC connection asynchronously. A buffered channel could be used, discarding metrics once the buffer is full.
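Roughly, the pattern I have in mind looks like this (a sketch with hypothetical names, not the actual SDK code):

```go
package main

import (
	"fmt"
	"sync"
)

// exportQueue sketches the copy-under-lock, send-outside-lock idea:
// Export holds the mutex only long enough to snapshot the data, then
// hands the copy to a bounded channel drained by a background
// goroutine. When the buffer is full, the batch is dropped instead of
// blocking callers.
type exportQueue struct {
	mu      sync.Mutex
	pending []int      // stand-in for accumulated metric data
	ch      chan []int // buffered; full buffer => drop the batch
}

func newExportQueue(size int) *exportQueue {
	q := &exportQueue{ch: make(chan []int, size)}
	go func() {
		for batch := range q.ch {
			_ = batch // the slow gRPC send would happen here, lock-free
		}
	}()
	return q
}

func (q *exportQueue) Export() bool {
	q.mu.Lock()
	snapshot := append([]int(nil), q.pending...)
	q.mu.Unlock() // lock released before the (possibly slow) send

	select {
	case q.ch <- snapshot:
		return true
	default:
		return false // buffer full: discard rather than block
	}
}

func main() {
	q := newExportQueue(1)
	q.pending = []int{1, 2, 3}
	fmt.Println(q.Export())
}
```

With this shape, anything that only needs the selectors never contends with an in-flight send.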

Environment

Steps To Reproduce

You don't even need a collector. Just create many goroutines spawning instruments (even the same instrument over and over, since it should be cached in memory) and pass something like "unreachable:4317" as the gRPC endpoint.

This is my default retry config:

var DefaultRetryConfig = RetryConfig{
	Enabled:         true,
	InitialInterval: 5 * time.Second,
	MaxInterval:     30 * time.Second,
	MaxElapsedTime:  time.Minute,
}

And this is how I initialize the meter provider:

meterProviderOptions := []otlpmetricgrpc.Option{
	otlpmetricgrpc.WithEndpoint(c.metricsEndpoint),
	otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetryConfig{
		Enabled:         c.retryConfig.Enabled,
		InitialInterval: c.retryConfig.InitialInterval,
		MaxInterval:     c.retryConfig.MaxInterval,
		MaxElapsedTime:  c.retryConfig.MaxElapsedTime,
	}),
}
if c.withInsecure {
	meterProviderOptions = append(meterProviderOptions, otlpmetricgrpc.WithInsecure())
}
if len(c.meterProviderConfig.otlpMetricGRPCOptions) > 0 {
	meterProviderOptions = append(meterProviderOptions, c.meterProviderConfig.otlpMetricGRPCOptions...)
}
exp, err := otlpmetricgrpc.New(ctx, meterProviderOptions...)
if err != nil {
	return nil, nil, fmt.Errorf("failed to create metric exporter: %w", err)
}

m.mp = sdkmetric.NewMeterProvider(
	sdkmetric.WithResource(res),
	sdkmetric.WithReader(sdkmetric.NewPeriodicReader(
		exp,
		sdkmetric.WithInterval(c.meterProviderConfig.exportsInterval),
	)),
	sdkmetric.WithView(c.meterProviderConfig.views...),
)

if c.meterProviderConfig.global {
	global.SetMeterProvider(m.mp)
}

Expected behavior

I would expect the creation of instruments not to take more than a few milliseconds at most.

@fracasula fracasula added the bug Something isn't working label Mar 23, 2023
@fracasula
Author

Removing the lock from Aggregation and Temporality solves the issue for me:

// Temporality returns the Temporality to use for an instrument kind.
func (e *exporter) Temporality(k metric.InstrumentKind) metricdata.Temporality {
	start := time.Now()
	defer func() {
		fmt.Println("OTEL: exporter.Temporality took", time.Since(start))
	}()
	//e.clientMu.Lock()
	//defer e.clientMu.Unlock()
	return e.client.Temporality(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (e *exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation {
	start := time.Now()
	defer func() {
		fmt.Println("OTEL: exporter.Aggregation took", time.Since(start))
	}()
	//e.clientMu.Lock()
	//defer e.clientMu.Unlock()
	return e.client.Aggregation(k)
}

Don't mind the fmt.Println calls, they were just for debugging. Again, with clientMu.Lock it takes 20-30s.

Anyway, given that the temporality and aggregation selectors aren't really using the client, is the serialization you're trying to achieve via the lock actually needed there?

@MrAlias MrAlias added pkg:SDK Related to an SDK package area:metrics Part of OpenTelemetry Metrics labels Mar 23, 2023
@fracasula
Author

@MrAlias can you tell me the reason why this comment was added?

// Ensure synchronous access to the client across all functionality.

Perhaps if I understood the rationale behind it, I could try to fix it myself. It's a serious issue, so we can't go to production with it.

The problem is that I'm not sure removing the lock for Aggregation and Temporality is the best way to go. It might not cause issues right now, because at the moment both methods just return copies across the stack, but I don't know what your plans are for that component moving forward.

@pellared
Member

I think that synchronization is required to avoid races when doing ForceFlush and Shutdown of the exporter.

@fracasula
Author

fracasula commented Mar 30, 2023

I think that synchronization is required to avoid races when doing ForceFlush and Shutdown of the exporter.

Right, but that doesn't affect Temporality and Aggregation, correct? Unless there is a plan to have them communicate via gRPC as well.

At the moment I see these two being used for Temporality and Aggregation:

@pellared
Member

pellared commented Mar 30, 2023

I see that the lock is currently needed to avoid a race as the Shutdown mutates e.client. Do you want to contribute by refining the code so that locking is not needed for Temporality and Aggregation?

EDIT: Personally I would simply use a isShutdown atomic.Bool instead of mutating the e.client 😅
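Roughly like this (a simplified sketch with hypothetical types, not the actual exporter code): guard the mutating operations with the mutex as before, but let the read-only selector paths check an atomic flag instead of taking the lock.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errShutdown = errors.New("exporter is shut down")

type exporter struct {
	clientMu   sync.Mutex  // still guards upload/flush/shutdown
	isShutdown atomic.Bool // read lock-free by the selector paths
}

// temporality stands in for the Temporality selector: no mutex, just an
// atomic load, so it cannot be blocked by a slow in-flight export.
func (e *exporter) temporality() (string, error) {
	if e.isShutdown.Load() {
		return "", errShutdown
	}
	return "cumulative", nil
}

func (e *exporter) Shutdown() error {
	e.clientMu.Lock()
	defer e.clientMu.Unlock()
	if e.isShutdown.Swap(true) {
		return nil // already shut down
	}
	// connections would be closed here
	return nil
}

func main() {
	e := &exporter{}
	t, err := e.temporality()
	fmt.Println(t, err) // cumulative <nil>
	_ = e.Shutdown()
	_, err = e.temporality()
	fmt.Println(err) // exporter is shut down
}
```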

@fracasula
Author

I see that the lock is currently needed to avoid a race as the Shutdown mutates e.client. Do you want to contribute by refining the code so that locking is not needed for Temporality and Aggregation?

If there are no plans to get such information via the gRPC connection (for some reason) then yeah I can try to have a stab at it.

I'd like confirmation from @MrAlias before commencing any work though since apparently he wrote that bit.

@MrAlias
Contributor

MrAlias commented Mar 30, 2023

It's been a while since I looked at this code but, if I recall correctly, the client lock was included to ensure synchronous access to all client methods so that the HTTP and gRPC clients didn't have to manage concurrency themselves. This was before temporality and aggregation selection was added to the reader.

As long as client implementations are updated to be concurrency safe, and the coordination around the client field changing is handled, I don't see why this couldn't be updated.
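For illustration, a client could guard its own state internally (a simplified sketch with hypothetical fields, not the actual otlpmetricgrpc client), so the exporter would no longer need a global lock around every call:

```go
package main

import (
	"fmt"
	"sync"
)

// client sketches a concurrency-safe OTLP client: it protects its own
// mutable state, so callers need no external synchronization.
type client struct {
	mu     sync.Mutex
	closed bool
}

func (c *client) UploadMetrics(batch string) error {
	c.mu.Lock()
	if c.closed {
		c.mu.Unlock()
		return fmt.Errorf("client closed")
	}
	c.mu.Unlock()
	_ = batch // the batch would be sent over gRPC here, outside the lock
	return nil
}

func (c *client) Shutdown() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
	return nil
}

func main() {
	c := &client{}
	fmt.Println(c.UploadMetrics("m1"))        // <nil>
	_ = c.Shutdown()
	fmt.Println(c.UploadMetrics("m2") != nil) // true
}
```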

@fracasula
Author

fracasula commented Apr 14, 2023

What do you think about having a leaner Client and passing a ConfigSelector alongside it to provide the temporality and aggregation selectors? The ConfigSelector wouldn't need any synchronization since it's just reading configuration values.

Both the aggregation and temporality selectors are already coming from Config here and here.

Logically speaking the separation is already there. Instead of Client we could have:

// Client handles the transmission of OTLP data to an OTLP receiving endpoint.
type Client interface {
	// UploadMetrics transmits metric data to an OTLP receiver.
	//
	// All retry logic must be handled by UploadMetrics alone, the Exporter
	// does not implement any retry logic. All returned errors are considered
	// unrecoverable.
	UploadMetrics(context.Context, *mpb.ResourceMetrics) error

	// ForceFlush flushes any metric data held by an Client.
	//
	// The deadline or cancellation of the passed context must be honored. An
	// appropriate error should be returned in these situations.
	ForceFlush(context.Context) error

	// Shutdown flushes all metric data held by a Client and closes any
	// connections it holds open.
	//
	// The deadline or cancellation of the passed context must be honored. An
	// appropriate error should be returned in these situations.
	//
	// Shutdown will only be called once by the Exporter. Once a return value
	// is received by the Exporter from Shutdown the Client will not be used
	// anymore. Therefore all computational resources need to be released
	// after this is called so the Client can be garbage collected.
	Shutdown(context.Context) error
}

type ConfigSelector interface {
	// Temporality returns the Temporality to use for an instrument kind.
	Temporality(metric.InstrumentKind) metricdata.Temporality

	// Aggregation returns the Aggregation to use for an instrument kind.
	Aggregation(metric.InstrumentKind) aggregation.Aggregation
}

And then in exporter.go:

// exporter exports metrics data as OTLP.
type exporter struct {
	// Ensure synchronous access to the client across all functionality.
	clientMu       sync.Mutex
	client         Client
	configSelector ConfigSelector

	shutdownOnce sync.Once
}
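The exporter's selector methods could then delegate without touching clientMu at all. A self-contained sketch with stub types standing in for the real SDK types:

```go
package main

import "fmt"

// Stub types standing in for metric.InstrumentKind, metricdata.Temporality,
// and aggregation.Aggregation.
type InstrumentKind int
type Temporality string
type Aggregation string

type ConfigSelector interface {
	Temporality(InstrumentKind) Temporality
	Aggregation(InstrumentKind) Aggregation
}

// defaultSelector is a hypothetical selector reading static config.
type defaultSelector struct{}

func (defaultSelector) Temporality(InstrumentKind) Temporality { return "cumulative" }
func (defaultSelector) Aggregation(InstrumentKind) Aggregation { return "sum" }

type exporter struct {
	configSelector ConfigSelector
	// clientMu and client omitted: they are no longer involved here.
}

// Temporality delegates to the lock-free configSelector, so a slow
// export can no longer block instrument creation.
func (e *exporter) Temporality(k InstrumentKind) Temporality {
	return e.configSelector.Temporality(k)
}

func main() {
	e := &exporter{configSelector: defaultSelector{}}
	fmt.Println(e.Temporality(0)) // cumulative
}
```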

Here's a draft PR.

@MrAlias MrAlias self-assigned this Jul 28, 2023
MrAlias added a commit to MrAlias/opentelemetry-go that referenced this issue Jul 28, 2023
The aggregation and temporality methods may be called concurrently. Both
with themselves and with other methods of the Reader or Exporter.
Implementations need to provide concurrent safe methods. This change
adds documentation about this requirement.

Part of open-telemetry#3925
@MrAlias MrAlias added this to the v1.17.0/v0.40.0 milestone Aug 3, 2023