Skip to content

Commit

Permalink
RUBY-3368 Set maxTimeMS if timeoutMS set
Browse files Browse the repository at this point in the history
  • Loading branch information
comandeo-mongo committed Feb 20, 2024
1 parent 955d4f5 commit d46aa16
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 15 deletions.
27 changes: 20 additions & 7 deletions lib/mongo/cluster/sdam_flow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,12 @@ def server_description_changed
log_warn(
"Server #{updated_desc.address.to_s} has an incorrect replica set name '#{updated_desc.replica_set_name}'; expected '#{topology.replica_set_name}'"
)
@updated_desc = ::Mongo::Server::Description.new(updated_desc.address,
{}, average_round_trip_time: updated_desc.average_round_trip_time)
@updated_desc = ::Mongo::Server::Description.new(
updated_desc.address,
{},
average_round_trip_time: updated_desc.average_round_trip_time,
minimum_round_trip_time: updated_desc.minimum_round_trip_time
)
update_server_descriptions
end
end
Expand Down Expand Up @@ -233,8 +237,12 @@ def update_rs_from_primary
end

if stale_primary?
@updated_desc = ::Mongo::Server::Description.new(updated_desc.address,
{}, average_round_trip_time: updated_desc.average_round_trip_time)
@updated_desc = ::Mongo::Server::Description.new(
updated_desc.address,
{},
average_round_trip_time: updated_desc.average_round_trip_time,
minimum_round_trip_time: updated_desc.minimum_round_trip_time
)
update_server_descriptions
check_if_has_primary
return
Expand Down Expand Up @@ -270,9 +278,14 @@ def update_rs_from_primary
servers_list.each do |server|
if server.address != updated_desc.address
if server.primary?
server.update_description(::Mongo::Server::Description.new(
server.address, {},
average_round_trip_time: server.description.average_round_trip_time))
server.update_description(
::Mongo::Server::Description.new(
server.address,
{},
average_round_trip_time: server.description.average_round_trip_time,
minimum_round_trip_time: updated_desc.minimum_round_trip_time
)
)
end
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/mongo/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def write_concern_error_labels
require 'mongo/error/server_api_not_supported'
require 'mongo/error/server_not_usable'
require 'mongo/error/transactions_not_supported'
require 'mongo/error/timeout_error'
require 'mongo/error/unknown_payload_type'
require 'mongo/error/unmet_dependency'
require 'mongo/error/unsupported_option'
Expand Down
25 changes: 25 additions & 0 deletions lib/mongo/error/timeout_error.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

# Copyright (C) 2015-present MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
class Error
# Raised when a Client Side Operation Timeout times out.
#
# @since 2.0.0
class TimeoutError < Error
end
end
end
1 change: 1 addition & 0 deletions lib/mongo/operation/shared/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def build_message(connection, context)
if server_api = context.server_api
msg = msg.maybe_add_server_api(server_api)
end
msg = msg.maybe_add_max_time_ms(connection, context)
msg
end

Expand Down
16 changes: 16 additions & 0 deletions lib/mongo/protocol/msg.rb
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,22 @@ def maybe_add_server_api(server_api)
Msg.new(@flags, @options, main_document, *@sequences)
end

def maybe_add_max_time_ms(connection, context)
return self if context.remaining_timeout_sec.nil?

max_time_sec = context.remaining_timeout_sec - connection.server.minimum_round_trip_time
if max_time_sec > 0
max_time_ms = (max_time_sec * 1_000).to_i
main_document = @main_document.merge(
maxTimeMS: max_time_ms
)
Msg.new(@flags, @options, main_document, *@sequences)
else
raise Mongo::Error::TimeoutError
end
end


