Infinite loop with stateful transducer on core.async channel #37

Open
pukkamustard opened this issue Aug 9, 2020 · 3 comments

pukkamustard commented Aug 9, 2020

When I run a stateful transducer on a core.async channel, I get unexpected behavior:

(require '[clojure.core.async :as async :refer [go <!]]
         '[net.cgrand.xforms :as xforms])

(go (println
     (<! (let [c (async/chan 1 xforms/count)]
           (async/onto-chan! c (range 5))
           (async/into []
                       (async/take 10 c))))))

Outputs:

[5 5 5 5 5 5 5 5 5 5]

Expected output:

[5]

When running the same transducer on a sequence:

(into [] xforms/count (range 5))

I get the expected outcome ([5]).

I have posted a question on ask.clojure.org (https://ask.clojure.org/index.php/9529/infinite-loop-with-stateful-transducer-core-async-channel). A fix has been provided there, but I don't understand what is going on. I am not sure whether this is a bug in xforms or expected behavior. Maybe you could provide some insight?


otfrom commented Oct 20, 2020

I'm having the same problem, but mine came up using sort-by. I'd really like to be able to use by-key and similar transducers in my async pipelines.

Owner

cgrand commented Oct 20, 2020

Weird. I'm going to look into it.

Owner

cgrand commented Oct 20, 2020

I wonder if it shouldn't be requalified as a core.async bug: a closed transducing channel doesn't remember having called the completing arity of its rf:

user=> (def c (a/chan 1 (fn [rf]
                          (fn ([] (rf))
                              ([acc] (rf (unreduced (rf acc [:completed (System/currentTimeMillis)]))))
                              ([acc _] acc)))))
#'user/c
user=> (a/close! c)
nil
user=> (a/<!! c)
[:completed 1603187902345]
user=> (a/<!! c)
[:completed 1603187908246]
user=> (a/<!! c)
[:completed 1603187909392]
user=> (a/<!! c)
[:completed 1603187910475]
user=> (a/<!! c)
[:completed 1603187911584]

It has a broad impact: all transducers that flush on completion should keep track of having completed.
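
To illustrate what "keeping track of having completed" could look like, here is a minimal sketch of a guard that wraps any transducer so its completing arity flushes only once. The names `complete-once` and `flush-marker` are hypothetical helpers invented for this example. They are not part of xforms or core.async, and this is not necessarily the fix suggested on ask.clojure.org.

```clojure
(defn complete-once
  "Hypothetical helper: wraps `xform` so that its completing arity
  runs at most once. Later completion calls return `acc` untouched."
  [xform]
  (fn [rf]
    (let [xrf        (xform rf)
          completed? (volatile! false)]
      (fn
        ([] (xrf))
        ([acc]
         (if @completed?
           acc                              ; already flushed: do nothing
           (do (vreset! completed? true)
               (xrf acc))))                 ; first completion: flush as usual
        ([acc x] (xrf acc x))))))

;; A toy transducer that, like the one above, emits a value every time
;; its completing arity is called (assumption: for illustration only).
(def flush-marker
  (fn [rf]
    (fn ([] (rf))
        ([acc] (rf (unreduced (rf acc :flushed))))
        ([acc _] acc))))

;; Simulate a caller that completes twice, as the closed channel does.
(let [unguarded (flush-marker conj)
      guarded   ((complete-once flush-marker) conj)]
  (println (unguarded (unguarded [])))   ; flushes twice
  (println (guarded (guarded []))))      ; flushes once
```

Under these assumptions, the same wrapper could be passed to a channel, for example `(async/chan 1 (complete-once xforms/count))`, so that repeated takes on the closed channel no longer trigger a new flush. The volatile is unsynchronized, so the sketch relies on the channel serializing calls to its transducer.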
