diff --git a/.gitignore b/.gitignore index 6dd107e3..9d4c8351 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ /deps/deps.jl /deps/usr/ /deps/build.log +/Manifest.toml diff --git a/.travis.yml b/.travis.yml index 1d0ea828..f43267fe 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,6 +6,7 @@ julia: - 1.0 env: matrix: + - POSTGRESQL_VERSION=12 - POSTGRESQL_VERSION=11 - POSTGRESQL_VERSION=10 - POSTGRESQL_VERSION=9.6 @@ -22,6 +23,7 @@ matrix: fast_finish: true include: - julia: 1.1 + - julia: 1.2 - julia: nightly - os: osx env: diff --git a/Project.toml b/Project.toml index e9f4909d..48fbab9a 100644 --- a/Project.toml +++ b/Project.toml @@ -1,13 +1,14 @@ name = "LibPQ" uuid = "194296ae-ab2e-5f79-8cd4-7183a0a5a0d1" license = "MIT" -version = "0.9.1" +version = "0.10.0" [deps] BinaryProvider = "b99e7846-7c00-51b0-8f62-c81ae34c0232" Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" Decimals = "abce61dc-4473-55a0-ba07-351d65e31d42" DocStringExtensions = "ffbed154-4ef7-542d-bbb7-c09d3a79fcae" +FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee" IterTools = "c8e1da08-722c-5040-9ed9-7db0dc04731e" LayerDicts = "6f188dcb-512c-564b-bc01-e0f76e72f166" Libdl = "8f399da3-3557-5675-b5ff-fb832c97cbdb" diff --git a/README.md b/README.md index e312efb3..9dfdfd60 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ LibPQ.jl is a Julia wrapper for the PostgreSQL `libpq` C library. 
* UTF-8 client encoding * Queries * Create and execute queries with or without parameters + * Execute queries asynchronously * Stream results using [Tables](https://github.com/JuliaData/Tables.jl) * Configurably convert a variety of PostgreSQL types to corresponding Julia types (see the **Type Conversions** section of the docs) * Prepared Statements diff --git a/appveyor.yml b/appveyor.yml index 3ff31fbb..2ffa4f7b 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -7,6 +7,7 @@ environment: matrix: - julia_version: 1.0 - julia_version: 1.1 + - julia_version: 1.2 platform: - x86 # 32-bit diff --git a/docs/src/index.md b/docs/src/index.md index 6f86127e..901c2e3d 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -20,6 +20,12 @@ data = columntable(result) result = execute(conn, "SELECT typname FROM pg_type WHERE oid = \$1", ["16"]) data = columntable(result) +# the same but asynchronously +async_result = async_execute(conn, "SELECT typname FROM pg_type WHERE oid = \$1", ["16"]) +# do other things +result = fetch(async_result) +data = columntable(result) + close(conn) ``` diff --git a/docs/src/pages/api.md b/docs/src/pages/api.md index fc98deb3..026a409f 100644 --- a/docs/src/pages/api.md +++ b/docs/src/pages/api.md @@ -54,6 +54,14 @@ LibPQ.CopyIn execute(::LibPQ.Connection, ::LibPQ.CopyIn) ``` +### Asynchronous + +```@docs +async_execute +LibPQ.AsyncResult +cancel +``` + ## Internals ### Connections diff --git a/src/LibPQ.jl b/src/LibPQ.jl index 703d154f..3dfcd30c 100644 --- a/src/LibPQ.jl +++ b/src/LibPQ.jl @@ -1,15 +1,17 @@ module LibPQ -export status, reset!, execute, prepare, +export status, reset!, execute, prepare, async_execute, cancel, num_columns, num_rows, num_params, num_affected_rows +using Base: Semaphore, acquire, release using Base.Iterators: zip, product using Base.Threads using Dates using DocStringExtensions using Decimals +using FileWatching using Tables using IterTools: imap using LayerDicts @@ -82,4 +84,6 @@ include("parsing.jl") 
include("copy.jl")
 include("tables.jl")

+include("asyncresults.jl")
+
 end
diff --git a/src/asyncresults.jl b/src/asyncresults.jl
new file mode 100644
index 00000000..a0233136
--- /dev/null
+++ b/src/asyncresults.jl
@@ -0,0 +1,253 @@
+"""
+    AsyncResult
+
+An asynchronous PostgreSQL query.
+
+The inner constructor fills in every field except `result_task`, which stays unassigned
+until the `Task` that runs the query has been created and stored on the object.
+"""
+mutable struct AsyncResult
+    "The LibPQ.jl Connection used for the query"
+    jl_conn::Connection
+
+    "Keyword arguments to pass to Result on creation"
+    result_kwargs::Ref
+
+    "Whether or not the query should be cancelled, if running"
+    should_cancel::Bool
+
+    "Task which errors or returns a LibPQ.jl Result which is created once available"
+    result_task::Task
+
+    function AsyncResult(jl_conn::Connection, result_kwargs::Ref)
+        # `result_task` is intentionally left unassigned here
+        return new(jl_conn, result_kwargs, false)
+    end
+end
+
+"""
+    AsyncResult(jl_conn::Connection; kwargs...) -> AsyncResult
+
+Create an `AsyncResult` for `jl_conn`, storing `kwargs` so that they can be passed to the
+`Result` constructor later.
+"""
+function AsyncResult(jl_conn::Connection; kwargs...)
+    return AsyncResult(jl_conn, Ref(kwargs))
+end
+
+function Base.show(io::IO, async_result::AsyncResult)
+    # named `state` rather than `status` to avoid shadowing the exported `status` function
+    state = if !isdefined(async_result, :result_task)
+        # the task field is unassigned right after construction; reading it would throw
+        "not started"
+    elseif !isready(async_result)
+        "in progress"
+    elseif iserror(async_result)
+        "errored"
+    else
+        "finished"
+    end
+    print(io, typeof(async_result), " (", state, ")")
+    return nothing
+end
+
+"""
+    handle_result(async_result::AsyncResult; throw_error=true) -> Result
+
+Waits for the results of the query already submitted on the connection of `async_result`
+and wraps them in [`Result`](@ref)s.
+
+This implements the loop described in the PostgreSQL documentation for
+[Asynchronous Command Processing](https://www.postgresql.org/docs/10/libpq-async.html).
+
+The `throw_error` option only determines whether to throw errors when handling the new
+[`Result`](@ref)s; the `Task` may error for other reasons related to processing the
+asynchronous loop.
+
+The result returned will be the [`Result`](@ref) of the last query run (the only query if
+using parameters).
+If `throw_error` is `true`, any errors produced by the queries will be thrown together in a
+`CompositeException`.
+"""
+function handle_result(async_result::AsyncResult; throw_error=true)
+    # element type stays `Any`: `throw` accepts arbitrary values, not only `Exception`s
+    errors = Any[]
+    result = nothing
+    for result_ptr in _consume(async_result.jl_conn)
+        try
+            # each result replaces the previous one, so the last statement's result wins
+            result = handle_result(
+                Result(
+                    result_ptr,
+                    async_result.jl_conn;
+                    async_result.result_kwargs[]...
+                );
+                throw_error=throw_error,
+            )
+        catch err
+            push!(errors, err)
+        end
+    end
+
+    if !isempty(errors)
+        throw_error && throw(CompositeException(errors))
+
+        # with `throw_error=false` nothing is thrown, but the errors must not be dropped
+        # silently either: make them visible in the log
+        for err in errors
+            warn(LOGGER, "Error while handling async result: $(sprint(showerror, err))")
+        end
+    end
+
+    result === nothing && error(LOGGER, "Async query did not return result")
+    return result
+end
+
+"""
+    _consume(jl_conn::Connection) -> Vector{Ptr{libpq_c.PGresult}}
+
+Wait on the socket of `jl_conn` and collect result pointers from libpq until `PQgetResult`
+returns `C_NULL`; the pointers are returned in the order they were received.
+
+The `should_cancel` flag of the connection's current `AsyncResult` is only examined at the
+top of each iteration (before waiting on the socket), and is not reset here, so `_cancel`
+is called on every iteration once the flag is set.
+
+If the loop throws, the results collected so far are freed before the error propagates.
+"""
+function _consume(jl_conn::Connection)
+    async_result = jl_conn.async_result
+    result_ptrs = Ptr{libpq_c.PGresult}[]
+    watcher = FDWatcher(socket(jl_conn), true, false)  # can wait for reads
+    try
+        while true
+            if async_result.should_cancel
+                debug(LOGGER, "Received cancel signal for connection $(jl_conn.conn)")
+                _cancel(jl_conn)
+            end
+            debug(LOGGER, "Waiting to read from connection $(jl_conn.conn)")
+            wait(watcher)
+            debug(LOGGER, "Consuming input from connection $(jl_conn.conn)")
+            success = libpq_c.PQconsumeInput(jl_conn.conn) == 1
+            !success && error(LOGGER, error_message(jl_conn))
+
+            # drain every result that can be read without blocking
+            while libpq_c.PQisBusy(jl_conn.conn) == 0
+                debug(LOGGER, "Checking the result from connection $(jl_conn.conn)")
+                result_ptr = libpq_c.PQgetResult(jl_conn.conn)
+                if result_ptr == C_NULL
+                    debug(LOGGER, "Finished reading from connection $(jl_conn.conn)")
+                    return result_ptrs
+                else
+                    result_num = length(result_ptrs) + 1
+                    debug(LOGGER,
+                        "Saving result $result_num from connection $(jl_conn.conn)"
+                    )
+                    push!(result_ptrs, result_ptr)
+                end
+            end
+        end
+    catch
+        # the pointers have not been handed to any owner yet, so they would leak if the
+        # loop failed part-way (e.g. PQconsumeInput reporting an error)
+        foreach(libpq_c.PQclear, result_ptrs)
+        rethrow()
+    finally
+        close(watcher)
+    end
+end
+
+"""
+    cancel(async_result::AsyncResult)
+
+If this [`AsyncResult`](@ref) represents a currently-executing query, attempt to cancel it.
+"""
+function cancel(async_result::AsyncResult)
+    # just sets the `should_cancel` flag
+    # the actual cancellation will be triggered in the main loop of _consume
+    # which will call `_cancel` on the `Connection`
+    async_result.should_cancel = true
+    return nothing
+end
+
+"""
+    _cancel(jl_conn::Connection)
+
+Request cancellation of the command currently running on `jl_conn`.
+
+A failed request is logged as a warning and not thrown.
+The cancel object obtained from libpq is always freed.
+"""
+function _cancel(jl_conn::Connection)
+    cancel_ptr = libpq_c.PQgetCancel(jl_conn.conn)
+    try
+        # https://www.postgresql.org/docs/10/libpq-cancel.html#LIBPQ-PQCANCEL
+        errbuf_size = 256
+        errbuf = zeros(UInt8, errbuf_size)
+        # a raw pointer does not keep `errbuf` alive; root it for the duration of the call
+        success = GC.@preserve errbuf begin
+            libpq_c.PQcancel(cancel_ptr, pointer(errbuf), errbuf_size) == 1
+        end
+        if !success
+            # the buffer is zero-initialised, so the message ends at the first NUL byte;
+            # do not log the padding after it
+            msg_len = something(findfirst(iszero, errbuf), errbuf_size + 1) - 1
+            warn(LOGGER, "Failed cancelling query: $(String(errbuf[1:msg_len]))")
+        else
+            debug(LOGGER, "Cancelled query for connection $(jl_conn.conn)")
+        end
+    finally
+        libpq_c.PQfreeCancel(cancel_ptr)
+    end
+    return nothing
+end
+
+# The methods below forward to the `Task` stored in `result_task`;
+# closing an `AsyncResult` is the same as cancelling it.
+iserror(async_result::AsyncResult) = Base.istaskfailed(async_result.result_task)
+Base.isready(async_result::AsyncResult) = istaskdone(async_result.result_task)
+Base.wait(async_result::AsyncResult) = wait(async_result.result_task)
+Base.fetch(async_result::AsyncResult) = fetch(async_result.result_task)
+Base.close(async_result::AsyncResult) = cancel(async_result)
+
+"""
+    async_execute(
+        jl_conn::Connection,
+        query::AbstractString,
+        [parameters::Union{AbstractVector, Tuple},]
+        kwargs...
+    ) -> AsyncResult
+
+Run a query on the PostgreSQL database and return an [`AsyncResult`](@ref).
+
+The `AsyncResult` contains a `Task` which processes a query asynchronously.
+Calling `fetch` on the `AsyncResult` will return a [`Result`](@ref).
+
+All keyword arguments are the same as [`execute`](@ref) and are passed to the created
+`Result`.
+
+Only one `AsyncResult` can be active on a [`Connection`](@ref) at once.
+If multiple `AsyncResult`s use the same `Connection`, they will execute serially.
+
+`async_execute` does not yet support [`Statement`](@ref)s.
+
+`async_execute` optionally takes a `parameters` vector which passes query parameters as
+strings to PostgreSQL.
+Queries without parameters can contain multiple SQL statements, and the result of the final
+statement is returned.
+Any errors which occur during executed statements will be bundled together in a
+`CompositeException` and thrown.
+
+As is normal for `Task`s, any exceptions will be thrown when calling `wait` or `fetch`.
+"""
+function async_execute end
+
+function async_execute(jl_conn::Connection, query::AbstractString; kwargs...)
+    return _async_execute(jl_conn; kwargs...) do jl_conn
+        _async_submit(jl_conn.conn, query)
+    end
+end
+
+function async_execute(
+    jl_conn::Connection,
+    query::AbstractString,
+    parameters::Union{AbstractVector, Tuple};
+    kwargs...
+)
+    string_params = string_parameters(parameters)
+    pointer_params = parameter_pointers(string_params)
+
+    return _async_execute(jl_conn; kwargs...) do jl_conn
+        # The submission runs later, inside a task that may first have to wait for the
+        # connection lock. `pointer_params` only holds raw pointers derived from
+        # `string_params`, and raw pointers do not keep their targets alive, so the
+        # strings must stay rooted until libpq has been handed the parameters.
+        GC.@preserve string_params _async_submit(jl_conn.conn, query, pointer_params)
+    end
+end
+
+"""
+    _async_execute(
+        submission_fn::Function, jl_conn::Connection; throw_error=true, kwargs...
+    ) -> AsyncResult
+
+Create an [`AsyncResult`](@ref) whose task locks `jl_conn`, registers itself as the
+connection's current `AsyncResult`, calls `submission_fn(jl_conn)` to send the query and
+then collects the results with `handle_result`.
+
+`submission_fn` must return `true` if the query was sent successfully.
+The connection's `async_result` field is reset to `nothing` when the task finishes,
+whether it succeeded or not.
+"""
+function _async_execute(
+    submission_fn::Function, jl_conn::Connection; throw_error::Bool=true, kwargs...
+)
+    async_result = AsyncResult(jl_conn; kwargs...)
+
+    async_result.result_task = @async lock(jl_conn) do
+        jl_conn.async_result = async_result
+
+        try
+            # error if submission fails
+            # does not respect `throw_error` as there's no result to return on this error
+            submission_fn(jl_conn) || error(LOGGER, error_message(async_result.jl_conn))
+
+            return handle_result(async_result; throw_error=throw_error)::Result
+        finally
+            jl_conn.async_result = nothing
+        end
+    end
+
+    return async_result
+end
+
+# Send `query` without parameters; returns `true` if libpq accepted the command.
+function _async_submit(conn_ptr::Ptr{libpq_c.PGconn}, query::AbstractString)
+    return libpq_c.PQsendQuery(conn_ptr, query) == 1
+end
+
+# Send `query` with text-format parameters; returns `true` if libpq accepted the command.
+function _async_submit(
+    conn_ptr::Ptr{libpq_c.PGconn},
+    query::AbstractString,
+    parameters::Vector{Ptr{UInt8}},
+)
+    num_params = length(parameters)
+
+    send_status = libpq_c.PQsendQueryParams(
+        conn_ptr,
+        query,
+        num_params,
+        C_NULL,  # set paramTypes to C_NULL to have the server infer a type
+        parameters,
+        C_NULL,  # paramLengths is ignored for text format parameters
+        zeros(Cint, num_params),  # all parameters in text format
+        zero(Cint),  # return result in text format
+    )
+
+    return send_status == 1
+end
diff --git a/src/connections.jl b/src/connections.jl
index 8eca17f8..2f366c0e 100644
--- a/src/connections.jl
+++ b/src/connections.jl
@@ -52,8 +52,11 @@ mutable struct Connection
     "True if the connection is closed and the PGconn object has been cleaned up"
     closed::Atomic{Bool}

-    "Lock for thread-safety"
-    lock::Mutex
+    "Semaphore for thread-safety (not thread-safe until Julia 1.2)"
+    semaphore::Semaphore
+
+    "Current AsyncResult, if active"
+    async_result  # ::Union{AsyncResult, Nothing}, would be a circular reference

     function Connection(
         conn::Ptr,
@@ -68,7 +71,8 @@ mutable struct Connection
             PQTypeMap(type_map),
             PQConversions(conversions),
             Atomic{Bool}(closed),
-            Mutex(),
+            Semaphore(1),
+            nothing,
         )
     end
 end
@@ -98,6 +102,7 @@ function handle_new_connection(jl_conn::Connection; throw_error::Bool=true)
             warn(LOGGER, err)
         end
     else
+        debug(LOGGER, "Connection established: 
$(jl_conn.conn)")
         # if connection is successful, set client_encoding
         reset_encoding!(jl_conn)
     end
@@ -169,6 +174,7 @@ function Connection(
     end

     # Make the connection
+    debug(LOGGER, "Connecting to $str")
     jl_conn = Connection(libpq_c.PQconnectdbParams(keywords, values, false); kwargs...)

     # If password needed and not entered, prompt the user
@@ -193,14 +199,19 @@ end

 # AbstractLock primitives:
 # https://github.com/JuliaLang/julia/blob/master/base/condition.jl#L18
-Base.lock(conn::Connection) = lock(conn.lock)
-Base.unlock(conn::Connection) = unlock(conn.lock)
-Base.trylock(conn::Connection) = trylock(conn.lock)
-Base.islocked(conn::Connection) = islocked(conn.lock)
+# (backed by the connection's semaphore; no `trylock` method is defined)
+Base.lock(conn::Connection) = acquire(conn.semaphore)
+Base.unlock(conn::Connection) = release(conn.semaphore)
+# reads the semaphore's `curr_cnt` and `sem_size` fields directly
+Base.islocked(conn::Connection) = conn.semaphore.curr_cnt >= conn.semaphore.sem_size

-# AbstractLock conventions:
-Base.lock(f, conn::Connection) = lock(f, conn.lock)
-Base.trylock(f, conn::Connection) = trylock(f, conn.lock)
+# AbstractLock convention:
+# run `f` while holding the connection lock, releasing it even if `f` throws
+function Base.lock(f, conn::Connection)
+    lock(conn)
+    try
+        return f()
+    finally
+        unlock(conn)
+    end
+end

 """
     Connection(f, args...; kwargs...) -> Connection
@@ -410,10 +421,15 @@ but only if `jl_conn.closed` is `false`, to avoid a double-free.
 """
 function Base.close(jl_conn::Connection)
     if !atomic_cas!(jl_conn.closed, false, true)
+        debug(LOGGER, "Closing connection $(jl_conn.conn)")
+        async_result = jl_conn.async_result
+        async_result === nothing || cancel(async_result)
         lock(jl_conn) do
             libpq_c.PQfinish(jl_conn.conn)
             jl_conn.conn = C_NULL
         end
+    else
+        debug(LOGGER, "Tried to close a closed connection; doing nothing")
     end
     return nothing
 end
@@ -440,8 +456,12 @@ See [`handle_new_connection`](@ref) for information on the `throw_error` argumen
 """
 function reset!(jl_conn::Connection; throw_error::Bool=true)
     if !atomic_cas!(jl_conn.closed, false, true)
+        debug(LOGGER, "Closing connection $(jl_conn.conn)")
+        async_result = jl_conn.async_result
+        async_result === nothing || cancel(async_result)
         lock(jl_conn) do
             jl_conn.closed[] = false
+            debug(LOGGER, "Resetting connection $(jl_conn.conn)")
             libpq_c.PQreset(jl_conn.conn)
         end

@@ -519,7 +539,7 @@ end
 Construct a `ConnectionOption` from a `libpg_c.PQconninfoOption`.
 """
 function ConnectionOption(pq_opt::libpq_c.PQconninfoOption)
-    ConnectionOption(
+    return ConnectionOption(
         unsafe_string(pq_opt.keyword),
         unsafe_string_or_null(pq_opt.envvar),
         unsafe_string_or_null(pq_opt.compiled),
@@ -616,3 +636,12 @@ function Base.show(io::IO, jl_conn::Connection)
         end
     end
 end
+
+"""
+    socket(jl_conn::Connection) -> Union{RawFD, Base.WindowsRawSocket}
+
+Return the OS-level socket of the connection: a `Base.WindowsRawSocket` on Windows and a
+`RawFD` elsewhere.
+
+Throws an error if libpq reports a negative descriptor, i.e. there is no open socket.
+"""
+function socket(jl_conn::Connection)
+    socket_int = libpq_c.PQsocket(jl_conn.conn)
+    # PQsocket returns -1 when no server connection is open; fail here with a clear
+    # message instead of wrapping an invalid descriptor
+    socket_int < 0 && error(LOGGER, "No socket available for connection $(jl_conn.conn)")
+
+    @static if Sys.iswindows()
+        return Base.WindowsRawSocket(Ptr{Cvoid}(Int(socket_int)))
+    else
+        return RawFD(socket_int)
+    end
+end
diff --git a/src/headers/libpq-fe.jl b/src/headers/libpq-fe.jl
index 0e423a46..f31133f1 100644
--- a/src/headers/libpq-fe.jl
+++ b/src/headers/libpq-fe.jl
@@ -342,7 +342,7 @@ function PQfreeCancel(cancel)
     ccall((:PQfreeCancel, LIBPQ_HANDLE), Cvoid, (Ptr{PGcancel},), cancel)
 end

-function PQcancel(cancel, errbuf, errbufsize::Cint)
+function PQcancel(cancel, errbuf, errbufsize)
     ccall((:PQcancel, LIBPQ_HANDLE), Cint, (Ptr{PGcancel}, Cstring, Cint), cancel, errbuf, errbufsize)
 end

diff --git
a/src/tables.jl b/src/tables.jl index fdc2434f..b292a229 100644 --- a/src/tables.jl +++ b/src/tables.jl @@ -152,7 +152,8 @@ function load!(table::T, connection::Connection, query::AbstractString) where {T names = propertynames(row) sch = Tables.Schema(names, nothing) parameters = Vector{Parameter}(undef, length(names)) - while true + while state !== nothing + row, st = state Tables.eachcolumn(sch, row) do val, col, nm parameters[col] = if ismissing(val) missing @@ -164,8 +165,6 @@ function load!(table::T, connection::Connection, query::AbstractString) where {T end close(execute(stmt, parameters; throw_error=true)) state = iterate(rows, st) - state === nothing && break - row, st = state end return stmt end diff --git a/test/runtests.jl b/test/runtests.jl index 0a4236ae..2255e5e2 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -341,8 +341,15 @@ end result = execute(conn, copyin; throw_error=false) @test isopen(result) @test status(result) == LibPQ.libpq_c.PGRES_FATAL_ERROR - @test occursin("ERROR", LibPQ.error_message(result)) - @test occursin("invalid input syntax for integer", LibPQ.error_message(result)) + + err_msg = LibPQ.error_message(result) + @test occursin("ERROR", err_msg) + if LibPQ.server_version(conn) >= v"12" + @test occursin("invalid input syntax for type bigint", err_msg) + else + @test occursin("invalid input syntax for integer", err_msg) + end + close(result) result = execute( @@ -377,7 +384,7 @@ end @test was_open @test !isopen(saved_conn) - @test_throws ErrorException LibPQ.Connection("dbname=123fake"; throw_error=true) do jl_conn + @test_throws ErrorException LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=true) do jl_conn @test false end end @@ -536,7 +543,7 @@ end @testset "Bad Connection" begin @testset "throw_error=false" begin - conn = LibPQ.Connection("dbname=123fake"; throw_error=false) + conn = LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=false) @test conn isa LibPQ.Connection @test 
status(conn) == LibPQ.libpq_c.CONNECTION_BAD
             @test isopen(conn)
@@ -552,9 +559,9 @@ end
         end

         @testset "throw_error=true" begin
-            @test_throws ErrorException LibPQ.Connection("dbname=123fake"; throw_error=true)
+            @test_throws ErrorException LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=true)

-            conn = LibPQ.Connection("dbname=123fake"; throw_error=false)
+            conn = LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=false)
             @test conn isa LibPQ.Connection
             @test status(conn) == LibPQ.libpq_c.CONNECTION_BAD
             @test isopen(conn)
@@ -1133,6 +1140,155 @@ end
             close(conn)
         end
     end
+
+    @testset "AsyncResults" begin
+        # wait for the task to finish, ignoring any error it raises
+        trywait(ar::LibPQ.AsyncResult) = (try wait(ar) catch end; nothing)
+
+        @testset "Basic" begin
+            conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true)
+
+            ar = async_execute(conn, "SELECT pg_sleep(2);"; throw_error=false)
+            yield()
+            @test !isready(ar)
+            @test !LibPQ.iserror(ar)
+            @test conn.async_result === ar
+
+            wait(ar)
+            @test isready(ar)
+            @test !LibPQ.iserror(ar)
+            @test conn.async_result === nothing
+
+            result = fetch(ar)
+            @test status(result) == LibPQ.libpq_c.PGRES_TUPLES_OK
+            @test LibPQ.column_name(result, 1) == "pg_sleep"
+
+            close(result)
+            close(conn)
+        end
+
+        @testset "Parameters" begin
+            conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true)
+
+            ar = async_execute(
+                conn,
+                "SELECT typname FROM pg_type WHERE oid = \$1",
+                [16];
+                throw_error=false,
+            )
+
+            wait(ar)
+            @test isready(ar)
+            @test !LibPQ.iserror(ar)
+            @test conn.async_result === nothing
+
+            result = fetch(ar)
+            @test result isa LibPQ.Result
+            @test status(result) == LibPQ.libpq_c.PGRES_TUPLES_OK
+            @test isopen(result)
+            @test LibPQ.num_columns(result) == 1
+            @test LibPQ.num_rows(result) == 1
+            @test LibPQ.column_name(result, 1) == "typname"
+
+            data = columntable(result)
+
+            @test data[:typname][1] == "bool"
+
+            close(result)
+            close(conn)
+        end
+
+        # Ensures queries wait for previous query completion before starting
+        @testset "Wait in line to complete" begin
+            conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true)
+
+            first_ar = async_execute(conn, "SELECT pg_sleep(4);")
+            yield()
+            second_ar = async_execute(conn, "SELECT pg_sleep(2);")
+            @test !isready(first_ar)
+            @test !isready(second_ar)
+
+            # wait(first_ar)  # this is needed if I use @par for some reason
+            second_result = fetch(second_ar)
+            @test isready(first_ar)
+            @test isready(second_ar)
+            @test !LibPQ.iserror(first_ar)
+            @test !LibPQ.iserror(second_ar)
+            @test status(second_result) == LibPQ.libpq_c.PGRES_TUPLES_OK
+            @test conn.async_result === nothing
+
+            first_result = fetch(first_ar)
+            @test isready(first_ar)
+            @test !LibPQ.iserror(first_ar)
+            # check the result that was just fetched, not the second one again
+            @test status(first_result) == LibPQ.libpq_c.PGRES_TUPLES_OK
+            @test conn.async_result === nothing
+
+            close(second_result)
+            close(first_result)
+            close(conn)
+        end
+
+        @testset "Cancel" begin
+            conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true)
+
+            # final query needs to be one that actually does something
+            # on Windows, first query also needs to do something
+            ar = async_execute(
+                conn,
+                "SELECT * FROM pg_opclass; SELECT pg_sleep(3); SELECT * FROM pg_type;",
+            )
+            yield()
+            @test !isready(ar)
+            @test !LibPQ.iserror(ar)
+            @test conn.async_result === ar
+
+            cancel(ar)
+            trywait(ar)
+            @test isready(ar)
+            @test LibPQ.iserror(ar)
+            @test conn.async_result === nothing
+
+            local err_msg = ""
+            try
+                wait(ar)
+            catch e
+                err_msg = sprint(showerror, e)
+            end
+
+            @test occursin("canceling statement due to user request", err_msg)
+
+            close(conn)
+        end
+
+        @testset "Canceled by closing connection" begin
+            conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true)
+
+            # final query needs to be one that actually does something
+            # on Windows, first query also needs to do something
+            ar = async_execute(
+                conn,
+                "SELECT * FROM pg_opclass; SELECT pg_sleep(3); SELECT * FROM pg_type;",
+            )
+            yield()
+            @test !isready(ar)
+            @test !LibPQ.iserror(ar)
+            @test conn.async_result === ar
+
+            close(conn)
+            trywait(ar)
+            @test isready(ar)
+            @test LibPQ.iserror(ar)
+            @test conn.async_result === nothing
+
+            local err_msg = ""
+            try
+                wait(ar)
+            catch e
+                err_msg = sprint(showerror, e)
+            end
+
+            @test occursin("canceling statement due to user request", err_msg)
+        end
+    end
 end

 end