From 346099bb5f9157794bf776de9736d26cb94c3d15 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Mon, 20 Mar 2023 14:34:23 +0100 Subject: [PATCH] remove `EnumeratorInterpreter` from `Akka.Streams.Implementation.Fusing` --- .../Fusing/EnumeratorInterpreter.cs | 268 ------------------ 1 file changed, 268 deletions(-) delete mode 100644 src/core/Akka.Streams/Implementation/Fusing/EnumeratorInterpreter.cs diff --git a/src/core/Akka.Streams/Implementation/Fusing/EnumeratorInterpreter.cs b/src/core/Akka.Streams/Implementation/Fusing/EnumeratorInterpreter.cs deleted file mode 100644 index 3877be0bfdd..00000000000 --- a/src/core/Akka.Streams/Implementation/Fusing/EnumeratorInterpreter.cs +++ /dev/null @@ -1,268 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2022 Lightbend Inc. -// Copyright (C) 2013-2022 .NET Foundation -// -//----------------------------------------------------------------------- - -using System; -using System.Collections; -using System.Collections.Generic; -using System.Linq; -using Akka.Event; -using Akka.Streams.Stage; - -namespace Akka.Streams.Implementation.Fusing -{ - /// - /// TBD - /// - internal static class EnumeratorInterpreter - { - /// - /// TBD - /// - /// TBD - public sealed class EnumeratorUpstream : GraphInterpreter.UpstreamBoundaryStageLogic - { - /// - /// TBD - /// - public bool HasNext; - - private readonly Outlet _outlet; - - /// - /// TBD - /// - /// TBD - public EnumeratorUpstream(IEnumerator input) - { - _outlet = new Outlet("IteratorUpstream.out") { Id = 0 }; - - SetHandler(_outlet, onPull: () => - { - if (!HasNext) CompleteStage(); - else - { - var element = input.Current; - HasNext = input.MoveNext(); - if (!HasNext) - { - Push(_outlet, element); - Complete(_outlet); - } - else Push(_outlet, element); - } - }, - onDownstreamFinish: InternalOnDownstreamFinish); - } - - /// - /// TBD - /// - public override Outlet Out => _outlet; - } - - /// - /// TBD - /// - /// TBD - public sealed class EnumeratorDownstream : GraphInterpreter.DownstreamBoundaryStageLogic, IEnumerator - { - /// - /// TBD - /// - internal bool IsDone; - /// - /// TBD - /// - internal TOut NextElement; - /// - /// TBD - /// - internal bool NeedsPull = true; - /// - /// TBD - /// - internal Exception LastFailure; - - private readonly Inlet _inlet; - - /// - /// TBD - /// - public EnumeratorDownstream() - { - _inlet = new Inlet("IteratorDownstream.in") { Id = 0 }; - - SetHandler(_inlet, onPush: () => - { - NextElement = Grab(_inlet); - NeedsPull = false; - }, - onUpstreamFinish: () => - { - IsDone = true; - CompleteStage(); - }, - onUpstreamFailure: cause => - { - IsDone = true; - LastFailure = cause; - CompleteStage(); - }); - } - - /// - /// TBD - /// - public override Inlet In => _inlet; - - /// - /// TBD - /// - public void Dispose() { } - - /// - /// TBD - /// - /// TBD - public bool MoveNext() - { - if (LastFailure != null) - { - var e = LastFailure; - LastFailure = null; - throw e; - } - if (!HasNext()) - return false; - - NeedsPull = true; - return true; - } - - /// - /// TBD - /// - public void Reset() - { - IsDone = false; - NextElement = default(TOut); - NeedsPull = true; - LastFailure = null; - } - - /// - /// TBD - /// - /// TBD - public bool HasNext() - { - if(!IsDone) - PullIfNeeded(); - - return !(IsDone && NeedsPull) || LastFailure != null; - } - - /// - /// TBD - /// - public TOut Current => NextElement; - - object IEnumerator.Current => Current; - - private void PullIfNeeded() - { - if (NeedsPull) - { - Pull(_inlet); - Interpreter.Execute(int.MaxValue); - } - } - } - } - - /// - /// TBD - /// - /// TBD - /// TBD - internal sealed class EnumeratorInterpreter : IEnumerable - { - private readonly IEnumerable> _ops; - private readonly EnumeratorInterpreter.EnumeratorUpstream _upstream; - private readonly EnumeratorInterpreter.EnumeratorDownstream _downstream = new EnumeratorInterpreter.EnumeratorDownstream(); - /// - /// TBD - /// - /// TBD - /// TBD - public EnumeratorInterpreter(IEnumerator input, IEnumerable> ops) - { - _ops = ops; - _upstream = new EnumeratorInterpreter.EnumeratorUpstream(input); - - Init(); - } - - private void Init() - { - var i = 0; - var length = _ops.Count(); - var attributes = new Attributes[length]; - for (var j = 0; j < length; j++) attributes[j] = Attributes.None; - var ins = new Inlet[length + 1]; - var inOwners = new int[length + 1]; - var outs = new Outlet[length + 1]; - var outOwners = new int[length + 1]; - var stages = new IGraphStageWithMaterializedValue[length]; - - ins[length] = null; - inOwners[length] = GraphInterpreter.Boundary; - outs[0] = null; - outOwners[0] = GraphInterpreter.Boundary; - - var opsEnumerator = _ops.GetEnumerator(); - while (opsEnumerator.MoveNext()) - { - var op = opsEnumerator.Current; - var stage = new PushPullGraphStage(_ => op, Attributes.None); - stages[i] = stage; - ins[i] = stage.Shape.Inlet; - inOwners[i] = i; - outs[i + 1] = stage.Shape.Outlet; - outOwners[i + 1] = i; - - i++; - } - - var assembly = new GraphAssembly(stages, attributes, ins, inOwners, outs, outOwners); - var tup = assembly.Materialize(Attributes.None, assembly.Stages.Select(x => x.Module).ToArray(), new Dictionary(), _ => { }); - var connections = tup.Item1; - var logics = tup.Item2; - - var interpreter = new GraphInterpreter( - assembly: assembly, - materializer: NoMaterializer.Instance, - log: NoLogger.Instance, - connections: connections, - logics: logics, - onAsyncInput: (_1, _2, _3) => throw new NotSupportedException("IteratorInterpreter does not support asynchronous events."), - fuzzingMode: false, - context: null); - interpreter.AttachUpstreamBoundary(0, _upstream); - interpreter.AttachDownstreamBoundary(length, _downstream); - interpreter.Init(null); - } - - /// - /// TBD - /// - /// TBD - public IEnumerator GetEnumerator() => _downstream; - - IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); - } -}