Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

let a client accept external client_args #39

Merged
merged 11 commits into from
Feb 15, 2024
24 changes: 17 additions & 7 deletions src/amqp-client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class AMQP::Client
def self.start(host = AMQP_HOST, port = AMQP_PORT, vhost = AMQP_VHOST,
user = AMQP_USER, password = AMQP_PASS, tls : TLSContext = AMQP_TLS, websocket = AMQP_WS,
channel_max = 1024_u16, frame_max = 131_072_u32, heartbeat = 0_u16,
verify_mode = OpenSSL::SSL::VerifyMode::PEER, name = nil, & : AMQP::Client::Connection -> _)
conn = self.new(host, port, vhost, user, password, tls, websocket, channel_max, frame_max, heartbeat, verify_mode, name).connect
verify_mode = OpenSSL::SSL::VerifyMode::PEER, name = nil, connection_information = ConnectionInformation.new, & : AMQP::Client::Connection -> _)
conn = self.new(host, port, vhost, user, password, tls, websocket, channel_max, frame_max, heartbeat, verify_mode, name, connection_information).connect
yield conn
ensure
conn.try &.close
Expand All @@ -67,6 +67,7 @@ class AMQP::Client
name = File.basename(PROGRAM_NAME)
buffer_size = 16_384
tcp = TCPConfig.new
connection_information = ConnectionInformation.new
uri.query_params.each do |key, value|
case key
when "name" then name = URI.decode_www_form(value)
Expand All @@ -77,6 +78,10 @@ class AMQP::Client
when "tcp_nodelay" then tcp.nodelay = true
when "recv_buffer_size" then tcp.recv_buffer_size = value.to_i
when "send_buffer_size" then tcp.send_buffer_size = value.to_i
when "product" then connection_information.product = value
when "platform" then connection_information.platform = value
when "product_version" then connection_information.product_version = value
when "platform_version" then connection_information.platform_version = value
when "tcp_keepalive"
ka = value.split(':', 3).map &.to_i
tcp.keepalive_idle, tcp.keepalive_interval, tcp.keepalive_count = ka
Expand All @@ -87,7 +92,7 @@ class AMQP::Client
end
self.new(host, port, vhost, user, password, tls, websocket,
channel_max, frame_max, heartbeat, verify_mode, name,
tcp, buffer_size)
connection_information, tcp, buffer_size)
end

property host, port, vhost, user, websocket, tcp, buffer_size
Expand All @@ -98,9 +103,14 @@ class AMQP::Client
property nodelay, keepalive_idle, keepalive_interval, keepalive_count, send_buffer_size, recv_buffer_size
end

record ConnectionInformation, product : String? = "amqp-client.cr", product_version : String? = nil, platform : String? = "Crystal", platform_version : String? = nil, name : String? = nil do
property product, product_version, platform, platform_version, name
end

def initialize(@host = AMQP_HOST, @port = AMQP_PORT, @vhost = AMQP_VHOST, @user = AMQP_USER, @password = AMQP_PASS,
tls : TLSContext = AMQP_TLS, @websocket = AMQP_WS, @channel_max = 1024_u16, @frame_max = 131_072_u32, @heartbeat = 0_u16,
verify_mode = OpenSSL::SSL::VerifyMode::PEER, @name : String? = File.basename(PROGRAM_NAME),
@connection_information = ConnectionInformation.new("amqp-client.cr", AMQP::Client::VERSION, "Crystal", Crystal::VERSION, File.basename(PROGRAM_NAME)),
@tcp = TCPConfig.new, @buffer_size = 16_384)
if tls.is_a? OpenSSL::SSL::Context::Client
@tls = tls
Expand All @@ -113,17 +123,17 @@ class AMQP::Client
def connect : Connection
if @host.starts_with? '/'
socket = connect_unix
Connection.start(socket, @user, @password, @vhost, @channel_max, @frame_max, @heartbeat, @name)
Connection.start(socket, @user, @password, @vhost, @channel_max, @frame_max, @heartbeat, @connection_information, @name)
kickster97 marked this conversation as resolved.
Show resolved Hide resolved
elsif @websocket
websocket = ::HTTP::WebSocket.new(@host, path: "", port: @port, tls: @tls)
io = WebSocketIO.new(websocket)
Connection.start(io, @user, @password, @vhost, @channel_max, @frame_max, @heartbeat, @name)
Connection.start(io, @user, @password, @vhost, @channel_max, @frame_max, @heartbeat, @connection_information, @name)
elsif ctx = @tls.as? OpenSSL::SSL::Context::Client
socket = connect_tls(connect_tcp, ctx)
Connection.start(socket, @user, @password, @vhost, @channel_max, @frame_max, @heartbeat, @name)
Connection.start(socket, @user, @password, @vhost, @channel_max, @frame_max, @heartbeat, @connection_information, @name)
else
socket = connect_tcp
Connection.start(socket, @user, @password, @vhost, @channel_max, @frame_max, @heartbeat, @name)
Connection.start(socket, @user, @password, @vhost, @channel_max, @frame_max, @heartbeat, @connection_information, @name)
end
rescue ex
case ex
Expand Down
19 changes: 11 additions & 8 deletions src/amqp-client/connection.cr
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,11 @@ class AMQP::Client

# Connection negotiation
def self.start(io : UNIXSocket | TCPSocket | OpenSSL::SSL::Socket::Client | WebSocketIO,
user, password, vhost, channel_max, frame_max, heartbeat, name = File.basename(PROGRAM_NAME))
user, password, vhost, channel_max, frame_max, heartbeat, connection_information,
name = File.basename(PROGRAM_NAME))
io.read_timeout = 60
start(io, user, password, name)
connection_information.name ||= name
start(io, user, password, connection_information)
channel_max, frame_max, heartbeat = tune(io, channel_max, frame_max, heartbeat)
open(io, vhost)
Connection.new(io, channel_max, frame_max, heartbeat)
Expand All @@ -225,16 +227,17 @@ class AMQP::Client
io.read_timeout = nil
end

private def self.start(io, user, password, name)
private def self.start(io, user, password, connection_information)
io.write AMQ::Protocol::PROTOCOL_START_0_9_1.to_slice
io.flush
Frame.from_io(io) { |f| f.as?(Frame::Connection::Start) || raise Error::UnexpectedFrame.new(f) }
props = Arguments.new({
connection_name: name,
product: "amqp-client.cr",
platform: "Crystal",
version: AMQP::Client::VERSION,
capabilities: Arguments.new({
connection_name: connection_information.name,
product: connection_information.product,
platform: connection_information.platform,
product_version: connection_information.product_version,
platform_version: connection_information.platform_version,
capabilities: Arguments.new({
kickster97 marked this conversation as resolved.
Show resolved Hide resolved
"publisher_confirms": true,
"exchange_exchange_bindings": true,
"basic.nack": true,
Expand Down
Loading