Skip to content

Commit

Permalink
Add :most_recent aggregation to DirectFileStore, which reports the va…
Browse files Browse the repository at this point in the history
…lue that was set by a process most recently.

Signed-off-by: Stefan Sundin <stefan@stefansundin.com>
  • Loading branch information
stefansundin committed Dec 23, 2019
1 parent 7129453 commit 207654d
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 19 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,9 @@ When instantiating metrics, there is an optional `store_settings` attribute. Thi
to set up store-specific settings for each metric. For most stores, this is not used, but
for multi-process stores, this is used to specify how to aggregate the values of each
metric across multiple processes. For the most part, this is used for Gauges, to specify
whether you want to report the `SUM`, `MAX` or `MIN` value observed across all processes.
For almost all other cases, you'd leave the default (`SUM`). More on this on the
*Aggregation* section below.
whether you want to report the `SUM`, `MAX`, `MIN`, or `MOST_RECENT` value observed across
all processes. For almost all other cases, you'd leave the default (`SUM`). More on this
on the *Aggregation* section below.

Custom stores may also accept extra parameters besides `:aggregation`. See the
documentation of each store for more details.
Expand Down Expand Up @@ -348,8 +348,8 @@ use case, you may need to control how this works. When using this store,
each Metric allows you to specify an `:aggregation` setting, defining how
to aggregate the multiple possible values we can get for each labelset. By default,
Counters, Histograms and Summaries are `SUM`med, and Gauges report all their values (one
for each process), tagged with a `pid` label. You can also select `SUM`, `MAX` or `MIN`
for your gauges, depending on your use case.
for each process), tagged with a `pid` label. You can also select `SUM`, `MAX`, `MIN`, or
`MOST_RECENT` for your gauges, depending on your use case.

**Memory Usage**: When scraped by Prometheus, this store will read all these files, get all
the values and aggregate them. We have noticed this can have a noticeable effect on memory
Expand Down
32 changes: 18 additions & 14 deletions lib/prometheus/client/data_stores/direct_file_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module DataStores

class DirectFileStore
class InvalidStoreSettingsError < StandardError; end
AGGREGATION_MODES = [MAX = :max, MIN = :min, SUM = :sum, ALL = :all]
AGGREGATION_MODES = [MAX = :max, MIN = :min, SUM = :sum, ALL = :all, MOST_RECENT = :most_recent]
DEFAULT_METRIC_SETTINGS = { aggregation: SUM }
DEFAULT_GAUGE_SETTINGS = { aggregation: ALL }

Expand Down Expand Up @@ -121,15 +121,15 @@ def all_values
stores_for_metric.each do |file_path|
begin
store = FileMappedDict.new(file_path, true)
store.all_values.each do |(labelset_qs, v)|
store.all_values.each do |(labelset_qs, v, ts)|
# Labels come as a query string, and CGI::parse returns arrays for each key
# "foo=bar&x=y" => { "foo" => ["bar"], "x" => ["y"] }
# Turn the keys back into symbols, and remove the arrays
label_set = CGI::parse(labelset_qs).map do |k, vs|
[k.to_sym, vs.first]
end.to_h

stores_data[label_set] << v
stores_data[label_set] << [v, ts]
end
ensure
store.close if store
Expand Down Expand Up @@ -182,13 +182,15 @@ def process_id

def aggregate_values(values)
if @values_aggregation_mode == SUM
values.inject { |sum, element| sum + element }
values.map { |element| element[0] }.inject { |sum, value| sum + value }
elsif @values_aggregation_mode == MAX
values.max
values.max { |a,b| a[0] <=> b[0] }[0]
elsif @values_aggregation_mode == MIN
values.min
values.min { |a,b| a[0] <=> b[0] }[0]
elsif @values_aggregation_mode == ALL
values.first
values.first[0]
elsif @values_aggregation_mode == MOST_RECENT
values.max { |a,b| a[1] <=> b[1] }[0]
else
raise InvalidStoreSettingsError,
"Invalid Aggregation Mode: #{ @values_aggregation_mode }"
Expand All @@ -198,13 +200,14 @@ def aggregate_values(values)