# Returns the number of documents returned from the server.
#
# The Msg instance must be for a server reply and the reply must return
Expand Down
1 change: 1 addition & 0 deletions lib/mongo/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ def compressor
:max_message_size,
:tags,
:average_round_trip_time,
:minimum_round_trip_time,
:mongos?,
:other?,
:primary?,
Expand Down
9 changes: 9 additions & 0 deletions lib/mongo/server/connection_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def deliver(message, context, options = {})
raise Error::LintError, "Trying to deliver a message over a disconnected connection (to #{address})"
end
buffer = serialize(message, context)
check_timeout!(context)
ensure_connected do |socket|
operation_id = Monitoring.next_operation_id
started_event = command_started(address, operation_id, message.payload,
Expand Down Expand Up @@ -273,6 +274,14 @@ def serialize(message, context, buffer = BSON::ByteBuffer.new)

buffer
end

def check_timeout!(context)
return if context.remaining_timeout_sec.nil?
time_to_execute = context.remaining_timeout_sec - server.minimum_round_trip_time
if time_to_execute <= 0
raise Mongo::Error:TimeoutError
end
end
end
end
end
10 changes: 7 additions & 3 deletions lib/mongo/server/description.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,23 +209,23 @@ class Description
# @param [ Hash ] config The result of the hello command.
# @param [ Float ] average_round_trip_time The moving average time (sec) the hello
# command took to complete.
# @param [ Float ] average_round_trip_time The moving average time (sec)
# the ismaster call took to complete.
# @param [ true | false ] load_balancer Whether the server is treated as
# a load balancer.
# @param [ true | false ] force_load_balancer Whether the server is
# forced to be a load balancer.
#
# @api private
def initialize(address, config = {}, average_round_trip_time: nil,
load_balancer: false, force_load_balancer: false
minimum_round_trip_time: 0, load_balancer: false,
force_load_balancer: false
)
@address = address
@config = config
@load_balancer = !!load_balancer
@force_load_balancer = !!force_load_balancer
@features = Features.new(wire_versions, me || @address.to_s)
@average_round_trip_time = average_round_trip_time
@minimum_round_trip_time = minimum_round_trip_time
@last_update_time = Time.now.freeze
@last_update_monotime = Utils.monotonic_time

Expand Down Expand Up @@ -302,6 +302,10 @@ def features
# @return [ Float ] The moving average time the hello call took to complete.
attr_reader :average_round_trip_time

# @return [ Float ] The minimum time from the ten last hello calls took
# to complete.
attr_reader :minimum_round_trip_time

# Returns whether this server is an arbiter, per the SDAM spec.
#
# @example Is the server an arbiter?
Expand Down
7 changes: 5 additions & 2 deletions lib/mongo/server/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,11 @@ def run_sdam_flow(result, awaited: false, scan_error: nil)
@sdam_mutex.synchronize do
old_description = server.description

new_description = Description.new(server.address, result,
average_round_trip_time: server.round_trip_time_calculator.average_round_trip_time
new_description = Description.new(
server.address,
result,
average_round_trip_time: server.round_trip_time_calculator.average_round_trip_time,
minimum_round_trip_time: server.round_trip_time_calculator.minimum_round_trip_time
)

server.cluster.run_sdam_flow(server.description, new_description, awaited: awaited, scan_error: scan_error)
Expand Down
8 changes: 6 additions & 2 deletions lib/mongo/server/pending_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ def handshake!(speculative_auth_doc: nil)
doc['serviceId'] ||= "fake:#{rand(2**32-1)+1}"
end

post_handshake(doc, @server.round_trip_time_calculator.average_round_trip_time)
post_handshake(
doc,
@server.round_trip_time_calculator.average_round_trip_time,
@server.round_trip_time_calculator.minimum_round_trip_time
)

doc
end
Expand Down Expand Up @@ -205,7 +209,7 @@ def ensure_connected
#
# @return [ Server::Description ] The server description calculated from
# the handshake response for this particular connection.
def post_handshake(response, average_rtt)
def post_handshake(response, average_rtt, minimum_rtt)
if response["ok"] == 1
# Auth mechanism is entirely dependent on the contents of
# hello response *for this connection*.
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/server/round_trip_time_calculator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def update_average_round_trip_time
end

def update_minimum_round_trip_time
@rtts.push(@last_round_trip_time) unless @last_round_trip_time.nil?
@rtts.push(last_round_trip_time) unless last_round_trip_time.nil?
@minimum_round_trip_time = 0 and return if @rtts.size < MIN_SAMPLES

@rtts.shift if @rtts.size > RTT_SAMPLES_FOR_MINIMUM
Expand Down
62 changes: 62 additions & 0 deletions spec/mongo/protocol/msg_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -504,4 +504,66 @@
end
end
end

describe '#maybe_add_max_time_ms' do
let(:server) { instance_double(Mongo::Server) }

let(:connection) do
instance_double(Mongo::Server::Connection).tap do |conn|
allow(conn).to receive(:server).and_return(server)
end
end

let(:new_msg) do
message.maybe_add_max_time_ms(connection, context)
end

context 'when no timeout_ms set' do
let(:context) do
Mongo::Operation::Context.new
end

it 'does now set maxTimeMS' do
expect(new_msg.documents.first.key?(:maxTimeMS)).to eq(false)
end
end

context 'when there is enough time to send the message' do
let(:context) do
instance_double(Mongo::Operation::Context).tap do |ctx|
# Ten seconds
allow(ctx).to receive(:remaining_timeout_sec).twice.and_return(10)
end
end

before do
# One second
allow(server).to receive(:minimum_round_trip_time).and_return(1)
end

it 'sets the maxTimeMS' do
# Nine seconds
expect(new_msg.documents.first[:maxTimeMS]).to eq(9_000)
end
end

context 'when there is not enough time to send the message' do
let(:context) do
instance_double(Mongo::Operation::Context).tap do |ctx|
allow(ctx).to receive(:remaining_timeout_sec).twice.and_return(0.1)
end
end

before do
# One second
allow(server).to receive(:minimum_round_trip_time).and_return(1)
end

it 'sets the maxTimeMS' do
expect do
new_msg
end.to raise_error(Mongo::Error::TimeoutError)
end
end
end
end

0 comments on commit d46aa16

Please sign in to comment.