diff --git a/source_test.go b/source_test.go index b07184a..f45fbd7 100644 --- a/source_test.go +++ b/source_test.go @@ -71,11 +71,18 @@ func TestSource_Integration_RestartPartial(t *testing.T) { testSourceIntegrationRead(ctx, is, cfgMap, lastPosition, wantRecs, false) } +const testAppId = "id-1234" + func generateRabbitmqMsgs(from, to int) []amqp091.Publishing { var msgs []amqp091.Publishing for i := from; i <= to; i++ { msg := amqp091.Publishing{ + MessageId: fmt.Sprintf("test-msg-id-%d", i), + ContentType: "text/plain", + // setting testAppId asserts that the metadata is being set + AppId: testAppId, + Body: []byte(fmt.Sprintf("test-payload-%d", i)), } @@ -137,6 +144,9 @@ func testSourceIntegrationRead( wantPayload := string(wantRecord.Body) is.Equal(wantPayload, recPayload) + is.Equal(wantRecord.MessageId, string(rec.Key.Bytes())) + is.Equal(testAppId, rec.Metadata["rabbitmq.appId"]) + positions = append(positions, rec.Position) }