-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathAmqpConsumer.cs
259 lines (223 loc) · 9.66 KB
/
AmqpConsumer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
using Amqp.Types;
namespace RabbitMQ.AMQP.Client.Impl
{
public class AmqpConsumer : AbstractReconnectLifeCycle, IConsumer
{
private enum PauseStatus
{
UNPAUSED,
PAUSING,
PAUSED,
}
private readonly AmqpConnection _connection;
private readonly string _address;
private readonly MessageHandler _messageHandler;
private readonly int _initialCredits;
private readonly Map _filters;
private readonly Guid _id = Guid.NewGuid();
private ReceiverLink? _receiverLink;
private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
public AmqpConsumer(AmqpConnection connection, string address,
MessageHandler messageHandler, int initialCredits, Map filters)
{
_connection = connection;
_address = address;
_messageHandler = messageHandler;
_initialCredits = initialCredits;
_filters = filters;
if (false == _connection.Consumers.TryAdd(_id, this))
{
// TODO error?
}
}
public override async Task OpenAsync()
{
try
{
TaskCompletionSource<ReceiverLink> attachCompletedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, _id, _filters);
void onAttached(ILink argLink, Attach argAttach)
{
if (argLink is ReceiverLink link)
{
attachCompletedTcs.SetResult(link);
}
else
{
// TODO create "internal bug" exception type?
var ex = new InvalidOperationException(
"invalid link in onAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
attachCompletedTcs.SetException(ex);
}
}
ReceiverLink? tmpReceiverLink = null;
Task receiverLinkTask = Task.Run(async () =>
{
Session session = await _connection._nativePubSubSessions.GetOrCreateSessionAsync()
.ConfigureAwait(false);
tmpReceiverLink = new ReceiverLink(session, _id.ToString(), attach, onAttached);
});
// TODO configurable timeout
TimeSpan waitSpan = TimeSpan.FromSeconds(5);
_receiverLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
.ConfigureAwait(false);
await receiverLinkTask.WaitAsync(waitSpan)
.ConfigureAwait(false);
System.Diagnostics.Debug.Assert(tmpReceiverLink != null);
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(_receiverLink, tmpReceiverLink));
if (_receiverLink is null)
{
throw new ConsumerException($"{ToString()} Failed to create receiver link (null was returned)");
}
else if (_receiverLink.LinkState != LinkState.Attached)
{
throw new ConsumerException(
$"{ToString()} Failed to create receiver link. Link state is not attached, error: " +
_receiverLink.Error?.ToString() ?? "Unknown error");
}
else
{
_receiverLink.SetCredit(_initialCredits);
// TODO save / cancel task
_ = Task.Run(ProcessMessages);
await base.OpenAsync()
.ConfigureAwait(false);
}
}
catch (Exception e)
{
throw new ConsumerException($"{ToString()} Failed to create receiver link, {e}");
}
}
private async Task ProcessMessages()
{
try
{
if (_receiverLink is null)
{
// TODO is this a serious error?
return;
}
while (_receiverLink is { LinkState: LinkState.Attached })
{
// TODO the timeout waiting for messages should be configurable
TimeSpan timeout = TimeSpan.FromSeconds(60);
Message? nativeMessage = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false);
if (nativeMessage is null)
{
// this is not a problem, it is just a timeout.
// the timeout is set to 60 seconds.
// For the moment I'd trace it at some point we can remove it
Trace.WriteLine(TraceLevel.Verbose,
$"{ToString()}: Timeout {timeout.Seconds} s.. waiting for message.");
continue;
}
_unsettledMessageCounter.Increment();
IContext context = new DeliveryContext(_receiverLink, nativeMessage, _unsettledMessageCounter);
var amqpMessage = new AmqpMessage(nativeMessage);
// TODO catch exceptions thrown by handlers,
// then call exception handler?
await _messageHandler(context, amqpMessage).ConfigureAwait(false);
}
}
catch (Exception e)
{
if (State == State.Closing)
{
return;
}
Trace.WriteLine(TraceLevel.Error, $"{ToString()} Failed to process messages, {e}");
// TODO this is where a Listener should get a closed event
// See the ConsumerShouldBeClosedWhenQueueIsDeleted test
}
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed.");
}
public void Pause()
{
if (_receiverLink is null)
{
// TODO create "internal bug" exception type?
throw new InvalidOperationException("_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
if ((int)PauseStatus.UNPAUSED == Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
(int)PauseStatus.PAUSING, (int)PauseStatus.UNPAUSED))
{
_receiverLink.SetCredit(credit: 0);
if ((int)PauseStatus.PAUSING != Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
(int)PauseStatus.PAUSED, (int)PauseStatus.PAUSING))
{
_pauseStatus = PauseStatus.UNPAUSED;
// TODO create "internal bug" exception type?
throw new InvalidOperationException("error transitioning from PAUSING -> PAUSED, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
}
else
{
// TODO: log a warning that user tried to pause an already-paused consumer?
}
}
public long UnsettledMessageCount
{
get
{
return _unsettledMessageCounter.Get();
}
}
public void Unpause()
{
if (_receiverLink is null)
{
// TODO create "internal bug" exception type?
throw new InvalidOperationException("_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
if ((int)PauseStatus.PAUSED == Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
(int)PauseStatus.UNPAUSED, (int)PauseStatus.PAUSED))
{
_receiverLink.SetCredit(credit: _initialCredits);
}
else
{
// TODO: log a warning that user tried to unpause a not-paused consumer?
}
}
// TODO cancellation token
public override async Task CloseAsync()
{
if (_receiverLink is null)
{
return;
}
OnNewStatus(State.Closing, null);
try
{
// TODO global timeout for closing, other async actions?
await _receiverLink.CloseAsync(TimeSpan.FromSeconds(5))
.ConfigureAwait(false);
}
catch (Exception ex)
{
Trace.WriteLine(TraceLevel.Warning, "Failed to close receiver link. The consumer will be closed anyway", ex);
}
_receiverLink = null;
OnNewStatus(State.Closed, null);
_connection.Consumers.TryRemove(_id, out _);
}
public override string ToString()
{
return $"Consumer{{Address='{_address}', " +
$"id={_id}, " +
$"Connection='{_connection}', " +
$"State='{State}'}}";
}
}
}