Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metric Push Controller and basic config #599

Merged
merged 25 commits into from
Apr 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 29 additions & 31 deletions samples/Exporters/Console/TestPrometheus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Exporter.Prometheus;
using OpenTelemetry.Metrics;
using OpenTelemetry.Metrics.Configuration;
using OpenTelemetry.Metrics.Export;
using OpenTelemetry.Trace;
Expand All @@ -30,49 +29,48 @@ internal class TestPrometheus
{
internal static object Run()
{
// Create and Setup Prometheus Exporter
var promOptions = new PrometheusExporterOptions() { Url = "http://localhost:9184/metrics/" };
var promExporter = new PrometheusExporter(promOptions);
var simpleProcessor = new UngroupedBatcher(promExporter, TimeSpan.FromSeconds(5));
var meter = MeterFactory.Create(simpleProcessor).GetMeter("library1");
var testCounter = meter.CreateInt64Counter("testCounter");
var metricsHttpServer = new PrometheusExporterMetricsHttpServer(promExporter);
metricsHttpServer.Start();

// Creater Processor (called Batcher in Metric spec, this is still not decided)
var processor = new UngroupedBatcher();

// MeterFactory is from where one can obtain Meters.
// All meters from this factory will be configured with the common processor.
var meterFactory = MeterFactory.Create(mb =>
{
mb.SetMetricProcessor(processor);
mb.SetMetricExporter(promExporter);
mb.SetMetricPushInterval(TimeSpan.FromSeconds(30));
});

// Obtain a Meter. Libraries would pass their name as argument.
var meter = meterFactory.GetMeter("MyMeter");

// the rest is purely from Metric API.
var testCounter = meter.CreateInt64Counter("MyCounter");
var testMeasure = meter.CreateInt64Measure("MyMeasure");
var labels1 = new List<KeyValuePair<string, string>>();
labels1.Add(new KeyValuePair<string, string>("dim1", "value1"));

var labels2 = new List<KeyValuePair<string, string>>();
labels2.Add(new KeyValuePair<string, string>("dim1", "value2"));

var httpServer = new PrometheusExporterMetricsHttpServer(promExporter);
var defaultContext = default(SpanContext);
try
{
httpServer.Start();

for (int i = 0; i < 1000; i++)
{
testCounter.Add(defaultContext, 100, meter.GetLabelSet(labels1));
testCounter.Add(defaultContext, 10, meter.GetLabelSet(labels1));
testCounter.Add(defaultContext, 200, meter.GetLabelSet(labels2));
testCounter.Add(defaultContext, 10, meter.GetLabelSet(labels2));

if (i % 10 == 0)
{
// Collect is called here explicitly as there is
// no controller implementation yet.
// TODO: There should be no need to cast to MeterSdk.
(meter as MeterSdk).Collect();
}

Task.Delay(1000).Wait();
}
}
finally
// TODO: This sample runs indefinitely. Replace with actual shutdown logic.
while (true)
{
Task.Delay(3000).Wait();
httpServer.Stop();
testCounter.Add(defaultContext, 100, meter.GetLabelSet(labels1));
testMeasure.Record(defaultContext, 100, meter.GetLabelSet(labels1));

Task.Delay(100).Wait();
}

return null;
// Stopping
// metricsHttpServer.Stop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void Write(StreamWriter writer)
writer.Write("\n");
}

if (string.IsNullOrEmpty(this.type))
if (!string.IsNullOrEmpty(this.type))
{
// If the token is TYPE, exactly two more tokens are expected. The first is the
// metric name, and the second is either counter, gauge, histogram, summary, or
Expand All @@ -125,8 +125,9 @@ public void Write(StreamWriter writer)
// before the first sample is reported for that metric name. If there is no TYPE
// line for a metric name, the type is set to untyped.

writer.Write("# HELP ");
writer.Write("# TYPE ");
writer.Write(this.name);
writer.Write(" ");
writer.Write(this.type);
writer.Write("\n");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public PrometheusExporter(PrometheusExporterOptions options)
private List<Metric<double>> DoubleMetrics { get; set; }

/// <inheritdoc/>
public override Task<ExportResult> ExportAsync<T>(List<Metric<T>> metrics, CancellationToken cancellationToken)
public override Task<ExportResult> ExportAsync<T>(IEnumerable<Metric<T>> metrics, CancellationToken cancellationToken)
{
// Prometheus uses a pull model, not a push.
// Accumulate the exported metrics internally, return success.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public static string GetMetricsCollection(this PrometheusExporter exporter)
using var stream = new MemoryStream();
using var writer = new StreamWriter(stream);
WriteMetricsCollection(exporter, writer);
writer.Flush();

return Encoding.UTF8.GetString(stream.ToArray(), 0, (int)stream.Length);
}
Expand Down
40 changes: 40 additions & 0 deletions src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System.Diagnostics.Tracing;
using System.Globalization;
using System.Threading;
using OpenTelemetry.Metrics.Export;
using OpenTelemetry.Trace.Export;

namespace OpenTelemetry.Internal
Expand Down Expand Up @@ -84,6 +85,15 @@ public void TracestateValueIsInvalid(ReadOnlySpan<char> value)
}
}

