-
Notifications
You must be signed in to change notification settings - Fork 66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add success and error callbacks to BulkWriter #483
Conversation
* Add new userCallbackExecutor which is used to ensure user callbacks aren't able to block any gax thread. Currently this is creating a cached thread pool per writer, we should likely treat this thread pool as a singleton and pass it from the builder to leverage the reuse of the threads even past the lifetime of the writer. * Update instances of setting a SettableFuture to only set after the state operation (add/remove) has completed to ensure and downstream future will only start after state update.
…firestore into bc/bulk-error-with-test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice. I am happy that we managed to get this to work!
ApiFuture<BatchWriteResponse> response = | ||
firestore.sendRequest(request.build(), firestore.getClient().batchWriteCallable()); | ||
|
||
committed = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just noticed that this is missing from Node.
Would it make sense to make this an abstract property?
In UpdateBuilder, you could do:
boolean isCommitted() {
return committed;
}
and here you could do:
boolean isCommitted() {
return batchState == SENT;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I'll back-port this to node later along with some of the other changes like the retry logging message.
@@ -42,9 +44,44 @@ | |||
import javax.annotation.Nullable; | |||
|
|||
final class BulkWriter implements AutoCloseable { | |||
/** | |||
* A callback set by `addWriteResultListener()` to be run every time an operation successfully |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe s/set/used ? I am not sure though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should be "set" since the addWriteResultListener()
function sets the callback, which is then run on operation success.
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
Outdated
Show resolved
Hide resolved
* modification errors (as this list is modified from both the user thread and the network | ||
* thread). | ||
*/ | ||
private final List<BulkCommitBatch> retryBatchQueue = new CopyOnWriteArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to schedule everything on bulkWriterExecutor
now and use a normal ArrayList? The enqueue part is already on the executor, so the rest should be either be on it as well or be relatively easy to move.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, done.
|
||
/** | ||
* A set of futures that represent pending BulkWriter operations. Each future is completed when | ||
* the BulkWriter operation resolves. This set includes retries. Each retry's promise is added, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"This set includes retries" is not very specific. Do we:
- Add a new operation for each retry (looks like we don't)
- Or do we re-use the existing operations?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the documentation and removed the incorrect parts.
google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java
Show resolved
Hide resolved
google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java
Outdated
Show resolved
Hide resolved
google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java
Outdated
Show resolved
Hide resolved
|
||
// Verify that the 2nd operation did not complete as a result of the flush() call. | ||
assertArrayEquals(new String[] {"BEFORE_FLUSH", "FLUSH"}, operations.toArray()); | ||
bulkWriter.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might make sense to add another flush here and verify "AFTER_FLUSH" as well. That way, it doesn't look unused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
@@ -422,8 +743,8 @@ public void retriesIndividualWritesThatFailWithAbortedOrUnavailable() throws Exc | |||
result1.get(); | |||
fail("set() should have failed"); | |||
} catch (Exception e) { | |||
assertTrue(e.getCause() instanceof FirestoreException); | |||
assertEquals(Status.DEADLINE_EXCEEDED, ((FirestoreException) e.getCause()).getStatus()); | |||
assertTrue(e.getCause() instanceof BulkWriterError); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could BulkWriterError
extend FirestoreException
? That way, it can be commonly treated by our users.
BulkWriterError
should likely also be BulkWriterException
. See the first part of the first answer here: https://stackoverflow.com/questions/5813614/what-is-difference-between-errors-and-exceptions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I attempted it. I had to make FirestoreException
not final
in order to extend, and I had to make one of the private constructors package-private.
Codecov Report
@@ Coverage Diff @@
## master #483 +/- ##
============================================
+ Coverage 73.33% 74.45% +1.11%
- Complexity 1095 1118 +23
============================================
Files 65 66 +1
Lines 5790 5852 +62
Branches 721 722 +1
============================================
+ Hits 4246 4357 +111
+ Misses 1317 1267 -50
- Partials 227 228 +1
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for not giving up!
/** | ||
* A queue of batches to be retried. Use a synchronized list to avoid multi-thread concurrent | ||
* modification errors (as this list is modified from both the user thread and the network | ||
* thread). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment is outdated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
void onResult(DocumentReference documentReference, WriteResult result); | ||
}; | ||
|
||
/** A callback set by `addWriteErrorListener()` to be run every time an operation fails. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... "And determines if an operation should be retried"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
|
||
/** Whether this BulkWriter instance is closed. Once closed, it cannot be opened again. */ | ||
private boolean closed = false; | ||
|
||
/** Rate limiter used to throttle requests as per the 500/50/5 rule. */ | ||
private final RateLimiter rateLimiter; | ||
|
||
private final FirestoreImpl firestore; | ||
private WriteResultCallback successListener = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The value of this can be turned into a static final constant and then referenced as a default so there is only one instance per JVM, rather than an instance per BulkWriter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved.
public void onResult(DocumentReference documentReference, WriteResult result) {} | ||
}; | ||
|
||
private WriteErrorCallback errorListener = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same note as successListener
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved.
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
Show resolved
Hide resolved
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
Show resolved
Hide resolved
return flushComplete; | ||
} | ||
|
||
/** | ||
* Commits all enqueued writes and marks the BulkWriter instance as closed. | ||
* | ||
* <p>After calling `close()`, calling any method wil return an error. | ||
* <p>After calling `close()`, calling any method wil return an error. Any retries scheduled with |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* <p>After calling `close()`, calling any method wil return an error. Any retries scheduled with | |
* <p>After calling `close()`, calling any method will return an error. Any retries scheduled with |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, thanks
return flushComplete; | ||
} | ||
|
||
/** | ||
* Commits all enqueued writes and marks the BulkWriter instance as closed. | ||
* | ||
* <p>After calling `close()`, calling any method wil return an error. | ||
* <p>After calling `close()`, calling any method wil return an error. Any retries scheduled with | ||
* `addWriteErrorListener()` will be run before the `close()` completes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the same true for success listeners? If so let's document it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterException.java
Show resolved
Hide resolved
<difference> | ||
<differenceType>7002</differenceType> | ||
<className>com/google/cloud/firestore/UpdateBuilder</className> | ||
<method>int getMutationsSize()</method> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't remove this method without a major version release since it's public. because it's public customers could be using already and we won't want to remove it and break them.
We should be able to keep the method public without any issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed back to public.
} | ||
/** Get the number of writes. */ | ||
@VisibleForTesting | ||
int getWriteCount() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to keep this method public for binary compatibility as I noted in the clirr file.
After making this public we no longer need the @VisibleForTesting
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
🤖 I have created a release \*beep\* \*boop\* --- ## [2.2.0](https://www.github.com/googleapis/java-firestore/compare/v2.1.0...v2.2.0) (2021-01-20) ### Features * Add bundle proto building ([#271](https://www.github.com/googleapis/java-firestore/issues/271)) ([994835c](https://www.github.com/googleapis/java-firestore/commit/994835c0a3be077404afa60abd4d4685d17fb533)) * add bundle.proto from googleapis/googleapis ([#407](https://www.github.com/googleapis/java-firestore/issues/407)) ([37da386](https://www.github.com/googleapis/java-firestore/commit/37da386875d1b65121e8a9a92b1a000537f07625)) * add CollectionGroup#getPartitions(long) ([#478](https://www.github.com/googleapis/java-firestore/issues/478)) ([bab064e](https://www.github.com/googleapis/java-firestore/commit/bab064edde26325bf0041ffe28d4c63b97a089c5)) * add implicit ordering for startAt(DocumentReference) calls ([#417](https://www.github.com/googleapis/java-firestore/issues/417)) ([aae6dc9](https://www.github.com/googleapis/java-firestore/commit/aae6dc960f7c42830ceed23c65acaad3e457dcff)) * add max/min throttling options to BulkWriterOptions ([#400](https://www.github.com/googleapis/java-firestore/issues/400)) ([27a9397](https://www.github.com/googleapis/java-firestore/commit/27a9397f67e151d723241c80ccb2ec9f1bfbba1c)) * add success and error callbacks to BulkWriter ([#483](https://www.github.com/googleapis/java-firestore/issues/483)) ([3c05037](https://www.github.com/googleapis/java-firestore/commit/3c05037e8fce8d3ce4907fde85bd254fc98ea588)) * Implementation of Firestore Bundle Builder ([#293](https://www.github.com/googleapis/java-firestore/issues/293)) ([fd5ef90](https://www.github.com/googleapis/java-firestore/commit/fd5ef90b6681cc67aeee6c95f3de80267798dcd0)) * Release bundles ([#466](https://www.github.com/googleapis/java-firestore/issues/466)) ([3af065e](https://www.github.com/googleapis/java-firestore/commit/3af065e32b193931c805b576f410ad90124b43a7)) ### Bug Fixes * add @BetaApi, make BulkWriter public, and refactor Executor ([#497](https://www.github.com/googleapis/java-firestore/issues/497)) ([27ff9f6](https://www.github.com/googleapis/java-firestore/commit/27ff9f6dfa92cac9119d2014c24a0759baa44fb7)) * **build:** sample checkstyle violations ([#457](https://www.github.com/googleapis/java-firestore/issues/457)) ([777ecab](https://www.github.com/googleapis/java-firestore/commit/777ecabd1ce12cbc5f4169de6c23a90f98deac06)) * bulkWriter: writing to the same doc doesn't create a new batch ([#394](https://www.github.com/googleapis/java-firestore/issues/394)) ([259ece8](https://www.github.com/googleapis/java-firestore/commit/259ece8511db71ea79cc1a080eb785a15db88756)) * empty commit to trigger release-please ([fcef0d3](https://www.github.com/googleapis/java-firestore/commit/fcef0d302cd0a9339d82db73152289d6f9f67ff2)) * make BulkWriterOptions public ([#502](https://www.github.com/googleapis/java-firestore/issues/502)) ([6ea05be](https://www.github.com/googleapis/java-firestore/commit/6ea05beb3f27337bef910ca64f0e3f32de6b7e98)) * retry Query streams ([#426](https://www.github.com/googleapis/java-firestore/issues/426)) ([3513cd3](https://www.github.com/googleapis/java-firestore/commit/3513cd39ff43d26c8432c05ce20693350539ae8f)) * retry transactions that fail with expired transaction IDs ([#447](https://www.github.com/googleapis/java-firestore/issues/447)) ([5905438](https://www.github.com/googleapis/java-firestore/commit/5905438af6501353e978210808834a26947aae95)) * verify partition count before invoking GetPartition RPC ([#418](https://www.github.com/googleapis/java-firestore/issues/418)) ([2054ae9](https://www.github.com/googleapis/java-firestore/commit/2054ae971083277e1cf81c2b57500c40a6faa0ef)) ### Documentation * **sample:** normalize firestore sample's region tags ([#453](https://www.github.com/googleapis/java-firestore/issues/453)) ([b529245](https://www.github.com/googleapis/java-firestore/commit/b529245c75f770e1b47ca5d9561bab55a7610677)) ### Dependencies * remove explicit version for jackson ([#479](https://www.github.com/googleapis/java-firestore/issues/479)) ([e2aecfe](https://www.github.com/googleapis/java-firestore/commit/e2aecfec51465b8fb3413337a76f9a3de57b8500)) * update dependency com.google.cloud:google-cloud-conformance-tests to v0.0.12 ([#367](https://www.github.com/googleapis/java-firestore/issues/367)) ([2bdd846](https://www.github.com/googleapis/java-firestore/commit/2bdd84693bbd968cafabd0e7ee56d1a9a7dc31ca)) * update dependency com.google.cloud:google-cloud-conformance-tests to v0.0.13 ([#411](https://www.github.com/googleapis/java-firestore/issues/411)) ([e6157b5](https://www.github.com/googleapis/java-firestore/commit/e6157b5cb532e0184125355b12115058e72afa67)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.10.0 ([#383](https://www.github.com/googleapis/java-firestore/issues/383)) ([cb39ee8](https://www.github.com/googleapis/java-firestore/commit/cb39ee820c2f67e22da623f5a6eaa7ee6bf351e2)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.10.2 ([#403](https://www.github.com/googleapis/java-firestore/issues/403)) ([991dd81](https://www.github.com/googleapis/java-firestore/commit/991dd810360e654fca0b53e0611da0cd77febc7c)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.12.1 ([#425](https://www.github.com/googleapis/java-firestore/issues/425)) ([b897ffa](https://www.github.com/googleapis/java-firestore/commit/b897ffa90427a8f597c02c24f80d1d162be48b23)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.13.0 ([#430](https://www.github.com/googleapis/java-firestore/issues/430)) ([0f8f218](https://www.github.com/googleapis/java-firestore/commit/0f8f218678c3ddebb73748c382cab8e38c2f140d)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.14.1 ([#446](https://www.github.com/googleapis/java-firestore/issues/446)) ([e241f8e](https://www.github.com/googleapis/java-firestore/commit/e241f8ebbfdf202f00424177c69962311b37fc88)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.15.0 ([#460](https://www.github.com/googleapis/java-firestore/issues/460)) ([b82fc35](https://www.github.com/googleapis/java-firestore/commit/b82fc3561d1a397438829ab69df24141481369a2)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.16.0 ([#481](https://www.github.com/googleapis/java-firestore/issues/481)) ([ae98824](https://www.github.com/googleapis/java-firestore/commit/ae988245e6d6391c85414e9b6f7ae1b8148c3a6d)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.16.1 ([4ace93c](https://www.github.com/googleapis/java-firestore/commit/4ace93c7be580a8f7870e71cad2dc19bb5fdef29)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.17.0 ([#487](https://www.github.com/googleapis/java-firestore/issues/487)) ([e11e472](https://www.github.com/googleapis/java-firestore/commit/e11e4723bc75727086bee0436492f458def29ff5)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.18.0 ([#495](https://www.github.com/googleapis/java-firestore/issues/495)) ([f78720a](https://www.github.com/googleapis/java-firestore/commit/f78720a155f1294321f05266b9a546bbf2cb9a04)) * update jackson dependencies to v2.11.3 ([#396](https://www.github.com/googleapis/java-firestore/issues/396)) ([2e176e2](https://www.github.com/googleapis/java-firestore/commit/2e176e2f864262f31e6f93705fa7e794023b9649)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Continuation of #458 but without blocking gax/grpc executor!