Skip to content

Commit

Permalink
Raise DB::ConnectionLost error when an IO error occurs to enable the
Browse files Browse the repository at this point in the history
crystal-db database pool to automatically remove bad connections from
the connection pool
  • Loading branch information
lachlan authored and wonderix committed Dec 2, 2023
1 parent 70e9195 commit bedde71
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 51 deletions.
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ dependencies:
db:
github: crystal-lang/crystal-db
#version: ~> 0.12.0
commit: a527cfdc4edf7e27d3d7e90ac4d6bc80c8800ba4
commit: 06df272740fb9141050681ae916c465cc8e70584

authors:
- Ulrich Kramer <ulrich.kramer@web.de>
Expand Down
2 changes: 1 addition & 1 deletion src/tds.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ require "db"
require "./tds/**"

module TDS
VERSION = "0.1.0"
VERSION = {{ `shards version "#{__DIR__}"`.chomp.stringify.downcase }}
end
44 changes: 22 additions & 22 deletions src/tds/connection.cr
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class TDS::Connection < DB::Connection

begin
socket = TCPSocket.new(tds_options.host, tds_options.port, connect_timeout: tds_options.connect_timeout)
rescue exc : Socket::ConnectError
raise DB::ConnectionRefused.new
rescue ex : Socket::ConnectError
raise DB::ConnectionRefused.new(cause: ex)
end
@socket = socket
@socket.read_timeout = tds_options.read_timeout
Expand Down Expand Up @@ -84,38 +84,38 @@ class TDS::Connection < DB::Connection
]).write(io)
end
result : Int32? = nil
begin
recv(PacketIO::Type::REPLY) do |io|
Token.each(io) do |token|
case token
when Token::MetaData
when Token::Order
when Token::ReturnStatus
when Token::DoneInProc
when Token::Param
result = token.value.as(Int32)
else
raise ProtocolError.new("Unexpected token #{token.inspect}")
end
recv(PacketIO::Type::REPLY) do |io|
Token.each(io) do |token|
case token
when Token::MetaData
when Token::Order
when Token::ReturnStatus
when Token::DoneInProc
when Token::Param
result = token.value.as(Int32)
else
raise ProtocolError.new("Unexpected token #{token.inspect}")
end
end
rescue exc : ::Exception
raise DB::Error.new("#{exc.to_s} while preparing \"#{statement}\"")
end
result.not_nil!
rescue ex : IO::Error
raise DB::ConnectionLost.new(self, ex)
rescue ex
raise DB::Error.new("#{ex.to_s} while preparing \"#{statement}\"", ex)
end

protected def perform_exec(statement)
send(PacketIO::Type::QUERY) do |io|
UTF16_IO.write(io, statement, ENCODING)
end
recv(PacketIO::Type::REPLY) do |io|
begin
Token.each(io) { |t| }
rescue exc : ::Exception
raise DB::Error.new("#{exc.to_s} in \"#{statement}\"")
end
Token.each(io) { |t| }
end
rescue ex : IO::Error
raise DB::ConnectionLost.new(self, ex)
rescue ex
raise DB::Error.new("#{ex.to_s} in \"#{statement}\"", ex)
end

def build_prepared_statement(query) : DB::Statement
Expand Down
28 changes: 15 additions & 13 deletions src/tds/prepared_statement.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class TDS::PreparedStatement < DB::Statement
begin
index += 1
PreparedStatement.encode(args[index])
rescue ::IndexError
raise DB::Error.new("Too few arguments specified for statement: #{command}")
rescue ex : ::IndexError
raise DB::Error.new("Too few arguments specified for statement: #{command}", ex)
end
end
raise DB::Error.new("Too many arguments specified for statement: #{command}") if index != args.size - 1
Expand All @@ -37,8 +37,8 @@ class TDS::PreparedStatement < DB::Statement
@type_infos << type_info
params << "#{param} #{type_info.type}"
param
rescue ::IndexError
raise DB::Error.new("Too few arguments specified for statement: #{command}")
rescue ex : ::IndexError
raise DB::Error.new("Too few arguments specified for statement: #{command}", ex)
end
end
raise DB::Error.new("Too many arguments specified for statement: #{command}") if index != args.size - 1
Expand All @@ -61,6 +61,8 @@ class TDS::PreparedStatement < DB::Statement
result = ResultSet.new(self, Token.each(io))
end
result.not_nil!
rescue ex : IO::Error
raise DB::ConnectionLost.new(connection, ex)
end

