Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azservicebus] Fixing a memory leak when doing cross receiver settlement. #22368

Merged
merged 3 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions sdk/messaging/azservicebus/amqp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ type AMQPAnnotatedMessage struct {
// Properties corresponds to the properties section of an AMQP message.
Properties *AMQPAnnotatedMessageProperties

linkName string

// inner is the AMQP message we originally received, which contains some hidden
// data that's needed to settle with go-amqp. We strip out most of the underlying
// data so it's fairly minimal.
Expand Down Expand Up @@ -273,7 +271,6 @@ func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message, receivingLinkName stri
DeliveryTag: goAMQPMessage.DeliveryTag,
Footer: footer,
Header: header,
linkName: receivingLinkName,
Properties: properties,
inner: goAMQPMessage,
}
Expand Down
11 changes: 8 additions & 3 deletions sdk/messaging/azservicebus/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,12 @@ func TestNewClientWithAzureIdentity(t *testing.T) {

receiver, err := client.NewReceiverForQueue(queue, nil)
require.NoError(t, err)
actualSettler, _ := receiver.settler.(*messageSettler)
actualSettler.onlyDoBackupSettlement = true // this'll also exercise the management link

messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
require.NoError(t, err)

require.EqualValues(t, []string{"hello - authenticating with a TokenCredential"}, getSortedBodies(messages))
forceManagementSettlement(t, messages)

for _, m := range messages {
err = receiver.CompleteMessage(context.TODO(), m, nil)
Expand Down Expand Up @@ -550,7 +549,7 @@ func TestNewClientUnitTests(t *testing.T) {
MaxRetryDelay: 12 * time.Hour,
}, receiver.retryOptions)

actualSettler := receiver.settler.(*messageSettler)
actualSettler := receiver.settler

require.Equal(t, RetryOptions{
MaxRetries: 101,
Expand Down Expand Up @@ -580,3 +579,9 @@ func assertRPCNotFound(t *testing.T, err error) {
require.ErrorAs(t, err, &rpcError)
require.Equal(t, http.StatusNotFound, rpcError.RPCCode())
}

// forceManagementSettlement flags every received message so that settlement
// (Complete/Abandon/etc.) goes through the management link rather than the
// receiver's own AMQP link, exercising the backup-settlement code path.
func forceManagementSettlement(t *testing.T, messages []*ReceivedMessage) {
	for i := range messages {
		messages[i].settleOnMgmtLink = true
	}
}
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/internal/mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,10 @@ func SetSessionState(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName str
return nil
}

// SendDisposition allows you settle a message using the management link, rather than via your
// SettleOnMgmtLink allows you to settle a message using the management link, rather than via your
// *amqp.Receiver. Use this if the receiver has been closed/lost or if the message isn't associated
// with a link (ex: deferred messages).
func SendDisposition(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]any) error {
func SettleOnMgmtLink(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]any) error {
if lockToken == nil {
err := errors.New("lock token on the message is not set, thus cannot send disposition")
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
DispositionTypeAccept DispositionType = "accept"
DispositionTypeReject DispositionType = "reject"
DispositionTypeRelease DispositionType = "release"
DispositionTypeModify DispositionType = "modify" // used for abandoning a message
)

type DispositionEvent struct {
Expand Down
6 changes: 3 additions & 3 deletions sdk/messaging/azservicebus/internal/stress/Chart.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
dependencies:
- name: stress-test-addons
repository: https://stresstestcharts.blob.core.windows.net/helm/
version: 0.3.0
digest: sha256:3e21a7fdf5d6b37e871a6dd9f755888166fbb24802aa517f51d1d9223b47656e
generated: "2023-09-26T11:43:56.706771668-07:00"
version: 0.3.1
digest: sha256:28e374f8db5c46447b2a1491d4361ceb126536c425cbe54be49017120fe7b27d
generated: "2024-02-05T17:21:31.510400504-08:00"
3 changes: 3 additions & 0 deletions sdk/messaging/azservicebus/internal/stress/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ ENV CGO_ENABLED=0
ADD . /src
WORKDIR /src/internal/stress
RUN go build -o stress .
WORKDIR /src/internal/stress/tests/benchmarks
RUN go test -c

# The first container is just for building the artifacts, and contains all the source, etc...
# That container instance only ever lives on your local machine (or the build machine).
Expand All @@ -15,5 +17,6 @@ RUN go build -o stress .
FROM mcr.microsoft.com/cbl-mariner/base/core:2.0
WORKDIR /app
COPY --from=build /src/internal/stress/stress /app/stress
COPY --from=build /src/internal/stress/tests/benchmarks/benchmarks.test /app/benchmarks.test
RUN yum update -y && yum install -y ca-certificates
ENTRYPOINT ["/bin/bash"]
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
displayNames:
# this makes it so these don't show up in the scenario names,
# this makes it so these don't show up in the scenario names,
# since they're just clutter.
1.5Gi": ""
4Gi": ""
Expand All @@ -23,7 +23,7 @@ matrix:
testTarget: finitePeeks
memory: "0.5Gi"
finiteSendAndReceive:
testTarget: finiteSendAndReceive
testTarget: finiteSendAndReceive
memory: "0.5Gi"
finiteSessions:
testTarget: finiteSessions
Expand Down Expand Up @@ -52,10 +52,14 @@ matrix:
memory: "0.5Gi"
rapidOpenClose:
testTarget: rapidOpenClose
memory: "0.5Gi"
memory: "0.5Gi"
receiveCancellation:
testTarget: receiveCancellation
memory: "0.5Gi"
sendAndReceiveDrain:
testTarget: sendAndReceiveDrain
memory: "0.5Gi"
benchmarkBackupSettlementLeak:
benchmark: true
testTarget: "BenchmarkBackupSettlementLeakWhileOldReceiverStillAlive"
memory: "1.0Gi"
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,25 @@ func (sw *senderWrapper) NewMessageBatch(ctx context.Context, options *azservice
return sw.inner.NewMessageBatch(ctx, options)
}

func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender) (*StreamingMessageBatch, error) {
func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender, expectedTotal int) (*StreamingMessageBatch, error) {
batch, err := sender.NewMessageBatch(ctx, nil)

if err != nil {
return nil, err
}

return &StreamingMessageBatch{
sender: sender,
currentBatch: batch,
sender: sender,
currentBatch: batch,
expectedTotal: expectedTotal,
}, nil
}

type StreamingMessageBatch struct {
sender internalBatchSender
currentBatch internalBatch
sender internalBatchSender
currentBatch internalBatch
expectedTotal int
total int
}

// Add appends to the current batch. If it's full it'll send it, allocate a new one.
Expand All @@ -65,11 +68,13 @@ func (sb *StreamingMessageBatch) Add(ctx context.Context, msg *azservicebus.Mess
return err
}

log.Printf("Sending message batch (%d messages)", sb.currentBatch.NumMessages())
log.Printf("Sending message batch with %d messages. %d/%d messages sent so far.", sb.currentBatch.NumMessages(), sb.total, sb.expectedTotal)
if err := sb.sender.SendMessageBatch(ctx, sb.currentBatch); err != nil {
return err
}

sb.total += int(sb.currentBatch.NumMessages())

// throttle a teeny bit.
time.Sleep(time.Second)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func MustGenerateMessages(sc *StressContext, sender *TrackingSender, messageLimi

log.Printf("Sending %d messages", messageLimit)

streamingBatch, err := NewStreamingMessageBatch(ctx, &senderWrapper{inner: sender})
streamingBatch, err := NewStreamingMessageBatch(ctx, &senderWrapper{inner: sender}, messageLimit)
sc.PanicOnError("failed to create streaming batch", err)

extraBytes := make([]byte, numExtraBytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ spec:
- >
set -ex;
mkdir -p "$DEBUG_SHARE";
{{ if ne .Stress.benchmark true }}
/app/stress tests "{{ .Stress.testTarget }}" 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
{{ else }}
/app/benchmarks.test -test.timeout 24h -test.bench {{ .Stress.testTarget }} 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
{{ end }}
# Pulls the image on pod start, always. We tend to push to the same image and tag over and over again
# when iterating, so this is a must.
imagePullPolicy: Always
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package benchmarks

import (
"context"
"fmt"
"log"
"math"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared"
)

// BackupSettlementLeak checks that, when we use backup settlement, that we're not
// leaking memory. This came up in a couple of issues for a customer:
// - https://github.com/Azure/azure-sdk-for-go/issues/22318
// - https://github.com/Azure/azure-sdk-for-go/issues/22157
//
// The use case for backup settlement is for when the original link we've received
// on has gone offline, so we need to settle via the management$ link instead. However,
// the underlying go-amqp link is tracking several bits of state for the message which
// will never get cleared. Since that receiver was dead it was going to get garbage
// collected anyway, so this was a non-issue.
//
// This customer's use case was slightly different - they were completing on a separate
// receiver even when the original receiving link was still alive. This means the memory
// leak is just accumulating and never gets garbage collected since there's no trigger
// to know when to clear out any tracking state for the message.
func BenchmarkBackupSettlementLeakWhileOldReceiverStillAlive(b *testing.B) {
	// Setup (queue creation + seeding) shouldn't count against the benchmark.
	b.StopTimer()

	sc := shared.MustCreateStressContext("BenchmarkBackupSettlementLeak", nil)
	defer sc.End()

	sent := int64(100000)

	client, queueName := mustInitBenchmarkBackupSettlementLeak(sc, b, int(sent))

	oldReceiver, err := client.NewReceiverForQueue(queueName, nil)
	sc.NoError(err)

	newReceiver, err := client.NewReceiverForQueue(queueName, nil)
	sc.NoError(err)

	b.StartTimer()

	var completed int64

	// Every abandon redelivers the message until it hits MaxDeliveryCount,
	// so we expect to settle each message maxDeliveryCount times in total.
	expected := maxDeliveryCount * sent

	for completed < expected {
		// receive from the old receiver and...
		receiveCtx, cancel := context.WithTimeout(context.Background(), time.Minute)

		messages, err := oldReceiver.ReceiveMessages(receiveCtx, int(math.Min(float64(expected-completed), 5000)), &azservicebus.ReceiveMessagesOptions{
			// not super scientific - mostly just want to get slightly fuller batches
			TimeAfterFirstMessage: 30 * time.Second,
		})
		cancel()
		sc.NoError(err)

		var wg sync.WaitGroup
		wg.Add(len(messages))

		// ...completing on another receiver. This is the cross-receiver
		// settlement pattern that previously leaked tracking state in the
		// still-alive old receiver's link.
		for _, m := range messages {
			m := m // capture per-iteration (pre-Go 1.22 loop-variable semantics)

			go func() {
				defer wg.Done()

				// abandon it so we see the message a few times (until it's
				// dead-lettered after maxDeliveryCount tries)
				err := newReceiver.AbandonMessage(context.Background(), m, nil)
				sc.NoError(err)
				atomic.AddInt64(&completed, 1)
			}()
		}

		wg.Wait()

		// NOTE: was "completed/sent", which overshot 100% since each message
		// is settled maxDeliveryCount times; compare against expected instead.
		b.Logf("Settled %d/%d", completed, expected)
	}

	// Force a GC and then idle so a memory profile taken now reflects what is
	// genuinely retained (i.e. would show the leak if settlement state were
	// never released).
	log.Printf("Forcing garbage collection\n")
	runtime.GC()
	log.Printf("Done with collection\n")
	time.Sleep(1 * time.Minute)
}

// mustInitBenchmarkBackupSettlementLeak provisions an auto-deleting queue with
// the benchmark's MaxDeliveryCount, pre-fills it with numToSend messages, and
// returns a client plus the queue's name. Any setup failure panics via the
// stress context.
func mustInitBenchmarkBackupSettlementLeak(sc *shared.StressContext, b *testing.B, numToSend int) (*azservicebus.Client, string) {
	qName := fmt.Sprintf("backup-settlement-tester-%s", sc.Nano)

	shared.MustCreateAutoDeletingQueue(sc, qName, &admin.QueueProperties{
		MaxDeliveryCount: to.Ptr[int32](maxDeliveryCount),
	})

	sbClient, err := azservicebus.NewClientFromConnectionString(sc.ConnectionString, nil)
	sc.PanicOnError("failed to create client", err)

	trackingSender, err := shared.NewTrackingSender(sc.TC, sbClient, qName, nil)
	sc.PanicOnError("create a sender", err)

	shared.MustGenerateMessages(sc, trackingSender, numToSend, 0)

	return sbClient, qName
}

// maxDeliveryCount is the benchmark queue's MaxDeliveryCount: each message is
// redelivered (after an abandon) up to this many times before Service Bus
// dead-letters it, so the benchmark settles each sent message this many times.
const maxDeliveryCount = 20
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package benchmarks

import (
"log"
"os"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared"
)

// TestMain loads the shared stress-test environment before any benchmarks run.
// When ENV_FILE isn't set it falls back to the repo-relative .env file; if the
// environment can't be loaded it logs and returns, skipping all benchmarks.
func TestMain(m *testing.M) {
	if envFile := os.Getenv("ENV_FILE"); envFile == "" {
		os.Setenv("ENV_FILE", "../../../../.env")
	}

	if err := shared.LoadEnvironment(); err != nil {
		log.Printf("Failed to load env file, benchmarks will not run: %s", err)
		return
	}

	os.Exit(m.Run())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
## Generating

`go test -memprofile mem.out -bench .`

## Visualizing

Run:
* `sudo apt install graphviz`
* `go tool pprof -http localhost:8000 -base mem.out.before_fix mem.out.after_fix`
13 changes: 7 additions & 6 deletions sdk/messaging/azservicebus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
"github.com/Azure/go-amqp"
)

Expand Down Expand Up @@ -126,10 +127,9 @@ type ReceivedMessage struct {
// and Header fields.
RawAMQPMessage *AMQPAnnotatedMessage

// deferred indicates we received it using ReceiveDeferredMessages. These messages
// will still go through the normal Receiver.Settle functions but internally will
// always be settled with the management link.
deferred bool
linkName string // used when we call into the management link. It counts towards a link not being considered idle.

settleOnMgmtLink bool // used for cases like when a message is received that was deferred. It can only be settled on the management link.
}

// Message creates a shallow copy of the fields from this message to an instance of
Expand Down Expand Up @@ -310,10 +310,11 @@ func (m *Message) toAMQPMessage() *amqp.Message {
// newReceivedMessage creates a received message from an AMQP message.
// NOTE: this converter assumes that the Body of this message will be the first
// serialized byte array in the Data section of the messsage.
func newReceivedMessage(amqpMsg *amqp.Message, receivingLinkName string) *ReceivedMessage {
func newReceivedMessage(amqpMsg *amqp.Message, receiver amqpwrap.AMQPReceiver) *ReceivedMessage {
msg := &ReceivedMessage{
RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg, receivingLinkName),
RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg, receiver.LinkName()),
State: MessageStateActive,
linkName: receiver.LinkName(),
}

if len(msg.RawAMQPMessage.Body.Data) == 1 {
Expand Down
Loading