-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Gateway: accept HTTP request for producing messages #655
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM
but I have left some feedback in some places
GoogleIdToken idToken = verifier.verify(context.credentials()); | ||
final String credentials = context.credentials(); | ||
if (credentials == null) { | ||
return GatewayAuthenticationResult.authenticationFailed("Token not found."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it is better to say "Credentials not found", token is specific to some mechanisms
this.topicConnectionsRuntimeRegistry = topicConnectionsRuntimeRegistry; | ||
} | ||
|
||
@SneakyThrows |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SneakyThrows often hide potential problems, do we really need it ?
what happens if there is an error in anyplace ? we risk to leak resources
|
||
final String gatewayId = requestContext.gateway().getId(); | ||
final String applicationId = requestContext.applicationId(); | ||
log.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.debug ?
this is going to happen for every new consumer ?
try { | ||
reader.close(); | ||
} catch (Exception e) { | ||
log.error("error closing reader", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is better to report the topic name, or anything that connect this error with an application/tenant.... otherwise it is almost useless
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
usually there's the kafka/pulsar client errors logged with the name of the topic before this error. anyway, this error is not that important, I'll move it to debug
topicConnectionsRuntime.createProducer( | ||
null, streamingCluster, Map.of("topic", topic)); | ||
producer.start(); | ||
log.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
debug ?
.headers(headers) | ||
.build(); | ||
producer.write(record).get(); | ||
log.info("Produced record {}", record); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
debug
producer.write(record).get(); | ||
log.info("Produced record {}", record); | ||
} catch (Throwable tt) { | ||
throw new ProduceException(tt.getMessage(), ProduceResponse.Status.PRODUCER_ERROR); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log ?
@NotBlank @PathVariable("application") String application, | ||
@NotBlank @PathVariable("gateway") String gateway, | ||
@RequestBody ProduceRequest produceRequest) | ||
throws ProduceGateway.ProduceException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that the framework would handle ProduceGateway.ProduceException better than an "internal server error"
we could catch it and return a better error to the user, not a stack trace, but a ProduceResponse with some "error field" and a description
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will return a json with the detail of the error, there's a generic error handling in ResourceErrorsHandler
if (r.value() == null) { | ||
throw new IllegalStateException( | ||
"Cannot infer schema because value is null"); | ||
final Schema<?> valueSchema; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cbornet FYI
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
7897c34
to
5a565a4
Compare
Changes:
The body must be a json {value: xx, key:xx, headers: {xx}}