diff --git a/README.md b/README.md index 4b8fd6a5..e5a47bec 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ SDK, the Go version. - [Usage of ORM](#usage-of-orm) - [PubSub](#pubsub) - [Kafka specific configuration](#kafka-specific-configuration) + - [SQS specific configuration](#sqs-specific-configuration) - [Cache](#cache) - [Redis](#redis-specific-configuration) - [Experimentation](#experimentation) @@ -849,6 +850,23 @@ common: &common subscriber: topic: "test-topic" group_id: "" + sqs: + subscriber: + # Set by APP_PUBSUB_SQS_SUBSCRIBER_ENABLED env variable + enabled: true + # Set by APP_PUBSUB_SQS_SUBSCRIBER_QUEUE_URL env variable + queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/test-queue" + # Set by APP_PUBSUB_SQS_SUBSCRIBER_MAX_MESSAGES env variable + max_messages: 10 + # Set by APP_PUBSUB_SQS_SUBSCRIBER_WORKERS env variable + workers: 1 + # Set by APP_PUBSUB_SQS_SUBSCRIBER_WAIT_TIME env variable + wait_time: "10s" + publisher: + # Set by APP_PUBSUB_SQS_PUBLISHER_ENABLED env variable + enabled: true + # Set by APP_PUBSUB_SQS_PUBLISHER_QUEUE_URL env variable + queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/test-queue" ``` #### Kafka specific configuration @@ -920,6 +938,25 @@ To authenticate the requests to Kafka, Go SDK provides a configuration set for T | Assumable role | This role will be used to establish connection to AWS MSK ignoring the static credentials | `role` | `APP_PUBSUB_KAFKA_SASL_AWS_MSK_IAM_ROLE` | string | AWS ARN string | | Session name | Will be passed to AWS STS when assuming the role | `session_name` | `APP_PUBSUB_KAFKA_SASL_AWS_MSK_IAM_SESSION_NAME` | string | application | +#### SQS specific configuration + +Under the hood, the SQS integration relies on the provided SQS client. To learn more about the AWS services initialization, check the [AWS service configuration](#aws-service-configuration). + +**Subscriber**: + +| Setting | Description | YAML variable | Environment variable (ENV) | Type | Possible Values | +|-------------------|-------------------------------------------------------------------------------------------------|-----------------------|---------------------------------------------------|---------|-----------------| +| Queue URL | URL of the SQS queue | `queue_url` | `APP_PUBSUB_SQS_SUBSCRIBER_QUEUE_URL` | string | queue URL | +| Max messages | Maximum number of messages to retrieve from the queue per polling iteration | `max_messages` | `APP_PUBSUB_SQS_SUBSCRIBER_MAX_MESSAGES` | number | 1,2... | +| Number of workers | Number of workers processing incoming messages | `workers` | `APP_PUBSUB_SQS_SUBSCRIBER_WORKERS` | number | 1,2... | +| Wait time | The maximum amount of time in time.Duration to wait for messages to be available for retrieval. | `wait_time` | `APP_PUBSUB_SQS_SUBSCRIBER_WAIT_TIME` | string | 10s | + +**Publisher**: + +| Setting | Description | YAML variable | Environment variable (ENV) | Type | Possible Values | +|-----------|-----------------------|-----------------------|---------------------------------------------------|---------|-----------------| +| Queue URL | URL of the SQS queue | `queue_url` | `APP_PUBSUB_SQS_PUBLISHER_QUEUE_URL` | string | queue URL | + ### Cache `go-sdk` provides a convenient way to create an application Cache configuration. diff --git a/go.mod b/go.mod index 65bb4981..3ba9914a 100644 --- a/go.mod +++ b/go.mod @@ -7,14 +7,15 @@ require ( github.com/DATA-DOG/go-txdb v0.1.9 github.com/DataDog/datadog-go v4.8.2+incompatible github.com/aws/aws-sdk-go v1.44.327 - github.com/aws/aws-sdk-go-v2 v1.26.1 - github.com/aws/aws-sdk-go-v2/config v1.27.11 - github.com/aws/aws-sdk-go-v2/credentials v1.17.11 - github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1 - github.com/aws/aws-sdk-go-v2/service/sagemakerruntime v1.27.4 - github.com/aws/aws-sdk-go-v2/service/sfn v1.19.4 - github.com/aws/aws-sdk-go-v2/service/sqs v1.24.4 - github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 + github.com/aws/aws-sdk-go-v2 v1.32.3 + github.com/aws/aws-sdk-go-v2/config v1.28.1 + github.com/aws/aws-sdk-go-v2/credentials v1.17.42 + github.com/aws/aws-sdk-go-v2/service/s3 v1.66.2 + github.com/aws/aws-sdk-go-v2/service/sagemakerruntime v1.32.3 + github.com/aws/aws-sdk-go-v2/service/sfn v1.33.3 + github.com/aws/aws-sdk-go-v2/service/sqs v1.36.3 + github.com/aws/aws-sdk-go-v2/service/sts v1.32.3 + github.com/aws/smithy-go v1.22.0 github.com/getsentry/sentry-go v0.12.0 github.com/go-kit/kit v0.9.0 github.com/go-sql-driver/mysql v1.7.1 @@ -48,24 +49,24 @@ require ( github.com/DataDog/gostackparse v0.7.0 // indirect github.com/DataDog/sketches-go v1.4.2 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect - github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 // indirect - github.com/aws/aws-sdk-go-v2/service/dynamodb v1.21.4 // indirect - github.com/aws/aws-sdk-go-v2/service/eventbridge v1.20.4 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.34 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 // indirect - github.com/aws/aws-sdk-go-v2/service/kinesis v1.18.4 // indirect - github.com/aws/aws-sdk-go-v2/service/sns v1.21.4 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect - github.com/aws/smithy-go v1.20.2 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.18 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.22 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.22 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.22 // indirect + github.com/aws/aws-sdk-go-v2/service/dynamodb v1.36.3 // indirect + github.com/aws/aws-sdk-go-v2/service/ec2 v1.187.0 // indirect + github.com/aws/aws-sdk-go-v2/service/eventbridge v1.35.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.3 // indirect + github.com/aws/aws-sdk-go-v2/service/kinesis v1.32.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sns v1.33.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.3 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.3 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect diff --git a/go.sum b/go.sum index f5242779..3406fcec 100644 --- a/go.sum +++ b/go.sum @@ -731,67 +731,60 @@ github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZo github.com/aws/aws-sdk-go v1.43.16/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/aws/aws-sdk-go v1.44.327 h1:ZS8oO4+7MOBLhkdwIhgtVeDzCeWOlTfKJS7EgggbIEY= github.com/aws/aws-sdk-go v1.44.327/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= -github.com/aws/aws-sdk-go-v2 v1.20.3/go.mod h1:/RfNgGmRxI+iFOB1OeJUyxiU+9s88k3pfHvDagGEp0M= -github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= -github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.13/go.mod h1:gpAbvyDGQFozTEmlTFO8XcQKHzubdq0LzRyJpG6MiXM= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 h1:x6xsQXGSmW6frevwDA+vi/wqhp1ct18mVXYN08/93to= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2/go.mod h1:lPprDr1e6cJdyYeGXnRaJoP4Md+cDBvi2eOj00BlGmg= -github.com/aws/aws-sdk-go-v2/config v1.27.11 h1:f47rANd2LQEYHda2ddSCKYId18/8BhSRM4BULGmfgNA= -github.com/aws/aws-sdk-go-v2/config v1.27.11/go.mod h1:SMsV78RIOYdve1vf36z8LmnszlRWkwMQtomCAI0/mIE= -github.com/aws/aws-sdk-go-v2/credentials v1.17.11 h1:YuIB1dJNf1Re822rriUOTxopaHHvIq0l/pX3fwO+Tzs= -github.com/aws/aws-sdk-go-v2/credentials v1.17.11/go.mod h1:AQtFPsDH9bI2O+71anW6EKL+NcD7LG3dpKGMV4SShgo= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 h1:FVJ0r5XTHSmIHJV6KuDmdYhEpvlHpiSd38RQWhut5J4= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1/go.mod h1:zusuAeqezXzAB24LGuzuekqMAEgWkVYukBec3kr3jUg= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.40/go.mod h1:5kKmFhLeOVy6pwPDpDNA6/hK/d6URC98pqDDqHgdBx4= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.34/go.mod h1:RZP0scceAyhMIQ9JvFp7HvkpcgqjL4l/4C+7RAeGbuM= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.1.3/go.mod h1:jYLMm3Dh0wbeV3lxth5ryks/O2M/omVXWyYm3YcEVqQ= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 h1:81KE7vaZzrl7yHBYHVEzYB8sypz11NMOZ40YlWvPxsU= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5/go.mod h1:LIt2rg7Mcgn09Ygbdh/RdIm0rQ+3BNkbP1gyVMFtRK0= -github.com/aws/aws-sdk-go-v2/service/dynamodb v1.21.4 h1:x3V1JRHq7q9RUbDpaeNpLH7QoipGpCo3fdnMMuSeABU= -github.com/aws/aws-sdk-go-v2/service/dynamodb v1.21.4/go.mod h1:aryF4jxgjhbqpdhj8QybUZI3xYrX8MQIKm4WbOv8Whg= -github.com/aws/aws-sdk-go-v2/service/ec2 v1.93.2 h1:c6a19AjfhEXKlEX63cnlWtSQ4nzENihHZOG0I3wH6BE= -github.com/aws/aws-sdk-go-v2/service/ec2 v1.93.2/go.mod h1:VX22JN3HQXDtQ3uS4h4TtM+K11vydq58tpHTlsm8TL8= -github.com/aws/aws-sdk-go-v2/service/eventbridge v1.20.4 h1:G18wotYZxZ0A5tkqKv6FHCjsF86UQrqNHy5LS+T7JWM= -github.com/aws/aws-sdk-go-v2/service/eventbridge v1.20.4/go.mod h1:XlbY5AGZhlipCdhRorT18/HEThKAxo51hMmhixreJoM= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.14/go.mod h1:dDilntgHy9WnHXsh7dDtUPgHKEfTJIBUTHM8OWm0f/0= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 h1:ZMeFZ5yk+Ek+jNr1+uwCd2tG89t6oTS5yVWpa6yy2es= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7/go.mod h1:mxV05U+4JiHqIpGqqYXOHLPKUC6bDXC44bsUhNjOEwY= -github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.34 h1:JlxVMFDHivlhNOIxd2O/9z4O0wC2zIC4lRB71lejVHU= -github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.34/go.mod h1:CDPcT6pljRaqz1yLsOgPUvOPOczFvXuJxOKzDzAbF0c= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 h1:ogRAwT1/gxJBcSWDMZlgyFUM962F51A5CRhDLbxLdmo= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7/go.mod h1:YCsIZhXfRPLFFCl5xxY+1T9RKzOKjCut+28JSX2DnAk= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 h1:f9RyWNtS8oH7cZlbn+/JNPpjUk5+5fLd5lM9M0i49Ys= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5/go.mod h1:h5CoMZV2VF297/VLhRhO1WF+XYWOzXo+4HsObA4HjBQ= -github.com/aws/aws-sdk-go-v2/service/kinesis v1.18.4 h1:UohaQds+Puk9BEbvncXkZduIGYImxohbFpVmSoymXck= -github.com/aws/aws-sdk-go-v2/service/kinesis v1.18.4/go.mod h1:HnjgmL8TNmYtGcrA3N6EeCnDvlX6CteCdUbZ1wV8QWQ= -github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1 h1:6cnno47Me9bRykw9AEv9zkXE+5or7jz8TsskTTccbgc= -github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1/go.mod h1:qmdkIIAC+GCLASF7R2whgNrJADz0QZPX+Seiw/i4S3o= -github.com/aws/aws-sdk-go-v2/service/sagemakerruntime v1.27.4 h1:hNp4PzD2N9qTqJAlrP0GAwDTKc2FTNLh6DVFzurLMrE= -github.com/aws/aws-sdk-go-v2/service/sagemakerruntime v1.27.4/go.mod h1:oPtVhWs6TuHOxUPQpNDtaQoVGjO5DbEfUWfzOxqZDOE= -github.com/aws/aws-sdk-go-v2/service/sfn v1.19.4 h1:yIyFY2kbCOoHvuivf9minqnP2RLYJgmvQRYxakIb2oI= -github.com/aws/aws-sdk-go-v2/service/sfn v1.19.4/go.mod h1:uWCH4ATwNrkRO40j8Dmy7u/Y1/BVWgCM+YjBNYZeOro= -github.com/aws/aws-sdk-go-v2/service/sns v1.21.4 h1:Asj098jPfIZYzAbk4xVFwVBGij5hgMcli0d+5Pe4aZA= -github.com/aws/aws-sdk-go-v2/service/sns v1.21.4/go.mod h1:bbB779DXXOnPXvB7F3dP7AjuV1Eyr7fNyrA058ExuzY= -github.com/aws/aws-sdk-go-v2/service/sqs v1.24.4 h1:bp8KUUx15mnLMe8SSJqO/kYEn0C2kKfWq/M9SRK9i1E= -github.com/aws/aws-sdk-go-v2/service/sqs v1.24.4/go.mod h1:c1AF/ac4k4xz32FprEk6AqqGFH/Fkub9VUPSrASlllA= -github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 h1:vN8hEbpRnL7+Hopy9dzmRle1xmDc7o8tmY0klsr175w= -github.com/aws/aws-sdk-go-v2/service/sso v1.20.5/go.mod h1:qGzynb/msuZIE8I75DVRCUXw3o3ZyBmUvMwQ2t/BrGM= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 h1:Jux+gDDyi1Lruk+KHF91tK2KCuY61kzoCpvtvJJBtOE= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4/go.mod h1:mUYPBhaF2lGiukDEjJX2BLRRKTmoUSitGDUgM4tRxak= -github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n2HZPkcKgPAi1phU= -github.com/aws/aws-sdk-go-v2/service/sts v1.28.6/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw= -github.com/aws/smithy-go v1.14.2/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= -github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= -github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/aws/aws-sdk-go-v2 v1.32.3 h1:T0dRlFBKcdaUPGNtkBSwHZxrtis8CQU17UpNBZYd0wk= +github.com/aws/aws-sdk-go-v2 v1.32.3/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 h1:pT3hpW0cOHRJx8Y0DfJUEQuqPild8jRGmSFmBgvydr0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6/go.mod h1:j/I2++U0xX+cr44QjHay4Cvxj6FUbnxrgmqN3H1jTZA= +github.com/aws/aws-sdk-go-v2/config v1.28.1 h1:oxIvOUXy8x0U3fR//0eq+RdCKimWI900+SV+10xsCBw= +github.com/aws/aws-sdk-go-v2/config v1.28.1/go.mod h1:bRQcttQJiARbd5JZxw6wG0yIK3eLeSCPdg6uqmmlIiI= +github.com/aws/aws-sdk-go-v2/credentials v1.17.42 h1:sBP0RPjBU4neGpIYyx8mkU2QqLPl5u9cmdTWVzIpHkM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.42/go.mod h1:FwZBfU530dJ26rv9saAbxa9Ej3eF/AK0OAY86k13n4M= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.18 h1:68jFVtt3NulEzojFesM/WVarlFpCaXLKaBxDpzkQ9OQ= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.18/go.mod h1:Fjnn5jQVIo6VyedMc0/EhPpfNlPl7dHV916O6B+49aE= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.22 h1:Jw50LwEkVjuVzE1NzkhNKkBf9cRN7MtE1F/b2cOKTUM= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.22/go.mod h1:Y/SmAyPcOTmpeVaWSzSKiILfXTVJwrGmYZhcRbhWuEY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.22 h1:981MHwBaRZM7+9QSR6XamDzF/o7ouUGxFzr+nVSIhrs= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.22/go.mod h1:1RA1+aBEfn+CAB/Mh0MB6LsdCYCnjZm7tKXtnk499ZQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.22 h1:yV+hCAHZZYJQcwAaszoBNwLbPItHvApxT0kVIw6jRgs= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.22/go.mod h1:kbR1TL8llqB1eGnVbybcA4/wgScxdylOdyAd51yxPdw= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.36.3 h1:pS5ka5Z026eG29K3cce+yxG39i5COQARcgheeK9NKQE= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.36.3/go.mod h1:MBT8rSGSZjJiV6X7rlrVGoIt+mCoaw0VbpdVtsrsJfk= +github.com/aws/aws-sdk-go-v2/service/ec2 v1.187.0 h1:cA4hWo269CN5RY7Arqt8BfzXF0KIN8DSNo/KcqHKkWk= +github.com/aws/aws-sdk-go-v2/service/ec2 v1.187.0/go.mod h1:ossaD9Z1ugYb6sq9QIqQLEOorCGcqUoxlhud9M9yE70= +github.com/aws/aws-sdk-go-v2/service/eventbridge v1.35.3 h1:e/jGXEQi+lyTIhc3s+jbJrq2IWgLXsNbdYxDauWTyPU= +github.com/aws/aws-sdk-go-v2/service/eventbridge v1.35.3/go.mod h1:607CryyDS58whuaVno9CCg3L/nnWOqorxiyAS2f9leY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 h1:TToQNkvGguu209puTojY/ozlqy2d/SFNcoLIqTFi42g= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0/go.mod h1:0jp+ltwkf+SwG2fm/PKo8t4y8pJSgOCO4D8Lz3k0aHQ= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.3 h1:kT6BcZsmMtNkP/iYMcRG+mIEA/IbeiUimXtGmqF39y0= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.3/go.mod h1:Z8uGua2k4PPaGOYn66pK02rhMrot3Xk3tpBuUFPomZU= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.3 h1:wudRPcZMKytcywXERkR6PLqD8gPx754ZyIOo0iVg488= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.3/go.mod h1:yRo5Kj5+m/ScVIZpQOquQvDtSrDM1JLRCnvglBcdNmw= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.3 h1:qcxX0JYlgWH3hpPUnd6U0ikcl6LLA9sLkXE2w1fpMvY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.3/go.mod h1:cLSNEmI45soc+Ef8K/L+8sEA3A3pYFEYf5B5UI+6bH4= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.3 h1:ZC7Y/XgKUxwqcdhO5LE8P6oGP1eh6xlQReWNKfhvJno= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.3/go.mod h1:WqfO7M9l9yUAw0HcHaikwRd/H6gzYdz7vjejCA5e2oY= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.32.3 h1:k0LL8/0Pgg3IA+5SgxuKXZRkIo1sP7Mp9dTyuukAouU= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.32.3/go.mod h1:S4FSetfb/MJWdDEdcWVNVP2IOW7U99Hrm9x8NeIJOvA= +github.com/aws/aws-sdk-go-v2/service/s3 v1.66.2 h1:p9TNFL8bFUMd+38YIpTAXpoxyz0MxC7FlbFEH4P4E1U= +github.com/aws/aws-sdk-go-v2/service/s3 v1.66.2/go.mod h1:fNjyo0Coen9QTwQLWeV6WO2Nytwiu+cCcWaTdKCAqqE= +github.com/aws/aws-sdk-go-v2/service/sagemakerruntime v1.32.3 h1:XiEBIj2A2YnzzZFM1U5CIZqQfTs8qmvFdbRcJxuxKgs= +github.com/aws/aws-sdk-go-v2/service/sagemakerruntime v1.32.3/go.mod h1:o+4DOEzyoP+w3bUzKCN32WO5JTdYcgiPDvOZh8oEN/g= +github.com/aws/aws-sdk-go-v2/service/sfn v1.33.3 h1:Q6N+VBfqxVzRB0i2xArfkpz4kjKDLwEkFn9G8IGKLiM= +github.com/aws/aws-sdk-go-v2/service/sfn v1.33.3/go.mod h1:aWluPXGD8XlnhB5pE72NTond4ZsCpcO8xjDf8mdEXM4= +github.com/aws/aws-sdk-go-v2/service/sns v1.33.3 h1:coZW/SqpINT0VWG8vRWWY9TWUof8TDdxublw2Xur0Zc= +github.com/aws/aws-sdk-go-v2/service/sns v1.33.3/go.mod h1:J/G2xuhwNBlDvEi0WR/bnBbac4KSgpkERna/IXEF52w= +github.com/aws/aws-sdk-go-v2/service/sqs v1.36.3 h1:H1bCg79Q4PDtxQH8Fn5kASQlbVv2WGP5o5IEFEBNOAs= +github.com/aws/aws-sdk-go-v2/service/sqs v1.36.3/go.mod h1:W6Uy6OWgxF9RZuHoikthB6f+A0oYXqnfWmFl5m7E2G4= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.3 h1:UTpsIf0loCIWEbrqdLb+0RxnTXfWh2vhw4nQmFi4nPc= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.3/go.mod h1:FZ9j3PFHHAR+w0BSEjK955w5YD2UwB/l/H0yAK3MJvI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.3 h1:2YCmIXv3tmiItw0LlYf6v7gEHebLY45kBEnPezbUKyU= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.3/go.mod h1:u19stRyNPxGhj6dRm+Cdgu6N75qnbW7+QN0q0dsAk58= +github.com/aws/aws-sdk-go-v2/service/sts v1.32.3 h1:wVnQ6tigGsRqSWDEEyH6lSAJ9OyFUsSnbaUWChuSGzs= +github.com/aws/aws-sdk-go-v2/service/sts v1.32.3/go.mod h1:VZa9yTFyj4o10YGsmDO4gbQJUvvhY72fhumT8W4LqsE= +github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM= +github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= diff --git a/pkg/instrumentation/aws_test.go b/pkg/instrumentation/aws_test.go index 6e5f9b54..5fa2754c 100644 --- a/pkg/instrumentation/aws_test.go +++ b/pkg/instrumentation/aws_test.go @@ -2,6 +2,7 @@ package instrumentation import ( "context" + "net/url" "testing" "github.com/aws/aws-sdk-go/aws" @@ -12,6 +13,7 @@ import ( awsv2 "github.com/aws/aws-sdk-go-v2/aws" awscfg "github.com/aws/aws-sdk-go-v2/config" awss3v2 "github.com/aws/aws-sdk-go-v2/service/s3" + smithyendpoints "github.com/aws/smithy-go/endpoints" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -21,6 +23,23 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) +type ( + testCustomResolver struct{} +) + +func (t testCustomResolver) ResolveEndpoint( + ctx context.Context, params awss3v2.EndpointParameters) (smithyendpoints.Endpoint, error) { + uri, err := url.Parse("http://localhost:4566") + if err != nil { + return smithyendpoints.Endpoint{}, err + } + + return smithyendpoints.Endpoint{ + URI: *uri, + }, nil + +} + func TestInstrumentAWSSession(t *testing.T) { cfg := aws.NewConfig(). WithRegion("us-west-2"). @@ -68,19 +87,9 @@ func TestInstrumentAWSSession(t *testing.T) { } func TestInstrumentAWSClient(t *testing.T) { - customResolver := awsv2.EndpointResolverWithOptionsFunc( - func(service, region string, opts ...interface{}) (awsv2.Endpoint, error) { - return awsv2.Endpoint{ - PartitionID: "aws", - URL: "http://localhost:4566", - SigningRegion: "us-east-2", - }, nil - }) - cfg, err := awscfg.LoadDefaultConfig( context.Background(), awscfg.WithRegion("us-west-2"), - awscfg.WithEndpointResolverWithOptions(customResolver), awscfg.WithCredentialsProvider(awsv2.AnonymousCredentials{}), ) require.NoError(t, err) @@ -97,7 +106,7 @@ func TestInstrumentAWSClient(t *testing.T) { mt := mocktracer.Start() defer mt.Stop() - client := awss3v2.NewFromConfig(cfg) + client := awss3v2.NewFromConfig(cfg, awss3v2.WithEndpointResolverV2(&testCustomResolver{})) root, ctx := tracer.StartSpanFromContext(context.Background(), "test") _, err := client.GetObject(ctx, &awss3v2.GetObjectInput{ @@ -122,7 +131,7 @@ func TestInstrumentAWSClient(t *testing.T) { assert.Equal(t, "GET", s.Tag(ext.HTTPMethod)) assert.Equal( t, - "http://test-bucket-name.localhost:4566///test//file//name?x-id=GetObject", + "http://localhost:4566///test//file//name?x-id=GetObject", s.Tag(ext.HTTPURL), ) }) diff --git a/pkg/pubsub/config.go b/pkg/pubsub/config.go index da4f13a4..f7f9b0d4 100644 --- a/pkg/pubsub/config.go +++ b/pkg/pubsub/config.go @@ -1,6 +1,7 @@ package pubsub import ( + "errors" "fmt" "strings" "time" @@ -14,6 +15,8 @@ type ( Config struct { // Kafka contains the configuration for Kafka client Kafka Kafka `mapstructure:"kafka"` + // SQS contains the configuration for AWS SQS client + SQS SQS `mapstructure:"sqs"` } // Publisher contains the publisher specific configuration @@ -135,6 +138,27 @@ type ( // MetricsEnabled controls if metrics publishing is enabled or not MetricsEnabled bool `mapstructure:"metrics_enabled"` } + + SQSSubscriber struct { + Enabled bool `mapstructure:"enabled"` + QueueURL string `mapstructure:"queue_url"` + MaxMessages int `mapstructure:"max_messages"` + Workers int `mapstructure:"workers"` + + WaitTime time.Duration `mapstructure:"wait_time"` + } + + SQSPublisher struct { + Enabled bool `mapstructure:"enabled"` + QueueURL string `mapstructure:"queue_url"` + } + + SQS struct { + // SQS Publisher specific configuration + Publisher SQSPublisher + // SQS Subscriber specific configuration + Subscriber SQSSubscriber + } ) const ( @@ -152,6 +176,8 @@ var ( saslMechanismPlainString: Plain, saslMechanismAWsMskIamString: AWSMskIam, } + + ErrEmptySQSQueueURL = errors.New("sqs queue url is empty") ) // NewConfig returns a new Config instance. @@ -192,6 +218,12 @@ func (c *Config) validate() error { strings.Join(allowedMechanisms, ","), ) } + if c.SQS.Publisher.Enabled && c.SQS.Publisher.QueueURL == "" { + return ErrEmptySQSQueueURL + } + if c.SQS.Subscriber.Enabled && c.SQS.Subscriber.QueueURL == "" { + return ErrEmptySQSQueueURL + } return nil } diff --git a/pkg/pubsub/config_test.go b/pkg/pubsub/config_test.go index 56c38931..d8d047e1 100644 --- a/pkg/pubsub/config_test.go +++ b/pkg/pubsub/config_test.go @@ -36,7 +36,7 @@ func TestNewConfigWithAppRoot(t *testing.T) { testCases := []struct { name string env string - kafka Kafka + cfg Config wantErr bool envOverrides [][]string @@ -44,47 +44,65 @@ func TestNewConfigWithAppRoot(t *testing.T) { { name: "NewWithConfigFileWorks", env: "test", - kafka: Kafka{ - BrokerUrls: []string{"localhost:9092"}, - ClientId: "test-app", - Cert: "pem string", - CertKey: "pem key", - SecurityProtocol: "ssl", - Publisher: Publisher{ - MaxAttempts: 3, - WriteTimeout: 10 * time.Second, - Topic: "test-topic", + cfg: Config{ + Kafka: Kafka{ + BrokerUrls: []string{"localhost:9092"}, + ClientId: "test-app", + Cert: "pem string", + CertKey: "pem key", + SecurityProtocol: "ssl", + Publisher: Publisher{ + MaxAttempts: 3, + WriteTimeout: 10 * time.Second, + Topic: "test-topic", + }, + Subscriber: Subscriber{ + Topic: "test-topic", + AutoCommit: AutoCommit{ + Enabled: true, + }, + }, + SSLVerificationEnabled: true, }, - Subscriber: Subscriber{ - Topic: "test-topic", - AutoCommit: AutoCommit{ - Enabled: true, + SQS: SQS{ + Publisher: SQSPublisher{}, + Subscriber: SQSSubscriber{ + MaxMessages: 1, + Workers: 1, }, }, - SSLVerificationEnabled: true, }, }, { name: "NewWithConfigFileWorks (broker URLs override)", env: "test", - kafka: Kafka{ - BrokerUrls: []string{"localhost:9092", "localhost:9093"}, - ClientId: "test-app", - Cert: "pem string", - CertKey: "pem key", - SecurityProtocol: "ssl", - Publisher: Publisher{ - MaxAttempts: 3, - WriteTimeout: 10 * time.Second, - Topic: "test-topic", + cfg: Config{ + Kafka: Kafka{ + BrokerUrls: []string{"localhost:9092", "localhost:9093"}, + ClientId: "test-app", + Cert: "pem string", + CertKey: "pem key", + SecurityProtocol: "ssl", + Publisher: Publisher{ + MaxAttempts: 3, + WriteTimeout: 10 * time.Second, + Topic: "test-topic", + }, + Subscriber: Subscriber{ + Topic: "test-topic", + AutoCommit: AutoCommit{ + Enabled: true, + }, + }, + SSLVerificationEnabled: true, }, - Subscriber: Subscriber{ - Topic: "test-topic", - AutoCommit: AutoCommit{ - Enabled: true, + SQS: SQS{ + Publisher: SQSPublisher{}, + Subscriber: SQSSubscriber{ + MaxMessages: 1, + Workers: 1, }, }, - SSLVerificationEnabled: true, }, envOverrides: [][]string{{"APP_PUBSUB_KAFKA_BROKER_URLS", "localhost:9092 localhost:9093"}}, @@ -92,24 +110,33 @@ func TestNewConfigWithAppRoot(t *testing.T) { { name: "NewWithConfigFileWorks (auto_commimt override)", env: "test", - kafka: Kafka{ - BrokerUrls: []string{"localhost:9092"}, - ClientId: "test-app", - Cert: "pem string", - CertKey: "pem key", - SecurityProtocol: "ssl", - Publisher: Publisher{ - MaxAttempts: 3, - WriteTimeout: 10 * time.Second, - Topic: "test-topic", + cfg: Config{ + Kafka: Kafka{ + BrokerUrls: []string{"localhost:9092"}, + ClientId: "test-app", + Cert: "pem string", + CertKey: "pem key", + SecurityProtocol: "ssl", + Publisher: Publisher{ + MaxAttempts: 3, + WriteTimeout: 10 * time.Second, + Topic: "test-topic", + }, + Subscriber: Subscriber{ + Topic: "test-topic", + AutoCommit: AutoCommit{ + Enabled: false, + }, + }, + SSLVerificationEnabled: true, }, - Subscriber: Subscriber{ - Topic: "test-topic", - AutoCommit: AutoCommit{ - Enabled: false, + SQS: SQS{ + Publisher: SQSPublisher{}, + Subscriber: SQSSubscriber{ + MaxMessages: 1, + Workers: 1, }, }, - SSLVerificationEnabled: true, }, envOverrides: [][]string{{"APP_PUBSUB_KAFKA_SUBSCRIBER_AUTO_COMMIT_ENABLED", "false"}}, @@ -117,28 +144,37 @@ func TestNewConfigWithAppRoot(t *testing.T) { { name: "NewWithConfigFileWorks (TLS config override)", env: "test", - kafka: Kafka{ - BrokerUrls: []string{"localhost:9092"}, - ClientId: "test-app", - Cert: "pem string", - CertKey: "pem key", - SecurityProtocol: "ssl", - Publisher: Publisher{ - MaxAttempts: 3, - WriteTimeout: 10 * time.Second, - Topic: "test-topic", - }, - Subscriber: Subscriber{ - Topic: "test-topic", - AutoCommit: AutoCommit{ + cfg: Config{ + Kafka: Kafka{ + BrokerUrls: []string{"localhost:9092"}, + ClientId: "test-app", + Cert: "pem string", + CertKey: "pem key", + SecurityProtocol: "ssl", + Publisher: Publisher{ + MaxAttempts: 3, + WriteTimeout: 10 * time.Second, + Topic: "test-topic", + }, + Subscriber: Subscriber{ + Topic: "test-topic", + AutoCommit: AutoCommit{ + Enabled: true, + }, + }, + SSLVerificationEnabled: true, + TLS: TLS{ Enabled: true, + Cert: "pem string", + CertKey: "pem key", }, }, - SSLVerificationEnabled: true, - TLS: TLS{ - Enabled: true, - Cert: "pem string", - CertKey: "pem key", + SQS: SQS{ + Publisher: SQSPublisher{}, + Subscriber: SQSSubscriber{ + MaxMessages: 1, + Workers: 1, + }, }, }, envOverrides: [][]string{ @@ -150,28 +186,37 @@ func TestNewConfigWithAppRoot(t *testing.T) { { name: "NewWithConfigFileWorks (SASL config override, error)", env: "test", - kafka: Kafka{ - BrokerUrls: []string{"localhost:9092"}, - ClientId: "test-app", - Cert: "pem string", - CertKey: "pem key", - SecurityProtocol: "ssl", - Publisher: Publisher{ - MaxAttempts: 3, - WriteTimeout: 10 * time.Second, - Topic: "test-topic", - }, - Subscriber: Subscriber{ - Topic: "test-topic", - AutoCommit: AutoCommit{ - Enabled: true, + cfg: Config{ + Kafka: Kafka{ + BrokerUrls: []string{"localhost:9092"}, + ClientId: "test-app", + Cert: "pem string", + CertKey: "pem key", + SecurityProtocol: "ssl", + Publisher: Publisher{ + MaxAttempts: 3, + WriteTimeout: 10 * time.Second, + Topic: "test-topic", + }, + Subscriber: Subscriber{ + Topic: "test-topic", + AutoCommit: AutoCommit{ + Enabled: true, + }, }, + SASL: SASL{ + Enabled: true, + Mechanism: "test", + }, + SSLVerificationEnabled: true, }, - SASL: SASL{ - Enabled: true, - Mechanism: "test", + SQS: SQS{ + Publisher: SQSPublisher{}, + Subscriber: SQSSubscriber{ + MaxMessages: 1, + Workers: 1, + }, }, - SSLVerificationEnabled: true, }, envOverrides: [][]string{ {"APP_PUBSUB_KAFKA_SASL_ENABLED", "true"}, @@ -182,32 +227,41 @@ func TestNewConfigWithAppRoot(t *testing.T) { { name: "NewWithConfigFileWorks (SASL config override, error)", env: "test", - kafka: Kafka{ - BrokerUrls: []string{"localhost:9092"}, - ClientId: "test-app", - Cert: "pem string", - CertKey: "pem key", - SecurityProtocol: "ssl", - Publisher: Publisher{ - MaxAttempts: 3, - WriteTimeout: 10 * time.Second, - Topic: "test-topic", - }, - Subscriber: Subscriber{ - Topic: "test-topic", - AutoCommit: AutoCommit{ - Enabled: true, + cfg: Config{ + Kafka: Kafka{ + BrokerUrls: []string{"localhost:9092"}, + ClientId: "test-app", + Cert: "pem string", + CertKey: "pem key", + SecurityProtocol: "ssl", + Publisher: Publisher{ + MaxAttempts: 3, + WriteTimeout: 10 * time.Second, + Topic: "test-topic", }, + Subscriber: Subscriber{ + Topic: "test-topic", + AutoCommit: AutoCommit{ + Enabled: true, + }, + }, + SASL: SASL{ + Enabled: true, + Mechanism: "aws_msk_iam", + AWSMskIam: SASLAwsMskIam{ + AccessKey: "access key", + SecretKey: "secret key", + }, + }, + SSLVerificationEnabled: true, }, - SASL: SASL{ - Enabled: true, - Mechanism: "aws_msk_iam", - AWSMskIam: SASLAwsMskIam{ - AccessKey: "access key", - SecretKey: "secret key", + SQS: SQS{ + Publisher: SQSPublisher{}, + Subscriber: SQSSubscriber{ + MaxMessages: 1, + Workers: 1, }, }, - SSLVerificationEnabled: true, }, envOverrides: [][]string{ {"APP_PUBSUB_KAFKA_SASL_ENABLED", "true"}, @@ -216,6 +270,122 @@ func TestNewConfigWithAppRoot(t *testing.T) { {"APP_PUBSUB_KAFKA_SASL_AWS_MSK_IAM_SECRET_KEY", "secret key"}, }, }, + { + name: "NewWithConfigFileWorks (SQS overrides)", + env: "test", + cfg: Config{ + Kafka: Kafka{ + BrokerUrls: []string{"localhost:9092"}, + ClientId: "test-app", + Cert: "pem string", + CertKey: "pem key", + SecurityProtocol: "ssl", + Publisher: Publisher{ + MaxAttempts: 3, + WriteTimeout: 10 * time.Second, + Topic: "test-topic", + }, + Subscriber: Subscriber{ + Topic: "test-topic", + AutoCommit: AutoCommit{ + Enabled: true, + }, + }, + SSLVerificationEnabled: true, + }, + SQS: SQS{ + Publisher: SQSPublisher{ + QueueURL: "https://test2.com", + }, + Subscriber: SQSSubscriber{ + QueueURL: "https://test3.com", + MaxMessages: 10, + Workers: 5, + }, + }, + }, + envOverrides: [][]string{ + {"APP_PUBSUB_SQS_PUBLISHER_QUEUE_URL", "https://test2.com"}, + {"APP_PUBSUB_SQS_SUBSCRIBER_QUEUE_URL", "https://test3.com"}, + {"APP_PUBSUB_SQS_SUBSCRIBER_MAX_MESSAGES", "10"}, + {"APP_PUBSUB_SQS_SUBSCRIBER_WORKERS", "5"}, + }, + }, + { + name: "NewWithConfigFileWorks (SQS override, empty publisher QueueURL)", + env: "test", + wantErr: true, + cfg: Config{ + Kafka: Kafka{ + BrokerUrls: []string{"localhost:9092"}, + ClientId: "test-app", + Cert: "pem string", + CertKey: "pem key", + SecurityProtocol: "ssl", + Publisher: Publisher{ + MaxAttempts: 3, + WriteTimeout: 10 * time.Second, + Topic: "test-topic", + }, + Subscriber: Subscriber{ + Topic: "test-topic", + AutoCommit: AutoCommit{ + Enabled: true, + }, + }, + SSLVerificationEnabled: true, + }, + SQS: SQS{ + Publisher: SQSPublisher{ + Enabled: true, + }, + Subscriber: SQSSubscriber{ + MaxMessages: 1, + Workers: 1, + }, + }, + }, + envOverrides: [][]string{ + {"APP_PUBSUB_SQS_PUBLISHER_ENABLED", "true"}, + }, + }, + { + name: "NewWithConfigFileWorks (SQS override, empty subscriber QueueURL)", + env: "test", + wantErr: true, + cfg: Config{ + Kafka: Kafka{ + BrokerUrls: []string{"localhost:9092"}, + ClientId: "test-app", + Cert: "pem string", + CertKey: "pem key", + SecurityProtocol: "ssl", + Publisher: Publisher{ + MaxAttempts: 3, + WriteTimeout: 10 * time.Second, + Topic: "test-topic", + }, + Subscriber: Subscriber{ + Topic: "test-topic", + AutoCommit: AutoCommit{ + Enabled: true, + }, + }, + SSLVerificationEnabled: true, + }, + SQS: SQS{ + Publisher: SQSPublisher{}, + Subscriber: SQSSubscriber{ + MaxMessages: 1, + Workers: 1, + Enabled: true, + }, + }, + }, + envOverrides: [][]string{ + {"APP_PUBSUB_SQS_SUBSCRIBER_ENABLED", "true"}, + }, + }, } currentAppRoot := os.Getenv("APP_ROOT") @@ -245,7 +415,7 @@ func TestNewConfigWithAppRoot(t *testing.T) { require.Nil(t, err) } - assert.Equal(t, tc.kafka, c.Kafka) + assert.Equal(t, tc.cfg, *c) // teardown if len(envVariables) > 0 { diff --git a/pkg/pubsub/kafka/partitionconsumer.go b/pkg/pubsub/kafka/partitionconsumer.go index 824f66cb..4fcb6ad8 100644 --- a/pkg/pubsub/kafka/partitionconsumer.go +++ b/pkg/pubsub/kafka/partitionconsumer.go @@ -7,10 +7,11 @@ import ( sdkkafka "github.com/scribd/go-sdk/pkg/instrumentation/kafka" sdklogger "github.com/scribd/go-sdk/pkg/logger" + "github.com/scribd/go-sdk/pkg/pubsub/pool" ) type pconsumer struct { - pool *pool + pool *pool.Pool quit chan struct{} done chan struct{} diff --git a/pkg/pubsub/kafka/subscriber.go b/pkg/pubsub/kafka/subscriber.go index d256f8e6..93f3cd55 100644 --- a/pkg/pubsub/kafka/subscriber.go +++ b/pkg/pubsub/kafka/subscriber.go @@ -12,6 +12,7 @@ import ( sdkkafka "github.com/scribd/go-sdk/pkg/instrumentation/kafka" sdklogger "github.com/scribd/go-sdk/pkg/logger" + "github.com/scribd/go-sdk/pkg/pubsub/pool" ) type ( @@ -152,7 +153,7 @@ func (s *Subscriber) assigned(_ context.Context, cl *kgo.Client, assigned map[st pc := pconsumer{ quit: make(chan struct{}), recs: make(chan *sdkkafka.FetchPartition), - pool: newPool(s.numWorkers), + pool: pool.New(s.numWorkers), done: make(chan struct{}), } s.consumers[topic][partition] = pc diff --git a/pkg/pubsub/kafka/pool.go b/pkg/pubsub/pool/pool.go similarity index 64% rename from pkg/pubsub/kafka/pool.go rename to pkg/pubsub/pool/pool.go index e80ce1be..ce961408 100644 --- a/pkg/pubsub/kafka/pool.go +++ b/pkg/pubsub/pool/pool.go @@ -1,12 +1,12 @@ -package kafka +package pool -type pool struct { +type Pool struct { sem chan struct{} work chan func() } -func newPool(size int) *pool { - p := &pool{ +func New(size int) *Pool { + p := &Pool{ sem: make(chan struct{}, size), work: make(chan func()), } @@ -14,7 +14,7 @@ func newPool(size int) *pool { return p } -func (p *pool) Schedule(task func()) { +func (p *Pool) Schedule(task func()) { select { case p.work <- task: return @@ -23,7 +23,7 @@ func (p *pool) Schedule(task func()) { } } -func (p *pool) worker(task func()) { +func (p *Pool) worker(task func()) { defer func() { <-p.sem }() for { diff --git a/pkg/pubsub/sqs/publisher.go b/pkg/pubsub/sqs/publisher.go new file mode 100644 index 00000000..af00e5f2 --- /dev/null +++ b/pkg/pubsub/sqs/publisher.go @@ -0,0 +1,29 @@ +package sqs + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/service/sqs" +) + +type ( + Publisher struct { + client *sqs.Client + queueURL string + } +) + +func NewPublisher(sqsClient *sqs.Client, queueURL string) *Publisher { + return &Publisher{ + client: sqsClient, + queueURL: queueURL, + } +} + +func (p *Publisher) Publish(ctx context.Context, msg *sqs.SendMessageInput) (*sqs.SendMessageOutput, error) { + if msg.QueueUrl == nil { + msg.QueueUrl = &p.queueURL + } + + return p.client.SendMessage(ctx, msg) +} diff --git a/pkg/pubsub/sqs/subscriber.go b/pkg/pubsub/sqs/subscriber.go new file mode 100644 index 00000000..81fdec5e --- /dev/null +++ b/pkg/pubsub/sqs/subscriber.go @@ -0,0 +1,119 @@ +package sqs + +import ( + "context" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + + "github.com/scribd/go-sdk/pkg/pubsub" + "github.com/scribd/go-sdk/pkg/pubsub/pool" +) + +type ( + SQSClient interface { + ReceiveMessage( + ctx context.Context, + params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) + } + + Subscriber struct { + client SQSClient + queueURL string + handler MsgHandler + maxMessages int + pool *pool.Pool + waitTime time.Duration + wg sync.WaitGroup // tracks active handlers + stopCh chan struct{} + } + + SubscriberConfig struct { + SQSClient *sqs.Client + MsgHandler MsgHandler + SQSConfig pubsub.SQS + } + + MsgHandler func(msg types.Message) +) + +const ( + defaultNumWorkers = 1 + maxWaitTime = 20 * time.Second +) + +func NewSubscriber(c SubscriberConfig) *Subscriber { + workers := c.SQSConfig.Subscriber.Workers + if workers == 0 { + workers = defaultNumWorkers + } + + waitTime := c.SQSConfig.Subscriber.WaitTime + if waitTime > maxWaitTime { + waitTime = maxWaitTime + } + + return &Subscriber{ + client: c.SQSClient, + handler: c.MsgHandler, + maxMessages: c.SQSConfig.Subscriber.MaxMessages, + queueURL: c.SQSConfig.Subscriber.QueueURL, + pool: pool.New(workers), + waitTime: waitTime, + stopCh: make(chan struct{}), + } +} + +func (s *Subscriber) Subscribe(ctx context.Context) chan error { + ch := make(chan error) + + req := &sqs.ReceiveMessageInput{ + QueueUrl: aws.String(s.queueURL), + MaxNumberOfMessages: int32(s.maxMessages), + MessageAttributeNames: []string{"All"}, + MessageSystemAttributeNames: []types.MessageSystemAttributeName{ + types.MessageSystemAttributeNameAll, + }, + } + if s.waitTime > 0 { + req.WaitTimeSeconds = int32(s.waitTime.Seconds()) + } + + go func() { + defer close(ch) + + for { + select { + case <-s.stopCh: + return + default: + response, err := s.client.ReceiveMessage(ctx, req) + if err != nil { + ch <- err + + return + } + + for _, message := range response.Messages { + s.wg.Add(1) + s.pool.Schedule(func() { + s.handler(message) + s.wg.Done() + }) + } + } + } + }() + + return ch +} + +func (s *Subscriber) Unsubscribe() error { + close(s.stopCh) + s.wg.Wait() + + return nil +} diff --git a/pkg/pubsub/sqs/subscriber_test.go b/pkg/pubsub/sqs/subscriber_test.go new file mode 100644 index 00000000..990977cf --- /dev/null +++ b/pkg/pubsub/sqs/subscriber_test.go @@ -0,0 +1,92 @@ +package sqs + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + + "github.com/scribd/go-sdk/pkg/pubsub/pool" +) + +type ( + mockSQSClient struct { + msgs []types.Message + } +) + +func (m *mockSQSClient) ReceiveMessage( + ctx context.Context, + params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) { + return &sqs.ReceiveMessageOutput{ + Messages: m.msgs, + }, nil +} + +func Test_Subscriber_Subscribe(t *testing.T) { + t.Run("all subscribers finished", func(t *testing.T) { + var nHandlers int64 // atomic + var executedTimes int64 + + c := make(chan int, 6) + + sub := &Subscriber{ + pool: pool.New(2), + stopCh: make(chan struct{}), + client: &mockSQSClient{ + msgs: []types.Message{ + { + Body: aws.String("1"), + }, + { + Body: aws.String("2"), + }, + { + Body: aws.String("3"), + }, + { + Body: aws.String("4"), + }, + { + Body: aws.String("5"), + }, + { + Body: aws.String("6"), + }, + }, + }, + handler: func(msg types.Message) { + c <- 0 + + atomic.AddInt64(&nHandlers, 1) + defer atomic.AddInt64(&nHandlers, -1) + atomic.AddInt64(&executedTimes, 1) + + time.Sleep(time.Millisecond * 10) + }, + } + + _ = sub.Subscribe(context.Background()) + // Make sure all goroutines have started. + for i := 0; i < cap(c); i++ { + <-c + } + + err := sub.Unsubscribe() + if err != nil { + t.Errorf("expected nil, got %v", err) + } + + if got := atomic.LoadInt64(&nHandlers); got != 0 { + t.Errorf("expected 0, got %d", got) + } + + if got := atomic.LoadInt64(&executedTimes); got != 6 { + t.Errorf("expected 6, got %d", got) + } + }) +} diff --git a/pkg/pubsub/testdata/config/pubsub.yml b/pkg/pubsub/testdata/config/pubsub.yml index 3017a4cd..58bd5820 100644 --- a/pkg/pubsub/testdata/config/pubsub.yml +++ b/pkg/pubsub/testdata/config/pubsub.yml @@ -41,5 +41,15 @@ test: &test session_token: "" user_agent: "" + sqs: + publisher: + enabled: false + queue_url: "" + subscriber: + enabled: false + queue_url: "" + workers: 1 + max_messages: 1 + development: <<: *test diff --git a/pkg/transport/sqs/encode_decode.go b/pkg/transport/sqs/encode_decode.go new file mode 100644 index 00000000..58eed360 --- /dev/null +++ b/pkg/transport/sqs/encode_decode.go @@ -0,0 +1,24 @@ +package sqs + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" +) + +// DecodeRequestFunc extracts a user-domain request object from +// an SQS message object. It is designed to be used in Subscribers. +type DecodeRequestFunc func(context.Context, types.Message) (request interface{}, err error) + +// EncodeRequestFunc encodes the passed payload object into +// an SQS message object. It is designed to be used in Publishers. +type EncodeRequestFunc func(context.Context, *sqs.SendMessageInput, interface{}) error + +// EncodeResponseFunc encodes the passed response object to +// an SQS message object. It is designed to be used in Subscribers. +type EncodeResponseFunc func(context.Context, *sqs.SendMessageInput, interface{}) error + +// DecodeResponseFunc extracts a user-domain response object from +// an SQS message object. It is designed to be used in Publishers. +type DecodeResponseFunc func(context.Context, types.Message) (response interface{}, err error) diff --git a/pkg/transport/sqs/publisher.go b/pkg/transport/sqs/publisher.go new file mode 100644 index 00000000..cd2b7dc4 --- /dev/null +++ b/pkg/transport/sqs/publisher.go @@ -0,0 +1,136 @@ +package sqs + +import ( + "context" + "encoding/json" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/go-kit/kit/endpoint" +) + +type contextKey int + +const ( + // ContextKeyResponseQueueURL is the context key that allows fetching + // and setting the response queue URL from and into context. + ContextKeyResponseQueueURL contextKey = iota +) + +type ( + SQSPublisher interface { + Publish(ctx context.Context, message *sqs.SendMessageInput) (*sqs.SendMessageOutput, error) + } + + // Publisher wraps an Publisher client, and provides a method that + // implements endpoint.Endpoint. + Publisher struct { + Handler SQSPublisher + queueURL string + enc EncodeRequestFunc + dec DecodeResponseFunc + before []PublisherRequestFunc + after []PublisherResponseFunc + } +) + +// NewPublisher constructs a usable Publisher for a single remote method. +func NewPublisher( + handler SQSPublisher, + queueURL string, + enc EncodeRequestFunc, + dec DecodeResponseFunc, + options ...PublisherOption, +) *Publisher { + p := &Publisher{ + Handler: handler, + queueURL: queueURL, + enc: enc, + dec: dec, + } + for _, option := range options { + option(p) + } + return p +} + +// PublisherOption sets an optional parameter for clients. +type PublisherOption func(*Publisher) + +// PublisherBefore sets the RequestFuncs that are applied to the outgoing SQS +// request before it's invoked. +func PublisherBefore(before ...PublisherRequestFunc) PublisherOption { + return func(p *Publisher) { p.before = append(p.before, before...) } +} + +// PublisherAfter sets the ClientResponseFuncs applied to the incoming SQS +// request prior to it being decoded. This is useful for obtaining the response +// and adding any information onto the context prior to decoding. +func PublisherAfter(after ...PublisherResponseFunc) PublisherOption { + return func(p *Publisher) { p.after = append(p.after, after...) } +} + +// SetPublisherResponseQueueURL can be used as a before function to add +// provided url as responseQueueURL in context. +func SetPublisherResponseQueueURL(url string) PublisherRequestFunc { + return func(ctx context.Context, _ *sqs.SendMessageInput) context.Context { + return context.WithValue(ctx, ContextKeyResponseQueueURL, url) + } +} + +// Endpoint returns a usable endpoint that invokes the remote endpoint. +func (p Publisher) Endpoint() endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + msgInput := sqs.SendMessageInput{ + QueueUrl: &p.queueURL, + } + if err := p.enc(ctx, &msgInput, request); err != nil { + return nil, err + } + + for _, f := range p.before { + ctx = f(ctx, &msgInput) + } + + output, err := p.Handler.Publish(ctx, &msgInput) + if err != nil { + return nil, err + } + + var responseMsg types.Message + for _, f := range p.after { + ctx, responseMsg, err = f(ctx, p.Handler, output) + if err != nil { + return nil, err + } + } + + response, err := p.dec(ctx, responseMsg) + if err != nil { + return nil, err + } + + return response, nil + } +} + +// EncodeJSONRequest is an EncodeRequestFunc that serializes the request as a +// JSON object and loads it as the MessageBody of the sqs.SendMessageInput. +// This can be enough for most JSON over SQS communications. +func EncodeJSONRequest(_ context.Context, msg *sqs.SendMessageInput, request interface{}) error { + b, err := json.Marshal(request) + if err != nil { + return err + } + + msg.MessageBody = aws.String(string(b)) + + return nil +} + +// NoResponseDecode is a DecodeResponseFunc that can be used when no response is needed. +// It returns nil value and nil error. +func NoResponseDecode(_ context.Context, _ types.Message) (interface{}, error) { + return nil, nil +} diff --git a/pkg/transport/sqs/publisher_test.go b/pkg/transport/sqs/publisher_test.go new file mode 100644 index 00000000..a70c67a8 --- /dev/null +++ b/pkg/transport/sqs/publisher_test.go @@ -0,0 +1,328 @@ +package sqs + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" +) + +const ( + queueURL = "someURL" +) + +type testReq struct { + Squadron int `json:"s"` +} + +type testRes struct { + Squadron int `json:"s"` + Name string `json:"n"` +} + +var names = map[int]string{ + 424: "tiger", + 426: "thunderbird", + 429: "bison", + 436: "tusker", + 437: "husky", +} + +// mockClient is a mock of SQS Handler. +type mockClient struct { + SQSPublisher + SQSClient + err error + sendOutputChan chan types.Message + receiveOutputChan chan *sqs.ReceiveMessageOutput + sendMsgID string + deleteError error +} + +func (mock *mockClient) Publish(ctx context.Context, input *sqs.SendMessageInput) (*sqs.SendMessageOutput, error) { + if input != nil && input.MessageBody != nil && *input.MessageBody != "" { + go func() { + mock.receiveOutputChan <- &sqs.ReceiveMessageOutput{ + Messages: []types.Message{ + { + MessageAttributes: input.MessageAttributes, + Body: input.MessageBody, + MessageId: aws.String(mock.sendMsgID), + }, + }, + } + }() + return &sqs.SendMessageOutput{MessageId: aws.String(mock.sendMsgID)}, nil + } + // Add logic to allow context errors. + for { + select { + case d := <-mock.sendOutputChan: + return &sqs.SendMessageOutput{MessageId: d.MessageId}, mock.err + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} + +// TestBadEncode tests if encode errors are handled properly. +func TestBadEncode(t *testing.T) { + mock := &mockClient{ + sendOutputChan: make(chan types.Message), + } + pub := NewPublisher( + mock, + queueURL, + func(context.Context, *sqs.SendMessageInput, interface{}) error { return errors.New("err!") }, + func(context.Context, types.Message) (response interface{}, err error) { return struct{}{}, nil }, + ) + errChan := make(chan error, 1) + var err error + go func() { + _, pubErr := pub.Endpoint()(context.Background(), struct{}{}) + errChan <- pubErr + + }() + select { + case err = <-errChan: + break + + case <-time.After(100 * time.Millisecond): + t.Fatal("Timed out waiting for result") + } + if err == nil { + t.Error("expected error") + } + if want, have := "err!", err.Error(); want != have { + t.Errorf("want %s, have %s", want, have) + } +} + +// TestBadDecode tests if decode errors are handled properly. +func TestBadDecode(t *testing.T) { + mock := &mockClient{ + sendOutputChan: make(chan types.Message), + } + go func() { + mock.sendOutputChan <- types.Message{ + MessageId: aws.String("someMsgID"), + } + }() + + pub := NewPublisher( + mock, + queueURL, + func(context.Context, *sqs.SendMessageInput, interface{}) error { return nil }, + func(context.Context, types.Message) (response interface{}, err error) { + return struct{}{}, errors.New("err!") + }, + PublisherAfter(func( + ctx context.Context, _ SQSPublisher, msg *sqs.SendMessageOutput) (context.Context, types.Message, error) { + // Set the actual response for the request. + return ctx, types.Message{Body: aws.String("someMsgContent")}, nil + }), + ) + + var err error + errChan := make(chan error, 1) + go func() { + _, pubErr := pub.Endpoint()(context.Background(), struct{}{}) + errChan <- pubErr + }() + + select { + case err = <-errChan: + break + + case <-time.After(100 * time.Millisecond): + t.Fatal("Timed out waiting for result") + } + + if err == nil { + t.Error("expected error") + } + if want, have := "err!", err.Error(); want != have { + t.Errorf("want %s, have %s", want, have) + } +} + +// TestSuccessfulPublisher ensures that the producer mechanisms work. +func TestSuccessfulPublisher(t *testing.T) { + mockReq := testReq{437} + mockRes := testRes{ + Squadron: mockReq.Squadron, + Name: names[mockReq.Squadron], + } + b, err := json.Marshal(mockRes) + if err != nil { + t.Fatal(err) + } + mock := &mockClient{ + sendOutputChan: make(chan types.Message), + sendMsgID: "someMsgID", + } + go func() { + mock.sendOutputChan <- types.Message{ + MessageId: aws.String("someMsgID"), + } + }() + + pub := NewPublisher( + mock, + queueURL, + EncodeJSONRequest, + func(_ context.Context, msg types.Message) (interface{}, error) { + response := testRes{} + if unmarshallErr := json.Unmarshal([]byte(*msg.Body), &response); unmarshallErr != nil { + return nil, unmarshallErr + } + return response, nil + }, + PublisherAfter(func( + ctx context.Context, _ SQSPublisher, msg *sqs.SendMessageOutput) (context.Context, types.Message, error) { + // Sets the actual response for the request. + if *msg.MessageId == "someMsgID" { + return ctx, types.Message{Body: aws.String(string(b))}, nil + } + return nil, types.Message{}, fmt.Errorf("Did not receive expected SendMessageOutput") + }), + ) + var res testRes + var ok bool + resChan := make(chan interface{}, 1) + errChan := make(chan error, 1) + go func() { + r, pubErr := pub.Endpoint()(context.Background(), mockReq) + if pubErr != nil { + errChan <- pubErr + } else { + resChan <- r + } + }() + + select { + case response := <-resChan: + res, ok = response.(testRes) + if !ok { + t.Error("failed to assert endpoint response type") + } + break + + case err = <-errChan: + break + + case <-time.After(100 * time.Millisecond): + t.Fatal("timed out waiting for result") + } + + if err != nil { + t.Fatal(err) + } + if want, have := mockRes.Name, res.Name; want != have { + t.Errorf("want %s, have %s", want, have) + } +} + +// TestSuccessfulPublisherNoResponse ensures that the producer response mechanism works. +func TestSuccessfulPublisherNoResponse(t *testing.T) { + mock := &mockClient{ + sendOutputChan: make(chan types.Message), + receiveOutputChan: make(chan *sqs.ReceiveMessageOutput), + sendMsgID: "someMsgID", + } + + pub := NewPublisher( + mock, + queueURL, + EncodeJSONRequest, + NoResponseDecode, + ) + var err error + errChan := make(chan error, 1) + finishChan := make(chan bool, 1) + go func() { + _, pubErr := pub.Endpoint()(context.Background(), struct{}{}) + if pubErr != nil { + errChan <- pubErr + } else { + finishChan <- true + } + }() + + select { + case <-finishChan: + break + case err = <-errChan: + t.Errorf("unexpected error %s", err) + case <-time.After(100 * time.Millisecond): + t.Fatal("timed out waiting for result") + } +} + +// TestPublisherWithBefore adds a PublisherBefore function that adds responseQueueURL to context, +// and another on that adds it as a message attribute to outgoing message. +// This test ensures that setting multiple before functions work as expected +// and that SetPublisherResponseQueueURL works as expected. +func TestPublisherWithBefore(t *testing.T) { + mock := &mockClient{ + sendOutputChan: make(chan types.Message), + receiveOutputChan: make(chan *sqs.ReceiveMessageOutput), + sendMsgID: "someMsgID", + } + + responseQueueURL := "someOtherURL" + pub := NewPublisher( + mock, + queueURL, + EncodeJSONRequest, + NoResponseDecode, + PublisherBefore(SetPublisherResponseQueueURL(responseQueueURL)), + PublisherBefore(func(c context.Context, s *sqs.SendMessageInput) context.Context { + responseQueueURL := c.Value(ContextKeyResponseQueueURL).(string) + if s.MessageAttributes == nil { + s.MessageAttributes = make(map[string]types.MessageAttributeValue) + } + s.MessageAttributes["responseQueueURL"] = types.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: &responseQueueURL, + } + return c + }), + ) + var err error + errChan := make(chan error, 1) + go func() { + _, pubErr := pub.Endpoint()(context.Background(), struct{}{}) + if pubErr != nil { + errChan <- pubErr + } + }() + + want := types.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: &responseQueueURL, + } + + select { + case receiveOutput := <-mock.receiveOutputChan: + if len(receiveOutput.Messages) != 1 { + t.Errorf("published %d messages instead of 1", len(receiveOutput.Messages)) + } + if have, exists := receiveOutput.Messages[0].MessageAttributes["responseQueueURL"]; !exists { + t.Errorf("expected MessageAttributes responseQueueURL not found") + } else if *have.StringValue != responseQueueURL || *have.DataType != "String" { + t.Errorf("want %v, have %v", want, have) + } + break + case err = <-errChan: + t.Errorf("unexpected error %s", err) + case <-time.After(100 * time.Millisecond): + t.Fatal("timed out waiting for result") + } +} diff --git a/pkg/transport/sqs/request_response_funcs.go b/pkg/transport/sqs/request_response_funcs.go new file mode 100644 index 00000000..2808a41d --- /dev/null +++ b/pkg/transport/sqs/request_response_funcs.go @@ -0,0 +1,110 @@ +package sqs + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + + sdkloggercontext "github.com/scribd/go-sdk/pkg/context/logger" + sdkmetricscontext "github.com/scribd/go-sdk/pkg/context/metrics" + sdkrequestidcontext "github.com/scribd/go-sdk/pkg/context/requestid" + sdkinstrumentation "github.com/scribd/go-sdk/pkg/instrumentation" + sdklogger "github.com/scribd/go-sdk/pkg/logger" + sdkmetrics "github.com/scribd/go-sdk/pkg/metrics" +) + +// SubscriberRequestFunc may take information from a subscriber request result and +// put it into a request context. In Subscribers, RequestFuncs are executed prior +// to invoking the endpoint. +// use cases eg. in Subscriber : extract message information into context. +type SubscriberRequestFunc func( + ctx context.Context, cancel context.CancelFunc, message types.Message) context.Context + +// PublisherRequestFunc may take information from a producer request and put it into a +// request context, or add some informations to SendMessageInput. In Publishers, +// RequestFuncs are executed prior to publishing the message but after encoding. +// use cases eg. in Publisher : enforce some message attributes to SendMessageInput. +type PublisherRequestFunc func(ctx context.Context, input *sqs.SendMessageInput) context.Context + +// SubscriberResponseFunc may take information from a request context and use it to +// manipulate a Publisher. SubscriberResponseFunc are only executed in +// subscriber, after invoking the endpoint. +// use cases eg. : Pipe information from request message, delete msg from queue, etc. +type SubscriberResponseFunc func( + ctx context.Context, cancel context.CancelFunc, message types.Message, resp interface{}) context.Context + +// PublisherResponseFunc may take information from an sqs.SendMessageOutput and +// fetch response using the Client. SQS is not req-reply out-of-the-box. Responses need to be fetched. +// PublisherResponseFunc are only executed in producers, after a request has been made, +// but prior to its response being decoded. So this is the perfect place to fetch actual response. +type PublisherResponseFunc func( + context.Context, SQSPublisher, *sqs.SendMessageOutput) (context.Context, types.Message, error) + +// SetPublisherLogger returns PublisherRequestFunc that sets SDK Logger to the request context. +// It will also try to setup context values to the logger fields. +func SetPublisherLogger(l sdklogger.Logger) PublisherRequestFunc { + return func(ctx context.Context, input *sqs.SendMessageInput) context.Context { + logContext := sdkinstrumentation.TraceLogs(ctx) + + requestID, err := sdkrequestidcontext.Extract(ctx) + if err != nil { + l.WithFields(sdklogger.Fields{ + "error": err.Error(), + }).Tracef("Could not retrieve request id from the context") + } + + logger := l.WithFields(sdklogger.Fields{ + "pubsub": sdklogger.Fields{ + "request_id": requestID, + }, + "dd": sdklogger.Fields{ + "trace_id": logContext.TraceID, + "span_id": logContext.SpanID, + }, + }) + + return sdkloggercontext.ToContext(ctx, logger) + } +} + +// SetSubscriberLogger returns SubscriberRequestFunc that sets SDK Logger to the request context. +// It will also try to setup context values to the logger fields. +func SetSubscriberLogger(l sdklogger.Logger) SubscriberRequestFunc { + return func(ctx context.Context, cancel context.CancelFunc, message types.Message) context.Context { + logContext := sdkinstrumentation.TraceLogs(ctx) + + requestID, err := sdkrequestidcontext.Extract(ctx) + if err != nil { + l.WithFields(sdklogger.Fields{ + "error": err.Error(), + }).Tracef("Could not retrieve request id from the context") + } + + logger := l.WithFields(sdklogger.Fields{ + "pubsub": sdklogger.Fields{ + "request_id": requestID, + }, + "dd": sdklogger.Fields{ + "trace_id": logContext.TraceID, + "span_id": logContext.SpanID, + }, + }) + + return sdkloggercontext.ToContext(ctx, logger) + } +} + +// SetPublisherMetrics returns PublisherRequestFunc that sets the Metrics client to the request context. +func SetPublisherMetrics(m sdkmetrics.Metrics) PublisherRequestFunc { + return func(ctx context.Context, input *sqs.SendMessageInput) context.Context { + return sdkmetricscontext.ToContext(ctx, m) + } +} + +// SetSubscriberMetrics returns SubscriberRequestFunc that sets the Metrics client to the request context. +func SetSubscriberMetrics(m sdkmetrics.Metrics) SubscriberRequestFunc { + return func(ctx context.Context, cancel context.CancelFunc, message types.Message) context.Context { + return sdkmetricscontext.ToContext(ctx, m) + } +} diff --git a/pkg/transport/sqs/subscriber.go b/pkg/transport/sqs/subscriber.go new file mode 100644 index 00000000..f6d3c026 --- /dev/null +++ b/pkg/transport/sqs/subscriber.go @@ -0,0 +1,241 @@ +package sqs + +import ( + "context" + "encoding/json" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/transport" +) + +type ( + SQSClient interface { + ChangeMessageVisibility( + ctx context.Context, + input *sqs.ChangeMessageVisibilityInput, + optFns ...func(opts *sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error) + DeleteMessage( + ctx context.Context, + input *sqs.DeleteMessageInput, + optFns ...func(opts *sqs.Options)) (*sqs.DeleteMessageOutput, error) + } + + // Subscriber wraps an endpoint and provides a handler for SQS messages. + Subscriber struct { + sqsClient SQSClient + e endpoint.Endpoint + dec DecodeRequestFunc + enc EncodeResponseFunc + queueURL string + before []SubscriberRequestFunc + after []SubscriberResponseFunc + errorEncoder ErrorEncoder + finalizer []SubscriberFinalizerFunc + errorHandler transport.ErrorHandler + } +) + +// NewSubscriber constructs a new Subscriber, which provides a ServeMessage method +// and message handlers that wrap the provided endpoint. +func NewSubscriber( + sqsClient SQSClient, + e endpoint.Endpoint, + dec DecodeRequestFunc, + enc EncodeResponseFunc, + queueURL string, + options ...SubscriberOption, +) *Subscriber { + s := &Subscriber{ + sqsClient: sqsClient, + e: e, + dec: dec, + enc: enc, + queueURL: queueURL, + errorEncoder: DefaultErrorEncoder, + errorHandler: transport.NewLogErrorHandler(log.NewNopLogger()), + } + for _, option := range options { + option(s) + } + return s +} + +// SubscriberOption sets an optional parameter for subscribers. +type SubscriberOption func(*Subscriber) + +// SubscriberBefore functions are executed on the producer request object before the +// request is decoded. +func SubscriberBefore(before ...SubscriberRequestFunc) SubscriberOption { + return func(s *Subscriber) { s.before = append(s.before, before...) } +} + +// SubscriberAfter functions are executed on the subscriber reply after the +// endpoint is invoked. +func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption { + return func(s *Subscriber) { s.after = append(s.after, after...) } +} + +// SubscriberErrorEncoder is used to encode errors to the subscriber reply +// whenever they're encountered in the processing of a request. Clients can +// use this to provide custom error formatting. By default, +// errors will be published with the DefaultErrorEncoder. +func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption { + return func(s *Subscriber) { s.errorEncoder = ee } +} + +// SubscriberErrorHandler is used to handle non-terminal errors. By default, non-terminal errors +// are ignored. This is intended as a diagnostic measure. Finer-grained control +// of error handling, including logging in more detail, should be performed in a +// custom SubscriberErrorEncoder which has access to the context. +func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption { + return func(s *Subscriber) { s.errorHandler = errorHandler } +} + +// SubscriberFinalizer is executed once all the received SQS messages are done being processed. +// By default, no finalizer is registered. +func SubscriberFinalizer(f ...SubscriberFinalizerFunc) SubscriberOption { + return func(s *Subscriber) { s.finalizer = f } +} + +// SubscriberSetContextTimeout returns a SubscriberOption that sets the context timeout. +func SubscriberSetContextTimeout(timeout time.Duration) SubscriberOption { + return func(s *Subscriber) { + before := func(ctx context.Context, cancel context.CancelFunc, msg types.Message) context.Context { + newCtx, newCancel := context.WithTimeout(ctx, timeout) + defer newCancel() + + return newCtx + } + s.before = append(s.before, before) + } + +} + +// SubscriberDeleteMessageBefore returns a SubscriberOption that appends a function +// that delete the message from queue to the list of subscriber's before functions. +func SubscriberDeleteMessageBefore() SubscriberOption { + return func(s *Subscriber) { + deleteBefore := func(ctx context.Context, cancel context.CancelFunc, msg types.Message) context.Context { + if err := deleteMessage(ctx, s.sqsClient, s.queueURL, msg); err != nil { + s.errorHandler.Handle(ctx, err) + s.errorEncoder(ctx, err, msg, s.sqsClient) + cancel() + } + return ctx + } + s.before = append(s.before, deleteBefore) + } +} + +// SubscriberDeleteMessageAfter returns a SubscriberOption that appends a function +// that delete a message from queue to the list of subscriber's after functions. +func SubscriberDeleteMessageAfter() SubscriberOption { + return func(s *Subscriber) { + deleteAfter := func( + ctx context.Context, cancel context.CancelFunc, msg types.Message, _ interface{}) context.Context { + if err := deleteMessage(ctx, s.sqsClient, s.queueURL, msg); err != nil { + s.errorHandler.Handle(ctx, err) + s.errorEncoder(ctx, err, msg, s.sqsClient) + cancel() + } + return ctx + } + s.after = append(s.after, deleteAfter) + } +} + +// ServeMessage serves an SQS message. +func (s Subscriber) ServeMessage(ctx context.Context) func(msg types.Message) error { + return func(msg types.Message) error { + newCtx, cancel := context.WithCancel(ctx) + defer cancel() + + if len(s.finalizer) > 0 { + defer func() { + for _, f := range s.finalizer { + f(newCtx, msg) + } + }() + } + + for _, f := range s.before { + newCtx = f(newCtx, cancel, msg) + } + + req, err := s.dec(newCtx, msg) + if err != nil { + s.errorHandler.Handle(newCtx, err) + s.errorEncoder(newCtx, err, msg, s.sqsClient) + return err + } + + response, err := s.e(newCtx, req) + if err != nil { + s.errorHandler.Handle(newCtx, err) + s.errorEncoder(newCtx, err, msg, s.sqsClient) + return err + } + + for _, f := range s.after { + newCtx = f(newCtx, cancel, msg, response) + } + + return nil + } +} + +// ErrorEncoder is responsible for encoding an error to the subscriber's reply. +// Users are encouraged to use custom ErrorEncoders to encode errors to +// their replies, and will likely want to pass and check for their own error +// types. +type ErrorEncoder func(ctx context.Context, err error, req types.Message, sqsClient SQSClient) + +// SubscriberFinalizerFunc can be used to perform work at the end of a request +// from a producer, after the response has been written to the producer. The +// principal intended use is for request logging. +// Can also be used to delete messages once fully proccessed. +type SubscriberFinalizerFunc func(ctx context.Context, msg types.Message) + +// DefaultErrorEncoder simply ignores the message. +func DefaultErrorEncoder(context.Context, error, types.Message, SQSClient) { +} + +// SubscriberNackMessageErrorEncoder can be used to perform an immediate nack on the message. +func SubscriberNackMessageErrorEncoder() SubscriberOption { + return func(s *Subscriber) { + nackErrorHandler := func(ctx context.Context, err error, msg types.Message, sqsClient SQSClient) { + _, sqsErr := sqsClient.ChangeMessageVisibility(ctx, &sqs.ChangeMessageVisibilityInput{ + QueueUrl: &s.queueURL, + ReceiptHandle: msg.ReceiptHandle, + VisibilityTimeout: 1, + }) + if sqsErr != nil { + s.errorHandler.Handle(ctx, sqsErr) + } + } + s.errorEncoder = nackErrorHandler + } +} + +func deleteMessage(ctx context.Context, sqsClient SQSClient, queueURL string, msg types.Message) error { + _, err := sqsClient.DeleteMessage(ctx, &sqs.DeleteMessageInput{ + QueueUrl: &queueURL, + ReceiptHandle: msg.ReceiptHandle, + }) + return err +} + +// EncodeJSONResponse marshals response as json and loads it into an sqs.SendMessageInput MessageBody. +func EncodeJSONResponse(_ context.Context, input *sqs.SendMessageInput, response interface{}) error { + payload, err := json.Marshal(response) + if err != nil { + return err + } + input.MessageBody = aws.String(string(payload)) + return nil +} diff --git a/pkg/transport/sqs/subscriber_test.go b/pkg/transport/sqs/subscriber_test.go new file mode 100644 index 00000000..9f914c04 --- /dev/null +++ b/pkg/transport/sqs/subscriber_test.go @@ -0,0 +1,429 @@ +package sqs + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" +) + +const ( + testErrMessage = "err!" +) + +var ( + errTypeAssertion = errors.New("type assertion error") +) + +func (mock *mockClient) ReceiveMessage( + ctx context.Context, input *sqs.ReceiveMessageInput, +) (*sqs.ReceiveMessageOutput, error) { + // Add logic to allow context errors. + for { + select { + case d := <-mock.receiveOutputChan: + return d, mock.err + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} + +func (mock *mockClient) DeleteMessage( + ctx context.Context, input *sqs.DeleteMessageInput, + optFns ...func(options *sqs.Options)) (*sqs.DeleteMessageOutput, error) { + return nil, mock.deleteError +} + +// TestSubscriberDeleteBefore checks if deleteMessage is set properly using subscriber options. +func TestSubscriberDeleteBefore(t *testing.T) { + mock := &mockClient{ + sendOutputChan: make(chan types.Message), + receiveOutputChan: make(chan *sqs.ReceiveMessageOutput), + deleteError: fmt.Errorf("delete err!"), + } + errEncoder := SubscriberErrorEncoder(func( + ctx context.Context, err error, req types.Message, sqsClient SQSClient) { + publishError := sqsError{ + Err: err.Error(), + MsgID: *req.MessageId, + } + payload, err := json.Marshal(publishError) + if err != nil { + t.Fatal(err) + } + + publisher := sqsClient.(*mockClient) + _, err = publisher.Publish(ctx, &sqs.SendMessageInput{ + MessageBody: aws.String(string(payload)), + }) + if err != nil { + t.Fatal(err) + } + }) + subscriber := NewSubscriber(mock, + func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, + func(context.Context, types.Message) (interface{}, error) { return nil, nil }, + func(context.Context, *sqs.SendMessageInput, interface{}) error { return nil }, + queueURL, + errEncoder, + SubscriberDeleteMessageBefore(), + ) + + err := subscriber.ServeMessage(context.Background())(types.Message{ + Body: aws.String("MessageBody"), + MessageId: aws.String("fakeMsgID"), + }) + if err != nil { + t.Fatal(err) + } + + var receiveOutput *sqs.ReceiveMessageOutput + select { + case receiveOutput = <-mock.receiveOutputChan: + break + + case <-time.After(200 * time.Millisecond): + t.Fatal("Timed out waiting for publishing") + } + res, err := decodeSubscriberError(receiveOutput) + if err != nil { + t.Fatal(err) + } + if want, have := "delete err!", res.Err; want != have { + t.Errorf("want %s, have %s", want, have) + } +} + +// TestSubscriberBadDecode checks if decoder errors are handled properly. +func TestSubscriberBadDecode(t *testing.T) { + mock := &mockClient{ + sendOutputChan: make(chan types.Message), + receiveOutputChan: make(chan *sqs.ReceiveMessageOutput), + } + errEncoder := SubscriberErrorEncoder(func( + ctx context.Context, err error, req types.Message, sqsClient SQSClient) { + publishError := sqsError{ + Err: err.Error(), + MsgID: *req.MessageId, + } + payload, err := json.Marshal(publishError) + if err != nil { + t.Fatal(err) + } + + publisher := sqsClient.(*mockClient) + _, err = publisher.Publish(ctx, &sqs.SendMessageInput{ + MessageBody: aws.String(string(payload)), + }) + if err != nil { + t.Fatal(err) + } + }) + subscriber := NewSubscriber(mock, + func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, + func(context.Context, types.Message) (interface{}, error) { return nil, errors.New(testErrMessage) }, + func(context.Context, *sqs.SendMessageInput, interface{}) error { return nil }, + queueURL, + errEncoder, + ) + + err := subscriber.ServeMessage(context.Background())(types.Message{ + Body: aws.String("MessageBody"), + MessageId: aws.String("fakeMsgID"), + }) + if err == nil { + t.Errorf("expected error") + } + + var receiveOutput *sqs.ReceiveMessageOutput + select { + case receiveOutput = <-mock.receiveOutputChan: + break + + case <-time.After(200 * time.Millisecond): + t.Fatal("Timed out waiting for publishing") + } + res, err := decodeSubscriberError(receiveOutput) + if err != nil { + t.Fatal(err) + } + if want, have := testErrMessage, res.Err; want != have { + t.Errorf("want %s, have %s", want, have) + } +} + +// TestSubscriberBadEndpoint checks if endpoint errors are handled properly. +func TestSubscriberBadEndpoint(t *testing.T) { + mock := &mockClient{ + sendOutputChan: make(chan types.Message), + receiveOutputChan: make(chan *sqs.ReceiveMessageOutput), + } + errEncoder := SubscriberErrorEncoder(func( + ctx context.Context, err error, req types.Message, sqsClient SQSClient) { + publishError := sqsError{ + Err: err.Error(), + MsgID: *req.MessageId, + } + payload, err := json.Marshal(publishError) + if err != nil { + t.Fatal(err) + } + + publisher := sqsClient.(*mockClient) + _, err = publisher.Publish(ctx, &sqs.SendMessageInput{ + MessageBody: aws.String(string(payload)), + }) + if err != nil { + t.Fatal(err) + } + }) + subscriber := NewSubscriber(mock, + func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errors.New(testErrMessage) }, + func(context.Context, types.Message) (interface{}, error) { return nil, nil }, + func(context.Context, *sqs.SendMessageInput, interface{}) error { return nil }, + queueURL, + errEncoder, + ) + + err := subscriber.ServeMessage(context.Background())(types.Message{ + Body: aws.String("MessageBody"), + MessageId: aws.String("fakeMsgID"), + }) + if err == nil { + t.Errorf("expected error") + } + + var receiveOutput *sqs.ReceiveMessageOutput + select { + case receiveOutput = <-mock.receiveOutputChan: + break + + case <-time.After(200 * time.Millisecond): + t.Fatal("Timed out waiting for publishing") + } + res, err := decodeSubscriberError(receiveOutput) + if err != nil { + t.Fatal(err) + } + if want, have := testErrMessage, res.Err; want != have { + t.Errorf("want %s, have %s", want, have) + } +} + +// TestSubscriberSuccess checks if subscriber responds correctly to message. +func TestSubscriberSuccess(t *testing.T) { + obj := testReq{ + Squadron: 436, + } + b, err := json.Marshal(obj) + if err != nil { + t.Fatal(err) + } + mock := &mockClient{ + sendOutputChan: make(chan types.Message), + receiveOutputChan: make(chan *sqs.ReceiveMessageOutput), + } + subscriber := NewSubscriber(mock, + testEndpoint, + testReqDecoderfunc, + EncodeJSONResponse, + queueURL, + SubscriberAfter(func( + ctx context.Context, cancel context.CancelFunc, msg types.Message, resp interface{}) context.Context { + _, err = mock.Publish(context.Background(), &sqs.SendMessageInput{ + MessageBody: msg.Body, + }) + if err != nil { + t.Fatal(err) + } + + return ctx + }), + ) + + err = subscriber.ServeMessage(context.Background())(types.Message{ + Body: aws.String(string(b)), + MessageId: aws.String("fakeMsgID"), + }) + if err != nil { + t.Fatal(err) + } + + var receiveOutput *sqs.ReceiveMessageOutput + select { + case receiveOutput = <-mock.receiveOutputChan: + break + + case <-time.After(200 * time.Millisecond): + t.Fatal("Timed out waiting for publishing") + } + res, err := decodeResponse(receiveOutput) + if err != nil { + t.Fatal(err) + } + want := testRes{ + Squadron: 436, + } + if have := res; want != have { + t.Errorf("want %v, have %v", want, have) + } +} + +// TestSubscriberSuccessNoReply checks if subscriber processes correctly message +// without sending response. +func TestSubscriberSuccessNoReply(t *testing.T) { + obj := testReq{ + Squadron: 436, + } + b, err := json.Marshal(obj) + if err != nil { + t.Fatal(err) + } + mock := &mockClient{ + sendOutputChan: make(chan types.Message), + receiveOutputChan: make(chan *sqs.ReceiveMessageOutput), + } + subscriber := NewSubscriber(mock, + testEndpoint, + testReqDecoderfunc, + EncodeJSONResponse, + queueURL, + ) + + err = subscriber.ServeMessage(context.Background())(types.Message{ + Body: aws.String(string(b)), + MessageId: aws.String("fakeMsgID"), + }) + if err != nil { + t.Fatal(err) + } + + var receiveOutput *sqs.ReceiveMessageOutput + select { + case receiveOutput = <-mock.receiveOutputChan: + t.Errorf("received output when none was expected, have %v", receiveOutput) + return + + case <-time.After(200 * time.Millisecond): + // As expected, we did not receive any response from subscriber. + return + } +} + +// TestSubscriberAfter checks if subscriber after is called as expected. +// Here after is used to transfer some info from received message in response. +func TestSubscriberAfter(t *testing.T) { + obj1 := testReq{ + Squadron: 436, + } + b1, err := json.Marshal(obj1) + if err != nil { + t.Fatal(err) + } + mock := &mockClient{ + sendOutputChan: make(chan types.Message), + receiveOutputChan: make(chan *sqs.ReceiveMessageOutput), + } + correlationID := "test" + msg := types.Message{ + Body: aws.String(string(b1)), + MessageId: aws.String("fakeMsgID1"), + MessageAttributes: map[string]types.MessageAttributeValue{ + "correlationID": { + DataType: aws.String("String"), + StringValue: &correlationID, + }, + }, + } + subscriber := NewSubscriber(mock, + testEndpoint, + testReqDecoderfunc, + EncodeJSONResponse, + queueURL, + SubscriberAfter(func( + ctx context.Context, cancel context.CancelFunc, msg types.Message, resp interface{}) context.Context { + _, pubErr := mock.Publish(ctx, &sqs.SendMessageInput{ + MessageBody: msg.Body, + MessageAttributes: msg.MessageAttributes, + }) + if pubErr != nil { + t.Fatal(pubErr) + } + + return ctx + }), + ) + ctx := context.Background() + err = subscriber.ServeMessage(ctx)(msg) + if err != nil { + t.Fatal(err) + } + + var receiveOutput *sqs.ReceiveMessageOutput + select { + case receiveOutput = <-mock.receiveOutputChan: + break + + case <-time.After(200 * time.Millisecond): + t.Fatal("Timed out waiting for publishing") + } + if len(receiveOutput.Messages) != 1 { + t.Errorf("received %d messages instead of 1", len(receiveOutput.Messages)) + } + if correlationIDAttribute, exists := receiveOutput.Messages[0].MessageAttributes["correlationID"]; exists { + if have := correlationIDAttribute.StringValue; *have != correlationID { + t.Errorf("have %s, want %s", *have, correlationID) + } + } else { + t.Errorf("expected message attribute with key correlationID in response, but it was not found") + } +} + +type sqsError struct { + Err string `json:"err"` + MsgID string `json:"msgID"` +} + +func decodeSubscriberError(receiveOutput *sqs.ReceiveMessageOutput) (sqsError, error) { + receivedError := sqsError{} + err := json.Unmarshal([]byte(*receiveOutput.Messages[0].Body), &receivedError) + return receivedError, err +} + +func testEndpoint(ctx context.Context, request interface{}) (interface{}, error) { + req, ok := request.(testReq) + if !ok { + return nil, errTypeAssertion + } + name, prs := names[req.Squadron] + if !prs { + return nil, errors.New("unknown squadron name") + } + res := testRes{ + Squadron: req.Squadron, + Name: name, + } + return res, nil +} + +func testReqDecoderfunc(_ context.Context, msg types.Message) (interface{}, error) { + var obj testReq + err := json.Unmarshal([]byte(*msg.Body), &obj) + return obj, err +} + +func decodeResponse(receiveOutput *sqs.ReceiveMessageOutput) (interface{}, error) { + if len(receiveOutput.Messages) != 1 { + return nil, fmt.Errorf("Error : received %d messages instead of 1", len(receiveOutput.Messages)) + } + resp := testRes{} + err := json.Unmarshal([]byte(*receiveOutput.Messages[0].Body), &resp) + return resp, err +}