Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto-detect available signals from Elasticsearch #7

Merged
merged 10 commits into from
Nov 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions config/lookups.yml

This file was deleted.

1 change: 1 addition & 0 deletions config/signal_aliases.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
train_speed: "@.MWT.M_T3_1.MRV_TrnSpd_1.TON_4.MRV_Xv_Trn"
4 changes: 3 additions & 1 deletion lib/blocktrain.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@

require 'blocktrain/version'
require 'blocktrain/client'
require 'blocktrain/lookups'
require 'blocktrain/query'
require 'blocktrain/aggregation'
require 'blocktrain/aggregations/histogram_aggregation'
require 'blocktrain/aggregations/average_aggregation'
require 'blocktrain/aggregations/min_max_aggregation'
require 'blocktrain/aggregations/terms_aggregation'
require 'blocktrain/lookups'

Dotenv.load
20 changes: 1 addition & 19 deletions lib/blocktrain/aggregation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,7 @@ def results
end

def aggs
{
results: {
date_histogram: {
field: 'timeStamp',
interval: @interval,
time_zone: '+01:00',
min_doc_count: 1,
extended_bounds: {
min: @from,
max: @to
}
},
aggregations: local_aggregations
}
}
raise RuntimeError.new("Aggregation cannot be used directly. Use a derived class instead like AverageAggregation.")
end

def body
Expand All @@ -35,9 +21,5 @@ def body
}
end

def local_aggregations
raise RuntimeError.new("Aggregation cannot be used directly. Use a derived class instead like AverageAggregation.")
end

end
end
2 changes: 1 addition & 1 deletion lib/blocktrain/aggregations/average_aggregation.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Blocktrain
module Aggregations
class AverageAggregation < Aggregation
class AverageAggregation < HistogramAggregation

