Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit Azure IoT Hub downlink retries #603

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jacobsa/crypto v0.0.0-20190317225127-9f44e2d11115 // indirect
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/kamilsk/retry/v4 v4.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
Expand Down
32 changes: 30 additions & 2 deletions internal/backend/gateway/azureiothub/azureiothub.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,22 @@ func (b *Backend) publishCommand(fields log.Fields, gatewayID lorawan.EUI64, com
return nil
}

if retries > 0 {
if retries >= 50 {
return errors.Wrap(err, "gateway/azure_iot_hub: maximum retries exceeded")
} else if retries > 0 {
log.WithError(err).Error("gateway/azure_iot_hub: send cloud to device message error")
}

// try to recover connection
if err := b.c2dRecover(); err != nil {
if strings.Contains(err.Error(), "exceeded the queue limit") {
// IoT Hub has a limit of 50 C2D enqueued messages and returns an error if that is exceeded.
// In this case we recreate the sender instead of doing a full reconnection.
fmt.Println("gateway/azure_iot_hub: exceeded the queue limit, closing sender and retrying")
if err := b.c2dRenewSender(); err != nil {
log.WithError(err).Error("gateway/azure_iot_hub: recreate sender error, retry in 2 seconds")
time.Sleep(2 * time.Second)
}
} else if err := b.c2dRecover(); err != nil {
log.WithError(err).Error("gateway/azure_iot_hub: recover iot hub connection error, retry in 2 seconds")
time.Sleep(2 * time.Second)
}
Expand Down Expand Up @@ -445,6 +455,12 @@ func (b *Backend) c2dNewSessionAndLink() error {
return errors.Wrap(err, "new amqp session error")
}

return b.c2dNewLink()
}

func (b *Backend) c2dNewLink() error {
var err error

b.c2dSender, err = b.c2dSession.NewSender(
amqp.LinkTargetAddress("/messages/devicebound"),
)
Expand All @@ -471,6 +487,18 @@ func (b *Backend) c2dRecover() error {
return b.c2dNewSessionAndLink()
}

func (b *Backend) c2dRenewSender() error {
// aquire a write-lock to make sure that no Send calls are made during the
// sender link recovery
b.Lock()
defer b.Unlock()

log.Info("gateway/azure_iot_hub: re-creating sender")
_ = b.c2dSender.Close(b.ctx)

return b.c2dNewLink()
}

func parseConnectionString(str string) (map[string]string, error) {
out := make(map[string]string)
pairs := strings.Split(str, ";")
Expand Down
55 changes: 55 additions & 0 deletions internal/backend/gateway/azureiothub/azureiothub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//go:build integration
// +build integration

package azureiothub
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless TEST_AZURE_IOT_HUB_EVENTS_CONNECTION_STRING and TEST_AZURE_IOT_HUB_COMMANDS_CONNECTION_STRING env variables exist, this tests would fail. Maybe a build tag should be added to explicitly enable this test?

// +build integration

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added integration build tag.


import (
"testing"

"github.com/brocaar/chirpstack-api/go/v3/gw"
"github.com/brocaar/chirpstack-network-server/v3/internal/backend/gateway"
"github.com/brocaar/chirpstack-network-server/v3/internal/test"
"github.com/brocaar/lorawan"
"github.com/joho/godotenv"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

type BackendTestSuite struct {
suite.Suite

backend gateway.Gateway
gatewayID lorawan.EUI64
}

func (ts *BackendTestSuite) SetupSuite() {
var err error
godotenv.Load("../../../../.env")
assert := require.New(ts.T())
conf := test.GetConfig()

ts.gatewayID = lorawan.EUI64{0x01, 0x02, 0x03, 0x043, 0x05, 0x06, 0x07, 0x08}

ts.backend, err = NewBackend(conf)
assert.NoError(err)
}

func (ts *BackendTestSuite) TestDownlinkCommand() {
assert := require.New(ts.T())

pl := gw.DownlinkFrame{
GatewayId: ts.gatewayID[:],
Items: []*gw.DownlinkFrameItem{
{
PhyPayload: []byte{0x01, 0x02, 0x03, 0x04},
TxInfo: &gw.DownlinkTXInfo{},
},
},
}

assert.NoError(ts.backend.SendTXPacket(pl))
}

func TestBackend(t *testing.T) {
suite.Run(t, new(BackendTestSuite))
}
6 changes: 6 additions & 0 deletions internal/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ func GetConfig() config.Config {
if v := os.Getenv("TEST_RABBITMQ_URL"); v != "" {
c.NetworkServer.Gateway.Backend.AMQP.URL = v
}
if v := os.Getenv("TEST_AZURE_IOT_HUB_EVENTS_CONNECTION_STRING"); v != "" {
c.NetworkServer.Gateway.Backend.AzureIoTHub.EventsConnectionString = v
}
if v := os.Getenv("TEST_AZURE_IOT_HUB_COMMANDS_CONNECTION_STRING"); v != "" {
c.NetworkServer.Gateway.Backend.AzureIoTHub.CommandsConnectionString = v
}

return c
}