Documentation improvements - Comments in key functions #10029

Merged
merged 15 commits into from
May 2, 2024
1 change: 0 additions & 1 deletion confmap/resolver.go
@@ -151,7 +151,6 @@ func NewResolver(set ResolverSettings) (*Resolver, error) {
}

// Resolve returns the configuration as a Conf, or error otherwise.
//
// Should never be called concurrently with itself, Watch or Shutdown.
func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) {
// First check if already an active watching, close that if any.
36 changes: 36 additions & 0 deletions docs/key-files.md
Member

I would move this to the otelcol folder as its README.md page. We have other examples of internal documentation being written in this way. This doc talks about files in several components, but since the entrypoint is in otelcol, I feel that's the best place to put it.

Contributor Author

What are the other examples of docs like this? I saw confmap had an excellent README - are there others?

I also don't know if I agree with moving it to the otelcol/ directory - I think we should keep this as a separate file that we link to from CONTRIBUTING.md. Some of the open source projects that I've taken inspiration from in this section are Redis and SQLite - both of which are known as well-documented code bases. Those two projects have a similar section in their main README - I'm not sure if we would want to do that, but I do think it should be more prominently located.

I would also love some input as to what other files you think fit into this theme - I'm pretty sure this isn't a great list and I'd love to add a few more sections (or someone could of course contribute those sections).

Member

I guess we should continue discussing over at #10068, but I will reply here to this comment just so that the convo is in one place.


> What are the other examples of docs like this? I saw confmap had an excellent README - are there others?

Some other README examples:

> I also don't know if I agree with moving it to the otelcol/ directory - I think we should keep this as a separate file that we link to from CONTRIBUTING.md. Some of the open source projects that I've taken inspiration from in this section are Redis and SQLite - both of which are known as well-documented code bases. Those two projects have a similar section in their main README - I'm not sure if we would want to do that, but I do think it should be more prominently located.

I am fine with internal-architecture.md, let's go with that! We used to have architecture.md for https://opentelemetry.io/docs/collector/architecture/, this one can be developer-focused and have implementation details.

> I would also love some input as to what other files you think fit into this theme - I'm pretty sure this isn't a great list and I'd love to add a few more sections (or someone could of course contribute those sections).

I think your list is a good start when it comes to the general structure of the Collector.

@@ -0,0 +1,36 @@
### What is where
There are a few resources available to understand how the collector works:
First, there is a [diagram](startup.md) documenting the startup flow of the collector.
Second, the OpenTelemetry Collector's [architecture docs](https://opentelemetry.io/docs/collector/architecture/)
give a useful overview of its design.
Finally, here is a brief list of important files that are worth reading through.
#### [collector.go](../otelcol/collector.go)
This file contains the main Collector struct and its constructor `NewCollector`.

`Collector.Run` starts the collector and sets up its lifecycle management loop.

`setupConfigurationComponents` is the "main" function responsible for startup - it orchestrates the loading of the
configuration, the creation of the graph, and the starting of all the components.

#### [graph.go](../service/internal/graph/graph.go)
This file contains the internal graph representation of the pipelines.

`Build` is the constructor for a Graph object. This function calls out to helpers that transform the pipeline
configuration into a DAG of components. The configuration undergoes additional validation here as well, and is used
to instantiate the components of the pipeline.
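
As a rough illustration of the config-to-DAG step, the sketch below chains the nodes of a single pipeline in the order the `createEdges` comments in this PR describe: receivers, then a capabilities node, then the processors, then a fan-out node. The `pipelineConfig` and `edge` types, the `buildEdges` helper, and the final fan-out-to-exporter edges are assumptions made for the example; the real code stores nodes in a directed-graph library.

```go
package main

import "fmt"

// pipelineConfig is a hypothetical, simplified pipeline definition.
type pipelineConfig struct {
	Receivers  []string
	Processors []string
	Exporters  []string
}

// edge is a directed connection between two named nodes.
type edge struct{ From, To string }

// buildEdges returns the edges for one pipeline:
// receivers -> capabilities -> processors... -> fanout -> exporters.
func buildEdges(id string, cfg pipelineConfig) []edge {
	capabilities := id + "/capabilities"
	fanOut := id + "/fanout"

	var edges []edge
	// Every receiver feeds the capabilities node.
	for _, r := range cfg.Receivers {
		edges = append(edges, edge{r, capabilities})
	}
	// Processors are chained together, starting from the capabilities node.
	from := capabilities
	for _, p := range cfg.Processors {
		edges = append(edges, edge{from, p})
		from = p
	}
	// A fan-out node is always inserted, even for a single exporter.
	edges = append(edges, edge{from, fanOut})
	for _, e := range cfg.Exporters {
		edges = append(edges, edge{fanOut, e})
	}
	return edges
}

func main() {
	cfg := pipelineConfig{
		Receivers:  []string{"otlp"},
		Processors: []string{"batch"},
		Exporters:  []string{"debug"},
	}
	for _, e := range buildEdges("traces", cfg) {
		fmt.Printf("%s -> %s\n", e.From, e.To)
	}
}
```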

`Graph.StartAll` starts every component in the pipelines.

`Graph.ShutdownAll` stops each component in the pipelines.
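
The startup diagram in this PR notes that `Graph.StartAll` calls Start on each component in reverse topological order. A minimal sketch of that ordering follows, using Kahn's algorithm over a plain adjacency map. `topoSort` and `startOrder` are hypothetical helpers written for this example, not functions from the Collector, which relies on a graph library for sorting.

```go
package main

import (
	"fmt"
	"sort"
)

// topoSort returns the nodes of a DAG in topological order using Kahn's
// algorithm. edges maps each node to the nodes it sends data to.
func topoSort(edges map[string][]string) ([]string, error) {
	inDegree := map[string]int{}
	for from, tos := range edges {
		if _, ok := inDegree[from]; !ok {
			inDegree[from] = 0
		}
		for _, to := range tos {
			inDegree[to]++
		}
	}

	// Start with the nodes that have no incoming edges.
	var ready []string
	for n, d := range inDegree {
		if d == 0 {
			ready = append(ready, n)
		}
	}
	sort.Strings(ready) // keep the result deterministic

	var order []string
	for len(ready) > 0 {
		n := ready[0]
		ready = ready[1:]
		order = append(order, n)

		next := append([]string(nil), edges[n]...)
		sort.Strings(next)
		for _, to := range next {
			inDegree[to]--
			if inDegree[to] == 0 {
				ready = append(ready, to)
			}
		}
	}
	if len(order) != len(inDegree) {
		return nil, fmt.Errorf("graph has a cycle")
	}
	return order, nil
}

// startOrder reverses the topological order so that downstream nodes
// come first.
func startOrder(edges map[string][]string) ([]string, error) {
	order, err := topoSort(edges)
	if err != nil {
		return nil, err
	}
	for i, j := 0, len(order)-1; i < j; i, j = i+1, j-1 {
		order[i], order[j] = order[j], order[i]
	}
	return order, nil
}

func main() {
	edges := map[string][]string{
		"receiver":  {"processor"},
		"processor": {"exporter"},
	}
	order, err := startOrder(edges)
	if err != nil {
		panic(err)
	}
	fmt.Println(order) // [exporter processor receiver]
}
```

Starting in this order means a consumer is already running by the time anything upstream of it begins sending data.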

#### [component.go](../component/component.go)
component.go outlines the abstraction of components within the OpenTelemetry Collector. It describes the component
lifecycle and defines the interface that components must fulfil.

#### Factories
Each component type contains a `Factory` interface along with its corresponding `NewFactory` function.
Implementations of new components use this `NewFactory` function to register key functions with
the collector. For example, the collector uses this interface to give receivers a handle to a `nextConsumer`,
which represents where the receiver sends the data it has received.
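
The factory idea can be sketched in a few lines. Everything below is a simplified assumption for illustration: the `Consumer`, `Receiver`, and `Factory` types, the `NewFactory` signature, and the `newEchoFactory` helper are defined locally and do not mirror the Collector's real receiver API. The point is only the shape: a component registers a constructor, and the caller later invokes it with the next consumer in the pipeline.

```go
package main

import "fmt"

// Consumer is a hypothetical stand-in for the next stage of a pipeline.
type Consumer interface {
	Consume(item string) error
}

// Receiver is a hypothetical component that pushes data to a Consumer.
type Receiver interface {
	Receive(item string) error
}

// CreateReceiverFunc builds a receiver that forwards data to next.
type CreateReceiverFunc func(next Consumer) (Receiver, error)

// Factory bundles a component type name with its constructor.
type Factory struct {
	Type   string
	Create CreateReceiverFunc
}

// NewFactory registers create under the given type name.
func NewFactory(typ string, create CreateReceiverFunc) Factory {
	return Factory{Type: typ, Create: create}
}

// sink collects everything it is asked to consume.
type sink struct{ items []string }

func (s *sink) Consume(item string) error {
	s.items = append(s.items, item)
	return nil
}

// echoReceiver forwards each item to its next consumer with a prefix.
type echoReceiver struct{ next Consumer }

func (r echoReceiver) Receive(item string) error {
	return r.next.Consume("echo:" + item)
}

// newEchoFactory shows how a component author would use NewFactory.
func newEchoFactory() Factory {
	return NewFactory("echo", func(next Consumer) (Receiver, error) {
		if next == nil {
			return nil, fmt.Errorf("nil next consumer")
		}
		return echoReceiver{next: next}, nil
	})
}

func main() {
	f := newEchoFactory()
	out := &sink{}
	r, err := f.Create(out) // the caller supplies the next consumer
	if err != nil {
		panic(err)
	}
	if err := r.Receive("span-1"); err != nil {
		panic(err)
	}
	fmt.Println(f.Type, out.items) // echo [echo:span-1]
}
```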

[//]: # (todo rewrite this factories block it is not as clear as id like)
32 changes: 32 additions & 0 deletions docs/startup.md
@@ -0,0 +1,32 @@
todo: build out config parts (getconfmap). document how settings and config get rendered
```mermaid
Member

I am wondering if we should omit the private API bits or make them a bit more ambiguous (just describing what they do instead of giving them explicit names). I think that would be easier to maintain.

Contributor Author

I definitely think we need to still include them in some form. I totally agree with your concerns about maintainability - but these top-level private API bits are very important for how the program actually works. Without the names, the diagram can't serve as a guide to reading the source code at all - I guess it's the difference between documentation designed for a user of the Collector API and documentation for someone who wants to start contributing.

flowchart TD
A("`**command.NewCommand**`") -->|1| B("`**updateSettingsUsingFlags**`")
A --> |2| C("`**NewCollector**
Creates and returns a new instance of Collector`")
A --> |3| D("`**Collector.Run**
Starts the collector and waits for its completion. Also includes the control logic for config reloading and shutdown`")
D --> E("`**setupConfigurationComponents**`")
E --> |1| F("`**getConfMap**`")
E ---> |2| G("`**service.New**
Initializes telemetry and logging, then initializes the pipelines`")
E --> |3| Q("`**Service.Start**
1. Start all extensions.
2. Notify extensions about Collector configuration
3. Start all pipelines.
4. Notify extensions that the pipeline is ready.
`")
Q --> R("`**Graph.StartAll**
calls Start on each component in reverse topological order`")
G --> H("`**initExtensionsAndPipeline**
Creates extensions and then builds the pipeline graph`")
H --> I("`**Graph.Build**
Converts the settings to an internal graph representation`")
I --> J("`**createNodes**
Builds the node objects from pipeline configuration and adds to graph. Also validates connectors`")
I --> K("`**createEdges**
Iterates through the pipelines and creates edges between components`")
I --> L("`**buildComponents**
Topologically sorts the graph and creates each component in reverse order`")
L --> M(Receiver Factory) & N(Processor Factory) & O(Exporter Factory)
```
3 changes: 3 additions & 0 deletions otelcol/collector.go
@@ -187,6 +187,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
}

col.serviceConfig = &cfg.Service

col.service, err = service.New(ctx, service.Settings{
BuildInfo: col.set.BuildInfo,
CollectorConf: conf,
@@ -263,6 +264,8 @@ func (col *Collector) Run(ctx context.Context) error {
signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM)
}

// control loop: selects between channels for various interrupts - when this loop is broken, the collector exits
// if a configuration reload fails, we return without waiting for graceful shutdown
LOOP:
for {
select {
1 change: 1 addition & 0 deletions otelcol/command.go
@@ -41,6 +41,7 @@ func NewCommand(set CollectorSettings) *cobra.Command {
return rootCmd
}

// puts command line flags from flags into the CollectorSettings, to be used during config resolution
func updateSettingsUsingFlags(set *CollectorSettings, flags *flag.FlagSet) error {
if set.ConfigProvider == nil {
resolverSet := &set.ConfigProviderSettings.ResolverSettings
17 changes: 17 additions & 0 deletions service/internal/graph/graph.go
@@ -53,6 +53,8 @@ type Graph struct {
telemetry servicetelemetry.TelemetrySettings
}

// Build converts Settings into a full pipeline Graph
// Build also validates the configuration of the pipelines and does the actual initialization of each Component in the Graph
func Build(ctx context.Context, set Settings) (*Graph, error) {
pipelines := &Graph{
componentGraph: simple.NewDirectedGraph(),
@@ -74,6 +76,7 @@ func Build(ctx context.Context, set Settings) (*Graph, error) {
}

// Creates a node for each instance of a component and adds it to the graph
// Validates that connectors are configured to export and receive correctly
func (g *Graph) createNodes(set Settings) error {
// Build a list of all connectors for easy reference
connectors := make(map[component.ID]struct{})
@@ -82,9 +85,12 @@ func (g *Graph) createNodes(set Settings) error {
connectorsAsExporter := make(map[component.ID][]component.ID)
connectorsAsReceiver := make(map[component.ID][]component.ID)

// build each pipelineNodes struct for each pipeline by parsing the pipelineCfg
// also populates the connectors, connectorsAsExporter and connectorsAsReceiver maps
for pipelineID, pipelineCfg := range set.PipelineConfigs {
pipe := g.pipelines[pipelineID]
for _, recvID := range pipelineCfg.Receivers {
// checks if this receiver is a connector or a regular receiver
if set.ConnectorBuilder.IsConfigured(recvID) {
connectors[recvID] = struct{}{}
connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
@@ -138,6 +144,8 @@ func (g *Graph) createNodes(set Settings) error {

for expType := range expTypes {
for recType := range recTypes {
// checks if the connector supports being a receiver of a certain datatype and an exporter of the other
// datatype - this is powered by the connector's metadata.yaml file
if connectorStability(connFactory, expType, recType) != component.StabilityLevelUndefined {
expTypes[expType] = true
recTypes[recType] = true
@@ -241,19 +249,24 @@ func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component
return connNode
}

// Iterates through the pipelines and creates edges between components
func (g *Graph) createEdges() {
for _, pg := range g.pipelines {
// draw edges from each receiver to the capability node
for _, receiver := range pg.receivers {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, pg.capabilitiesNode))
}

// iterates through processors, chaining them together. starts with the capabilities node
var from, to graph.Node
from = pg.capabilitiesNode
for _, processor := range pg.processors {
to = processor
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))
from = processor
}
// always inserts a fanout node before any exporters. if there is only one
// exporter, the fanout node is still created and acts as a noop
to = pg.fanOutNode
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))

@@ -263,6 +276,9 @@ func (g *Graph) createEdges() {
}
}

// Uses the already built graph g to instantiate the actual components for each component of each pipeline
// Handles calling the factories for each component - and hooking up each component to the next
// Also calculates whether each pipeline mutates data so the receiver can know whether it needs to clone the data
func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
@@ -282,6 +298,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
case *receiverNode:
err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID()))
case *processorNode:
// nextConsumers is guaranteed to be length 1. either it is the next processor or it's the fanout node for the exporters
err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0])
case *exporterNode:
err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ExporterBuilder)
9 changes: 8 additions & 1 deletion service/service.go
@@ -34,7 +34,7 @@ import (
"go.opentelemetry.io/collector/service/telemetry"
)

-// Settings holds configuration for building a new service.
+// Settings holds configuration for building a new Service.
type Settings struct {
// BuildInfo provides collector start information.
BuildInfo component.BuildInfo
@@ -72,6 +72,10 @@ type Service struct {
collectorConf *confmap.Conf
}

// New has a few responsibilities:
// 1. initializes the Service function given a complete set of Settings
// 2. sets up internal facing telemetry
// 3. builds the graph from the settings
func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
disableHighCard := obsreportconfig.DisableHighCardinalityMetricsfeatureGate.IsEnabled()
extendedConfig := obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled()
@@ -92,6 +96,8 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
if err != nil {
return nil, fmt.Errorf("failed to get logger: %w", err)
}

// fetch resource data, like instance id and sdk version, to attach to internal telemetry
res := resource.New(set.BuildInfo, cfg.Telemetry.Resource)
pcommonRes := pdataFromSdk(res)

@@ -247,6 +253,7 @@ func (srv *Service) Shutdown(ctx context.Context) error {
return errs
}

// creates extensions and then builds the pipeline graph
func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings, cfg Config) error {
var err error
extensionsSettings := extensions.Settings{