-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
receiver.go
152 lines (126 loc) · 4.02 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package adapter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
import (
"context"
"fmt"
"sync"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/extension/experimental/storage"
rcvr "go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/multierr"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
)
type receiver struct {
set component.TelemetrySettings
id component.ID
wg sync.WaitGroup
cancel context.CancelFunc
pipe pipeline.Pipeline
emitter *helper.LogEmitter
consumer consumer.Logs
converter *Converter
obsrecv *receiverhelper.ObsReport
storageID *component.ID
storageClient storage.Client
}
// Ensure this receiver adheres to required interface
var _ rcvr.Logs = (*receiver)(nil)
// Start tells the receiver to start
func (r *receiver) Start(ctx context.Context, host component.Host) error {
rctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.set.Logger.Info("Starting stanza receiver")
if err := r.setStorageClient(ctx, host); err != nil {
return fmt.Errorf("storage client: %w", err)
}
if err := r.pipe.Start(r.storageClient); err != nil {
return fmt.Errorf("start stanza: %w", err)
}
r.converter.Start()
// Below we're starting 2 loops:
// * one which reads all the logs produced by the emitter and then forwards
// them to converter
// ...
r.wg.Add(1)
go r.emitterLoop(rctx)
// ...
// * second one which reads all the logs produced by the converter
// (aggregated by Resource) and then calls consumer to consumer them.
r.wg.Add(1)
go r.consumerLoop(rctx)
// Those 2 loops are started in separate goroutines because batching in
// the emitter loop can cause a flush, caused by either reaching the max
// flush size or by the configurable ticker which would in turn cause
// a set of log entries to be available for reading in converter's out
// channel. In order to prevent backpressure, reading from the converter
// channel and batching are done in those 2 goroutines.
return nil
}
// emitterLoop reads the log entries produced by the emitter and batches them
// in converter.
func (r *receiver) emitterLoop(ctx context.Context) {
defer r.wg.Done()
// Don't create done channel on every iteration.
doneChan := ctx.Done()
for {
select {
case <-doneChan:
r.set.Logger.Debug("Receive loop stopped")
return
case e, ok := <-r.emitter.OutChannel():
if !ok {
continue
}
if err := r.converter.Batch(e); err != nil {
r.set.Logger.Error("Could not add entry to batch", zap.Error(err))
}
}
}
}
// consumerLoop reads converter log entries and calls the consumer to consumer them.
func (r *receiver) consumerLoop(ctx context.Context) {
defer r.wg.Done()
// Don't create done channel on every iteration.
doneChan := ctx.Done()
pLogsChan := r.converter.OutChannel()
for {
select {
case <-doneChan:
r.set.Logger.Debug("Consumer loop stopped")
return
case pLogs, ok := <-pLogsChan:
if !ok {
r.set.Logger.Debug("Converter channel got closed")
continue
}
obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
logRecordCount := pLogs.LogRecordCount()
cErr := r.consumer.ConsumeLogs(ctx, pLogs)
if cErr != nil {
r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
}
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
}
}
}
// Shutdown is invoked during service shutdown
func (r *receiver) Shutdown(ctx context.Context) error {
if r.cancel == nil {
return nil
}
r.set.Logger.Info("Stopping stanza receiver")
pipelineErr := r.pipe.Stop()
r.converter.Stop()
r.cancel()
r.wg.Wait()
if r.storageClient != nil {
clientErr := r.storageClient.Close(ctx)
return multierr.Combine(pipelineErr, clientErr)
}
return pipelineErr
}