Skip to content

Commit

Permalink
[feat][txn]Implement transactionCoordinatorClient
Browse files Browse the repository at this point in the history
Master Issue:#932
### Motivation
Implement transaction coordinator client.
### Modifications
1. Implement transaction coordinator
2. implement transaction API
3. Add metric and test
  • Loading branch information
liangyepianzhou committed Feb 13, 2023
1 parent d9b1083 commit 91803b0
Show file tree
Hide file tree
Showing 9 changed files with 329 additions and 2 deletions.
2 changes: 2 additions & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ type ClientOptions struct {

// Add custom labels to all the metrics reported by this client instance
CustomMetricsLabels map[string]string

IsEnableTransaction bool
}

// Client represents a pulsar client
Expand Down
9 changes: 9 additions & 0 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type client struct {
handlers internal.ClientHandlers
lookupService internal.LookupService
metrics *internal.Metrics
tcClient *transactionCoordinatorClient

log log.Logger
}
Expand Down Expand Up @@ -147,6 +148,14 @@ func newClient(options ClientOptions) (Client, error) {

c.handlers = internal.NewClientHandlers()

if options.IsEnableTransaction {
c.tcClient = newTransactionCoordinatorClientImpl(c)
err = c.tcClient.start()
if err != nil {
return nil, err
}
}

return c, nil
}

Expand Down
7 changes: 7 additions & 0 deletions pulsar/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ const (
ProducerClosed
// SchemaFailure means the payload could not be encoded using the Schema
SchemaFailure

// ReachMaxPendingOps means the pending operations in transaction_impl coordinator reach the maximum.
ReachMaxPendingOps
// InvalidStatus means the component status is not as expected.
InvalidStatus
// TransactionError means this is a transaction related error
TransactionError
)

