Skip to content

Commit

Permalink
Fixing bug on UDS connection with packet size
Browse files Browse the repository at this point in the history
we were not passing the correct value from the environment in the constructor

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>
  • Loading branch information
pedro-stanaka committed Dec 18, 2024
1 parent fa7b330 commit d1503d1
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 11 deletions.
20 changes: 13 additions & 7 deletions lib/statsd/instrument/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ def statsd_uds_send?

def statsd_max_packet_size
if statsd_uds_send?
return Float(env.fetch("STATSD_MAX_PACKET_SIZE", StatsD::Instrument::UdsConnection::DEFAULT_MAX_PACKET_SIZE))
Integer(env.fetch("STATSD_MAX_PACKET_SIZE", StatsD::Instrument::UdsConnection::DEFAULT_MAX_PACKET_SIZE))
else
Integer(env.fetch("STATSD_MAX_PACKET_SIZE", StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE))
end

Float(env.fetch("STATSD_MAX_PACKET_SIZE", StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE))
end

def statsd_batch_statistics_interval
Expand Down Expand Up @@ -140,19 +140,25 @@ def default_sink_for_environment
case environment
when "production", "staging"
connection = if statsd_uds_send?
StatsD::Instrument::UdsConnection.new(statsd_socket_path)
StatsD::Instrument::UdsConnection.new(
statsd_socket_path,
max_packet_size: statsd_max_packet_size.to_i,
)
else
host, port = statsd_addr.split(":")
StatsD::Instrument::UdpConnection.new(host, port.to_i)
StatsD::Instrument::UdpConnection.new(
host,
port.to_i,
max_packet_size: statsd_max_packet_size.to_i,
)
end

sink = StatsD::Instrument::Sink.new(connection)
if statsd_batching?
# if we are batching, wrap the sink in a batched sink
return StatsD::Instrument::BatchedSink.new(
sink,
buffer_capacity: statsd_buffer_capacity,
max_packet_size: statsd_max_packet_size,
max_packet_size: statsd_max_packet_size.to_i,
statistics_interval: statsd_batch_statistics_interval,
)
end
Expand Down
7 changes: 5 additions & 2 deletions lib/statsd/instrument/sink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ def <<(datagram)
connection.send_datagram(datagram)
rescue SocketError, IOError, SystemCallError => error
StatsD.logger.debug do
"[#{self.class.name}] Resetting connection because of #{error.class}: #{error.message}"
"[#{self.class.name}] [#{connection.class.name}] " \
"Resetting connection because of #{error.class}: #{error.message}"
end
invalidate_connection
if retried
StatsD.logger.warn do
"[#{self.class.name}] Events were dropped because of #{error.class}: #{error.message}"
"[#{self.class.name}] [#{connection.class.name}] " \
"Events were dropped (after retrying) because of #{error.class}: #{error.message}. " \
"Message size: #{datagram.bytesize} bytes."
end
else
retried = true
Expand Down
4 changes: 3 additions & 1 deletion lib/statsd/instrument/udp_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ class UdpConnection

attr_reader :host, :port

def initialize(host, port)
def initialize(host, port, max_packet_size: DEFAULT_MAX_PACKET_SIZE)
@host = host
@port = port
@max_packet_size = max_packet_size
end

def send_datagram(message)
Expand All @@ -30,6 +31,7 @@ def type
def socket
@socket ||= begin
socket = UDPSocket.new
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, @max_packet_size)
socket.connect(@host, @port)
socket
end
Expand Down
10 changes: 10 additions & 0 deletions lib/statsd/instrument/uds_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ def send_datagram(message)

def close
@socket&.close
rescue IOError, SystemCallError => e
StatsD.logger.debug do
"[#{self.class.name}] Error closing socket: #{e.class}: #{e.message}"
end
ensure
@socket = nil
end

