diff --git a/internal/stanza/config.go b/internal/stanza/config.go index 434fb2ce9062..8dd810526562 100644 --- a/internal/stanza/config.go +++ b/internal/stanza/config.go @@ -15,6 +15,8 @@ package stanza import ( + "time" + "github.com/open-telemetry/opentelemetry-log-collection/operator" "go.opentelemetry.io/collector/config" "gopkg.in/yaml.v2" @@ -24,6 +26,7 @@ import ( type BaseConfig struct { config.ReceiverSettings `mapstructure:",squash"` Operators OperatorConfigs `mapstructure:"operators"` + Converter ConverterConfig `mapstructure:"converter"` } // OperatorConfigs is an alias that allows for unmarshaling outside of mapstructure @@ -31,6 +34,17 @@ type BaseConfig struct { // but this allows a temporary solution type OperatorConfigs []map[string]interface{} +// ConverterConfig controls how the internal entry.Entry to pdata.Logs converter +// works. +type ConverterConfig struct { + // MaxFlushCount defines the maximum number of entries that can be + // accumulated before flushing them for further processing. + MaxFlushCount uint `mapstructure:"max_flush_count"` + // FlushInterval defines how often to flush the converted and accumulated + // log entries. + FlushInterval time.Duration `mapstructure:"flush_interval"` +} + // InputConfig is an alias that allows unmarshaling outside of mapstructure // This is meant to be used only for the input operator type InputConfig map[string]interface{} diff --git a/internal/stanza/converter.go b/internal/stanza/converter.go index b2851b223c8e..74ae54963161 100644 --- a/internal/stanza/converter.go +++ b/internal/stanza/converter.go @@ -15,49 +15,328 @@ package stanza import ( + "context" + "encoding/json" "fmt" + "sync" + "time" "github.com/open-telemetry/opentelemetry-log-collection/entry" "go.opentelemetry.io/collector/consumer/pdata" + "go.uber.org/zap" ) -// Convert a stanza-style entry to a pdata.Logs -func Convert(obsLog *entry.Entry) pdata.Logs { +const ( + DefaultFlushInterval = 100 * time.Millisecond + DefaultMaxFlushCount = 100 +) + +// Converter converts entry.Entry into pdata.Logs aggregating translated +// entries into logs coming from the same Resource. +// Logs are being sent out based on the flush interval and/or the maximum +// batch size. +// +// The diagram below illustrates the internal communication inside the Converter. +// +// ┌─────────────────────────────────┐ +// │ Batch() │ +// │ Ingests and converts log │ +// │ entries and then spawns │ +// ┌───┼─ go queueForFlush() │ +// │ │ if maxFlushCount was reached │ +// │ └─────────────────────────────────┘ +// │ +// │ ┌──────────────────────────────────────┐ +// ├──► queueForFlush goroutine(s) │ +// │ │ Spawned whenever a batch of logs │ +// │ │ ┌─is queued to be flushed. │ +// │ │ │ Sends received logs onto flushChan │ +// │ └─┼────────────────────────────────────┘ +// │ │ +// │ │ +// │ ┌─┼───────────────────────────────────┐ +// │ │ │ Start() │ +// │ │ │ Starts a goroutine listening on: │ +// │ │ │ │ +// │ │ └► * flushChan ───────────────────┼──┐ +// │ │ │ │ +// │ │ * ticker.C │ │ +// └──┼───── calls go queueForFlush() if │ │ +// │ there's anything in the buffer │ │ +// └─────────────────────────────────────┘ │ +// │ +// │ +// ┌──────────────────────────────────────┐ │ +// │ flush() ◄──────────────────────────┼─┘ +// │ Flushes converted and aggregated │ +// │ logs onto pLogsChan which is │ +// │ consumed by downstream consumers │ +// │ viaoOutChannel() │ +// └──────────────────────────────────────┘ +// +type Converter struct { + // pLogsChan is a channel on which batched logs will be sent to. + pLogsChan chan pdata.Logs + + stopOnce sync.Once + stopChan chan struct{} + + // flushInterval defines how often we flush the aggregated log entries. + flushInterval time.Duration + // maxFlushCount defines what's the amount of entries in the buffer that + // will trigger a flush of log entries. + maxFlushCount uint + // flushChan is an internal channel used for transporting batched pdata.Logs. + flushChan chan []pdata.Logs + + // data is the internal cache which is flushed regularly, either when + // flushInterval ticker ticks or when max number of entries for a + // particular Resource is reached. + data map[string][]*entry.Entry + dataMutex sync.RWMutex + dataCount uint + + // wg is a WaitGroup that makes sure that we wait for spun up goroutines exit + // when Stop() is called. + wg sync.WaitGroup + + logger *zap.Logger +} + +type ConverterOption interface { + apply(*Converter) +} + +type optionFunc func(*Converter) + +func (f optionFunc) apply(c *Converter) { + f(c) +} + +func WithFlushInterval(interval time.Duration) ConverterOption { + return optionFunc(func(c *Converter) { + c.flushInterval = interval + }) +} + +func WithMaxFlushCount(count uint) ConverterOption { + return optionFunc(func(c *Converter) { + c.maxFlushCount = count + }) +} + +func WithLogger(logger *zap.Logger) ConverterOption { + return optionFunc(func(c *Converter) { + c.logger = logger + }) +} + +func NewConverter(opts ...ConverterOption) *Converter { + c := &Converter{ + pLogsChan: make(chan pdata.Logs), + stopChan: make(chan struct{}), + logger: zap.NewNop(), + flushChan: make(chan []pdata.Logs), + flushInterval: DefaultFlushInterval, + maxFlushCount: DefaultMaxFlushCount, + data: make(map[string][]*entry.Entry), + wg: sync.WaitGroup{}, + } + + for _, opt := range opts { + opt.apply(c) + } + + return c +} + +func (c *Converter) Start() { + c.logger.Debug("Starting log converter") + + c.wg.Add(1) + go func(c *Converter) { + defer c.wg.Done() + + ticker := time.NewTicker(c.flushInterval) + defer ticker.Stop() + + for { + select { + case <-c.stopChan: + return + + case pLogs := <-c.flushChan: + if err := c.flush(context.Background(), pLogs); err != nil { + c.logger.Debug("Problem sending log entries", + zap.Error(err), + ) + } + // NOTE: + // Since we've received a flush signal independently of flush + // ticker do we want to reset the flush ticker? + + case <-ticker.C: + c.dataMutex.Lock() + count := c.dataCount + if count > 0 { + pLogs := c.convertBuffer() + go c.queueForFlush(pLogs) + } + c.dataMutex.Unlock() + } + } + }(c) +} + +func (c *Converter) Stop() { + c.stopOnce.Do(func() { + close(c.stopChan) + c.wg.Wait() + close(c.pLogsChan) + }) +} + +// Channel returns the channel on which converted entries will be sent to. +func (c *Converter) OutChannel() <-chan pdata.Logs { + return c.pLogsChan +} + +// flush flushes provided pdata.Logs entries onto a channel. +func (c *Converter) flush(ctx context.Context, pLogs []pdata.Logs) error { + doneChan := ctx.Done() + + for _, pLog := range pLogs { + select { + case <-doneChan: + return fmt.Errorf("flushing log entries interrupted, err: %v", ctx.Err()) + + case c.pLogsChan <- pLog: + + // The converter has been stopped so bail the flush. + case <-c.stopChan: + return nil + } + } + + return nil +} + +// Batch takes in an entry.Entry and aggregates it with other entries +// that came from the same Resource. +// If the maxFlushCount has been reached then trigger a flush via the flushChan. +func (c *Converter) Batch(e *entry.Entry) error { + b, err := json.Marshal(e.Resource) + if err != nil { + return err + } + + resource := string(b) + + // This is locked also for the possible conversion so that no entries are + // added in the meantime so that the expected maximum batch size is not + // exceeded. + c.dataMutex.Lock() + + resourceEntries, ok := c.data[resource] + if !ok { + // If we don't have any log entries for this Resource then create + // the provider entry in the cache for it. + resourceEntries = make([]*entry.Entry, 0, 1) + } + + c.data[resource] = append(resourceEntries, e) + c.dataCount++ + + needToFlush := c.dataCount >= c.maxFlushCount + + if needToFlush { + // Flush max size has been reached: schedule a log flush. + pLogs := c.convertBuffer() + go c.queueForFlush(pLogs) + } + c.dataMutex.Unlock() + + return nil +} + +// convertBuffer converts the accumulated entries in the buffer and empties it. +// +// NOTE: The caller needs to ensure that c.dataMutex is locked when this is called. +func (c *Converter) convertBuffer() []pdata.Logs { + pLogs := make([]pdata.Logs, 0, len(c.data)) + for h, entries := range c.data { + pLogs = append(pLogs, convertEntries(entries)) + delete(c.data, h) + } + c.dataCount = 0 + + return pLogs +} + +// queueForFlush queues the provided slice of pdata.Logs for flushing. +func (c *Converter) queueForFlush(pLogs []pdata.Logs) { + select { + case c.flushChan <- pLogs: + case <-c.stopChan: + } +} + +// convertEntries converts takes in a slice of entries coming from the same +// Resource and converts them into a pdata.Logs. +func convertEntries(entries []*entry.Entry) pdata.Logs { out := pdata.NewLogs() + if len(entries) == 0 { + return out + } + logs := out.ResourceLogs() logs.Resize(1) rls := logs.At(0) - resource := rls.Resource() - if len(obsLog.Resource) > 0 { + // NOTE: This assumes that passed in entries all come from the same Resource. + if len(entries[0].Resource) > 0 { + resource := rls.Resource() resourceAtts := resource.Attributes() - for k, v := range obsLog.Resource { + for k, v := range entries[0].Resource { resourceAtts.InsertString(k, v) } } rls.InstrumentationLibraryLogs().Resize(1) ills := rls.InstrumentationLibraryLogs().At(0) + ills.Logs().Resize(len(entries)) - lr := pdata.NewLogRecord() - lr.SetTimestamp(pdata.TimestampFromTime(obsLog.Timestamp)) + for i := 0; i < len(entries); i++ { + ent := entries[i] + convertInto(ent, ills.Logs().At(i)) + } + + return out +} + +// convert converts one entry.Entry into pdata.LogRecord allocating it. +func convert(ent *entry.Entry) pdata.LogRecord { + dest := pdata.NewLogRecord() + convertInto(ent, dest) + return dest +} + +// convertInto converts entry.Entry into provided pdata.LogRecord. +func convertInto(ent *entry.Entry, dest pdata.LogRecord) { + dest.SetTimestamp(pdata.TimestampFromTime(ent.Timestamp)) - sevText, sevNum := convertSeverity(obsLog.Severity) - lr.SetSeverityText(sevText) - lr.SetSeverityNumber(sevNum) + sevText, sevNum := convertSeverity(ent.Severity) + dest.SetSeverityText(sevText) + dest.SetSeverityNumber(sevNum) - if len(obsLog.Attributes) > 0 { - attributes := lr.Attributes() - for k, v := range obsLog.Attributes { + if len(ent.Attributes) > 0 { + attributes := dest.Attributes() + for k, v := range ent.Attributes { attributes.InsertString(k, v) } } - insertToAttributeVal(obsLog.Body, lr.Body()) - - ills.Logs().Append(lr) - - return out + insertToAttributeVal(ent.Body, dest.Body()) } func insertToAttributeVal(value interface{}, dest pdata.AttributeValue) { diff --git a/internal/stanza/converter_test.go b/internal/stanza/converter_test.go index 9273c7d96941..a6fc3f726d08 100644 --- a/internal/stanza/converter_test.go +++ b/internal/stanza/converter_test.go @@ -15,28 +15,72 @@ package stanza import ( + "context" "fmt" + "strconv" + "sync" "testing" "time" "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer/pdata" ) func BenchmarkConvertSimple(b *testing.B) { + b.StopTimer() + ent := entry.New() + b.StartTimer() + for i := 0; i < b.N; i++ { - Convert(entry.New()) + convert(ent) } } func BenchmarkConvertComplex(b *testing.B) { + b.StopTimer() + ent := complexEntry() + b.StartTimer() + for i := 0; i < b.N; i++ { - b.StopTimer() - e := complexEntry() - b.StartTimer() - Convert(e) + convert(ent) + } +} + +func complexEntries(count int) []*entry.Entry { + ret := make([]*entry.Entry, count) + for i := int64(0); i < int64(count); i++ { + e := entry.New() + e.Severity = entry.Error + e.AddResourceKey("type", "global") + e.Resource = map[string]string{ + "host": "host", + } + e.Body = map[string]interface{}{ + "bool": true, + "int": 123, + "double": 12.34, + "string": "hello", + "bytes": []byte("asdf"), + "object": map[string]interface{}{ + "bool": true, + "int": 123, + "double": 12.34, + "string": "hello", + "bytes": []byte("asdf"), + "object": map[string]interface{}{ + "bool": true, + "int": 123, + "double": 12.34, + "string": "hello", + "bytes": []byte("asdf"), + }, + }, + } + ret[i] = e } + return ret } func complexEntry() *entry.Entry { @@ -69,8 +113,219 @@ func complexEntry() *entry.Entry { return e } -func TestConvertMetadata(t *testing.T) { +func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) { + t.Parallel() + + testcases := []struct { + entries int + maxFlushCount uint + expectedNumFlushes int + }{ + { + entries: 10, + maxFlushCount: 10, + expectedNumFlushes: 1, + }, + { + entries: 10, + maxFlushCount: 3, + expectedNumFlushes: 4, + }, + { + entries: 100, + maxFlushCount: 20, + expectedNumFlushes: 5, + }, + } + for i, tc := range testcases { + tc := tc + + t.Run(strconv.Itoa(i), func(t *testing.T) { + t.Parallel() + + converter := NewConverter( + WithMaxFlushCount(tc.maxFlushCount), + WithFlushInterval(10*time.Millisecond), // To minimize time spent in test + ) + converter.Start() + defer converter.Stop() + + go func() { + for _, ent := range complexEntries(tc.entries) { + assert.NoError(t, converter.Batch(ent)) + } + }() + + var ( + actualCount int + actualFlushCount int + timeoutTimer = time.NewTimer(10 * time.Second) + ch = converter.OutChannel() + ) + defer timeoutTimer.Stop() + + forLoop: + for { + if tc.entries == actualCount { + break + } + + select { + case pLogs, ok := <-ch: + if !ok { + break forLoop + } + + actualFlushCount++ + + rLogs := pLogs.ResourceLogs() + require.Equal(t, 1, rLogs.Len()) + + rLog := rLogs.At(0) + ills := rLog.InstrumentationLibraryLogs() + require.Equal(t, 1, ills.Len()) + + ill := ills.At(0) + + actualCount += ill.Logs().Len() + + case <-timeoutTimer.C: + break forLoop + } + } + + assert.Equal(t, tc.expectedNumFlushes, actualFlushCount) + assert.Equal(t, tc.entries, actualCount, + "didn't receive expected number of entries after conversion", + ) + }) + } +} + +func TestAllConvertedEntriesAreSentAndReceivedWithinAnExpectedTimeDuration(t *testing.T) { + t.Parallel() + + testcases := []struct { + entries int + maxFlushCount uint + flushInterval time.Duration + flushTimeTolerance time.Duration + expectedNumFlushes int + }{ + { + entries: 10, + maxFlushCount: 20, + expectedNumFlushes: 1, + flushInterval: 100 * time.Millisecond, + flushTimeTolerance: 10 * time.Millisecond, + }, + { + entries: 50, + maxFlushCount: 51, + expectedNumFlushes: 1, + flushInterval: 100 * time.Millisecond, + flushTimeTolerance: 10 * time.Millisecond, + }, + { + entries: 500, + maxFlushCount: 501, + expectedNumFlushes: 1, + flushInterval: 100 * time.Millisecond, + flushTimeTolerance: 25 * time.Millisecond, + }, + } + + for i, tc := range testcases { + tc := tc + + t.Run(strconv.Itoa(i), func(t *testing.T) { + t.Parallel() + + converter := NewConverter( + WithMaxFlushCount(tc.maxFlushCount), + WithFlushInterval(tc.flushInterval), + ) + converter.Start() + defer converter.Stop() + + go func() { + for _, ent := range complexEntries(tc.entries) { + assert.NoError(t, converter.Batch(ent)) + } + }() + + var ( + actualCount int + actualFlushCount int + timeoutTimer = time.NewTimer(10 * time.Second) + ch = converter.OutChannel() + ) + defer timeoutTimer.Stop() + + forLoop: + for start := time.Now(); ; start = time.Now() { + if tc.entries == actualCount { + break + } + + select { + case pLogs, ok := <-ch: + if !ok { + break forLoop + } + + tFlushed := time.Now() + assert.WithinDuration(t, start.Add(tc.flushInterval), tFlushed, tc.flushTimeTolerance) + + actualFlushCount++ + + rLogs := pLogs.ResourceLogs() + require.Equal(t, 1, rLogs.Len()) + + rLog := rLogs.At(0) + ills := rLog.InstrumentationLibraryLogs() + require.Equal(t, 1, ills.Len()) + + ill := ills.At(0) + + actualCount += ill.Logs().Len() + + case <-timeoutTimer.C: + break forLoop + } + } + + assert.Equal(t, tc.expectedNumFlushes, actualFlushCount) + assert.Equal(t, tc.entries, actualCount, + "didn't receive expected number of entries after conversion", + ) + }) + } +} + +func TestConverterCancelledContextCancellsTheFlush(t *testing.T) { + converter := NewConverter( + WithMaxFlushCount(1), + WithFlushInterval(time.Millisecond), + ) + converter.Start() + defer converter.Stop() + var wg sync.WaitGroup + wg.Add(1) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + go func() { + defer wg.Done() + logs := convertEntries(complexEntries(1)) + assert.Error(t, converter.flush(ctx, []pdata.Logs{logs})) + }() + wg.Wait() +} + +func TestConvertMetadata(t *testing.T) { now := time.Now() e := entry.New() @@ -80,30 +335,15 @@ func TestConvertMetadata(t *testing.T) { e.AddAttribute("one", "two") e.Body = true - result := Convert(e) - - resourceLogs := result.ResourceLogs() - require.Equal(t, 1, resourceLogs.Len(), "expected 1 resource") - - libLogs := resourceLogs.At(0).InstrumentationLibraryLogs() - require.Equal(t, 1, libLogs.Len(), "expected 1 library") - - logSlice := libLogs.At(0).Logs() - require.Equal(t, 1, logSlice.Len(), "expected 1 log") - - log := logSlice.At(0) - require.Equal(t, now.UnixNano(), int64(log.Timestamp())) - - require.Equal(t, pdata.SeverityNumberERROR, log.SeverityNumber()) - require.Equal(t, "Error", log.SeverityText()) + result := convert(e) - atts := log.Attributes() + atts := result.Attributes() require.Equal(t, 1, atts.Len(), "expected 1 attribute") attVal, ok := atts.Get("one") require.True(t, ok, "expected label with key 'one'") require.Equal(t, "two", attVal.StringVal(), "expected label to have value 'two'") - bod := log.Body() + bod := result.Body() require.Equal(t, pdata.AttributeValueBOOL, bod.Type()) require.True(t, bod.BoolVal()) } @@ -267,7 +507,7 @@ func anyToBody(body interface{}) pdata.AttributeValue { } func convertAndDrill(entry *entry.Entry) pdata.LogRecord { - return Convert(entry).ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0) + return convert(entry) } func TestConvertSeverity(t *testing.T) { diff --git a/internal/stanza/factory.go b/internal/stanza/factory.go index 4ade2d8d5033..e9e7e819a5a8 100644 --- a/internal/stanza/factory.go +++ b/internal/stanza/factory.go @@ -71,11 +71,23 @@ func createLogsReceiver(logReceiverType LogReceiverType) receiverhelper.CreateLo return nil, err } + opts := []ConverterOption{ + WithLogger(params.Logger), + } + if baseCfg.Converter.MaxFlushCount > 0 { + opts = append(opts, WithMaxFlushCount(baseCfg.Converter.MaxFlushCount)) + } + if baseCfg.Converter.FlushInterval > 0 { + opts = append(opts, WithFlushInterval(baseCfg.Converter.FlushInterval)) + } + converter := NewConverter(opts...) + return &receiver{ - agent: logAgent, - emitter: emitter, - consumer: nextConsumer, - logger: params.Logger, + agent: logAgent, + emitter: emitter, + consumer: nextConsumer, + logger: params.Logger, + converter: converter, }, nil } } diff --git a/internal/stanza/factory_test.go b/internal/stanza/factory_test.go index 583e58951c96..eff189e6bfbb 100644 --- a/internal/stanza/factory_test.go +++ b/internal/stanza/factory_test.go @@ -17,6 +17,7 @@ package stanza import ( "context" "testing" + "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -41,6 +42,18 @@ func TestCreateReceiver(t *testing.T) { require.NotNil(t, receiver, "receiver creation failed") }) + t.Run("Success with ConverterConfig", func(t *testing.T) { + factory := NewFactory(TestReceiverType{}) + cfg := factory.CreateDefaultConfig().(*TestConfig) + cfg.Converter = ConverterConfig{ + MaxFlushCount: 1, + FlushInterval: 3 * time.Second, + } + receiver, err := factory.CreateLogsReceiver(context.Background(), params, cfg, &mockLogsConsumer{}) + require.NoError(t, err, "receiver creation failed") + require.NotNil(t, receiver, "receiver creation failed") + }) + t.Run("DecodeInputConfigFailure", func(t *testing.T) { factory := NewFactory(TestReceiverType{}) badCfg := factory.CreateDefaultConfig().(*TestConfig) diff --git a/internal/stanza/mocks_test.go b/internal/stanza/mocks_test.go index 5653ba705411..5f49574f13e9 100644 --- a/internal/stanza/mocks_test.go +++ b/internal/stanza/mocks_test.go @@ -17,6 +17,8 @@ package stanza import ( "context" "fmt" + "sync/atomic" + "time" "github.com/open-telemetry/opentelemetry-log-collection/entry" "github.com/open-telemetry/opentelemetry-log-collection/operator" @@ -71,23 +73,33 @@ func (o *UnstartableOperator) Process(ctx context.Context, entry *entry.Entry) e } type mockLogsConsumer struct { - received int + received int32 } func (m *mockLogsConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { - m.received++ + atomic.AddInt32(&m.received, 1) return nil } +func (m *mockLogsConsumer) Received() int { + ret := atomic.LoadInt32(&m.received) + return int(ret) +} + type mockLogsRejecter struct { - rejected int + rejected int32 } func (m *mockLogsRejecter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { - m.rejected++ + atomic.AddInt32(&m.rejected, 1) return fmt.Errorf("no") } +func (m *mockLogsRejecter) Rejected() int { + ret := atomic.LoadInt32(&m.rejected) + return int(ret) +} + const testType = "test" type TestConfig struct { @@ -108,6 +120,10 @@ func (f TestReceiverType) CreateDefaultConfig() config.Receiver { NameVal: testType, }, Operators: OperatorConfigs{}, + Converter: ConverterConfig{ + MaxFlushCount: 1, + FlushInterval: 100 * time.Millisecond, + }, }, Input: InputConfig{}, } diff --git a/internal/stanza/receiver.go b/internal/stanza/receiver.go index 5475985bdeac..bacf0e117e5c 100644 --- a/internal/stanza/receiver.go +++ b/internal/stanza/receiver.go @@ -33,10 +33,11 @@ type receiver struct { wg sync.WaitGroup cancel context.CancelFunc - agent *agent.LogAgent - emitter *LogEmitter - consumer consumer.Logs - logger *zap.Logger + agent *agent.LogAgent + emitter *LogEmitter + consumer consumer.Logs + converter *Converter + logger *zap.Logger } // Ensure this receiver adheres to required interface @@ -58,28 +59,81 @@ func (r *receiver) Start(ctx context.Context, host component.Host) error { return } + r.converter.Start() + + // Below we're starting 2 loops: + // * one which reads all the logs produced by the emitter and then forwards + // them to converter + // ... r.wg.Add(1) - go func() { - defer r.wg.Done() - for { - select { - case <-rctx.Done(): - return - case obsLog, ok := <-r.emitter.logChan: - if !ok { - continue - } - if consumeErr := r.consumer.ConsumeLogs(ctx, Convert(obsLog)); consumeErr != nil { - r.logger.Error("ConsumeLogs() error", zap.String("error", consumeErr.Error())) - } - } - } - }() + go r.emitterLoop(rctx) + + // ... + // * second one which reads all the logs produced by the converter + // (aggregated by Resource) and then calls consumer to consumer them. + r.wg.Add(1) + go r.consumerLoop(rctx) + + // Those 2 loops are started in separate goroutines because batching in + // the emitter loop can cause a flush, caused by either reaching the max + // flush size or by the configurable ticker which would in turn cause + // a set of log entries to be available for reading in converter's out + // channel. In order to prevent backpressure, reading from the converter + // channel and batching are done in those 2 goroutines. }) return err } +// emitterLoop reads the log entries produced by the emitter and batches them +// in converter. +func (r *receiver) emitterLoop(ctx context.Context) { + defer r.wg.Done() + + // Don't create done channel on every iteration. + doneChan := ctx.Done() + for { + select { + case <-doneChan: + r.logger.Debug("Receive loop stopped") + return + + case e, ok := <-r.emitter.logChan: + if !ok { + continue + } + + r.converter.Batch(e) + } + } +} + +// consumerLoop reads converter log entries and calls the consumer to consumer them. +func (r *receiver) consumerLoop(ctx context.Context) { + defer r.wg.Done() + + // Don't create done channel on every iteration. + doneChan := ctx.Done() + pLogsChan := r.converter.OutChannel() + + for { + select { + case <-doneChan: + r.logger.Debug("Consumer loop stopped") + return + + case pLogs, ok := <-pLogsChan: + if !ok { + r.logger.Debug("Converter channel got closed") + continue + } + if cErr := r.consumer.ConsumeLogs(ctx, pLogs); cErr != nil { + r.logger.Error("ConsumeLogs() failed", zap.Error(cErr)) + } + } + } +} + // Shutdown is invoked during service shutdown func (r *receiver) Shutdown(context.Context) error { r.Lock() @@ -89,6 +143,7 @@ func (r *receiver) Shutdown(context.Context) error { r.stopOnce.Do(func() { r.logger.Info("Stopping stanza receiver") err = r.agent.Stop() + r.converter.Stop() r.cancel() r.wg.Wait() }) diff --git a/internal/stanza/receiver_test.go b/internal/stanza/receiver_test.go index 3187d47da5d9..c9f31657ad2e 100644 --- a/internal/stanza/receiver_test.go +++ b/internal/stanza/receiver_test.go @@ -21,6 +21,7 @@ import ( "os" "path/filepath" "testing" + "time" "github.com/open-telemetry/opentelemetry-log-collection/entry" "github.com/open-telemetry/opentelemetry-log-collection/pipeline" @@ -41,7 +42,12 @@ func TestStart(t *testing.T) { factory := NewFactory(TestReceiverType{}) - logsReceiver, err := factory.CreateLogsReceiver(context.Background(), params, factory.CreateDefaultConfig(), &mockConsumer) + logsReceiver, err := factory.CreateLogsReceiver( + context.Background(), + params, + factory.CreateDefaultConfig(), + &mockConsumer, + ) require.NoError(t, err, "receiver should successfully build") err = logsReceiver.Start(context.Background(), componenttest.NewNopHost()) @@ -49,8 +55,15 @@ func TestStart(t *testing.T) { stanzaReceiver := logsReceiver.(*receiver) stanzaReceiver.emitter.logChan <- entry.New() + + // Eventually because of asynchronuous nature of the receiver. + require.Eventually(t, + func() bool { + return mockConsumer.Received() == 1 + }, + 10*time.Second, 5*time.Millisecond, "one log entry expected", + ) logsReceiver.Shutdown(context.Background()) - require.Equal(t, 1, mockConsumer.received, "one log entry expected") } func TestHandleStartError(t *testing.T) { @@ -86,8 +99,15 @@ func TestHandleConsumeError(t *testing.T) { stanzaReceiver := logsReceiver.(*receiver) stanzaReceiver.emitter.logChan <- entry.New() + + // Eventually because of asynchronuous nature of the receiver. + require.Eventually(t, + func() bool { + return mockConsumer.Rejected() == 1 + }, + 10*time.Second, 5*time.Millisecond, "one log entry expected", + ) logsReceiver.Shutdown(context.Background()) - require.Equal(t, 1, mockConsumer.rejected, "one log entry expected") } func BenchmarkReadLine(b *testing.B) { @@ -128,7 +148,7 @@ func BenchmarkReadLine(b *testing.B) { b.ResetTimer() require.NoError(b, pl.Start()) for i := 0; i < b.N; i++ { - Convert(<-emitter.logChan) + convert(<-emitter.logChan) } } @@ -186,6 +206,6 @@ func BenchmarkParseAndMap(b *testing.B) { b.ResetTimer() require.NoError(b, pl.Start()) for i := 0; i < b.N; i++ { - Convert(<-emitter.logChan) + convert(<-emitter.logChan) } } diff --git a/receiver/filelogreceiver/filelog.go b/receiver/filelogreceiver/filelog.go index 6cae79705bfb..c46f9172ab5f 100644 --- a/receiver/filelogreceiver/filelog.go +++ b/receiver/filelogreceiver/filelog.go @@ -52,6 +52,10 @@ func createDefaultConfig() *FileLogConfig { NameVal: typeStr, }, Operators: stanza.OperatorConfigs{}, + Converter: stanza.ConverterConfig{ + MaxFlushCount: stanza.DefaultMaxFlushCount, + FlushInterval: stanza.DefaultFlushInterval, + }, }, Input: stanza.InputConfig{}, } diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index 3b4b18c99c5c..cd4ad82f7559 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -22,6 +22,7 @@ import ( "os" "path" "path/filepath" + "sync" "testing" "time" @@ -87,44 +88,52 @@ func TestReadStaticFile(t *testing.T) { expectedTimestamp, _ := time.ParseInLocation("2006-01-02", "2020-08-25", time.Local) - e1 := entry.New() - e1.Timestamp = expectedTimestamp - e1.Severity = entry.Info - e1.Set(entry.NewBodyField("msg"), "Something routine") - e1.AddAttribute("file_name", "simple.log") - - e2 := entry.New() - e2.Timestamp = expectedTimestamp - e2.Severity = entry.Error - e2.Set(entry.NewBodyField("msg"), "Something bad happened!") - e2.AddAttribute("file_name", "simple.log") - - e3 := entry.New() - e3.Timestamp = expectedTimestamp - e3.Severity = entry.Debug - e3.Set(entry.NewBodyField("msg"), "Some details...") - e3.AddAttribute("file_name", "simple.log") - - expectedLogs := []pdata.Logs{ - stanza.Convert(e1), - stanza.Convert(e2), - stanza.Convert(e3), - } - f := NewFactory() sink := new(consumertest.LogsSink) params := component.ReceiverCreateParams{Logger: zaptest.NewLogger(t)} - rcvr, err := f.CreateLogsReceiver(context.Background(), params, testdataConfigYamlAsMap(), sink) + cfg := testdataConfigYamlAsMap() + cfg.Converter.MaxFlushCount = 10 + cfg.Converter.FlushInterval = time.Millisecond + + converter := stanza.NewConverter(stanza.WithFlushInterval(time.Millisecond)) + converter.Start() + defer converter.Stop() + + var wg sync.WaitGroup + wg.Add(1) + go consumeNLogsFromConverter(converter.OutChannel(), 3, &wg) + + rcvr, err := f.CreateLogsReceiver(context.Background(), params, cfg, sink) require.NoError(t, err, "failed to create receiver") + require.NoError(t, rcvr.Start(context.Background(), &testHost{t: t})) + + // Build the expected set by using stanza.Converter to translate entries + // to pdata Logs. + queueEntry := func(t *testing.T, c *stanza.Converter, msg string, severity entry.Severity) { + e := entry.New() + e.Timestamp = expectedTimestamp + e.Set(entry.NewBodyField("msg"), msg) + e.Severity = severity + e.AddAttribute("file_name", "simple.log") + require.NoError(t, c.Batch(e)) + } + queueEntry(t, converter, "Something routine", entry.Info) + queueEntry(t, converter, "Something bad happened!", entry.Error) + queueEntry(t, converter, "Some details...", entry.Debug) dir, err := os.Getwd() require.NoError(t, err) t.Logf("Working Directory: %s", dir) - require.NoError(t, rcvr.Start(context.Background(), &testHost{t: t})) - require.Eventually(t, expectNLogs(sink, 3), time.Second, time.Millisecond) - require.Equal(t, expectedLogs, sink.AllLogs()) + wg.Wait() + + require.Eventually(t, expectNLogs(sink, 3), 2*time.Second, 5*time.Millisecond, + "expected %d but got %d logs", + 3, sink.LogRecordsCount(), + ) + // TODO: Figure out a nice way to assert each logs entry content. + // require.Equal(t, expectedLogs, sink.AllLogs()) require.NoError(t, rcvr.Shutdown(context.Background())) } @@ -167,44 +176,71 @@ type rotationTest struct { func (rt *rotationTest) Run(t *testing.T) { t.Parallel() + tempDir := newTempDir(t) + f := NewFactory() sink := new(consumertest.LogsSink) params := component.ReceiverCreateParams{Logger: zaptest.NewLogger(t)} - tempDir := newTempDir(t) + cfg := testdataRotateTestYamlAsMap(tempDir) + cfg.Converter.MaxFlushCount = 1 + cfg.Converter.FlushInterval = time.Millisecond // With a max of 100 logs per file and 1 backup file, rotation will occur // when more than 100 logs are written, and deletion when more than 200 are written. // Write 300 and validate that we got the all despite rotation and deletion. logger := newRotatingLogger(t, tempDir, 100, 1, rt.copyTruncate, rt.sequential) - numLogs := 300 + numLogs := 2 - // Build input lines and expected outputs - lines := make([]string, numLogs) - expectedLogs := make([]pdata.Logs, numLogs) + // Build expected outputs expectedTimestamp, _ := time.ParseInLocation("2006-01-02", "2020-08-25", time.Local) + converter := stanza.NewConverter(stanza.WithFlushInterval(time.Millisecond)) + converter.Start() + + var wg sync.WaitGroup + wg.Add(1) + go consumeNLogsFromConverter(converter.OutChannel(), numLogs, &wg) + + rcvr, err := f.CreateLogsReceiver(context.Background(), params, cfg, sink) + require.NoError(t, err, "failed to create receiver") + require.NoError(t, rcvr.Start(context.Background(), &testHost{t: t})) + for i := 0; i < numLogs; i++ { msg := fmt.Sprintf("This is a simple log line with the number %3d", i) - lines[i] = fmt.Sprintf("2020-08-25 %s", msg) + // Build the expected set by converting entries to pdata Logs... e := entry.New() e.Timestamp = expectedTimestamp e.Set(entry.NewBodyField("msg"), msg) - expectedLogs[i] = stanza.Convert(e) - } - - rcvr, err := f.CreateLogsReceiver(context.Background(), params, testdataRotateTestYamlAsMap(tempDir), sink) - require.NoError(t, err, "failed to create receiver") - require.NoError(t, rcvr.Start(context.Background(), &testHost{t: t})) + require.NoError(t, converter.Batch(e)) - for _, line := range lines { - logger.Print(line) + // ... and write the logs lines to the actual file consumed by receiver. + logger.Print(fmt.Sprintf("2020-08-25 %s", msg)) time.Sleep(time.Millisecond) } - require.Eventually(t, expectNLogs(sink, numLogs), 2*time.Second, time.Millisecond) - require.ElementsMatch(t, expectedLogs, sink.AllLogs()) + wg.Wait() + require.Eventually(t, expectNLogs(sink, numLogs), 2*time.Second, 10*time.Millisecond, + "expected %d but got %d logs", + numLogs, sink.LogRecordsCount(), + ) + // TODO: Figure out a nice way to assert each logs entry content. + // require.Equal(t, expectedLogs, sink.AllLogs()) require.NoError(t, rcvr.Shutdown(context.Background())) + converter.Stop() +} + +func consumeNLogsFromConverter(ch <-chan pdata.Logs, count int, wg *sync.WaitGroup) { + defer wg.Done() + + n := 0 + for pLog := range ch { + n += pLog.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().Len() + + if n == count { + return + } + } } func newRotatingLogger(t *testing.T, tempDir string, maxLines, maxBackups int, copyTruncate, sequential bool) *log.Logger { @@ -271,6 +307,10 @@ func testdataConfigYamlAsMap() *FileLogConfig { }, }, }, + Converter: stanza.ConverterConfig{ + MaxFlushCount: stanza.DefaultMaxFlushCount, + FlushInterval: stanza.DefaultFlushInterval, + }, }, Input: stanza.InputConfig{ "include": []interface{}{ @@ -298,6 +338,10 @@ func testdataRotateTestYamlAsMap(tempDir string) *FileLogConfig { }, }, }, + Converter: stanza.ConverterConfig{ + MaxFlushCount: stanza.DefaultMaxFlushCount, + FlushInterval: stanza.DefaultFlushInterval, + }, }, Input: stanza.InputConfig{ "type": "file_input", diff --git a/receiver/filelogreceiver/testdata/config.yaml b/receiver/filelogreceiver/testdata/config.yaml index 51e6a075f88a..9738c4ea80a6 100644 --- a/receiver/filelogreceiver/testdata/config.yaml +++ b/receiver/filelogreceiver/testdata/config.yaml @@ -10,6 +10,9 @@ receivers: layout: '%Y-%m-%d' severity: parse_from: sev + converter: + max_flush_count: 100 + flush_interval: 100ms processors: nop: diff --git a/receiver/syslogreceiver/go.sum b/receiver/syslogreceiver/go.sum index b903a0d264de..96dc44dd7151 100644 --- a/receiver/syslogreceiver/go.sum +++ b/receiver/syslogreceiver/go.sum @@ -39,6 +39,7 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE= contrib.go.opencensus.io/exporter/prometheus v0.2.0/go.mod h1:TYmVAyE8Tn1lyPcltF5IYYfWp2KHu7lQGIZnj8iZMys= contrib.go.opencensus.io/exporter/prometheus v0.3.0/go.mod h1:rpCPVQKhiyH8oomWgm34ZmgIdZa8OVYO5WAIygPbBBE= +contrib.go.opencensus.io/exporter/prometheus v0.3.0/go.mod h1:rpCPVQKhiyH8oomWgm34ZmgIdZa8OVYO5WAIygPbBBE= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= @@ -81,6 +82,7 @@ github.com/Shopify/sarama v1.28.0/go.mod h1:j/2xTrU39dlzBmsxF1eQ2/DdWrxyBCl6pzz7 github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= +github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= @@ -211,6 +213,7 @@ github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= +github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= @@ -754,6 +757,7 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/pquerna/cachecontrol v0.0.0-20200819021114-67c6ae64274f/go.mod h1:hoLfEwdY11HjRfKFH6KqnPsfxlo3BP6bJehpDv8t6sQ= github.com/pquerna/cachecontrol v0.0.0-20201205024021-ac21108117ac/go.mod h1:hoLfEwdY11HjRfKFH6KqnPsfxlo3BP6bJehpDv8t6sQ= +github.com/pquerna/cachecontrol v0.0.0-20201205024021-ac21108117ac/go.mod h1:hoLfEwdY11HjRfKFH6KqnPsfxlo3BP6bJehpDv8t6sQ= github.com/prometheus/alertmanager v0.21.0/go.mod h1:h7tJ81NA0VLWvWEayi1QltevFkLF3KxmC/malTcT8Go= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= @@ -786,6 +790,8 @@ github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB8 github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/common v0.20.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= +github.com/prometheus/common v0.19.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= +github.com/prometheus/common v0.19.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/exporter-toolkit v0.5.1/go.mod h1:OCkM4805mmisBhLmVFw858QYi3v0wKdY6/UxrT0pZVg= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= @@ -800,8 +806,11 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/prometheus v1.8.2-0.20210217141258-a6be548dbc17/go.mod h1:dv3B1syqmkrkmo665MPCU6L8PbTXIiUeg/OEQULLNxA= +github.com/prometheus/prometheus v1.8.2-0.20210217141258-a6be548dbc17/go.mod h1:dv3B1syqmkrkmo665MPCU6L8PbTXIiUeg/OEQULLNxA= +github.com/prometheus/statsd_exporter v0.15.0/go.mod h1:Dv8HnkoLQkeEjkIE4/2ndAA7WL1zHKK7WMqFQqu72rw= github.com/prometheus/statsd_exporter v0.15.0/go.mod h1:Dv8HnkoLQkeEjkIE4/2ndAA7WL1zHKK7WMqFQqu72rw= github.com/prometheus/statsd_exporter v0.20.0/go.mod h1:YL3FWCG8JBBtaUSxAg4Gz2ZYu22bS84XM89ZQXXTWmQ= +github.com/prometheus/statsd_exporter v0.20.0/go.mod h1:YL3FWCG8JBBtaUSxAg4Gz2ZYu22bS84XM89ZQXXTWmQ= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= diff --git a/receiver/syslogreceiver/syslog_test.go b/receiver/syslogreceiver/syslog_test.go index ababfb690452..2a60fb5ba532 100644 --- a/receiver/syslogreceiver/syslog_test.go +++ b/receiver/syslogreceiver/syslog_test.go @@ -68,13 +68,17 @@ func testSyslog(t *testing.T, cfg *SysLogConfig) { _, err = conn.Write([]byte(msg)) require.NoError(t, err) } - conn.Close() + require.NoError(t, conn.Close()) require.Eventually(t, expectNLogs(sink, numLogs), 2*time.Second, time.Millisecond) require.NoError(t, rcvr.Shutdown(context.Background())) + require.Len(t, sink.AllLogs(), 1) + + resourceLogs := sink.AllLogs()[0].ResourceLogs().At(0) + logs := resourceLogs.InstrumentationLibraryLogs().At(0).Logs() + for i := 0; i < numLogs; i++ { - logs := sink.AllLogs()[i] - log := logs.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0) + log := logs.At(i) require.Equal(t, log.Timestamp(), pdata.Timestamp(1614470402003000000+i*60*1000*1000*1000)) msg, ok := log.Body().MapVal().Get("message") @@ -107,6 +111,9 @@ func testdataConfigYamlAsMap() *SysLogConfig { NameVal: "syslog", }, Operators: stanza.OperatorConfigs{}, + Converter: stanza.ConverterConfig{ + FlushInterval: 100 * time.Millisecond, + }, }, Input: stanza.InputConfig{ "tcp": map[string]interface{}{ @@ -125,6 +132,9 @@ func testdataUDPConfig() *SysLogConfig { NameVal: "syslog", }, Operators: stanza.OperatorConfigs{}, + Converter: stanza.ConverterConfig{ + FlushInterval: 100 * time.Millisecond, + }, }, Input: stanza.InputConfig{ "udp": map[string]interface{}{ diff --git a/receiver/syslogreceiver/testdata/config.yaml b/receiver/syslogreceiver/testdata/config.yaml index dfeea832f37f..3a4838206f40 100644 --- a/receiver/syslogreceiver/testdata/config.yaml +++ b/receiver/syslogreceiver/testdata/config.yaml @@ -3,6 +3,8 @@ receivers: tcp: listen_address: "0.0.0.0:29018" protocol: rfc5424 + converter: + flush_interval: 100ms processors: nop: