Skip to content

Commit

Permalink
[Metrics SDK] Metric aggregation temporality controls (open-telemetry…
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Aug 8, 2022
1 parent 124b198 commit 8b61c99
Show file tree
Hide file tree
Showing 25 changed files with 213 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,24 @@ class OStreamMetricExporter final : public opentelemetry::sdk::metrics::MetricEx
* export() function will send metrics data into.
* The default ostream is set to stdout
*/
explicit OStreamMetricExporter(std::ostream &sout = std::cout) noexcept;
explicit OStreamMetricExporter(std::ostream &sout = std::cout,
sdk::metrics::AggregationTemporality aggregation_temporality =
sdk::metrics::AggregationTemporality::kCumulative) noexcept;

/**
* Export
* @param data metrics data
*/
sdk::common::ExportResult Export(const sdk::metrics::ResourceMetrics &data) noexcept override;

/**
* Get the AggregationTemporality for ostream exporter
*
* @return AggregationTemporality
*/
sdk::metrics::AggregationTemporality GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept override;

/**
* Force flush the exporter.
*/
Expand All @@ -55,6 +65,7 @@ class OStreamMetricExporter final : public opentelemetry::sdk::metrics::MetricEx
std::ostream &sout_;
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
sdk::metrics::AggregationTemporality aggregation_temporality_;
bool isShutdown() const noexcept;
void printInstrumentationInfoMetricData(const sdk::metrics::ScopeMetrics &info_metrics,
const sdk::metrics::ResourceMetrics &data);
Expand Down
12 changes: 11 additions & 1 deletion exporters/ostream/src/metric_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,17 @@ inline void printVec(std::ostream &os, Container &vec)
os << ']';
}

OStreamMetricExporter::OStreamMetricExporter(std::ostream &sout) noexcept : sout_(sout) {}
OStreamMetricExporter::OStreamMetricExporter(
std::ostream &sout,
sdk::metrics::AggregationTemporality aggregation_temporality) noexcept
: sout_(sout), aggregation_temporality_(aggregation_temporality)
{}

sdk::metrics::AggregationTemporality OStreamMetricExporter::GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept
{
return aggregation_temporality_;
}

sdk::common::ExportResult OStreamMetricExporter::Export(
const sdk::metrics::ResourceMetrics &data) noexcept
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ class OtlpGrpcMetricExporter : public opentelemetry::sdk::metrics::MetricExporte
*/
explicit OtlpGrpcMetricExporter(const OtlpGrpcMetricExporterOptions &options);

/**
* Get the AggregationTemporality for exporter
*
* @return AggregationTemporality
*/
sdk::metrics::AggregationTemporality GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept override;

opentelemetry::sdk::common::ExportResult Export(
const opentelemetry::sdk::metrics::ResourceMetrics &data) noexcept override;

Expand All @@ -52,6 +60,9 @@ class OtlpGrpcMetricExporter : public opentelemetry::sdk::metrics::MetricExporte
// The configuration options associated with this exporter.
const OtlpGrpcMetricExporterOptions options_;

// Aggregation Temporality selector
const sdk::metrics::AggregationTemporalitySelector aggregation_temporality_selector_;

// For testing
friend class OtlpGrpcExporterTestPeer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ namespace otlp
*/
struct OtlpGrpcMetricExporterOptions : public OtlpGrpcExporterOptions
{
opentelemetry::sdk::metrics::AggregationTemporality aggregation_temporality =
opentelemetry::sdk::metrics::AggregationTemporality::kDelta;

// Preferred Aggregation Temporality
sdk::metrics::AggregationTemporality aggregation_temporality =
sdk::metrics::AggregationTemporality::kCumulative;
};

} // namespace otlp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# include "opentelemetry/exporters/otlp/otlp_http_client.h"

# include "opentelemetry/exporters/otlp/otlp_environment.h"
# include "opentelemetry/exporters/otlp/otlp_metric_utils.h"

# include <chrono>
# include <cstddef>
Expand Down Expand Up @@ -52,6 +53,10 @@ struct OtlpHttpMetricExporterOptions
// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultMetricHeaders();

// Preferred Aggregation Temporality
sdk::metrics::AggregationTemporality aggregation_temporality =
sdk::metrics::AggregationTemporality::kCumulative;

# ifdef ENABLE_ASYNC_EXPORT
// Concurrent requests
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlpgrpc-concurrent-requests
Expand Down Expand Up @@ -79,6 +84,14 @@ class OtlpHttpMetricExporter final : public opentelemetry::sdk::metrics::MetricE
*/
OtlpHttpMetricExporter(const OtlpHttpMetricExporterOptions &options);

/**
* Get the AggregationTemporality for exporter
*
* @return AggregationTemporality
*/
sdk::metrics::AggregationTemporality GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept override;

opentelemetry::sdk::common::ExportResult Export(
const opentelemetry::sdk::metrics::ResourceMetrics &data) noexcept override;

Expand All @@ -95,6 +108,9 @@ class OtlpHttpMetricExporter final : public opentelemetry::sdk::metrics::MetricE
// Configuration options for the exporter
const OtlpHttpMetricExporterOptions options_;

// Aggregation Temporality Selector
const sdk::metrics::AggregationTemporalitySelector aggregation_temporality_selector_;

// Object that stores the HTTP sessions that have been created
std::unique_ptr<OtlpHttpClient> http_client_;
// For testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ class OtlpMetricUtils
static void PopulateRequest(
const opentelemetry::sdk::metrics::ResourceMetrics &data,
proto::collector::metrics::v1::ExportMetricsServiceRequest *request) noexcept;

static sdk::metrics::AggregationTemporalitySelector ChooseTemporalitySelector(
sdk::metrics::AggregationTemporality preferred_aggregation_temporality) noexcept;
static sdk::metrics::AggregationTemporality DeltaTemporalitySelector(
sdk::metrics::InstrumentType instrument_type) noexcept;
static sdk::metrics::AggregationTemporality CumulativeTemporalitySelector(
sdk::metrics::InstrumentType instrument_type) noexcept;
};

} // namespace otlp
Expand Down
16 changes: 14 additions & 2 deletions exporters/otlp/src/otlp_grpc_metric_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,28 @@ OtlpGrpcMetricExporter::OtlpGrpcMetricExporter()
{}

OtlpGrpcMetricExporter::OtlpGrpcMetricExporter(const OtlpGrpcMetricExporterOptions &options)
: options_(options), metrics_service_stub_(MakeMetricsServiceStub(options))
: options_(options),
aggregation_temporality_selector_{
OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)},
metrics_service_stub_(MakeMetricsServiceStub(options))
{}

OtlpGrpcMetricExporter::OtlpGrpcMetricExporter(
std::unique_ptr<proto::collector::metrics::v1::MetricsService::StubInterface> stub)
: options_(OtlpGrpcMetricExporterOptions()), metrics_service_stub_(std::move(stub))
: options_(OtlpGrpcMetricExporterOptions()),
aggregation_temporality_selector_{
OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)},
metrics_service_stub_(std::move(stub))
{}

// ----------------------------- Exporter methods ------------------------------

sdk::metrics::AggregationTemporality OtlpGrpcMetricExporter::GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept
{
return aggregation_temporality_selector_(instrument_type);
}

