Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize shutdown in stanza adapter #34638

Merged
merged 5 commits into from
Sep 10, 2024

Conversation

michalpristas
Copy link
Contributor

@michalpristas michalpristas commented Aug 13, 2024

Description:
This PR takes emitter-converter-receiver pipeline and synchronize shutdown in a sense that lower level of the pipeline needs to be fully finished before next one is shut down.

Link to tracking Issue: #31074

Testing: UT
run

 go test -run ^TestShutdownFlush$ . -count 10000 -failfast

Documentation: in code comments

Copy link
Member

@djaglowski djaglowski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These appear to be nice improvements. Ranging over channels is a lot cleaner. Having a channel / wg for each function is much more precise.

One thing I'm wondering about is the use of context.Background(). I wonder if this indicates there are still issues to iron out. What is your take on this @michalpristas?

Also would really appreciate a review by @swiatekm if possible.

pkg/stanza/adapter/converter.go Outdated Show resolved Hide resolved
pkg/stanza/adapter/receiver.go Outdated Show resolved Hide resolved
pkg/stanza/operator/helper/emitter.go Outdated Show resolved Hide resolved
@michalpristas michalpristas marked this pull request as ready for review August 15, 2024 15:03
@michalpristas michalpristas requested review from a team and dmitryax August 15, 2024 15:03
@michalpristas
Copy link
Contributor Author

with context.Background used i'm fine with how it is signalling flush that we don't care about the cancellation now, passing a cancelled context to it would not be ideal and flush is still able to be unblocked on ctx.Done in case called from public API. i get it's not nicest thing though

Copy link
Contributor

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions bot added the Stale label Aug 30, 2024
@djaglowski djaglowski removed the Stale label Sep 3, 2024
@djaglowski
Copy link
Member

Unless anyone else wants to review this, I think we just need to resolve the CI failures

@swiatekm
Copy link
Contributor

swiatekm commented Sep 10, 2024

@djaglowski I'm going to take this up and get it over the finish line, as @michalpristas is currently on extended leave. I reviewed the change and I do think it fixes the problem it claims to fix. Some parts are still a bit awkward (I think we should refactor the emitter to just use a channel for signalling finalization instead of messing around with contexts), but those can be addressed in a follow up imo.

Since you already reviewed this change, how about I just rebase and fix any remaining CI issues?

michalpristas and others added 4 commits September 10, 2024 14:48
Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
After the shutdown changes in previous commits, a receiver can only shut
down after its emitter was shut down first.
@djaglowski
Copy link
Member

Thank you @swiatekm!

@@ -151,7 +151,10 @@ func TestEmitterToConsumer(t *testing.T) {

err = logsReceiver.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
defer func() { require.NoError(t, logsReceiver.Shutdown(context.Background())) }()
defer func() {
require.NoError(t, logsReceiver.emitter.Stop())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This is now necessary, because the receiver is not allowed to stop until the emitter does. In an actual pipeline, the emitter itself is owned by the pipeline, and the pipeline is responsible for closing it. Here, we need to do it manually.

@swiatekm
Copy link
Contributor

@djaglowski could you re-review? It's Michal's PR, so I don't have the power to re-request it. 😅

@djaglowski djaglowski merged commit ede416f into open-telemetry:main Sep 10, 2024
155 of 156 checks passed
@github-actions github-actions bot added this to the next release milestone Sep 10, 2024
@djaglowski
Copy link
Member

Thanks again @michalpristas and @swiatekm. It'll be great to have this fix in place.

I'm happy to review a followup PR if there's any cleanup in mind.

}
case <-ctx.Done():
// flush currently batched entries
for oldBatch := e.makeNewBatch(); len(oldBatch) > 0; oldBatch = e.makeNewBatch() {
Copy link
Member

@andrzej-stencel andrzej-stencel Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific scenario that warrants using a for instead of doing it just once? I would assume that at the point when LogEmitter.Stop is called the rest of the Stanza pipeline is stopped, so no new entries should be landing in the emitter's batch.

Not that the for hurts, it will work "more than" fine. 🙂 I just wonder if I'm missing something.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not necessary in practice, because other components shouldn't be calling Process after Stop was already called. And this is how the pipeline acts. @djaglowski am I wrong here?

djaglowski pushed a commit that referenced this pull request Sep 11, 2024
Clean up emitter shutdown in stanza. Instead of using contexts for
signaling shutdown, use an idiomatic close channel. We still have a bit
of context usage, but that's required by the operator API, and is only
used for cancelling a given entry, not for component shutdown.

Follow up to
#34638.
jriguera pushed a commit to springernature/opentelemetry-collector-contrib that referenced this pull request Oct 4, 2024
**Description:** 
This PR takes emitter-converter-receiver pipeline and synchronize
shutdown in a sense that lower level of the pipeline needs to be fully
finished before next one is shut down.

**Link to tracking Issue:** open-telemetry#31074

**Testing:** UT
run 
```
 go test -run ^TestShutdownFlush$ . -count 10000 -failfast
```

**Documentation:** in code comments

---------

Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
Co-authored-by: Mikołaj Świątek <mail@mikolajswiatek.com>
jriguera pushed a commit to springernature/opentelemetry-collector-contrib that referenced this pull request Oct 4, 2024
Clean up emitter shutdown in stanza. Instead of using contexts for
signaling shutdown, use an idiomatic close channel. We still have a bit
of context usage, but that's required by the operator API, and is only
used for cancelling a given entry, not for component shutdown.

Follow up to
open-telemetry#34638.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants