Skip to content

Commit

Permalink
Add txn id in disposition frame bits
Browse files Browse the repository at this point in the history
  • Loading branch information
DreamPearl committed Nov 21, 2024
1 parent ed624cd commit 74818fc
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 74 deletions.
15 changes: 7 additions & 8 deletions cpp/examples/tx_send.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
std::cout << " [on_container_start] completed!! txn: " << &transaction << std::endl;
}

void on_transaction_aborted(proton::transaction&) {}
void on_transaction_declare_failed(proton::transaction &) {}
void on_transaction_commit_failed(proton::transaction&) {}
void on_transaction_aborted(proton::transaction) {}
void on_transaction_declare_failed(proton::transaction) {}
void on_transaction_commit_failed(proton::transaction) {}


void on_transaction_declared(proton::transaction &t) override {
std::cout<<"[on_transaction_declared] txn: "<<(&transaction)<< " new_txn: "<<(&t)<<std::endl;
void on_transaction_declared(proton::transaction t) override {
std::cout << "[on_transaction_declared] txn: " << (&transaction)
<< " new_txn: " << (t._impl->id) << std::endl;
connection.close();
// transaction = &t;
// ASSUME: THIS FUNCTION DOESN"T WORK
Expand Down Expand Up @@ -95,7 +95,6 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
if(current_batch == batch_size)
{
transaction.commit();
// WE DON"T CARE ANY MORE FOR NOW
// transaction = NULL;
}
}
Expand All @@ -106,7 +105,7 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
confirmed += 1;
}

void on_transaction_committed(proton::transaction &t) override {
void on_transaction_committed(proton::transaction t) override {
committed += current_batch;
std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
if(committed == total) {
Expand Down
1 change: 1 addition & 0 deletions cpp/include/proton/container.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ class PN_CPP_CLASS_EXTERN container {
friend class receiver_options;
friend class sender_options;
friend class work_queue;
friend class transaction;
/// @endcond
};

Expand Down
64 changes: 53 additions & 11 deletions cpp/include/proton/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "./internal/export.hpp"
#include "./sender.hpp"
#include "./tracker.hpp"
#include "./container.hpp"

/// @file
/// @copybrief proton::transaction
Expand All @@ -36,16 +37,57 @@ namespace proton {

class transaction_handler;

// TODO: This should not be accessible to users.
class transaction_impl {
public:
proton::sender *txn_ctrl = nullptr;
proton::transaction_handler *handler = nullptr;
proton::binary id;
proton::tracker _declare;
proton::tracker _discharge;
bool failed = false;
std::vector<proton::tracker> pending;

void commit();
void abort();
void declare();
proton::tracker send(proton::sender s, proton::message msg);

void discharge(bool failed);
proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
void handle_outcome(proton::tracker t);
transaction_impl(proton::sender &_txn_ctrl,
proton::transaction_handler &_handler,
bool _settle_before_discharge);

// delete copy and assignment operator to ensure no copy of this object is
// every made transaction_impl(const transaction_impl&) = delete;
// transaction_impl& operator=(const transaction_impl&) = delete;
};

class
PN_CPP_CLASS_EXTERN transaction {
// private:
// PN_CPP_EXTERN transaction(proton::sender& _txn_ctrl,
// proton::transaction_handler& _handler, bool _settle_before_discharge);

static transaction mk_transaction_impl(sender &s, transaction_handler &h,
bool f);
PN_CPP_EXTERN transaction(transaction_impl* impl);
public:
transaction_impl* _impl;
// TODO:
PN_CPP_EXTERN virtual ~transaction();
PN_CPP_EXTERN virtual void commit();
PN_CPP_EXTERN virtual void abort();
PN_CPP_EXTERN virtual void declare();
PN_CPP_EXTERN virtual void handle_outcome(proton::tracker);
PN_CPP_EXTERN virtual proton::tracker send(proton::sender s, proton::message msg);
// PN_CPP_EXTERN transaction(transaction &o);
PN_CPP_EXTERN transaction();
PN_CPP_EXTERN ~transaction();
PN_CPP_EXTERN void commit();
PN_CPP_EXTERN void abort();
PN_CPP_EXTERN void declare();
PN_CPP_EXTERN void handle_outcome(proton::tracker);
PN_CPP_EXTERN proton::tracker send(proton::sender s, proton::message msg);

friend class transaction_impl;
friend class container::impl;
};

class
Expand All @@ -54,19 +96,19 @@ PN_CPP_CLASS_EXTERN transaction_handler {
PN_CPP_EXTERN virtual ~transaction_handler();

/// Called when a local transaction is declared.
PN_CPP_EXTERN virtual void on_transaction_declared(transaction&);
PN_CPP_EXTERN virtual void on_transaction_declared(transaction);

/// Called when a local transaction is discharged successfully.
PN_CPP_EXTERN virtual void on_transaction_committed(transaction&);
PN_CPP_EXTERN virtual void on_transaction_committed(transaction);

/// Called when a local transaction is discharged unsuccessfully (aborted).
PN_CPP_EXTERN virtual void on_transaction_aborted(transaction&);
PN_CPP_EXTERN virtual void on_transaction_aborted(transaction);

/// Called when a local transaction declare fails.
PN_CPP_EXTERN virtual void on_transaction_declare_failed(transaction&);
PN_CPP_EXTERN virtual void on_transaction_declare_failed(transaction);

/// Called when the commit of a local transaction fails.
PN_CPP_EXTERN virtual void on_transaction_commit_failed(transaction&);
PN_CPP_EXTERN virtual void on_transaction_commit_failed(transaction);
};

} // namespace proton
Expand Down
20 changes: 20 additions & 0 deletions cpp/include/proton/transfer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,26 @@
/// @copybrief proton::transfer

struct pn_delivery_t;
struct pn_disposition_t;

namespace proton {

class disposition : public internal::object<pn_disposition_t> {
/// @cond INTERNAL
disposition(pn_disposition_t *d) : internal::object<pn_disposition_t>(d) {}
/// @endcond

public:
/// Create an empty disposition.
disposition() : internal::object<pn_disposition_t>(0) {}

proton::value data() const;

/// @cond INTERNAL
friend class internal::factory<disposition>;
/// @endcond
};

/// The base class for delivery and tracker.
class transfer : public internal::object<pn_delivery_t> {
/// @cond INTERNAL
Expand Down Expand Up @@ -88,6 +105,9 @@ class transfer : public internal::object<pn_delivery_t> {
/// Get user data from this transfer.
PN_CPP_EXTERN void* user_data() const;

PN_CPP_EXTERN disposition remote();
PN_CPP_EXTERN disposition local();

/// @cond INTERNAL
friend class internal::factory<transfer>;
/// @endcond
Expand Down
25 changes: 19 additions & 6 deletions cpp/src/messaging_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

#include <proton/connection.h>
#include <proton/delivery.h>
#include <proton/disposition.h>
#include <proton/handlers.h>
#include <proton/link.h>
#include <proton/message.h>
Expand Down Expand Up @@ -115,20 +116,32 @@ void message_decode(message& msg, proton::delivery delivery) {
msg.decode(buf);
pn_link_advance(unwrap(link));
}

void on_delivery(messaging_handler& handler, pn_event_t* event) {
pn_link_t *lnk = pn_event_link(event);
pn_delivery_t *dlv = pn_event_delivery(event);
link_context& lctx = link_context::get(lnk);
Tracing& ot = Tracing::getTracing();
if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) {
std::cout<< " on_delivery: COOORINDATOR.. " << &handler << std::endl;
// delivery d(make_wrapper<delivery>(dlv));
pn_disposition_t *disposition = pn_delivery_remote(dlv);
proton::value val(pn_disposition_data(disposition));
std::cout << " on_delivery: COOORINDATOR.. tracker: " << val
<< std::endl;
tracker t(make_wrapper<tracker>(dlv));
std::cout<< " on_delivery: COOORINDATOR.. tracker" << &t << std::endl;
handler.on_tracker_settle(t);
}
std::cout << " on_delivery: COOORINDATOR.. TRACKER MADE: "
<< std::endl;
// t.user_data = val; // not

// proton::disposition _disposition = make_wrapper(disposition); // #
// t.remote();

else if (pn_link_is_receiver(lnk)) {
// proton::value val2 = _disposition.data();

// std::cout<< " on_delivery: COOORINDATOR with TXN IN :"
// << val2 << std::endl;

handler.on_tracker_settle(t);
} else if (pn_link_is_receiver(lnk)) {
delivery d(make_wrapper<delivery>(dlv));
if (pn_delivery_aborted(dlv)) {
pn_delivery_settle(dlv);
Expand Down
27 changes: 24 additions & 3 deletions cpp/src/proactor_container_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@
#include "proton/transport.h"
#include "proton/transaction.hpp"

#include "proton/delivery.h"

#include "contexts.hpp"
#include "messaging_adapter.hpp"
#include "reconnect_options_impl.hpp"
#include "proton_bits.hpp"

#include <proton/types.hpp>

#include <assert.h>
#include <string.h>

Expand Down Expand Up @@ -863,16 +867,32 @@ void container::impl::stop(const proton::error_condition& err) {
}

// TODO: declare this in separate internal header file
extern transaction mk_transaction_impl(sender&, transaction_handler&, bool);
// extern transaction mk_transaction_impl(sender&, transaction_handler&, bool);

transaction container::impl::declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge) {
class InternalTransactionHandler : public proton::messaging_handler {
// TODO: auto_settle
void on_tracker_settle(proton::tracker &t) override {
std::cout<<" [InternalTransactionHandler][on_tracker_settle] called with tracker.txn"
<< std::endl;
t.transaction().handle_outcome(t);

// t.user_data = val; // not

// proton::disposition _disposition = make_wrapper(disposition);
// // # t.remote();

// proton::value val2 = _disposition.data();

// proton::disposition _disposition = t.remote();

// proton::value val = _disposition.data();

// std::cout<< " declare_transaction: on_tracker_settle with
// TXN IN :" << val << std::endl;

// if(t.transaction()) {
t.transaction().handle_outcome(t);
// t.transaction().handle_outcome(t);
// }
}

Expand Down Expand Up @@ -903,7 +923,8 @@ transaction container::impl::declare_transaction(proton::connection conn, proton

std::cout<<" [declare_transaction] calling mk_transaction_impl" << std::endl;

auto txn = mk_transaction_impl(s, handler, settle_before_discharge);
auto txn =
transaction::mk_transaction_impl(s, handler, settle_before_discharge);
std::cout<<" [declare_transaction] txn address:" << &txn << std::endl;

return txn;
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/proton_bits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
#include <proton/link.h>
#include <proton/session.h>
#include <proton/tracker.hpp>

#include <string>
#include <iosfwd>
Expand All @@ -41,6 +42,7 @@ struct pn_connection_t;
struct pn_session_t;
struct pn_link_t;
struct pn_delivery_t;
struct pn_disposition_t;
struct pn_condition_t;
struct pn_acceptor_t;
struct pn_terminus_t;
Expand All @@ -60,6 +62,7 @@ class sender;
class receiver;
class transfer;
class tracker;
class disposition;
class delivery;
class error_condition;
class acceptor;
Expand Down Expand Up @@ -98,6 +101,9 @@ template <> struct wrapped<sender> { typedef pn_link_t type; };
template <> struct wrapped<receiver> { typedef pn_link_t type; };
template <> struct wrapped<transfer> { typedef pn_delivery_t type; };
template <> struct wrapped<tracker> { typedef pn_delivery_t type; };
template <> struct wrapped<disposition> {
typedef pn_disposition_t type;
};
template <> struct wrapped<delivery> { typedef pn_delivery_t type; };
template <> struct wrapped<error_condition> { typedef pn_condition_t type; };
template <> struct wrapped<terminus> { typedef pn_terminus_t type; };
Expand All @@ -111,6 +117,9 @@ template <> struct wrapper<pn_connection_t> { typedef connection type; };
template <> struct wrapper<pn_session_t> { typedef session type; };
template <> struct wrapper<pn_link_t> { typedef link type; };
template <> struct wrapper<pn_delivery_t> { typedef transfer type; };
template <> struct wrapper<pn_disposition_t> {
typedef disposition type;
};
template <> struct wrapper<pn_condition_t> { typedef error_condition type; };
template <> struct wrapper<pn_terminus_t> { typedef terminus type; };

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

namespace proton {

tracker::tracker(pn_delivery_t *d): transfer(make_wrapper(d)) {}
tracker::tracker(pn_delivery_t *d) : transfer(make_wrapper(d)) {}
sender tracker::sender() const { return make_wrapper<class sender>(pn_delivery_link(pn_object())); }
binary tracker::tag() const { return bin(pn_delivery_tag(pn_object())); }
}
Loading

0 comments on commit 74818fc

Please sign in to comment.