Skip to content

Commit

Permalink
WIP: pipeline mode
Browse files Browse the repository at this point in the history
  • Loading branch information
d-frey committed Dec 14, 2024
1 parent 0a38636 commit b726879
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 0 deletions.
7 changes: 7 additions & 0 deletions include/tao/pq/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <tao/pq/isolation_level.hpp>
#include <tao/pq/notification.hpp>
#include <tao/pq/parameter.hpp>
#include <tao/pq/pipeline_status.hpp>
#include <tao/pq/poll.hpp>
#include <tao/pq/transaction.hpp>
#include <tao/pq/transaction_status.hpp>
Expand Down Expand Up @@ -133,6 +134,12 @@ namespace tao::pq
[[nodiscard]] auto status() const noexcept -> connection_status;
[[nodiscard]] auto transaction_status() const noexcept -> pq::transaction_status;

[[nodiscard]] auto pipeline_status() const noexcept -> pq::pipeline_status;
void enter_pipeline_mode();
void exit_pipeline_mode();

void pipeline_sync();

[[nodiscard]] auto is_open() const noexcept -> bool
{
return status() == connection_status::ok;
Expand Down
23 changes: 23 additions & 0 deletions include/tao/pq/pipeline_status.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) 2024 Daniel Frey and Dr. Colin Hirsch
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt)

#ifndef TAO_PQ_PIPELINE_STATUS_HPP
#define TAO_PQ_PIPELINE_STATUS_HPP

#include <cstdint>

#include <libpq-fe.h>

namespace tao::pq
{
enum class pipeline_status : std::uint8_t
{
on = PQ_PIPELINE_ON,
off = PQ_PIPELINE_OFF,
aborted = PQ_PIPELINE_ABORTED
};

} // namespace tao::pq

#endif
2 changes: 2 additions & 0 deletions include/tao/pq/result.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <libpq-fe.h>

#include <tao/pq/internal/zsv.hpp>
#include <tao/pq/result_status.hpp>
#include <tao/pq/row.hpp>

namespace tao::pq
Expand All @@ -52,6 +53,7 @@ namespace tao::pq
explicit result( PGresult* pgresult );

public:
[[nodiscard]] auto status() const noexcept -> result_status;
[[nodiscard]] auto has_rows_affected() const noexcept -> bool;
[[nodiscard]] auto rows_affected() const -> std::size_t;

Expand Down
34 changes: 34 additions & 0 deletions include/tao/pq/result_status.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2024 Daniel Frey and Dr. Colin Hirsch
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt)

#ifndef TAO_PQ_RESULT_STATUS_HPP
#define TAO_PQ_RESULT_STATUS_HPP

#include <cstdint>

#include <libpq-fe.h>

namespace tao::pq
{
enum class result_status : std::uint8_t
{
empty_query = PGRES_EMPTY_QUERY,
command_ok = PGRES_COMMAND_OK,
tuples_ok = PGRES_TUPLES_OK,
copy_out = PGRES_COPY_OUT,
copy_in = PGRES_COPY_IN,
bad_response = PGRES_BAD_RESPONSE,
nonfatal_error = PGRES_NONFATAL_ERROR,
fatal_error = PGRES_FATAL_ERROR,
single_tuple = PGRES_SINGLE_TUPLE,
#if defined( LIBPQ_HAS_CHUNK_MODE )
tuples_chunk = PGRES_TUPLES_CHUNK,
#endif
pipeline_sync = PGRES_PIPELINE_SYNC,
pipeline_aborted = PGRES_PIPELINE_ABORTED
};

} // namespace tao::pq

#endif
1 change: 1 addition & 0 deletions include/tao/pq/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ namespace tao::pq
#endif

[[nodiscard]] auto get_result( const std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now() ) -> result;
void consume_pipeline_sync( const std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now() );

template< parameter_type... As >
auto execute( const internal::zsv statement, As&&... as )
Expand Down
26 changes: 26 additions & 0 deletions src/lib/pq/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,32 @@ namespace tao::pq
return static_cast< pq::transaction_status >( PQtransactionStatus( m_pgconn.get() ) );
}

auto connection::pipeline_status() const noexcept -> pq::pipeline_status
{
return static_cast< pq::pipeline_status >( PQpipelineStatus( m_pgconn.get() ) );
}

void connection::enter_pipeline_mode()
{
if( PQenterPipelineMode( m_pgconn.get() ) == 0 ) {
throw pq::connection_error( "unable to enter pipeline mode" );
}
}