[NonEvent]
public void MetricControllerException(Exception ex)
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
{
if (this.IsEnabled(EventLevel.Warning, (EventKeywords)(-1)))
{
this.MetricControllerException(ToInvariantString(ex));
}
}

[Event(1, Message = "Span processor queue size reached maximum. Throttling spans.", Level = EventLevel.Warning)]
public void SpanProcessorQueueIsExhausted()
{
Expand Down Expand Up @@ -180,6 +190,36 @@ public void MetricObserverCallbackError(string metricName, string exception)
this.WriteEvent(16, metricName, exception);
}

[Event(17, Message = "Batcher finished collection with '{0}' metrics.", Level = EventLevel.Informational)]
public void BatcherCollectionCompleted(int count)
{
this.WriteEvent(17, count);
}

[Event(18, Message = "Collection completed in '{0}' msecs.", Level = EventLevel.Informational)]
public void CollectionCompleted(long msec)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: seems like an unnecessary conversion of StopWatch to milliseconds. Would ticks work here?

{
this.WriteEvent(18, msec);
}

[Event(19, Message = "Exception occurred in Metric Controller while processing metrics from one Collect cycle. This does not shutdown controller and subsequent collections will be done. Exception: '{0}'", Level = EventLevel.Warning)]
public void MetricControllerException(string exception)
{
this.WriteEvent(19, exception);
}

[Event(20, Message = "Meter Collect Invoked for Meter: '{0}'", Level = EventLevel.Verbose)]
public void MeterCollectInvoked(string meterName)
{
this.WriteEvent(20, meterName);
}

[Event(21, Message = "Metric Export failed with error '{0}'.", Level = EventLevel.Warning)]
public void MetricExporterErrorResult(int exportResult)
{
this.WriteEvent(21, exportResult);
}

/// <summary>
/// Returns a culture-independent string representation of the given <paramref name="exception"/> object,
/// appropriate for diagnostics tracing.
Expand Down
67 changes: 67 additions & 0 deletions src/OpenTelemetry/Metrics/Configuration/MeterBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// <copyright file="MeterBuilder.cs" company="OpenTelemetry Authors">
// Copyright 2018, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using OpenTelemetry.Metrics.Export;

namespace OpenTelemetry.Metrics.Configuration
{
public class MeterBuilder
{
internal MeterBuilder()
{
}

internal MetricProcessor MetricProcessor { get; private set; }

internal MetricExporter MetricExporter { get; private set; }

internal TimeSpan MetricPushInterval { get; private set; }

/// <summary>
/// Configures metric processor. (aka batcher).
/// </summary>
/// <param name="metricProcessor">MetricProcessor instance.</param>
/// <returns>The meter builder instance for chaining.</returns>
public MeterBuilder SetMetricProcessor(MetricProcessor metricProcessor)
{
this.MetricProcessor = metricProcessor;
return this;
}

/// <summary>
/// Configures Metric Exporter.
/// </summary>
/// <param name="metricExporter">MetricExporter instance.</param>
/// <returns>The meter builder instance for chaining.</returns>
public MeterBuilder SetMetricExporter(MetricExporter metricExporter)
{
this.MetricExporter = metricExporter;
return this;
}

/// <summary>
/// Sets the push interval.
/// </summary>
/// <param name="pushInterval">push interval.</param>
/// <returns>The meter builder instance for chaining.</returns>
public MeterBuilder SetMetricPushInterval(TimeSpan pushInterval)
{
this.MetricPushInterval = pushInterval;
return this;
}
}
}
45 changes: 31 additions & 14 deletions src/OpenTelemetry/Metrics/Configuration/MeterFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,52 @@
// limitations under the License.
// </copyright>

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using OpenTelemetry.Metrics.Export;

