impact11 edited this page Sep 10, 2011 · 6 revisions

Streaming

Request streaming

To handle cases where a large POST body must be processed (e.g., a file upload), or where you want to validate the request headers before processing the rest of the request, you can define on_headers(env, headers) and on_body(env, chunk) methods in your API. They are invoked, respectively, when the headers are first parsed and whenever new body data arrives at the server.

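require 'goliath'
require 'json'

class Stream < Goliath::API
  # Called once, as soon as the request headers have been parsed.
  def on_headers(env, headers)
    env.logger.info 'received headers: ' + headers.inspect
    env['async-headers'] = headers
  end

  # Called each time a new chunk of the request body arrives.
  def on_body(env, data)
    env.logger.info 'received data: ' + data
    (env['async-body'] ||= '') << data
  end

  # Called when the client connection is closed.
  def on_close(env)
    env.logger.info 'closing connection'
  end

  # Called once the full request has arrived. Echoes the body and headers
  # back as JSON (serialized here so no formatter middleware is required).
  def response(env)
    payload = { body: env['async-body'], head: env['async-headers'] }
    [200, { 'Content-Type' => 'application/json' }, payload.to_json]
  end
end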

When on_body is defined in the API, the body is streamed to the application and must be processed as it arrives (in the example above, we accumulate it and echo it back). response(env) is still invoked once all the data has arrived and the HTTP request is complete, but params will not contain the body, because the goal is to avoid buffering large files inside the server. The desired behavior is up to you: you can process the data chunk by chunk as it arrives, or accumulate it and process it at the end of the request.
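The following minimal sketch in plain Ruby illustrates the chunk-by-chunk option. It mirrors the on_headers / on_body / response call order but does not load Goliath. The class IncrementalUploadHandler, its env keys, and the 10 MB limit are hypothetical. The sketch also assumes the size header arrives under the key 'Content-Length'. It decides from the headers alone whether to accept the upload, then hashes the body incrementally, so the body is never held in memory.

```ruby
require 'digest'

# Hypothetical handler (not part of Goliath) that follows the same callback
# order as a Goliath::API: on_headers once, on_body per chunk, then response.
class IncrementalUploadHandler
  MAX_BYTES = 10 * 1024 * 1024 # assumed upload limit for this sketch

  # Validate headers before any body data is processed. Assumes the header
  # key is 'Content-Length'; a missing header counts as 0 bytes.
  def on_headers(env, headers)
    declared = headers['Content-Length'].to_i
    env['upload.rejected'] = declared > MAX_BYTES
    env['upload.digest'] = Digest::SHA256.new
    env['upload.bytes'] = 0
  end

  # Fold each chunk into the running digest instead of buffering it.
  def on_body(env, chunk)
    return if env['upload.rejected']
    env['upload.digest'].update(chunk)
    env['upload.bytes'] += chunk.bytesize
  end

  # Runs after the whole request has arrived.
  def response(env)
    return [413, {}, 'upload too large'] if env['upload.rejected']
    [200, {}, "#{env['upload.bytes']} bytes, sha256=#{env['upload.digest'].hexdigest}"]
  end
end

# Simulate a request whose body arrives in two pieces.
handler = IncrementalUploadHandler.new
env = {}
handler.on_headers(env, { 'Content-Length' => '11' })
['hello ', 'world'].each { |chunk| handler.on_body(env, chunk) }
status, _headers, body = handler.response(env)
```

A rejected upload still receives its remaining body chunks. They are simply ignored, and the 413 is returned once the request completes.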

Response streaming

Need to provide a firehose of updates to your clients? Goliath can stream data to them efficiently:

require 'goliath'

class Stream < Goliath::API
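  def response(env)
    i = 0
    # Every second, push the current counter value to the client.
    pt = EM.add_periodic_timer(1) do
      env.stream_send("#{i}\n")
      i += 1
    end

    # After 10 seconds, stop the counter, send a final message and close.
    EM.add_timer(10) do
      pt.cancel

      env.stream_send("!! BOOM !!\n")
      env.stream_close
    end

    # Send the headers now; the body is written later by the timers above.
    [200, {}, Goliath::Response::STREAMING]
  end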
end

In the example above, when the client connects, the server returns a 200 status code together with the special Goliath::Response::STREAMING body. This tells the server not to perform any body post-processing after it sends the response headers.

Once the response headers are sent, the connection is kept open, and your callbacks can write to it directly via env.stream_send. In the example above, we set up a periodic 1-second timer that emits a counter. A second, one-shot 10-second timer cancels the periodic timer, sends a final message, and closes the connection with env.stream_close.

Hence, when the client connects, it sees the response headers and then receives roughly one message per second, counting up from 0 (about ten in total, depending on whether the last tick fires before the 10-second timer), followed by “!! BOOM !!”. Of course, instead of sending individual integers, you can stream JSON, XML, or any other format your application requires.
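On the client side, the messages can be consumed as they arrive. The sketch below uses Ruby's standard net/http library, whose read_body yields data in whatever pieces the socket delivers. Those pieces need not line up with individual stream_send calls, so partial lines are buffered until a newline arrives. The helper each_stream_line is hypothetical. So that the sketch runs on its own, a plain TCPServer stands in for the Goliath endpoint; it is not Goliath.

```ruby
require 'net/http'
require 'socket'

# Hypothetical helper: GETs a streaming endpoint and yields each complete
# line as soon as it arrives, buffering any partial line between reads.
def each_stream_line(host, port, path = '/')
  http = Net::HTTP.new(host, port, nil) # nil: ignore any proxy settings
  http.start do
    http.request_get(path) do |res|
      pending = String.new # binary buffer for raw socket data
      res.read_body do |data|
        pending << data
        while (idx = pending.index("\n"))
          yield pending.slice!(0, idx + 1).chomp.force_encoding(Encoding::UTF_8)
        end
      end
      # Flush trailing data that was not newline-terminated.
      yield pending.force_encoding(Encoding::UTF_8) unless pending.empty?
    end
  end
end

# Stand-in for the streaming endpoint (NOT Goliath): writes a few counter
# lines with short pauses, then the final message, then closes.
server = TCPServer.new('127.0.0.1', 0)
port = server.addr[1]
server_thread = Thread.new do
  client = server.accept
  loop { break if client.gets.to_s.strip.empty? } # skip the request headers
  client.write("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n")
  3.times do |i|
    client.write("#{i}\n")
    sleep 0.05
  end
  client.write("!! BOOM !!\n")
  client.close
end

received = []
each_stream_line('127.0.0.1', port) { |line| received << line }
server_thread.join
server.close
```

Each line could be passed to a JSON parser if the server streams one JSON document per line.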

You can also stream data using chunked transfer encoding. The code differs only slightly:

require 'goliath'

class ChunkedStreaming < Goliath::API
  def on_close(env)
    env.logger.info "Connection closed."
  end

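  def response(env)
    i = 0
    # Every second, send the counter as its own chunk.
    pt = EM.add_periodic_timer(1) do
      env.chunked_stream_send("#{i}\n")
      i += 1
    end

    # After 10 seconds, stop the counter, send a final chunk and
    # terminate the chunked body.
    EM.add_timer(10) do
      pt.cancel

      env.chunked_stream_send("!! BOOM !!\n")
      env.chunked_stream_close
    end

    # chunked_streaming_response sends these headers plus the chunked
    # Transfer-Encoding header, leaving the body to the timers above.
    headers = { 'Content-Type' => 'text/plain', 'X-Stream' => 'Goliath' }
    chunked_streaming_response(200, headers)
  end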
end

Goliath automatically adds the Transfer-Encoding: chunked header and prefixes each chunk with its byte size (in hexadecimal). This lets the recipient identify where each chunk begins and ends and handle it accordingly.
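For reference, the framing on the wire follows the HTTP/1.1 chunked transfer coding. Each chunk is its byte count in hexadecimal, CRLF, the data, then CRLF, and a zero-length chunk ends the body. The helpers encode_chunk and decode_chunks below are illustrative only. When you use chunked_stream_send, this framing is added for you.

```ruby
# Frames one payload as an HTTP/1.1 chunk: hex byte count, CRLF, data, CRLF.
def encode_chunk(data)
  "#{data.bytesize.to_s(16)}\r\n#{data}\r\n"
end

# A zero-length chunk (with no trailers) marks the end of the body.
LAST_CHUNK = "0\r\n\r\n"

# Parses a complete chunked body back into its payloads. Works on bytes,
# ignores chunk extensions after ';', and does not handle trailers.
def decode_chunks(body)
  body = body.b
  chunks = []
  pos = 0
  loop do
    line_end = body.index("\r\n", pos)
    raise ArgumentError, 'missing chunk size line' unless line_end
    size = Integer(body[pos...line_end].split(';', 2).first.to_s.strip, 16)
    pos = line_end + 2
    break if size.zero?
    chunks << body.byteslice(pos, size)
    pos += size + 2 # skip the data and its trailing CRLF
  end
  chunks
end

# Frame the same messages the streaming example sends, then parse them back.
stream = ["0\n", "1\n", "!! BOOM !!\n"].map { |s| encode_chunk(s) }.join + LAST_CHUNK
decoded = decode_chunks(stream)
```

For example, encode_chunk("!! BOOM !!\n") produces "b\r\n!! BOOM !!\n\r\n", because the payload is 11 (hex b) bytes long.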