-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrpc_client.go
700 lines (583 loc) · 20.9 KB
/
rpc_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
package rabbitmq
import (
"context"
"errors"
"fmt"
"github.com/google/uuid"
"github.com/jxo-me/rabbitmq-go/internal/channelmanager"
"github.com/jxo-me/rabbitmq-go/internal/connectionmanager"
amqp "github.com/rabbitmq/amqp091-go"
"sync"
"sync/atomic"
"time"
)
// chanSendWaitTime bounds how long the client blocks while handing a
// response, confirmation or error to the goroutine waiting for it. If that
// goroutine has stopped listening, the value is dropped after this long
// instead of blocking the sender forever.
const chanSendWaitTime = 10 * time.Second
// OnStartedFunc is a hook registered with RpcClient.OnStarted. It receives the
// input and output connections and channels of every connection the client
// establishes.
type OnStartedFunc func(inConn *amqp.Connection, outConn *amqp.Connection, inCh *amqp.Channel, outCh *amqp.Channel)
// RpcClient is an AMQP request/response client. Requests are published on an
// output channel; when a request wants a reply, the answer is read from a
// reply queue consumed on a separate input channel and matched back to the
// request by correlation ID. The client connects lazily: the first send starts
// a background loop (runForever) that reconnects after errors until Stop is
// called. Create it with NewRpcClient.
type RpcClient struct {
// conn is the connection wrapper passed to NewRpcClient.
conn *Conn
// chanManager is created from conn's connection manager in NewRpcClient and
// closed by Close.
chanManager *channelmanager.ChannelManager
// connManager is used by runOnce to open the input and output connections.
connManager *connectionmanager.ConnectionManager
// disablePublishDueToFlow makes PublishWithContext refuse to publish while
// true. NOTE(review): nothing in this file ever sets it — confirm it is wired
// to the server's flow notifications elsewhere.
disablePublishDueToFlow bool
// disablePublishDueToFlowMux guards disablePublishDueToFlow.
disablePublishDueToFlowMux *sync.RWMutex
// disablePublishDueToBlocked makes PublishWithContext refuse to publish while
// true. NOTE(review): like the flow flag, it is never set in this file.
disablePublishDueToBlocked bool
// disablePublishDueToBlockedMux guards disablePublishDueToBlocked.
disablePublishDueToBlockedMux *sync.RWMutex
// options are the client options resolved in NewRpcClient.
options ClientOptions
// log is options.Logger.
log Logger
// timeout is the time we should wait after a request is published before
// we assume the request got lost.
timeout time.Duration
// maxRetries is the amount of times a request will be retried before
// giving up.
maxRetries int
// requests is a single channel used whenever we want to publish a message.
// The channel is consumed in a separate goroutine (runPublisher), which
// allows us to add messages that we don't want replies for without waiting
// for ongoing requests.
requests chan *Request
// replyToQueueName is the name of the queue replies are consumed from; it is
// set as ReplyTo on every request that wants a reply. Using a pre-defined
// name avoids having the message bus generate queue names.
replyToQueueName string
// middlewares holds the middlewares that wrap every request the client
// sends.
middlewares []ClientMiddlewareFunc
// stopChan is used to signal shutdowns when calling Stop(). The channel will
// be closed when Stop() is called; runForever creates a new one per run.
stopChan chan struct{}
// didStopChan will close when the client has finished shutdown.
didStopChan chan struct{}
// isRunning is one while the client's background loop is running.
isRunning int32
// wantStop tells the runForever loop to exit even on connection errors.
wantStop int32
// requestsMap keeps track of requests waiting for confirmations, replies or
// returns. It maps each correlation ID and delivery tag of a message to the
// actual Request, so that no matter the order of a request and response, the
// response is always delivered to the correct caller.
requestsMap RequestMap
// Sender is the main send function called after all middlewares have been
// chained and called.
// This field can be overridden to simplify testing.
Sender SendFunc
// onStarted hooks are all executed, in order, after the client has connected.
onStarted []OnStartedFunc
}
// OnStarted registers f to be called each time the client has connected. f
// receives the connections and channels the client is about to use, which is
// useful when you want more direct control over amqp.
//
// The client is lazy and won't connect until the first Send, so f is not
// called before then. Hooks run sequentially and block the client's startup
// until they have returned.
//
//	client.OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) {
//		// Do something with amqp connections/channels.
//	})
func (c *RpcClient) OnStarted(f OnStartedFunc) {
	hooks := append(c.onStarted, f)
	c.onStarted = hooks
}
// respondToRequest delivers response to the caller waiting on
// request.response. The response is nil for requests that want no reply.
// If nobody receives it within chanSendWaitTime (for instance because the
// caller already gave up), the response is dropped and an error is logged
// instead of blocking the calling goroutine forever.
func (c *RpcClient) respondToRequest(ctx context.Context, request *Request, response *amqp.Delivery) {
	// An explicit timer stopped on the fast path is released immediately,
	// instead of a time.After timer staying pending for chanSendWaitTime after
	// every successfully delivered response.
	timer := time.NewTimer(chanSendWaitTime)
	defer timer.Stop()

	select {
	case request.response <- response:
	case <-timer.C:
		c.log.Errorf(ctx,
			"client: nobody is waiting for response on: %s, response: %s",
			stringifyRequestForLog(request),
			stringifyDeliveryForLog(response),
		)
	}
}
// respondErrorToRequest hands err to the caller waiting on request.errChan.
// If nobody receives it within chanSendWaitTime, the error is dropped and
// logged rather than blocking the calling goroutine forever.
func (c *RpcClient) respondErrorToRequest(ctx context.Context, request *Request, err error) {
	deadline := time.After(chanSendWaitTime)
	select {
	case <-deadline:
		c.log.Errorf(ctx,
			"nobody is waiting for error on: %s, error: %s",
			stringifyRequestForLog(request),
			err.Error(),
		)
	case request.errChan <- err:
	}
}
// confirmRequest signals on request.confirmed that the request has been
// confirmed by the amqp server, or simply published when confirm mode is off.
// The signal is dropped, with an error log, if nobody receives it within
// chanSendWaitTime.
func (c *RpcClient) confirmRequest(ctx context.Context, request *Request) {
	giveUp := time.After(chanSendWaitTime)
	select {
	case <-giveUp:
		c.log.Errorf(ctx, "nobody is waiting for confirmation on: %s", stringifyRequestForLog(request))
	case request.confirmed <- struct{}{}:
	}
}
// retryRequest puts request back on the publish queue after a failed publish.
//
// Once the request has already been retried c.maxRetries times it gives up
// instead: the request is marked as confirmed, because no confirmation will
// ever arrive for it, and err is delivered to the caller.
func (c *RpcClient) retryRequest(ctx context.Context, request *Request, err error) {
	if request.numRetries < c.maxRetries {
		request.numRetries++
		// Re-queue from a separate goroutine: the publisher that calls this
		// function is the one that reads c.requests, so a direct send would
		// block until the request times out.
		go func() {
			c.log.Debugf(ctx, "client: queuing request for retry: reason: %s, %s", err.Error(), stringifyRequestForLog(request))
			select {
			case <-request.AfterTimeout():
				c.log.Errorf(ctx,
					"client: request timed out while waiting for retry reason: %s, %s",
					err.Error(),
					stringifyRequestForLog(request),
				)
			case c.requests <- request:
			}
		}()
		return
	}

	c.log.Errorf(ctx,
		"client: could not publish, giving up: reason: %s, %s",
		err.Error(),
		stringifyRequestForLog(request),
	)
	// Stop the caller waiting for a confirmation that can never come, then
	// report the publish error to it.
	c.confirmRequest(ctx, request)
	c.respondErrorToRequest(ctx, request, err)
}
// runPublisher reads requests from c.requests and publishes each of them on
// outCh. It returns when outCh is closed for any reason. It also returns right
// after a failed publish; in that case it closes outCh itself and hands the
// request to retryRequest. runOnce starts a fresh publisher for every channel.
func (c *RpcClient) runPublisher(ctx context.Context, outCh *amqp.Channel) {
	c.log.Debugf(ctx, "client: running publisher...")

	// Wait for the close notification in its own goroutine: the loop below
	// may close outCh itself and must not deadlock on its own notification.
	chanClosed := make(chan struct{})
	go func() {
		<-outCh.NotifyClose(make(chan *amqp.Error))
		close(chanClosed)
	}()

	// Delivery tags always start at 1; the counter is incremented before every
	// publish, so it starts at 0.
	var lastTag uint64

	for {
		select {
		case <-chanClosed:
			// runOnce will start a new publisher once the client reconnects.
			c.log.Debugf(ctx, "client: publisher stopped after the channel was closed")
			return

		case req := <-c.requests:
			// Only requests that expect a reply advertise the reply queue.
			replyTo := ""
			if req.Reply {
				replyTo = c.replyToQueueName
			}
			req.Publishing.ReplyTo = replyTo

			c.log.Debugf(ctx, "client: publishing %s", req.Publishing.CorrelationId)

			lastTag++
			req.deliveryTag = lastTag
			// Register before publishing so that replies, returns and
			// confirmations arriving right after the publish find the request.
			c.requestsMap.Set(req)

			pubErr := outCh.PublishWithContext(
				context.Background(),
				req.Exchange,
				req.RoutingKey,
				true,  // mandatory
				false, // immediate
				req.Publishing,
			)
			if pubErr != nil {
				_ = outCh.Close()
				c.retryRequest(ctx, req, pubErr)
				c.log.Errorf(ctx,
					"client: publisher stopped because of error: %s, request: %s",
					pubErr.Error(),
					stringifyRequestForLog(req),
				)
				return
			}

			if c.options.ConfirmMode {
				// runConfirmsConsumer confirms the request when the server acks it.
				continue
			}
			// Without confirm mode, a successful publish is the confirmation.
			c.confirmRequest(ctx, req)
			if !req.Reply {
				// No reply will ever arrive, so complete the request with nil.
				c.respondToRequest(ctx, req, nil)
			}
		}
	}
}
// runConfirmsConsumer handles the server's confirmations and returns for the
// output channel; runOnce only starts it when confirm mode is enabled. It
// exits as soon as either notification channel is closed.
//
// A return only marks its request (request.returned). The caller is answered
// when the matching confirmation arrives, which first marks the request as
// confirmed and then:
//   - on a nack, sends ErrRequestRejected to the caller;
//   - on an ack of a returned request, sends an error wrapping
//     ErrRequestReturned with the return's reply code and text;
//   - on an ack of a request that wants no reply, sends a nil response.
//
// Requests that want a reply get their response from the replies consumer.
//
// This relies on a request's return being handled before its confirmation.
// NOTE(review): select picks randomly among ready cases, so the ordering
// presumably depends on amqp091-go blocking on the (unbuffered) returns
// channel before it emits the confirmation — confirm against the library.
func (c *RpcClient) runConfirmsConsumer(ctx context.Context, confirms chan amqp.Confirmation, returns chan amqp.Return) {
for {
select {
case ret, ok := <-returns:
if !ok {
return
}
// Returns carry the correlation ID, so look the request up by it.
request, ok := c.requestsMap.GetByCorrelationID(ret.CorrelationId)
if !ok {
// This could happen if we stop waiting for requests to return due
// to a timeout.
// But since returns are normally rapid, that
// would mean that something isn't quite right on the amqp server.
c.log.Errorf(ctx, "client: got return for unknown request: %s", stringifyReturnForLog(ret))
continue
}
c.log.Debugf(ctx, "client: publishing is returned by server: %s", ret.CorrelationId)
// Remember the return; the caller is answered when the confirmation comes.
request.returned = &ret
case confirm, ok := <-confirms:
if !ok {
return
}
// Confirmations only carry the delivery tag assigned by runPublisher.
request, ok := c.requestsMap.GetByDeliveryTag(confirm.DeliveryTag)
if !ok {
// This could happen if we stop waiting for requests to return due
// to a timeout. But since confirmations are normally rapid, that
// would mean that something isn't quite right on the amqp server.
// Unfortunately, there isn't any way of getting more information
// than the delivery tag from a confirmation.
c.log.Errorf(ctx, "client: got confirmation of unknown request: %d", confirm.DeliveryTag)
continue
}
c.log.Debugf(ctx, "client: confirming request %s", request.Publishing.CorrelationId)
// Always unblock send's wait for confirmation, whatever the outcome.
c.confirmRequest(ctx, request)
if !confirm.Ack {
c.respondErrorToRequest(ctx, request, ErrRequestRejected)
// Doesn't matter if the request wants the nil reply below because
// we gave it an error instead.
continue
}
// Check if the request was also returned.
if request.returned != nil {
c.respondErrorToRequest(
ctx,
request,
fmt.Errorf("%w: %d, %s",
ErrRequestReturned,
request.returned.ReplyCode,
request.returned.ReplyText,
),
)
continue
}
if !request.Reply {
// The request isn't expecting a reply, so we need give a nil
// response instead to signal that we're done.
c.log.Debugf(ctx,
"client: sending nil response after confirmation due to no reply wanted %s",
request.Publishing.CorrelationId,
)
c.respondToRequest(ctx, request, nil)
}
}
}
}
// runRepliesConsumer declares the reply queue (c.replyToQueueName) on inChan,
// starts consuming from it and forwards every reply to the request with the
// same correlation ID. Forwarding runs in a background goroutine that ends
// when the delivery channel closes, i.e. when inChan goes away. The returned
// error is the one from declaring the queue or starting the consumer.
func (c *RpcClient) runRepliesConsumer(ctx context.Context, inChan *amqp.Channel) error {
	qOpts := c.options.QueueOptions
	queue, err := inChan.QueueDeclare(
		c.replyToQueueName,
		qOpts.Durable,
		qOpts.AutoDelete,
		qOpts.Exclusive,
		qOpts.NoWait,
		tableToAMQPTable(qOpts.Args),
	)
	if err != nil {
		return err
	}

	cOpts := c.options.ConsumeOptions
	deliveries, err := inChan.Consume(
		queue.Name,
		cOpts.Name,
		cOpts.AutoAck,
		cOpts.Exclusive,
		cOpts.NoLocal,
		cOpts.NoWait,
		tableToAMQPTable(cOpts.Args),
	)
	if err != nil {
		return err
	}

	go func() {
		c.log.Debugf(ctx, "client: running replies consumer...")
		for delivery := range deliveries {
			req, found := c.requestsMap.GetByCorrelationID(delivery.CorrelationId)
			if !found {
				c.log.Errorf(ctx,
					"client: could not find where to reply. CorrelationId: %s",
					stringifyDeliveryForLog(&delivery),
				)
				continue
			}

			c.log.Debugf(ctx, "client: forwarding reply %s", delivery.CorrelationId)
			// Hand out a pointer to a private copy, never to the loop variable.
			reply := delivery
			select {
			case <-time.After(chanSendWaitTime):
				c.log.Errorf(ctx, "client: could not send to reply response chan: %s", stringifyRequestForLog(req))
			case req.response <- &reply:
			}
		}
		c.log.Debugf(ctx, "client: replies consumer is done")
	}()

	return nil
}
// AddMiddleware registers m so it wraps every request sent through the
// client, after any middlewares added earlier. It returns c so calls can be
// chained.
func (c *RpcClient) AddMiddleware(m ClientMiddlewareFunc) *RpcClient {
	updated := append(c.middlewares, m)
	c.middlewares = updated
	return c
}
// runOnce performs a single connection cycle:
//  1. open the input and output connections and their channels;
//  2. run the OnStarted hooks;
//  3. start the replies consumer;
//  4. enable confirm mode and its consumer, if configured;
//  5. start the publisher.
//
// It then blocks until the client is stopped or one of the connections or
// channels closes, and returns whatever monitorAndWait reports (an amqp error
// when something wasn't closed gracefully). Everything opened here is closed
// on return.
func (c *RpcClient) runOnce(ctx context.Context) error {
	c.log.Debugf(ctx, "client: starting up...")

	inConn, outConn, err := createConnections(ctx, c.connManager)
	if err != nil {
		return err
	}
	defer func() { _ = inConn.Close() }()
	defer func() { _ = outConn.Close() }()

	inCh, outCh, err := createChannels(inConn, outConn)
	if err != nil {
		return err
	}
	defer func() { _ = inCh.Close() }()
	defer func() { _ = outCh.Close() }()

	// Hooks run one after another before anything else uses the connections
	// or channels, so they can touch them without racing the client.
	for _, hook := range c.onStarted {
		hook(inConn, outConn, inCh, outCh)
	}

	if err = c.runRepliesConsumer(ctx, inCh); err != nil {
		return err
	}

	if c.options.ConfirmMode {
		// Ask the server to confirm publishes on the output channel (waiting
		// for its answer), then consume the confirmations and returns.
		if err = outCh.Confirm(false); err != nil {
			return err
		}
		confirms := outCh.NotifyPublish(make(chan amqp.Confirmation))
		returns := outCh.NotifyReturn(make(chan amqp.Return))
		go c.runConfirmsConsumer(ctx, confirms, returns)
	}

	go c.runPublisher(ctx, outCh)

	return monitorAndWait(
		c.stopChan,
		inConn.NotifyClose(make(chan *amqp.Error)),
		outConn.NotifyClose(make(chan *amqp.Error)),
		inCh.NotifyClose(make(chan *amqp.Error)),
		outCh.NotifyClose(make(chan *amqp.Error)),
	)
}
// runForever makes sure the client's background loop is running. It returns
// immediately if the loop is already running.
//
// The loop calls runOnce repeatedly. It ends when runOnce returns nil or when
// Stop has been requested; otherwise it reconnects after reconnectDelay. When
// Stop is called during that delay, the loop ends at once instead of first
// making another connection attempt. On exit it closes didStopChan (which
// Stop waits on) and clears isRunning so the client can be started again.
func (c *RpcClient) runForever(ctx context.Context) {
	if !atomic.CompareAndSwapInt32(&c.isRunning, 0, 1) {
		// Already running.
		return
	}

	// reconnectDelay is how long to wait between failed connection cycles.
	const reconnectDelay = 500 * time.Millisecond

	// Always assume that we don't want to stop initially.
	atomic.StoreInt32(&c.wantStop, 0)
	// The loop keeps its own references to this run's channels rather than
	// re-reading the struct fields.
	stopChan := make(chan struct{})
	didStopChan := make(chan struct{})
	c.stopChan = stopChan
	c.didStopChan = didStopChan

	go func() {
	reconnect:
		for {
			c.log.Debugf(ctx, "client: connecting...")
			err := c.runOnce(ctx)
			if err == nil {
				c.log.Debugf(ctx, "client: finished gracefully")
				break
			}
			if atomic.LoadInt32(&c.wantStop) == 1 {
				c.log.Debugf(ctx, "client: finished with error %s", err.Error())
				break
			}
			c.log.Errorf(ctx, "client: got error: %s, will reconnect in %v second(s)", err, reconnectDelay.Seconds())
			select {
			case <-stopChan:
				c.log.Debugf(ctx, "client: stop requested while waiting to reconnect")
				break reconnect
			case <-time.After(reconnectDelay):
			}
		}
		// Tell Stop() that we have finished shutdown and that it can return.
		close(didStopChan)
		// Ensure we can start again.
		atomic.StoreInt32(&c.isRunning, 0)
	}()
}
// Send sends r by using an amqp.Publishing. The request passes through the
// client's middlewares, then r's own middlewares, and finally c.Sender. Send
// returns the reply delivery, or nil for requests that want no reply.
func (c *RpcClient) Send(r *Request) (*amqp.Delivery, error) {
	// Build the chain in a freshly allocated slice. A plain
	// append(c.middlewares, r.middlewares...) writes into any spare capacity
	// of the shared c.middlewares backing array (AddMiddleware's appends
	// usually leave some). Concurrent Send calls would then race and could
	// run each other's per-request middlewares.
	middlewares := make([]ClientMiddlewareFunc, 0, len(c.middlewares)+len(r.middlewares))
	middlewares = append(middlewares, c.middlewares...)
	middlewares = append(middlewares, r.middlewares...)
	return ClientMiddlewareChain(c.Sender, middlewares...)(r.Context, r)
}
// send is the default Sender. It makes sure the client is running, then:
//  1. queues r for the publisher;
//  2. waits for r to be confirmed;
//  3. waits for its response or error.
//
// Every wait is bounded by the request timeout (c.timeout) and also ends early
// when ctx is cancelled. In that case the returned error wraps ctx.Err().
// Timeouts return an error wrapping ErrRequestTimeout.
func (c *RpcClient) send(ctx context.Context, r *Request) (*amqp.Delivery, error) {
	// Ensure that the publisher is running.
	c.runForever(ctx)

	// r.response receives the reply. For requests with Reply set to false it
	// receives nil once the request is published (or confirmed, in confirm
	// mode).
	r.response = make(chan *amqp.Delivery)
	// r.confirmed is signalled when the server confirms the request in
	// confirm mode, or as soon as the request is published otherwise.
	r.confirmed = make(chan struct{})
	// r.errChan receives errors such as a rejected or returned request, or a
	// publish that kept failing after all retries.
	r.errChan = make(chan error)

	// Set the correlation id on the publishing if not yet set.
	if r.Publishing.CorrelationId == "" {
		r.Publishing.CorrelationId = uuid.New().String()
	}

	defer c.requestsMap.Delete(r)

	r.startTimeout(c.timeout)
	timeoutChan := r.AfterTimeout()

	// A nil ctx never cancels: receiving from a nil channel blocks forever.
	var done <-chan struct{}
	if ctx != nil {
		done = ctx.Done()
	}

	c.log.Debugf(ctx, "client: queuing request %s", r.Publishing.CorrelationId)
	select {
	case c.requests <- r:
		// successful send.
	case <-timeoutChan:
		c.log.Debugf(ctx, "client: timeout while waiting for request queue %s", r.Publishing.CorrelationId)
		return nil, fmt.Errorf("%w while waiting for request queue", ErrRequestTimeout)
	case <-done:
		c.log.Debugf(ctx, "client: context done while waiting for request queue %s", r.Publishing.CorrelationId)
		return nil, fmt.Errorf("%w while waiting for request queue", ctx.Err())
	}

	c.log.Debugf(ctx, "client: waiting for reply of %s", r.Publishing.CorrelationId)

	// Block until the request has been published (or, in confirm mode,
	// confirmed by the server).
	select {
	case <-r.confirmed:
		// got confirmation.
	case <-timeoutChan:
		c.log.Debugf(ctx, "client: timeout while waiting for request confirmation %s", r.Publishing.CorrelationId)
		return nil, fmt.Errorf("%w while waiting for confirmation", ErrRequestTimeout)
	case <-done:
		c.log.Debugf(ctx, "client: context done while waiting for request confirmation %s", r.Publishing.CorrelationId)
		return nil, fmt.Errorf("%w while waiting for confirmation", ctx.Err())
	}

	// Block until the response (or an error) for this request arrives.
	select {
	case err := <-r.errChan:
		c.log.Debugf(ctx, "client: error for %s, %s", r.Publishing.CorrelationId, err.Error())
		return nil, err
	case <-timeoutChan:
		c.log.Debugf(ctx, "client: timeout for %s", r.Publishing.CorrelationId)
		return nil, fmt.Errorf("%w while waiting for response", ErrRequestTimeout)
	case <-done:
		c.log.Debugf(ctx, "client: context done for %s", r.Publishing.CorrelationId)
		return nil, fmt.Errorf("%w while waiting for response", ctx.Err())
	case delivery := <-r.response:
		c.log.Debugf(ctx, "client: got delivery for %s", r.Publishing.CorrelationId)
		return delivery, nil
	}
}
// Stop will gracefully disconnect from AMQP and blocks until the client's
// background loop has exited. It is a no-op when the client is not running.
// It is not guaranteed that all in-flight requests or responses are handled
// before the disconnect. Instead, the user should ensure that all calls to
// c.Send() have returned before calling c.Stop().
//
// Concurrent calls are safe: only the first one signals the shutdown, and all
// of them wait for it to finish.
func (c *RpcClient) Stop() {
	if atomic.LoadInt32(&c.isRunning) != 1 {
		return
	}
	didStop := c.didStopChan
	// Closing stopChan twice would panic, so only the caller that flips
	// wantStop from 0 to 1 closes it. runForever resets wantStop on each start.
	if atomic.CompareAndSwapInt32(&c.wantStop, 0, 1) {
		close(c.stopChan)
	}
	<-didStop
}
// NewRpcClient creates an RPC client on top of conn.
//
// Each client gets its own reply queue named "rpc-reply-to-<uuid>".
// optionFuncs are applied, in order, on top of the default options for that
// queue name. The connections used for RPC traffic are only opened by the
// first Send. Requests time out after 10 seconds, and a failed publish is
// retried up to 10 times.
//
// It returns an error if conn or its connection manager is nil, or if the
// channel manager cannot be created.
func NewRpcClient(ctx context.Context, conn *Conn, optionFuncs ...func(*ClientOptions)) (*RpcClient, error) {
	// Validate before doing any work; a nil conn would otherwise panic on the
	// field access below.
	if conn == nil {
		return nil, errors.New("conn can't be nil")
	}
	if conn.connectionManager == nil {
		return nil, errors.New("connection manager can't be nil")
	}

	replyQueueName := "rpc-reply-to-" + uuid.New().String()
	defaultOptions := getDefaultClientOptions(replyQueueName)
	options := &defaultOptions
	for _, optionFunc := range optionFuncs {
		optionFunc(options)
	}

	chanManager, err := channelmanager.NewChannelManager(ctx, conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval)
	if err != nil {
		return nil, err
	}

	rpcClient := &RpcClient{
		conn:                          conn,
		chanManager:                   chanManager,
		connManager:                   conn.connectionManager,
		disablePublishDueToFlow:       false,
		disablePublishDueToFlowMux:    &sync.RWMutex{},
		disablePublishDueToBlocked:    false,
		disablePublishDueToBlockedMux: &sync.RWMutex{},
		options:                       *options,
		log:                           options.Logger,
		maxRetries:                    10,
		requests:                      make(chan *Request),
		requestsMap: RequestMap{
			byDeliveryTag:   make(map[uint64]*Request),
			byCorrelationID: make(map[string]*Request),
		},
		middlewares:      []ClientMiddlewareFunc{},
		timeout:          time.Second * 10,
		replyToQueueName: replyQueueName,
	}
	rpcClient.Sender = rpcClient.send
	return rpcClient, nil
}
// PublishWithContext sends data as an RPC request with the given routing key
// and returns the body of the reply. It fails fast when publishing has been
// disabled because of server flow control or a blocked connection. A request
// that gets no reply delivery yields a nil body and a nil error.
func (c *RpcClient) PublishWithContext(
	ctx context.Context,
	data []byte,
	routingKey string,
) ([]byte, error) {
	// Read the flags under their locks, but release the locks before
	// sending. Holding a read lock for the whole round trip (up to the request
	// timeout) blocks whoever needs to flip the flag. A waiting writer then in
	// turn blocks every new RLock, stalling all publishers.
	c.disablePublishDueToFlowMux.RLock()
	flowBlocked := c.disablePublishDueToFlow
	c.disablePublishDueToFlowMux.RUnlock()
	if flowBlocked {
		return nil, errors.New("publishing blocked due to high flow on the server")
	}

	c.disablePublishDueToBlockedMux.RLock()
	tcpBlocked := c.disablePublishDueToBlocked
	c.disablePublishDueToBlockedMux.RUnlock()
	if tcpBlocked {
		return nil, errors.New("publishing blocked due to TCP block on the server")
	}

	request := NewRequest(ctx).WithRoutingKey(routingKey).WithBody(data)
	response, err := c.Send(request)
	if err != nil {
		return nil, err
	}
	if response == nil {
		// Requests that want no reply are completed with a nil delivery.
		return nil, nil
	}
	return response.Body, nil
}
// Close stops the client and closes its channel manager, so that the
// RabbitMQ server knows the publisher has gone away. A closed client should
// be discarded because it is not safe for re-use, and Close should be called
// only once. A failure to close the channel is logged, not returned.
func (c *RpcClient) Close(ctx context.Context) {
	c.Stop()

	logger := c.options.Logger
	if err := c.chanManager.Close(ctx); err != nil {
		logger.Warningf(ctx, "error while closing the channel: %v", err)
	}
	logger.Infof(ctx, "closing publisher...")
}