How can I debug a "negative WaitGroup counter" in a processor? #198
Comments
I could imagine that happening if you are starting goroutines inside the process callback, or using the Ctx object after the process callback has returned? |
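For readers unfamiliar with the panic itself, here is a minimal sketch of how a "negative WaitGroup counter" arises with a plain `sync.WaitGroup`: `Done` is called more often than `Add` allowed. It uses only the standard library and does not reproduce goka's internals. That a context used after its callback returned ends up in the same situation is an assumption based on the diagnosis above. The helper name `doneTwice` is made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// doneTwice (hypothetical helper) calls Done one more time than Add allowed
// and returns the recovered panic value as a string.
func doneTwice() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	var wg sync.WaitGroup
	wg.Add(1)
	wg.Done() // counter goes from 1 to 0
	wg.Done() // counter would drop below 0, so this call panics
	return ""
}

func main() {
	fmt.Println(doneTwice())
}
```

Because the panic is raised inside the `sync` package, the stack trace points at whichever goroutine made the extra call, which may well be a goroutine you started yourself rather than the callback.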
Dang... you're right @db7, I do call I'm calling |
@activeshadow, glad it was the right guess. If you want to use a goroutine, it's better to create a producer and pass the producer and the message to the goroutine (but not the goka.Context). It may also be safer to clone the message: depending on how your codec works, it could reuse message buffers in upcoming process callbacks and cause weird behaviour if the message is not cloned.

This approach has a problem, however: you won't have at-least-once processing anymore. If your process crashes after you have "decoupled" the message from the context, you won't see the message again. Normally the input messages are committed once the callback returns and all emits/loopbacks/sets of the context have completed. You won't have that anymore, so only use this approach if that is OK for you.

Other ideas:

1. Stream delay

If you need to slow down the complete stream, the best way is to "put the callbacks to sleep", i.e.

```
func mycallback(ctx goka.Context, m interface{}) {
    // sleep until time.Now() > m.TriggerTime
    // do whatever with m
}
```

Since each partition is processed by a single goroutine, and the partition is processed in FIFO order, this will effectively slow down the complete stream. I've used this in an application in which a stream of events must always be processed 1h later.

2. Selective delay with loopback

If you just want to slow down the processing of a few messages, you can use loopback and Kafka itself.

```
func mycallback(ctx goka.Context, m interface{}) {
    // if m is special: ctx.Loopback(ctx.Key(), m)
    // otherwise: do whatever with m
}

func mycallback(ctx goka.Context, m interface{}) {
    if time.Now().Before(m.TriggerTime) {
        ctx.Loopback(ctx.Key(), m) // keep looping until time has passed
        return
    }
    // do whatever with m
}
```

This may be OK depending on your traffic, or it may not.

3. Selective stream delay

Another idea is to combine both ideas (1) and (2).
Let me know if any of these suggestions make sense. |
Thanks @db7. I think I'll start with option 2. I was using the same handler for the loopback as for the input, so I'll just create a new handler for the loopback that does exactly what the input handler does, but with a delay first. |
@activeshadow can I close this issue? If you are still facing related problems, please let us know. |
@db7 I can close it! No longer having this issue based on your suggestions. |
I've got a processor that keeps panicking with the following:
How can I debug this, given the stack trace doesn't show what part of my code is making the call? I suspect it has something to do with using the loopback table, since this is the only processor I have that uses it, and it's the only processor I have that's panicking like this.
Note that I'm using a local copy of the goka library with a few edits for unit testing.