Skip to content

Commit

Permalink
[chore] Add test to validate expected component instances (#12071)
Browse files Browse the repository at this point in the history
Subset of #12057

This PR adds a test to validate the expected number of instances of each
component. This framework becomes more useful once singleton components
are explicitly supported.
  • Loading branch information
djaglowski authored Jan 20, 2025
1 parent f07ebc3 commit 81f1fad
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 5 deletions.
4 changes: 2 additions & 2 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
MutatesData: g.pipelines[n.pipelineID].fanOutNode.getConsumer().Capabilities().MutatesData,
}
for _, proc := range g.pipelines[n.pipelineID].processors {
capability.MutatesData = capability.MutatesData || proc.getConsumer().Capabilities().MutatesData
capability.MutatesData = capability.MutatesData || proc.(*processorNode).getConsumer().Capabilities().MutatesData
}
next := g.nextConsumers(n.ID())[0]
switch n.pipelineID.Signal() {
Expand Down Expand Up @@ -379,7 +379,7 @@ type pipelineNodes struct {
*capabilitiesNode

// The order of processors is very important. Therefore use a slice for processors.
processors []*processorNode
processors []graph.Node

// Emits to exporters.
*fanOutNode
Expand Down
168 changes: 166 additions & 2 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
}

for _, n := range pipeline.processors {
require.True(t, n.Component.(*testcomponents.ExampleProcessor).Started())
require.True(t, n.(*processorNode).Component.(*testcomponents.ExampleProcessor).Started())
}

for _, n := range pipeline.receivers {
Expand Down Expand Up @@ -929,7 +929,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
}

for _, n := range pipeline.processors {
require.True(t, n.Component.(*testcomponents.ExampleProcessor).Stopped())
require.True(t, n.(*processorNode).Component.(*testcomponents.ExampleProcessor).Stopped())
}

for _, n := range pipeline.exporters {
Expand Down Expand Up @@ -1010,6 +1010,170 @@ func TestConnectorPipelinesGraph(t *testing.T) {
}
}

func TestInstances(t *testing.T) {
tests := []struct {
name string
pipelineConfigs pipelines.Config
expectInstances map[component.ID]int
}{
{
name: "one_pipeline_each_signal",
pipelineConfigs: pipelines.Config{
pipeline.NewID(pipeline.SignalTraces): {
Receivers: []component.ID{component.MustNewID("examplereceiver")},
Processors: []component.ID{component.MustNewID("exampleprocessor")},
Exporters: []component.ID{component.MustNewID("exampleexporter")},
},
pipeline.NewID(pipeline.SignalMetrics): {
Receivers: []component.ID{component.MustNewID("examplereceiver")},
Processors: []component.ID{component.MustNewID("exampleprocessor")},
Exporters: []component.ID{component.MustNewID("exampleexporter")},
},
pipeline.NewID(pipeline.SignalLogs): {
Receivers: []component.ID{component.MustNewID("examplereceiver")},
Processors: []component.ID{component.MustNewID("exampleprocessor")},
Exporters: []component.ID{component.MustNewID("exampleexporter")},
},
pipeline.NewID(xpipeline.SignalProfiles): {
Receivers: []component.ID{component.MustNewID("examplereceiver")},
Processors: []component.ID{component.MustNewID("exampleprocessor")},
Exporters: []component.ID{component.MustNewID("exampleexporter")},
},
},
expectInstances: map[component.ID]int{
component.MustNewID("examplereceiver"): 4, // one per signal
component.MustNewID("exampleprocessor"): 4, // one per pipeline
component.MustNewID("exampleexporter"): 4, // one per signal
},
},
{
name: "shared_by_signals",
pipelineConfigs: pipelines.Config{
pipeline.NewIDWithName(pipeline.SignalTraces, "1"): {
Receivers: []component.ID{component.MustNewID("examplereceiver")},
Processors: []component.ID{component.MustNewID("exampleprocessor")},
Exporters: []component.ID{component.MustNewID("exampleexporter")},
},
pipeline.NewIDWithName(pipeline.SignalTraces, "2"): {
Receivers: []component.ID{component.MustNewID("examplereceiver")},
Processors: []component.ID{component.MustNewID("exampleprocessor")},
Exporters: []component.ID{component.MustNewID("exampleexporter")},
},
pipeline.NewIDWithName(pipeline.SignalMetrics, "1"): {
Receivers: []component.ID{component.MustNewID("examplereceiver")},
Processors: []component.ID{component.MustNewID("exampleprocessor")},
Exporters: []component.ID{component.MustNewID("exampleexporter")},
},
pipeline.NewIDWithName(pipeline.SignalMetrics, "2"): {
Receivers: []component.ID{component.MustNewID("examplereceiver")},
Processors: []component.ID{component.MustNewID("exampleprocessor")},
Exporters: []component.ID{component.MustNewID("exampleexporter")},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "1"): {
Receivers: []component.ID{component.MustNewID("examplereceiver")},
Processors: []component.ID{component.MustNewID("exampleprocessor")},
Exporters: []component.ID{component.MustNewID("exampleexporter")},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "2"): {
Receivers: []component.ID{component.MustNewID("examplereceiver")},
Processors: []component.ID{component.MustNewID("exampleprocessor")},
Exporters: []component.ID{component.MustNewID("exampleexporter")},
},
pipeline.NewIDWithName(xpipeline.SignalProfiles, "1"): {
Receivers: []component.ID{component.MustNewID("examplereceiver")},
Processors: []component.ID{component.MustNewID("exampleprocessor")},
Exporters: []component.ID{component.MustNewID("exampleexporter")},
},
pipeline.NewIDWithName(xpipeline.SignalProfiles, "2"): {
Receivers: []component.ID{component.MustNewID("examplereceiver")},
Processors: []component.ID{component.MustNewID("exampleprocessor")},
Exporters: []component.ID{component.MustNewID("exampleexporter")},
},
},
expectInstances: map[component.ID]int{
component.MustNewID("examplereceiver"): 4, // one per signal
component.MustNewID("exampleprocessor"): 8, // one per pipeline
component.MustNewID("exampleexporter"): 4, // one per signal
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
set := Settings{
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
ReceiverBuilder: builders.NewReceiver(
map[component.ID]component.Config{
component.MustNewID("examplereceiver"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(),
},
map[component.Type]receiver.Factory{
testcomponents.ExampleReceiverFactory.Type(): testcomponents.ExampleReceiverFactory,
},
),
ProcessorBuilder: builders.NewProcessor(
map[component.ID]component.Config{
component.MustNewID("exampleprocessor"): testcomponents.ExampleProcessorFactory.CreateDefaultConfig(),
},
map[component.Type]processor.Factory{
testcomponents.ExampleProcessorFactory.Type(): testcomponents.ExampleProcessorFactory,
},
),
ExporterBuilder: builders.NewExporter(
map[component.ID]component.Config{
component.MustNewID("exampleexporter"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(),
},
map[component.Type]exporter.Factory{
testcomponents.ExampleExporterFactory.Type(): testcomponents.ExampleExporterFactory,
},
),
ConnectorBuilder: builders.NewConnector(map[component.ID]component.Config{}, map[component.Type]connector.Factory{}),
PipelineConfigs: tt.pipelineConfigs,
}

pg, err := Build(context.Background(), set)
require.NoError(t, err)

require.Equal(t, len(set.PipelineConfigs), len(pg.pipelines))

// For each component id, build a map of the instances of that component.
// Use graph.Node.ID() as the key to determine uniqueness of instances.
componentInstances := map[component.ID]map[int64]struct{}{}
for _, pipeline := range pg.pipelines {
for _, n := range pipeline.receivers {
r := n.(*receiverNode)
if _, ok := componentInstances[r.componentID]; !ok {
componentInstances[r.componentID] = map[int64]struct{}{}
}
componentInstances[r.componentID][n.ID()] = struct{}{}
}
for _, n := range pipeline.processors {
p := n.(*processorNode)
if _, ok := componentInstances[p.componentID]; !ok {
componentInstances[p.componentID] = map[int64]struct{}{}
}
componentInstances[p.componentID][n.ID()] = struct{}{}
}
for _, n := range pipeline.exporters {
e := n.(*exporterNode)
if _, ok := componentInstances[e.componentID]; !ok {
componentInstances[e.componentID] = map[int64]struct{}{}
}
componentInstances[e.componentID][n.ID()] = struct{}{}
}
}

var totalExpected int
for id, instances := range componentInstances {
totalExpected += tt.expectInstances[id]
require.Len(t, instances, tt.expectInstances[id], id.String())
}
totalExpected += len(tt.pipelineConfigs) * 2 // one fanout & one capabilities node per pipeline
require.Equal(t, totalExpected, pg.componentGraph.Nodes().Len())
})
}
}

func TestConnectorRouter(t *testing.T) {
rcvrID := component.MustNewID("examplereceiver")
routeTracesID := component.MustNewIDWithName("examplerouter", "traces")
Expand Down
2 changes: 1 addition & 1 deletion service/internal/graph/zpages.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (g *Graph) HandleZPages(w http.ResponseWriter, r *http.Request) {
}
procIDs := make([]string, 0, len(p.processors))
for _, c := range p.processors {
procIDs = append(procIDs, c.componentID.String())
procIDs = append(procIDs, c.(*processorNode).componentID.String())
}
exprIDs := make([]string, 0, len(p.exporters))
for _, c := range p.exporters {
Expand Down

0 comments on commit 81f1fad

Please sign in to comment.