From 5fd5bc5ba9e4d67b10b0f2edbf88094024ad176e Mon Sep 17 00:00:00 2001 From: Sanan Guliyev Date: Fri, 13 Sep 2024 07:59:41 +0200 Subject: [PATCH] Enable event flushing during consumption --- internal/impl/pure/processor_workflow_test.go | 2 +- public/service/stream_builder_test.go | 4 ++-- public/service/tracing.go | 12 ++++++------ public/service/tracing_test.go | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/internal/impl/pure/processor_workflow_test.go b/internal/impl/pure/processor_workflow_test.go index 76c1df7fb..162058b4d 100644 --- a/internal/impl/pure/processor_workflow_test.go +++ b/internal/impl/pure/processor_workflow_test.go @@ -1048,5 +1048,5 @@ workflow: {Type: "CONSUME", Content: "hello world", Meta: map[string]interface{}{}}, {Type: "PRODUCE", Content: "{\"id\":\"HELLO WORLD\"}", Meta: map[string]interface{}{}}, }, - }, tracer.ProcessorEvents()) + }, tracer.ProcessorEvents(false)) } diff --git a/public/service/stream_builder_test.go b/public/service/stream_builder_test.go index f14c8c6f2..ef64e6416 100644 --- a/public/service/stream_builder_test.go +++ b/public/service/stream_builder_test.go @@ -1259,7 +1259,7 @@ output: require.NoError(t, strm.Run(tCtx)) eventKeys := map[string]map[string]struct{}{} - for k, v := range tracSum.InputEvents() { + for k, v := range tracSum.InputEvents(false) { eMap := map[string]struct{}{} for _, e := range v { eMap[e.Content] = struct{}{} @@ -1283,7 +1283,7 @@ output: }, eventKeys) eventKeys = map[string]map[string]struct{}{} - for k, v := range tracSum.OutputEvents() { + for k, v := range tracSum.OutputEvents(false) { eMap := map[string]struct{}{} for _, e := range v { eMap[e.Content] = struct{}{} diff --git a/public/service/tracing.go b/public/service/tracing.go index dbff09624..62a1ee3e5 100644 --- a/public/service/tracing.go +++ b/public/service/tracing.go @@ -80,9 +80,9 @@ func (s *TracingSummary) TotalOutput() uint64 { // execution of a stream pipeline. // // Experimental: This method may change outside of major version releases. -func (s *TracingSummary) InputEvents() map[string][]TracingEvent { +func (s *TracingSummary) InputEvents(flush bool) map[string][]TracingEvent { m := map[string][]TracingEvent{} - for k, v := range s.summary.InputEvents(false) { + for k, v := range s.summary.InputEvents(flush) { events := make([]TracingEvent, len(v)) for i, e := range v { events[i] = TracingEvent{ @@ -100,9 +100,9 @@ func (s *TracingSummary) InputEvents() map[string][]TracingEvent { // execution of a stream pipeline. // // Experimental: This method may change outside of major version releases. -func (s *TracingSummary) ProcessorEvents() map[string][]TracingEvent { +func (s *TracingSummary) ProcessorEvents(flush bool) map[string][]TracingEvent { m := map[string][]TracingEvent{} - for k, v := range s.summary.ProcessorEvents(false) { + for k, v := range s.summary.ProcessorEvents(flush) { events := make([]TracingEvent, len(v)) for i, e := range v { events[i] = TracingEvent{ @@ -120,9 +120,9 @@ func (s *TracingSummary) ProcessorEvents() map[string][]TracingEvent { // execution of a stream pipeline. // // Experimental: This method may change outside of major version releases. -func (s *TracingSummary) OutputEvents() map[string][]TracingEvent { +func (s *TracingSummary) OutputEvents(flush bool) map[string][]TracingEvent { m := map[string][]TracingEvent{} - for k, v := range s.summary.OutputEvents(false) { + for k, v := range s.summary.OutputEvents(flush) { events := make([]TracingEvent, len(v)) for i, e := range v { events[i] = TracingEvent{ diff --git a/public/service/tracing_test.go b/public/service/tracing_test.go index 652725d67..8abcca88d 100644 --- a/public/service/tracing_test.go +++ b/public/service/tracing_test.go @@ -112,7 +112,7 @@ logger: {Type: service.TracingEventProduce, Content: `{"id":4}`, Meta: tMap{}}, {Type: service.TracingEventProduce, Content: `{"id":5}`, Meta: tMap{}}, }, - }, trace.InputEvents()) + }, trace.InputEvents(false)) assert.Equal(t, map[string][]service.TracingEvent{ "root.pipeline.processors.0": { @@ -129,7 +129,7 @@ logger: {Type: service.TracingEventConsume, Content: `{"id":5}`, Meta: tMap{}}, {Type: service.TracingEventProduce, Content: `{"count":5}`, Meta: tMap{"foo": int64(5)}}, }, - }, trace.ProcessorEvents()) + }, trace.ProcessorEvents(false)) assert.Equal(t, map[string][]service.TracingEvent{ "root.output": { @@ -139,7 +139,7 @@ logger: {Type: service.TracingEventConsume, Content: `{"id":4}`, Meta: tMap{}}, {Type: service.TracingEventConsume, Content: `{"count":5}`, Meta: tMap{"foo": int64(5)}}, }, - }, trace.OutputEvents()) + }, trace.OutputEvents(false)) } func BenchmarkStreamTracing(b *testing.B) {