From 0ba94e81bbf13c3f42a5f513f90ec80699042789 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Sat, 20 Apr 2024 11:04:21 -0700 Subject: [PATCH 01/15] startup diagram init commit --- docs/startup.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 docs/startup.md diff --git a/docs/startup.md b/docs/startup.md new file mode 100644 index 00000000000..71e4760dbca --- /dev/null +++ b/docs/startup.md @@ -0,0 +1,27 @@ +```mermaid +flowchart TD + A("`command.NewCommand`") -->|1| B("`UpdateSettingsUsingFlags`") + A --> |2| C("`NewCollector`") + A --> |3| D("`Collector.Run`") + D --> E("`SetupConfigurationComponents`") + E --> |1| F(getConfMap) + E ---> |2| G("`Service.New + Initializes telemetry and logging, then initializes the pipelines`") + E --> |3| Q("`Service.Start + 1. Start all extensions. + 2. Notify extensions about Collector configuration + 3. Start all pipelines. + 4. Notify extensions that the pipeline is ready. + `") + Q --> R("`Graph.StartAll + calls Start on each component in reverse topological order`") + G --> H("`initExtensionsAndPipeline + Creates extensions and then builds the pipeline graph`") + H --> I("`Graph.Build + Converts the settings to an internal graph representation`") + I --> J(createNodes) + I --> K(createEdges) + I --> L("`buildComponents + topological sort the graph, and create each component in reverse order`") + L --> M(Receiver Factory) & N(Processor Factory) & O(Exporter Factory) +``` \ No newline at end of file From 72229e707f24c7a4754f06dcb3ef3953a178f3cc Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Mon, 22 Apr 2024 12:08:00 -0700 Subject: [PATCH 02/15] augment graph and add some extra comments to stuff --- docs/startup.md | 13 +++++++++---- otelcol/collector.go | 4 +++- service/internal/graph/graph.go | 15 +++++++++++++++ service/service.go | 4 ++++ 4 files changed, 31 insertions(+), 5 deletions(-) diff --git a/docs/startup.md b/docs/startup.md index 71e4760dbca..a16104e2748 100644 --- a/docs/startup.md +++ b/docs/startup.md @@ -1,8 +1,11 @@ +todo: bold the service names. build out config parts (getconfmap). document how settings and config get rendered ```mermaid flowchart TD A("`command.NewCommand`") -->|1| B("`UpdateSettingsUsingFlags`") - A --> |2| C("`NewCollector`") - A --> |3| D("`Collector.Run`") + A --> |2| C("`NewCollector + Creates and returns a new instance of Collector`") + A --> |3| D("`Collector.Run + Starts the collector and waits for its completion. Also includes the control logic for config reloading and shutdown`") D --> E("`SetupConfigurationComponents`") E --> |1| F(getConfMap) E ---> |2| G("`Service.New @@ -19,8 +22,10 @@ flowchart TD Creates extensions and then builds the pipeline graph`") H --> I("`Graph.Build Converts the settings to an internal graph representation`") - I --> J(createNodes) - I --> K(createEdges) + I --> J("`createNodes + Builds the node objects from pipeline configuration and adds to graph. Also validates connectors`") + I --> K("`createEdges + Iterates through the pipelines and creates edges between components`") I --> L("`buildComponents topological sort the graph, and create each component in reverse order`") L --> M(Receiver Factory) & N(Processor Factory) & O(Exporter Factory) diff --git a/otelcol/collector.go b/otelcol/collector.go index f90956f8104..4d4c4e92c08 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -163,7 +163,6 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { col.setCollectorState(StateStarting) var conf *confmap.Conf - if cp, ok := col.configProvider.(ConfmapProvider); ok { var err error conf, err = cp.GetConfmap(ctx) @@ -187,6 +186,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { } col.serviceConfig = &cfg.Service + col.service, err = service.New(ctx, service.Settings{ BuildInfo: col.set.BuildInfo, CollectorConf: conf, @@ -263,6 +263,8 @@ func (col *Collector) Run(ctx context.Context) error { signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM) } + // control loop: selects between channels for various interrupts - when this loop is broken, the collector exits + // if a configuration reload fails, we return without waiting for graceful shutdown LOOP: for { select { diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 04643a37526..4050ec0db5e 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -74,6 +74,7 @@ func Build(ctx context.Context, set Settings) (*Graph, error) { } // Creates a node for each instance of a component and adds it to the graph +// Validates that connectors are configured to export and receive correctly func (g *Graph) createNodes(set Settings) error { // Build a list of all connectors for easy reference connectors := make(map[component.ID]struct{}) @@ -82,9 +83,12 @@ func (g *Graph) createNodes(set Settings) error { connectorsAsExporter := make(map[component.ID][]component.ID) connectorsAsReceiver := make(map[component.ID][]component.ID) + // build each pipelineNodes struct for each pipeline by parsing the pipelineCfg + // also populates the connectors, connectorsAsExporter and connectorsAsReceiver maps for pipelineID, pipelineCfg := range set.PipelineConfigs { pipe := g.pipelines[pipelineID] for _, recvID := range pipelineCfg.Receivers { + // checks if this receiver a connector or a regular receiver if set.ConnectorBuilder.IsConfigured(recvID) { connectors[recvID] = struct{}{} connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID) @@ -138,6 +142,8 @@ func (g *Graph) createNodes(set Settings) error { for expType := range expTypes { for recType := range recTypes { + // checks if the connector supports being a receiver of a certain datatype and an exporter of the other + // datatype - this is powered by the connector's metadata.yaml file if connectorStability(connFactory, expType, recType) != component.StabilityLevelUndefined { expTypes[expType] = true recTypes[recType] = true @@ -241,12 +247,15 @@ func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component return connNode } +// Iterates through the pipelines and creates edges between components func (g *Graph) createEdges() { for _, pg := range g.pipelines { + // draw edges from each receiver to the capability node for _, receiver := range pg.receivers { g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, pg.capabilitiesNode)) } + // iterates through processors, chaining them together. starts with the capabilities node var from, to graph.Node from = pg.capabilitiesNode for _, processor := range pg.processors { @@ -254,6 +263,8 @@ func (g *Graph) createEdges() { g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to)) from = processor } + // always inserts a fanout node before any exporters. if there is only one + // exporter, the fanout node is still created and acts as a noop to = pg.fanOutNode g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to)) @@ -263,6 +274,9 @@ func (g *Graph) createEdges() { } } +// Uses the already built graph g to instantiate the actual components for each component of each pipeline +// Handles calling the factories for each component - and hooking up each component to the next +// Also calculates whether each pipeline mutates data so the receiver can know whether it needs to clone the data func (g *Graph) buildComponents(ctx context.Context, set Settings) error { nodes, err := topo.Sort(g.componentGraph) if err != nil { @@ -282,6 +296,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { case *receiverNode: err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID())) case *processorNode: + // nextConsumers is guaranteed to be length 1. either it is the next processor or it's the fanout node for the exporters err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0]) case *exporterNode: err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ExporterBuilder) diff --git a/service/service.go b/service/service.go index 283e9c94d09..0aa466b83b0 100644 --- a/service/service.go +++ b/service/service.go @@ -72,6 +72,10 @@ type Service struct { collectorConf *confmap.Conf } +// Service.New has a few responsibilities: +// 1. initializes the Service function given a complete set of Settings +// 2. sets up internal facing telemetry +// 3. builds the graph from the settings func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { disableHighCard := obsreportconfig.DisableHighCardinalityMetricsfeatureGate.IsEnabled() extendedConfig := obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() From ef29e285131c66e65ce1532bc4cb457da28fc572 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Tue, 23 Apr 2024 16:21:17 -0700 Subject: [PATCH 03/15] bold --- docs/startup.md | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/docs/startup.md b/docs/startup.md index a16104e2748..3e2c1689d37 100644 --- a/docs/startup.md +++ b/docs/startup.md @@ -1,32 +1,32 @@ -todo: bold the service names. build out config parts (getconfmap). document how settings and config get rendered +todo: build out config parts (getconfmap). document how settings and config get rendered ```mermaid flowchart TD - A("`command.NewCommand`") -->|1| B("`UpdateSettingsUsingFlags`") - A --> |2| C("`NewCollector + A("`**command.NewCommand**`") -->|1| B("`**UpdateSettingsUsingFlags**`") + A --> |2| C("`**NewCollector** Creates and returns a new instance of Collector`") - A --> |3| D("`Collector.Run + A --> |3| D("`**Collector.Run** Starts the collector and waits for its completion. Also includes the control logic for config reloading and shutdown`") - D --> E("`SetupConfigurationComponents`") - E --> |1| F(getConfMap) - E ---> |2| G("`Service.New + D --> E("`**SetupConfigurationComponents**`") + E --> |1| F("`**getConfMap**`") + E ---> |2| G("`**Service.New** Initializes telemetry and logging, then initializes the pipelines`") - E --> |3| Q("`Service.Start + E --> |3| Q("`**Service.Start** 1. Start all extensions. 2. Notify extensions about Collector configuration 3. Start all pipelines. 4. Notify extensions that the pipeline is ready. `") - Q --> R("`Graph.StartAll + Q --> R("`**Graph.StartAll** calls Start on each component in reverse topological order`") - G --> H("`initExtensionsAndPipeline + G --> H("`**initExtensionsAndPipeline** Creates extensions and then builds the pipeline graph`") - H --> I("`Graph.Build + H --> I("`**Graph.Build** Converts the settings to an internal graph representation`") - I --> J("`createNodes + I --> J("`**createNodes** Builds the node objects from pipeline configuration and adds to graph. Also validates connectors`") - I --> K("`createEdges + I --> K("`**createEdges** Iterates through the pipelines and creates edges between components`") - I --> L("`buildComponents + I --> L("`**buildComponents** topological sort the graph, and create each component in reverse order`") L --> M(Receiver Factory) & N(Processor Factory) & O(Exporter Factory) ``` \ No newline at end of file From cb3da5f0a14482977eb8b3d68b46aec6ab1e363e Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Tue, 23 Apr 2024 18:36:12 -0700 Subject: [PATCH 04/15] more comments on functions --- confmap/resolver.go | 1 - otelcol/collector.go | 1 + otelcol/command.go | 1 + service/internal/graph/graph.go | 2 ++ service/service.go | 7 +++++-- 5 files changed, 9 insertions(+), 3 deletions(-) diff --git a/confmap/resolver.go b/confmap/resolver.go index dc1cd7b7700..05f7f964d03 100644 --- a/confmap/resolver.go +++ b/confmap/resolver.go @@ -151,7 +151,6 @@ func NewResolver(set ResolverSettings) (*Resolver, error) { } // Resolve returns the configuration as a Conf, or error otherwise. -// // Should never be called concurrently with itself, Watch or Shutdown. func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) { // First check if already an active watching, close that if any. diff --git a/otelcol/collector.go b/otelcol/collector.go index 4d4c4e92c08..9202a42fe20 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -163,6 +163,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { col.setCollectorState(StateStarting) var conf *confmap.Conf + if cp, ok := col.configProvider.(ConfmapProvider); ok { var err error conf, err = cp.GetConfmap(ctx) diff --git a/otelcol/command.go b/otelcol/command.go index 8e5c6284b3b..f13440cb4b1 100644 --- a/otelcol/command.go +++ b/otelcol/command.go @@ -41,6 +41,7 @@ func NewCommand(set CollectorSettings) *cobra.Command { return rootCmd } +// puts command line flags from flags into the CollectorSettings, to be used during config resolution func updateSettingsUsingFlags(set *CollectorSettings, flags *flag.FlagSet) error { if set.ConfigProvider == nil { resolverSet := &set.ConfigProviderSettings.ResolverSettings diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 4050ec0db5e..8956387940a 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -53,6 +53,8 @@ type Graph struct { telemetry servicetelemetry.TelemetrySettings } +// Build converts Settings into a full pipeline Graph +// Build also validates the configuration of the pipelines and does the actual initialization of each Component in the Graph func Build(ctx context.Context, set Settings) (*Graph, error) { pipelines := &Graph{ componentGraph: simple.NewDirectedGraph(), diff --git a/service/service.go b/service/service.go index 0aa466b83b0..6c836660012 100644 --- a/service/service.go +++ b/service/service.go @@ -34,7 +34,7 @@ import ( "go.opentelemetry.io/collector/service/telemetry" ) -// Settings holds configuration for building a new service. +// Settings holds configuration for building a new Service. type Settings struct { // BuildInfo provides collector start information. BuildInfo component.BuildInfo @@ -72,7 +72,7 @@ type Service struct { collectorConf *confmap.Conf } -// Service.New has a few responsibilities: +// New has a few responsibilities: // 1. initializes the Service function given a complete set of Settings // 2. sets up internal facing telemetry // 3. builds the graph from the settings @@ -96,6 +96,8 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { if err != nil { return nil, fmt.Errorf("failed to get logger: %w", err) } + + // fetch data for internal telemetry like instance id and sdk version to provide for internal telemetry res := resource.New(set.BuildInfo, cfg.Telemetry.Resource) pcommonRes := pdataFromSdk(res) @@ -251,6 +253,7 @@ func (srv *Service) Shutdown(ctx context.Context) error { return errs } +// creates extensions and then builds the pipeline graph func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings, cfg Config) error { var err error extensionsSettings := extensions.Settings{ From 775659047ddfc64738ef8414cf2f378dd5fcb257 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Wed, 24 Apr 2024 00:45:31 -0700 Subject: [PATCH 05/15] contributing.md --- CONTRIBUTING.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 54e77c88664..8c875b7cc95 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -110,6 +110,39 @@ on something that maintainers may decide this repo is not the right place for. Follow the instructions below to create your PR. +### What is where +There are a few resources available to understand how the collector works: +Firstly, there is a [diagram](docs/startup.md) documenting the startup flow of the collector. +Second, the OpenTelemetry Collector's [architecture docs](https://opentelemetry.io/docs/collector/architecture/) are +pretty useful. +Finally, here is a brief list of useful and/or important files that you may find valuable to glance through. +#### [collector.go](otelcol/collector.go) +This file contains the main Collector struct and its constructor `NewCollector`. + +`Collector.Run` starts the collector and sets up its lifecycle management loop. + +`setupConfigurationComponents` is the "main" function responsible for startup - it orchestrates the loading of the configuration, the creation of the graph, and the starting of all the components. + +#### [graph.go](service/internal/graph.go) +This file contains the internal graph representation of the pipelines. + +`Build` is the constructor for a Graph object. The method calls out to helpers that transform the graph from a config +to a DAG of components. The configuration undergoes additional validation here as well, and is used to instantiate +the components of the pipeline. + +`Graph.StartAll` starts every component in the pipelines. + +`Graph.ShutdownAll` stops each component in the pipelines + +#### [component.go](component/component.go) +component.go outlines the abstraction of components within OTEL collector. It provides details on the component lifecycle as well as defining the interface that components must fulfil. + +#### Factories +Each component type contains a Factory interface along with its corresponding NewFactory function. +Implementations of new components use this NewFactory function in their implementation to register key functions with the collector. For example, the collector uses this interface to give receivers a handle to a nextConsumer - representing where the receiver can send data to that it has received. + +[//]: # (todo rewrite this factories block it is not as clear as id like) + ### Fork In the interest of keeping this repository clean and manageable, you should From e0b244312c7bf7c04e4e3befcf235945d95b0d47 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Wed, 24 Apr 2024 16:32:01 -0700 Subject: [PATCH 06/15] move files --- CONTRIBUTING.md | 33 --------------------------------- docs/key-files.md | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 33 deletions(-) create mode 100644 docs/key-files.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8c875b7cc95..54e77c88664 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -110,39 +110,6 @@ on something that maintainers may decide this repo is not the right place for. Follow the instructions below to create your PR. -### What is where -There are a few resources available to understand how the collector works: -Firstly, there is a [diagram](docs/startup.md) documenting the startup flow of the collector. -Second, the OpenTelemetry Collector's [architecture docs](https://opentelemetry.io/docs/collector/architecture/) are -pretty useful. -Finally, here is a brief list of useful and/or important files that you may find valuable to glance through. -#### [collector.go](otelcol/collector.go) -This file contains the main Collector struct and its constructor `NewCollector`. - -`Collector.Run` starts the collector and sets up its lifecycle management loop. - -`setupConfigurationComponents` is the "main" function responsible for startup - it orchestrates the loading of the configuration, the creation of the graph, and the starting of all the components. - -#### [graph.go](service/internal/graph.go) -This file contains the internal graph representation of the pipelines. - -`Build` is the constructor for a Graph object. The method calls out to helpers that transform the graph from a config -to a DAG of components. The configuration undergoes additional validation here as well, and is used to instantiate -the components of the pipeline. - -`Graph.StartAll` starts every component in the pipelines. - -`Graph.ShutdownAll` stops each component in the pipelines - -#### [component.go](component/component.go) -component.go outlines the abstraction of components within OTEL collector. It provides details on the component lifecycle as well as defining the interface that components must fulfil. - -#### Factories -Each component type contains a Factory interface along with its corresponding NewFactory function. -Implementations of new components use this NewFactory function in their implementation to register key functions with the collector. For example, the collector uses this interface to give receivers a handle to a nextConsumer - representing where the receiver can send data to that it has received. - -[//]: # (todo rewrite this factories block it is not as clear as id like) - ### Fork In the interest of keeping this repository clean and manageable, you should diff --git a/docs/key-files.md b/docs/key-files.md new file mode 100644 index 00000000000..f7d529ef7b5 --- /dev/null +++ b/docs/key-files.md @@ -0,0 +1,32 @@ +### What is where +There are a few resources available to understand how the collector works: +Firstly, there is a [diagram](docs/startup.md) documenting the startup flow of the collector. +Second, the OpenTelemetry Collector's [architecture docs](https://opentelemetry.io/docs/collector/architecture/) are +pretty useful. +Finally, here is a brief list of useful and/or important files that you may find valuable to glance through. +#### [collector.go](otelcol/collector.go) +This file contains the main Collector struct and its constructor `NewCollector`. + +`Collector.Run` starts the collector and sets up its lifecycle management loop. + +`setupConfigurationComponents` is the "main" function responsible for startup - it orchestrates the loading of the configuration, the creation of the graph, and the starting of all the components. + +#### [graph.go](service/internal/graph.go) +This file contains the internal graph representation of the pipelines. + +`Build` is the constructor for a Graph object. The method calls out to helpers that transform the graph from a config +to a DAG of components. The configuration undergoes additional validation here as well, and is used to instantiate +the components of the pipeline. + +`Graph.StartAll` starts every component in the pipelines. + +`Graph.ShutdownAll` stops each component in the pipelines + +#### [component.go](component/component.go) +component.go outlines the abstraction of components within OTEL collector. It provides details on the component lifecycle as well as defining the interface that components must fulfil. + +#### Factories +Each component type contains a Factory interface along with its corresponding NewFactory function. +Implementations of new components use this NewFactory function in their implementation to register key functions with the collector. For example, the collector uses this interface to give receivers a handle to a nextConsumer - representing where the receiver can send data to that it has received. + +[//]: # (todo rewrite this factories block it is not as clear as id like) From 9d1deee15a2648fee91d2829d86eb0d3488ddab5 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Wed, 24 Apr 2024 16:35:42 -0700 Subject: [PATCH 07/15] fix links --- docs/key-files.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/key-files.md b/docs/key-files.md index f7d529ef7b5..4b2435e77d1 100644 --- a/docs/key-files.md +++ b/docs/key-files.md @@ -1,17 +1,17 @@ ### What is where There are a few resources available to understand how the collector works: -Firstly, there is a [diagram](docs/startup.md) documenting the startup flow of the collector. +Firstly, there is a [diagram](startup.md) documenting the startup flow of the collector. Second, the OpenTelemetry Collector's [architecture docs](https://opentelemetry.io/docs/collector/architecture/) are pretty useful. Finally, here is a brief list of useful and/or important files that you may find valuable to glance through. -#### [collector.go](otelcol/collector.go) +#### [collector.go](../otelcol/collector.go) This file contains the main Collector struct and its constructor `NewCollector`. `Collector.Run` starts the collector and sets up its lifecycle management loop. `setupConfigurationComponents` is the "main" function responsible for startup - it orchestrates the loading of the configuration, the creation of the graph, and the starting of all the components. -#### [graph.go](service/internal/graph.go) +#### [graph.go](../service/internal/graph/graph.go) This file contains the internal graph representation of the pipelines. `Build` is the constructor for a Graph object. The method calls out to helpers that transform the graph from a config @@ -22,7 +22,7 @@ the components of the pipeline. `Graph.ShutdownAll` stops each component in the pipelines -#### [component.go](component/component.go) +#### [component.go](../component/component.go) component.go outlines the abstraction of components within OTEL collector. It provides details on the component lifecycle as well as defining the interface that components must fulfil. #### Factories From 7dea12c5f09832d4c54fabe3dc8c27846a2764b8 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Wed, 24 Apr 2024 16:38:43 -0700 Subject: [PATCH 08/15] newlines --- docs/key-files.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/docs/key-files.md b/docs/key-files.md index 4b2435e77d1..328525c1440 100644 --- a/docs/key-files.md +++ b/docs/key-files.md @@ -9,7 +9,8 @@ This file contains the main Collector struct and its constructor `NewCollector`. `Collector.Run` starts the collector and sets up its lifecycle management loop. -`setupConfigurationComponents` is the "main" function responsible for startup - it orchestrates the loading of the configuration, the creation of the graph, and the starting of all the components. +`setupConfigurationComponents` is the "main" function responsible for startup - it orchestrates the loading of the +configuration, the creation of the graph, and the starting of all the components. #### [graph.go](../service/internal/graph/graph.go) This file contains the internal graph representation of the pipelines. @@ -23,10 +24,13 @@ the components of the pipeline. `Graph.ShutdownAll` stops each component in the pipelines #### [component.go](../component/component.go) -component.go outlines the abstraction of components within OTEL collector. It provides details on the component lifecycle as well as defining the interface that components must fulfil. +component.go outlines the abstraction of components within OTEL collector. It provides details on the component +lifecycle as well as defining the interface that components must fulfil. #### Factories Each component type contains a Factory interface along with its corresponding NewFactory function. -Implementations of new components use this NewFactory function in their implementation to register key functions with the collector. For example, the collector uses this interface to give receivers a handle to a nextConsumer - representing where the receiver can send data to that it has received. +Implementations of new components use this NewFactory function in their implementation to register key functions with +the collector. For example, the collector uses this interface to give receivers a handle to a nextConsumer - +representing where the receiver can send data to that it has received. [//]: # (todo rewrite this factories block it is not as clear as id like) From 96290b0230a91226c9e01bd1baa06e55b91cfae3 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Thu, 25 Apr 2024 13:45:39 -0700 Subject: [PATCH 09/15] changes to key-files --- docs/key-files.md | 45 ++++++++++++++++++++++++++++++++++++++------- docs/startup.md | 32 -------------------------------- 2 files changed, 38 insertions(+), 39 deletions(-) delete mode 100644 docs/startup.md diff --git a/docs/key-files.md b/docs/key-files.md index 328525c1440..902a25b493a 100644 --- a/docs/key-files.md +++ b/docs/key-files.md @@ -1,9 +1,42 @@ -### What is where +## Collector internal architecture There are a few resources available to understand how the collector works: -Firstly, there is a [diagram](startup.md) documenting the startup flow of the collector. -Second, the OpenTelemetry Collector's [architecture docs](https://opentelemetry.io/docs/collector/architecture/) are -pretty useful. -Finally, here is a brief list of useful and/or important files that you may find valuable to glance through. +### [Startup Diagram](#startup-diagram) +### [Architecture Docs](https://opentelemetry.io/docs/collector/architecture/) +### [Important Files](#important-files) +### Startup Diagram +```mermaid +flowchart TD + A("`**command.NewCommand**`") -->|1| B("`**updateSettingsUsingFlags**`") + A --> |2| C("`**NewCollector** + Creates and returns a new instance of Collector`") + A --> |3| D("`**Collector.Run** + Starts the collector and waits for its completion. Also includes the control logic for config reloading and shutdown`") + D --> E("`**setupConfigurationComponents**`") + E --> |1| F("`**getConfMap**`") + E ---> |2| G("`**Service.New** + Initializes telemetry, then initializes the pipelines`") + E --> |3| Q("`**Service.Start** + 1. Start all extensions. + 2. Notify extensions about Collector configuration + 3. Start all pipelines. + 4. Notify extensions that the pipeline is ready. + `") + Q --> R("`**Graph.StartAll** + Calls Start on each component in reverse topological order`") + G --> H("`**initExtensionsAndPipeline** + Creates extensions and then builds the pipeline graph`") + H --> I("`**Graph.Build** + Converts the settings to an internal graph representation`") + I --> J("`**createNodes** + Builds the node objects from pipeline configuration and adds to graph. Also validates connectors`") + I --> K("`**createEdges** + Iterates through the pipelines and creates edges between components`") + I --> L("`**buildComponents** + Topological sort the graph, and create each component in reverse order`") + L --> M(Receiver Factory) & N(Processor Factory) & O(Exporter Factory) +``` +### Important Files +Here is a brief list of useful and/or important files that you may find valuable to glance through. #### [collector.go](../otelcol/collector.go) This file contains the main Collector struct and its constructor `NewCollector`. @@ -32,5 +65,3 @@ Each component type contains a Factory interface along with its corresponding Ne Implementations of new components use this NewFactory function in their implementation to register key functions with the collector. For example, the collector uses this interface to give receivers a handle to a nextConsumer - representing where the receiver can send data to that it has received. - -[//]: # (todo rewrite this factories block it is not as clear as id like) diff --git a/docs/startup.md b/docs/startup.md deleted file mode 100644 index 3e2c1689d37..00000000000 --- a/docs/startup.md +++ /dev/null @@ -1,32 +0,0 @@ -todo: build out config parts (getconfmap). document how settings and config get rendered -```mermaid -flowchart TD - A("`**command.NewCommand**`") -->|1| B("`**UpdateSettingsUsingFlags**`") - A --> |2| C("`**NewCollector** - Creates and returns a new instance of Collector`") - A --> |3| D("`**Collector.Run** - Starts the collector and waits for its completion. Also includes the control logic for config reloading and shutdown`") - D --> E("`**SetupConfigurationComponents**`") - E --> |1| F("`**getConfMap**`") - E ---> |2| G("`**Service.New** - Initializes telemetry and logging, then initializes the pipelines`") - E --> |3| Q("`**Service.Start** - 1. Start all extensions. - 2. Notify extensions about Collector configuration - 3. Start all pipelines. - 4. Notify extensions that the pipeline is ready. - `") - Q --> R("`**Graph.StartAll** - calls Start on each component in reverse topological order`") - G --> H("`**initExtensionsAndPipeline** - Creates extensions and then builds the pipeline graph`") - H --> I("`**Graph.Build** - Converts the settings to an internal graph representation`") - I --> J("`**createNodes** - Builds the node objects from pipeline configuration and adds to graph. Also validates connectors`") - I --> K("`**createEdges** - Iterates through the pipelines and creates edges between components`") - I --> L("`**buildComponents** - topological sort the graph, and create each component in reverse order`") - L --> M(Receiver Factory) & N(Processor Factory) & O(Exporter Factory) -``` \ No newline at end of file From f0aa46302e4a6c8bd22c0fb22fe737db767fc5d1 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Thu, 25 Apr 2024 13:46:15 -0700 Subject: [PATCH 10/15] rename file --- docs/{key-files.md => internal-architecture.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename docs/{key-files.md => internal-architecture.md} (100%) diff --git a/docs/key-files.md b/docs/internal-architecture.md similarity index 100% rename from docs/key-files.md rename to docs/internal-architecture.md From 393139a4bd0e4ae2d5c05bb499954e83309c8572 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Thu, 25 Apr 2024 13:53:46 -0700 Subject: [PATCH 11/15] fix specific comments --- service/internal/graph/graph.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 8956387940a..243b8b9d651 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -53,7 +53,7 @@ type Graph struct { telemetry servicetelemetry.TelemetrySettings } -// Build converts Settings into a full pipeline Graph +// Build builds a full pipeline graph. // Build also validates the configuration of the pipelines and does the actual initialization of each Component in the Graph func Build(ctx context.Context, set Settings) (*Graph, error) { pipelines := &Graph{ @@ -90,7 +90,7 @@ func (g *Graph) createNodes(set Settings) error { for pipelineID, pipelineCfg := range set.PipelineConfigs { pipe := g.pipelines[pipelineID] for _, recvID := range pipelineCfg.Receivers { - // checks if this receiver a connector or a regular receiver + // checks if this receiver is a connector or a regular receiver if set.ConnectorBuilder.IsConfigured(recvID) { connectors[recvID] = struct{}{} connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID) @@ -144,8 +144,7 @@ func (g *Graph) createNodes(set Settings) error { for expType := range expTypes { for recType := range recTypes { - // checks if the connector supports being a receiver of a certain datatype and an exporter of the other - // datatype - this is powered by the connector's metadata.yaml file + // Typechecks the connector's receiving and exporting datatypes. if connectorStability(connFactory, expType, recType) != component.StabilityLevelUndefined { expTypes[expType] = true recTypes[recType] = true @@ -298,7 +297,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { case *receiverNode: err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID())) case *processorNode: - // nextConsumers is guaranteed to be length 1. either it is the next processor or it's the fanout node for the exporters + // nextConsumers is guaranteed to be length 1. Either it is the next processor or it is the fanout node for the exporters err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0]) case *exporterNode: err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ExporterBuilder) From a815f53a42ec76ea08adbeb617294d32be58765a Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Thu, 25 Apr 2024 14:21:07 -0700 Subject: [PATCH 12/15] capitalizations and periods --- otelcol/collector.go | 4 ++-- otelcol/command.go | 2 +- service/internal/graph/graph.go | 32 ++++++++++++++++---------------- service/service.go | 10 +++++----- 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/otelcol/collector.go b/otelcol/collector.go index 9202a42fe20..43222855464 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -264,8 +264,8 @@ func (col *Collector) Run(ctx context.Context) error { signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM) } - // control loop: selects between channels for various interrupts - when this loop is broken, the collector exits - // if a configuration reload fails, we return without waiting for graceful shutdown + // Control loop: selects between channels for various interrupts - when this loop is broken, the collector exits. + // If a configuration reload fails, we return without waiting for graceful shutdown. LOOP: for { select { diff --git a/otelcol/command.go b/otelcol/command.go index f13440cb4b1..9db850bcc13 100644 --- a/otelcol/command.go +++ b/otelcol/command.go @@ -41,7 +41,7 @@ func NewCommand(set CollectorSettings) *cobra.Command { return rootCmd } -// puts command line flags from flags into the CollectorSettings, to be used during config resolution +// Puts command line flags from flags into the CollectorSettings, to be used during config resolution. func updateSettingsUsingFlags(set *CollectorSettings, flags *flag.FlagSet) error { if set.ConfigProvider == nil { resolverSet := &set.ConfigProviderSettings.ResolverSettings diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 243b8b9d651..a856219a123 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -54,7 +54,7 @@ type Graph struct { } // Build builds a full pipeline graph. -// Build also validates the configuration of the pipelines and does the actual initialization of each Component in the Graph +// Build also validates the configuration of the pipelines and does the actual initialization of each Component in the Graph. func Build(ctx context.Context, set Settings) (*Graph, error) { pipelines := &Graph{ componentGraph: simple.NewDirectedGraph(), @@ -75,18 +75,18 @@ func Build(ctx context.Context, set Settings) (*Graph, error) { return pipelines, pipelines.buildComponents(ctx, set) } -// Creates a node for each instance of a component and adds it to the graph -// Validates that connectors are configured to export and receive correctly +// Creates a node for each instance of a component and adds it to the graph. +// Validates that connectors are configured to export and receive correctly. func (g *Graph) createNodes(set Settings) error { - // Build a list of all connectors for easy reference + // Build a list of all connectors for easy reference. connectors := make(map[component.ID]struct{}) - // Keep track of connectors and where they are used. (map[connectorID][]pipelineID) + // Keep track of connectors and where they are used. (map[connectorID][]pipelineID). connectorsAsExporter := make(map[component.ID][]component.ID) connectorsAsReceiver := make(map[component.ID][]component.ID) - // build each pipelineNodes struct for each pipeline by parsing the pipelineCfg - // also populates the connectors, connectorsAsExporter and connectorsAsReceiver maps + // Build each pipelineNodes struct for each pipeline by parsing the pipelineCfg. + // Also populates the connectors, connectorsAsExporter and connectorsAsReceiver maps. for pipelineID, pipelineCfg := range set.PipelineConfigs { pipe := g.pipelines[pipelineID] for _, recvID := range pipelineCfg.Receivers { @@ -248,15 +248,15 @@ func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component return connNode } -// Iterates through the pipelines and creates edges between components +// Iterates through the pipelines and creates edges between components. func (g *Graph) createEdges() { for _, pg := range g.pipelines { - // draw edges from each receiver to the capability node + // Draw edges from each receiver to the capability node. for _, receiver := range pg.receivers { g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, pg.capabilitiesNode)) } - // iterates through processors, chaining them together. starts with the capabilities node + // Iterates through processors, chaining them together. starts with the capabilities node. var from, to graph.Node from = pg.capabilitiesNode for _, processor := range pg.processors { @@ -264,8 +264,8 @@ func (g *Graph) createEdges() { g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to)) from = processor } - // always inserts a fanout node before any exporters. if there is only one - // exporter, the fanout node is still created and acts as a noop + // Always inserts a fanout node before any exporters. If there is only one + // exporter, the fanout node is still created and acts as a noop. to = pg.fanOutNode g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to)) @@ -275,9 +275,9 @@ func (g *Graph) createEdges() { } } -// Uses the already built graph g to instantiate the actual components for each component of each pipeline -// Handles calling the factories for each component - and hooking up each component to the next -// Also calculates whether each pipeline mutates data so the receiver can know whether it needs to clone the data +// Uses the already built graph g to instantiate the actual components for each component of each pipeline. +// Handles calling the factories for each component - and hooking up each component to the next. +// Also calculates whether each pipeline mutates data so the receiver can know whether it needs to clone the data. func (g *Graph) buildComponents(ctx context.Context, set Settings) error { nodes, err := topo.Sort(g.componentGraph) if err != nil { @@ -297,7 +297,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { case *receiverNode: err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID())) case *processorNode: - // nextConsumers is guaranteed to be length 1. Either it is the next processor or it is the fanout node for the exporters + // nextConsumers is guaranteed to be length 1. Either it is the next processor or it is the fanout node for the exporters. err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0]) case *exporterNode: err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ExporterBuilder) diff --git a/service/service.go b/service/service.go index 6c836660012..28b90b4d11d 100644 --- a/service/service.go +++ b/service/service.go @@ -73,9 +73,9 @@ type Service struct { } // New has a few responsibilities: -// 1. initializes the Service function given a complete set of Settings -// 2. sets up internal facing telemetry -// 3. builds the graph from the settings +// 1. Initializes the Service function given a complete set of Settings. +// 2. Sets up internal facing telemetry. +// 3. Builds the graph from the settings. func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { disableHighCard := obsreportconfig.DisableHighCardinalityMetricsfeatureGate.IsEnabled() extendedConfig := obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() @@ -97,7 +97,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { return nil, fmt.Errorf("failed to get logger: %w", err) } - // fetch data for internal telemetry like instance id and sdk version to provide for internal telemetry + // Fetch data for internal telemetry like instance id and sdk version to provide for internal telemetry. res := resource.New(set.BuildInfo, cfg.Telemetry.Resource) pcommonRes := pdataFromSdk(res) @@ -253,7 +253,7 @@ func (srv *Service) Shutdown(ctx context.Context) error { return errs } -// creates extensions and then builds the pipeline graph +// Creates extensions and then builds the pipeline graph. func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings, cfg Config) error { var err error extensionsSettings := extensions.Settings{ From b2f7060094de961d467df61ee513c532bad5ab74 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Thu, 25 Apr 2024 14:33:48 -0700 Subject: [PATCH 13/15] reduce public facing documentation on impl details --- docs/internal-architecture.md | 4 ++-- otelcol/collector.go | 1 + service/service.go | 5 +---- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/docs/internal-architecture.md b/docs/internal-architecture.md index 902a25b493a..b6018a9bc99 100644 --- a/docs/internal-architecture.md +++ b/docs/internal-architecture.md @@ -10,7 +10,7 @@ flowchart TD A --> |2| C("`**NewCollector** Creates and returns a new instance of Collector`") A --> |3| D("`**Collector.Run** - Starts the collector and waits for its completion. Also includes the control logic for config reloading and shutdown`") + Starts the collector and blocks until it shuts down`") D --> E("`**setupConfigurationComponents**`") E --> |1| F("`**getConfMap**`") E ---> |2| G("`**Service.New** @@ -40,7 +40,7 @@ Here is a brief list of useful and/or important files that you may find valuable #### [collector.go](../otelcol/collector.go) This file contains the main Collector struct and its constructor `NewCollector`. -`Collector.Run` starts the collector and sets up its lifecycle management loop. +`Collector.Run` starts the collector blocks until it shuts down. `setupConfigurationComponents` is the "main" function responsible for startup - it orchestrates the loading of the configuration, the creation of the graph, and the starting of all the components. diff --git a/otelcol/collector.go b/otelcol/collector.go index 43222855464..1d907124457 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -249,6 +249,7 @@ func (col *Collector) DryRun(ctx context.Context) error { // Run starts the collector according to the given configuration, and waits for it to complete. // Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down. +// Sets up the control logic for config reloading and shutdown. func (col *Collector) Run(ctx context.Context) error { if err := col.setupConfigurationComponents(ctx); err != nil { col.setCollectorState(StateClosed) diff --git a/service/service.go b/service/service.go index 28b90b4d11d..902454c78f1 100644 --- a/service/service.go +++ b/service/service.go @@ -72,10 +72,7 @@ type Service struct { collectorConf *confmap.Conf } -// New has a few responsibilities: -// 1. Initializes the Service function given a complete set of Settings. -// 2. Sets up internal facing telemetry. -// 3. Builds the graph from the settings. +// New creates a new Service, its telemetry, and Components. func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { disableHighCard := obsreportconfig.DisableHighCardinalityMetricsfeatureGate.IsEnabled() extendedConfig := obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() From 31f2bdf79cedb20800b2c9aa601c4ef583e612e8 Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Wed, 1 May 2024 17:54:27 -0400 Subject: [PATCH 14/15] remove internal architecture doc --- docs/internal-architecture.md | 67 ----------------------------------- 1 file changed, 67 deletions(-) delete mode 100644 docs/internal-architecture.md diff --git a/docs/internal-architecture.md b/docs/internal-architecture.md deleted file mode 100644 index b6018a9bc99..00000000000 --- a/docs/internal-architecture.md +++ /dev/null @@ -1,67 +0,0 @@ -## Collector internal architecture -There are a few resources available to understand how the collector works: -### [Startup Diagram](#startup-diagram) -### [Architecture Docs](https://opentelemetry.io/docs/collector/architecture/) -### [Important Files](#important-files) -### Startup Diagram -```mermaid -flowchart TD - A("`**command.NewCommand**`") -->|1| B("`**updateSettingsUsingFlags**`") - A --> |2| C("`**NewCollector** - Creates and returns a new instance of Collector`") - A --> |3| D("`**Collector.Run** - Starts the collector and blocks until it shuts down`") - D --> E("`**setupConfigurationComponents**`") - E --> |1| F("`**getConfMap**`") - E ---> |2| G("`**Service.New** - Initializes telemetry, then initializes the pipelines`") - E --> |3| Q("`**Service.Start** - 1. Start all extensions. - 2. Notify extensions about Collector configuration - 3. Start all pipelines. - 4. Notify extensions that the pipeline is ready. - `") - Q --> R("`**Graph.StartAll** - Calls Start on each component in reverse topological order`") - G --> H("`**initExtensionsAndPipeline** - Creates extensions and then builds the pipeline graph`") - H --> I("`**Graph.Build** - Converts the settings to an internal graph representation`") - I --> J("`**createNodes** - Builds the node objects from pipeline configuration and adds to graph. Also validates connectors`") - I --> K("`**createEdges** - Iterates through the pipelines and creates edges between components`") - I --> L("`**buildComponents** - Topological sort the graph, and create each component in reverse order`") - L --> M(Receiver Factory) & N(Processor Factory) & O(Exporter Factory) -``` -### Important Files -Here is a brief list of useful and/or important files that you may find valuable to glance through. -#### [collector.go](../otelcol/collector.go) -This file contains the main Collector struct and its constructor `NewCollector`. - -`Collector.Run` starts the collector blocks until it shuts down. - -`setupConfigurationComponents` is the "main" function responsible for startup - it orchestrates the loading of the -configuration, the creation of the graph, and the starting of all the components. - -#### [graph.go](../service/internal/graph/graph.go) -This file contains the internal graph representation of the pipelines. - -`Build` is the constructor for a Graph object. The method calls out to helpers that transform the graph from a config -to a DAG of components. The configuration undergoes additional validation here as well, and is used to instantiate -the components of the pipeline. - -`Graph.StartAll` starts every component in the pipelines. - -`Graph.ShutdownAll` stops each component in the pipelines - -#### [component.go](../component/component.go) -component.go outlines the abstraction of components within OTEL collector. It provides details on the component -lifecycle as well as defining the interface that components must fulfil. - -#### Factories -Each component type contains a Factory interface along with its corresponding NewFactory function. -Implementations of new components use this NewFactory function in their implementation to register key functions with -the collector. For example, the collector uses this interface to give receivers a handle to a nextConsumer - -representing where the receiver can send data to that it has received. From 996ef1bbc2bc69ec761de8a684f44f3adf0b885f Mon Sep 17 00:00:00 2001 From: Ankit Patel Date: Wed, 1 May 2024 17:57:40 -0400 Subject: [PATCH 15/15] tweaks to comments --- confmap/confmap.go | 23 ++++++++++++++--------- service/internal/graph/graph.go | 2 +- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/confmap/confmap.go b/confmap/confmap.go index f3e0471e1d6..655ccc07315 100644 --- a/confmap/confmap.go +++ b/confmap/confmap.go @@ -245,30 +245,35 @@ func expandNilStructPointersHookFunc() mapstructure.DecodeHookFuncValue { // This is needed in combination with ComponentID, which may produce equal IDs for different strings, // and an error needs to be returned in that case, otherwise the last equivalent ID overwrites the previous one. func mapKeyStringToMapKeyTextUnmarshalerHookFunc() mapstructure.DecodeHookFuncType { - return func(f reflect.Type, t reflect.Type, data any) (any, error) { - if f.Kind() != reflect.Map || f.Key().Kind() != reflect.String { + return func(from reflect.Type, to reflect.Type, data any) (any, error) { + if from.Kind() != reflect.Map || from.Key().Kind() != reflect.String { return data, nil } - if t.Kind() != reflect.Map { + if to.Kind() != reflect.Map { return data, nil } - if _, ok := reflect.New(t.Key()).Interface().(encoding.TextUnmarshaler); !ok { + // Checks that the key type of to implements the TextUnmarshaler interface. + if _, ok := reflect.New(to.Key()).Interface().(encoding.TextUnmarshaler); !ok { return data, nil } - m := reflect.MakeMap(reflect.MapOf(t.Key(), reflect.TypeOf(true))) + // Create a map with key value of to's key to bool. + fieldNameSet := reflect.MakeMap(reflect.MapOf(to.Key(), reflect.TypeOf(true))) for k := range data.(map[string]any) { - tKey := reflect.New(t.Key()) + // Create a new value of the to's key type. + tKey := reflect.New(to.Key()) + + // Use tKey to unmarshal the key of the map. if err := tKey.Interface().(encoding.TextUnmarshaler).UnmarshalText([]byte(k)); err != nil { return nil, err } - - if m.MapIndex(reflect.Indirect(tKey)).IsValid() { + // Checks if the key has already been decoded in a previous iteration. + if fieldNameSet.MapIndex(reflect.Indirect(tKey)).IsValid() { return nil, fmt.Errorf("duplicate name %q after unmarshaling %v", k, tKey) } - m.SetMapIndex(reflect.Indirect(tKey), reflect.ValueOf(true)) + fieldNameSet.SetMapIndex(reflect.Indirect(tKey), reflect.ValueOf(true)) } return data, nil } diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index a856219a123..525c269a6de 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -90,7 +90,7 @@ func (g *Graph) createNodes(set Settings) error { for pipelineID, pipelineCfg := range set.PipelineConfigs { pipe := g.pipelines[pipelineID] for _, recvID := range pipelineCfg.Receivers { - // checks if this receiver is a connector or a regular receiver + // Checks if this receiver is a connector or a regular receiver. if set.ConnectorBuilder.IsConfigured(recvID) { connectors[recvID] = struct{}{} connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)