add socket listener & writer #2094
Conversation
@@ -163,6 +167,15 @@ func (a *Accumulator) NFields() int {
	return counter
}

// Wait waits for a metric to be added to the accumulator.
// Accumulator must already be locked.
func (a *Accumulator) Wait() {
This was added so that tests don't have to call `sleep()` with some excessive value to avoid race conditions.
👍
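For context, a minimal sketch of how a `Cond`-based `Wait` could be wired into the mock accumulator (illustrative only, not necessarily the PR's exact implementation):

```go
package testutil

import "sync"

// Accumulator is a simplified stand-in for the mock accumulator in testutil.
type Accumulator struct {
	sync.Mutex
	Cond    *sync.Cond
	Metrics []string // placeholder for the real metric type
}

// Wait blocks until a metric is added. The caller must already hold the lock.
func (a *Accumulator) Wait() {
	if a.Cond == nil {
		a.Cond = sync.NewCond(&a.Mutex)
	}
	a.Cond.Wait()
}

// AddMetric appends a metric and wakes any test goroutine blocked in Wait,
// so tests can synchronize on "a metric arrived" instead of sleeping.
func (a *Accumulator) AddMetric(m string) {
	a.Lock()
	defer a.Unlock()
	if a.Cond == nil {
		a.Cond = sync.NewCond(&a.Mutex)
	}
	a.Metrics = append(a.Metrics, m)
	a.Cond.Broadcast()
}
```

A test would then do `a.Lock(); for len(a.Metrics) == 0 { a.Wait() }; a.Unlock()` rather than sleeping for some arbitrary duration.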
func (psl *packetSocketListener) listen() {
	buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
	for {
		_, _, err := psl.ReadFrom(buf)
these lines should only parse what was most recently read into the buffer, like this:
+ n, _, err := psl.ReadFrom(buf)
+ if err != nil {
+ 	psl.AddError(err)
+ 	break
+ }
+
+ metrics, err := psl.Parse(buf[0:n])
Oops :-)
Dunno how I missed that one. Thanks for the catch.
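For reference, a hedged sketch of the read loop with the suggested fix applied. The helper methods (`Parse`, `AddError`, `AddMetric`) and the `metric` type are simplified stand-ins so the sketch compiles, not the PR's exact code:

```go
package socketlistener

import (
	"log"
	"net"
)

type metric string

// packetSocketListener wraps a datagram socket; the stand-in helpers below
// exist only so this sketch is self-contained.
type packetSocketListener struct {
	net.PacketConn
}

func (psl *packetSocketListener) AddError(err error) { log.Println(err) }
func (psl *packetSocketListener) AddMetric(m metric) { log.Println(m) }
func (psl *packetSocketListener) Parse(b []byte) ([]metric, error) {
	return []metric{metric(b)}, nil
}

func (psl *packetSocketListener) listen() {
	buf := make([]byte, 64*1024) // 64kb - maximum size of an IP packet
	for {
		n, _, err := psl.ReadFrom(buf)
		if err != nil {
			psl.AddError(err)
			break
		}

		// Parse only the n bytes of the datagram just read, not the whole buffer.
		metrics, err := psl.Parse(buf[:n])
		if err != nil {
			psl.AddError(err)
			continue
		}
		for _, m := range metrics {
			psl.AddMetric(m)
		}
	}
}
```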
}

for _, m := range metrics {
	strs, err := sw.Serialize(m)
I've changed the behavior of the Serialize functions: they now return a byte slice which includes all newlines at the end of each metric. This part of the code will need to be refactored for that, but give me a day or two, because I also have a change that will create an io.Reader from a slice of metrics. Using the io.Reader will create fewer allocations and also allow using io.CopyBuffer() into sw.Conn. Using io.CopyBuffer provides for a configurable max buffer size which users of datagram protocols will likely want.
> Using io.CopyBuffer provides for a configurable max buffer size which users of datagram protocols will likely want.
We would need to slap a warning on the value that setting it below 64kb (at least when using UDP) is dangerous. If the read buffer size is set less than the size of an incoming packet, the packet will be lost. The only use case I can think of for adjusting from a default 64kb would be unix domain sockets.
The max buffer size I'm talking about here is for the writer, so that it would chop up the result of `Serialize()` into chunks rather than sending it all as a single byte buffer (which could easily exceed 64kb with thousands of metrics).

UDP users will also probably want their packets closer to the 512 - 4096 byte range.
Doh. I need to stop reviewing code within an hour of waking up.
> I also have a change that will create an io.Reader from a slice of metrics
Did this get implemented? I went looking for such a thing but couldn't find anything.
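For illustration only (and not an answer to the question above), a hedged sketch of the kind of approach being discussed: an `io.Reader` over already-serialized metrics, copied to the connection with `io.CopyBuffer` so that no single write exceeds the configured buffer size. All names here are assumptions, not the PR's code:

```go
package socketwriter

import (
	"io"
	"net"
)

// metricsReader streams a slice of pre-serialized metrics (one per entry,
// each already ending in '\n'). It deliberately does not implement
// io.WriterTo, so io.CopyBuffer actually honors the supplied buffer.
type metricsReader struct {
	chunks [][]byte
	i, off int
}

func (r *metricsReader) Read(p []byte) (int, error) {
	if r.i >= len(r.chunks) {
		return 0, io.EOF
	}
	n := copy(p, r.chunks[r.i][r.off:])
	r.off += n
	if r.off == len(r.chunks[r.i]) {
		r.i, r.off = r.i+1, 0
	}
	return n, nil
}

// writeMetrics copies the serialized metrics to the socket in chunks of at
// most bufSize bytes. On a datagram socket each Write is one packet, so UDP
// users would keep bufSize roughly in the 512-4096 byte range. (For stream
// sockets like *net.TCPConn, which implement io.ReaderFrom, the buffer is
// bypassed, but chunk boundaries don't matter on a stream anyway.)
func writeMetrics(conn net.Conn, serialized [][]byte, bufSize int) error {
	buf := make([]byte, bufSize)
	_, err := io.CopyBuffer(conn, &metricsReader{chunks: serialized}, buf)
	return err
}
```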
testutil/accumulator.go (outdated)
@@ -27,6 +27,7 @@ func (p *Metric) String() string {
// Accumulator defines a mocked out accumulator
type Accumulator struct {
	sync.Mutex
	Cond *sync.Cond
note to self: cleanup: explicit `Cond` name not necessary.
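One possible reading of this note (an assumption, not confirmed in the thread): the field could simply be embedded, since an embedded field is still addressed by its type name, so the explicit name adds nothing:

```go
package testutil

import "sync"

// Embedding *sync.Cond keeps the a.Cond spelling without naming the field.
type Accumulator struct {
	sync.Mutex
	*sync.Cond
}
```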
see requested changes in review, otherwise I think it looks ready to merge.
thanks again @phemmer!
scnr := bufio.NewScanner(c)
for scnr.Scan() {
	bs := scnr.Bytes()
	bs = append(bs, '\n')
This is a workaround for #2297
I would like to remove this as growing the slice can result in more allocations. Potentially big ones as these slices will be large.
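For context, a hedged sketch of the stream-socket read path with this workaround in place, using the same kind of stand-in helpers as the packet-listener sketch earlier (not the PR's exact code):

```go
package socketlistener

import (
	"bufio"
	"log"
	"net"
)

type streamSocketListener struct{}

func (ssl *streamSocketListener) AddError(err error) { log.Println(err) }
func (ssl *streamSocketListener) AddMetric(m string) { log.Println(m) }
func (ssl *streamSocketListener) Parse(b []byte) ([]string, error) {
	return []string{string(b)}, nil
}

func (ssl *streamSocketListener) read(c net.Conn) {
	scnr := bufio.NewScanner(c)
	for scnr.Scan() {
		bs := scnr.Bytes()
		// Workaround for #2297: the parser wants a trailing newline, but the
		// scanner strips it; re-appending may grow (reallocate) the slice.
		bs = append(bs, '\n')

		metrics, err := ssl.Parse(bs)
		if err != nil {
			ssl.AddError(err)
			continue
		}
		for _, m := range metrics {
			ssl.AddMetric(m)
		}
	}
	if err := scnr.Err(); err != nil {
		ssl.AddError(err)
	}
}
```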
bs := buf[:n]
if len(bs) > 0 && bs[len(bs)-1] != '\n' {
	bs = append(bs, '\n')
}
This is the other workaround for #2297
PR updated to remove workarounds for #2297
Current build failure looks unrelated to changes in this PR
rebased for merge conflict
closes influxdata#1516 closes influxdata#1711 closes influxdata#1721 closes influxdata#1526
This adds support for a generic socket writer & listener.
The original intent was to add support for unix domain sockets, but it was trivial to write generic plugins that can handle all protocols. Thus the functionality of `socket_listener` duplicates `tcp_listener` and `udp_listener`. However, in the case of `tcp_listener`, there is a critical difference in that the plugin will not ever drop a metric (such as if the buffer fills up). TCP is meant to be a reliable protocol, thus it should be up to the sender whether data gets dropped.

Another slight difference in the `socket_listener` is that instead of having 2 layers of buffering, an application chan buffer and a socket buffer, it only has a socket buffer. Config parameters have been provided for adjusting the size of the socket buffer (a hedged sketch of applying such an option follows below). The chan buffer could be added, but I couldn't see any benefit to doing so, and thought it might be less confusing having only 1 layer of buffering.

Closes #1516, #1711, #1721
Obsoletes #1526
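As an illustration of the socket-buffer option mentioned above, a minimal sketch (assumed names, not the plugin's actual config handling) of applying a configured read-buffer size directly to the kernel socket, which is why no extra chan buffer is needed:

```go
package socketlistener

import "net"

// setReadBuffer applies a configured read-buffer size (SO_RCVBUF) to the
// underlying socket; size <= 0 means "leave the OS default alone".
func setReadBuffer(c net.Conn, size int) error {
	if size <= 0 {
		return nil
	}
	switch conn := c.(type) {
	case *net.UDPConn:
		return conn.SetReadBuffer(size)
	case *net.TCPConn:
		return conn.SetReadBuffer(size)
	case *net.UnixConn:
		return conn.SetReadBuffer(size)
	}
	return nil
}
```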