protected def perform_exec(args : Enumerable) : DB::ExecResult
Expand All @@ -69,24 +71,24 @@ class TDS::PreparedStatement < DB::Statement
parameters = args.zip(@type_infos).map do |x|
begin
Parameter.new(x[0], type_info: x[1])
rescue exc : IndexError
raise DB::Error.new("#{x} : #{exc}")
rescue ex : IndexError
raise DB::Error.new("#{x} : #{ex}", ex)
end
end
rescue exc : IndexError
raise DB::Error.new("#{args} #{@type_infos} #{command}: #{exc}")
rescue ex : IndexError
raise DB::Error.new("#{args} #{@type_infos} #{command}: #{ex}", ex)
end
connection.send(PacketIO::Type::RPC) do |io|
RpcRequest.new(id: RpcRequest::Type::EXECUTE, parameters: [@proc_id.not_nil!] + parameters).write(io)
end
connection.recv(PacketIO::Type::REPLY) do |io|
begin
Token.each(io) { |t| }
rescue exc : ::Exception
raise DB::Error.new("#{exc.to_s} in \"#{command}\"")
end
Token.each(io) { |t| }
end
DB::ExecResult.new 0, 0
rescue ex : IO::Error
raise DB::ConnectionLost.new(connection, ex)
rescue ex
raise DB::Error.new("#{ex.to_s} in \"#{command}\"", ex)
end

protected def do_close
Expand Down
7 changes: 5 additions & 2 deletions src/tds/result_set.cr
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ class TDS::ResultSet < DB::ResultSet
while !@done
token = begin
@iterator.next
rescue exc : ::Exception
rescue ex : IO::Error
@done = true
raise DB::Error.new("#{exc.to_s} in \"#{statement.command}\"")
raise DB::ConnectionLost.new(statement.connection, ex)
rescue ex
@done = true
raise DB::Error.new("#{ex.to_s} in \"#{statement.command}\"", ex)
end
case token
when Token::Row
Expand Down
14 changes: 7 additions & 7 deletions src/tds/type_info.cr
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ module TDS
when Nil
NVarchar.new(1)
else
raise NotImplemented.new("Invalid type #{value.inspect}")
raise NotImplemented.new("Unsupported value : #{value.class} = #{value.inspect} (expected type: String | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Time | BigDecimal | Nil)")
end
end
end
Expand Down Expand Up @@ -252,7 +252,7 @@ module TDS
ENCODING.encode(value.to_i64, io)
end
else
raise ProtocolError.new("Unsupported value #{value}")
raise ProtocolError.new("Unsupported value : #{value.class} = #{value.inspect} (expected type: Number | Nil)")
end
end

Expand Down Expand Up @@ -389,7 +389,7 @@ module TDS
ENCODING.encode(0x1_u8, io)
ENCODING.encode(value.to_u8.bit(0), io)
else
raise ProtocolError.new("Unsupported value #{value}")
raise ProtocolError.new("Unsupported value : #{value.class} = #{value.inspect} (expected type: Number | Nil)")
end
end

Expand Down Expand Up @@ -467,7 +467,7 @@ module TDS
ENCODING.encode(value.to_f64, io)
end
else
raise ProtocolError.new("Unsupported value #{value}")
raise ProtocolError.new("Unsupported value : #{value.class} = #{value.inspect} (expected type: Number | Nil)")
end
end

Expand Down Expand Up @@ -559,7 +559,7 @@ module TDS
ENCODING.encode(days, io)
ENCODING.encode(fraction, io)
else
raise ProtocolError.new("Unsupported value #{value}")
raise ProtocolError.new("Unsupported value : #{value.class} = #{value.inspect} (expected type: Time | Nil)")
end
end

Expand Down Expand Up @@ -622,7 +622,7 @@ module TDS
ENCODING.encode(sign, io)
io.write(data.to_slice)
else
raise ProtocolError.new("Unsupported value #{value}")
raise ProtocolError.new("Unsupported value : #{value.class} = #{value.inspect} (expected type: BigDecimal | Nil)")
end
end

Expand Down Expand Up @@ -678,7 +678,7 @@ module TDS
ENCODING.encode(UInt16.new(value.size * 2), io)
UTF16_IO.write(io, value, ENCODING)
else
raise ProtocolError.new("Unsupported value #{value}")
raise ProtocolError.new("Unsupported value : #{value.class} = #{value.inspect} (expected type: String | Nil)")
end
end

Expand Down
12 changes: 7 additions & 5 deletions src/tds/unprepared_statement.cr
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class TDS::UnpreparedStatement < DB::Statement
result = ResultSet.new(self, Token.each(io))
end
result.not_nil!
rescue ex : IO::Error
raise DB::ConnectionLost.new(connection, ex)
end

protected def perform_exec(args : Enumerable) : DB::ExecResult
Expand All @@ -38,13 +40,13 @@ class TDS::UnpreparedStatement < DB::Statement
UTF16_IO.write(io, statement, ENCODING)
end
connection.recv(PacketIO::Type::REPLY) do |io|
begin
Token.each(io) { |t| }
rescue exc : ::Exception
raise StatementError.new(exc, statement)
end
Token.each(io) { |t| }
end
DB::ExecResult.new 0, 0
rescue ex : IO::Error
raise DB::ConnectionLost.new(connection, ex)
rescue ex
raise StatementError.new(ex, statement.to_s)
end

protected def do_close
Expand Down

0 comments on commit bedde71

Please sign in to comment.