[Event Hubs] Improve the current implementation of Sender #3375

Closed
amarzavery opened this issue May 30, 2018 · 11 comments

@amarzavery (Contributor)

The current sender has a promise wrapper around the rhea sender, which is based on the event emitter pattern. Every time a message is sent, event handlers are added and removed, and the promise is resolved or rejected based on the type of event received. This design works well as long as the customer awaits the previous send (i.e. the Event Hubs client sends one message at a time).

If many messages are sent concurrently, this triggers the Node.js "too many listeners" warning (MaxListenersExceededWarning); by default, an EventEmitter allows at most 10 listeners per event.
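
For illustration, a minimal sketch of that per-send pattern, assuming rhea's event-emitter sender (the helper name and error handling are illustrative, not the SDK's actual code):

    import { Sender, EventContext } from "rhea";

    // Handlers are attached for every send and removed once the delivery settles,
    // which is what eventually trips Node's per-event listener limit when many
    // sends are in flight on the same link.
    function sendOnce(sender: Sender, message: any): Promise<void> {
      return new Promise<void>((resolve, reject) => {
        const onAccepted = (_context: EventContext) => {
          cleanup();
          resolve();
        };
        const onRejected = (_context: EventContext) => {
          cleanup();
          reject(new Error("The message was rejected by the service."));
        };
        const cleanup = () => {
          sender.removeListener("accepted", onAccepted);
          sender.removeListener("rejected", onRejected);
        };
        sender.on("accepted", onAccepted);
        sender.on("rejected", onRejected);
        sender.send(message);
      });
    }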

A better approach would be to add all the event handlers at the time of creating the sender link and then maintain an internal map of the messages being sent. The promises would then be resolved or rejected based on the type of event received (a minimal sketch follows the list below).
Things that make this approach a little complex are:

  • rhea internally has a circular buffer that throttles the number of messages being sent. If the messages cannot be sent, we need to wait for some time before attempting to send them.
  • How do we handle retry attempts?
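
A minimal sketch of the proposed design, assuming rhea's sender API (the class name and map shape are illustrative; credit throttling and retries are omitted):

    import { Sender, EventContext } from "rhea";

    interface InflightSend {
      resolve: () => void;
      reject: (err: Error) => void;
    }

    // Handlers are attached once when the link is created; each in-flight send is
    // tracked in a map keyed by the delivery id so that the matching promise can be
    // settled when the corresponding disposition event arrives.
    class TrackingSender {
      private readonly inflight = new Map<number, InflightSend>();

      constructor(private readonly sender: Sender) {
        sender.on("accepted", (context: EventContext) => this.settle(context, true));
        sender.on("rejected", (context: EventContext) => this.settle(context, false));
      }

      send(message: any): Promise<void> {
        return new Promise<void>((resolve, reject) => {
          const delivery = this.sender.send(message);
          this.inflight.set(delivery.id, { resolve, reject });
        });
      }

      private settle(context: EventContext, accepted: boolean): void {
        if (!context.delivery) return;
        const entry = this.inflight.get(context.delivery.id);
        if (!entry) return;
        this.inflight.delete(context.delivery.id);
        if (accepted) {
          entry.resolve();
        } else {
          entry.reject(new Error("The message was rejected by the service."));
        }
      }
    }
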
@amarzavery amarzavery assigned ghost Jun 14, 2018
@ghost ghost removed their assignment Oct 24, 2018
@ramya-rao-a ramya-rao-a transferred this issue from Azure/azure-event-hubs-node Jun 2, 2019
@ramya-rao-a ramya-rao-a changed the title Improve the current implementation of Sender [Event Hubs] Improve the current implementation of Sender Jun 2, 2019
@ramya-rao-a ramya-rao-a added Client This issue points to a problem in the data-plane of the library. Event Hubs labels Jun 2, 2019
@ramya-rao-a ramya-rao-a added this to the Sprint 155 milestone Jun 2, 2019
@ramya-rao-a (Contributor) commented Jun 26, 2019

This can be done using the new awaitable sender introduced in amqp/rhea-promise#44, right?

@amarzavery (Contributor, Author) commented Jun 26, 2019

Yes, it should be possible to do that. The notable differences are in the areas below.

Catching Errors and retrying the send

The current EventHubsSender or ServiceBusSender has the retry logic baked in, which the rhea-promise AwaitableSender does not have. You can call await AwaitableSender.send() and, if the promise is rejected, retry in the EH/SB sender. Today, EH and SB throw a SenderBusyError, which is then translated into an AmqpError; SenderBusyError is a retryable error. This information is used by the retry config to determine whether it should make an attempt to send the message again. However, rhea-promise throws InsufficientCreditError in this situation. In the wrapper method you will have to catch this and convert it to SenderBusyError, or maybe add support in amqp-common to treat InsufficientCreditError as a retryable error.

In actionAfterTimeout, SB and EH throw ServiceUnavailableError (retryable), which is also translated into an AmqpError before the promise is rejected, whereas rhea-promise throws OperationTimeoutError, which is not a retryable error. You will have to catch this as well in the EH/SB sender method and do the right thing (see the sketch below).
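
For example, a hedged sketch of such a wrapper, assuming rhea-promise's AwaitableSender and core-amqp's translate (the error-name checks and helper name are illustrative):

    import { AwaitableSender } from "rhea-promise";
    import { translate } from "@azure/core-amqp";

    // Catch the errors thrown by the awaitable send and mark the ones we know are
    // transient as retryable, so the existing retry config can attempt the send again.
    async function sendWithTranslation(sender: AwaitableSender, message: any): Promise<void> {
      try {
        await sender.send(message);
      } catch (err: any) {
        const translated = translate(err);
        if (err.name === "InsufficientCreditError" || err.name === "OperationTimeoutError") {
          translated.retryable = true;
        }
        throw translated;
      }
    }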

Handling the sender_error, session_error and disconnected events when the link, session or connection gets disconnected

We have logic to bring everything back online if things get disconnected. This runs when a *_error or disconnected event is fired. So we need to pass these handlers to createAwaitableSender(), and we need to decide whether we want to reject all the promises of in-flight send() operations and clear the deliveryDispositionMap managed by AwaitableSender, or bring the connection/session/link back online and retry sending those messages.

Another option is to simply reimplement in EH/SB what AwaitableSender does in rhea-promise, and handle errors and retries the way it is being done currently.

Both options will work well.

@chradek (Contributor) commented Jul 13, 2019

Based on the discussion so far, here's my summary:

Overview

We should switch to using the AwaitableSender from rhea-promise to mitigate the "too many listeners" Node.js warning we can hit today.

Current State

Link recreation

  • onClose
  • onSessionClose

We don't attempt to recreate a link on session or sender errors.

Retry logic

We currently rely on the retry operation within core-amqp to determine if an error is retryable.

Sender details

We use an event-based sender and attach the following event handlers each time we call send:

  • onAccepted
  • onRejected
  • onReleased
  • onModified

None of these check if the error is retryable directly, instead leaving that to the retry method.

Proposed Changes

Sender to AwaitableSender

  • Where we currently call connection.createSender, call createAwaitableSender instead.
  • Continue passing in the on*Close handlers when calling createAwaitableSender. These should attempt to recreate the link if it has closed. (A sketch follows this list.)
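
A sketch of what that switch could look like (the option names follow rhea-promise, but the handler bodies are placeholders, not the final implementation):

    import { Connection, AwaitableSender, EventContext } from "rhea-promise";

    async function createSender(connection: Connection, entityPath: string): Promise<AwaitableSender> {
      return connection.createAwaitableSender({
        target: { address: entityPath },
        // Keep passing the close handlers so we can recreate the link when it closes,
        // as the current sender does today.
        onClose: (_context: EventContext) => {
          /* schedule link recreation */
        },
        onSessionClose: (_context: EventContext) => {
          /* schedule link recreation */
        },
      });
    }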

Send implementation

Update to wrap the awaitable send method in a new Promise. We need to keep 3 features in mind:

  1. Cancellation
  2. Retryable errors
  3. Timeout

To support these 3 features, we'll need to create a new Promise that wraps the awaitable send call as we do today.

Cancellation

The awaitable send method doesn't accept an abort signal. We can either update rhea-promise to accept an AbortSignalLike, or handle it in a function that wraps the send method.
In the latter case, cancellation would work similarly to the current implementation, wherein the abortSignal's abort event calls the wrapper's reject function.
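
In that case, a hedged sketch of the wrapper, assuming @azure/abort-controller's AbortSignalLike and AbortError (the function name is illustrative):

    import { AwaitableSender } from "rhea-promise";
    import { AbortError, AbortSignalLike } from "@azure/abort-controller";

    function sendWithAbort(sender: AwaitableSender, message: any, abortSignal?: AbortSignalLike): Promise<void> {
      return new Promise<void>((resolve, reject) => {
        const onAbort = () => reject(new AbortError("The send operation was cancelled by the user."));
        if (abortSignal) {
          if (abortSignal.aborted) {
            onAbort();
            return;
          }
          abortSignal.addEventListener("abort", onAbort);
        }
        sender
          .send(message)
          .then(() => resolve())
          .catch(reject)
          .finally(() => {
            if (abortSignal) {
              abortSignal.removeEventListener("abort", onAbort);
            }
          });
      });
    }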

Retryable errors

We can continue to rely on the retry function from core-amqp to handle most retryable errors. We can either update the retry function to treat InsufficientCreditError and OperationTimeoutError errors as retryable (this would impact the receiver and management clients as well), or catch the error thrown by send, call translate on it ourselves and set its retryable property.
I'd rather go with the 1st approach if these errors could be retryable for receiver/management links as well; otherwise, the 2nd approach would be safer.

Timeout

We can handle timeouts the same as we do currently.
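
For reference, a minimal sketch of that per-attempt timeout handling (the retryable error type and helper name are illustrative; the SDK's actual logic lives in actionAfterTimeout):

    import { AwaitableSender } from "rhea-promise";
    import { MessagingError } from "@azure/core-amqp";

    // Race the awaitable send against a timer; on timeout, reject with a retryable
    // error so the retry logic can make another attempt.
    function sendWithTimeout(sender: AwaitableSender, message: any, timeoutInMs: number): Promise<void> {
      return new Promise<void>((resolve, reject) => {
        const timer = setTimeout(() => {
          const err = new MessagingError("The send operation timed out.");
          err.retryable = true;
          reject(err);
        }, timeoutInMs);
        sender
          .send(message)
          .then(() => resolve())
          .catch(reject)
          .finally(() => clearTimeout(timer));
      });
    }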

Questions

Did sender_error and session_error always indicate a disconnect, or is that new with AwaitableSender? I'm wondering if our existing code intentionally doesn't attempt to recreate the link. It's possible we didn't notice, since subsequent send calls should create the link if it was closed.

@ramya-rao-a (Contributor)

@amarzavery

in rhea-promise we are throwing OperationTimeoutError which is not a retryable error.

As we discussed offline while working on amqp/rhea-promise#42, we arrived at the conclusion that OperationTimeoutError should be a retryable error. As a follow-up on that discussion, I logged #2891. Therefore, consumers of the core-amqp package will find OperationTimeoutError to be a retryable error; consumers of amqp-common will not. This is fine as we intend to use the awaitable sender from rhea-promise only via core-amqp.

@chradek

We don't attempt to recreate a link on session or sender errors.

A sender_error event is always followed by a sender_close event.
A session_error event is always followed by a session_close event.
Therefore, link recreation (via onDetached()) is already taken care of on sender or session errors.

@ramya0820 (Member) commented Jul 26, 2019

@chradek @ramya-rao-a

Following are some notes on the implementation in #4446:

  • About timeout: since rhea-promise's AwaitableSender accepts sendTimeoutInSeconds (default: 20 seconds) and throws OperationTimeoutError (retryable), it seems we could simplify by dropping the timeout handling on the SDK side and letting users configure and pass in this value directly.
    A challenge here, though, is that because this option is set at the time of creating the sender, we lose control over the time taken by init.

    • If we don't make any changes, we retain the timeout handling, but rhea-promise will almost always throw OperationTimeoutError much earlier (at the default 20-second mark), and our timer handling will never be hit (which sort of defeats the purpose of a 'per attempt timeout'?).
      Unless the timeout happened for some other reason, such as init() taking too long.
  • About cancellation, yes - the current mechanisms seem sufficient (wrapping the send in a promise and having abort handlers around it).
    However, we may need to update rhea-promise as well, which is not trivial because it's not in our mono-repo to begin with. @amarzavery @ramya-rao-a thoughts on this?


Also, I discussed offline with @ramya-rao-a having a sample/repro to verify that the original problem is solved, so below is the sample being used to test.

The test is based on the value supplied to setMaxListeners on the sender, which is set to 1000.
If senderCount exceeds this value, Node emits the warning -> MaxListenersExceededWarning: Possible EventEmitter memory leak detected.

With the changes in the linked PR, this test (which previously failed) now succeeds, validating our approach! The snippet below assumes an existing Event Hubs client (client) and a debug logger.

    const senderCount = 1200;
    try {
      const producer = client.createProducer();
      const promises = [];
      for (let i = 0; i < senderCount; i++) {
        debug(">>>>> Sending a message to the hub when i == %d", i);
        promises.push(producer.send([{ body: `Hello World ${i}` }]));
      }
      await Promise.all(promises);
    } catch (err) {
      debug("An error occurred while running the test: ", err);
      throw err;
    }

@ramya0820 (Member)

@chradek @ramya-rao-a let me know if there are any other comments on this. It looks like amqp/rhea-promise#48 is related and takes care of the updates to rhea-promise.

@ramya-rao-a (Contributor)

Good point on timeout being configured at sender creation time. Logged amqp/rhea-promise#49.

For now, let's use the approach as described in #4446 (comment).

@ramya0820 ramya0820 modified the milestones: Sprint 155, Sprint 156 Jul 31, 2019
@ramya-rao-a (Contributor)

#4446 is now merged.

@ramya0820 You have a test case mentioned in the comment above. Can you please send a PR that adds that test to our code before closing this issue?

@ramya0820 (Member) commented Aug 2, 2019

Can you please send a PR that adds that test to our code before closing this issue?

Sent #4637

In the sender tests, 2 are failing due to message size checks; these are currently skipped in this PR. cc: @ShivangiReja

We receive a MessagingError instead of MessageTooLargeError from the service side, with the following rejection message:
Sender 'eventhubtest-1c003fb5-0c3d-4c4f-8490-420321cff636' on amqp session 'local-2_remote-2_connection-4', received a 'rejected' disposition. Hence we are rejecting the promise.

@chradek (Contributor) commented Aug 2, 2019

@ramya0820
It looks like errors returned by the awaitable sender get converted to a SendOperationFailedError, which is also considered retryable.

We can get the original error from the innerError field.

So the change in eventHubSender.ts would be to translate the inner error when it is present:

err = translate(err.innerError || err);
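
In context, the catch block would look roughly like this (an illustrative sketch, not the exact eventHubSender.ts source):

    import { AwaitableSender } from "rhea-promise";
    import { translate } from "@azure/core-amqp";

    async function trySend(sender: AwaitableSender, message: any): Promise<void> {
      try {
        await sender.send(message);
      } catch (err: any) {
        // Unwrap innerError first so the original service error (e.g. MessageTooLargeError)
        // surfaces instead of the retryable SendOperationFailedError wrapper.
        throw translate(err.innerError || err);
      }
    }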

@chradek (Contributor) commented Aug 6, 2019

Completed by #4446 and #4637.
