Skip to content

Commit

Permalink
[CLIENT-2007] Support using Context in query filters
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Dec 2, 2022
1 parent d28540a commit 00da441
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 76 deletions.
76 changes: 44 additions & 32 deletions lib/aerospike/query/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,51 @@
# the License.

module Aerospike

class Filter
attr_reader :packed_ctx

def self.Equal(bin_name, value)
Filter.new(bin_name, value, value)
end
# open up the class to alias the class methods for naming consistency
class << self
def equal(bin_name, value, ctx: nil)
Filter.new(bin_name, value, value, nil, nil, ctx)
end

def self.Contains(bin_name, value, col_type)
Filter.new(bin_name, value, value, nil, col_type)
end
def contains(bin_name, value, col_type, ctx: nil)
Filter.new(bin_name, value, value, nil, col_type, ctx)
end

def self.Range(bin_name, from, to, col_type = nil)
Filter.new(bin_name, from, to, nil, col_type)
end
def range(bin_name, from, to, col_type = nil, ctx: nil)
Filter.new(bin_name, from, to, nil, col_type, ctx)
end

def self.geoWithinGeoJSONRegion(bin_name, region, col_type = nil)
region = region.to_json
Filter.new(bin_name, region, region, ParticleType::GEOJSON, col_type)
end
def geo_within_geo_region(bin_name, region, col_type = nil, ctx: nil)
region = region.to_json
Filter.new(bin_name, region, region, ParticleType::GEOJSON, col_type, ctx)
end

def self.geoWithinRadius(bin_name, lon, lat, radius_meter, col_type = nil)
region = GeoJSON.new({type: "AeroCircle", coordinates: [[lon, lat], radius_meter]})
geoWithinGeoJSONRegion(bin_name, region, col_type)
end
def geo_within_radius(bin_name, lon, lat, radius_meter, col_type = nil, ctx: nil)
region = GeoJSON.new({ type: "AeroCircle", coordinates: [[lon, lat], radius_meter] })
geo_within_geo_region(bin_name, region, col_type, ctx: ctx)
end

def self.geoContainsGeoJSONPoint(bin_name, point, col_type = nil)
point = point.to_json
Filter.new(bin_name, point, point, ParticleType::GEOJSON, col_type)
end
def geo_contains_geo_point(bin_name, point, col_type = nil, ctx: nil)
point = point.to_json
Filter.new(bin_name, point, point, ParticleType::GEOJSON, col_type, ctx)
end

def self.geoContainsPoint(bin_name, lon, lat, col_type = nil)
point = GeoJSON.new({type: "Point", coordinates: [lon, lat]})
geoContainsGeoJSONPoint(bin_name, point, col_type)
def geo_contains_point(bin_name, lon, lat, col_type = nil, ctx: nil)
point = GeoJSON.new({ type: "Point", coordinates: [lon, lat] })
geo_contains_geo_point(bin_name, point, col_type, ctx: ctx)
end

# alias the old names for compatibility
alias :Equal :equal
alias :Contains :contains
alias :Range :range
alias :geoWithinGeoJSONRegion :geo_within_geo_region
alias :geoWithinRadius :geo_within_radius
alias :geoContainsGeoJSONPoint :geo_contains_geo_point
alias :geoContainsPoint :geo_contains_point
end

def estimate_size
Expand All @@ -56,21 +68,21 @@ def estimate_size

def write(buf, offset)
# Write name.
len = buf.write_binary(@name, offset+1)
len = buf.write_binary(@name, offset + 1)
buf.write_byte(len, offset)
offset += len + 1

# Write particle type.
buf.write_byte(@val_type, offset)
offset+=1
offset += 1

# Write filter begin.
len = @begin.write(buf, offset+4)
len = @begin.write(buf, offset + 4)
buf.write_int32(len, offset)
offset += len + 4

# Write filter end.
len = @end.write(buf, offset+4)
len = @end.write(buf, offset + 4)
buf.write_int32(len, offset)
offset += len + 4

Expand Down Expand Up @@ -98,7 +110,7 @@ def to_s

private

def initialize(bin_name, begin_value, end_value, val_type = nil, col_type = nil)
def initialize(bin_name, begin_value, end_value, val_type = nil, col_type = nil, ctx = nil)
@name = bin_name
@begin = Aerospike::Value.of(begin_value)
@end = Aerospike::Value.of(end_value)
Expand All @@ -107,8 +119,8 @@ def initialize(bin_name, begin_value, end_value, val_type = nil, col_type = nil)
# but in certain cases caller can override the type.
@val_type = val_type || @begin.type
@col_type = col_type
end

@packed_ctx = CDT::Context.bytes(ctx)
end
end # class

end
20 changes: 9 additions & 11 deletions lib/aerospike/query/query_partition_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,11 @@ def write_buffer
@data_offset += filter_size
field_count += 1

# TODO: Implement
# packed_ctx = filter.packed_ctx
# if packed_ctx
# @data_offset += FIELD_HEADER_SIZE + packed_ctx.length
# field_count+=1
# end
packed_ctx = filter.packed_ctx
if packed_ctx
@data_offset += FIELD_HEADER_SIZE + packed_ctx.length
field_count += 1
end
end

@statement.set_task_id
Expand Down Expand Up @@ -210,11 +209,10 @@ def write_buffer
@data_offset += @data_buffer.write_byte(1, @data_offset)
@data_offset = filter.write(@data_buffer, @data_offset)

