Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean shutdown when multilang deamen is terminated by giving one last chance to checkpoint. #174

Merged
merged 6 commits into from
Jun 21, 2017

Conversation

mikramulhaq
Copy link
Contributor

No description provided.

Copy link
Contributor

@pfifer pfifer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for submitting this, just some minor changes before I can accept the request.

}
try {
LOG.info("Requesting a checkpoint on shutdown notification.");
checkpointer.checkpoint();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether to checkpoint is determined by the application, so checkpointing here isn't safe. This should use a similar dispatch mechanism that processRecords uses. It might be possible to shortcut the dispatch at this time, and just make whether to checkpoint a configuration flag.

The dispatch code for processRecords for reference.

config.getKinesisClientLibConfiguration(),
config.getRecordProcessorFactory(),
executorService);

Runtime.getRuntime().addShutdownHook(new Thread()
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please match the existing style with the curly brace on the same line.

LOG.info("Process terminanted, will initiate shutdown.");
try {
Future<Void> fut = daemon.worker.requestShutdown();
fut.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a worst case scenario this could cause the shutdown to become stuck. It would be best to start with some timeout value of how long to wait for the shutdown request to complete. I usually start with something in the range of 5 seconds. In the future it may make sense to make it configurable.

LOG.info("Requesting a checkpoint on shutdown notification.");
checkpointer.checkpoint();
} catch (InvalidStateException e) {
LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java will automatically call toString during string concatenation. Additionally for exceptions it's common to pass the exception as the second argument to the logger call. This will allow the logger to emit a stack trace as well e.g.

LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e, e);

{
LOG.info("Process terminanted, will initiate shutdown.");
try {
Future<Void> fut = daemon.worker.requestShutdown();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to ensure that when requestShutdown is invoked. There is no process records invoked. Need a lock may be ?!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way dispatch is handled in the worker already guarantees this. The record processors are represented as a state machine, and all request shutdown does is request that on the change event to go to the Shutdown Notification Requested state. This is the diagram for the state changes.

Copy link
Contributor

@pfifer pfifer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if you can add some simple tests. Integration testing will require updating one of the MultiLang Daemon Clients.

/**
* The name used for the action field in {@link Message}.
*/
public static final String ACTION = "shutdownrequested";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Please use camelCase for the action name for consistency.

* @param checkpointer A checkpointer.
* @return Whether or not this operation succeeded.
*/
boolean shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add some tests for this method in MultiLangProtocolTest.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

@pfifer pfifer added this to the v1.8.0 milestone Jun 12, 2017
@pfifer
Copy link
Contributor

pfifer commented Jun 12, 2017

Thanks everything looks good. Starting the process to merge the changes now.

@pfifer pfifer modified the milestones: v1.7.6, v1.8.0 Jun 13, 2017
@pfifer
Copy link
Contributor

pfifer commented Jun 14, 2017

One last thing:

Please confirm that we can use, modify, copy, and redistribute this contribution. Thanks.

@mikramulhaq
Copy link
Contributor Author

@pfifer - yes confirmed. You can modify, copy and redistribute this contribution. Thanks,

@pfifer pfifer merged commit 5a8bac2 into awslabs:master Jun 21, 2017
pfifer added a commit to pfifer/amazon-kinesis-client that referenced this pull request Jun 21, 2017
* Added support for graceful shutdown in MultiLang Clients
  * PR awslabs#174
  * PR awslabs#182
* Updated documentation for `v2.IRecordProcessor#shutdown`, and `KinesisClientLibConfiguration#idleTimeBetweenReadsMillis`
  * PR awslabs#170
* Updated to version 1.11.151 of the AWS Java SDK
  * PR awslabs#183
pfifer added a commit that referenced this pull request Jun 22, 2017
* Added support for graceful shutdown in MultiLang Clients
  * PR #174
  * PR #182
* Updated documentation for `v2.IRecordProcessor#shutdown`, and `KinesisClientLibConfiguration#idleTimeBetweenReadsMillis`
  * PR #170
* Updated to version 1.11.151 of the AWS Java SDK
  * PR #183
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants