Skip to content

Commit

Permalink
Support timeout in Collector Logs exporter (#894)
Browse files Browse the repository at this point in the history
* Support timeout in Collector Logs exporter

* Add default timeout

* Add duration to test
  • Loading branch information
damemi authored Sep 18, 2024
1 parent 78c2bb5 commit d6795b4
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
2 changes: 2 additions & 0 deletions exporter/collector/integrationtest/inmemoryotelexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,14 @@ func NewLogTestExporter(
logger, err := zap.NewDevelopment()
require.NoError(t, err)

var duration time.Duration
exporter, err := collector.NewGoogleCloudLogsExporter(
ctx,
cfg,
logger,
meterProvider,
"latest",
duration,
)
require.NoError(t, err)

Expand Down
3 changes: 3 additions & 0 deletions exporter/collector/integrationtest/logs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,15 @@ func createLogsExporter(
cfg := test.CreateLogConfig()
// For sending to a real project, set the project ID from an env var.
cfg.ProjectID = os.Getenv("PROJECT_ID")

var duration time.Duration
exporter, err := collector.NewGoogleCloudLogsExporter(
ctx,
cfg,
logger,
noop.NewMeterProvider(),
"latest",
duration,
)
require.NoError(t, err)
err = exporter.Start(ctx, componenttest.NewNopHost())
Expand Down
14 changes: 12 additions & 2 deletions exporter/collector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ type LogsExporter struct {
loggingClient *loggingv2.Client
cfg Config
mapper logMapper
timeout time.Duration
}

type logMapper struct {
Expand All @@ -163,6 +164,7 @@ func NewGoogleCloudLogsExporter(
log *zap.Logger,
meterProvider metric.MeterProvider,
version string,
timeout time.Duration,
) (*LogsExporter, error) {
setVersionInUserAgent(&cfg, version)
obs := selfObservability{
Expand All @@ -171,8 +173,9 @@ func NewGoogleCloudLogsExporter(
}

return &LogsExporter{
cfg: cfg,
obs: obs,
cfg: cfg,
obs: obs,
timeout: timeout,
mapper: logMapper{
obs: obs,
cfg: cfg,
Expand Down Expand Up @@ -364,6 +367,13 @@ func mergeLogLabels(instrumentationSource, instrumentationVersion string, resour
}

func (l *LogsExporter) writeLogEntries(ctx context.Context, batch []*logpb.LogEntry) (*logpb.WriteLogEntriesResponse, error) {
timeout := l.timeout
if timeout <= 0 {
timeout = 12 * time.Second
}
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

request := &logpb.WriteLogEntriesRequest{
PartialSuccess: true,
Entries: batch,
Expand Down

0 comments on commit d6795b4

Please sign in to comment.