// Error implement error interface, composed of two parts: msg and result.
Expand Down
12 changes: 11 additions & 1 deletion pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader {
// Batch format
// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD]
// [METADATA_SIZE][METADATA][PAYLOAD]
//
type MessageReader struct {
buffer Buffer
// true if we are parsing a batched message - set after parsing the message metadata
Expand Down Expand Up @@ -213,6 +212,17 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
cmd.GetLastMessageId = msg.(*pb.CommandGetLastMessageId)
case pb.BaseCommand_AUTH_RESPONSE:
cmd.AuthResponse = msg.(*pb.CommandAuthResponse)
case pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST:
cmd.TcClientConnectRequest = msg.(*pb.CommandTcClientConnectRequest)
case pb.BaseCommand_NEW_TXN:
cmd.NewTxn = msg.(*pb.CommandNewTxn)
case pb.BaseCommand_ADD_PARTITION_TO_TXN:
cmd.AddPartitionToTxn = msg.(*pb.CommandAddPartitionToTxn)
case pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN:
cmd.AddSubscriptionToTxn = msg.(*pb.CommandAddSubscriptionToTxn)
case pb.BaseCommand_END_TXN:
cmd.EndTxn = msg.(*pb.CommandEndTxn)

default:
panic(fmt.Sprintf("Missing command type: %v", cmdType))
}
Expand Down
11 changes: 10 additions & 1 deletion pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,16 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
c.handlePing()
case pb.BaseCommand_PONG:
c.handlePong()

case pb.BaseCommand_TC_CLIENT_CONNECT_RESPONSE:
c.handleResponse(cmd.TcClientConnectResponse.GetRequestId(), cmd)
case pb.BaseCommand_NEW_TXN_RESPONSE:
c.handleResponse(cmd.NewTxnResponse.GetRequestId(), cmd)
case pb.BaseCommand_ADD_PARTITION_TO_TXN_RESPONSE:
c.handleResponse(cmd.AddPartitionToTxnResponse.GetRequestId(), cmd)
case pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN_RESPONSE:
c.handleResponse(cmd.AddSubscriptionToTxnResponse.GetRequestId(), cmd)
case pb.BaseCommand_END_TXN_RESPONSE:
c.handleResponse(cmd.EndTxnResponse.GetRequestId(), cmd)
case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:

default:
Expand Down
7 changes: 7 additions & 0 deletions pulsar/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ func topicStats(topic string) (map[string]interface{}, error) {
return metadata, err
}

func transactionStats(id TxnID) (map[string]interface{}, error) {
var metadata map[string]interface{}
path := fmt.Sprintf("admin/v3/transactions/transactionMetadata/%d/%d", id.mostSigBits, id.leastSigBits)
err := httpGet(path, &metadata)
return metadata, err
}

func topicPath(topic string) string {
tn, _ := internal.ParseTopicName(topic)
idx := strings.LastIndex(tn.Name, "/")
Expand Down
6 changes: 6 additions & 0 deletions pulsar/transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package pulsar

type TxnID struct {
mostSigBits uint64
leastSigBits uint64
}
204 changes: 204 additions & 0 deletions pulsar/transactionCoordinatorClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package pulsar

import (
"context"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
"github.com/gogo/protobuf/proto"
"strconv"
"sync/atomic"
"time"
)

type transactionCoordinatorClient struct {
client *client
cons []internal.Connection
epoch uint64
semaphore internal.Semaphore
blockIfReachMaxPendingOps bool
//The number of transactionImpl coordinators
tcNum uint64
log log.Logger
}

// TransactionCoordinatorAssign is the transaction_impl coordinator topic which is used to look up the broker
// where the TC located.
const TransactionCoordinatorAssign = "persistent://pulsar/system/transaction_coordinator_assign"

/*
*
Init a transactionImpl coordinator client and acquire connections with all transactionImpl coordinators.
*/
func newTransactionCoordinatorClientImpl(client *client) *transactionCoordinatorClient {
tc := &transactionCoordinatorClient{
client: client,
blockIfReachMaxPendingOps: true,
semaphore: internal.NewSemaphore(1000),
}
tc.log = client.log.SubLogger(log.Fields{})
return tc
}

func (tc *transactionCoordinatorClient) start() error {
r, err := tc.client.lookupService.GetPartitionedTopicMetadata(TransactionCoordinatorAssign)
if err != nil {
return err
}
tc.tcNum = uint64(r.Partitions)
tc.cons = make([]internal.Connection, tc.tcNum)

//Get connections with all transaction_impl coordinators which is synchronized
for i := uint64(0); i < tc.tcNum; i++ {
err := tc.grabConn(i)
if err != nil {
return err
}
}
return nil
}

func (tc *transactionCoordinatorClient) grabConn(partition uint64) error {
lr, err := tc.client.lookupService.Lookup(getTCAssignTopicName(partition))
if err != nil {
tc.log.WithError(err).Warn("Failed to lookup the transaction_impl " +
"coordinator assign topic [" + strconv.FormatUint(partition, 10) + "]")
return err
}

requestID := tc.client.rpcClient.NewRequestID()
cmdTCConnect := pb.CommandTcClientConnectRequest{
RequestId: proto.Uint64(requestID),
TcId: proto.Uint64(partition),
}

res, err := tc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST, &cmdTCConnect)

if err != nil {
tc.log.WithError(err).Error("Failed to connect transaction_impl coordinator " +
strconv.FormatUint(partition, 10))
return err
}
tc.cons[partition] = res.Cnx
return nil
}

func (tc *transactionCoordinatorClient) close() {
for _, con := range tc.cons {
con.Close()
}
}

/*
*
New a transactionImpl which can be used to guarantee exactly-once semantics.
*/
func (tc *transactionCoordinatorClient) newTransaction(timeout time.Duration) (TxnID, error) {
_, err := tc.canSendRequest()
if err != nil {
return TxnID{}, err
}
requestID := tc.client.rpcClient.NewRequestID()
nextTcId := tc.nextTCNumber()
cmdNewTxn := &pb.CommandNewTxn{
RequestId: proto.Uint64(requestID),
TcId: proto.Uint64(nextTcId),
TxnTtlSeconds: proto.Uint64(uint64(timeout.Milliseconds())),
}

cnx, err := tc.client.rpcClient.RequestOnCnx(tc.cons[nextTcId], requestID, pb.BaseCommand_NEW_TXN, cmdNewTxn)
if err != nil {
return TxnID{}, err
}

return TxnID{nextTcId, *cnx.Response.NewTxnResponse.TxnidLeastBits}, nil
}

/*
*
Register the partitions which published messages with the transactionImpl.
And this can be used when ending the transactionImpl.
*/
func (tc *transactionCoordinatorClient) addPublishPartitionToTxn(id TxnID, partitions []string) error {
_, err := tc.canSendRequest()
if err != nil {
return err
}
requestID := tc.client.rpcClient.NewRequestID()
cmdAddPartitions := &pb.CommandAddPartitionToTxn{
RequestId: proto.Uint64(requestID),
TxnidMostBits: proto.Uint64(id.mostSigBits),
TxnidLeastBits: proto.Uint64(id.leastSigBits),
Partitions: partitions,
}
_, err = tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID,
pb.BaseCommand_ADD_PARTITION_TO_TXN, cmdAddPartitions)
return err
}

