diff --git a/cpp/examples/tx_send.cpp b/cpp/examples/tx_send.cpp index 55d247ecc..319ab3731 100644 --- a/cpp/examples/tx_send.cpp +++ b/cpp/examples/tx_send.cpp @@ -60,13 +60,13 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler { std::cout << " [on_container_start] completed!! txn: " << &transaction << std::endl; } - void on_transaction_aborted(proton::transaction&) {} - void on_transaction_declare_failed(proton::transaction &) {} - void on_transaction_commit_failed(proton::transaction&) {} + void on_transaction_aborted(proton::transaction) {} + void on_transaction_declare_failed(proton::transaction) {} + void on_transaction_commit_failed(proton::transaction) {} - - void on_transaction_declared(proton::transaction &t) override { - std::cout<<"[on_transaction_declared] txn: "<<(&transaction)<< " new_txn: "<<(&t)<id) << std::endl; connection.close(); // transaction = &t; // ASSUME: THIS FUNCTION DOESN"T WORK @@ -95,7 +95,6 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler { if(current_batch == batch_size) { transaction.commit(); - // WE DON"T CARE ANY MORE FOR NOW // transaction = NULL; } } @@ -106,7 +105,7 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler { confirmed += 1; } - void on_transaction_committed(proton::transaction &t) override { + void on_transaction_committed(proton::transaction t) override { committed += current_batch; std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl; if(committed == total) { diff --git a/cpp/include/proton/container.hpp b/cpp/include/proton/container.hpp index 0b0e16a41..a6153fa69 100644 --- a/cpp/include/proton/container.hpp +++ b/cpp/include/proton/container.hpp @@ -327,6 +327,7 @@ class PN_CPP_CLASS_EXTERN container { friend class receiver_options; friend class sender_options; friend class work_queue; + friend class transaction; /// @endcond }; diff --git a/cpp/include/proton/transaction.hpp b/cpp/include/proton/transaction.hpp index d2bc24be8..a74f85d94 100644 --- a/cpp/include/proton/transaction.hpp +++ b/cpp/include/proton/transaction.hpp @@ -28,6 +28,7 @@ #include "./internal/export.hpp" #include "./sender.hpp" #include "./tracker.hpp" +#include "./container.hpp" /// @file /// @copybrief proton::transaction @@ -36,16 +37,57 @@ namespace proton { class transaction_handler; +// TODO: This should not be accessible to users. +class transaction_impl { + public: + proton::sender *txn_ctrl = nullptr; + proton::transaction_handler *handler = nullptr; + proton::binary id; + proton::tracker _declare; + proton::tracker _discharge; + bool failed = false; + std::vector pending; + + void commit(); + void abort(); + void declare(); + proton::tracker send(proton::sender s, proton::message msg); + + void discharge(bool failed); + proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value); + void handle_outcome(proton::tracker t); + transaction_impl(proton::sender &_txn_ctrl, + proton::transaction_handler &_handler, + bool _settle_before_discharge); + + // delete copy and assignment operator to ensure no copy of this object is + // every made transaction_impl(const transaction_impl&) = delete; + // transaction_impl& operator=(const transaction_impl&) = delete; +}; + class PN_CPP_CLASS_EXTERN transaction { + // private: + // PN_CPP_EXTERN transaction(proton::sender& _txn_ctrl, + // proton::transaction_handler& _handler, bool _settle_before_discharge); + + static transaction mk_transaction_impl(sender &s, transaction_handler &h, + bool f); + PN_CPP_EXTERN transaction(transaction_impl* impl); public: + transaction_impl* _impl; // TODO: - PN_CPP_EXTERN virtual ~transaction(); - PN_CPP_EXTERN virtual void commit(); - PN_CPP_EXTERN virtual void abort(); - PN_CPP_EXTERN virtual void declare(); - PN_CPP_EXTERN virtual void handle_outcome(proton::tracker); - PN_CPP_EXTERN virtual proton::tracker send(proton::sender s, proton::message msg); + // PN_CPP_EXTERN transaction(transaction &o); + PN_CPP_EXTERN transaction(); + PN_CPP_EXTERN ~transaction(); + PN_CPP_EXTERN void commit(); + PN_CPP_EXTERN void abort(); + PN_CPP_EXTERN void declare(); + PN_CPP_EXTERN void handle_outcome(proton::tracker); + PN_CPP_EXTERN proton::tracker send(proton::sender s, proton::message msg); + + friend class transaction_impl; + friend class container::impl; }; class @@ -54,19 +96,19 @@ PN_CPP_CLASS_EXTERN transaction_handler { PN_CPP_EXTERN virtual ~transaction_handler(); /// Called when a local transaction is declared. - PN_CPP_EXTERN virtual void on_transaction_declared(transaction&); + PN_CPP_EXTERN virtual void on_transaction_declared(transaction); /// Called when a local transaction is discharged successfully. - PN_CPP_EXTERN virtual void on_transaction_committed(transaction&); + PN_CPP_EXTERN virtual void on_transaction_committed(transaction); /// Called when a local transaction is discharged unsuccessfully (aborted). - PN_CPP_EXTERN virtual void on_transaction_aborted(transaction&); + PN_CPP_EXTERN virtual void on_transaction_aborted(transaction); /// Called when a local transaction declare fails. - PN_CPP_EXTERN virtual void on_transaction_declare_failed(transaction&); + PN_CPP_EXTERN virtual void on_transaction_declare_failed(transaction); /// Called when the commit of a local transaction fails. - PN_CPP_EXTERN virtual void on_transaction_commit_failed(transaction&); + PN_CPP_EXTERN virtual void on_transaction_commit_failed(transaction); }; } // namespace proton diff --git a/cpp/include/proton/transfer.hpp b/cpp/include/proton/transfer.hpp index 0e6de8cc9..1e7995c06 100644 --- a/cpp/include/proton/transfer.hpp +++ b/cpp/include/proton/transfer.hpp @@ -33,9 +33,26 @@ /// @copybrief proton::transfer struct pn_delivery_t; +struct pn_disposition_t; namespace proton { +class disposition : public internal::object { + /// @cond INTERNAL + disposition(pn_disposition_t *d) : internal::object(d) {} + /// @endcond + + public: + /// Create an empty disposition. + disposition() : internal::object(0) {} + + proton::value data() const; + + /// @cond INTERNAL + friend class internal::factory; + /// @endcond +}; + /// The base class for delivery and tracker. class transfer : public internal::object { /// @cond INTERNAL @@ -88,6 +105,9 @@ class transfer : public internal::object { /// Get user data from this transfer. PN_CPP_EXTERN void* user_data() const; + PN_CPP_EXTERN disposition remote(); + PN_CPP_EXTERN disposition local(); + /// @cond INTERNAL friend class internal::factory; /// @endcond diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp index 8b51510c6..9a9a048bf 100644 --- a/cpp/src/messaging_adapter.cpp +++ b/cpp/src/messaging_adapter.cpp @@ -41,6 +41,7 @@ #include #include +#include #include #include #include @@ -115,20 +116,32 @@ void message_decode(message& msg, proton::delivery delivery) { msg.decode(buf); pn_link_advance(unwrap(link)); } - void on_delivery(messaging_handler& handler, pn_event_t* event) { pn_link_t *lnk = pn_event_link(event); pn_delivery_t *dlv = pn_event_delivery(event); link_context& lctx = link_context::get(lnk); Tracing& ot = Tracing::getTracing(); if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) { - std::cout<< " on_delivery: COOORINDATOR.. " << &handler << std::endl; + // delivery d(make_wrapper(dlv)); + pn_disposition_t *disposition = pn_delivery_remote(dlv); + proton::value val(pn_disposition_data(disposition)); + std::cout << " on_delivery: COOORINDATOR.. tracker: " << val + << std::endl; tracker t(make_wrapper(dlv)); - std::cout<< " on_delivery: COOORINDATOR.. tracker" << &t << std::endl; - handler.on_tracker_settle(t); - } + std::cout << " on_delivery: COOORINDATOR.. TRACKER MADE: " + << std::endl; + // t.user_data = val; // not + + // proton::disposition _disposition = make_wrapper(disposition); // # + // t.remote(); - else if (pn_link_is_receiver(lnk)) { + // proton::value val2 = _disposition.data(); + + // std::cout<< " on_delivery: COOORINDATOR with TXN IN :" + // << val2 << std::endl; + + handler.on_tracker_settle(t); + } else if (pn_link_is_receiver(lnk)) { delivery d(make_wrapper(dlv)); if (pn_delivery_aborted(dlv)) { pn_delivery_settle(dlv); diff --git a/cpp/src/proactor_container_impl.cpp b/cpp/src/proactor_container_impl.cpp index bae2557ca..00ab38bef 100644 --- a/cpp/src/proactor_container_impl.cpp +++ b/cpp/src/proactor_container_impl.cpp @@ -36,11 +36,15 @@ #include "proton/transport.h" #include "proton/transaction.hpp" +#include "proton/delivery.h" + #include "contexts.hpp" #include "messaging_adapter.hpp" #include "reconnect_options_impl.hpp" #include "proton_bits.hpp" +#include + #include #include @@ -863,7 +867,7 @@ void container::impl::stop(const proton::error_condition& err) { } // TODO: declare this in separate internal header file -extern transaction mk_transaction_impl(sender&, transaction_handler&, bool); +// extern transaction mk_transaction_impl(sender&, transaction_handler&, bool); transaction container::impl::declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge) { class InternalTransactionHandler : public proton::messaging_handler { @@ -871,8 +875,24 @@ transaction container::impl::declare_transaction(proton::connection conn, proton void on_tracker_settle(proton::tracker &t) override { std::cout<<" [InternalTransactionHandler][on_tracker_settle] called with tracker.txn" << std::endl; + t.transaction().handle_outcome(t); + + // t.user_data = val; // not + + // proton::disposition _disposition = make_wrapper(disposition); + // // # t.remote(); + + // proton::value val2 = _disposition.data(); + + // proton::disposition _disposition = t.remote(); + + // proton::value val = _disposition.data(); + + // std::cout<< " declare_transaction: on_tracker_settle with + // TXN IN :" << val << std::endl; + // if(t.transaction()) { - t.transaction().handle_outcome(t); + // t.transaction().handle_outcome(t); // } } @@ -903,7 +923,8 @@ transaction container::impl::declare_transaction(proton::connection conn, proton std::cout<<" [declare_transaction] calling mk_transaction_impl" << std::endl; - auto txn = mk_transaction_impl(s, handler, settle_before_discharge); + auto txn = + transaction::mk_transaction_impl(s, handler, settle_before_discharge); std::cout<<" [declare_transaction] txn address:" << &txn << std::endl; return txn; diff --git a/cpp/src/proton_bits.hpp b/cpp/src/proton_bits.hpp index 48b9f5fd8..b4350e0f6 100644 --- a/cpp/src/proton_bits.hpp +++ b/cpp/src/proton_bits.hpp @@ -20,6 +20,7 @@ */ #include #include +#include #include #include @@ -41,6 +42,7 @@ struct pn_connection_t; struct pn_session_t; struct pn_link_t; struct pn_delivery_t; +struct pn_disposition_t; struct pn_condition_t; struct pn_acceptor_t; struct pn_terminus_t; @@ -60,6 +62,7 @@ class sender; class receiver; class transfer; class tracker; +class disposition; class delivery; class error_condition; class acceptor; @@ -98,6 +101,9 @@ template <> struct wrapped { typedef pn_link_t type; }; template <> struct wrapped { typedef pn_link_t type; }; template <> struct wrapped { typedef pn_delivery_t type; }; template <> struct wrapped { typedef pn_delivery_t type; }; +template <> struct wrapped { + typedef pn_disposition_t type; +}; template <> struct wrapped { typedef pn_delivery_t type; }; template <> struct wrapped { typedef pn_condition_t type; }; template <> struct wrapped { typedef pn_terminus_t type; }; @@ -111,6 +117,9 @@ template <> struct wrapper { typedef connection type; }; template <> struct wrapper { typedef session type; }; template <> struct wrapper { typedef link type; }; template <> struct wrapper { typedef transfer type; }; +template <> struct wrapper { + typedef disposition type; +}; template <> struct wrapper { typedef error_condition type; }; template <> struct wrapper { typedef terminus type; }; diff --git a/cpp/src/tracker.cpp b/cpp/src/tracker.cpp index f671764d1..5715f12b8 100644 --- a/cpp/src/tracker.cpp +++ b/cpp/src/tracker.cpp @@ -32,7 +32,7 @@ namespace proton { -tracker::tracker(pn_delivery_t *d): transfer(make_wrapper(d)) {} +tracker::tracker(pn_delivery_t *d) : transfer(make_wrapper(d)) {} sender tracker::sender() const { return make_wrapper(pn_delivery_link(pn_object())); } binary tracker::tag() const { return bin(pn_delivery_tag(pn_object())); } } diff --git a/cpp/src/transaction.cpp b/cpp/src/transaction.cpp index 0d9a15c5e..b7865cf7b 100644 --- a/cpp/src/transaction.cpp +++ b/cpp/src/transaction.cpp @@ -19,11 +19,13 @@ * */ -#include "proton/message.hpp" #include "proton/transaction.hpp" +#include "proton/delivery.h" +#include "proton/message.hpp" #include "proton/target_options.hpp" #include "proton/tracker.hpp" +#include "proton_bits.hpp" #include #include @@ -31,43 +33,33 @@ namespace proton { transaction_handler::~transaction_handler() = default; -void transaction_handler::on_transaction_declared(transaction &) {} -void transaction_handler::on_transaction_committed(transaction &) {} -void transaction_handler::on_transaction_aborted(transaction &) {} -void transaction_handler::on_transaction_declare_failed(transaction &) {} -void transaction_handler::on_transaction_commit_failed(transaction &) {} - +void transaction_handler::on_transaction_declared(transaction) {} +void transaction_handler::on_transaction_committed(transaction) {} +void transaction_handler::on_transaction_aborted(transaction) {} +void transaction_handler::on_transaction_declare_failed(transaction) {} +void transaction_handler::on_transaction_commit_failed(transaction) {} + +transaction::transaction() : _impl(NULL) {} // empty transaction, not yet ready +// transaction::transaction(proton::sender& _txn_ctrl, +// proton::transaction_handler& _handler, bool _settle_before_discharge) : +// _impl(std::make_shared(_txn_ctrl, _handler, +// _settle_before_discharge)) {} +transaction::transaction(transaction_impl *impl) + : _impl(impl) {} +// transaction::transaction( transaction_impl* impl): _impl(impl){} transaction::~transaction() = default; -void transaction::commit() {}; -void transaction::abort() {}; -void transaction::declare() {}; -proton::tracker transaction::send(proton::sender s, proton::message msg) { return {}; }; +void transaction::commit() { _impl->commit(); }; +void transaction::abort() { _impl->abort(); }; +void transaction::declare() { _impl->declare(); }; +proton::tracker transaction::send(proton::sender s, proton::message msg) { + return _impl->send(s, msg); +}; void transaction::handle_outcome(proton::tracker t) { - std::cout<<" transaction_impl::handle_outcome = NO OP base class " << std::endl; - + std::cout << " transaction::handle_outcome = NO OP base class " + << std::endl; + _impl->handle_outcome(t); }; -class transaction_impl : public transaction { - public: - proton::sender* txn_ctrl = nullptr; - proton::transaction_handler* handler = nullptr; - // TODO int - int id = 0; - proton::tracker _declare; - proton::tracker _discharge; - bool failed = false; - std::vector pending; - - transaction_impl(proton::sender& _txn_ctrl, proton::transaction_handler& _handler, bool _settle_before_discharge); - void commit() override; - void abort() override; - void declare() override; - proton::tracker send(proton::sender s, proton::message msg) override; - - void discharge(bool failed); - proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value); - void handle_outcome(proton::tracker t); -}; transaction_impl::transaction_impl(proton::sender& _txn_ctrl, proton::transaction_handler& _handler, bool _settle_before_discharge): txn_ctrl(&_txn_ctrl), @@ -97,6 +89,10 @@ void transaction_impl::declare() { proton::value _value = vd; std::cout<<" [transaction_impl::declare()] value to send_ctrl: " << _value<< std::endl; _declare = send_ctrl(descriptor, _value ); + std::cout << " transaction_impl::declare()... txn_impl i am is " << this + << std::endl; + std::cout << " [transaction_impl::declare()] _declare is : " << _declare + << std::endl; } void transaction_impl::discharge(bool failed) { @@ -118,8 +114,12 @@ proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, proton::v proton::message msg = msg_value; std::cout << " [transaction_impl::send_ctrl] sending " << msg << std::endl; proton::tracker delivery = txn_ctrl->send(msg); - delivery.transaction(*this); - std::cout << " [transaction_impl::send_ctrl] sending done. I guess queued! " << std::endl; + std::cout << " # declare, delivery as tracker: " << delivery + << std::endl; + delivery.transaction(transaction(this)); + std::cout + << " [transaction_impl::send_ctrl] sending done. I guess queued! " + << delivery << std::endl; return delivery; } @@ -129,18 +129,36 @@ proton::tracker transaction_impl::send(proton::sender s, proton::message msg) { } void transaction_impl::handle_outcome(proton::tracker t) { + + // std::vector _data = + // proton::get>(val); + auto txn = t.transaction(); + std::cout << " handle_outcome::txn_impl i am is " << this << std::endl; + std::cout << " handle_outcome::_declare is " << _declare << std::endl; + std::cout << " handle_outcome::tracker is " << t << std::endl; + // TODO: handle outcome if(_declare == t) { std::cout<<" transaction_impl::handle_outcome => got _declare" << std::endl; - + pn_disposition_t *disposition = pn_delivery_remote(unwrap(t)); + proton::value val(pn_disposition_data(disposition)); + auto vd = get>(val); + txn._impl->id = vd[0]; + std::cout << " transaction_impl: handle_outcome.. got txnid:: " + << vd[0] << std::endl; + handler->on_transaction_declared(txn); + } else if (_discharge == t) { + std::cout << " transaction_impl::handle_outcome => got _discharge" + << std::endl; + handler->on_transaction_committed(txn); + } else { + std::cout << " transaction_impl::handle_outcome => got NONE!" + << std::endl; } - std::cout<<" transaction_impl::handle_outcome => calling txn declared. handler: " << handler << std::endl; - handler->on_transaction_declared(*this); - } -transaction mk_transaction_impl(sender& s, transaction_handler& h, bool f) { - return transaction_impl{s, h, f}; +transaction transaction::mk_transaction_impl(sender &s, transaction_handler &h, + bool f) { + return transaction(new transaction_impl(s, h, f)); } - } diff --git a/cpp/src/transfer.cpp b/cpp/src/transfer.cpp index fa7cd126c..bdd17035d 100644 --- a/cpp/src/transfer.cpp +++ b/cpp/src/transfer.cpp @@ -32,6 +32,7 @@ #include "proton_bits.hpp" +#include #include namespace proton { @@ -52,8 +53,7 @@ std::string to_string(enum transfer::state s) { return pn_disposition_type_name( std::ostream& operator<<(std::ostream& o, const enum transfer::state s) { return o << to_string(s); } void transfer::transaction(proton::transaction t) { - transfer_context& cc = transfer_context::get(pn_object()); - // FIX THIS + transfer_context &cc = transfer_context::get(pn_object()); cc.transaction_ = std::make_unique(t); } @@ -72,4 +72,23 @@ void* transfer::user_data() const { return cc.user_data_; } +disposition transfer::remote() { + auto me = pn_object(); + std::cout << " transfer::remote ME => " << me << std::endl; + + auto dd = pn_delivery_remote(me); + std::cout << " transfer::remote dd => " << dd << std::endl; + std::cout << " transfer::remote.data dd => " + << proton::value(pn_disposition_data(dd)) << std::endl; + auto d2 = make_wrapper(dd); + std::cout << " transfer::remote d2 ready => " << std::endl; + return d2; +} +disposition transfer::local() { + return make_wrapper(pn_delivery_local(pn_object())); +} + +proton::value disposition::data() const { + return proton::value(pn_disposition_data(pn_object())); +} }