Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout support in queries #47

Merged
merged 6 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ jobs:
- uses: actions/checkout@v3
- uses: erlef/setup-beam@v1
with:
otp-version: "25.1"
gleam-version: "1.4.1"
otp-version: "26.1"
gleam-version: "1.6.1"
rebar3-version: "3"
# ImageOS: ubuntu20
- run: gleam build
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## Unreleased

- Add support for timeout in pool configuration as well as queries.

## v1.0.1 - 2024-11-26

- Corrected a mistake in the `array` function type.
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ When writing or reading a JSON, you can simply use
`pog.text(json.to_string(my_json))` and `dynamic.string` to respectively write
and read them!

## Timeout

By default, every pog query has a 5 seconds timeout, and every query taking more
than 5 seconds will automatically be aborted. That behaviour can be changed
through the usage of `default_timeout` or `timeout`. `default_timeout` should be
used on `Config`, and defines the timeout that will be used for every query
using that connection, while `timeout` handles timeout query by query. If you have
one query taking more time than your default timeout to complete, you can override
that behaviour specifically for that one.

## Rows as maps

By default, `pgo` will return every selected value from your query as a tuple.
Expand Down
1 change: 1 addition & 0 deletions gleam.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pgo = ">= 0.12.0 and < 2.0.0"
[dev-dependencies]
gleeunit = "~> 1.0"
exception = ">= 2.0.0 and < 3.0.0"
gleam_erlang = ">= 0.30.0 and < 1.0.0"

