From 6817971cfd58b4f6e74c2cfc62ebb0f9580f61bf Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov <160598371+comandeo-mongo@users.noreply.github.com> Date: Tue, 20 Feb 2024 16:50:17 +0100 Subject: [PATCH] RUBY-3368 Set maxTimeMS (#2840) --- lib/mongo/cluster/sdam_flow.rb | 27 +++++--- lib/mongo/error.rb | 1 + lib/mongo/error/timeout_error.rb | 23 +++++++ lib/mongo/operation/shared/executable.rb | 1 + lib/mongo/protocol/msg.rb | 31 ++++++++++ lib/mongo/server.rb | 1 + lib/mongo/server/connection_base.rb | 18 ++++++ lib/mongo/server/description.rb | 12 +++- lib/mongo/server/monitor.rb | 7 ++- lib/mongo/server/pending_connection.rb | 8 ++- .../server/round_trip_time_calculator.rb | 2 +- spec/mongo/protocol/msg_spec.rb | 62 +++++++++++++++++++ 12 files changed, 178 insertions(+), 15 deletions(-) create mode 100644 lib/mongo/error/timeout_error.rb diff --git a/lib/mongo/cluster/sdam_flow.rb b/lib/mongo/cluster/sdam_flow.rb index eab2fd88b9..bd908b682e 100644 --- a/lib/mongo/cluster/sdam_flow.rb +++ b/lib/mongo/cluster/sdam_flow.rb @@ -116,8 +116,12 @@ def server_description_changed log_warn( "Server #{updated_desc.address.to_s} has an incorrect replica set name '#{updated_desc.replica_set_name}'; expected '#{topology.replica_set_name}'" ) - @updated_desc = ::Mongo::Server::Description.new(updated_desc.address, - {}, average_round_trip_time: updated_desc.average_round_trip_time) + @updated_desc = ::Mongo::Server::Description.new( + updated_desc.address, + {}, + average_round_trip_time: updated_desc.average_round_trip_time, + minimum_round_trip_time: updated_desc.minimum_round_trip_time + ) update_server_descriptions end end @@ -233,8 +237,12 @@ def update_rs_from_primary end if stale_primary? - @updated_desc = ::Mongo::Server::Description.new(updated_desc.address, - {}, average_round_trip_time: updated_desc.average_round_trip_time) + @updated_desc = ::Mongo::Server::Description.new( + updated_desc.address, + {}, + average_round_trip_time: updated_desc.average_round_trip_time, + minimum_round_trip_time: updated_desc.minimum_round_trip_time + ) update_server_descriptions check_if_has_primary return @@ -270,9 +278,14 @@ def update_rs_from_primary servers_list.each do |server| if server.address != updated_desc.address if server.primary? - server.update_description(::Mongo::Server::Description.new( - server.address, {}, - average_round_trip_time: server.description.average_round_trip_time)) + server.update_description( + ::Mongo::Server::Description.new( + server.address, + {}, + average_round_trip_time: server.description.average_round_trip_time, + minimum_round_trip_time: updated_desc.minimum_round_trip_time + ) + ) end end end diff --git a/lib/mongo/error.rb b/lib/mongo/error.rb index 92d6d5f4b3..6c5bbf44e3 100644 --- a/lib/mongo/error.rb +++ b/lib/mongo/error.rb @@ -218,6 +218,7 @@ def write_concern_error_labels require 'mongo/error/server_api_not_supported' require 'mongo/error/server_not_usable' require 'mongo/error/transactions_not_supported' +require 'mongo/error/timeout_error' require 'mongo/error/unknown_payload_type' require 'mongo/error/unmet_dependency' require 'mongo/error/unsupported_option' diff --git a/lib/mongo/error/timeout_error.rb b/lib/mongo/error/timeout_error.rb new file mode 100644 index 0000000000..a607f002dd --- /dev/null +++ b/lib/mongo/error/timeout_error.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +# Copyright (C) 2015-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class Error + # Raised when a Client Side Operation Timeout times out. + class TimeoutError < Error + end + end +end diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index 9b61476631..713ea111e8 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -112,6 +112,7 @@ def build_message(connection, context) if server_api = context.server_api msg = msg.maybe_add_server_api(server_api) end + msg = msg.maybe_add_max_time_ms(connection, context) msg end diff --git a/lib/mongo/protocol/msg.rb b/lib/mongo/protocol/msg.rb index 73df7a31b6..30cab9fb51 100644 --- a/lib/mongo/protocol/msg.rb +++ b/lib/mongo/protocol/msg.rb @@ -301,6 +301,37 @@ def maybe_add_server_api(server_api) Msg.new(@flags, @options, main_document, *@sequences) end + # Adds maxTimeMS attribute to the message if timeoutMS is set + # and there is enough time left to send the message to the server + # (remaining timeout is bigger than minimum round trip time for + # the server. + # + # @param [ Mongo::Server::Connection ] connection Connection the message + # should be sent with. + # @param [ Mongo::Operation::Context ] context Context of the operation + # the message is build from. + # + # @return [ Mongo::Protocol::Msg] message with added maxTimeMS attribute + # if needed. + # + # @raise [ Mongo::Error::TimeoutError ] if timeout expired or there is + # not enough time to send the message to the server. + def maybe_add_max_time_ms(connection, context) + return self if context.remaining_timeout_sec.nil? + + max_time_sec = context.remaining_timeout_sec - connection.server.minimum_round_trip_time + if max_time_sec > 0 + max_time_ms = (max_time_sec * 1_000).to_i + main_document = @main_document.merge( + maxTimeMS: max_time_ms + ) + Msg.new(@flags, @options, main_document, *@sequences) + else + raise Mongo::Error::TimeoutError + end + end + + # Returns the number of documents returned from the server. # # The Msg instance must be for a server reply and the reply must return diff --git a/lib/mongo/server.rb b/lib/mongo/server.rb index 912aef2fcf..c00285034e 100644 --- a/lib/mongo/server.rb +++ b/lib/mongo/server.rb @@ -197,6 +197,7 @@ def compressor :max_message_size, :tags, :average_round_trip_time, + :minimum_round_trip_time, :mongos?, :other?, :primary?, diff --git a/lib/mongo/server/connection_base.rb b/lib/mongo/server/connection_base.rb index 709f1e920d..f3d37253f3 100644 --- a/lib/mongo/server/connection_base.rb +++ b/lib/mongo/server/connection_base.rb @@ -169,6 +169,7 @@ def deliver(message, context, options = {}) raise Error::LintError, "Trying to deliver a message over a disconnected connection (to #{address})" end buffer = serialize(message, context) + check_timeout!(context) ensure_connected do |socket| operation_id = Monitoring.next_operation_id started_event = command_started(address, operation_id, message.payload, @@ -273,6 +274,23 @@ def serialize(message, context, buffer = BSON::ByteBuffer.new) buffer end + + # If timeoutMS is set for the operation context, checks whether there is + # enough time left to send the corresponding message to the server + # (remaining timeout is bigger than minimum round trip time for + # the server) + # + # @param [ Mongo::Operation::Context ] context Context of the operation. + # + # @raise [ Mongo::Error::TimeoutError ] if timeout expired or there is + # not enough time to send the message to the server. + def check_timeout!(context) + return if context.remaining_timeout_sec.nil? + time_to_execute = context.remaining_timeout_sec - server.minimum_round_trip_time + if time_to_execute <= 0 + raise Mongo::Error:TimeoutError + end + end end end end diff --git a/lib/mongo/server/description.rb b/lib/mongo/server/description.rb index 3e21b6a635..1f393cde76 100644 --- a/lib/mongo/server/description.rb +++ b/lib/mongo/server/description.rb @@ -209,8 +209,8 @@ class Description # @param [ Hash ] config The result of the hello command. # @param [ Float ] average_round_trip_time The moving average time (sec) the hello # command took to complete. - # @param [ Float ] average_round_trip_time The moving average time (sec) - # the ismaster call took to complete. + # @param [ Float ] minimum_round_trip_time The minimum round trip time + # of ten last hello commands. # @param [ true | false ] load_balancer Whether the server is treated as # a load balancer. # @param [ true | false ] force_load_balancer Whether the server is @@ -218,7 +218,8 @@ class Description # # @api private def initialize(address, config = {}, average_round_trip_time: nil, - load_balancer: false, force_load_balancer: false + minimum_round_trip_time: 0, load_balancer: false, + force_load_balancer: false ) @address = address @config = config @@ -226,6 +227,7 @@ def initialize(address, config = {}, average_round_trip_time: nil, @force_load_balancer = !!force_load_balancer @features = Features.new(wire_versions, me || @address.to_s) @average_round_trip_time = average_round_trip_time + @minimum_round_trip_time = minimum_round_trip_time @last_update_time = Time.now.freeze @last_update_monotime = Utils.monotonic_time @@ -302,6 +304,10 @@ def features # @return [ Float ] The moving average time the hello call took to complete. attr_reader :average_round_trip_time + # @return [ Float ] The minimum time from the ten last hello calls took + # to complete. + attr_reader :minimum_round_trip_time + # Returns whether this server is an arbiter, per the SDAM spec. # # @example Is the server an arbiter? diff --git a/lib/mongo/server/monitor.rb b/lib/mongo/server/monitor.rb index 7867b8b5cb..9130fe7128 100644 --- a/lib/mongo/server/monitor.rb +++ b/lib/mongo/server/monitor.rb @@ -237,8 +237,11 @@ def run_sdam_flow(result, awaited: false, scan_error: nil) @sdam_mutex.synchronize do old_description = server.description - new_description = Description.new(server.address, result, - average_round_trip_time: server.round_trip_time_calculator.average_round_trip_time + new_description = Description.new( + server.address, + result, + average_round_trip_time: server.round_trip_time_calculator.average_round_trip_time, + minimum_round_trip_time: server.round_trip_time_calculator.minimum_round_trip_time ) server.cluster.run_sdam_flow(server.description, new_description, awaited: awaited, scan_error: scan_error) diff --git a/lib/mongo/server/pending_connection.rb b/lib/mongo/server/pending_connection.rb index 94784887b5..add725bb07 100644 --- a/lib/mongo/server/pending_connection.rb +++ b/lib/mongo/server/pending_connection.rb @@ -155,7 +155,11 @@ def handshake!(speculative_auth_doc: nil) doc['serviceId'] ||= "fake:#{rand(2**32-1)+1}" end - post_handshake(doc, @server.round_trip_time_calculator.average_round_trip_time) + post_handshake( + doc, + @server.round_trip_time_calculator.average_round_trip_time, + @server.round_trip_time_calculator.minimum_round_trip_time + ) doc end @@ -205,7 +209,7 @@ def ensure_connected # # @return [ Server::Description ] The server description calculated from # the handshake response for this particular connection. - def post_handshake(response, average_rtt) + def post_handshake(response, average_rtt, minimum_rtt) if response["ok"] == 1 # Auth mechanism is entirely dependent on the contents of # hello response *for this connection*. diff --git a/lib/mongo/server/round_trip_time_calculator.rb b/lib/mongo/server/round_trip_time_calculator.rb index 3008d8ad52..99ee7eb60e 100644 --- a/lib/mongo/server/round_trip_time_calculator.rb +++ b/lib/mongo/server/round_trip_time_calculator.rb @@ -80,7 +80,7 @@ def update_average_round_trip_time end def update_minimum_round_trip_time - @rtts.push(@last_round_trip_time) unless @last_round_trip_time.nil? + @rtts.push(last_round_trip_time) unless last_round_trip_time.nil? @minimum_round_trip_time = 0 and return if @rtts.size < MIN_SAMPLES @rtts.shift if @rtts.size > RTT_SAMPLES_FOR_MINIMUM diff --git a/spec/mongo/protocol/msg_spec.rb b/spec/mongo/protocol/msg_spec.rb index 23a4f41d1b..b79ca4f0f1 100644 --- a/spec/mongo/protocol/msg_spec.rb +++ b/spec/mongo/protocol/msg_spec.rb @@ -504,4 +504,66 @@ end end end + + describe '#maybe_add_max_time_ms' do + let(:server) { instance_double(Mongo::Server) } + + let(:connection) do + instance_double(Mongo::Server::Connection).tap do |conn| + allow(conn).to receive(:server).and_return(server) + end + end + + let(:new_msg) do + message.maybe_add_max_time_ms(connection, context) + end + + context 'when no timeout_ms set' do + let(:context) do + Mongo::Operation::Context.new + end + + it 'does now set maxTimeMS' do + expect(new_msg.documents.first.key?(:maxTimeMS)).to eq(false) + end + end + + context 'when there is enough time to send the message' do + let(:context) do + instance_double(Mongo::Operation::Context).tap do |ctx| + # Ten seconds + allow(ctx).to receive(:remaining_timeout_sec).twice.and_return(10) + end + end + + before do + # One second + allow(server).to receive(:minimum_round_trip_time).and_return(1) + end + + it 'sets the maxTimeMS' do + # Nine seconds + expect(new_msg.documents.first[:maxTimeMS]).to eq(9_000) + end + end + + context 'when there is not enough time to send the message' do + let(:context) do + instance_double(Mongo::Operation::Context).tap do |ctx| + allow(ctx).to receive(:remaining_timeout_sec).twice.and_return(0.1) + end + end + + before do + # One second + allow(server).to receive(:minimum_round_trip_time).and_return(1) + end + + it 'sets the maxTimeMS' do + expect do + new_msg + end.to raise_error(Mongo::Error::TimeoutError) + end + end + end end