-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Solace Read connector: adding implementations of SempClient and SempClientFactory #31542
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
ca56d82
to
601d4b2
Compare
601d4b2
to
f3f33fb
Compare
assign set of reviewers |
Assigning reviewers. If you would like to opt out of this review, comment R: @Abacn for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, left a few comments
...io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
private HttpResponse execute(HttpRequest request) throws IOException { | ||
request.setNumberOfRetries(2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what consideration here for retry twice? Good to have comment for occurrence of hardcoded numbers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I extracted the variable and added a comment.
COOKIE_MANAGER.getCookieStore().removeAll(); | ||
// execute again without cookies to refresh the token. | ||
return execute(request); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there may be other retriable return codes? e.g. from the spec does it possible to return server side error 50X, rate limiting 429, etc. Fine to leave as is for now but good to comment here if we do want to handle them in the future what should be done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I'll leave a comment to consider handling it in the future
try { | ||
response = request.execute(); | ||
} catch (HttpResponseException e) { | ||
if (authFromCookie && e.getStatusCode() == 401) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this (make a request and check for 401) the only way to refresh token?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Solace documentation lacks guidance on detecting token expiration. Typically, tokens include an expires_in
field for calculating refresh times. However, Solace seems to omit this information. Currently, the only way to check token validity is by sending a request and looking for a 401 Unauthorized response, which would then trigger token refresh.
private HttpResponse execute(HttpRequest request) throws IOException { | ||
request.setNumberOfRetries(2); | ||
HttpHeaders httpHeaders = new HttpHeaders(); | ||
boolean authFromCookie = COOKIE_MANAGER.getCookieStore().getCookies().size() > 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The COOKIE_MANAGER is worker wide, suppose there are two SolaceIO sources from different servers, could there be conflict? Should the static COOKIE_MANAGER be a map keyed with destination?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, changed to a ConcurrentHashMap to make sure it's thread-safe.
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/SerializableSupplier.java
Outdated
Show resolved
Hide resolved
…r to a variable, cookie store map
f3f33fb
to
9b6b5b2
Compare
…lientFactory (apache#31542) * Adding implementations of SempClient and SempClientFactory * Use core SerializableSupplier, remove SuppressWarnings, extract number to a variable, cookie store map
This is the 4th PR for the Solace Read connector (addresses #31440).
It depends on the #31541, #31539 and #31476 (root).
The files that are added/modified in this PR, comparing to the #31541:
These are implementations of SempClient and SempClientFactory. SempClientFactory can be used as an argument to
SolaceIO.Read.withSempClientFactory(...)
.Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.