Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KCL version 1.6.3 doesn't shutdown worker cleanly #79

Closed
matthewbogner opened this issue Jun 22, 2016 · 16 comments
Closed

KCL version 1.6.3 doesn't shutdown worker cleanly #79

matthewbogner opened this issue Jun 22, 2016 · 16 comments
Assignees

Comments

@matthewbogner
Copy link

I saw in 1.6.2 release notes that the ability to cleanly shutdown a worker was implemented.

I've put together a reproduction that will demonstrate that this still throws an exception by the worker when attempting to perform the final checkpoint.

See the README for instructions on running the repro scenario:
https://github.com/matthewbogner/kinesis-stress-example

During shutdown of the consumer, you'll see the following exception which shouldn't be occurring:

2016-06-22 17:03:47,979 ERROR [pool-2-thread-1] [net.ibogner.kinesis.RecordProcessor:41] Failed to checkpoint
com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:172) ~[amazon-kinesis-client-1.6.3.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216) ~[amazon-kinesis-client-1.6.3.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:77) ~[amazon-kinesis-client-1.6.3.jar:na]
    at net.ibogner.kinesis.RecordProcessor.checkpoint(RecordProcessor.java:39) [classes/:na]
    at net.ibogner.kinesis.RecordProcessor.processRecords(RecordProcessor.java:25) [classes/:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:169) [amazon-kinesis-client-1.6.3.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) [amazon-kinesis-client-1.6.3.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) [amazon-kinesis-client-1.6.3.jar:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_71]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_71]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_71]
@rankinc
Copy link
Contributor

rankinc commented Jun 29, 2016

I have also experienced shutdown problems with 1.6.3, but I don't think that this is one of them. Does this change to your example program help? (You might also want to apply pull #82 to the library first).

