-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
nsqd: add producer client connections to /stats #881
Conversation
I think the official client libraries either only produce or only consume on a single connection to nsqd, so if that requirement was formalized, it would remove the need to add another counter to the client struct ("messages" could be re-used). It is completely valid, and not uncommon, to publish to multiple different topics from a single producer tcp client connection to nsqd. But often only a single topic will be published to by a process / connection. It would be a useful enhancement to associate a producer client tcp connection with the topic(s) it publishes to, so one can see producer stats for different topics differentiated. Perhaps the most practical way is to limit this association to one topic, and if more than one, then just consider it "multiple". |
@ploxiln I did consider what you had mentioned. If producer client publishs more than one topic, there is more work to do, such as providing separate client struct for topics published by one producer or changing client struct (but make it more dirty or complicated). So just wait for owners' comment. |
Yea, but it's not a real restriction so I don't think we can reasonably do that.
Feels like we can defer tracking stats at that granularity for this first pass. |
nsqd/client_v2.go
Outdated
@@ -346,6 +349,10 @@ func (c *clientV2) SendingMessage() { | |||
atomic.AddUint64(&c.MessageCount, 1) | |||
} | |||
|
|||
func (c *clientV2) PublishMessage(msgNum uint64) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/Publish/Published
nsqd/http.go
Outdated
@@ -52,6 +52,7 @@ func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer | |||
router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1)) | |||
router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1)) | |||
router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1)) | |||
router.Handle("GET", "/producer_stats", http_api.Decorate(s.doProducerStats, log, http_api.V1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still strongly in favor of keeping this all in /stats
, perhaps with a flag to enable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A flag to return one or all?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think he means "all", e.g. a flag/param like "with_producers=true" or "with_producers=1" or "producers=1" (brevity to taste ;).
If there was (minimal) channel information associated with the producers, then this new flag could combine with the existing "topic" flag in the logical way.
nsqd/nsqd.go
Outdated
@@ -51,7 +55,8 @@ type NSQD struct { | |||
errValue atomic.Value | |||
startTime time.Time | |||
|
|||
topicMap map[string]*Topic | |||
topicMap map[string]*Topic | |||
producers map[int64]Producer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling this variable producers
seems misleading because we can't really know or categorize for sure. I'd rather call this clients
.
nsqd/protocol_v2.go
Outdated
@@ -796,6 +797,9 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { | |||
return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) | |||
} | |||
|
|||
p.ctx.nsqd.AddClient(client.ID, client) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we add the client every time here, can't we do this once when any client connects? This relates to my comments above about variable naming.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But we can't make sure if a client is a producer, until it's first PUB or MPUB.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think what mreiferson is suggesting is to keep a set of all clients (added to the set when they connect). Then, when generating publisher statistics, you can iterate over all clients and include them only if they have published any messages.
nsqd/nsqd.go
Outdated
// AddClient adds a client to the producers map | ||
func (n *NSQD) AddClient(clientID int64, client Producer) { | ||
n.Lock() | ||
defer n.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we're really going to have to call AddClient()
on every single PUB
, we can make it a bit more efficient by not using defer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... but if there was another client property, from which it could be determined if the client connection had already published anything, AddClient()
could usually be skipped
EDIT: possibly obsoleted by comments above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can check if PublishCount property I added is greater than 0.
Just to be clear, @mreiferson, I proposed not tracking multiple topics per publisher, just one. Are you agreeing with that, or saying we should never track any topic for publisher stats (in this "first pass")? |
@ploxiln I'm suggesting that, for now, perhaps it's easiest to land something that doesn't track the destination topic at all (but iterate on that after). But, as I'm writing this, I'm realizing that really wouldn't be all that helpful for the use case here. |
nsqd/client_v2.go
Outdated
@@ -346,6 +353,10 @@ func (c *clientV2) SendingMessage() { | |||
atomic.AddUint64(&c.MessageCount, 1) | |||
} | |||
|
|||
func (c *clientV2) MessagesPublished(msgNum uint64) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's call this PublishedMessage
nsqd/http.go
Outdated
jsonFormat := formatString == "json" | ||
|
||
var producersStats []ClientStats | ||
if withProducers == "1" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1
seems like an overly specific value... how about if it's a non-empty string instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's sometimes confusing if "0" and "no" and "false" are considered "true", and awkward if you need to override a parameter with a blank string e.g. in your own helper functions. So I humbly suggest checking against "1", "yes", and "true", perhaps case-insensitive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's a fair point, 👍 to 1
, yes
, and true
case-insensitive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However, with that, I would argue that the name of the param should be producers
, not with_producers
.
Ready for review and follow-up for nsqadmin. By the way, certificates under the nsqd/test directory has expired, so some test cases fail. please make a update. |
nsqd/nsqd.go
Outdated
|
||
_, ok := n.clients[clientID] | ||
if !ok { | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unlock also needed here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
nsqd/nsqd.go
Outdated
// AddClient adds a client to the producers map | ||
func (n *NSQD) AddClient(clientID int64, client Client) { | ||
n.Lock() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whitespace 👮
nsqd/nsqd.go
Outdated
// RemoveClient removes a client from the producers map | ||
func (n *NSQD) RemoveClient(clientID int64) { | ||
n.Lock() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whitespace 👮
This is looking good to me after those minor nitpicks. |
mreiferson updated the certificates. Rebasing on master will fix those test failures. |
I've rebased this, waiting for tests to pass. |
Given that we're marching towards an actual In that vein, after thinking more about this, it's certainly a good start but it seems like what one would really want is to understand which topics a producer is publishing too, right? |
|
nsqd/client_v2.go
Outdated
func (c *clientV2) PublishedMessage(topic string, count uint64) { | ||
c.metaLock.Lock() | ||
counter, ok := c.pubCounts[topic] | ||
if ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking up a missing key in a golang map returns the zero value ... it would work, but would it be bad style, to just:
c.pubCounts[topic] += count
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
overall looks great |
I'll squash this down, but I don't want to land this until we stamp stable |
you mean "stable 1.1.0" right :) |
there seems to be a possible data race
|
yep, edited
I'll take a look |
It does look like there is indeed a race, but it's in an entirely unrelated code path. It's interesting that we've never observed it before, and that does indicate that this change might have made it more likely, but then again it did happen on an IMO we can fix it separately if it continues to rear its ugly head. |
agreed, we can fix separately if we ever see it again |
add /producer_stats endpoint for nsqd. #879