Skip to content

Commit

Permalink
[azeventhubs] Fixing issues with connection and link robustness for E…
Browse files Browse the repository at this point in the history
…vent Hubs (#19683)

There were some issues with how we do recovery, which could make things
hang or even leave us in an inconsistent state.

1. We didn't have any bound on how long a Close() could take, but since
that requires a network hop it's possible for it to hang infinitely if
the service doesn't respond. This now has a max bound of 60 seconds.

2. If a Close() of a link is cancelled we signal that the connection needs
to be reset, since not doing that will leave us in an inconsistent state
where we believe a link is active, but the service does not.

3. We now ignore errors that come back from trying to close a link (except
for the previously mentioned cancellation). The error that comes back will
be redundant (ie, it'll be the same error that initiated the recovery in
the first place) or it'll be something we can't immediately troubleshoot
and will be dealt with on the next recovery round.

There are also a number of testing quality fixes that I made to validate
this change. I've got gomock now for mocks in newer tests (and will start
to phase out the previous uses of my custom mocks). I've also added some
more tests around recovery and improved logging to log more context per
line so it's easier to correlate activity between links.
  • Loading branch information
richardpark-msft committed Jan 5, 2023
1 parent 1e67666 commit 00a8837
Show file tree
Hide file tree
Showing 33 changed files with 1,729 additions and 147 deletions.
4 changes: 3 additions & 1 deletion sdk/messaging/azeventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Release History

## 0.3.1 (2023-01-10)
## 0.4.0 (2023-01-10)

### Bugs Fixed

