Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Channel Stages from Alpakka to main project. #6268

Merged

Conversation

to11mtm
Copy link
Member

@to11mtm to11mtm commented Nov 26, 2022

Changes

  • Moves Alpakka Channels Extension into the main project.
  • Appends DSLs for main Source and Sink to have Channel methods
  • Adds capability for a Channel Source with a materialized ChannelWriter<T> result
    • This feels more symmetrical with the Sink API that allows both options, and allows for a better option-in-place-of Source.Queue<T>.

The main motivations for this are as follows:

  • We already take System.Threading.Channels as a dep in core Akka
  • We can probably get rid of the ugly backing code behind RunAsAsyncEnumerable, even if it is a minorly-breaking API change it may be better long term.

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

Copy link
Member Author

@to11mtm to11mtm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left my own comments here.

Please let me know if files should be moved/deleted/etc.

CompleteStage();
}
else
continuation.AsTask().ContinueWith(_onReadReady);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a slight change from the Alpakka code:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

_onValueReadFailure =
GetAsyncCallback<Exception>(OnValueReadFailure);
_onReaderComplete = GetAsyncCallback<Exception>(OnReaderComplete);
_onReadReady = ContinueAsyncRead;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is here to avoid delegates potentially being made/initted repeatedly in OnPull.

/// <param name="singleReader"></param>
/// <param name="fullMode"></param>
/// <returns></returns>
public static Sink<T, ChannelReader<T>> AsReader<T>(int bufferSize, bool singleReader = false, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait) =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IDK if we want to keep these, I moved them over for the sake of migration.

/// <typeparam name="T"></typeparam>
/// <param name="reader"></param>
/// <returns></returns>
public static Source<T, NotUsed> FromReader<T>(ChannelReader<T> reader)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IDK if we want to keep these, I moved them over for the sake of migration.

bool singleWriter = false,
BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait)
{
return Source.FromGraph(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this for the sake of symmetry with the Sinks, as well as it's amazing utility in place of Source.Queue<T>

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some thoughts

public ChannelReaderSink(int bufferSize, bool singleReader = false, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait)
{
_bufferSize = bufferSize;
_singleReader = singleReader;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't singleReader be true by default?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was 'as-found' but singleReader = false here is the safer default so I didn't think it was worth changing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, BoundedChannel AFAIK doesn't even use ChannelOptions.SingleReader. it's only used in UnboundedChannel

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we don't use UnboundedChannel, so I guess this setting doesn't really matter then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we don't use UnboundedChannel, so I guess this setting doesn't really matter then?

Not yet anyway. The challenge of course is that it may become relevant in the future.

Right now, BoundedChannel<T> uses a Dequeue<T> internally, so it uses a single lock for both readers/writers. In the future however, the internal structure may change (e.x. something more like ConcurrentQueue<T> with segments) and then it may become important.

Really it's a question of whether it is an oversight in ChannelOptions that it is exposed for both types.

CompleteStage();
}
else
continuation.AsTask().ContinueWith(_onReadReady);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Aaronontheweb
Copy link
Member

We can add website documentation later, but I think this looks good - can you run API approvals @to11mtm ?

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM but needs API approvals

@Aaronontheweb Aaronontheweb enabled auto-merge (squash) December 20, 2022 17:02
@Aaronontheweb
Copy link
Member

I think I have the API approvals working here now

@Aaronontheweb
Copy link
Member

Looks like one of the new versions of Verify we started using might be causing problems with API approval tests.

@Aaronontheweb Aaronontheweb merged commit b7241f5 into akkadotnet:dev Dec 20, 2022
@Aaronontheweb Aaronontheweb added this to the 1.5.0 milestone Dec 20, 2022
Aaronontheweb added a commit to Aaronontheweb/akka.net that referenced this pull request Dec 20, 2022
* Move Channel Stages from Alpakka to main project.

* added API approvals

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
Aaronontheweb added a commit that referenced this pull request Dec 20, 2022
)

* Move Channel Stages from Alpakka to main project. (#6268)

* Move Channel Stages from Alpakka to main project.

* added API approvals

Co-authored-by: Aaron Stannard <aaron@petabridge.com>

* added API approvals

Co-authored-by: Drew <laingas@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants