diff --git a/go.mod b/go.mod index 71630ea036..c85922a0c7 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/influxdata/tdigest v0.0.1 // indirect github.com/kelseyhightower/envconfig v1.4.0 github.com/pkg/errors v0.9.1 - github.com/rabbitmq/amqp091-go v1.3.0 + github.com/rabbitmq/amqp091-go v1.8.1 github.com/rabbitmq/cluster-operator v1.13.1 github.com/rabbitmq/messaging-topology-operator v1.6.0 go.opencensus.io v0.23.0 diff --git a/go.sum b/go.sum index 1b7309f5ac..ca8458b7e1 100644 --- a/go.sum +++ b/go.sum @@ -436,8 +436,8 @@ github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPH github.com/prometheus/statsd_exporter v0.21.0 h1:hA05Q5RFeIjgwKIYEdFd59xu5Wwaznf33yKI+pyX6T8= github.com/prometheus/statsd_exporter v0.21.0/go.mod h1:rbT83sZq2V+p73lHhPZfMc3MLCHmSHelCh9hSGYNLTQ= github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= -github.com/rabbitmq/amqp091-go v1.3.0 h1:A/QuHiNw7LMCJsxx9iZn5lrIz6OrhIn7Dfk5/1YatWM= -github.com/rabbitmq/amqp091-go v1.3.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= +github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA= +github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/rabbitmq/cluster-operator v1.13.1 h1:OvxS440P5Q5YdX2U8cDz9OI0qhuSuNcC18p8PQj1JDI= github.com/rabbitmq/cluster-operator v1.13.1/go.mod h1:fauJ0q72yNheyW30qraaCPr6shEcH5i1xBoGvTiVyws= github.com/rabbitmq/messaging-topology-operator v1.6.0 h1:DlkHTM4IzqFOUBxlFRS+nrTAV29+IpC/iXgb/DVA6s8= @@ -525,7 +525,8 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.4.0 h1:CpDZl6aOlLhReez+8S3eEotD7Jx0Os++lemPlMULQP0= go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q= -go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= diff --git a/vendor/github.com/rabbitmq/amqp091-go/.golangci.yml b/vendor/github.com/rabbitmq/amqp091-go/.golangci.yml new file mode 100644 index 0000000000..4341bcf984 --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/.golangci.yml @@ -0,0 +1,3 @@ +run: + build-tags: + - integration diff --git a/vendor/github.com/rabbitmq/amqp091-go/.travis.yml b/vendor/github.com/rabbitmq/amqp091-go/.travis.yml deleted file mode 100644 index 720da80ce4..0000000000 --- a/vendor/github.com/rabbitmq/amqp091-go/.travis.yml +++ /dev/null @@ -1,23 +0,0 @@ -language: go - -go: - - 1.17.x - - 1.16.x - -addons: - apt: - packages: - - rabbitmq-server - -services: - - rabbitmq - -env: - - GO111MODULE=on AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ - -before_install: - - go get -v golang.org/x/lint/golint - -script: - - ./pre-commit - - go test -cpu=1,2 -v -tags integration ./... diff --git a/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md b/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md new file mode 100644 index 0000000000..02523c2522 --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md @@ -0,0 +1,283 @@ +# Changelog + +## [v1.8.0](https://github.com/rabbitmq/amqp091-go/tree/v1.8.0) (2023-03-21) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.7.0...v1.8.0) + +**Closed issues:** + +- memory leak [\#179](https://github.com/rabbitmq/amqp091-go/issues/179) +- the publishWithContext interface will not return when it times out [\#178](https://github.com/rabbitmq/amqp091-go/issues/178) + +**Merged pull requests:** + +- Fix race condition on confirms [\#183](https://github.com/rabbitmq/amqp091-go/pull/183) ([calloway-jacob](https://github.com/calloway-jacob)) +- Add a CloseDeadline function to Connection [\#181](https://github.com/rabbitmq/amqp091-go/pull/181) ([Zerpet](https://github.com/Zerpet)) +- Fix memory leaks [\#180](https://github.com/rabbitmq/amqp091-go/pull/180) ([GXKe](https://github.com/GXKe)) +- Bump go.uber.org/goleak from 1.2.0 to 1.2.1 [\#177](https://github.com/rabbitmq/amqp091-go/pull/177) ([dependabot[bot]](https://github.com/apps/dependabot)) + +## [v1.7.0](https://github.com/rabbitmq/amqp091-go/tree/v1.7.0) (2023-02-09) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1...v1.7.0) + +**Closed issues:** + +- \#31 resurfacing \(?\) [\#170](https://github.com/rabbitmq/amqp091-go/issues/170) +- Deprecate QueueInspect [\#167](https://github.com/rabbitmq/amqp091-go/issues/167) +- v1.6.0 causing rabbit connection errors [\#160](https://github.com/rabbitmq/amqp091-go/issues/160) + +**Merged pull requests:** + +- Set channels and allocator to nil in shutdown [\#172](https://github.com/rabbitmq/amqp091-go/pull/172) ([lukebakken](https://github.com/lukebakken)) +- Fix racing in Open [\#171](https://github.com/rabbitmq/amqp091-go/pull/171) ([Zerpet](https://github.com/Zerpet)) +- adding go 1.20 to tests [\#169](https://github.com/rabbitmq/amqp091-go/pull/169) ([halilylm](https://github.com/halilylm)) +- Deprecate the QueueInspect function [\#168](https://github.com/rabbitmq/amqp091-go/pull/168) ([lukebakken](https://github.com/lukebakken)) +- Check if channel is nil before updating it [\#150](https://github.com/rabbitmq/amqp091-go/pull/150) ([julienschmidt](https://github.com/julienschmidt)) + +## [v1.6.1](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1) (2023-02-01) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1-rc.2...v1.6.1) + +**Merged pull requests:** + +- Update Makefile targets related to RabbitMQ [\#163](https://github.com/rabbitmq/amqp091-go/pull/163) ([Zerpet](https://github.com/Zerpet)) + +## [v1.6.1-rc.2](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1-rc.2) (2023-01-31) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1-rc.1...v1.6.1-rc.2) + +**Merged pull requests:** + +- Do not overly protect writes [\#162](https://github.com/rabbitmq/amqp091-go/pull/162) ([lukebakken](https://github.com/lukebakken)) + +## [v1.6.1-rc.1](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1-rc.1) (2023-01-31) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.0...v1.6.1-rc.1) + +**Closed issues:** + +- Calling Channel\(\) on an empty connection panics [\#148](https://github.com/rabbitmq/amqp091-go/issues/148) + +**Merged pull requests:** + +- Ensure flush happens and correctly lock connection for a series of unflushed writes [\#161](https://github.com/rabbitmq/amqp091-go/pull/161) ([lukebakken](https://github.com/lukebakken)) + +## [v1.6.0](https://github.com/rabbitmq/amqp091-go/tree/v1.6.0) (2023-01-20) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.5.0...v1.6.0) + +**Implemented enhancements:** + +- Add constants for Queue arguments [\#145](https://github.com/rabbitmq/amqp091-go/pull/145) ([Zerpet](https://github.com/Zerpet)) + +**Closed issues:** + +- README not up to date [\#154](https://github.com/rabbitmq/amqp091-go/issues/154) +- Allow re-using default connection config \(custom properties\) [\#152](https://github.com/rabbitmq/amqp091-go/issues/152) +- Rename package name to amqp in V2 [\#151](https://github.com/rabbitmq/amqp091-go/issues/151) +- Helper types to declare quorum queues [\#144](https://github.com/rabbitmq/amqp091-go/issues/144) +- Inefficient use of buffers reduces potential throughput for basicPublish with small messages. [\#141](https://github.com/rabbitmq/amqp091-go/issues/141) +- bug, close cause panic [\#130](https://github.com/rabbitmq/amqp091-go/issues/130) +- Publishing Headers are unable to store Table with slice values [\#125](https://github.com/rabbitmq/amqp091-go/issues/125) +- Example client can deadlock in Close due to unconsumed confirmations [\#122](https://github.com/rabbitmq/amqp091-go/issues/122) +- SAC not working properly [\#106](https://github.com/rabbitmq/amqp091-go/issues/106) + +**Merged pull requests:** + +- Add automatic CHANGELOG.md generation [\#158](https://github.com/rabbitmq/amqp091-go/pull/158) ([lukebakken](https://github.com/lukebakken)) +- Supply library-defined props with NewConnectionProperties [\#157](https://github.com/rabbitmq/amqp091-go/pull/157) ([slagiewka](https://github.com/slagiewka)) +- Fix linter warnings [\#156](https://github.com/rabbitmq/amqp091-go/pull/156) ([Zerpet](https://github.com/Zerpet)) +- Remove outdated information from README [\#155](https://github.com/rabbitmq/amqp091-go/pull/155) ([scriptcoded](https://github.com/scriptcoded)) +- Add example producer using DeferredConfirm [\#149](https://github.com/rabbitmq/amqp091-go/pull/149) ([Zerpet](https://github.com/Zerpet)) +- Ensure code is formatted [\#147](https://github.com/rabbitmq/amqp091-go/pull/147) ([lukebakken](https://github.com/lukebakken)) +- Fix inefficient use of buffers that reduces the potential throughput of basicPublish [\#142](https://github.com/rabbitmq/amqp091-go/pull/142) ([fadams](https://github.com/fadams)) +- Do not embed context in DeferredConfirmation [\#140](https://github.com/rabbitmq/amqp091-go/pull/140) ([tie](https://github.com/tie)) +- Add constant for default exchange [\#139](https://github.com/rabbitmq/amqp091-go/pull/139) ([marlongerson](https://github.com/marlongerson)) +- Fix indentation and remove unnecessary instructions [\#138](https://github.com/rabbitmq/amqp091-go/pull/138) ([alraujo](https://github.com/alraujo)) +- Remove unnecessary instruction [\#135](https://github.com/rabbitmq/amqp091-go/pull/135) ([alraujo](https://github.com/alraujo)) +- Fix example client to avoid deadlock in Close [\#123](https://github.com/rabbitmq/amqp091-go/pull/123) ([Zerpet](https://github.com/Zerpet)) +- Bump go.uber.org/goleak from 1.1.12 to 1.2.0 [\#116](https://github.com/rabbitmq/amqp091-go/pull/116) ([dependabot[bot]](https://github.com/apps/dependabot)) + +## [v1.5.0](https://github.com/rabbitmq/amqp091-go/tree/v1.5.0) (2022-09-07) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.4.0...v1.5.0) + +**Implemented enhancements:** + +- Provide a friendly way to set connection name [\#105](https://github.com/rabbitmq/amqp091-go/issues/105) + +**Closed issues:** + +- Support connection.update-secret [\#107](https://github.com/rabbitmq/amqp091-go/issues/107) +- Example Client: Implementation of a Consumer with reconnection support [\#40](https://github.com/rabbitmq/amqp091-go/issues/40) + +**Merged pull requests:** + +- use PublishWithContext instead of Publish [\#115](https://github.com/rabbitmq/amqp091-go/pull/115) ([Gsantomaggio](https://github.com/Gsantomaggio)) +- Add support for connection.update-secret [\#114](https://github.com/rabbitmq/amqp091-go/pull/114) ([Zerpet](https://github.com/Zerpet)) +- Remove warning on RabbitMQ tutorials in go [\#113](https://github.com/rabbitmq/amqp091-go/pull/113) ([ChunyiLyu](https://github.com/ChunyiLyu)) +- Update AMQP Spec [\#110](https://github.com/rabbitmq/amqp091-go/pull/110) ([Zerpet](https://github.com/Zerpet)) +- Add an example of reliable consumer [\#109](https://github.com/rabbitmq/amqp091-go/pull/109) ([Zerpet](https://github.com/Zerpet)) +- Add convenience function to set connection name [\#108](https://github.com/rabbitmq/amqp091-go/pull/108) ([Zerpet](https://github.com/Zerpet)) + +## [v1.4.0](https://github.com/rabbitmq/amqp091-go/tree/v1.4.0) (2022-07-19) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.3.4...v1.4.0) + +**Closed issues:** + +- target machine actively refused connection [\#99](https://github.com/rabbitmq/amqp091-go/issues/99) +- 504 channel/connection is not open error occurred in multiple connection with same rabbitmq service [\#97](https://github.com/rabbitmq/amqp091-go/issues/97) +- Add possible cancel of DeferredConfirmation [\#92](https://github.com/rabbitmq/amqp091-go/issues/92) +- Documentation [\#89](https://github.com/rabbitmq/amqp091-go/issues/89) +- Channel Close gets stuck after closing a connection \(via management UI\) [\#88](https://github.com/rabbitmq/amqp091-go/issues/88) +- this library has same issue [\#83](https://github.com/rabbitmq/amqp091-go/issues/83) +- Provide a logging interface [\#81](https://github.com/rabbitmq/amqp091-go/issues/81) +- 1.4.0 release checklist [\#77](https://github.com/rabbitmq/amqp091-go/issues/77) +- Data race in the client example [\#72](https://github.com/rabbitmq/amqp091-go/issues/72) +- reader go routine hangs and leaks when Connection.Close\(\) is called multiple times [\#69](https://github.com/rabbitmq/amqp091-go/issues/69) +- Support auto-reconnect and cluster [\#65](https://github.com/rabbitmq/amqp091-go/issues/65) +- Connection/Channel Deadlock [\#32](https://github.com/rabbitmq/amqp091-go/issues/32) +- Closing connection and/or channel hangs NotifyPublish is used [\#21](https://github.com/rabbitmq/amqp091-go/issues/21) +- Consumer channel isn't closed in the event of unexpected disconnection [\#18](https://github.com/rabbitmq/amqp091-go/issues/18) + +**Merged pull requests:** + +- fix race condition with context close and confirm at the same time on DeferredConfirmation. [\#101](https://github.com/rabbitmq/amqp091-go/pull/101) ([sapk](https://github.com/sapk)) +- Add build TLS config from URI [\#98](https://github.com/rabbitmq/amqp091-go/pull/98) ([reddec](https://github.com/reddec)) +- Use context for Publish methods [\#96](https://github.com/rabbitmq/amqp091-go/pull/96) ([sapk](https://github.com/sapk)) +- Added function to get the remote peer's IP address \(conn.RemoteAddr\(\)\) [\#95](https://github.com/rabbitmq/amqp091-go/pull/95) ([rabb1t](https://github.com/rabb1t)) +- Update connection documentation [\#90](https://github.com/rabbitmq/amqp091-go/pull/90) ([Zerpet](https://github.com/Zerpet)) +- Revert test to demonstrate actual bug [\#87](https://github.com/rabbitmq/amqp091-go/pull/87) ([lukebakken](https://github.com/lukebakken)) +- Minor improvements to examples [\#86](https://github.com/rabbitmq/amqp091-go/pull/86) ([lukebakken](https://github.com/lukebakken)) +- Do not skip flaky test in CI [\#85](https://github.com/rabbitmq/amqp091-go/pull/85) ([lukebakken](https://github.com/lukebakken)) +- Add logging [\#84](https://github.com/rabbitmq/amqp091-go/pull/84) ([lukebakken](https://github.com/lukebakken)) +- Add a win32 build [\#82](https://github.com/rabbitmq/amqp091-go/pull/82) ([lukebakken](https://github.com/lukebakken)) +- channel: return nothing instead of always a nil-error in receive methods [\#80](https://github.com/rabbitmq/amqp091-go/pull/80) ([fho](https://github.com/fho)) +- update the contributing & readme files, improve makefile [\#79](https://github.com/rabbitmq/amqp091-go/pull/79) ([fho](https://github.com/fho)) +- Fix lint errors [\#78](https://github.com/rabbitmq/amqp091-go/pull/78) ([lukebakken](https://github.com/lukebakken)) +- ci: run golangci-lint [\#76](https://github.com/rabbitmq/amqp091-go/pull/76) ([fho](https://github.com/fho)) +- ci: run test via make & remove travis CI config [\#75](https://github.com/rabbitmq/amqp091-go/pull/75) ([fho](https://github.com/fho)) +- ci: run tests with race detector [\#74](https://github.com/rabbitmq/amqp091-go/pull/74) ([fho](https://github.com/fho)) +- Detect go routine leaks in integration testcases [\#73](https://github.com/rabbitmq/amqp091-go/pull/73) ([fho](https://github.com/fho)) +- connection: fix: reader go-routine is leaked on connection close [\#70](https://github.com/rabbitmq/amqp091-go/pull/70) ([fho](https://github.com/fho)) +- adding best practises for NotifyPublish for issue\_21 scenario [\#68](https://github.com/rabbitmq/amqp091-go/pull/68) ([DanielePalaia](https://github.com/DanielePalaia)) +- Update Go version [\#67](https://github.com/rabbitmq/amqp091-go/pull/67) ([Zerpet](https://github.com/Zerpet)) +- Regenerate certs with SHA256 to fix test with Go 1.18+ [\#66](https://github.com/rabbitmq/amqp091-go/pull/66) ([anthonyfok](https://github.com/anthonyfok)) + +## [v1.3.4](https://github.com/rabbitmq/amqp091-go/tree/v1.3.4) (2022-04-01) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.3.3...v1.3.4) + +**Merged pull requests:** + +- bump version to 1.3.4 [\#63](https://github.com/rabbitmq/amqp091-go/pull/63) ([DanielePalaia](https://github.com/DanielePalaia)) +- updating doc [\#62](https://github.com/rabbitmq/amqp091-go/pull/62) ([DanielePalaia](https://github.com/DanielePalaia)) + +## [v1.3.3](https://github.com/rabbitmq/amqp091-go/tree/v1.3.3) (2022-04-01) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.3.2...v1.3.3) + +**Closed issues:** + +- Add Client Version [\#49](https://github.com/rabbitmq/amqp091-go/issues/49) +- OpenTelemetry Propagation [\#22](https://github.com/rabbitmq/amqp091-go/issues/22) + +**Merged pull requests:** + +- bump buildVersion for release [\#61](https://github.com/rabbitmq/amqp091-go/pull/61) ([DanielePalaia](https://github.com/DanielePalaia)) +- adding documentation for notifyClose best pratices [\#60](https://github.com/rabbitmq/amqp091-go/pull/60) ([DanielePalaia](https://github.com/DanielePalaia)) +- adding documentation on NotifyClose of connection and channel to enfo… [\#59](https://github.com/rabbitmq/amqp091-go/pull/59) ([DanielePalaia](https://github.com/DanielePalaia)) + +## [v1.3.2](https://github.com/rabbitmq/amqp091-go/tree/v1.3.2) (2022-03-28) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.3.1...v1.3.2) + +**Closed issues:** + +- Potential race condition in Connection module [\#31](https://github.com/rabbitmq/amqp091-go/issues/31) + +**Merged pull requests:** + +- bump versioning to 1.3.2 [\#58](https://github.com/rabbitmq/amqp091-go/pull/58) ([DanielePalaia](https://github.com/DanielePalaia)) + +## [v1.3.1](https://github.com/rabbitmq/amqp091-go/tree/v1.3.1) (2022-03-25) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.3.0...v1.3.1) + +**Closed issues:** + +- Possible deadlock on DeferredConfirmation.Wait\(\) [\#46](https://github.com/rabbitmq/amqp091-go/issues/46) +- Call to Delivery.Ack blocks indefinitely in case of disconnection [\#19](https://github.com/rabbitmq/amqp091-go/issues/19) +- Unexpacted behavor of channel.IsClosed\(\) [\#14](https://github.com/rabbitmq/amqp091-go/issues/14) +- A possible dead lock in connection close notification Go channel [\#11](https://github.com/rabbitmq/amqp091-go/issues/11) + +**Merged pull requests:** + +- These ones were the ones testing Open scenarios. The issue is that Op… [\#57](https://github.com/rabbitmq/amqp091-go/pull/57) ([DanielePalaia](https://github.com/DanielePalaia)) +- changing defaultVersion to buildVersion and create a simple change\_ve… [\#54](https://github.com/rabbitmq/amqp091-go/pull/54) ([DanielePalaia](https://github.com/DanielePalaia)) +- adding integration test for issue 11 [\#50](https://github.com/rabbitmq/amqp091-go/pull/50) ([DanielePalaia](https://github.com/DanielePalaia)) +- Remove the old link product [\#48](https://github.com/rabbitmq/amqp091-go/pull/48) ([Gsantomaggio](https://github.com/Gsantomaggio)) +- Fix deadlock on DeferredConfirmations [\#47](https://github.com/rabbitmq/amqp091-go/pull/47) ([SpencerTorres](https://github.com/SpencerTorres)) +- Example client: Rename Stream\(\) to Consume\(\) to avoid confusion with RabbitMQ streams [\#39](https://github.com/rabbitmq/amqp091-go/pull/39) ([andygrunwald](https://github.com/andygrunwald)) +- Example client: Rename `name` to `queueName` to make the usage clear and explicit [\#38](https://github.com/rabbitmq/amqp091-go/pull/38) ([andygrunwald](https://github.com/andygrunwald)) +- Client example: Renamed concept "Session" to "Client" [\#37](https://github.com/rabbitmq/amqp091-go/pull/37) ([andygrunwald](https://github.com/andygrunwald)) +- delete unuseful code [\#36](https://github.com/rabbitmq/amqp091-go/pull/36) ([liutaot](https://github.com/liutaot)) +- Client Example: Fix closing order [\#35](https://github.com/rabbitmq/amqp091-go/pull/35) ([andygrunwald](https://github.com/andygrunwald)) +- Client example: Use instance logger instead of global logger [\#34](https://github.com/rabbitmq/amqp091-go/pull/34) ([andygrunwald](https://github.com/andygrunwald)) + +## [v1.3.0](https://github.com/rabbitmq/amqp091-go/tree/v1.3.0) (2022-01-13) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.2.0...v1.3.0) + +**Closed issues:** + +- documentation of changes triggering version updates [\#29](https://github.com/rabbitmq/amqp091-go/issues/29) +- Persistent messages folder [\#27](https://github.com/rabbitmq/amqp091-go/issues/27) + +**Merged pull requests:** + +- Expose a method to enable out-of-order Publisher Confirms [\#33](https://github.com/rabbitmq/amqp091-go/pull/33) ([benmoss](https://github.com/benmoss)) +- Fix Signed 8-bit headers being treated as unsigned [\#26](https://github.com/rabbitmq/amqp091-go/pull/26) ([alex-goodisman](https://github.com/alex-goodisman)) + +## [v1.2.0](https://github.com/rabbitmq/amqp091-go/tree/v1.2.0) (2021-11-17) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.1.0...v1.2.0) + +**Closed issues:** + +- No access to this vhost [\#24](https://github.com/rabbitmq/amqp091-go/issues/24) +- copyright issue? [\#12](https://github.com/rabbitmq/amqp091-go/issues/12) +- A possible dead lock when publishing message with confirmation [\#10](https://github.com/rabbitmq/amqp091-go/issues/10) +- Semver release [\#7](https://github.com/rabbitmq/amqp091-go/issues/7) + +**Merged pull requests:** + +- Fix deadlock between publishing and receiving confirms [\#25](https://github.com/rabbitmq/amqp091-go/pull/25) ([benmoss](https://github.com/benmoss)) +- Add GetNextPublishSeqNo for channel in confirm mode [\#23](https://github.com/rabbitmq/amqp091-go/pull/23) ([kamal-github](https://github.com/kamal-github)) +- Added support for cert-only login without user and password [\#20](https://github.com/rabbitmq/amqp091-go/pull/20) ([mihaitodor](https://github.com/mihaitodor)) + +## [v1.1.0](https://github.com/rabbitmq/amqp091-go/tree/v1.1.0) (2021-09-21) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/ebd83429aa8cb06fa569473f623e87675f96d3a9...v1.1.0) + +**Closed issues:** + +- AMQPLAIN authentication does not work [\#15](https://github.com/rabbitmq/amqp091-go/issues/15) + +**Merged pull requests:** + +- Fix AMQPLAIN authentication mechanism [\#16](https://github.com/rabbitmq/amqp091-go/pull/16) ([hodbn](https://github.com/hodbn)) +- connection: clarify documented behavior of NotifyClose [\#13](https://github.com/rabbitmq/amqp091-go/pull/13) ([pabigot](https://github.com/pabigot)) +- Add a link to pkg.go.dev API docs [\#9](https://github.com/rabbitmq/amqp091-go/pull/9) ([benmoss](https://github.com/benmoss)) +- add test go version 1.16.x and 1.17.x [\#8](https://github.com/rabbitmq/amqp091-go/pull/8) ([k4n4ry](https://github.com/k4n4ry)) +- fix typos [\#6](https://github.com/rabbitmq/amqp091-go/pull/6) ([h44z](https://github.com/h44z)) +- Heartbeat interval should be timeout/2 [\#5](https://github.com/rabbitmq/amqp091-go/pull/5) ([ifo20](https://github.com/ifo20)) +- Exporting Channel State [\#4](https://github.com/rabbitmq/amqp091-go/pull/4) ([eibrunorodrigues](https://github.com/eibrunorodrigues)) +- Add codeql analysis [\#3](https://github.com/rabbitmq/amqp091-go/pull/3) ([MirahImage](https://github.com/MirahImage)) +- Add PR github action. [\#2](https://github.com/rabbitmq/amqp091-go/pull/2) ([MirahImage](https://github.com/MirahImage)) +- Update Copyright Statement [\#1](https://github.com/rabbitmq/amqp091-go/pull/1) ([rlewis24](https://github.com/rlewis24)) + + + +\* *This Changelog was automatically generated by [github_changelog_generator](https://github.com/github-changelog-generator/github-changelog-generator)* diff --git a/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md b/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md index 75a32b73d4..ec86fe54c4 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md +++ b/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md @@ -1,37 +1,62 @@ -## Prerequisites +# Contributing -1. Go: [https://golang.org/dl/](https://golang.org/dl/) -1. Golint `go get -u -v github.com/golang/lint/golint` +## Workflow -## Contributing - -The workflow is pretty standard: +Here is the recommended workflow: 1. Fork this repository, **github.com/rabbitmq/amqp091-go** -1. Add the pre-commit hook: `ln -s ../../pre-commit .git/hooks/pre-commit` 1. Create your feature branch (`git checkout -b my-new-feature`) +1. Run Static Checks 1. Run integration tests (see below) 1. **Implement tests** -1. Implement fixs -1. Commit your changes (`git commit -am 'Add some feature'`) +1. Implement fixes +1. Commit your changes. Use a [good, descriptive, commit message][good-commit]. 1. Push to a branch (`git push -u origin my-new-feature`) 1. Submit a pull request -## Running Tests +[good-commit]: https://cbea.ms/git-commit/ + +## Running Static Checks + +golangci-lint must be installed to run the static checks. See [installation +docs](https://golangci-lint.run/usage/install/) for more information. -The test suite assumes that: +The static checks can be run via: - * A RabbitMQ node is running on localhost with all defaults: [https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html) - * `AMQP_URL` is exported to `amqp://guest:guest@127.0.0.1:5672/` +```shell +make checks +``` + +## Running Tests ### Integration Tests -After starting a local RabbitMQ, run integration tests with the following: +Running the Integration tests require: -``` shell -env AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ go test -v -tags integration +* A running RabbitMQ node with all defaults: + [https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html) +* That the server is either reachable via `amqp://guest:guest@127.0.0.1:5672/` + or the environment variable `AMQP_URL` set to it's URL + (e.g.: `export AMQP_URL="amqp://guest:verysecretpasswd@rabbitmq-host:5772/`) + +The integration tests can be run via: + +```shell +make tests ``` +Some tests require access to `rabbitmqctl` CLI. Use the environment variable +`RABBITMQ_RABBITMQCTL_PATH=/some/path/to/rabbitmqctl` to run those tests. + +If you have Docker available in your machine, you can run: + +```shell +make tests-docker +``` + +This target will start a RabbitMQ container, run the test suite with the environment +variable setup, and stop RabbitMQ container after a successful run. + All integration tests should use the `integrationConnection(...)` test helpers defined in `integration_test.go` to setup the integration environment and logging. diff --git a/vendor/github.com/rabbitmq/amqp091-go/Makefile b/vendor/github.com/rabbitmq/amqp091-go/Makefile index b2ab895d3c..69e9e2be12 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/Makefile +++ b/vendor/github.com/rabbitmq/amqp091-go/Makefile @@ -1,17 +1,41 @@ .DEFAULT_GOAL := list # Insert a comment starting with '##' after a target, and it will be printed by 'make' and 'make list' +.PHONY: list list: ## list Makefile targets @echo "The most used targets: \n" @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' +.PHONY: check-fmt +check-fmt: ## Ensure code is formatted + gofmt -l -d . # For the sake of debugging + test -z "$$(gofmt -l .)" +.PHONY: fmt fmt: ## Run go fmt against code go fmt ./... +.PHONY: tests +tests: ## Run all tests and requires a running rabbitmq-server. Use GO_TEST_FLAGS to add extra flags to go test + go test -race -v -tags integration $(GO_TEST_FLAGS) -vet: ## Run go vet against code - go vet ./... +.PHONY: tests-docker +tests-docker: rabbitmq-server + RABBITMQ_RABBITMQCTL_PATH="DOCKER:$(CONTAINER_NAME)" go test -race -v -tags integration $(GO_TEST_FLAGS) + $(MAKE) stop-rabbitmq-server -tests: ## Run all tests and requires a running rabbitmq-server - env AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ go test -v -tags integration +.PHONY: check +check: + golangci-lint run ./... + +CONTAINER_NAME ?= amqp091-go-rabbitmq + +.PHONY: rabbitmq-server +rabbitmq-server: ## Start a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit + docker run --detach --rm --name $(CONTAINER_NAME) \ + --publish 5672:5672 --publish 15672:15672 \ + --pull always rabbitmq:3-management + +.PHONY: stop-rabbitmq-server +stop-rabbitmq-server: ## Stop a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit + docker stop $(CONTAINER_NAME) diff --git a/vendor/github.com/rabbitmq/amqp091-go/README.md b/vendor/github.com/rabbitmq/amqp091-go/README.md index daeb7aa846..6d3143f67a 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/README.md +++ b/vendor/github.com/rabbitmq/amqp091-go/README.md @@ -1,6 +1,8 @@ # Go RabbitMQ Client Library +[![amqp091-go](https://github.com/rabbitmq/amqp091-go/actions/workflows/tests.yml/badge.svg)](https://github.com/rabbitmq/amqp091-go/actions/workflows/tests.yml) [![Go Reference](https://pkg.go.dev/badge/github.com/rabbitmq/amqp091-go.svg)](https://pkg.go.dev/github.com/rabbitmq/amqp091-go) +[![Go Report Card](https://goreportcard.com/badge/github.com/rabbitmq/amqp091-go)](https://goreportcard.com/report/github.com/rabbitmq/amqp091-go) This is a Go AMQP 0.9.1 client maintained by the [RabbitMQ core team](https://github.com/rabbitmq). It was [originally developed by Sean Treadway](https://github.com/streadway/amqp). @@ -35,9 +37,6 @@ This client uses the same 2-clause BSD license as the original project. This project is based on a mature Go client that's been around for over a decade. -We expect this client to undergo moderate breaking public API changes in 2021. -Major and minor versions will be updated accordingly. - ## Supported Go Versions @@ -72,26 +71,26 @@ Things not intended to be supported. topology declaration so that reconnection is trivial and encapsulated in the caller's application code. * AMQP Protocol negotiation for forward or backward compatibility. - * 0.9.1 is stable and widely deployed. Versions 0.10 and 1.0 are divergent - specifications that change the semantics and wire format of the protocol. - We will accept patches for other protocol support but have no plans for - implementation ourselves. + * 0.9.1 is stable and widely deployed. AMQP 1.0 is a divergent + specification (a different protocol) and belongs to a different library. * Anything other than PLAIN and EXTERNAL authentication mechanisms. * Keeping the mechanisms interface modular makes it possible to extend outside of this package. If other mechanisms prove to be popular, then we would accept patches to include them in this package. + * Support for [`basic.return` and `basic.ack` frame ordering](https://www.rabbitmq.com/confirms.html#when-publishes-are-confirmed). + This client uses Go channels for certain protocol events and ordering between + events sent to two different channels generally cannot be guaranteed. ## Usage -See the 'examples' subdirectory for simple producers and consumers executables. +See the [_examples](_examples) subdirectory for simple producers and consumers executables. If you have a use-case in mind which isn't well-represented by the examples, please file an issue. ## Documentation * [Godoc API reference](http://godoc.org/github.com/rabbitmq/amqp091-go) - * [RabbitMQ tutorials in Go](https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go) currently use a different client. - They will be switched to use this client eventually + * [RabbitMQ tutorials in Go](https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go) ## Contributing @@ -99,12 +98,8 @@ Pull requests are very much welcomed. Create your pull request on a non-main branch, make sure a test or example is included that covers your change, and your commits represent coherent changes that include a reason for the change. -To run the integration tests, make sure you have RabbitMQ running on any host, -export the environment variable `AMQP_URL=amqp://host/` and run `go test -tags -integration`. TravisCI will also run the integration tests. +See [CONTRIBUTING.md](CONTRIBUTING.md) for more information. ## License -BSD 2 clause - see LICENSE for more details. - - +BSD 2 clause, see LICENSE for more details. diff --git a/vendor/github.com/rabbitmq/amqp091-go/RELEASE.md b/vendor/github.com/rabbitmq/amqp091-go/RELEASE.md new file mode 100644 index 0000000000..a1b1ae0c3c --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/RELEASE.md @@ -0,0 +1,5 @@ +## Changelog Generation + +``` +github_changelog_generator --token GITHUB-TOKEN -u rabbitmq -p amqp091-go --no-unreleased --release-branch main +``` diff --git a/vendor/github.com/rabbitmq/amqp091-go/certs.sh b/vendor/github.com/rabbitmq/amqp091-go/certs.sh index 834f422427..403e80c544 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/certs.sh +++ b/vendor/github.com/rabbitmq/amqp091-go/certs.sh @@ -38,7 +38,7 @@ serial = $dir/serial default_crl_days = 7 default_days = 3650 -default_md = sha1 +default_md = sha256 policy = testca_policy x509_extensions = certificate_extensions @@ -57,7 +57,7 @@ basicConstraints = CA:false [ req ] default_bits = 2048 default_keyfile = ./private/cakey.pem -default_md = sha1 +default_md = sha256 prompt = yes distinguished_name = root_ca_distinguished_name x509_extensions = root_ca_extensions diff --git a/vendor/github.com/rabbitmq/amqp091-go/change_version.sh b/vendor/github.com/rabbitmq/amqp091-go/change_version.sh new file mode 100644 index 0000000000..ff8e3694c6 --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/change_version.sh @@ -0,0 +1,4 @@ +#!/bin/bash +echo $1 > VERSION +sed -i -e "s/.*buildVersion = \"*.*/buildVersion = \"$1\"/" ./connection.go +go fmt ./... diff --git a/vendor/github.com/rabbitmq/amqp091-go/channel.go b/vendor/github.com/rabbitmq/amqp091-go/channel.go index a4afc98717..ae6f2d1ad1 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/channel.go +++ b/vendor/github.com/rabbitmq/amqp091-go/channel.go @@ -6,6 +6,8 @@ package amqp091 import ( + "context" + "errors" "reflect" "sync" "sync/atomic" @@ -15,14 +17,14 @@ import ( // +------+---------+-------------+ +------------+ +-----------+ // | type | channel | size | | payload | | frame-end | // +------+---------+-------------+ +------------+ +-----------+ -// octet short long size octets octet +// +// octet short long size octets octet const frameHeaderSize = 1 + 2 + 4 + 1 /* Channel represents an AMQP channel. Used as a context for valid message exchange. Errors on methods with this Channel as a receiver means this channel should be discarded and a new channel established. - */ type Channel struct { destructor sync.Once @@ -66,7 +68,7 @@ type Channel struct { errors chan *Error // State machine that manages frame order, must only be mutated by the connection - recv func(*Channel, frame) error + recv func(*Channel, frame) // Current state for frame re-assembly, only mutated from recv message messageWithContent @@ -87,9 +89,16 @@ func newChannel(c *Connection, id uint16) *Channel { } } +// Signal that from now on, Channel.send() should call Channel.sendClosed() +func (ch *Channel) setClosed() { + atomic.StoreInt32(&ch.closed, 1) +} + // shutdown is called by Connection after the channel has been removed from the // connection registry. func (ch *Channel) shutdown(e *Error) { + ch.setClosed() + ch.destructor.Do(func() { ch.m.Lock() defer ch.m.Unlock() @@ -103,14 +112,7 @@ func (ch *Channel) shutdown(e *Error) { for _, c := range ch.closes { c <- e } - } - - // Signal that from now on, Channel.send() should call - // Channel.sendClosed() - atomic.StoreInt32(&ch.closed, 1) - - // Notify RPC if we're selecting - if e != nil { + // Notify RPC if we're selecting ch.errors <- e } @@ -154,7 +156,7 @@ func (ch *Channel) shutdown(e *Error) { // only 'channel.close' is sent to the server. func (ch *Channel) send(msg message) (err error) { // If the channel is closed, use Channel.sendClosed() - if atomic.LoadInt32(&ch.closed) == 1 { + if ch.IsClosed() { return ch.sendClosed(msg) } @@ -230,14 +232,38 @@ func (ch *Channel) sendOpen(msg message) (err error) { size = len(body) } - if err = ch.connection.send(&methodFrame{ + // If the channel is closed, use Channel.sendClosed() + if ch.IsClosed() { + return ch.sendClosed(msg) + } + + // Flush the buffer only after all the Frames that comprise the Message + // have been written to maximise benefits of using a buffered writer. + defer func() { + if endError := ch.connection.endSendUnflushed(); endError != nil { + if err == nil { + err = endError + } + } + }() + + // We use sendUnflushed() in this method as sending the message requires + // sending multiple Frames (methodFrame, headerFrame, N x bodyFrame). + // Flushing after each Frame is inefficient, as it negates much of the + // benefit of using a buffered writer and results in more syscalls than + // necessary. Flushing buffers after every frame can have a significant + // performance impact when sending (e.g. basicPublish) small messages, + // so sendUnflushed() performs an *Unflushed* write, but is otherwise + // equivalent to the send() method. We later use the separate flush + // method to explicitly flush the buffer after all Frames are written. + if err = ch.connection.sendUnflushed(&methodFrame{ ChannelId: ch.id, Method: content, }); err != nil { return } - if err = ch.connection.send(&headerFrame{ + if err = ch.connection.sendUnflushed(&headerFrame{ ChannelId: ch.id, ClassId: class, Size: uint64(len(body)), @@ -252,7 +278,7 @@ func (ch *Channel) sendOpen(msg message) (err error) { j = len(body) } - if err = ch.connection.send(&bodyFrame{ + if err = ch.connection.sendUnflushed(&bodyFrame{ ChannelId: ch.id, Body: body[i:j], }); err != nil { @@ -260,6 +286,11 @@ func (ch *Channel) sendOpen(msg message) (err error) { } } } else { + // If the channel is closed, use Channel.sendClosed() + if ch.IsClosed() { + return ch.sendClosed(msg) + } + err = ch.connection.send(&methodFrame{ ChannelId: ch.id, Method: msg, @@ -274,11 +305,16 @@ func (ch *Channel) sendOpen(msg message) (err error) { func (ch *Channel) dispatch(msg message) { switch m := msg.(type) { case *channelClose: + // Note: channel state is set to closed immedately after the message is + // decoded by the Connection + // lock before sending connection.close-ok // to avoid unexpected interleaving with basic.publish frames if // publishing is happening concurrently ch.m.Lock() - ch.send(&channelCloseOk{}) + if err := ch.send(&channelCloseOk{}); err != nil { + Logger.Printf("error sending channelCloseOk, channel id: %d error: %+v", ch.id, err) + } ch.m.Unlock() ch.connection.closeChannel(ch, newError(m.ReplyCode, m.ReplyText)) @@ -288,7 +324,9 @@ func (ch *Channel) dispatch(msg message) { c <- m.Active } ch.notifyM.RUnlock() - ch.send(&channelFlowOk{Active: m.Active}) + if err := ch.send(&channelFlowOk{Active: m.Active}); err != nil { + Logger.Printf("error sending channelFlowOk, channel id: %d error: %+v", ch.id, err) + } case *basicCancel: ch.notifyM.RLock() @@ -334,40 +372,41 @@ func (ch *Channel) dispatch(msg message) { } } -func (ch *Channel) transition(f func(*Channel, frame) error) error { +func (ch *Channel) transition(f func(*Channel, frame)) { ch.recv = f - return nil } -func (ch *Channel) recvMethod(f frame) error { +func (ch *Channel) recvMethod(f frame) { switch frame := f.(type) { case *methodFrame: if msg, ok := frame.Method.(messageWithContent); ok { ch.body = make([]byte, 0) ch.message = msg - return ch.transition((*Channel).recvHeader) + ch.transition((*Channel).recvHeader) + return } ch.dispatch(frame.Method) // termination state - return ch.transition((*Channel).recvMethod) + ch.transition((*Channel).recvMethod) case *headerFrame: // drop - return ch.transition((*Channel).recvMethod) + ch.transition((*Channel).recvMethod) case *bodyFrame: // drop - return ch.transition((*Channel).recvMethod) - } + ch.transition((*Channel).recvMethod) - panic("unexpected frame type") + default: + panic("unexpected frame type") + } } -func (ch *Channel) recvHeader(f frame) error { +func (ch *Channel) recvHeader(f frame) { switch frame := f.(type) { case *methodFrame: // interrupt content and handle method - return ch.recvMethod(f) + ch.recvMethod(f) case *headerFrame: // start collecting if we expect body frames @@ -376,29 +415,31 @@ func (ch *Channel) recvHeader(f frame) error { if frame.Size == 0 { ch.message.setContent(ch.header.Properties, ch.body) ch.dispatch(ch.message) // termination state - return ch.transition((*Channel).recvMethod) + ch.transition((*Channel).recvMethod) + return } - return ch.transition((*Channel).recvContent) + ch.transition((*Channel).recvContent) case *bodyFrame: // drop and reset - return ch.transition((*Channel).recvMethod) - } + ch.transition((*Channel).recvMethod) - panic("unexpected frame type") + default: + panic("unexpected frame type") + } } // state after method + header and before the length // defined by the header has been reached -func (ch *Channel) recvContent(f frame) error { +func (ch *Channel) recvContent(f frame) { switch frame := f.(type) { case *methodFrame: // interrupt content and handle method - return ch.recvMethod(f) + ch.recvMethod(f) case *headerFrame: // drop and reset - return ch.transition((*Channel).recvMethod) + ch.transition((*Channel).recvMethod) case *bodyFrame: if cap(ch.body) == 0 { @@ -409,13 +450,15 @@ func (ch *Channel) recvContent(f frame) error { if uint64(len(ch.body)) >= ch.header.Size { ch.message.setContent(ch.header.Properties, ch.body) ch.dispatch(ch.message) // termination state - return ch.transition((*Channel).recvMethod) + ch.transition((*Channel).recvMethod) + return } - return ch.transition((*Channel).recvContent) - } + ch.transition((*Channel).recvContent) - panic("unexpected frame type") + default: + panic("unexpected frame type") + } } /* @@ -423,7 +466,6 @@ Close initiate a clean channel closure by sending a close message with the error code set to '200'. It is safe to call this method multiple times. - */ func (ch *Channel) Close() error { defer ch.connection.closeChannel(ch, nil) @@ -449,6 +491,8 @@ this channel. The chan provided will be closed when the Channel is closed and on a graceful close, no error will be sent. +In case of a non graceful close the error will be notified synchronously by the library +so that it will be necessary to consume the Channel from the caller in order to avoid deadlocks */ func (ch *Channel) NotifyClose(c chan *Error) chan *Error { ch.notifyM.Lock() @@ -494,7 +538,6 @@ much on the same connection, all channels using that connection will suffer, including acknowledgments from deliveries. Use different Connections if you desire to interleave consumers and producers in the same process to avoid your basic.ack messages from getting rate limited with your basic.publish messages. - */ func (ch *Channel) NotifyFlow(c chan bool) chan bool { ch.notifyM.Lock() @@ -516,7 +559,6 @@ immediate flags. A return struct has a copy of the Publishing along with some error information about why the publishing failed. - */ func (ch *Channel) NotifyReturn(c chan Return) chan Return { ch.notifyM.Lock() @@ -537,7 +579,6 @@ from the server when a queue is deleted or when consuming from a mirrored queue where the master has just failed (and was moved to another node). The subscription tag is returned to the listener. - */ func (ch *Channel) NotifyCancel(c chan string) chan string { ch.notifyM.Lock() @@ -600,6 +641,8 @@ or Channel while confirms are in-flight. It's advisable to wait for all Confirmations to arrive before calling Channel.Close() or Connection.Close(). +It is also advisable for the caller to consume from the channel returned till it is closed +to avoid possible deadlocks */ func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation { ch.notifyM.Lock() @@ -612,7 +655,6 @@ func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation { } return confirm - } /* @@ -678,7 +720,6 @@ When noWait is true, do not wait for the server to acknowledge the cancel. Only use this when you are certain there are no deliveries in flight that require an acknowledgment, otherwise they will arrive and be dropped in the client without an ack, and will not be redelivered to other consumers. - */ func (ch *Channel) Cancel(consumer string, noWait bool) error { req := &basicCancel{ @@ -711,12 +752,12 @@ the type "direct" with the routing key matching the queue's name. With this default binding, it is possible to publish messages that route directly to this queue by publishing to "" with the routing key of the queue name. - QueueDeclare("alerts", true, false, false, false, nil) - Publish("", "alerts", false, false, Publishing{Body: []byte("...")}) + QueueDeclare("alerts", true, false, false, false, nil) + Publish("", "alerts", false, false, Publishing{Body: []byte("...")}) - Delivery Exchange Key Queue - ----------------------------------------------- - key: alerts -> "" -> alerts -> alerts + Delivery Exchange Key Queue + ----------------------------------------------- + key: alerts -> "" -> alerts -> alerts The queue name may be empty, in which case the server will generate a unique name which will be returned in the Name field of Queue struct. @@ -752,7 +793,6 @@ or attempting to modify an existing queue from a different connection. When the error return value is not nil, you can assume the queue could not be declared with these parameters, and the channel will be closed. - */ func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) { if err := args.Validate(); err != nil { @@ -786,13 +826,11 @@ func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noW } /* - QueueDeclarePassive is functionally and parametrically equivalent to QueueDeclare, except that it sets the "passive" attribute to true. A passive queue is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent queue will cause RabbitMQ to throw an exception. This function can be used to test for the existence of a queue. - */ func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) { if err := args.Validate(); err != nil { @@ -839,6 +877,7 @@ declared with specific parameters. If a queue by this name does not exist, an error will be returned and the channel will be closed. +Deprecated: Use QueueDeclare with "Passive: true" instead. */ func (ch *Channel) QueueInspect(name string) (Queue, error) { req := &queueDeclare{ @@ -863,14 +902,14 @@ QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key. - QueueBind("pagers", "alert", "log", false, nil) - QueueBind("emails", "info", "log", false, nil) + QueueBind("pagers", "alert", "log", false, nil) + QueueBind("emails", "info", "log", false, nil) - Delivery Exchange Key Queue - ----------------------------------------------- - key: alert --> log ----> alert --> pagers - key: info ---> log ----> info ---> emails - key: debug --> log (none) (dropped) + Delivery Exchange Key Queue + ----------------------------------------------- + key: alert --> log ----> alert --> pagers + key: info ---> log ----> info ---> emails + key: debug --> log (none) (dropped) If a binding with the same key and arguments already exists between the exchange and queue, the attempt to rebind will be ignored and the existing @@ -880,16 +919,16 @@ In the case that multiple bindings may cause the message to be routed to the same queue, the server will only route the publishing once. This is possible with topic exchanges. - QueueBind("pagers", "alert", "amq.topic", false, nil) - QueueBind("emails", "info", "amq.topic", false, nil) - QueueBind("emails", "#", "amq.topic", false, nil) // match everything + QueueBind("pagers", "alert", "amq.topic", false, nil) + QueueBind("emails", "info", "amq.topic", false, nil) + QueueBind("emails", "#", "amq.topic", false, nil) // match everything - Delivery Exchange Key Queue - ----------------------------------------------- - key: alert --> amq.topic ----> alert --> pagers - key: info ---> amq.topic ----> # ------> emails - \---> info ---/ - key: debug --> amq.topic ----> # ------> emails + Delivery Exchange Key Queue + ----------------------------------------------- + key: alert --> amq.topic ----> alert --> pagers + key: info ---> amq.topic ----> # ------> emails + \---> info ---/ + key: debug --> amq.topic ----> # ------> emails It is only possible to bind a durable queue to a durable exchange regardless of whether the queue or exchange is auto-deleted. Bindings between durable queues @@ -900,7 +939,6 @@ will be closed. When noWait is false and the queue could not be bound, the channel will be closed with an error. - */ func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error { if err := args.Validate(); err != nil { @@ -925,7 +963,6 @@ arguments. It is possible to send and empty string for the exchange name which means to unbind the queue from the default exchange. - */ func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error { if err := args.Validate(); err != nil { @@ -982,7 +1019,6 @@ When noWait is true, the queue will be deleted without waiting for a response from the server. The purged message count will not be meaningful. If the queue could not be deleted, a channel exception will be raised and the channel will be closed. - */ func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error) { req := &queueDelete{ @@ -1053,7 +1089,6 @@ be dropped. When the consumer tag is cancelled, all inflight messages will be delivered until the returned chan is closed. - */ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) { // When we return from ch.call, there may be a delivery already for the @@ -1164,13 +1199,11 @@ func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, inter } /* - ExchangeDeclarePassive is functionally and parametrically equivalent to ExchangeDeclare, except that it sets the "passive" attribute to true. A passive exchange is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent exchange will cause RabbitMQ to throw an exception. This function can be used to detect the existence of an exchange. - */ func (ch *Channel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error { if err := args.Validate(); err != nil { @@ -1233,14 +1266,14 @@ exchange even though multiple bindings will match. Given a message delivered to the source exchange, the message will be forwarded to the destination exchange when the routing key is matched. - ExchangeBind("sell", "MSFT", "trade", false, nil) - ExchangeBind("buy", "AAPL", "trade", false, nil) + ExchangeBind("sell", "MSFT", "trade", false, nil) + ExchangeBind("buy", "AAPL", "trade", false, nil) - Delivery Source Key Destination - example exchange exchange - ----------------------------------------------- - key: AAPL --> trade ----> MSFT sell - \---> AAPL --> buy + Delivery Source Key Destination + example exchange exchange + ----------------------------------------------- + key: AAPL --> trade ----> MSFT sell + \---> AAPL --> buy When noWait is true, do not wait for the server to confirm the binding. If any error occurs the channel will be closed. Add a listener to NotifyClose to @@ -1328,9 +1361,47 @@ confirmations start at 1. Exit when all publishings are confirmed. When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1. +Deprecated: Use PublishWithContext instead. */ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error { - _, err := ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) + _, err := ch.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, mandatory, immediate, msg) + return err +} + +/* +PublishWithContext sends a Publishing from the client to an exchange on the server. + +When you want a single message to be delivered to a single queue, you can +publish to the default exchange with the routingKey of the queue name. This is +because every declared queue gets an implicit route to the default exchange. + +Since publishings are asynchronous, any undeliverable message will get returned +by the server. Add a listener with Channel.NotifyReturn to handle any +undeliverable message when calling publish with either the mandatory or +immediate parameters as true. + +Publishings can be undeliverable when the mandatory flag is true and no queue is +bound that matches the routing key, or when the immediate flag is true and no +consumer on the matched queue is ready to accept the delivery. + +This can return an error when the channel, connection or socket is closed. The +error or lack of an error does not indicate whether the server has received this +publishing. + +It is possible for publishing to not reach the broker if the underlying socket +is shut down without pending publishing packets being flushed from the kernel +buffers. The easy way of making it probable that all publishings reach the +server is to always call Connection.Close before terminating your publishing +application. The way to ensure that all publishings reach the server is to add +a listener to Channel.NotifyPublish and put the channel in confirm mode with +Channel.Confirm. Publishing delivery tags and their corresponding +confirmations start at 1. Exit when all publishings are confirmed. + +When Publish does not return an error and the channel is in confirm mode, the +internal counter for DeliveryTags with the first confirmation starts at 1. +*/ +func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { + _, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg) return err } @@ -1339,8 +1410,24 @@ PublishWithDeferredConfirm behaves identically to Publish but additionally retur DeferredConfirmation, allowing the caller to wait on the publisher confirmation for this message. If the channel has not been put into confirm mode, the DeferredConfirmation will be nil. + +Deprecated: Use PublishWithDeferredConfirmWithContext instead. */ func (ch *Channel) PublishWithDeferredConfirm(exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { + return ch.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, mandatory, immediate, msg) +} + +/* +PublishWithDeferredConfirmWithContext behaves identically to Publish but additionally returns a +DeferredConfirmation, allowing the caller to wait on the publisher confirmation +for this message. If the channel has not been put into confirm mode, +the DeferredConfirmation will be nil. +*/ +func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { + if ctx == nil { + return nil, errors.New("amqp091-go: nil Context") + } + if err := msg.Headers.Validate(); err != nil { return nil, err } @@ -1348,6 +1435,11 @@ func (ch *Channel) PublishWithDeferredConfirm(exchange, key string, mandatory, i ch.m.Lock() defer ch.m.Unlock() + var dc *DeferredConfirmation + if ch.confirming { + dc = ch.confirms.publish() + } + if err := ch.send(&basicPublish{ Exchange: exchange, RoutingKey: key, @@ -1370,14 +1462,13 @@ func (ch *Channel) PublishWithDeferredConfirm(exchange, key string, mandatory, i AppId: msg.AppId, }, }); err != nil { + if ch.confirming { + ch.confirms.unpublish() + } return nil, err } - if ch.confirming { - return ch.confirms.Publish(), nil - } - - return nil, nil + return dc, nil } /* @@ -1396,7 +1487,6 @@ delivery. When autoAck is true, the server will automatically acknowledge this message so you don't have to. But if you are unable to fully process this message before the channel or connection is closed, the message will not get requeued. - */ func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error) { req := &basicGet{Queue: queue, NoAck: autoAck} @@ -1428,7 +1518,6 @@ the channel is in a transaction is not defined. Once a channel has been put into transaction mode, it cannot be taken out of transaction mode. Use a different channel for non-transactional semantics. - */ func (ch *Channel) Tx() error { return ch.call( @@ -1442,7 +1531,6 @@ TxCommit atomically commits all publishings and acknowledgments for a single queue and immediately start a new transaction. Calling this method without having called Channel.Tx is an error. - */ func (ch *Channel) TxCommit() error { return ch.call( @@ -1456,7 +1544,6 @@ TxRollback atomically rolls back all publishings and acknowledgments for a single queue and immediately start a new transaction. Calling this method without having called Channel.Tx is an error. - */ func (ch *Channel) TxRollback() error { return ch.call( @@ -1486,7 +1573,6 @@ pause its publishings when `false` is sent on that channel. Note: RabbitMQ prefers to use TCP push back to control flow for all channels on a connection, so under high volume scenarios, it's wise to open separate Connections for publishings and deliveries. - */ func (ch *Channel) Flow(active bool) error { return ch.call( @@ -1518,7 +1604,6 @@ persisting the message if necessary. When noWait is true, the client will not wait for a response. A channel exception could occur if the server does not support this method. - */ func (ch *Channel) Confirm(noWait bool) error { if err := ch.call( @@ -1547,6 +1632,11 @@ If the deliveries cannot be recovered, an error will be returned and the channel will be closed. Note: this method is not implemented on RabbitMQ, use Delivery.Nack instead + +Deprecated: This method is deprecated in RabbitMQ. RabbitMQ used Recover(true) +as a mechanism for consumers to tell the broker that they were ready for more +deliveries, back in 2008-2009. Support for this will be removed from RabbitMQ in +a future release. Use Nack() with requeue=true instead. */ func (ch *Channel) Recover(requeue bool) error { return ch.call( diff --git a/vendor/github.com/rabbitmq/amqp091-go/confirms.go b/vendor/github.com/rabbitmq/amqp091-go/confirms.go index 654d755104..577e042bcc 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/confirms.go +++ b/vendor/github.com/rabbitmq/amqp091-go/confirms.go @@ -6,6 +6,7 @@ package amqp091 import ( + "context" "sync" ) @@ -38,7 +39,7 @@ func (c *confirms) Listen(l chan Confirmation) { } // Publish increments the publishing counter -func (c *confirms) Publish() *DeferredConfirmation { +func (c *confirms) publish() *DeferredConfirmation { c.publishedMut.Lock() defer c.publishedMut.Unlock() @@ -46,6 +47,15 @@ func (c *confirms) Publish() *DeferredConfirmation { return c.deferredConfirmations.Add(c.published) } +// unpublish decrements the publishing counter and removes the +// DeferredConfirmation. It must be called immediately after a publish fails. +func (c *confirms) unpublish() { + c.publishedMut.Lock() + defer c.publishedMut.Unlock() + c.deferredConfirmations.remove(c.published) + c.published-- +} + // confirm confirms one publishing, increments the expecting delivery tag, and // removes bookkeeping for that delivery tag. func (c *confirms) confirm(confirmation Confirmation) { @@ -98,11 +108,14 @@ func (c *confirms) Multiple(confirmed Confirmation) { c.resequence() } -// Close closes all listeners, discarding any out of sequence confirmations +// Cleans up the confirms struct and its dependencies. +// Closes all listeners, discarding any out of sequence confirmations func (c *confirms) Close() error { c.m.Lock() defer c.m.Unlock() + c.deferredConfirmations.Close() + for _, l := range c.listeners { close(l) } @@ -126,22 +139,34 @@ func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation { defer d.m.Unlock() dc := &DeferredConfirmation{DeliveryTag: tag} - dc.wg.Add(1) + dc.done = make(chan struct{}) d.confirmations[tag] = dc return dc } +// remove is only used to drop a tag whose publish failed +func (d *deferredConfirmations) remove(tag uint64) { + d.m.Lock() + defer d.m.Unlock() + dc, found := d.confirmations[tag] + if !found { + return + } + close(dc.done) + delete(d.confirmations, tag) +} + func (d *deferredConfirmations) Confirm(confirmation Confirmation) { d.m.Lock() defer d.m.Unlock() dc, found := d.confirmations[confirmation.DeliveryTag] if !found { - // we should never receive a confirmation for a tag that hasn't been published, but a test causes this to happen + // We should never receive a confirmation for a tag that hasn't + // been published, but a test causes this to happen. return } - dc.confirmation = confirmation - dc.wg.Done() + dc.setAck(confirmation.Ack) delete(d.confirmations, confirmation.DeliveryTag) } @@ -151,14 +176,63 @@ func (d *deferredConfirmations) ConfirmMultiple(confirmation Confirmation) { for k, v := range d.confirmations { if k <= confirmation.DeliveryTag { - v.confirmation = Confirmation{DeliveryTag: k, Ack: confirmation.Ack} - v.wg.Done() + v.setAck(confirmation.Ack) delete(d.confirmations, k) } } } +// Close nacks all pending DeferredConfirmations being blocked by dc.Wait(). +func (d *deferredConfirmations) Close() { + d.m.Lock() + defer d.m.Unlock() + + for k, v := range d.confirmations { + v.setAck(false) + delete(d.confirmations, k) + } +} + +// setAck sets the acknowledgement status of the confirmation. Note that it must +// not be called more than once. +func (d *DeferredConfirmation) setAck(ack bool) { + d.ack = ack + close(d.done) +} + +// Done returns the channel that can be used to wait for the publisher +// confirmation. +func (d *DeferredConfirmation) Done() <-chan struct{} { + return d.done +} + +// Acked returns the publisher confirmation in a non-blocking manner. It returns +// false if the confirmation was not acknowledged yet or received negative +// acknowledgement. +func (d *DeferredConfirmation) Acked() bool { + select { + case <-d.done: + default: + return false + } + return d.ack +} + +// Wait blocks until the publisher confirmation. It returns true if the server +// successfully received the publishing. func (d *DeferredConfirmation) Wait() bool { - d.wg.Wait() - return d.confirmation.Ack + <-d.done + return d.ack +} + +// WaitContext waits until the publisher confirmation. It returns true if the +// server successfully received the publishing. If the context expires before +// that, ctx.Err() is returned. +func (d *DeferredConfirmation) WaitContext(ctx context.Context) (bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + case <-d.done: + } + return d.ack, nil } diff --git a/vendor/github.com/rabbitmq/amqp091-go/connection.go b/vendor/github.com/rabbitmq/amqp091-go/connection.go index 4023ade8c4..3d50d95580 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/connection.go +++ b/vendor/github.com/rabbitmq/amqp091-go/connection.go @@ -8,8 +8,12 @@ package amqp091 import ( "bufio" "crypto/tls" + "crypto/x509" + "errors" + "fmt" "io" "net" + "os" "reflect" "strconv" "strings" @@ -23,8 +27,9 @@ const ( defaultHeartbeat = 10 * time.Second defaultConnectionTimeout = 30 * time.Second - defaultProduct = "https://github.com/streadway/amqp" - defaultVersion = "β" + defaultProduct = "AMQP 0.9.1 Client" + buildVersion = "1.8.1" + platform = "golang" // Safer default that makes channel leaks a lot easier to spot // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593. defaultChannelMax = (2 << 10) - 1 @@ -71,6 +76,17 @@ type Config struct { Dial func(network, addr string) (net.Conn, error) } +// NewConnectionProperties creates an amqp.Table to be used as amqp.Config.Properties. +// +// Defaults to library-defined values. For empty properties, use make(amqp.Table) instead. +func NewConnectionProperties() Table { + return Table{ + "product": defaultProduct, + "version": buildVersion, + "platform": platform, + } +} + // Connection manages the serialization and deserialization of frames from IO // and dispatches the frames to the appropriate channel. All RPC methods and // asynchronous Publishing, Delivery, Ack, Nack and Return messages are @@ -209,7 +225,11 @@ func DialConfig(url string, config Config) (*Connection, error) { if uri.Scheme == "amqps" { if config.TLSClientConfig == nil { - config.TLSClientConfig = new(tls.Config) + tlsConfig, err := tlsConfigFromURI(uri) + if err != nil { + return nil, fmt.Errorf("create TLS config from URI: %w", err) + } + config.TLSClientConfig = tlsConfig } // If ServerName has not been specified in TLSClientConfig, @@ -220,7 +240,6 @@ func DialConfig(url string, config Config) (*Connection, error) { client := tls.Client(conn, config.TLSClientConfig) if err := client.Handshake(); err != nil { - conn.Close() return nil, err } @@ -235,7 +254,6 @@ func DialConfig(url string, config Config) (*Connection, error) { Open accepts an already established connection, or other io.ReadWriteCloser as a transport. Use this method if you have established a TLS connection or wish to use your own custom transport. - */ func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) { c := &Connection{ @@ -251,6 +269,22 @@ func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) { return c, c.open(config) } +/* +UpdateSecret updates the secret used to authenticate this connection. It is used when +secrets have an expiration date and need to be renewed, like OAuth 2 tokens. + +It returns an error if the operation is not successful, or if the connection is closed. +*/ +func (c *Connection) UpdateSecret(newSecret, reason string) error { + if c.IsClosed() { + return ErrClosed + } + return c.call(&connectionUpdateSecret{ + NewSecret: newSecret, + Reason: reason, + }, &connectionUpdateSecretOk{}) +} + /* LocalAddr returns the local TCP peer address, or ":0" (the zero value of net.TCPAddr) as a fallback default value if the underlying transport does not support LocalAddr(). @@ -264,6 +298,18 @@ func (c *Connection) LocalAddr() net.Addr { return &net.TCPAddr{} } +/* +RemoteAddr returns the remote TCP peer address, if known. +*/ +func (c *Connection) RemoteAddr() net.Addr { + if conn, ok := c.conn.(interface { + RemoteAddr() net.Addr + }); ok { + return conn.RemoteAddr() + } + return &net.TCPAddr{} +} + // ConnectionState returns basic TLS details of the underlying transport. // Returns a zero value when the underlying connection does not implement // ConnectionState() tls.ConnectionState. @@ -280,12 +326,14 @@ func (c *Connection) ConnectionState() tls.ConnectionState { NotifyClose registers a listener for close events either initiated by an error accompanying a connection.close method or by a normal shutdown. -The chan provided will be closed when the Channel is closed and on a +The chan provided will be closed when the Connection is closed and on a graceful close, no error will be sent. +In case of a non graceful close the error will be notified synchronously by the library +so that it will be necessary to consume the Channel from the caller in order to avoid deadlocks + To reconnect after a transport or protocol error, register a listener here and re-run your setup process. - */ func (c *Connection) NotifyClose(receiver chan *Error) chan *Error { c.m.Lock() @@ -309,7 +357,6 @@ become free again. This optional extension is supported by the server when the "connection.blocked" server capability key is true. - */ func (c *Connection) NotifyBlocked(receiver chan Blocking) chan Blocking { c.m.Lock() @@ -352,12 +399,47 @@ func (c *Connection) Close() error { ) } +// CloseDeadline requests and waits for the response to close this AMQP connection. +// +// Accepts a deadline for waiting the server response. The deadline is passed +// to the low-level connection i.e. network socket. +// +// Regardless of the error returned, the connection is considered closed, and it +// should not be used after calling this function. +// +// In the event of an I/O timeout, connection-closed listeners are NOT informed. +// +// After returning from this call, all resources associated with this connection, +// including the underlying io, Channels, Notify listeners and Channel consumers +// will also be closed. +func (c *Connection) CloseDeadline(deadline time.Time) error { + if c.IsClosed() { + return ErrClosed + } + + defer c.shutdown(nil) + + err := c.setDeadline(deadline) + if err != nil { + return err + } + + return c.call( + &connectionClose{ + ReplyCode: replySuccess, + ReplyText: "kthxbai", + }, + &connectionCloseOk{}, + ) +} + func (c *Connection) closeWith(err *Error) error { if c.IsClosed() { return ErrClosed } defer c.shutdown(err) + return c.call( &connectionClose{ ReplyCode: uint16(err.Code), @@ -370,7 +452,19 @@ func (c *Connection) closeWith(err *Error) error { // IsClosed returns true if the connection is marked as closed, otherwise false // is returned. func (c *Connection) IsClosed() bool { - return (atomic.LoadInt32(&c.closed) == 1) + return atomic.LoadInt32(&c.closed) == 1 +} + +// setDeadline is a wrapper to type assert Connection.conn and set an I/O +// deadline in the underlying TCP connection socket, by calling +// net.Conn.SetDeadline(). It returns an error, in case the type assertion fails, +// although this should never happen. +func (c *Connection) setDeadline(t time.Time) error { + con, ok := c.conn.(net.Conn) + if !ok { + return errInvalidTypeAssertion + } + return con.SetDeadline(t) } func (c *Connection) send(f frame) error { @@ -401,6 +495,74 @@ func (c *Connection) send(f frame) error { return err } +// This method is intended to be used with sendUnflushed() to end a sequence +// of sendUnflushed() calls and flush the connection +func (c *Connection) endSendUnflushed() error { + c.sendM.Lock() + defer c.sendM.Unlock() + return c.flush() +} + +// sendUnflushed performs an *Unflushed* write. It is otherwise equivalent to +// send(), and we provide a separate flush() function to explicitly flush the +// buffer after all Frames are written. +// +// Why is this a thing? +// +// send() method uses writer.WriteFrame(), which will write the Frame then +// flush the buffer. For cases like the sendOpen() method on Channel, which +// sends multiple Frames (methodFrame, headerFrame, N x bodyFrame), flushing +// after each Frame is inefficient as it negates much of the benefit of using a +// buffered writer, and results in more syscalls than necessary. Flushing buffers +// after every frame can have a significant performance impact when sending +// (basicPublish) small messages, so this method performs an *Unflushed* write +// but is otherwise equivalent to send() method, and we provide a separate +// flush method to explicitly flush the buffer after all Frames are written. +func (c *Connection) sendUnflushed(f frame) error { + if c.IsClosed() { + return ErrClosed + } + + c.sendM.Lock() + err := c.writer.WriteFrameNoFlush(f) + c.sendM.Unlock() + + if err != nil { + // shutdown could be re-entrant from signaling notify chans + go c.shutdown(&Error{ + Code: FrameError, + Reason: err.Error(), + }) + } + + return err +} + +// This method is intended to be used with sendUnflushed() to explicitly flush +// the buffer after all required Frames have been written to the buffer. +func (c *Connection) flush() (err error) { + if buf, ok := c.writer.w.(*bufio.Writer); ok { + err = buf.Flush() + + // Moving send notifier to flush increases basicPublish for the small message + // case. As sendUnflushed + flush is used for the case of sending semantically + // related Frames (e.g. a Message like basicPublish) there is no real advantage + // to sending per Frame vice per "group of related Frames" and for the case of + // small messages time.Now() is (relatively) expensive. + if err == nil { + // Broadcast we sent a frame, reducing heartbeats, only + // if there is something that can receive - like a non-reentrant + // call or if the heartbeater isn't running + select { + case c.sends <- time.Now(): + default: + } + } + } + + return +} + func (c *Connection) shutdown(err *Error) { atomic.StoreInt32(&c.closed, 1) @@ -412,9 +574,6 @@ func (c *Connection) shutdown(err *Error) { for _, c := range c.closes { c <- err } - } - - if err != nil { c.errors <- err } // Shutdown handler goroutine can still receive the result. @@ -439,8 +598,8 @@ func (c *Connection) shutdown(err *Error) { c.conn.Close() - c.channels = map[uint16]*Channel{} - c.allocator = newAllocator(1, c.Config.ChannelMax) + c.channels = nil + c.allocator = nil c.noNotify = true }) } @@ -461,11 +620,10 @@ func (c *Connection) dispatch0(f frame) { switch m := mf.Method.(type) { case *connectionClose: // Send immediately as shutdown will close our side of the writer. - c.send(&methodFrame{ - ChannelId: 0, - Method: &connectionCloseOk{}, - }) - + f := &methodFrame{ChannelId: 0, Method: &connectionCloseOk{}} + if err := c.send(f); err != nil { + Logger.Printf("error sending connectionCloseOk, error: %+v", err) + } c.shutdown(newError(m.ReplyCode, m.ReplyText)) case *connectionBlocked: for _, c := range c.blocks { @@ -482,16 +640,25 @@ func (c *Connection) dispatch0(f frame) { // kthx - all reads reset our deadline. so we can drop this default: // lolwat - channel0 only responds to methods and heartbeats - c.closeWith(ErrUnexpectedFrame) + if err := c.closeWith(ErrUnexpectedFrame); err != nil { + Logger.Printf("error sending connectionCloseOk with ErrUnexpectedFrame, error: %+v", err) + } } } func (c *Connection) dispatchN(f frame) { c.m.Lock() - channel := c.channels[f.channel()] + channel, ok := c.channels[f.channel()] + if ok { + updateChannel(f, channel) + } else { + Logger.Printf("[debug] dropping frame, channel %d does not exist", f.channel()) + } c.m.Unlock() - if channel != nil { + // Note: this could result in concurrent dispatch depending on + // how channels are managed in an application + if ok { channel.recv(channel, f) } else { c.dispatchClosed(f) @@ -514,15 +681,17 @@ func (c *Connection) dispatchClosed(f frame) { if mf, ok := f.(*methodFrame); ok { switch mf.Method.(type) { case *channelClose: - c.send(&methodFrame{ - ChannelId: f.channel(), - Method: &channelCloseOk{}, - }) + f := &methodFrame{ChannelId: f.channel(), Method: &channelCloseOk{}} + if err := c.send(f); err != nil { + Logger.Printf("error sending channelCloseOk, channel id: %d error: %+v", f.channel(), err) + } case *channelCloseOk: // we are already closed, so do nothing default: // unexpected method on closed channel - c.closeWith(ErrClosed) + if err := c.closeWith(ErrClosed); err != nil { + Logger.Printf("error sending connectionCloseOk with ErrClosed, error: %+v", err) + } } } } @@ -535,6 +704,8 @@ func (c *Connection) reader(r io.Reader) { frames := &reader{buf} conn, haveDeadliner := r.(readDeadliner) + defer close(c.rpc) + for { frame, err := frames.ReadFrame() @@ -594,7 +765,13 @@ func (c *Connection) heartbeater(interval time.Duration, done chan *Error) { // When reading, reset our side of the deadline, if we've negotiated one with // a deadline that covers at least 2 server heartbeats if interval > 0 { - conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval)) + if err := conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval)); err != nil { + var opErr *net.OpError + if !errors.As(err, &opErr) { + Logger.Printf("error setting read deadline in heartbeater: %+v", err) + return + } + } } case <-done: @@ -640,8 +817,10 @@ func (c *Connection) releaseChannel(id uint16) { c.m.Lock() defer c.m.Unlock() - delete(c.channels, id) - c.allocator.release(int(id)) + if !c.IsClosed() { + delete(c.channels, id) + c.allocator.release(int(id)) + } } // openChannel allocates and opens a channel, must be paired with closeChannel @@ -670,7 +849,6 @@ func (c *Connection) closeChannel(ch *Channel, e *Error) { Channel opens a unique, concurrent server channel to process the bulk of AMQP messages. Any error from methods on this receiver will render the receiver invalid and a new Channel should be opened. - */ func (c *Connection) Channel() (*Channel, error) { return c.openChannel() @@ -685,39 +863,45 @@ func (c *Connection) call(req message, res ...message) error { } } - select { - case err, ok := <-c.errors: - if !ok { + msg, ok := <-c.rpc + if !ok { + err, errorsChanIsOpen := <-c.errors + if !errorsChanIsOpen { return ErrClosed } return err + } - case msg := <-c.rpc: - // Try to match one of the result types - for _, try := range res { - if reflect.TypeOf(msg) == reflect.TypeOf(try) { - // *res = *msg - vres := reflect.ValueOf(try).Elem() - vmsg := reflect.ValueOf(msg).Elem() - vres.Set(vmsg) - return nil - } + // Try to match one of the result types + for _, try := range res { + if reflect.TypeOf(msg) == reflect.TypeOf(try) { + // *res = *msg + vres := reflect.ValueOf(try).Elem() + vmsg := reflect.ValueOf(msg).Elem() + vres.Set(vmsg) + return nil } - return ErrCommandInvalid } - // unreachable + return ErrCommandInvalid } -// Connection = open-Connection *use-Connection close-Connection -// open-Connection = C:protocol-header -// S:START C:START-OK -// *challenge -// S:TUNE C:TUNE-OK -// C:OPEN S:OPEN-OK -// challenge = S:SECURE C:SECURE-OK -// use-Connection = *channel -// close-Connection = C:CLOSE S:CLOSE-OK -// / S:CLOSE C:CLOSE-OK +// Communication flow to open, use and close a connection. 'C:' are +// frames sent by the Client. 'S:' are frames sent by the Server. +// +// Connection = open-Connection *use-Connection close-Connection +// +// open-Connection = C:protocol-header +// S:START C:START-OK +// *challenge +// S:TUNE C:TUNE-OK +// C:OPEN S:OPEN-OK +// +// challenge = S:SECURE C:SECURE-OK +// +// use-Connection = *channel +// +// close-Connection = C:CLOSE S:CLOSE-OK +// S:CLOSE C:CLOSE-OK func (c *Connection) open(config Config) error { if err := c.send(&protocolHeader{}); err != nil { return err @@ -756,15 +940,14 @@ func (c *Connection) openStart(config Config) error { func (c *Connection) openTune(config Config, auth Authentication) error { if len(config.Properties) == 0 { - config.Properties = Table{ - "product": defaultProduct, - "version": defaultVersion, - } + config.Properties = NewConnectionProperties() } config.Properties["capabilities"] = Table{ "connection.blocked": true, "consumer_cancel_notify": true, + "basic.nack": true, + "publisher_confirms": true, } ok := &connectionStartOk{ @@ -782,6 +965,10 @@ func (c *Connection) openTune(config Config, auth Authentication) error { return ErrCredentials } + // Edge case that may race with c.shutdown() + // https://github.com/rabbitmq/amqp091-go/issues/170 + c.m.Lock() + // When the server and client both use default 0, then the max channel is // only limited by uint16. c.Config.ChannelMax = pick(config.ChannelMax, int(tune.ChannelMax)) @@ -790,6 +977,10 @@ func (c *Connection) openTune(config Config, auth Authentication) error { } c.Config.ChannelMax = min(c.Config.ChannelMax, maxChannelMax) + c.allocator = newAllocator(1, c.Config.ChannelMax) + + c.m.Unlock() + // Frame size includes headers and end byte (len(payload)+8), even if // this is less than FrameMinSize, use what the server sends because the // alternative is to stop the handshake here. @@ -844,10 +1035,48 @@ func (c *Connection) openComplete() error { _ = deadliner.SetDeadline(time.Time{}) } - c.allocator = newAllocator(1, c.Config.ChannelMax) return nil } +// tlsConfigFromURI tries to create TLS configuration based on query parameters. +// Returns default (empty) config in case no suitable client cert and/or client key not provided. +// Returns error in case certificates can not be parsed. +func tlsConfigFromURI(uri URI) (*tls.Config, error) { + var certPool *x509.CertPool + if uri.CACertFile != "" { + data, err := os.ReadFile(uri.CACertFile) + if err != nil { + return nil, fmt.Errorf("read CA certificate: %w", err) + } + + certPool = x509.NewCertPool() + certPool.AppendCertsFromPEM(data) + } else if sysPool, err := x509.SystemCertPool(); err != nil { + return nil, fmt.Errorf("load system certificates: %w", err) + } else { + certPool = sysPool + } + + if uri.CertFile == "" || uri.KeyFile == "" { + // no client auth (mTLS), just server auth + return &tls.Config{ + RootCAs: certPool, + ServerName: uri.ServerName, + }, nil + } + + certificate, err := tls.LoadX509KeyPair(uri.CertFile, uri.KeyFile) + if err != nil { + return nil, fmt.Errorf("load client certificate: %w", err) + } + + return &tls.Config{ + Certificates: []tls.Certificate{certificate}, + RootCAs: certPool, + ServerName: uri.ServerName, + }, nil +} + func max(a, b int) int { if a > b { return a diff --git a/vendor/github.com/rabbitmq/amqp091-go/consumers.go b/vendor/github.com/rabbitmq/amqp091-go/consumers.go index 8c23fadab7..c352fece97 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/consumers.go +++ b/vendor/github.com/rabbitmq/amqp091-go/consumers.go @@ -75,6 +75,33 @@ func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) { } case out <- *queue[0]: + /* + * https://github.com/rabbitmq/amqp091-go/issues/179 + * https://github.com/rabbitmq/amqp091-go/pull/180 + * + * Comment from @lars-t-hansen: + * + * Given Go's slice semantics, and barring any information + * available to the compiler that proves that queue is the only + * pointer to the memory it references, the only meaning that + * queue = queue[1:] can have is basically queue += sizeof(queue + * element), ie, it bumps a pointer. Looking at the generated + * code for a simple example (on ARM64 in this case) bears this + * out. So what we're left with is an array that we have a + * pointer into the middle of. When the GC traces this pointer, + * it too does not know whether the array has multiple + * referents, and so its only sensible choice is to find the + * beginning of the array, and if the array is not already + * visited, mark every element in it, including the "dead" + * pointer. + * + * (Depending on the program dynamics, an element may eventually + * be appended to the queue when the queue is at capacity, and + * in this case the live elements are copied into a new array + * and the old array is left to be GC'd eventually, along with + * the dead object. But that can take time.) + */ + queue[0] = nil queue = queue[1:] } } diff --git a/vendor/github.com/rabbitmq/amqp091-go/doc.go b/vendor/github.com/rabbitmq/amqp091-go/doc.go index ba2efb08c3..8cb0b64f04 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/doc.go +++ b/vendor/github.com/rabbitmq/amqp091-go/doc.go @@ -9,13 +9,13 @@ Package amqp091 is an AMQP 0.9.1 client with RabbitMQ extensions Understand the AMQP 0.9.1 messaging model by reviewing these links first. Much of the terminology in this library directly relates to AMQP concepts. - Resources + Resources - http://www.rabbitmq.com/tutorials/amqp-concepts.html - http://www.rabbitmq.com/getstarted.html - http://www.rabbitmq.com/amqp-0-9-1-reference.html + http://www.rabbitmq.com/tutorials/amqp-concepts.html + http://www.rabbitmq.com/getstarted.html + http://www.rabbitmq.com/amqp-0-9-1-reference.html -Design +# Design Most other broker clients publish to queues, but in AMQP, clients publish Exchanges instead. AMQP is programmable, meaning that both the producers and @@ -47,7 +47,7 @@ asynchronous like Channel.Publish. The error values should still be checked for these methods as they will indicate IO failures like when the underlying connection closes. -Asynchronous Events +# Asynchronous Events Clients of this library may be interested in receiving some of the protocol messages other than Deliveries like basic.ack methods while a channel is in @@ -61,36 +61,36 @@ Any asynchronous events, including Deliveries and Publishings must always have a receiver until the corresponding chans are closed. Without asynchronous receivers, the synchronous methods will block. -Use Case +# Use Case It's important as a client to an AMQP topology to ensure the state of the broker matches your expectations. For both publish and consume use cases, make sure you declare the queues, exchanges and bindings you expect to exist -prior to calling Channel.Publish or Channel.Consume. +prior to calling [Channel.PublishWithContext] or [Channel.Consume]. - // Connections start with amqp.Dial() typically from a command line argument - // or environment variable. - connection, err := amqp.Dial(os.Getenv("AMQP_URL")) + // Connections start with amqp.Dial() typically from a command line argument + // or environment variable. + connection, err := amqp.Dial(os.Getenv("AMQP_URL")) - // To cleanly shutdown by flushing kernel buffers, make sure to close and - // wait for the response. - defer connection.Close() + // To cleanly shutdown by flushing kernel buffers, make sure to close and + // wait for the response. + defer connection.Close() - // Most operations happen on a channel. If any error is returned on a - // channel, the channel will no longer be valid, throw it away and try with - // a different channel. If you use many channels, it's useful for the - // server to - channel, err := connection.Channel() + // Most operations happen on a channel. If any error is returned on a + // channel, the channel will no longer be valid, throw it away and try with + // a different channel. If you use many channels, it's useful for the + // server to + channel, err := connection.Channel() - // Declare your topology here, if it doesn't exist, it will be created, if - // it existed already and is not what you expect, then that's considered an - // error. + // Declare your topology here, if it doesn't exist, it will be created, if + // it existed already and is not what you expect, then that's considered an + // error. - // Use your connection on this topology with either Publish or Consume, or - // inspect your queues with QueueInspect. It's unwise to mix Publish and - // Consume to let TCP do its job well. + // Use your connection on this topology with either Publish or Consume, or + // inspect your queues with QueueInspect. It's unwise to mix Publish and + // Consume to let TCP do its job well. -SSL/TLS - Secure connections +# SSL/TLS - Secure connections When Dial encounters an amqps:// scheme, it will use the zero value of a tls.Config. This will only perform server certificate and host verification. @@ -104,5 +104,62 @@ encounters an amqp:// scheme. SSL/TLS in RabbitMQ is documented here: http://www.rabbitmq.com/ssl.html +# Best practises for Connection and Channel notifications: + +In order to be notified when a connection or channel gets closed, both +structures offer the possibility to register channels using +[Channel.NotifyClose] and [Connection.NotifyClose] functions: + + notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error)) + +No errors will be sent in case of a graceful connection close. In case of a +non-graceful closure due to e.g. network issue, or forced connection closure +from the Management UI, the error will be notified synchronously by the library. + +The error is sent synchronously to the channel, so that the flow will wait until +the receiver consumes from the channel. To avoid deadlocks in the library, it is +necessary to consume from the channels. This could be done inside a +different goroutine with a select listening on the two channels inside a for +loop like: + + go func() { + for notifyConnClose != nil || notifyChanClose != nil { + select { + case err, ok := <-notifyConnClose: + if !ok { + notifyConnClose = nil + } else { + fmt.Printf("connection closed, error %s", err) + } + case err, ok := <-notifyChanClose: + if !ok { + notifyChanClose = nil + } else { + fmt.Printf("channel closed, error %s", err) + } + } + } + }() + +Another approach is to use buffered channels: + + notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1)) + +The library sends to notification channels just once. After sending a notification +to all channels, the library closes all registered notification channels. After +receiving a notification, the application should create and register a new channel. + +# Best practises for NotifyPublish notifications: + +Using [Channel.NotifyPublish] allows the caller of the library to be notified, +through a go channel, when a message has been received and confirmed by the +broker. It's advisable to wait for all Confirmations to arrive before calling +[Channel.Close] or [Connection.Close]. It is also necessary to consume from this +channel until it gets closed. The library sends synchronously to the registered channel. +It is advisable to use a buffered channel, with capacity set to the maximum acceptable +number of unconfirmed messages. + +It is important to consume from the confirmation channel at all times, in order to avoid +deadlocks in the library. */ package amqp091 diff --git a/vendor/github.com/rabbitmq/amqp091-go/fuzz.go b/vendor/github.com/rabbitmq/amqp091-go/fuzz.go index 602220fcc3..c9f03ea4e6 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/fuzz.go +++ b/vendor/github.com/rabbitmq/amqp091-go/fuzz.go @@ -3,6 +3,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +//go:build gofuzz // +build gofuzz package amqp091 diff --git a/vendor/github.com/rabbitmq/amqp091-go/log.go b/vendor/github.com/rabbitmq/amqp091-go/log.go new file mode 100644 index 0000000000..7540f137af --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/log.go @@ -0,0 +1,23 @@ +// Copyright (c) 2022 VMware, Inc. or its affiliates. All Rights Reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package amqp091 + +type Logging interface { + Printf(format string, v ...interface{}) +} + +var Logger Logging = NullLogger{} + +// Enables logging using a custom Logging instance. Note that this is +// not thread safe and should be called at application start +func SetLogger(logger Logging) { + Logger = logger +} + +type NullLogger struct { +} + +func (l NullLogger) Printf(format string, v ...interface{}) { +} diff --git a/vendor/github.com/rabbitmq/amqp091-go/pre-commit b/vendor/github.com/rabbitmq/amqp091-go/pre-commit deleted file mode 100644 index 3715530073..0000000000 --- a/vendor/github.com/rabbitmq/amqp091-go/pre-commit +++ /dev/null @@ -1,67 +0,0 @@ -#!/bin/sh - -LATEST_STABLE_SUPPORTED_GO_VERSION="1.11" - -main() { - if local_go_version_is_latest_stable - then - run_gofmt - run_golint - run_govet - fi - run_unit_tests -} - -local_go_version_is_latest_stable() { - go version | grep -q $LATEST_STABLE_SUPPORTED_GO_VERSION -} - -log_error() { - echo "$*" 1>&2 -} - -run_gofmt() { - GOFMT_FILES=$(gofmt -l .) - if [ -n "$GOFMT_FILES" ] - then - log_error "gofmt failed for the following files: -$GOFMT_FILES - -please run 'gofmt -w .' on your changes before committing." - exit 1 - fi -} - -run_golint() { - GOLINT_ERRORS=$(golint ./... | grep -v "Id should be") - if [ -n "$GOLINT_ERRORS" ] - then - log_error "golint failed for the following reasons: -$GOLINT_ERRORS - -please run 'golint ./...' on your changes before committing." - exit 1 - fi -} - -run_govet() { - GOVET_ERRORS=$(go tool vet ./*.go 2>&1) - if [ -n "$GOVET_ERRORS" ] - then - log_error "go vet failed for the following reasons: -$GOVET_ERRORS - -please run 'go tool vet ./*.go' on your changes before committing." - exit 1 - fi -} - -run_unit_tests() { - if [ -z "$NOTEST" ] - then - log_error 'Running short tests...' - env AMQP_URL= go test -short - fi -} - -main diff --git a/vendor/github.com/rabbitmq/amqp091-go/read.go b/vendor/github.com/rabbitmq/amqp091-go/read.go index 57444b0648..a8bed13795 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/read.go +++ b/vendor/github.com/rabbitmq/amqp091-go/read.go @@ -17,26 +17,26 @@ import ( ReadFrame reads a frame from an input stream and returns an interface that can be cast into one of the following: - methodFrame - PropertiesFrame - bodyFrame - heartbeatFrame + methodFrame + PropertiesFrame + bodyFrame + heartbeatFrame 2.3.5 frame Details All frames consist of a header (7 octets), a payload of arbitrary size, and a 'frame-end' octet that detects malformed frames: - 0 1 3 7 size+7 size+8 - +------+---------+-------------+ +------------+ +-----------+ - | type | channel | size | | payload | | frame-end | - +------+---------+-------------+ +------------+ +-----------+ - octet short long size octets octet + 0 1 3 7 size+7 size+8 + +------+---------+-------------+ +------------+ +-----------+ + | type | channel | size | | payload | | frame-end | + +------+---------+-------------+ +------------+ +-----------+ + octet short long size octets octet To read a frame, we: - 1. Read the header and check the frame type and channel. - 2. Depending on the frame type, we read the payload and process it. - 3. Read the frame end octet. + 1. Read the header and check the frame type and channel. + 2. Depending on the frame type, we read the payload and process it. + 3. Read the frame end octet. In realistic implementations where performance is a concern, we would use “read-ahead buffering” or @@ -131,20 +131,6 @@ func readDecimal(r io.Reader) (v Decimal, err error) { return } -func readFloat32(r io.Reader) (v float32, err error) { - if err = binary.Read(r, binary.BigEndian, &v); err != nil { - return - } - return -} - -func readFloat64(r io.Reader) (v float64, err error) { - if err = binary.Read(r, binary.BigEndian, &v); err != nil { - return - } - return -} - func readTimestamp(r io.Reader) (v time.Time, err error) { var sec int64 if err = binary.Read(r, binary.BigEndian, &sec); err != nil { @@ -182,7 +168,7 @@ func readField(r io.Reader) (v interface{}, err error) { if err = binary.Read(r, binary.BigEndian, &value); err != nil { return } - return (value != 0), nil + return value != 0, nil case 'B': var value [1]byte @@ -268,12 +254,12 @@ func readField(r io.Reader) (v interface{}, err error) { } /* - Field tables are long strings that contain packed name-value pairs. The - name-value pairs are encoded as short string defining the name, and octet - defining the values type and then the value itself. The valid field types for - tables are an extension of the native integer, bit, string, and timestamp - types, and are shown in the grammar. Multi-octet integer fields are always - held in network byte order. +Field tables are long strings that contain packed name-value pairs. The +name-value pairs are encoded as short string defining the name, and octet +defining the values type and then the value itself. The valid field types for +tables are an extension of the native integer, bit, string, and timestamp +types, and are shown in the grammar. Multi-octet integer fields are always +held in network byte order. */ func readTable(r io.Reader) (table Table, err error) { var nested bytes.Buffer diff --git a/vendor/github.com/rabbitmq/amqp091-go/spec091.go b/vendor/github.com/rabbitmq/amqp091-go/spec091.go index 0261c15912..d86e753a95 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/spec091.go +++ b/vendor/github.com/rabbitmq/amqp091-go/spec091.go @@ -552,6 +552,66 @@ func (msg *connectionUnblocked) read(r io.Reader) (err error) { return } +type connectionUpdateSecret struct { + NewSecret string + Reason string +} + +func (msg *connectionUpdateSecret) id() (uint16, uint16) { + return 10, 70 +} + +func (msg *connectionUpdateSecret) wait() bool { + return true +} + +func (msg *connectionUpdateSecret) write(w io.Writer) (err error) { + + if err = writeLongstr(w, msg.NewSecret); err != nil { + return + } + + if err = writeShortstr(w, msg.Reason); err != nil { + return + } + + return +} + +func (msg *connectionUpdateSecret) read(r io.Reader) (err error) { + + if msg.NewSecret, err = readLongstr(r); err != nil { + return + } + + if msg.Reason, err = readShortstr(r); err != nil { + return + } + + return +} + +type connectionUpdateSecretOk struct { +} + +func (msg *connectionUpdateSecretOk) id() (uint16, uint16) { + return 10, 71 +} + +func (msg *connectionUpdateSecretOk) wait() bool { + return true +} + +func (msg *connectionUpdateSecretOk) write(w io.Writer) (err error) { + + return +} + +func (msg *connectionUpdateSecretOk) read(r io.Reader) (err error) { + + return +} + type channelOpen struct { reserved1 string } @@ -2852,6 +2912,22 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err } mf.Method = method + case 70: // connection update-secret + //fmt.Println("NextMethod: class:10 method:70") + method := &connectionUpdateSecret{} + if err = method.read(r.r); err != nil { + return + } + mf.Method = method + + case 71: // connection update-secret-ok + //fmt.Println("NextMethod: class:10 method:71") + method := &connectionUpdateSecretOk{} + if err = method.read(r.r); err != nil { + return + } + mf.Method = method + default: return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) } diff --git a/vendor/github.com/rabbitmq/amqp091-go/types.go b/vendor/github.com/rabbitmq/amqp091-go/types.go index 3319990947..e8d8986a69 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/types.go +++ b/vendor/github.com/rabbitmq/amqp091-go/types.go @@ -8,10 +8,11 @@ package amqp091 import ( "fmt" "io" - "sync" "time" ) +const DefaultExchange = "" + // Constants for standard AMQP 0-9-1 exchange types. const ( ExchangeDirect = "direct" @@ -62,6 +63,11 @@ var ( ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"} ) +// internal errors used inside the library +var ( + errInvalidTypeAssertion = &Error{Code: InternalError, Reason: "type assertion unsuccessful", Server: false, Recover: true} +) + // Error captures the code and reason a channel or connection has been closed // by the server. type Error struct { @@ -184,9 +190,10 @@ type Blocking struct { // allows users to directly correlate a publishing to a confirmation. These are // returned from PublishWithDeferredConfirm on Channels. type DeferredConfirmation struct { - wg sync.WaitGroup - DeliveryTag uint64 - confirmation Confirmation + DeliveryTag uint64 + + done chan struct{} + ack bool } // Confirmation notifies the acknowledgment or negative acknowledgement of a @@ -204,24 +211,87 @@ type Decimal struct { Value int32 } +// Most common queue argument keys in queue declaration. For a comprehensive list +// of queue arguments, visit [RabbitMQ Queue docs]. +// +// QueueTypeArg queue argument is used to declare quorum and stream queues. +// Accepted values are QueueTypeClassic (default), QueueTypeQuorum and +// QueueTypeStream. [Quorum Queues] accept (almost) all queue arguments as their +// Classic Queues counterparts. Check [feature comparison] docs for more +// information. +// +// Queues can define their [max length] using QueueMaxLenArg and +// QueueMaxLenBytesArg queue arguments. Overflow behaviour is set using +// QueueOverflowArg. Accepted values are QueueOverflowDropHead (default), +// QueueOverflowRejectPublish and QueueOverflowRejectPublishDLX. +// +// [Queue TTL] can be defined using QueueTTLArg. That is, the time-to-live for an +// unused queue. [Queue Message TTL] can be defined using QueueMessageTTLArg. +// This will set a time-to-live for **messages** in the queue. +// +// [Stream retention] can be configured using StreamMaxLenBytesArg, to set the +// maximum size of the stream. Please note that stream queues always keep, at +// least, one segment. [Stream retention] can also be set using StreamMaxAgeArg, +// to set time-based retention. Values are string with unit suffix. Valid +// suffixes are Y, M, D, h, m, s. E.g. "7D" for one week. The maximum segment +// size can be set using StreamMaxSegmentSizeBytesArg. The default value is +// 500_000_000 bytes ~= 500 megabytes +// +// [RabbitMQ Queue docs]: https://rabbitmq.com/queues.html +// [Stream retention]: https://rabbitmq.com/streams.html#retention +// [max length]: https://rabbitmq.com/maxlength.html +// [Queue TTL]: https://rabbitmq.com/ttl.html#queue-ttl +// [Queue Message TTL]: https://rabbitmq.com/ttl.html#per-queue-message-ttl +// [Quorum Queues]: https://rabbitmq.com/quorum-queues.html +// [feature comparison]: https://rabbitmq.com/quorum-queues.html#feature-comparison +const ( + QueueTypeArg = "x-queue-type" + QueueMaxLenArg = "x-max-length" + QueueMaxLenBytesArg = "x-max-length-bytes" + StreamMaxLenBytesArg = "x-max-length-bytes" + QueueOverflowArg = "x-overflow" + QueueMessageTTLArg = "x-message-ttl" + QueueTTLArg = "x-expires" + StreamMaxAgeArg = "x-max-age" + StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes" +) + +// Values for queue arguments. Use as values for queue arguments during queue declaration. +// The following argument table will create a classic queue, with max length set to 100 messages, +// and a queue TTL of 30 minutes. +// +// args := amqp.Table{ +// amqp.QueueTypeArg: QueueTypeClassic, +// amqp.QueueMaxLenArg: 100, +// amqp.QueueTTLArg: 1800000, +// } +const ( + QueueTypeClassic = "classic" + QueueTypeQuorum = "quorum" + QueueTypeStream = "stream" + QueueOverflowDropHead = "drop-head" + QueueOverflowRejectPublish = "reject-publish" + QueueOverflowRejectPublishDLX = "reject-publish-dlx" +) + // Table stores user supplied fields of the following types: // -// bool -// byte -// int8 -// float32 -// float64 -// int -// int16 -// int32 -// int64 -// nil -// string -// time.Time -// amqp.Decimal -// amqp.Table -// []byte -// []interface{} - containing above types +// bool +// byte +// int8 +// float32 +// float64 +// int +// int16 +// int32 +// int64 +// nil +// string +// time.Time +// amqp.Decimal +// amqp.Table +// []byte +// []interface{} - containing above types // // Functions taking a table will immediately fail when the table contains a // value of an unsupported type. @@ -232,7 +302,6 @@ type Decimal struct { // Use a type assertion when reading values from a table for type conversion. // // RabbitMQ expects int32 for integer values. -// type Table map[string]interface{} func validateField(f interface{}) error { @@ -265,17 +334,12 @@ func (t Table) Validate() error { return validateField(t) } -// Heap interface for maintaining delivery tags -type tagSet []uint64 - -func (set tagSet) Len() int { return len(set) } -func (set tagSet) Less(i, j int) bool { return (set)[i] < (set)[j] } -func (set tagSet) Swap(i, j int) { (set)[i], (set)[j] = (set)[j], (set)[i] } -func (set *tagSet) Push(tag interface{}) { *set = append(*set, tag.(uint64)) } -func (set *tagSet) Pop() interface{} { - val := (*set)[len(*set)-1] - *set = (*set)[:len(*set)-1] - return val +// Sets the connection name property. This property can be used in +// amqp.Config to set a custom connection name during amqp.DialConfig(). This +// can be helpful to identify specific connections in RabbitMQ, for debugging or +// tracing purposes. +func (t Table) SetClientConnectionName(connName string) { + t["connection_name"] = connName } type message interface { @@ -299,11 +363,11 @@ The base interface implemented as: All frames consist of a header (7 octets), a payload of arbitrary size, and a 'frame-end' octet that detects malformed frames: - 0 1 3 7 size+7 size+8 - +------+---------+-------------+ +------------+ +-----------+ - | type | channel | size | | payload | | frame-end | - +------+---------+-------------+ +------------+ +-----------+ - octet short long size octets octet + 0 1 3 7 size+7 size+8 + +------+---------+-------------+ +------------+ +-----------+ + | type | channel | size | | payload | | frame-end | + +------+---------+-------------+ +------------+ +-----------+ + octet short long size octets octet To read a frame, we: @@ -314,13 +378,24 @@ To read a frame, we: In realistic implementations where performance is a concern, we would use “read-ahead buffering” or “gathering reads” to avoid doing three separate system calls to read a frame. - */ type frame interface { write(io.Writer) error channel() uint16 } +/* +Perform any updates on the channel immediately after the frame is decoded while the +connection mutex is held. +*/ +func updateChannel(f frame, channel *Channel) { + if mf, isMethodFrame := f.(*methodFrame); isMethodFrame { + if _, isChannelClose := mf.Method.(*channelClose); isChannelClose { + channel.setClosed() + } + } +} + type reader struct { r io.Reader } @@ -345,17 +420,17 @@ func (protocolHeader) channel() uint16 { Method frames carry the high-level protocol commands (which we call "methods"). One method frame carries one command. The method frame payload has this format: - 0 2 4 - +----------+-----------+-------------- - - - | class-id | method-id | arguments... - +----------+-----------+-------------- - - - short short ... + 0 2 4 + +----------+-----------+-------------- - - + | class-id | method-id | arguments... + +----------+-----------+-------------- - - + short short ... To process a method frame, we: 1. Read the method frame payload. 2. Unpack it into a structure. A given method always has the same structure, - so we can unpack the method rapidly. 3. Check that the method is allowed in - the current context. + so we can unpack the method rapidly. 3. Check that the method is allowed in + the current context. 4. Check that the method arguments are valid. 5. Execute the method. @@ -394,11 +469,11 @@ follows it with a content header and zero or more content body frames. A content header frame has this format: - 0 2 4 12 14 - +----------+--------+-----------+----------------+------------- - - - | class-id | weight | body size | property flags | property list... - +----------+--------+-----------+----------------+------------- - - - short short long long short remainder... + 0 2 4 12 14 + +----------+--------+-----------+----------------+------------- - - + | class-id | weight | body size | property flags | property list... + +----------+--------+-----------+----------------+------------- - - + short short long long short remainder... We place content body in distinct frames (rather than including it in the method) so that AMQP may support "zero copy" techniques in which content is @@ -426,10 +501,10 @@ into several (or many) chunks, each forming a "content body frame". Looking at the frames for a specific channel, as they pass on the wire, we might see something like this: - [method] - [method] [header] [body] [body] - [method] - ... + [method] + [method] [header] [body] [body] + [method] + ... */ type bodyFrame struct { ChannelId uint16 diff --git a/vendor/github.com/rabbitmq/amqp091-go/uri.go b/vendor/github.com/rabbitmq/amqp091-go/uri.go index f50abe0ae7..87ef09e6fe 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/uri.go +++ b/vendor/github.com/rabbitmq/amqp091-go/uri.go @@ -32,12 +32,16 @@ var defaultURI = URI{ // URI represents a parsed AMQP URI string. type URI struct { - Scheme string - Host string - Port int - Username string - Password string - Vhost string + Scheme string + Host string + Port int + Username string + Password string + Vhost string + CertFile string // client TLS auth - path to certificate (PEM) + CACertFile string // client TLS auth - path to CA certificate (PEM) + KeyFile string // client TLS auth - path to private key (PEM) + ServerName string // client TLS auth - server name } // ParseURI attempts to parse the given AMQP URI according to the spec. @@ -45,17 +49,28 @@ type URI struct { // // Default values for the fields are: // -// Scheme: amqp -// Host: localhost -// Port: 5672 -// Username: guest -// Password: guest -// Vhost: / +// Scheme: amqp +// Host: localhost +// Port: 5672 +// Username: guest +// Password: guest +// Vhost: / // +// Supports TLS query parameters. See https://www.rabbitmq.com/uri-query-parameters.html +// +// certfile: +// keyfile: +// cacertfile: +// server_name_indication: +// +// If cacertfile is not provided, system CA certificates will be used. +// Mutual TLS (client auth) will be enabled only in case keyfile AND certfile provided. +// +// If Config.TLSClientConfig is set, TLS parameters from URI will be ignored. func ParseURI(uri string) (URI, error) { builder := defaultURI - if strings.Contains(uri, " ") == true { + if strings.Contains(uri, " ") { return builder, errURIWhitespace } @@ -113,6 +128,13 @@ func ParseURI(uri string) (URI, error) { } } + // see https://www.rabbitmq.com/uri-query-parameters.html + params := u.Query() + builder.CertFile = params.Get("certfile") + builder.KeyFile = params.Get("keyfile") + builder.CACertFile = params.Get("cacertfile") + builder.ServerName = params.Get("server_name_indication") + return builder, nil } @@ -150,8 +172,6 @@ func (uri URI) String() string { } } - authority.Host = net.JoinHostPort(uri.Host, strconv.Itoa(uri.Port)) - if defaultPort, found := schemePorts[uri.Scheme]; !found || defaultPort != uri.Port { authority.Host = net.JoinHostPort(uri.Host, strconv.Itoa(uri.Port)) } else { diff --git a/vendor/github.com/rabbitmq/amqp091-go/write.go b/vendor/github.com/rabbitmq/amqp091-go/write.go index e7307d2222..d0011f86c4 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/write.go +++ b/vendor/github.com/rabbitmq/amqp091-go/write.go @@ -15,6 +15,11 @@ import ( "time" ) +func (w *writer) WriteFrameNoFlush(frame frame) (err error) { + err = frame.write(w.w) + return +} + func (w *writer) WriteFrame(frame frame) (err error) { if err = frame.write(w.w); err != nil { return @@ -63,8 +68,8 @@ func (f *heartbeatFrame) write(w io.Writer) (err error) { // +----------+--------+-----------+----------------+------------- - - // | class-id | weight | body size | property flags | property list... // +----------+--------+-----------+----------------+------------- - - -// short short long long short remainder... // +// short short long long short remainder... func (f *headerFrame) write(w io.Writer) (err error) { var payload bytes.Buffer var zeroTime time.Time @@ -418,5 +423,5 @@ func writeTable(w io.Writer, table Table) (err error) { } } - return writeLongstr(w, string(buf.Bytes())) + return writeLongstr(w, buf.String()) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 9696a822fe..f2a75772b7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -264,8 +264,8 @@ github.com/prometheus/procfs/internal/util ## explicit; go 1.13 github.com/prometheus/statsd_exporter/pkg/mapper github.com/prometheus/statsd_exporter/pkg/mapper/fsm -# github.com/rabbitmq/amqp091-go v1.3.0 -## explicit; go 1.15 +# github.com/rabbitmq/amqp091-go v1.8.1 +## explicit; go 1.16 github.com/rabbitmq/amqp091-go # github.com/rabbitmq/cluster-operator v1.13.1 ## explicit; go 1.17