# TODO: Implement
# if packed_ctx
# write_field_header(packed_ctx.length, FieldType::INDEX_CONTEXT)
# @data_buffer.write_binary(packed_ctx, @data_offset)
# end
if packed_ctx
write_field_header(packed_ctx.length, FieldType::INDEX_CONTEXT)
@data_offset += @data_buffer.write_binary(packed_ctx, @data_offset)
end
end

if @statement.function_name
Expand Down
79 changes: 46 additions & 33 deletions spec/aerospike/index_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,31 @@
# License for the specific language governing permissions and limitations under
# the License.

require 'aerospike/host'
require 'aerospike/key'
require 'aerospike/bin'
require 'aerospike/language'
require "aerospike/host"
require "aerospike/key"
require "aerospike/bin"
require "aerospike/language"

describe Aerospike::Client do

let(:client) { Support.client }

let(:str_bin_name) { 'str_bin' }
let(:int_bin_name) { 'int_bin' }
let(:list_bin_name) { 'list_bin' }
let(:map_bin_name) { 'map_bin' }
let(:str_bin_name) { "str_bin" }
let(:int_bin_name) { "int_bin" }
let(:list_bin_name) { "list_bin" }
let(:list_bin_ctx_name) { "list_bin_ctx" }
let(:map_bin_name) { "map_bin" }

before do
key = Support.gen_random_key
client.truncate(key.namespace, key.set_name)
(1..1000).to_a.each do |i|
key = Support.gen_random_key
record = {
int_bin_name => rand(100_000),
str_bin_name => 'string value',
str_bin_name => "string value",
list_bin_name => Array.new(10) { rand(1000) },
map_bin_name => { i: rand(100_000), s: 'map string value' },
list_bin_ctx_name => Array.new(5) { |rank| i + rank },
map_bin_name => { i: rand(100_000), s: "map string value" },
}
client.put(key, record)
end
Expand All @@ -49,35 +52,31 @@
key = Support.gen_random_key
client.drop_index(key.namespace,
key.set_name,
"index_str_#{key.set_name}",
)
"index_str_#{key.set_name}")
client.drop_index(key.namespace,
key.set_name,
"index_int_#{key.set_name}")
client.drop_index(key.namespace,
key.set_name,
"index_int_#{key.set_name}",
)
"index_list_#{key.set_name}")
client.drop_index(key.namespace,
key.set_name,
"index_list_#{key.set_name}",
)
"index_mapkeys_#{key.set_name}")
client.drop_index(key.namespace,
key.set_name,
"index_mapkeys_#{key.set_name}",
)
"index_mapvalues_#{key.set_name}")
client.drop_index(key.namespace,
key.set_name,
"index_mapvalues_#{key.set_name}",
)
"index_list_values_context_#{key.set_name}")
end

describe "Index operations" do

it "should create an integer index and wait until it is created on all nodes" do
key = Support.gen_random_key
index_task = client.create_index(key.namespace,
key.set_name,
"index_int_#{key.set_name}",
int_bin_name, :numeric
)
int_bin_name, :numeric)

expect(index_task.wait_till_completed).to be true
expect(index_task.completed?).to be true
Expand All @@ -88,8 +87,7 @@
index_task = client.create_index(key.namespace,
key.set_name,
"index_str_#{key.set_name}",
str_bin_name, :string
)
str_bin_name, :string)

expect(index_task.wait_till_completed).to be true
expect(index_task.completed?).to be true
Expand All @@ -100,8 +98,7 @@
index_task = client.create_index(key.namespace,
key.set_name,
"index_list_#{key.set_name}",
list_bin_name, :numeric, :list
)
list_bin_name, :numeric, :list)

expect(index_task.wait_till_completed).to be true
expect(index_task.completed?).to be true
Expand All @@ -112,8 +109,7 @@
index_task = client.create_index(key.namespace,
key.set_name,
"index_mapkeys_#{key.set_name}",
map_bin_name, :string, :mapkeys
)
map_bin_name, :string, :mapkeys)

expect(index_task.wait_till_completed).to be true
expect(index_task.completed?).to be true
Expand All @@ -124,8 +120,7 @@
index_task = client.create_index(key.namespace,
key.set_name,
"index_mapvalues_#{key.set_name}",
map_bin_name, :string, :mapvalues
)
map_bin_name, :string, :mapvalues)

expect(index_task.wait_till_completed).to be true
expect(index_task.completed?).to be true
Expand All @@ -142,6 +137,24 @@
expect(index_task.completed?).to be true
end

end # describe
it "should create an index on list bin via context" do
key = Support.gen_random_key
context = [Aerospike::CDT::Context.list_rank(-1)]
index_task = client.create_index(key.namespace, key.set_name, "index_list_values_context_#{key.set_name}", list_bin_ctx_name, :numeric, ctx: context)
expect(index_task.wait_till_completed).to be true
expect(index_task.completed?).to be true

start = 14
finish = 18
stmt = Aerospike::Statement.new(key.namespace, key.set_name)
stmt.filters = [Aerospike::Filter.Range(list_bin_ctx_name, start, finish, ctx: context)]
rs = client.query(stmt)
count = 0
rs.each do |r|
expect(r.bins[list_bin_ctx_name][-1]).to be_between(start, finish)
count += 1
end
expect(count).to eq(5)
end
end # describe
end # describe

0 comments on commit 00da441

Please sign in to comment.