[erlang]
# Starting an SSL connection relies on ssl application to be started.
Expand Down
2 changes: 2 additions & 0 deletions manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
packages = [
{ name = "backoff", version = "1.1.6", build_tools = ["rebar3"], requirements = [], otp_app = "backoff", source = "hex", outer_checksum = "CF0CFFF8995FB20562F822E5CC47D8CCF664C5ECDC26A684CBE85C225F9D7C39" },
{ name = "exception", version = "2.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "exception", source = "hex", outer_checksum = "F5580D584F16A20B7FCDCABF9E9BE9A2C1F6AC4F9176FA6DD0B63E3B20D450AA" },
{ name = "gleam_erlang", version = "0.30.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "760618870AE4A497B10C73548E6E44F43B76292A54F0207B3771CBB599C675B4" },
{ name = "gleam_stdlib", version = "0.34.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "1FB8454D2991E9B4C0C804544D8A9AD0F6184725E20D63C3155F0AEB4230B016" },
{ name = "gleeunit", version = "1.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "D364C87AFEB26BDB4FB8A5ABDE67D635DC9FA52D6AB68416044C35B096C6882D" },
{ name = "opentelemetry_api", version = "1.3.0", build_tools = ["rebar3", "mix"], requirements = ["opentelemetry_semantic_conventions"], otp_app = "opentelemetry_api", source = "hex", outer_checksum = "B9E5FF775FD064FA098DBA3C398490B77649A352B40B0B730A6B7DC0BDD68858" },
Expand All @@ -14,6 +15,7 @@ packages = [

[requirements]
exception = { version = ">= 2.0.0 and < 3.0.0" }
gleam_erlang = { version = ">= 0.30.0 and < 1.0.0" }
gleam_stdlib = { version = ">= 0.20.0 and < 2.0.0" }
gleeunit = { version = "~> 1.0" }
pgo = { version = ">= 0.12.0 and < 2.0.0" }
45 changes: 40 additions & 5 deletions src/pog.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ pub type Config {
/// (default: False) By default, pgo will return a n-tuple, in the order of the query.
/// By setting `rows_as_map` to `True`, the result will be `Dict`.
rows_as_map: Bool,
/// (default: 5000): Default time in milliseconds to wait before the query
/// is considered timeout. Timeout can be edited per query.
default_timeout: Int,
)
}

Expand Down Expand Up @@ -161,6 +164,13 @@ pub fn rows_as_map(config: Config, rows_as_map: Bool) -> Config {
Config(..config, rows_as_map:)
}

/// By default, pog have a default value of 5000ms as timeout.
/// By setting `default_timeout`, every queries will now use that timeout.
/// `default_timeout` should be set as milliseconds.
pub fn default_timeout(config: Config, default_timeout: Int) -> Config {
lpil marked this conversation as resolved.
Show resolved Hide resolved
Config(..config, default_timeout:)
}

/// The internet protocol version to use.
pub type IpVersion {
/// Internet Protocol version 4 (IPv4)
Expand Down Expand Up @@ -188,6 +198,7 @@ pub fn default_config() -> Config {
trace: False,
ip_version: Ipv4,
rows_as_map: False,
default_timeout: 5000,
)
}

Expand Down Expand Up @@ -329,6 +340,7 @@ fn run_query(
a: Connection,
b: String,
c: List(Value),
timeout: Option(Int),
) -> Result(#(Int, List(Dynamic)), QueryError)

pub type QueryError {
Expand All @@ -346,20 +358,32 @@ pub type QueryError {
/// The rows returned by the database could not be decoded using the supplied
/// dynamic decoder.
UnexpectedResultType(DecodeErrors)
/// The query timed out.
QueryTimeout
/// No connection was available to execute the query. This may be due to
/// invalid connection details such as an invalid username or password.
ConnectionUnavailable
}

pub opaque type Query(row_type) {
Query(sql: String, parameters: List(Value), row_decoder: Decoder(row_type))
Query(
sql: String,
parameters: List(Value),
row_decoder: Decoder(row_type),
timeout: option.Option(Int),
)
}

/// Create a new query to use with the `execute`, `returning`, and `parameter`
/// functions.
///
pub fn query(sql: String) -> Query(Nil) {
Query(sql:, parameters: [], row_decoder: fn(_) { Ok(Nil) })
Query(
sql:,
parameters: [],
row_decoder: fn(_) { Ok(Nil) },
timeout: option.None,
)
}

/// Set the decoder to use for the type of row returned by executing this
Expand All @@ -370,15 +394,21 @@ pub fn query(sql: String) -> Query(Nil) {
/// against the database.
///
pub fn returning(query: Query(t1), decoder: Decoder(t2)) -> Query(t2) {
let Query(sql:, parameters:, row_decoder: _) = query
Query(sql:, parameters:, row_decoder: decoder)
let Query(sql:, parameters:, row_decoder: _, timeout:) = query
Query(sql:, parameters:, row_decoder: decoder, timeout:)
}

/// Push a new query parameter value for the query.
pub fn parameter(query: Query(t1), parameter: Value) -> Query(t1) {
Query(..query, parameters: [parameter, ..query.parameters])
}

/// Use a custom timeout for the query. This timeout will take precedence over
/// the default connection timeout.
pub fn timeout(query: Query(t1), timeout: Int) -> Query(t1) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document this please 🙏

Query(..query, timeout: Some(timeout))
}

/// Run a query against a PostgreSQL database.
///
/// The provided dynamic decoder is used to decode the rows returned by
Expand All @@ -390,7 +420,12 @@ pub fn execute(
on pool: Connection,
) -> Result(Returned(t), QueryError) {
let parameters = list.reverse(query.parameters)
use #(count, rows) <- result.then(run_query(pool, query.sql, parameters))
use #(count, rows) <- result.then(run_query(
pool,
query.sql,
parameters,
query.timeout,
))
use rows <- result.then(
list.try_map(over: rows, with: query.row_decoder)
|> result.map_error(UnexpectedResultType),
Expand Down
23 changes: 16 additions & 7 deletions src/pog_ffi.erl
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
-module(pog_ffi).

-export([query/3, connect/1, disconnect/1, coerce/1, null/0, transaction/2]).
-export([query/4, connect/1, disconnect/1, coerce/1, null/0, transaction/2]).

-record(pog_pool, {name, pid}).
-record(pog_pool, {name, pid, default_timeout}).

-include_lib("pog/include/pog_Config.hrl").
-include_lib("pg_types/include/pg_types.hrl").
Expand Down Expand Up @@ -54,7 +54,8 @@ connect(Config) ->
idle_interval = IdleInterval,
trace = Trace,
ip_version = IpVersion,
rows_as_map = RowsAsMap
rows_as_map = RowsAsMap,
default_timeout = DefaultTimeout
} = Config,
SslOptions = default_ssl_options(Host, Ssl),
Options1 = #{
Expand All @@ -81,7 +82,7 @@ connect(Config) ->
none -> Options1
end,
{ok, Pid} = pgo_pool:start_link(PoolName, Options2),
#pog_pool{name = PoolName, pid = Pid}.
#pog_pool{name = PoolName, pid = Pid, default_timeout = DefaultTimeout}.

disconnect(#pog_pool{pid = Pid}) ->
erlang:exit(Pid, normal),
Expand All @@ -102,8 +103,14 @@ transaction(#pog_pool{name = Name} = Conn, Callback) ->
end.


query(#pog_pool{name = Name}, Sql, Arguments) ->
case pgo:query(Sql, Arguments, #{pool => Name}) of
query(#pog_pool{name = Name, default_timeout = DefaultTimeout}, Sql, Arguments, Timeout) ->
Timeout1 = case Timeout of
none -> DefaultTimeout;
{some, QueryTimeout} -> QueryTimeout
end,
Options = #{pool => Name, pool_options => [{timeout, Timeout1}]},
Res = pgo:query(Sql, Arguments, Options),
case Res of
#{rows := Rows, num_rows := NumRows} ->
{ok, {NumRows, Rows}};

Expand Down Expand Up @@ -133,4 +140,6 @@ convert_error(#{
value := Value
}) ->
Got = list_to_binary(io_lib:format("~p", [Value])),
{unexpected_argument_type, Expected, Got}.
{unexpected_argument_type, Expected, Got};
convert_error(closed) ->
query_timeout.
47 changes: 47 additions & 0 deletions test/pog_test.gleam
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import exception
import gleam/dynamic.{type Decoder}
import gleam/erlang/atom
import gleam/option.{None, Some}
import gleeunit
import gleeunit/should
Expand All @@ -9,6 +10,11 @@ pub fn main() {
gleeunit.main()
}

pub fn run_with_timeout(time: Int, next: fn() -> a) {
let assert Ok(timeout) = atom.from_string("timeout")
#(timeout, time, next)
}

pub fn url_config_everything_test() {
let expected =
pog.default_config()
Expand Down Expand Up @@ -441,6 +447,47 @@ pub fn expected_return_type_test() {
pog.disconnect(db)
}

pub fn expected_five_millis_timeout_test() {
use <- run_with_timeout(20)
let db = start_default()

pog.query("select sub.ret from (select pg_sleep(0.05), 'OK' as ret) as sub")
|> pog.timeout(5)
|> pog.returning(dynamic.element(0, dynamic.string))
|> pog.execute(db)
|> should.equal(Error(pog.QueryTimeout))

pog.disconnect(db)
}

pub fn expected_ten_millis_no_timeout_test() {
use <- run_with_timeout(20)
let db = start_default()

pog.query("select sub.ret from (select pg_sleep(0.01), 'OK' as ret) as sub")
|> pog.timeout(30)
|> pog.returning(dynamic.element(0, dynamic.string))
|> pog.execute(db)
|> should.equal(Ok(pog.Returned(1, ["Ok"])))

pog.disconnect(db)
}

pub fn expected_ten_millis_no_default_timeout_test() {
use <- run_with_timeout(20)
let db =
default_config()
|> pog.default_timeout(30)
|> pog.connect

pog.query("select sub.ret from (select pg_sleep(0.01), 'OK' as ret) as sub")
|> pog.returning(dynamic.element(0, dynamic.string))
|> pog.execute(db)
|> should.equal(Ok(pog.Returned(1, ["Ok"])))

pog.disconnect(db)
}

pub fn expected_maps_test() {
let db = pog.Config(..default_config(), rows_as_map: True) |> pog.connect

Expand Down
Loading