Error when toilet receives too many buffers is not handled properly #131

Closed
bblaszkow06 opened this issue Dec 5, 2018 · 1 comment
bblaszkow06 commented Dec 5, 2018

When a pipeline is stopped because of a toilet buffer overflow, the error is not handled properly and the pipeline fails with a FunctionClauseError in Bunch:

2018-12-05T15:43:58.631636Z [warn] [core] PullBuffer :sink (toilet): received 288000 buffers,
    which is above fail_level, from input :output that works in push mode.
    To have control over amount of buffers being produced, consider using push mode.
    If this is a normal situation, increase toilet warn/fail level.
    Buffers: %Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}
    PullBuffer %Membrane.Core.PullBuffer{current_size: 288000, demand: 65536, demand_pid: #PID<0.305.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :sink, preferred_size: 65536, q: #Qex<[{:buffers, [%Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}], 288000}]>, toilet: %{fail: 262144, warn: 131072}}

2018-12-05T15:43:58.634813Z [warn] [core] Encountered an error.
    Reason: {:pull_buffer, [toilet: :too_many_buffers]}
    PullBuffer :sink (toilet): failing: too many buffers
    Stacktrace:
        (membrane_core) lib/membrane/core/pull_buffer.ex:139: Membrane.Core.PullBuffer.store/3
        (membrane_core) lib/membrane/core/element/buffer_controller.ex:79: anonymous fn/2 in Membrane.Core.Element.BufferController.handle_buffer_pull/3
        (elixir) lib/map.ex:791: Map.get_and_update/3
        (elixir) lib/access.ex:370: Access.get_and_update/3
        (elixir) lib/map.ex:791: Map.get_and_update/3
        (elixir) lib/access.ex:370: Access.get_and_update/3


2018-12-05T15:43:58.636849Z [warn] [core] PullBuffer :sink (toilet): received 288000 buffers,
    which is above fail_level, from input :output that works in push mode.
    To have control over amount of buffers being produced, consider using push mode.
    If this is a normal situation, increase toilet warn/fail level.
    Buffers: %Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}
    PullBuffer %Membrane.Core.PullBuffer{current_size: 288000, demand: 65536, demand_pid: #PID<0.305.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :sink, preferred_size: 65536, q: #Qex<[{:buffers, [%Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}], 288000}]>, toilet: %{fail: 262144, warn: 131072}}

2018-12-05T15:43:58.639087Z [warn] [core] Encountered an error.
    Reason: {:pull_buffer, [toilet: :too_many_buffers]}
    PullBuffer :sink (toilet): failing: too many buffers
    Stacktrace:
        (membrane_core) lib/membrane/core/pull_buffer.ex:139: Membrane.Core.PullBuffer.store/3
        (elixir) lib/map.ex:791: Map.get_and_update/3
        (elixir) lib/access.ex:370: Access.get_and_update/3
        (elixir) lib/map.ex:791: Map.get_and_update/3
        (elixir) lib/access.ex:370: Access.get_and_update/3
        (membrane_core) lib/membrane/core/element/buffer_controller.ex:72: Membrane.Core.Element.BufferController.handle_buffer_pull/3


2018-12-05T15:43:58.647542Z [warn] [sink core] Encountered an error.
    Reason: {:function_clause, [{Bunch, :stateful_try_with_status, [error: %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: {:pull_buffer, [toilet: :too_many_buffers]}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: true, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}], [file: 'lib/bunch.ex', line: 158]}, {Membrane.Core.Element.MessageDispatcher, :handle_message, 3, [file: 'lib/membrane/core/element/message_dispatcher.ex', line: 20]}, {Membrane.Element, :handle_info, 2, [file: 'lib/membrane/element.ex', line: 270]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 637]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 711]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}
    Element :sink: Terminating: Attempt to terminate element when it is not stopped

    state: %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: %Membrane.Core.PullBuffer{current_size: 0, demand: 65536, demand_pid: #PID<0.305.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :sink, preferred_size: 65536, q: #Qex<[]>, toilet: %{fail: 262144, warn: 131072}}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: false, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}
    Stacktrace:
        (membrane_core) lib/membrane/core/element/lifecycle_controller.ex:72: Membrane.Core.Element.LifecycleController.handle_shutdown/2
        (membrane_core) lib/membrane/core/element/message_dispatcher.ex:20: Membrane.Core.Element.MessageDispatcher.handle_message/3
        (membrane_core) lib/membrane/element.ex:251: Membrane.Element.terminate/2
        (stdlib) gen_server.erl:673: :gen_server.try_terminate/3
        (stdlib) gen_server.erl:858: :gen_server.terminate/10
        (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3



17:05:28.408 [error] GenServer #PID<0.306.0> terminating
** (FunctionClauseError) no function clause matching in Bunch.stateful_try_with_status/1
    (bunch) lib/bunch.ex:158: Bunch.stateful_try_with_status({:error, %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: {:pull_buffer, [toilet: :too_many_buffers]}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: true, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}})
    (membrane_core) lib/membrane/core/element/message_dispatcher.ex:20: Membrane.Core.Element.MessageDispatcher.handle_message/3 
    (membrane_core) lib/membrane/element.ex:270: Membrane.Element.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {Membrane.Core.Message, :buffer, [[%Membrane.Buffer{metadata: %{}, payload: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}], :input]}
State: %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: %Membrane.Core.PullBuffer{current_size: 0, demand: 65536, demand_pid: #PID<0.305.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :sink, preferred_size: 65536, q: #Qex<[]>, toilet: %{fail: 262144, warn: 131072}}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: false, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}
2018-12-05T15:43:58.664980Z [warn] [mixer core] Encountered an error.
    Reason: {:invalid_action, [action: {:demand, {{:dynamic, :input, 0}, :self, 177960}}, callback: :handle_event, module: Membrane.Element.LiveAudioMixer.Source]}
    Element :mixer: Elements' Membrane.Element.LiveAudioMixer.Source :handle_event callback returned
    invalid action: {:demand, {{:dynamic, :input, 0}, :self, 177960}}. For possible actions are check types
    in Membrane.Element.Action module. Keep in mind that some actions are
    available in different formats or unavailable for some callbacks,
    element types, playback states or under some other conditions.

    state: %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{caps: %Membrane.Caps.Audio.Raw{channels: 2, format: :s16le, sample_rate: 48000}, delay: 500000000, interval: 1000000000, playing: true, sinks: %{{:dynamic, :input, 0} => %{eos: false, queue: "", skip: 0}}, start_playing_time: -576457814235974775, tick: 1, timer_ref: #Reference<0.2486223907.3552837633.128386>}, module: Membrane.Element.LiveAudioMixer.Source, name: :mixer, pads: %{data: %{:output => %Membrane.Element.Pad.Data{accepted_caps: Membrane.Caps.Audio.Raw, availability: :always, buffer: nil, caps: %Membrane.Caps.Audio.Raw{channels: 2, format: :s16le, sample_rate: 48000}, current_id: nil, demand: nil, demand_unit: nil, direction: :output, end_of_stream?: false, mode: :push, other_demand_unit: :bytes, other_ref: :input, pid: #PID<0.306.0>, start_of_stream?: false, sticky_messages: nil}, {:dynamic, :input, 0} => %Membrane.Element.Pad.Data{accepted_caps: Membrane.Caps.Audio.Raw, availability: :on_request, buffer: %Membrane.Core.PullBuffer{current_size: 0, demand: 0, demand_pid: #PID<0.304.0>, input_ref: :output, metric: Membrane.Buffer.Metric.ByteSize, min_demand: 16384, name: :mixer, preferred_size: 65536, q: #Qex<[]>, toilet: false}, caps: %Membrane.Caps.Audio.Raw{channels: 2, format: :s16le, sample_rate: 48000}, current_id: 1, demand: 0, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.304.0>, start_of_stream?: true, sticky_messages: []}}, dynamic_currently_linking: [], info: %{input: %{accepted_caps: Membrane.Caps.Audio.Raw, availability: :on_request, current_id: 1, demand_unit: :bytes, direction: :input, mode: :pull}}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :filter, watcher: #PID<0.300.0>}
    Stacktrace:
        (membrane_core) lib/membrane/core/element/action_handler.ex:22: Membrane.Core.Element.ActionHandler.handle_action/4
        (bunch) lib/bunch/enum.ex:121: anonymous fn/3 in Bunch.Enum.try_reduce/3
        (elixir) lib/enum.ex:3281: Enumerable.List.reduce/3
        (elixir) lib/enum.ex:1968: Enum.reduce_while/3
        (membrane_core) lib/membrane/core/callback_handler.ex:127: Membrane.Core.CallbackHandler.exec_handle_actions/5
        (membrane_core) lib/membrane/core/element/event_controller.ex:40: Membrane.Core.Element.EventController.exec_handle_event/4

2018-12-05T15:43:58.665156Z [warn] [core] Error while handling actions returned by callback :handle_event

** (EXIT from #PID<0.271.0>) shell process exited with reason: an exception was raised:
    ** (FunctionClauseError) no function clause matching in Bunch.stateful_try_with_status/1
        (bunch) lib/bunch.ex:158: Bunch.stateful_try_with_status({:error, %Membrane.Core.Element.State{controlling_pid: #PID<0.300.0>, delayed_demands: %{}, internal_state: %{endpoint_id: :default, latency: :high, native: #Reference<0.2486223907.3552968708.126269>, portaudio_buffer_size: 256, ringbuffer_size: 4096}, module: Membrane.Element.PortAudio.Sink, name: :sink, pads: %{data: %{input: %Membrane.Element.Pad.Data{accepted_caps: {Membrane.Caps.Audio.Raw, [channels: 2, sample_rate: 48000, format: :s16le]}, availability: :always, buffer: {:pull_buffer, [toilet: :too_many_buffers]}, caps: nil, current_id: nil, demand: 16384, demand_unit: :bytes, direction: :input, end_of_stream?: false, mode: :pull, other_demand_unit: nil, other_ref: :output, pid: #PID<0.305.0>, start_of_stream?: true, sticky_messages: []}}, dynamic_currently_linking: [], info: %{}}, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :playing, target_locked?: false, target_state: :playing}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, type: :sink, watcher: #PID<0.300.0>}})
        (membrane_core) lib/membrane/core/element/message_dispatcher.ex:20: Membrane.Core.Element.MessageDispatcher.handle_message/3
        (membrane_core) lib/membrane/element.ex:270: Membrane.Element.handle_info/2
        (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
        (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
        (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
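
One possible reading of the traces above, offered as a guess rather than a confirmed diagnosis: the `Map.get_and_update/3` and `Access.get_and_update/3` frames suggest the buffer is stored through `get_and_update_in/3`, which treats whatever 2-tuple the function returns as `{value_to_return, new_value}`. An `{:error, reason}` tuple would then be split, leaving `{:error, state}` with the reason written into the pad's `buffer` field, which matches the state printed in the FunctionClauseError. The sketch below reproduces that shape with plain maps. `ToiletErrorSketch`, `store/1` and `with_status/1` are hypothetical names, and the clauses of `with_status/1` are an assumption about what a status wrapper might accept, not Bunch's actual source.

```elixir
defmodule ToiletErrorSketch do
  # Hypothetical stand-in for a buffer store that fails on overflow.
  def store(_buffer), do: {:error, {:pull_buffer, [toilet: :too_many_buffers]}}

  # Hypothetical status wrapper (assumed clauses). It accepts {:ok, state},
  # {{:ok, value}, state} and {{:error, reason}, state}, but has no clause
  # for a bare {:error, state}.
  def with_status({:ok, _state} = result), do: {:ok, result}
  def with_status({{:ok, _value}, _state} = result), do: {:ok, result}
  def with_status({{:error, reason}, _state} = result), do: {{:error, reason}, result}
end

# Plain maps standing in for the element state and its pad data.
state = %{pads: %{input: %{buffer: :pull_buffer_placeholder}}}

# get_and_update_in/3 reads the function's result as {value_to_return, new_value},
# so the error tuple is split: :error is returned, the reason is stored.
{result, new_state} =
  get_and_update_in(state, [:pads, :input, :buffer], &ToiletErrorSketch.store/1)

IO.inspect(result, label: "returned")
IO.inspect(new_state.pads.input.buffer, label: "stored in buffer")

# {:error, state} matches none of the wrapper's clauses.
outcome =
  try do
    ToiletErrorSketch.with_status({result, new_state})
  rescue
    e in FunctionClauseError -> {:raised, e.function, e.arity}
  end

IO.inspect(outcome, label: "wrapper")
```

If this reading is right, a fix would need to check the store result before handing it to `get_and_update_in/3`, or wrap the error so that the reason travels in the returned value instead of the updated state.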

Here's the pipeline causing the error:

defmodule Membrane.Demo.MP3.Pipeline do
  use Membrane.Pipeline

  def handle_init(path_to_mp3) do
    # Raw audio format shared by the converter output and the mixer
    caps = %Membrane.Caps.Audio.Raw{
      format: :s16le,
      sample_rate: 48_000,
      channels: 2
    }

    children = [
      file_src: %Membrane.Element.File.Source{location: path_to_mp3},
      decoder: Membrane.Element.Mad.Decoder,
      converter: %Membrane.Element.FFmpeg.SWResample.Converter{
        output_caps: caps
      },
      mixer: %Membrane.Element.LiveAudioMixer.Source{caps: caps},
      sink: Membrane.Element.PortAudio.Sink
    ]

    links = %{
      {:file_src, :output} => {:decoder, :input},
      {:decoder, :output} => {:converter, :input},
      {:converter, :output} => {:mixer, :input},
      # The mixer output works in push mode, so the sink input is linked with a toilet
      {:mixer, :output} => {:sink, :input, pull_buffer: [toilet: true]}
    }

    spec = %Membrane.Pipeline.Spec{
      children: children,
      links: links
    }

    {{:ok, spec}, %{}}
  end
end
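
For reference, the first log entry reports `toilet: %{fail: 262144, warn: 131072}` with `current_size: 288000` under the `ByteSize` metric, so the sink's buffer was past the fail level when the error was raised. A minimal sketch of such a threshold check follows. `ToiletSketch.check/2` is a hypothetical helper, and the strict `>` comparison is an assumption, since the log only says the size was "above fail_level".

```elixir
defmodule ToiletSketch do
  # Hypothetical helper. The levels map mirrors the `toilet: %{warn: ..., fail: ...}`
  # value printed in the log. Using strict `>` is an assumption.
  def check(size, %{warn: warn, fail: fail}) do
    cond do
      size > fail -> {:error, {:pull_buffer, [toilet: :too_many_buffers]}}
      size > warn -> :warn
      true -> :ok
    end
  end
end

# Levels and sizes taken from the log above; 65_536 is the preferred_size.
levels = %{warn: 131_072, fail: 262_144}

IO.inspect(ToiletSketch.check(65_536, levels), label: "preferred size")
IO.inspect(ToiletSketch.check(200_000, levels), label: "between warn and fail")
IO.inspect(ToiletSketch.check(288_000, levels), label: "size from the log")
```

With these levels, the 288000 bytes from the log fall into the error branch, which is the `{:pull_buffer, [toilet: :too_many_buffers]}` reason seen in the traces.
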
bblaszkow06 added the bug label Dec 5, 2018
bblaszkow06 added this to the v0.3.0 milestone Dec 5, 2018
@bblaszkow06

Fixed in #138