namespace OpenTelemetry.Metrics.Configuration
{
public class MeterFactory : MeterFactoryBase
{
// TODO: make MeterFactory IDisposable to call Dispose on Exporter/Controller.
private readonly object lck = new object();
private readonly Dictionary<MeterRegistryKey, Meter> meterRegistry = new Dictionary<MeterRegistryKey, Meter>();
private readonly Dictionary<MeterRegistryKey, MeterSdk> meterRegistry = new Dictionary<MeterRegistryKey, MeterSdk>();
private readonly MetricProcessor metricProcessor;
private Meter defaultMeter;
private readonly MetricExporter metricExporter;
private readonly PushMetricController pushMetricController;
private readonly TimeSpan defaultPushInterval = TimeSpan.FromSeconds(60);
private MeterSdk defaultMeter;

private MeterFactory(MetricProcessor metricProcessor)
private MeterFactory(MeterBuilder meterBuilder)
{
if (metricProcessor == null)
{
this.metricProcessor = new NoOpMetricProcessor();
}
else
{
this.metricProcessor = metricProcessor;
}
this.metricProcessor = meterBuilder.MetricProcessor ?? new NoOpMetricProcessor();
this.metricExporter = meterBuilder.MetricExporter ?? new NoOpMetricExporter();

// We only have PushMetricController now with only configurable thing being the push interval
this.pushMetricController = new PushMetricController(
this.meterRegistry,
this.metricProcessor,
this.metricExporter,
meterBuilder.MetricPushInterval == default(TimeSpan) ? this.defaultPushInterval : meterBuilder.MetricPushInterval,
new CancellationTokenSource());

this.defaultMeter = new MeterSdk(string.Empty, this.metricProcessor);
}

public static MeterFactory Create(MetricProcessor metricProcessor)
public static MeterFactory Create(Action<MeterBuilder> configure)
{
return new MeterFactory(metricProcessor);
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

var builder = new MeterBuilder();
configure(builder);

return new MeterFactory(builder);
}

public override Meter GetMeter(string name, string version = null)
Expand Down Expand Up @@ -77,7 +94,7 @@ private static IEnumerable<KeyValuePair<string, string>> CreateLibraryResourceLa
return labels;
}

private readonly struct MeterRegistryKey
internal readonly struct MeterRegistryKey
{
private readonly string name;
private readonly string version;
Expand Down
2 changes: 1 addition & 1 deletion src/OpenTelemetry/Metrics/Export/MetricExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ public enum ExportResult
FailedRetryable = 2,
}

public abstract Task<ExportResult> ExportAsync<T>(List<Metric<T>> metrics, CancellationToken cancellationToken);
public abstract Task<ExportResult> ExportAsync<T>(IEnumerable<Metric<T>> metrics, CancellationToken cancellationToken);
}
}
15 changes: 13 additions & 2 deletions src/OpenTelemetry/Metrics/Export/MetricProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@
// limitations under the License.
// </copyright>

using System;
using System.Collections.Generic;
using OpenTelemetry.Metrics.Aggregators;

namespace OpenTelemetry.Metrics.Export
{
public abstract class MetricProcessor
{
/// <summary>
/// Process the metric.
/// Process the metric. This method is called once every collection interval.
/// </summary>
/// <param name="meterName">the name of the meter, used as a namespace for the metric instruments.</param>
/// <param name="metricName">the name of the instrument.</param>
Expand All @@ -30,12 +32,21 @@ public abstract class MetricProcessor
public abstract void Process(string meterName, string metricName, LabelSet labelSet, Aggregator<long> aggregator);

/// <summary>
/// Process the metric.
/// Process the metric. This method is called once every collection interval.
/// </summary>
/// <param name="meterName">the name of the meter, used as a namespace for the metric instruments.</param>
/// <param name="metricName">the name of the instrument.</param>
/// <param name="labelSet">the labelSet associated with the instrument.</param>
/// <param name="aggregator">the aggregator used.</param>
public abstract void Process(string meterName, string metricName, LabelSet labelSet, Aggregator<double> aggregator);

/// <summary>
/// Finish the current collection cycle and return the metrics it holds.
/// This is called at the end of one collection cycle by the Controller.
/// MetricProcessor can use this to clear its Metrics (in case of stateless).
/// </summary>
/// <param name="longMetrics">The list of long metrics from this cycle, which are to be exported.</param>
/// <param name="doubleMetrics">The list of double metrics from this cycle, which are to be exported.</param>
public abstract void FinishCollectionCycle(out IEnumerable<Metric<long>> longMetrics, out IEnumerable<Metric<double>> doubleMetrics);
}
}
Loading