--- a/src/main/java/net/ibogner/kinesis/KinesisStressExample.java
+++ b/src/main/java/net/ibogner/kinesis/KinesisStressExample.java
@@ -53,6 +53,7 @@ public class KinesisStressExample {
     private final AmazonKinesisClient kinesisClient;

     private Worker worker;
+    private Thread workerThread;

     public KinesisStressExample(String streamName, String regionName, Integer numThreads, Integer numRecordsPerPut, Long numMessagesToProduce, String credentialsProfileName) {
         this.streamName = streamName;
@@ -92,7 +93,8 @@ public class KinesisStressExample {
                                                           kclWorkerId)
                                 .withRegionName(regionName))
                 .build();
-        new Thread(worker).start();
+        workerThread = new Thread(worker);
+        workerThread.start();
     }

     public void awaitCompletion() throws InterruptedException {
@@ -100,6 +102,7 @@ public class KinesisStressExample {
             Thread.sleep(1000);
         }
         worker.shutdown();
+        workerThread.join();
         logger.info("Done shutting down worker");

         keepGoing = false;

@pfifer
Copy link
Contributor

pfifer commented Jul 8, 2016

Thanks for reporting this.

I'm looking into this, and the provided repro should help.

@pfifer pfifer self-assigned this Jul 8, 2016
@matthewbogner
Copy link
Author

Thanks!

@matthewbogner
Copy link
Author

How is the investigation going ?

@pfifer
Copy link
Contributor

pfifer commented Jul 21, 2016

The shutdown of the worker effectively triggers that instance of the worker to lose all it's leases. Lease loss causes the event loop to call shutdown, instead of process records, on the next iteration. The event loop in worker will only dispatch a single task for a shard at a time. So if a processRecords call is being handled when shutdown is called, nothing will happen until the processRecords call completes. At the same time the lease manager marks the lease as no longer held. So if the record processor calls checkpoint it will receive an exception. This is the body of the race condition, and why your test program shows it reasonably well. The test program calls checkpoint on every processRecords calls. The only way the test program wouldn't cause an exception was if the call to Worker.shutdown() occurs before the next call to ShardConsumer.getNextTask.

I have some ideas on how to improve shutdown, but I still need to think through them some more.

There are some approaches that would allow you to the shutdown behavior you expect. Most of them would require that processRecords determine when it's time to shutdown, and stop processing records.

@matthewbogner
Copy link
Author

Would it make sense to implement a "quiesce" of sorts, so that existing processRecords could complete and perform final checkpointing before the Lease manager destroys itself and erases all records of who owns what ?

@pfifer
Copy link
Contributor

pfifer commented Jul 22, 2016

That's pretty much what I'm thinking. There is some difficulty in handling blocked record processors. The other complexity is figuring out how to fit this in while maintaining compatibility with the existing record processors.

@pfifer
Copy link
Contributor

pfifer commented Aug 15, 2016

I have a variant of the Kinesis client library that supports quiescing: https://github.com/pfifer/amazon-kinesis-client/tree/determinstic-shutdown

I still haven't done any of the unit tests, but have tested the shutdown behavior with my test application. I still need track down, and verify the edge cases especially around handling end of shard scenarios.

@matthewbogner
Copy link
Author

Thanks for the update. I'm still interested in having the solution, when you feel it is mature and well tested.

@pfifer
Copy link
Contributor

pfifer commented Aug 16, 2016

I'm hoping to have it complete, and ready for merging later this week. I don't think it will make the 1.7.0 release, but should be ready shortly after that.

@pfifer pfifer added this to the v1.7.1 Kinesis Client Library milestone Aug 22, 2016
pfifer added a commit to pfifer/amazon-kinesis-client that referenced this issue Nov 3, 2016
* General
  * Allow disabling shard synchronization at startup.
    * Applications can disable shard synchronization at startup.  Disabling shard synchronization can application startup times for very large streams.
    * [PR awslabs#102](awslabs#102)
  * Applications can now request a graceful shutdown, and record processors that implement the IShutdownNotificationAware will be given a chance to checkpoint before being shutdown.
    * This adds a [new interface](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java), and a [new method on Worker](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java#L539).
    * [PR awslabs#109](awslabs#109)
    * Solves [Issue awslabs#79](awslabs#79)
* MultiLangDaemon
  * Applications can now use credential provides that accept string parameters.
    * [PR awslabs#99](awslabs#99)
  * Applications can now use different credentials for each service.
    * [PR awslabs#111](awslabs#111)
pfifer added a commit that referenced this issue Nov 3, 2016
* General
  * Allow disabling shard synchronization at startup.
    * Applications can disable shard synchronization at startup.  Disabling shard synchronization can application startup times for very large streams.
    * [PR #102](#102)
  * Applications can now request a graceful shutdown, and record processors that implement the IShutdownNotificationAware will be given a chance to checkpoint before being shutdown.
    * This adds a [new interface](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java), and a [new method on Worker](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java#L539).
    * [PR #109](#109)
    * Solves [Issue #79](#79)
* MultiLangDaemon
  * Applications can now use credential provides that accept string parameters.
    * [PR #99](#99)
  * Applications can now use different credentials for each service.
    * [PR #111](#111)
@pfifer
Copy link
Contributor

pfifer commented Nov 3, 2016

Graceful shutdown is now merged, and released. It make take a bit for 1.7.1 to become available on Central though.

@pfifer pfifer closed this as completed Nov 3, 2016
@shushantarora
Copy link

still failing in 1.7.2
When called worker.shutdown() , it throws exception in recordprocessor shutdown and shutdown reason is Zoombie.

com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:173)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:77)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

00:27:00,893 INFO pool-4-thread-1 Worker:shouldShutdown:634 - All record processors have been shutdown successfully.
00:27:00,893 INFO pool-4-thread-1 Worker:finalShutdown:610 - Starting worker's final shutdown.
00:27:00,893 INFO pool-4-thread-1 CWPublisherRunnable:shutdown:165 - Shutting down CWPublication thread.
00:27:00,926 INFO cw-metrics-publisher CWPublisherRunnable:run:96 - CWPublication thread finished.
00:27:00,927 INFO pool-4-thread-1 Worker:run:368 - Worker loop is complete. Exiting from worker.

@pfifer
Copy link
Contributor

pfifer commented Jan 30, 2017

To use graceful shutdown you must implement a new interface on your record processor: https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java

You still can't checkpoint in shutdown when the reason is ZOMBIE.

@akumariiit
Copy link

Hi @pfifer ...i am trying the gracefulshutdown, but it is just waiting for record processors to finish, not sure where it is blocked, processor do not take that much time to complete one batch.

shutdown notification, and 9 record processor to complete final shutdown
INFO [2018-09-27 04:15:20,605] com.amazonaws.services.kinesis.clientlibrary.lib.worker.GracefulShutdownCoordinator$GracefulShutdownCallable: Waiting for 9 record process to complete shutdown notification, and 9 record processor to complete final shutdown
INFO [2018-09-27 04:15:21,609] com.amazonaws.services.kinesis.clientlibrary.lib.worker.GracefulShutdownCoordinator$GracefulShutdownCallable: Waiting for 9 record process to complete shutdown notification, and 9 record processor to complete final shutdown
INFO [2018-09-27 04:15:22,612] com.amazonaws.services.kinesis.clientlibrary.lib.worker.GracefulShutdownCoordinator$GracefulShutdownCallable: Waiting for 9 record process to complete shutdown notification, and 9 record processor to complete final shutdown
INFO [2018-09-27 04:15:23,614] com.amazonaws.services.kinesis.clientlibrary.lib.worker.GracefulShutdownCoordinator$GracefulShutdownCallable: Waiting for 9 record process to complete shutdown notification, and 9 record processor to complete final shutdown
INFO [2018-09-27 04:15:24,618] com.amazonaws.services.kinesis.clientlibrary.lib.worker.GracefulShutdownCoordinator$GracefulShutdownCallable: Waiting for 9 record process to complete shutdown notification, and 9 record processor to complete final shutdown
INFO [2018-09-27 04:15:25,622] com.amazonaws.services.kinesis.clientlibrary.lib.worker.GracefulShutdownCoordinator$GracefulShutdownCallable: Waiting for 9 record process to complete shutdown notification, and 9 record processor to complete final shutdown
INFO [2018-09-27 04:15:26,623] com.amazonaws.services.kinesis.clientlibrary.lib.worker.GracefulShutdownCoordinator$GracefulShutdownCallable: Waiting for 9 record process to complete shutdown notification, and 9 record processor to complete final shutdown

jari-kujansuu added a commit to jari-kujansuu/spark that referenced this issue Mar 6, 2020
…heckpoint - instance doesn't hold the lease for this shard" errors.

Spark's KinesisRecordProcessor to implement Kinesis IShutdownNotificationAware interface to be notified before processor will be shutdown so that it can update checkpoint before shutdown.

See more details from:
- awslabs/amazon-kinesis-client#79
- awslabs/amazon-kinesis-client#109
@stsatlantis
Copy link

@akumariiit How did you solve your issue with the graceful shutdown? it doesn't seem to terminate, like ever.

@kaisermario
Copy link

kaisermario commented Sep 3, 2021

@akumariiit @stsatlantis We have the same issue. Any update on this? I don't know which process is blocking this shutdown procedure.
@pfifer

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

7 participants