/*
Register the subscription which acked messages with the transactionImpl.
And this can be used when ending the transactionImpl.
*/
func (tc *transactionCoordinatorClient) addSubscriptionToTxn(id TxnID, topic string, subscription string) error {
_, err := tc.canSendRequest()
if err != nil {
return err
}
requestID := tc.client.rpcClient.NewRequestID()
sub := &pb.Subscription{
Topic: &topic,
Subscription: &subscription,
}
cmdAddSubscription := &pb.CommandAddSubscriptionToTxn{
RequestId: proto.Uint64(requestID),
TxnidMostBits: proto.Uint64(id.mostSigBits),
TxnidLeastBits: proto.Uint64(id.leastSigBits),
Subscription: []*pb.Subscription{sub},
}
_, err = tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID,
pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN, cmdAddSubscription)
return err
}

/*
*
Commit or abort the transactionImpl.
*/
func (tc *transactionCoordinatorClient) endTxn(id TxnID, action pb.TxnAction) error {
_, err := tc.canSendRequest()
if err != nil {
return err
}
requestID := tc.client.rpcClient.NewRequestID()
cmdEndTxn := &pb.CommandEndTxn{
RequestId: proto.Uint64(requestID),
TxnAction: &action,
TxnidMostBits: proto.Uint64(id.mostSigBits),
TxnidLeastBits: proto.Uint64(id.leastSigBits),
}
_, err = tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID, pb.BaseCommand_END_TXN, cmdEndTxn)
return err
}

func getTCAssignTopicName(partition uint64) string {
return TransactionCoordinatorAssign + "-partition-" + strconv.FormatUint(partition, 10)
}

func (tc *transactionCoordinatorClient) canSendRequest() (bool, error) {
if tc.blockIfReachMaxPendingOps {
if !tc.semaphore.Acquire(context.Background()) {
return false, newError(UnknownError, "Failed to acquire semaphore")
}
} else {
if !tc.semaphore.TryAcquire() {
return false, newError(ReachMaxPendingOps, "transaction_impl coordinator reach max pending ops")
}
}
return true, nil
}

func (tc *transactionCoordinatorClient) nextTCNumber() uint64 {
return atomic.AddUint64(&tc.epoch, 1) % tc.tcNum
}
73 changes: 73 additions & 0 deletions pulsar/transaction_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package pulsar

import (
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestTCClient(t *testing.T) {
//1. Prepare: create PulsarClient and init transaction coordinator client.
topic := "my-topic"
sub := "my-sub"
tc, client := createTcClient(t)
defer client.Close()
defer tc.close()
//2. Prepare: create Topic and Subscription.
_, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
})
assert.Nil(t, err)
//3. Test newTransaction, addSubscriptionToTxn, addPublishPartitionToTxn
//Create a transaction1 and add subscription and publish topic to the transaction.
id1, err := tc.newTransaction(3 * time.Minute)
err = tc.addSubscriptionToTxn(id1, topic, sub)
err = tc.addPublishPartitionToTxn(id1, []string{topic})
assert.Nil(t, err)
//4. Verify the transaction1 stats
stats, err := transactionStats(id1)
assert.Nil(t, err)
assert.Equal(t, "OPEN", stats["status"])
producedPartitions := stats["producedPartitions"].(map[string]interface{})
ackedPartitions := stats["ackedPartitions"].(map[string]interface{})
_, ok := producedPartitions[topic]
assert.True(t, ok)
temp, ok := ackedPartitions[topic]
assert.True(t, ok)
subscriptions := temp.(map[string]interface{})
_, ok = subscriptions[sub]
assert.True(t, ok)
//5. Test End transaction
//Create transaction2 and Commit the transaction.
id2, err := tc.newTransaction(3 * time.Minute)
assert.Nil(t, err)
//6. Verify the transaction2 stats
stats2, err := transactionStats(id2)
assert.Equal(t, "OPEN", stats2["status"])
err = tc.endTxn(id2, pb.TxnAction_COMMIT)
stats2, err = transactionStats(id2)
//The transaction will be removed from txnMeta. Therefore, it is expected that stats2 is zero
if err == nil {
assert.Equal(t, "COMMITTED", stats2["status"])
} else {
assert.Equal(t, err.Error(), "http error status code: 404")
}
}

/*
Create a transaction coordinator client to send request
*/
func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) {
c, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
IsEnableTransaction: true,
})
assert.Nil(t, err)
tcClient := newTransactionCoordinatorClientImpl(c.(*client))
err = tcClient.start()
assert.Nil(t, err)

return tcClient, c.(*client)
}

0 comments on commit 91803b0

Please sign in to comment.