-
Notifications
You must be signed in to change notification settings - Fork 0
/
publish_flow_block.go
44 lines (39 loc) · 1.52 KB
/
publish_flow_block.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package rabbitmq
import (
"context"
amqp "github.com/rabbitmq/amqp091-go"
)
// startNotifyFlowHandler registers a flow listener through the channel
// manager and keeps publisher.disablePublishDueToFlow in sync with the
// notifications received on it.
//
// The flag is reset to false before listening starts. The loop then runs
// until the notification channel is closed, so this method blocks and is
// meant to be run in its own goroutine. ctx is only passed to the logger;
// it does not stop the loop.
//
// Per the amqp091-go documentation for Channel.NotifyFlow, a value of false
// means publishers should pause and true means they may resume (the value is
// the "active" bit of the server's channel.flow method).
// NOTE(review): this assumes NotifyFlowSafe forwards the values of
// amqp091-go's NotifyFlow unchanged — confirm against the channel manager.
func (publisher *Publisher) startNotifyFlowHandler(ctx context.Context) {
	notifyFlowChan := publisher.chanManager.NotifyFlowSafe(make(chan bool))

	publisher.disablePublishDueToFlowMux.Lock()
	publisher.disablePublishDueToFlow = false
	publisher.disablePublishDueToFlowMux.Unlock()

	for active := range notifyFlowChan {
		// Publishing is disabled exactly while the flow is NOT active.
		publisher.disablePublishDueToFlowMux.Lock()
		publisher.disablePublishDueToFlow = !active
		publisher.disablePublishDueToFlowMux.Unlock()

		// Log after releasing the mutex so a slow logger cannot hold up
		// code that needs the lock to read the flag.
		if active {
			publisher.options.Logger.Warningf(ctx, "resuming publishing due to flow request from server")
		} else {
			publisher.options.Logger.Warningf(ctx, "pausing publishing due to flow request from server")
		}
	}
}
// startNotifyBlockedHandler registers a blocking listener through the
// connection manager and mirrors each received notification into
// publisher.disablePublishDueToBlocked.
//
// The flag is cleared before listening starts. Every notification with
// Active set disables publishing; every other notification re-enables it.
// The method returns once the notification channel is closed, so it blocks
// for as long as the channel stays open. ctx is only handed to the logger.
func (publisher *Publisher) startNotifyBlockedHandler(ctx context.Context) {
	blockedEvents := publisher.connManager.NotifyBlockedSafe(make(chan amqp.Blocking))

	// Start from the "not blocked" state.
	publisher.disablePublishDueToBlockedMux.Lock()
	publisher.disablePublishDueToBlocked = false
	publisher.disablePublishDueToBlockedMux.Unlock()

	for {
		event, open := <-blockedEvents
		if !open {
			// Listener channel was closed: nothing more to track.
			return
		}

		publisher.disablePublishDueToBlockedMux.Lock()
		if !event.Active {
			publisher.disablePublishDueToBlocked = false
			publisher.options.Logger.Warningf(ctx, "resuming publishing due to TCP blocking from server")
		} else {
			publisher.options.Logger.Warningf(ctx, "pausing publishing due to TCP blocking from server")
			publisher.disablePublishDueToBlocked = true
		}
		publisher.disablePublishDueToBlockedMux.Unlock()
	}
}