- User-Agent was incorrectly formatted in our AMQP-based clients. (PR#19712)
- Connection recovery has been improved, removing some unnecessasry retries as well as adding a bound around
some operations (Close) that could potentially block recovery for a long time. (PR#19683)

## 0.3.0 (2022-11-10)

Expand Down
7 changes: 4 additions & 3 deletions sdk/messaging/azeventhubs/checkpoints/doc.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

//go:build go1.16
// +build go1.16

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

// Package checkpoints provides a CheckpointStore using Azure Blob Storage.
//
// CheckpointStore's are generally not used on their own and will be created so they
Expand All @@ -14,4 +14,5 @@
//
// [Processor]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#Processor
// [example_processor_test.go]: https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_processor_test.go

package checkpoints
6 changes: 3 additions & 3 deletions sdk/messaging/azeventhubs/consumer_client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestConsumerClient_Recovery(t *testing.T) {
testParams := test.GetConnectionParamsForTest(t)

// Uncomment to see the entire recovery playbook run.
// test.EnableStdoutLogging()
test.EnableStdoutLogging()

dac, err := azidentity.NewDefaultAzureCredential(nil)
require.NoError(t, err)
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestConsumerClient_Recovery(t *testing.T) {

defer test.RequireClose(t, consumerClient)

log.Printf("3. closing connection, which will force recovery for each partition client so they can read the next event")
log.Printf("3. closing internal connection (non-permanently), which will force recovery for each partition client so they can read the next event")

// now we'll close the internal connection, simulating a connection break
require.NoError(t, consumerClient.namespace.Close(context.Background(), false))
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestConsumerClient_RecoveryLink(t *testing.T) {
testParams := test.GetConnectionParamsForTest(t)

// Uncomment to see the entire recovery playbook run.
// test.EnableStdoutLogging()
test.EnableStdoutLogging()

dac, err := azidentity.NewDefaultAzureCredential(nil)
require.NoError(t, err)
Expand Down
73 changes: 73 additions & 0 deletions sdk/messaging/azeventhubs/consumer_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package azeventhubs_test

import (
"context"
"strings"
"sync"
"testing"
"time"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/test"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -427,6 +429,77 @@ func TestConsumerClient_ReceiveEvents(t *testing.T) {
}
}

func TestConsumerClient_Detaches(t *testing.T) {
testParams := test.GetConnectionParamsForTest(t)

test.EnableStdoutLogging()

dac, err := azidentity.NewDefaultAzureCredential(nil)
require.NoError(t, err)

// create our event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, nil)
require.NoError(t, err)

defer producerClient.Close(context.Background())

enableOrDisableEventHub(t, testParams, dac, true)
t.Logf("Sending events, connection should be fine")
err = sendEvent(t, producerClient)
require.NoError(t, err)

enableOrDisableEventHub(t, testParams, dac, false)
t.Logf("Sending events, expected to fail since entity is disabled")
err = sendEvent(t, producerClient)
require.Error(t, err, "fails, entity has become disabled")

enableOrDisableEventHub(t, testParams, dac, true)
t.Logf("Sending events, should reconnect")
err = sendEvent(t, producerClient)
require.NoError(t, err, "reattach happens")
}

func sendEvent(t *testing.T, producerClient *azeventhubs.ProducerClient) error {
batch, err := producerClient.NewEventDataBatch(context.Background(), nil)
require.NoError(t, err)

err = batch.AddEventData(&azeventhubs.EventData{
Body: []byte("hello world"),
}, nil)
require.NoError(t, err)

return producerClient.SendEventDataBatch(context.Background(), batch, nil)
}

// enableOrDisableEventHub sets an eventhub to active if active is true, or disables it if active is false.
//
// This is useful when testing attach/detach type scenarios where you want the service to force links
// to detach.
func enableOrDisableEventHub(t *testing.T, testParams test.ConnectionParamsForTest, dac *azidentity.DefaultAzureCredential, active bool) {
client, err := armeventhub.NewEventHubsClient(testParams.SubscriptionID, dac, nil)
require.NoError(t, err)

ns := strings.Split(testParams.EventHubNamespace, ".")[0]

resp, err := client.Get(context.Background(), testParams.ResourceGroup, ns, testParams.EventHubName, nil)
require.NoError(t, err)

if active {
resp.Properties.Status = to.Ptr(armeventhub.EntityStatusActive)
} else {
resp.Properties.Status = to.Ptr(armeventhub.EntityStatusDisabled)
}

t.Logf("Setting entity status to %s", *resp.Properties.Status)
_, err = client.CreateOrUpdate(context.Background(), testParams.ResourceGroup, ns, testParams.EventHubName, armeventhub.Eventhub{
Properties: resp.Properties,
}, nil)
require.NoError(t, err)

// give a little time for the change to take effect
time.Sleep(5 * time.Second)
}

func newPartitionClientForTest(t *testing.T, partitionID string, subscribeOptions azeventhubs.PartitionClientOptions) (*azeventhubs.PartitionClient, func()) {
testParams := test.GetConnectionParamsForTest(t)

Expand Down
1 change: 1 addition & 0 deletions sdk/messaging/azeventhubs/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@
// There are two clients for consuming events:
// - [azeventhubs.Processor], which handles checkpointing and load balancing using durable storage.
// - [azeventhubs.ConsumerClient], which is fully manual, but provides full control.

package azeventhubs
2 changes: 2 additions & 0 deletions sdk/messaging/azeventhubs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0
github.com/golang/mock v1.6.0
github.com/joho/godotenv v1.4.0
github.com/stretchr/testify v1.7.1
)
Expand Down
26 changes: 26 additions & 0 deletions sdk/messaging/azeventhubs/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4Sath
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0 h1:BWeAAEzkCnL0ABVJqs+4mYudNch7oFGPtTlSmIWL8ms=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0/go.mod h1:Y3gnVwfaz8h6L1YHar+NfWORtBoVUSB5h4GlGkdeF7Q=
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE=
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -18,6 +20,8 @@ github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRx
github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c=
github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang-jwt/jwt/v4 v4.2.0 h1:besgBTC8w8HjP6NzQdxwKH9Z5oQMZ24ThTrHp3cZ8eU=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down Expand Up @@ -45,19 +49,41 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tedsuo/ifrit v0.0.0-20180802180643-bea94bb476cc/go.mod h1:eyZnKCc955uh98WQvzOm0dgAeLnf2O0Rz0LPoC5ze+0=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88 h1:Tgea0cVUD0ivh5ADBX4WwuI12DUd2to3nCYe2eayMIw=
golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 h1:HVyaeDAYux4pnY+D/SiwmLOR36ewZ4iGQIIrtnuCjFA=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
8 changes: 0 additions & 8 deletions sdk/messaging/azeventhubs/internal/amqpInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,13 @@ import (
"context"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
)

type AMQPReceiver = amqpwrap.AMQPReceiver
type AMQPReceiverCloser = amqpwrap.AMQPReceiverCloser
type AMQPSender = amqpwrap.AMQPSender
type AMQPSenderCloser = amqpwrap.AMQPSenderCloser

// RPCLink is implemented by *rpc.Link
type RPCLink interface {
Close(ctx context.Context) error
RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, error)
LinkName() string
}

// Closeable is implemented by pretty much any AMQP link/client
// including our own higher level Receiver/Sender.
type Closeable interface {
Expand Down
16 changes: 16 additions & 0 deletions sdk/messaging/azeventhubs/internal/amqpwrap/amqpwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ type AMQPClient interface {
NewSession(ctx context.Context, opts *amqp.SessionOptions) (AMQPSession, error)
}

// RPCLink is implemented by *rpc.Link
type RPCLink interface {
Close(ctx context.Context) error
RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, error)
LinkName() string
}

// RPCResponse is the simplified response structure from an RPC like call
type RPCResponse struct {
// Code is the response code - these originate from Service Bus. Some
// common values are called out below, with the RPCResponseCode* constants.
Code int
Description string
Message *amqp.Message
}

// AMQPClientWrapper is a simple interface, implemented by *AMQPClientWrapper
// It exists only so we can return AMQPSession, which itself only exists so we can
// return interfaces for AMQPSender and AMQPReceiver from AMQPSession.
Expand Down
31 changes: 25 additions & 6 deletions sdk/messaging/azeventhubs/internal/cbs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package internal

import (
"context"
"errors"

azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
Expand All @@ -23,7 +24,10 @@ const (
)

// NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience
func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, provider auth.TokenProvider) error {
//
// contextWithTimeoutFn is intended to be context.WithTimeout in production code, but can be stubbed out when writing
// unit tests to keep timeouts reasonable.
func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, provider auth.TokenProvider, contextWithTimeoutFn contextWithTimeoutFn) error {
link, err := NewRPCLink(ctx, RPCLinkArgs{
Client: conn,
Address: cbsAddress,
Expand All @@ -34,15 +38,27 @@ func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClie
return err
}

defer func() {
closeLink := func(ctx context.Context, origErr error) error {
ctx, cancel := contextWithTimeoutFn(ctx, defaultCloseTimeout)
defer cancel()

if err := link.Close(ctx); err != nil {
if IsCancelError(err) {
azlog.Writef(exported.EventAuth, "Failed closing claim link because it was cancelled. Connection will need to be reset")
return errConnResetNeeded
}

azlog.Writef(exported.EventAuth, "Failed closing claim link: %s", err.Error())
return err
}
}()

return origErr
}

token, err := provider.GetToken(audience)
if err != nil {
return err
azlog.Writef(exported.EventAuth, "Failed to get token from provider")
return closeLink(ctx, err)
}

azlog.Writef(exported.EventAuth, "negotiating claim for audience %s with token type %s and expiry of %s", audience, token.TokenType, token.Expiry)
Expand All @@ -58,8 +74,11 @@ func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClie
}

if _, err := link.RPC(ctx, msg); err != nil {
return err
azlog.Writef(exported.EventAuth, "Failed to send/receive RPC message")
return closeLink(ctx, err)
}

return nil
return closeLink(ctx, nil)
}

var errConnResetNeeded = errors.New("connection must be reset, link/connection state may be inconsistent")
Loading

0 comments on commit 00a8837

Please sign in to comment.