void connection::exit_pipeline_mode()
{
if( PQexitPipelineMode( m_pgconn.get() ) == 0 ) {
throw pq::connection_error( error_message() );
}
}

void connection::pipeline_sync()
{
if( PQpipelineSync( m_pgconn.get() ) == 0 ) {
throw pq::connection_error( "unable to sync pipeline" );
}
}

auto connection::direct() -> std::shared_ptr< pq::transaction >
{
return std::make_shared< internal::autocommit_transaction >( shared_from_this() );
Expand Down
7 changes: 7 additions & 0 deletions src/lib/pq/result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ namespace tao::pq
#if defined( LIBPQ_HAS_CHUNK_MODE )
case PGRES_TUPLES_CHUNK:
#endif
case PGRES_PIPELINE_SYNC:
case PGRES_PIPELINE_ABORTED:
return;

case PGRES_EMPTY_QUERY:
Expand All @@ -56,6 +58,11 @@ namespace tao::pq
}
}

auto result::status() const noexcept -> result_status
{
return static_cast< pq::result_status >( PQresultStatus( m_pgresult.get() ) );
}

auto result::has_rows_affected() const noexcept -> bool
{
const char* str = PQcmdTuples( m_pgresult.get() );
Expand Down
24 changes: 24 additions & 0 deletions src/lib/pq/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <chrono>
#include <cstdio>
#include <exception>
#include <format>
#include <memory>
#include <stdexcept>
#include <utility>
Expand Down Expand Up @@ -189,10 +190,33 @@ namespace tao::pq
result = std::move( next );
}
}
else {
throw std::runtime_error( "unable to obtain result" );
}

return pq::result( result.release() );
}

void transaction::consume_pipeline_sync( const std::chrono::steady_clock::time_point start )
{
check_current_transaction();
const auto end = m_connection->timeout_end( start );

auto result = m_connection->get_result( end );
if( result ) {
const auto status = PQresultStatus( result.get() );
if( status == PGRES_PIPELINE_SYNC ) {
return;
}
else {
throw std::runtime_error( std::format( "unexpected result status: {}", static_cast< int >( status ) ) );
}
}
while( auto next = m_connection->get_result( end ) ) {
result = std::move( next );
}
}

auto transaction::subtransaction() -> std::shared_ptr< transaction >
{
check_current_transaction();
Expand Down
76 changes: 76 additions & 0 deletions src/test/pq/pipeline_mode.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2024 Daniel Frey and Dr. Colin Hirsch
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt)

#include "../getenv.hpp"
#include "../macros.hpp"

#include <exception>
#include <iostream>

#include <tao/pq.hpp>
#include <tao/pq/internal/unreachable.hpp>

namespace
{
void run()
{
// overwrite the default with an environment variable if needed
const auto connection_string = tao::pq::internal::getenv( "TAOPQ_TEST_DATABASE", "dbname=template1" ); // NOLINT(clang-analyzer-deadcode.DeadStores)

// open a connection
const auto connection = tao::pq::connection::create( connection_string );
TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::off );

TEST_THROWS( connection->pipeline_sync() );

connection->exit_pipeline_mode();
TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::off );

connection->enter_pipeline_mode();
TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::on );

connection->enter_pipeline_mode();
TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::on );

{
auto tr = connection->direct();
tr->send( "SELECT 42" );
tr->send( "SELECT 1234" );
connection->pipeline_sync();

TEST_ASSERT( tr->get_result().as< int >() == 42 );

tr->send( "SELECT 1701" );
connection->pipeline_sync();

TEST_ASSERT( tr->get_result().as< int >() == 1234 );
TEST_ASSERT( tr->get_result().as< int >() == 1701 );
tr->consume_pipeline_sync();

TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::on );
connection->exit_pipeline_mode();
TEST_ASSERT( connection->pipeline_status() == tao::pq::pipeline_status::off );

tr->commit();
}
}

} // namespace

auto main() -> int // NOLINT(bugprone-exception-escape)
{
try {
run();
}
// LCOV_EXCL_START
catch( const std::exception& e ) {
std::cerr << "exception: " << e.what() << '\n';
throw;
}
catch( ... ) {
std::cerr << "unknown exception\n";
throw;
}
// LCOV_EXCL_STOP
}

0 comments on commit b726879

Please sign in to comment.