-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add Controller() method to Client interface #1063
Conversation
um. It seems the unit test issue is not introduced by this PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, I just have a few comments.
|
||
req := &MetadataRequest{Topics: topics} | ||
if client.conf.Version.IsAtLeast(V0_10_0_0) { | ||
req.Version = 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe it is this line that is causing the tests to fail. The commit you are based on runs successfully, and this branch fails consistently even when I rerun.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reset to the base commit eae91468c24263f06ae73dce45eb26252c73b1d6
and run make test
, the tests still fail at:
=== RUN TestMessageEncoding
--- FAIL: TestMessageEncoding (0.01s)
request_test.go:34: Encoding empty gzip failed
got [132 99 80 148 0 1 255 255 255 255 0 0 0 23 31 139 8 0 0 0 0 0 0 255 1 0 0 255 255 0 0 0 0 0 0 0 0]
want [97 79 149 90 0 1 255 255 255 255 0 0 0 23 31 139 8 0 0 9 110 136 0 255 1 0 0 255 255 0 0 0 0 0 0 0 0]
But fails related to consumer disappeared.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed the test fails related to consumer.
client.go
Outdated
@@ -17,6 +17,9 @@ type Client interface { | |||
// altered after it has been created. | |||
Config() *Config | |||
|
|||
// Controller returns the cluster controller broker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Full sentences ending in a period please.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh! my fault.
client.go
Outdated
|
||
controller := client.cachedController() | ||
if controller == nil { | ||
return nil, ErrControllerNotAvailable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than returning an error immediately we should try refreshing the metadata once to see if we can fetch the controller - being nil may just mean RefreshMetadata
has never been run.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I will work on it. But it's a little complicated,I need separating a new method refreshMetadata
from backgroundMetadataUpdater
to refresh the meta data because of the configuration Metadata.Full
.
client.go
Outdated
@@ -97,6 +100,7 @@ type client struct { | |||
seedBrokers []*Broker | |||
deadSeeds []*Broker | |||
|
|||
controller *Broker // cluster controller broker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As with the coordinator map, I'd prefer to store the int32 id here instead of the broker itself. It's easier to reason about object ownership and garbage-collection that way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I am working on it.
OK. I fixed all issues and added an unit test. |
@eapache any update on when this pull request will be merged , that would unblock the ClusterAdmin pull request. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the delay! A few very minor comments but otherwise this is ready to go.
errors.go
Outdated
@@ -41,6 +41,14 @@ var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetc | |||
// a RecordBatch. | |||
var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch") | |||
|
|||
// ErrControllerNotAvailable is returned when server didn't give correect controller id. May be kafka server's version |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: "correct"
errors.go
Outdated
// is lower than 0.10.0.0. | ||
var ErrControllerNotAvailable = errors.New("kafka: controller is not avaiable") | ||
|
||
// ErrNoTopicsToUpdateMetaData is returned when Meta.Full is set to false but no specific topics were found to update |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please spell out Metadata
(one word) here and in the error message itself
client.go
Outdated
if err := client.RefreshMetadata(topics...); err != nil { | ||
Logger.Println("Client background metadata update:", err) | ||
if err := client.refreshMetadata(); err != nil { | ||
Logger.Printf("Client background metadata update:%s", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will not format properly (there's no space between the :
and the error, and there's no newline at the end). You can fix the format string, but it's probably simpler to just go back to Println
instead of Printf
which does the right thing.
Thanks for your review! I have fixed the spelling and logging issue. Sorry for that :( |
Thanks! |
Sometimes we need to know the controller broker of a kafka cluster to do something, such as creating or deleting topics.