def local_aggregations
{
Expand Down
25 changes: 25 additions & 0 deletions lib/blocktrain/aggregations/histogram_aggregation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module Blocktrain
  module Aggregations
    # Base class for aggregations that bucket documents over time.
    #
    # Subclasses supply the per-bucket metrics by defining
    # +local_aggregations+; this class only wraps them in a date histogram.
    class HistogramAggregation < Aggregation

      # Builds the aggregation section of the request: a date histogram named
      # +results+ over the 'timeStamp' field, with the subclass's
      # +local_aggregations+ nested inside each bucket.
      #
      # Reads @interval, @from and @to, which are set outside this class
      # (presumably by an inherited initializer -- confirm against Aggregation).
      #
      # @return [Hash] the aggregation definition
      def aggs
        # The histogram bounds are pinned to the requested time range.
        bounds = { min: @from, max: @to }

        histogram = {
          field: 'timeStamp',
          interval: @interval,
          time_zone: '+01:00',
          min_doc_count: 1,
          extended_bounds: bounds
        }

        { results: { date_histogram: histogram, aggregations: local_aggregations } }
      end

    end
  end
end
2 changes: 1 addition & 1 deletion lib/blocktrain/aggregations/min_max_aggregation.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Blocktrain
module Aggregations
class MinMaxAggregation < Aggregation
class MinMaxAggregation < HistogramAggregation

def local_aggregations
{
Expand Down
33 changes: 33 additions & 0 deletions lib/blocktrain/aggregations/terms_aggregation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
module Blocktrain
  module Aggregations
    # Aggregation that buckets documents by the distinct values of one field.
    #
    # Unlike the histogram aggregations it needs no signal: the query it sends
    # contains only the filter returned by +filtered_filter+.
    class TermsAggregation < Aggregation

      # Name of the aggregation in the request and in the response. The value is
      # arbitrary, but +aggs+ and +results+ must agree on it, so it lives in
      # one place.
      AGGREGATION_NAME = 'langs'.freeze

      # @param options [Hash] options forwarded unchanged to the superclass
      #   initializer; must contain a non-nil +:term+ naming the field to
      #   bucket on
      # @raise [ArgumentError] when +:term+ is missing or nil
      def initialize(options = {})
        @term = options[:term]
        raise ArgumentError, 'TermsAggregation requires a term: argument' unless @term
        # Bare `super` re-passes `options` to the parent initializer.
        super
      end

      # Filter-only query: there is no query clause, just the filter provided
      # by +filtered_filter+ (defined outside this class).
      #
      # @return [Hash]
      def query
        {
          filtered: {
            filter: filtered_filter
          }
        }
      end

      # Terms aggregation over the configured field.
      #
      # @return [Hash]
      def aggs
        {
          AGGREGATION_NAME.to_sym => {
            terms: { field: @term }
          }
        }
      end

      # Buckets of the terms aggregation, read from the response returned by
      # +result+ (defined outside this class).
      #
      # @return [Array<Hash>] presumably one hash per distinct value, each with
      #   a "key" entry -- that is how the callers in this project read it
      def results
        result['aggregations'][AGGREGATION_NAME]['buckets']
      end

    end
  end
end
27 changes: 23 additions & 4 deletions lib/blocktrain/lookups.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,33 @@ module Blocktrain
class Lookups
include Singleton

def initialize
@lookups = OpenStruct.new fetch_yaml 'lookups'
end

# Returns the lookup table, building it on first use.
#
# When @lookups is nil, +init!+ is called to populate it; otherwise the
# existing table is returned untouched.
def lookups
  return @lookups unless @lookups.nil?

  init!
  @lookups
end

# Discards the cached lookup table by setting @lookups back to nil, so the
# next call to +lookups+ goes through +init!+ again.
#
# @return [nil]
def reset!
  @lookups = nil
end

# Separate out initialization for testing purposes.
#
# Fills @lookups with a signal name => memory address mapping:
#   1. a terms aggregation lists the distinct memoryAddress values,
#   2. one single-document query per address reads that document's
#      signalName,
#   3. every alias in the 'signal_aliases' YAML is added, pointing at the
#      address of the signal it names (nil if that signal was not detected).
#
# Addresses whose query returns no document are skipped.
def init!
  @lookups ||= {}
  # Both steps look at the same fixed time window.
  window = { from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z' }
  # Get unique list of memory addresses from ES
  buckets = Aggregations::TermsAggregation.new(window.merge(term: "memoryAddress")).results
  addresses = buckets.map { |bucket| bucket["key"] }
  # Get the signal name stored at each address
  addresses.each do |address|
    hit = Query.new(window.merge(memory_address: address, limit: 1)).results.first
    # An empty result set would otherwise raise NoMethodError on nil
    next if hit.nil?
    @lookups[hit["_source"]["signalName"].to_s] = address
  end
  # Read aliases from file
  aliases = OpenStruct.new fetch_yaml 'signal_aliases'
  aliases.each_pair do |key, value|
    @lookups[key.to_s] = @lookups[value]
  end
end

private

def fetch_yaml file
Expand Down
63 changes: 35 additions & 28 deletions lib/blocktrain/query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ module Blocktrain
class Query

def initialize(options = {})
@lookups = Lookups.instance.lookups
@memory_address = options.fetch(:memory_address, nil)
@signal = options[:signal]
@sub_signal = options[:sub_signal]

@from = parse_datetime(options.fetch(:from, '2015-09-01T00:00:00'))
@to = parse_datetime(options.fetch(:to, '2015-09-02T00:00:00'))
Expand All @@ -26,39 +25,47 @@ def parse_datetime(datetime)
end

def address_query
if @lookups[@signal].is_a?(Hash)
if @sub_signal.nil?
@lookups[@signal].map { |k, v| "memoryAddress:#{v}" }.join(' OR ')
else
"memoryAddress:#{@lookups[@signal][@sub_signal]}"
end
else
"memoryAddress:#{@lookups[@signal]}"
end
# Look up memory addresses directly if specified
return "memoryAddress:#{@memory_address}" if @memory_address
# No query if there isn't a signal specified
return nil if @signal.nil?
# Find the right memory address
lookups = Lookups.instance.lookups
"memoryAddress:#{lookups[@signal]}"
end

def query
{
filtered: {
query: {
query_string: {
query: address_query
}
},
filter: {
bool: {
must: [
{
range: {
timeStamp: {
gte: @from,
lte: @to
}
}
query: filtered_query,
filter: filtered_filter
}
}
end

# Query clause for the filtered query built by +query+.
#
# @return [Hash] an empty hash when +address_query+ returns nil, otherwise
#   a query_string clause wrapping the string it returned
def filtered_query
  address = address_query
  if address.nil?
    {}
  else
    { query_string: { query: address } }
  end
end

def filtered_filter
{
bool: {
must: [
{
range: {
timeStamp: {
gte: @from,
lte: @to
}
]
}
}
}
]
}
}
end
Expand Down
8 changes: 4 additions & 4 deletions spec/blocktrain/aggregation_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module Blocktrain
describe Aggregation do

it 'gets the interval' do
subject = described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', interval: '10m', signal: 'passenger_load')
subject = described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', interval: '10m', signal: 'train_speed')

expect(subject.instance_variable_get("@interval")).to eq('10m')
end
Expand All @@ -15,15 +15,15 @@ module Blocktrain
end
end

RSpec.shared_examples "aggregations" do |described_class|
RSpec.shared_examples "histogram aggregations" do |described_class|

it "returns 0 results", :vcr do
agg = described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', signal: 'passenger_load')
agg = described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', signal: 'train_speed')
expect(agg.send(:result)['hits']['hits'].count).to eq(0)
end

it "returns 0 results even if a limit is specified", :vcr do
agg = described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', signal: 'passenger_load', limit: 1000)
agg = described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', signal: 'train_speed', limit: 1000)
expect(agg.send(:result)['hits']['hits'].count).to eq(0)
end

Expand Down
7 changes: 4 additions & 3 deletions spec/blocktrain/aggregations/average_aggregation_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ module Blocktrain
module Aggregations
describe AverageAggregation do

include_examples "aggregations", described_class
include_examples "histogram aggregations", described_class

describe 'hour long histogram' do

subject(:aggregations) {
described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', interval: '10m', signal: 'passenger_load').results
described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', interval: '10m', signal: 'train_speed').results
}

it 'has an aggregation called results', :vcr do
Expand All @@ -23,7 +23,8 @@ module Aggregations
end

it 'has the expected weight', :vcr do
expect(aggregations['results']['buckets'].first['average_value']['value']).to be_within(0.1).of 37.7
# This result number is sort of assumed. Don't take it as solid truth.
expect(aggregations['results']['buckets'].first['average_value']['value']).to be_within(0.1).of 5392.7
end

end
Expand Down
6 changes: 3 additions & 3 deletions spec/blocktrain/aggregations/min_max_aggregation_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ module Blocktrain
module Aggregations
describe MinMaxAggregation do

include_examples "aggregations", described_class
include_examples "histogram aggregations", described_class

describe 'hour long histogram' do

subject(:aggregations) {
described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', signal: 'passenger_load').results
described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', signal: 'train_speed').results
}

it 'has an aggregation called weight_chart', :vcr do
expect(aggregations).to have_key 'results'
expect(aggregations['results']).to have_key 'buckets'
expect(aggregations['results']['buckets'].count).to eq 6
expect(aggregations['results']['buckets'][0]['value']['buckets'][0].keys).to include 'max_value', 'min_value', 'average_value'
expect(aggregations['results']['buckets'][2]['value']['buckets'][2]['average_value']['value']).to be_within(0.1).of 4.04
expect(aggregations['results']['buckets'][2]['value']['buckets'][0]['average_value']['value']).to be_within(0.1).of 4691.2
end

end
Expand Down
15 changes: 15 additions & 0 deletions spec/blocktrain/aggregations/terms_aggregation_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module Blocktrain
  module Aggregations
    describe TermsAggregation do

      # NOTE(review): this example calls `.results` but, unlike the other
      # examples that reach Elasticsearch, it is not tagged :vcr -- confirm
      # whether it is meant to run against a live server.
      it 'can find unique memory addresses' do
        buckets = described_class.new(
          from: '2015-09-01 10:00:00Z',
          to: '2015-09-01 11:00:00Z',
          term: "memoryAddress"
        ).results
        keys = buckets.map { |bucket| bucket["key"] }

        # Ten buckets, and no address appears twice.
        expect(keys.count).to eq(10)
        expect(keys.uniq.count).to eq(10)
        expect(keys).to include("2E4414CW", "2E491EEW")
      end
    end
  end
end
8 changes: 4 additions & 4 deletions spec/blocktrain/lookups_spec.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
module Blocktrain
describe Lookups do

it 'knows about car codes' do
expect(described_class.instance.lookups['passenger_load']['A']).to eq '2E64930W'
it 'autodetects all available signals', :vcr do
expect(described_class.instance.lookups['@.MWT.M_T3_1.MRV_TrnSpd_1.TON_4.MRV_Xv_Trn']).to eq '2E491EEW'
end

it 'is an OpenStruct' do
expect(described_class.instance.lookups.passenger_load['B']).to eq '2E64932W'
it 'has useful aliases for obscure signal names', :vcr do
expect(described_class.instance.lookups['train_speed']).to eq '2E491EEW'
end

end
Expand Down
22 changes: 1 addition & 21 deletions spec/blocktrain/query_spec.rb
Original file line number Diff line number Diff line change
@@ -1,32 +1,12 @@
module Blocktrain
describe Query do

it 'queries a group of signals' do
subject = described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', signal: 'passenger_load')

expect(subject.address_query).to eq('memoryAddress:2E64930W OR memoryAddress:2E64932W OR memoryAddress:2E64934W OR memoryAddress:2E64936W')
end

it 'queries a single signal' do
it 'queries a single signal', :vcr do
subject = described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', signal: 'train_speed')

expect(subject.address_query).to eq('memoryAddress:2E491EEW')
end

context 'with a subsignal specified' do

it 'queries a subsignal' do
subject = described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', signal: 'passenger_load', sub_signal: 'A')
expect(subject.address_query).to eq('memoryAddress:2E64930W')
end

it 'queries a single signal' do
subject = described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', signal: 'train_speed', sub_signal: 'bawbag')
expect(subject.address_query).to eq('memoryAddress:2E491EEW')
end

end

it 'provides 100 results by default', :vcr do
subject = described_class.new(from: '2015-09-01 10:00:00Z', to: '2015-09-01 11:00:00Z', signal: 'train_speed')
expect(subject.results.count).to eq(100)
Expand Down
Loading