Merge pull request #172 from stefansundin/add-most_recent-aggregation
Add :most_recent aggregation to DirectFileStore
Daniel Magliola authored Jun 22, 2020
2 parents 204d8c9 + 826b32e commit c1d9acd
Showing 3 changed files with 131 additions and 29 deletions.
13 changes: 8 additions & 5 deletions README.md
@@ -325,9 +325,9 @@ When instantiating metrics, there is an optional `store_settings` attribute. Thi
to set up store-specific settings for each metric. For most stores, this is not used, but
for multi-process stores, this is used to specify how to aggregate the values of each
metric across multiple processes. For the most part, this is used for Gauges, to specify
whether you want to report the `SUM`, `MAX` or `MIN` value observed across all processes.
For almost all other cases, you'd leave the default (`SUM`). More on this on the
*Aggregation* section below.
whether you want to report the `SUM`, `MAX`, `MIN`, or `MOST_RECENT` value observed across
all processes. For almost all other cases, you'd leave the default (`SUM`). More on this
on the *Aggregation* section below.

Custom stores may also accept extra parameters besides `:aggregation`. See the
documentation of each store for more details.
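As a minimal sketch of how this setting is passed when declaring a metric (assuming the standard `Prometheus::Client` registry API; the metric name and directory below are illustrative):

```ruby
require 'prometheus/client'
require 'prometheus/client/data_stores/direct_file_store'

# Use the multi-process, file-backed store; `dir:` is an illustrative path.
Prometheus::Client.config.data_store =
  Prometheus::Client::DataStores::DirectFileStore.new(dir: '/tmp/prometheus_metrics')

# Report the highest value observed across all processes for this gauge.
in_flight = Prometheus::Client.registry.gauge(
  :in_flight_requests,
  docstring: 'Requests currently being handled',
  store_settings: { aggregation: :max }
)

in_flight.set(42)
```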
@@ -360,8 +360,11 @@ use case, you may need to control how this works. When using this store,
each Metric allows you to specify an `:aggregation` setting, defining how
to aggregate the multiple possible values we can get for each labelset. By default,
Counters, Histograms and Summaries are `SUM`med, and Gauges report all their values (one
for each process), tagged with a `pid` label. You can also select `SUM`, `MAX` or `MIN`
for your gauges, depending on your use case.
for each process), tagged with a `pid` label. You can also select `SUM`, `MAX`, `MIN`, or
`MOST_RECENT` for your gauges, depending on your use case.

Please note that the `MOST_RECENT` aggregation only works for gauges, and it does not
allow the use of `increment` / `decrement`; you can only use `set`.
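
For illustration, a small sketch of a gauge configured this way (the names are made up; the `increment` call is shown only to note that this store rejects it):

```ruby
last_deploy = Prometheus::Client.registry.gauge(
  :last_deploy_timestamp,
  docstring: 'Unix timestamp of the most recent deploy',
  store_settings: { aggregation: :most_recent }
)

last_deploy.set(Time.now.to_i)  # OK: only the newest value across processes is reported

# last_deploy.increment         # would raise DirectFileStore::InvalidStoreSettingsError
```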

**Memory Usage**: When scraped by Prometheus, this store will read all these files, get all
the values and aggregate them. We have noticed this can have a noticeable effect on memory
69 changes: 46 additions & 23 deletions lib/prometheus/client/data_stores/direct_file_store.rb
@@ -29,7 +29,7 @@ module DataStores

class DirectFileStore
class InvalidStoreSettingsError < StandardError; end
AGGREGATION_MODES = [MAX = :max, MIN = :min, SUM = :sum, ALL = :all]
AGGREGATION_MODES = [MAX = :max, MIN = :min, SUM = :sum, ALL = :all, MOST_RECENT = :most_recent]
DEFAULT_METRIC_SETTINGS = { aggregation: SUM }
DEFAULT_GAUGE_SETTINGS = { aggregation: ALL }

@@ -45,7 +45,7 @@ def for_metric(metric_name, metric_type:, metric_settings: {})
end

settings = default_settings.merge(metric_settings)
validate_metric_settings(settings)
validate_metric_settings(metric_type, settings)

MetricStore.new(metric_name: metric_name,
store_settings: @store_settings,
@@ -54,7 +54,7 @@ def for_metric(metric_name, metric_type:, metric_settings: {})

private

def validate_metric_settings(metric_settings)
def validate_metric_settings(metric_type, metric_settings)
unless metric_settings.has_key?(:aggregation) &&
AGGREGATION_MODES.include?(metric_settings[:aggregation])
raise InvalidStoreSettingsError,
@@ -65,6 +65,11 @@ def validate_metric_settings(metric_settings)
raise InvalidStoreSettingsError,
"Only :aggregation setting can be specified"
end

if metric_settings[:aggregation] == MOST_RECENT && metric_type != :gauge
raise InvalidStoreSettingsError,
"Only :gauge metrics support :most_recent aggregation"
end
end

class MetricStore
@@ -100,6 +105,12 @@ def set(labels:, val:)
end

def increment(labels:, by: 1)
if @values_aggregation_mode == DirectFileStore::MOST_RECENT
raise InvalidStoreSettingsError,
"The :most_recent aggregation does not support the use of increment"\
"/decrement"
end

key = store_key(labels)
in_process_sync do
value = internal_store.read_value(key)
@@ -121,15 +132,15 @@ def all_values
stores_for_metric.each do |file_path|
begin
store = FileMappedDict.new(file_path, true)
store.all_values.each do |(labelset_qs, v)|
store.all_values.each do |(labelset_qs, v, ts)|
# Labels come as a query string, and CGI::parse returns arrays for each key
# "foo=bar&x=y" => { "foo" => ["bar"], "x" => ["y"] }
# Turn the keys back into symbols, and remove the arrays
label_set = CGI::parse(labelset_qs).map do |k, vs|
[k.to_sym, vs.first]
end.to_h

stores_data[label_set] << v
stores_data[label_set] << [v, ts]
end
ensure
store.close if store
@@ -181,30 +192,41 @@ def process_id
end

def aggregate_values(values)
if @values_aggregation_mode == SUM
values.inject { |sum, element| sum + element }
elsif @values_aggregation_mode == MAX
values.max
elsif @values_aggregation_mode == MIN
values.min
elsif @values_aggregation_mode == ALL
values.first
# Each entry in the `values` array is a tuple of `value` and `timestamp`,
# so for all aggregations except `MOST_RECENT`, we need to only take the
# first value in each entry and ignore the second.
if @values_aggregation_mode == MOST_RECENT
latest_tuple = values.max { |a,b| a[1] <=> b[1] }
latest_tuple.first # return the value without the timestamp
else
raise InvalidStoreSettingsError,
"Invalid Aggregation Mode: #{ @values_aggregation_mode }"
values = values.map(&:first) # Discard timestamps

if @values_aggregation_mode == SUM
values.inject { |sum, element| sum + element }
elsif @values_aggregation_mode == MAX
values.max
elsif @values_aggregation_mode == MIN
values.min
elsif @values_aggregation_mode == ALL
values.first
else
raise InvalidStoreSettingsError,
"Invalid Aggregation Mode: #{ @values_aggregation_mode }"
end
end
end
end

private_constant :MetricStore

# A dict of doubles, backed by an file we access directly a a byte array.
# A dict of doubles, backed by an file we access directly as a byte array.
#
# The file starts with a 4 byte int, indicating how much of it is used.
# Then 4 bytes of padding.
# There's then a number of entries, consisting of a 4 byte int which is the
# size of the next field, a utf-8 encoded string key, padding to an 8 byte
# alignment, and then a 8 byte float which is the value.
# alignment, and then a 8 byte float which is the value, and then a 8 byte
# float which is the unix timestamp when the value was set.
class FileMappedDict
INITIAL_FILE_SIZE = 1024*1024

@@ -235,8 +257,8 @@ def all_values
with_file_lock do
@positions.map do |key, pos|
@f.seek(pos)
value = @f.read(8).unpack('d')[0]
[key, value]
value, timestamp = @f.read(16).unpack('dd')
[key, value, timestamp]
end
end
end
@@ -256,9 +278,10 @@ def write_value(key, value)
init_value(key)
end

now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
pos = @positions[key]
@f.seek(pos)
@f.write([value].pack('d'))
@f.write([value, now].pack('dd'))
@f.flush
end

@@ -299,7 +322,7 @@ def resize_file(new_capacity)
def init_value(key)
# Pad to be 8-byte aligned.
padded = key + (' ' * (8 - (key.length + 4) % 8))
value = [padded.length, padded, 0.0].pack("lA#{padded.length}d")
value = [padded.length, padded, 0.0, 0.0].pack("lA#{padded.length}dd")
while @used + value.length > @capacity
@capacity *= 2
resize_file(@capacity)
@@ -310,7 +333,7 @@ def init_value(key)
@f.seek(0)
@f.write([@used].pack('l'))
@f.flush
@positions[key] = @used - 8
@positions[key] = @used - 16
end

# Read position of all keys. No locking is performed.
@@ -320,7 +343,7 @@ def populate_positions
padded_len = @f.read(4).unpack('l')[0]
key = @f.read(padded_len).unpack("A#{padded_len}")[0].strip
@positions[key] = @f.pos
@f.seek(8, :CUR)
@f.seek(16, :CUR)
end
end
end
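For reference, a minimal sketch of the on-disk entry layout that `init_value` / `write_value` produce after this change, using the same `pack` format strings as the code above (illustrative only; the key and value here are made up):

```ruby
key    = 'foo=bar'
# Pad the key so that the 8-byte value following the 4-byte length stays 8-byte aligned.
padded = key + (' ' * (8 - (key.length + 4) % 8))

now   = Process.clock_gettime(Process::CLOCK_MONOTONIC)
entry = [padded.length, padded, 12.5, now].pack("lA#{padded.length}dd")
# => [4-byte length][padded key][8-byte float value][8-byte float timestamp]

length           = entry[0, 4].unpack('l')[0]
value, timestamp = entry[4 + length, 16].unpack('dd')
# value == 12.5, timestamp == now
```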
78 changes: 77 additions & 1 deletion spec/prometheus/client/data_stores/direct_file_store_spec.rb
@@ -14,7 +14,7 @@

it_behaves_like Prometheus::Client::DataStores

it "only accepts valid :aggregation as Metric Settings" do
it "only accepts valid :aggregation values as Metric Settings" do
expect do
subject.for_metric(:metric_name,
metric_type: :counter,
@@ -26,14 +26,43 @@
metric_type: :counter,
metric_settings: { aggregation: :invalid })
end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
end

it "only accepts valid keys as Metric Settings" do
# the only valid key at the moment is :aggregation
expect do
subject.for_metric(:metric_name,
metric_type: :counter,
metric_settings: { some_setting: true })
end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
end

it "only accepts :most_recent aggregation for gauges" do
expect do
subject.for_metric(:metric_name,
metric_type: :gauge,
metric_settings: { aggregation: Prometheus::Client::DataStores::DirectFileStore::MOST_RECENT })
end.not_to raise_error

expect do
subject.for_metric(:metric_name,
metric_type: :counter,
metric_settings: { aggregation: Prometheus::Client::DataStores::DirectFileStore::MOST_RECENT })
end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)

expect do
subject.for_metric(:metric_name,
metric_type: :histogram,
metric_settings: { aggregation: Prometheus::Client::DataStores::DirectFileStore::MOST_RECENT })
end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)

expect do
subject.for_metric(:metric_name,
metric_type: :summary,
metric_settings: { aggregation: Prometheus::Client::DataStores::DirectFileStore::MOST_RECENT })
end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
end

it "raises when aggregating if we get to that that point with an invalid aggregation mode" do
# This is basically just for coverage of a safety clause that can never be reached
allow(subject).to receive(:validate_metric_settings) # turn off validation
@@ -267,6 +296,53 @@
end
end

context "with a metric that takes MOST_RECENT instead of SUM" do
it "reports the most recently written value from different processes" do
metric_store1 = subject.for_metric(
:metric_name,
metric_type: :gauge,
metric_settings: { aggregation: :most_recent }
)
metric_store2 = subject.for_metric(
:metric_name,
metric_type: :gauge,
metric_settings: { aggregation: :most_recent }
)

allow(Process).to receive(:pid).and_return(12345)
metric_store1.set(labels: { foo: "bar" }, val: 1)

allow(Process).to receive(:pid).and_return(23456)
metric_store2.set(labels: { foo: "bar" }, val: 3) # Supersedes 'bar' in PID 12345
metric_store2.set(labels: { foo: "baz" }, val: 2)
metric_store2.set(labels: { foo: "zzz" }, val: 1)

allow(Process).to receive(:pid).and_return(12345)
metric_store1.set(labels: { foo: "baz" }, val: 4) # Supersedes 'baz' in PID 23456

expect(metric_store1.all_values).to eq(
{ foo: "bar" } => 3.0,
{ foo: "baz" } => 4.0,
{ foo: "zzz" } => 1.0,
)

# Both processes should return the same value
expect(metric_store1.all_values).to eq(metric_store2.all_values)
end

it "does now allow `increment`, only `set`" do
metric_store1 = subject.for_metric(
:metric_name,
metric_type: :gauge,
metric_settings: { aggregation: :most_recent }
)

expect do
metric_store1.increment(labels: {})
end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
end
end

it "resizes the File if metrics get too big" do
truncate_calls_count = 0
allow_any_instance_of(Prometheus::Client::DataStores::DirectFileStore::FileMappedDict).
