-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: add VectorSink::Sink #4846
Conversation
Signed-off-by: Kirill Fomichev <fanatid@ya.ru>
Signed-off-by: Kirill Fomichev <fanatid@ya.ru>
Signed-off-by: Kirill Fomichev <fanatid@ya.ru>
Signed-off-by: Kirill Fomichev <fanatid@ya.ru>
Signed-off-by: Kirill Fomichev <fanatid@ya.ru>
Signed-off-by: Kirill Fomichev <fanatid@ya.ru>
078443a
to
adfa9b6
Compare
record = record.timestamp(timestamp.timestamp_millis()); | ||
} | ||
let producer = Arc::clone(&self.producer); | ||
self.delivery_fut.push(Box::pin(async move { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
send_result
is not async and previously we returned NotReady
if the queue was full. In the new Sink
trait we can not return not ready, so for the case when sending blocked (queue full) I added FuturesUnordered
where send_result
called with 10ms delay. A number of futures limited by SEND_RESULT_LIMIT
(set to 5).
I'm not completely sure that this is the best solution, maybe have 1 future with 1ms (or something like this) delay will be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to follow up on this, I think what you have here is reasonable.
If we wanted to be more precise, we could try to tie the retries to the completion of the in_flight
futures instead of a delay. So when one of those is complete we try again to send data, assuming that the completed request could have opened up space in the buffer. But that could be a bit tricky.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, right, that sounds more correct way. I agree that this can be tricky, but should I try to add it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably worth spending a little bit of time to see how it would work, but not a lot if it turns out to be complicated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added in #5007
let message = encode_event(item, &self.encoding).map_err(|_| ())?; | ||
|
||
let producer = Arc::clone(&self.producer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added an internal state instead Arc<Mutex<T>>
.
Signed-off-by: Kirill Fomichev <fanatid@ya.ru>
Signed-off-by: Kirill Fomichev <fanatid@ya.ru>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks ok on my end, but I don't know what the best approach for the semantics of this should be, particularly your questions about the librdkafka interface.
Signed-off-by: Kirill Fomichev <fanatid@ya.ru>
Signed-off-by: Kirill Fomichev <fanatid@ya.ru>
Signed-off-by: Kirill Fomichev <fanatid@ya.ru> Signed-off-by: Brian Menges <brian.menges@anaplan.com>
Part of #2942 #4247
Only introduce variant and upgrade pulsar and kafka. Integration tests for kafka fail for me locally, will mark as draft for some time.