Expand All @@ -45,6 +50,11 @@ def socket
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, @max_packet_size.to_i)
socket.connect(Socket.pack_sockaddr_un(@socket_path))
socket
rescue IOError => e
StatsD.logger.debug do
"[#{self.class.name}] Failed to create socket: #{e.class}: #{e.message}"
end
nil
end
end
end
Expand Down
51 changes: 51 additions & 0 deletions test/environment_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,55 @@ def test_client_from_env_uses_regular_udp_sink_when_buffer_capacity_is_0
)
assert_kind_of(StatsD::Instrument::Sink, env.client.sink)
end

def test_client_from_env_uses_uds_sink_with_correct_packet_size_in_production
env = StatsD::Instrument::Environment.new(
"STATSD_ENV" => "production",
"STATSD_SOCKET_PATH" => "/tmp/statsd.sock",
"STATSD_MAX_PACKET_SIZE" => "65507",
"STATSD_USE_NEW_CLIENT" => "1"
)

client = env.client
sink = client.sink
connection = sink.connection

assert_kind_of(StatsD::Instrument::UdsConnection, connection)
assert_equal(65507, connection.instance_variable_get(:@max_packet_size))
end

def test_client_from_env_uses_default_packet_size_for_uds_when_not_specified
env = StatsD::Instrument::Environment.new(
"STATSD_ENV" => "production",
"STATSD_SOCKET_PATH" => "/tmp/statsd.sock",
"STATSD_USE_NEW_CLIENT" => "1"
)

client = env.client
sink = client.sink
connection = sink.connection

assert_kind_of(StatsD::Instrument::UdsConnection, connection)
assert_equal(StatsD::Instrument::UdsConnection::DEFAULT_MAX_PACKET_SIZE,
connection.instance_variable_get(:@max_packet_size))
end

def test_client_from_env_uses_batched_uds_sink_with_correct_packet_size
env = StatsD::Instrument::Environment.new(
"STATSD_ENV" => "production",
"STATSD_SOCKET_PATH" => "/tmp/statsd.sock",
"STATSD_MAX_PACKET_SIZE" => "65507",
"STATSD_BUFFER_CAPACITY" => "1000",
"STATSD_USE_NEW_CLIENT" => "1",
)

client = env.client
sink = client.sink
assert_kind_of(StatsD::Instrument::BatchedSink, sink)

underlying_sink = sink.instance_variable_get(:@sink)
connection = underlying_sink.connection
assert_kind_of(StatsD::Instrument::UdsConnection, connection)
assert_equal(65507, connection.instance_variable_get(:@max_packet_size))
end
end
10 changes: 9 additions & 1 deletion test/udp_sink_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,16 @@ def test_socket_error_should_invalidate_socket
UDPSocket.stubs(:new).returns(socket = mock("socket"))

seq = sequence("connect_fail_connect_succeed")
socket.expects(:setsockopt)
.with(Socket::SOL_SOCKET, Socket::SO_SNDBUF, StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE)
.in_sequence(seq)
socket.expects(:connect).with("localhost", 8125).in_sequence(seq)
socket.expects(:send).raises(Errno::EDESTADDRREQ).in_sequence(seq)
socket.expects(:close).in_sequence(seq)

socket.expects(:setsockopt)
.with(Socket::SOL_SOCKET, Socket::SO_SNDBUF, StatsD::Instrument::UdpConnection::DEFAULT_MAX_PACKET_SIZE)
.in_sequence(seq)
socket.expects(:connect).with("localhost", 8125).in_sequence(seq)
socket.expects(:send).twice.returns(1).in_sequence(seq)
socket.expects(:close).in_sequence(seq)
Expand All @@ -161,7 +168,8 @@ def test_socket_error_should_invalidate_socket
udp_sink << "bar:1|c"

assert_equal(
"[#{@sink_class}] Resetting connection because of " \
"[#{@sink_class}] [#{@sink_class.for_addr("localhost:8125").connection.class}] " \
"Resetting connection because of " \
"Errno::EDESTADDRREQ: Destination address required\n",
logs.string,
)
Expand Down

0 comments on commit d1503d1

Please sign in to comment.