private_constant :MetricStore

# A dict of doubles, backed by an file we access directly a a byte array.
# A dict of doubles, backed by a file we access directly as a byte array.
#
# The file starts with a 4 byte int, indicating how much of it is used.
# Then 4 bytes of padding.
# There's then a number of entries, consisting of a 4 byte int which is the
# size of the next field, a utf-8 encoded string key, padding to an 8 byte
# alignment, and then a 8 byte float which is the value.
# alignment, and then an 8 byte float which is the value, and then an 8 byte
# float which is the unix timestamp when the value was set.
class FileMappedDict
INITIAL_FILE_SIZE = 1024*1024

Expand Down Expand Up @@ -236,7 +239,8 @@ def all_values
@positions.map do |key, pos|
@f.seek(pos)
value = @f.read(8).unpack('d')[0]
[key, value]
timestamp = @f.read(8).unpack('d')[0]
[key, value, timestamp]
end
end
end
Expand All @@ -258,7 +262,7 @@ def write_value(key, value)

pos = @positions[key]
@f.seek(pos)
@f.write([value].pack('d'))
@f.write([value, Time.now.to_f].pack('dd'))
@f.flush
end

Expand Down Expand Up @@ -299,7 +303,7 @@ def resize_file(new_capacity)
def init_value(key)
# Pad to be 8-byte aligned.
padded = key + (' ' * (8 - (key.length + 4) % 8))
value = [padded.length, padded, 0.0].pack("lA#{padded.length}d")
value = [padded.length, padded, 0.0, 0.0].pack("lA#{padded.length}dd")
while @used + value.length > @capacity
@capacity *= 2
resize_file(@capacity)
Expand All @@ -310,7 +314,7 @@ def init_value(key)
@f.seek(0)
@f.write([@used].pack('l'))
@f.flush
@positions[key] = @used - 8
@positions[key] = @used - 16
end

# Read position of all keys. No locking is performed.
Expand All @@ -320,7 +324,7 @@ def populate_positions
padded_len = @f.read(4).unpack('l')[0]
key = @f.read(padded_len).unpack("A#{padded_len}")[0].strip
@positions[key] = @f.pos
@f.seek(8, :CUR)
@f.seek(16, :CUR)
end
end
end
Expand Down
39 changes: 39 additions & 0 deletions spec/prometheus/client/data_stores/direct_file_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,45 @@
end
end

context "with a metric that takes MOST_RECENT instead of SUM" do
  it "reports the most recently written value from different processes" do
    # Two handles on the same metric, standing in for two separate processes,
    # both configured with the :most_recent aggregation mode.
    settings = { aggregation: :most_recent }
    store_a = subject.for_metric(
      :metric_name,
      metric_type: :gauge,
      metric_settings: settings
    )
    store_b = subject.for_metric(
      :metric_name,
      metric_type: :gauge,
      metric_settings: settings
    )

    # Interleave writes from two fake PIDs. For each labelset, the
    # chronologically last write should win, regardless of which
    # process performed it.
    [
      [12345, store_a, "bar", 1],
      [23456, store_b, "bar", 3],
      [12345, store_a, "baz", 7],
      [23456, store_b, "baz", 2],
      [12345, store_a, "baz", 4],
      [23456, store_b, "zzz", 1],
      [23456, store_b, "yyy", 3],
    ].each do |pid, store, foo_value, val|
      allow(Process).to receive(:pid).and_return(pid)
      store.set(labels: { foo: foo_value }, val: val)
    end

    expect(store_a.all_values).to eq(
      { foo: "bar" } => 3.0,
      { foo: "baz" } => 4.0,
      { foo: "zzz" } => 1.0,
      { foo: "yyy" } => 3.0,
    )

    # Both processes should return the same value
    expect(store_a.all_values).to eq(store_b.all_values)
  end
end

it "resizes the File if metrics get too big" do
truncate_calls_count = 0
allow_any_instance_of(Prometheus::Client::DataStores::DirectFileStore::FileMappedDict).
Expand Down

0 comments on commit 207654d

Please sign in to comment.