Skip to content

Commit

Permalink
Ensure flow options effectively override producer dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
josevalim committed Mar 9, 2021
1 parent 9686433 commit 6523ca1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 5 deletions.
2 changes: 1 addition & 1 deletion lib/flow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,7 @@ defmodule Flow do
"""
@spec partition(t | [t], keyword()) :: t
def partition(flow_or_flows, options \\ []) when is_list(options) do
merge(List.wrap(flow_or_flows), GenStage.PartitionDispatcher, options)
merge(flow_or_flows, GenStage.PartitionDispatcher, options)
end

@doc """
Expand Down
8 changes: 4 additions & 4 deletions lib/flow/materialize.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule Flow.Materialize do

def materialize(%Flow{} = flow, demand, start_link, type, type_options) do
%{operations: operations, options: options, producers: producers, window: window} = flow
options = Keyword.merge(type_options, options)
options = Keyword.merge(options, type_options)
{ops, batchers} = split_operations(operations)

{producers, consumers, ops, window} =
Expand Down Expand Up @@ -148,9 +148,9 @@ defmodule Flow.Materialize do
ops,
start_link,
window,
options
_options
) do
{producers, consumers} = materialize(flow, :forward, start_link, :producer_consumer, options)
{producers, consumers} = materialize(flow, :forward, start_link, :producer_consumer, [])
{type, {acc, fun, trigger}, ops} = ensure_ops(ops)

stages = Keyword.fetch!(flow.options, :stages)
Expand Down Expand Up @@ -195,7 +195,7 @@ defmodule Flow.Materialize do
options
) do
{producers, intermediary} =
materialize(flow, :forward, start_link, :producer_consumer, options)
materialize(flow, :forward, start_link, :producer_consumer, [])

timeout = Keyword.get(options, :subscribe_timeout, 5_000)

Expand Down
14 changes: 14 additions & 0 deletions test/flow_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,20 @@ defmodule FlowTest do
|> Enum.sort() == [[1, 2, 4, 5], [3, 6]]
end

test "allows function based partitioning after shuffling" do
enumerables = [
[%{key: 1, value: 1}, %{key: 2, value: 2}, %{key: 3, value: 3}],
[%{key: 1, value: 4}, %{key: 2, value: 5}, %{key: 3, value: 6}]
]

assert Flow.from_enumerables(enumerables)
|> Flow.shuffle(stages: 2)
|> Flow.partition(key: & &1.key, stages: 2)
|> Flow.reduce(fn -> [] end, &[&1 | &2])
|> Flow.on_trigger(fn acc -> {[acc |> Enum.map(& &1.value) |> Enum.sort()], acc} end)
|> Enum.sort() == [[1, 2, 4, 5], [3, 6]]
end

test "allows custom windowing" do
window =
Flow.Window.fixed(1, :second, fn
Expand Down

0 comments on commit 6523ca1

Please sign in to comment.