Skip to content

Commit

Permalink
Exporting a subset of AMQP message properties (#239)
Browse files Browse the repository at this point in the history
Exporting a small subset of read-only (ie, not sendable) properties from the AMQP message. This mimics the structure of what we have in other SDKs but stops short of allowing them to be sent.
  • Loading branch information
richardpark-msft authored Oct 11, 2021
1 parent c168808 commit bb122ca
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 8 deletions.
53 changes: 49 additions & 4 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,157 +1,202 @@
# Change Log

## `v3.3.16`

- Exporting a subset of AMQP message properties for the Dapr project.

## `v3.3.13`
- We no longer close the link when we receive disposition errors on sending. This allows
us to return errors properly when doing parallel sends on a link that is being

- We no longer close the link when we receive disposition errors on sending. This allows
us to return errors properly when doing parallel sends on a link that is being
throttled. [PR#234](https://github.com/Azure/azure-event-hubs-go/pull/234)

## `v3.3.12`

- Fix bug in sender.Recover() where recovery could get stuck when a link was throttled. [PR#232](#https://github.com/Azure/azure-event-hubs-go/pull/232)

## `v3.3.11`

- Allow for controlling the maximum retry count when sending messages. [#226](https://github.com/Azure/azure-event-hubs-go/issues/226)

## `v3.3.10`

- Fix sender.Recover() to be goroutine safe [#218](https://github.com/Azure/azure-event-hubs-go/issues/218)
- Skip calling sender.Recover() for some errors [#219](https://github.com/Azure/azure-event-hubs-go/issues/219)

## `v3.3.9`

- update the checkpoint after the receiver options are applied
- return the error from reading an old checkpoint when initializing the receiver

## `v3.3.8`

- add option to customise initial checkpoint

## `v3.3.7`

- add option to prefix checkpoint blob paths

## `v3.3.6`

- fix goroutine leak on listener close

## `v3.3.5`

- Remove the check for temporary network errors in sender.go [#80](https://github.com/Azure/azure-event-hubs-go/issues/80)

## `v3.3.4`

- read AZURE_ENVIRONMENT variable from environment to use the specified value when creating NewHub

## `v3.3.3`

- EventBatchIterator drops messages which bigger than 1MB with an error

## `v3.3.2`

- passing a context to internal calls that use go-amqp that now expect a context
- updating dependencies in go.mod

## `v3.3.1`

- fixed panic caused by interface conversion in event.go [#182](https://github.com/Azure/azure-event-hubs-go/issues/182)
- apply Receive options after populating last stored checkpoint

## `v3.3.0`

- add support for sending and receiving custom annotations

## `v3.2.0`

- add IoT Hub system properties

## `v3.1.2`

- fix errors in message handling being ignored [#155](https://github.com/Azure/azure-event-hubs-go/issues/155)

## `v3.1.1`

- Azure storage SAS token regeneration fix [#157](https://github.com/Azure/azure-event-hubs-go/issues/157)

## `v3.1.0`
- add support for websocket connections with eph with `eph.WithWebSocketConnection()`

- add support for websocket connections with eph with `eph.WithWebSocketConnection()`

## `v2.0.4`

- add comment on the `PartitionID` field in `SystemProperties` to clarify that it will always return a nil value [#131](https://github.com/Azure/azure-event-hubs-go/issues/131)

## `v2.0.3`

- fix send on closed channel for GetLeases [#142](https://github.com/Azure/azure-event-hubs-go/issues/142)

## `v2.0.2`

- enable partitionKey for sendBatch to fix [#128](https://github.com/Azure/azure-event-hubs-go/issues/128)
- ensure sender receives ack'd messages from EH [#126](https://github.com/Azure/azure-event-hubs-go/issues/126)
- close `leaseCh` on function return in storage.(*LeaserCheckpointer).GetLeases to fix [#136](https://github.com/Azure/azure-event-hubs-go/issues/136)
- close `leaseCh` on function return in storage.(\*LeaserCheckpointer).GetLeases to fix [#136](https://github.com/Azure/azure-event-hubs-go/issues/136)

## `v2.0.1`

- update to amqp 0.11.2 & common 2.1.0 to fix [#115](https://github.com/Azure/azure-event-hubs-go/issues/115)
- added checkpoint attribute to receiver to fix [#95](https://github.com/Azure/azure-event-hubs-go/issues/95) and [#118](https://github.com/Azure/azure-event-hubs-go/issues/118)

## `v2.0.0`

- **breaking change:** moved github.com/Azure/azure-amqp-common-go/persist to
github.com/Azure/azure-event-hubs-go/persist
- **breaking change:** changed batch message sending to use a safe batch iterator rather than leaving batch sizing to
the consumer.
- move tracing to devigned/tab so to not have to take a direct dependency on opentracing or opencensus

## `v1.3.1`

- cleanup connection after making management request

## `v1.3.0`

- add `SystemProperties` to `Event` which contains immutable broker provided metadata (squence number, offset,
enqueued time)

## `v1.2.0`

- add websocket support

## `v1.1.5`

- add sender recovery handling for `amqp.ErrLinkClose`, `amqp.ErrConnClosed` and `amqp.ErrSessionClosed`

## `v1.1.4`

- update to amqp 0.11.0 and change sender to use unsettled rather than receiver second mode

## `v1.1.3`

- fix leak in partition persistence
- fix discarding event properties on batch sending

## `v1.1.2`

- take dep on updated amqp common which has more permissive RPC status description parsing

## `v1.1.1`

- close sender when hub is closed
- ensure links, session and connections are closed gracefully

## `v1.1.0`

- add receive option to receive from a timestamp
- fix sender recovery on temporary network failures
- add LeasePersistenceInterval to Azure Storage LeaserCheckpointer to allow for customization of persistence interval
duration

## `v1.0.1`

- fix the breaking change from storage; this is not a breaking change for this library
- move from dep to go modules

## `v1.0.0`

- change from OpenTracing to OpenCensus
- add more documentation for EPH
- variadic mgmt options

## `v0.4.0`

- add partition key to received event [#43](https://github.com/Azure/azure-event-hubs-go/pull/43)
- remove `Receive` in eph in favor of `RegisterHandler`, `UnregisterHandler` and `RegisteredHandlerIDs` [#45](https://github.com/Azure/azure-event-hubs-go/pull/45)

## `v0.3.1`

- simplify environmental construction by preferring SAS

## `v0.3.0`

- pin version of amqp

## `v0.2.1`

- update dependency on common to 0.3.2 to fix retry returning nil error

## `v0.2.0`

- add opentracing support
- add context to close functions (breaking change)

## `v0.1.2`

- remove an extraneous dependency on satori/uuid

## `v0.1.1`

- update common dependency to 0.2.4
- provide more feedback when sending using testhub
- retry send upon server-busy
- use a new connection for each sender and receiver

## `v0.1.0`

- initial release
- basic send and receive
- batched send
Expand Down
51 changes: 47 additions & 4 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,49 @@ const (
type (
// Event is an Event Hubs message to be sent or received
Event struct {
Data []byte
PartitionKey *string
Properties map[string]interface{}
ID string
Data []byte
PartitionKey *string
Properties map[string]interface{}

ID string

message *amqp.Message
SystemProperties *SystemProperties

// RawAMQPMessage is a subset of fields from the underlying AMQP message.
// NOTE: These fields are only used when receiving events and are not sent.
RawAMQPMessage struct {
// Properties are standard properties for an AMQP message.
Properties struct {
// The identity of the user responsible for producing the message.
// The client sets this value, and it MAY be authenticated by intermediaries.
UserID []byte

// This is a client-specific id that can be used to mark or identify messages
// between clients.
CorrelationID interface{} // uint64, UUID, []byte, or string

// The content-encoding property is used as a modifier to the content-type.
// When present, its value indicates what additional content encodings have been
// applied to the application-data, and thus what decoding mechanisms need to be
// applied in order to obtain the media-type referenced by the content-type header
// field.
ContentEncoding string

// The RFC-2046 [RFC2046] MIME type for the message's application-data section
// (body). As per RFC-2046 [RFC2046] this can contain a charset parameter defining
// the character encoding used: e.g., 'text/plain; charset="utf-8"'.
//
// For clarity, as per section 7.2.1 of RFC-2616 [RFC2616], where the content type
// is unknown the content-type SHOULD NOT be set. This allows the recipient the
// opportunity to determine the actual type. Where the section is known to be truly
// opaque binary data, the content-type SHOULD be set to application/octet-stream.
ContentType string

// A common field for summary information about the message content and purpose.
Subject string
}
}
}

// SystemProperties are used to store properties that are set by the system.
Expand Down Expand Up @@ -190,6 +227,12 @@ func newEvent(data []byte, msg *amqp.Message) (*Event, error) {
if id, ok := msg.Properties.MessageID.(string); ok {
event.ID = id
}

event.RawAMQPMessage.Properties.UserID = msg.Properties.UserID
event.RawAMQPMessage.Properties.Subject = msg.Properties.Subject
event.RawAMQPMessage.Properties.CorrelationID = msg.Properties.CorrelationID
event.RawAMQPMessage.Properties.ContentEncoding = msg.Properties.ContentEncoding
event.RawAMQPMessage.Properties.ContentType = msg.Properties.ContentType
}

if msg.Annotations != nil {
Expand Down
75 changes: 75 additions & 0 deletions event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package eventhub

import (
"testing"

"github.com/Azure/go-amqp"
"github.com/stretchr/testify/require"
)

// MIT License
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE

func TestMessageConversion(t *testing.T) {
amqpMsg := &amqp.Message{
Properties: &amqp.MessageProperties{
MessageID: "messageID",
UserID: []byte("userID"),
CorrelationID: "correlationID",
Subject: "subject",
ContentEncoding: "utf-75",
ContentType: "application/octet-stream",
},
Annotations: amqp.Annotations{
"annotation1": "annotation1Value",
"dt-subject": "dt-subject-value",
},
DeliveryAnnotations: amqp.Annotations{
"deliveryAnnotation1": "deliveryAnnotation1Value",
},
ApplicationProperties: map[string]interface{}{
"applicationProperty1": "applicationProperty1Value",
},
Data: [][]byte{
[]byte("hello world"),
},
}

event, err := eventFromMsg(amqpMsg)
require.NoError(t, err)

require.EqualValues(t, "hello world", string(event.Data))

// AMQPMessage.Properties -> event.RawAMQPMessage (subset)
require.EqualValues(t, "userID", string(event.RawAMQPMessage.Properties.UserID))
require.EqualValues(t, "correlationID", event.RawAMQPMessage.Properties.CorrelationID)
require.EqualValues(t, "subject", event.RawAMQPMessage.Properties.Subject)
require.EqualValues(t, "utf-75", event.RawAMQPMessage.Properties.ContentEncoding)
require.EqualValues(t, "application/octet-stream", event.RawAMQPMessage.Properties.ContentType)

// AMQPMessage.ApplicationProperties -> Event.Properties
require.EqualValues(t, "applicationProperty1Value", event.Properties["applicationProperty1"])

// AMQPMessage.Annotations -> Event.SystemProperties.Annotations
require.EqualValues(t, "annotation1Value", event.SystemProperties.Annotations["annotation1"])
require.EqualValues(t, "dt-subject-value", event.SystemProperties.Annotations["dt-subject"])
}

0 comments on commit bb122ca

Please sign in to comment.