opentelemetry::sdk::common::ExportResult OtlpGrpcMetricExporter::Export(
const opentelemetry::sdk::metrics::ResourceMetrics &data) noexcept
{
Expand Down
14 changes: 13 additions & 1 deletion exporters/otlp/src/otlp_http_metric_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ OtlpHttpMetricExporter::OtlpHttpMetricExporter()

OtlpHttpMetricExporter::OtlpHttpMetricExporter(const OtlpHttpMetricExporterOptions &options)
: options_(options),
aggregation_temporality_selector_{
OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)},
http_client_(new OtlpHttpClient(OtlpHttpClientOptions(options.url,
options.content_type,
options.json_bytes_mapping,
Expand All @@ -44,7 +46,10 @@ OtlpHttpMetricExporter::OtlpHttpMetricExporter(const OtlpHttpMetricExporterOptio
{}

OtlpHttpMetricExporter::OtlpHttpMetricExporter(std::unique_ptr<OtlpHttpClient> http_client)
: options_(OtlpHttpMetricExporterOptions()), http_client_(std::move(http_client))
: options_(OtlpHttpMetricExporterOptions()),
aggregation_temporality_selector_{
OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)},
http_client_(std::move(http_client))
{
OtlpHttpMetricExporterOptions &options = const_cast<OtlpHttpMetricExporterOptions &>(options_);
options.url = http_client_->GetOptions().url;
Expand All @@ -61,6 +66,13 @@ OtlpHttpMetricExporter::OtlpHttpMetricExporter(std::unique_ptr<OtlpHttpClient> h
}
// ----------------------------- Exporter methods ------------------------------

sdk::metrics::AggregationTemporality OtlpHttpMetricExporter::GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept
{

return aggregation_temporality_selector_(instrument_type);
}

opentelemetry::sdk::common::ExportResult OtlpHttpMetricExporter::Export(
const opentelemetry::sdk::metrics::ResourceMetrics &data) noexcept
{
Expand Down
34 changes: 34 additions & 0 deletions exporters/otlp/src/otlp_metric_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,40 @@ void OtlpMetricUtils::PopulateRequest(
auto resource_metrics = request->add_resource_metrics();
PopulateResourceMetrics(data, resource_metrics);
}

sdk::metrics::AggregationTemporalitySelector OtlpMetricUtils::ChooseTemporalitySelector(
sdk::metrics::AggregationTemporality preferred_aggregation_temporality) noexcept
{
if (preferred_aggregation_temporality == sdk::metrics::AggregationTemporality::kDelta)
{
return DeltaTemporalitySelector;
}
return CumulativeTemporalitySelector;
}

sdk::metrics::AggregationTemporality OtlpMetricUtils::DeltaTemporalitySelector(
sdk::metrics::InstrumentType instrument_type) noexcept
{
switch (instrument_type)
{
case sdk::metrics::InstrumentType::kCounter:
case sdk::metrics::InstrumentType::kObservableCounter:
case sdk::metrics::InstrumentType::kHistogram:
case sdk::metrics::InstrumentType::kObservableGauge:
return sdk::metrics::AggregationTemporality::kDelta;
case sdk::metrics::InstrumentType::kUpDownCounter:
case sdk::metrics::InstrumentType::kObservableUpDownCounter:
return sdk::metrics::AggregationTemporality::kCumulative;
}
return sdk::metrics::AggregationTemporality::kUnspecified;
}

sdk::metrics::AggregationTemporality OtlpMetricUtils::CumulativeTemporalitySelector(
sdk::metrics::InstrumentType instrument_type) noexcept
{
return sdk::metrics::AggregationTemporality::kCumulative;
}

} // namespace otlp
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ class PrometheusExporter : public sdk::metrics::MetricExporter
*/
PrometheusExporter(const PrometheusExporterOptions &options);

/**
* Get the AggregationTemporality for Prometheus exporter
*
* @return AggregationTemporality
*/
sdk::metrics::AggregationTemporality GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept override;

/**
* Exports a batch of Metric Records.
* @param records: a collection of records to export
Expand Down
7 changes: 7 additions & 0 deletions exporters/prometheus/src/exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ PrometheusExporter::PrometheusExporter() : is_shutdown_(false)
collector_ = std::unique_ptr<PrometheusCollector>(new PrometheusCollector(3));
}

sdk::metrics::AggregationTemporality PrometheusExporter::GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept
{
// Prometheus exporter only support Cumulative
return sdk::metrics::AggregationTemporality::kCumulative;
}

/**
* Exports a batch of Metric Records.
* @param records: a collection of records to export
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ class PeriodicExportingMetricReader : public MetricReader
{

public:
PeriodicExportingMetricReader(
std::unique_ptr<MetricExporter> exporter,
const PeriodicExportingMetricReaderOptions &option,
AggregationTemporality aggregation_temporality = AggregationTemporality::kCumulative);
PeriodicExportingMetricReader(std::unique_ptr<MetricExporter> exporter,
const PeriodicExportingMetricReaderOptions &option);

AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) const noexcept override;

private:
bool OnForceFlush(std::chrono::microseconds timeout) noexcept override;
Expand Down
4 changes: 2 additions & 2 deletions sdk/include/opentelemetry/sdk/metrics/instruments.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ struct InstrumentDescriptor
InstrumentValueType value_type_;
};

using MetricAttributes = opentelemetry::sdk::common::OrderedAttributeMap;
using AggregationTemporalitySelector = std::function<AggregationTemporality(InstrumentType)>;
static InstrumentClass GetInstrumentClass(InstrumentType type)
{
if (type == InstrumentType::kCounter || type == InstrumentType::kHistogram ||
Expand All @@ -71,8 +73,6 @@ static InstrumentClass GetInstrumentClass(InstrumentType type)
}
}

using MetricAttributes = opentelemetry::sdk::common::OrderedAttributeMap;

/*class InstrumentSelector {
public:
InstrumentSelector(opentelemetry::nostd::string_view name,
Expand Down
8 changes: 8 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/metric_exporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ class MetricExporter
*/
virtual opentelemetry::sdk::common::ExportResult Export(const ResourceMetrics &data) noexcept = 0;

/**
* Get the AggregationTemporality for given Instrument Type for this exporter.
*
* @return AggregationTemporality
*/
virtual AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) const noexcept = 0;

/**
* Force flush the exporter.
*/
Expand Down
13 changes: 9 additions & 4 deletions sdk/include/opentelemetry/sdk/metrics/metric_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ namespace metrics
class MetricReader
{
public:
MetricReader(
AggregationTemporality aggregation_temporality = AggregationTemporality::kCumulative);
MetricReader();

void SetMetricProducer(MetricProducer *metric_producer);

Expand All @@ -36,7 +35,14 @@ class MetricReader
*/
bool Collect(nostd::function_ref<bool(ResourceMetrics &metric_data)> callback) noexcept;

AggregationTemporality GetAggregationTemporality() const noexcept;
/**
* Get the AggregationTemporality for given Instrument Type for this reader.
*
* @return AggregationTemporality
*/

virtual AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) const noexcept = 0;

/**
* Shutdown the meter reader.
Expand All @@ -62,7 +68,6 @@ class MetricReader

private:
MetricProducer *metric_producer_;
AggregationTemporality aggregation_temporality_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool shutdown_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class MeterContext;
class CollectorHandle
{
public:
virtual AggregationTemporality GetAggregationTemporality() noexcept = 0;
virtual AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) noexcept = 0;
};

/**
Expand All @@ -33,7 +34,8 @@ class MetricCollector : public MetricProducer, public CollectorHandle
MetricCollector(std::shared_ptr<MeterContext> &&context,
std::unique_ptr<MetricReader> metric_reader);

AggregationTemporality GetAggregationTemporality() noexcept override;
AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) noexcept override;

/**
* The callback to be called for each metric exporter. This will only be those
Expand Down
Loading

0 comments on commit 8b61c99

Please sign in to comment.