Add standard chunking format #914

Merged: 5 commits, Apr 26, 2016
48 changes: 42 additions & 6 deletions lib/fluent/event.rb
@@ -21,6 +21,10 @@ class EventStream
include Enumerable
include MessagePackFactory::Mixin

def records
Member: The name `records` seems to suggest that it returns the event entries. `num_records`, or `size` (more Ruby-ish), is better for me.

Author: `size` looks bad, because it sometimes means the byte size of data, and one type of EventStream (MessagePackEventStream) actually wraps a String object.
How about `events`?

Member: `events` is better for me.

@sonots How about this? This method will be used in flowcounter_simple or similar gems.

@sonots (Member), Apr 25, 2016: It looks like this method returns an integer, not an array, so both `records` and `events` look bad to me.
What about `num_events`, `num_records`, or just `length`?

Author: OK, I'll use `length`.

Author: At first I was going to use `length`, but then I found that `Chunk#records` already exists, so I gave `EventStream#records` the same name.
`Chunk` also has `Chunk#size` (byte size), and `#size != #length` is extraordinarily confusing by Ruby conventions. Hmm.

@tagomoris (Author), Apr 26, 2016: I think it's best to:

  • change `Chunk#size` to return the number of events, and make `#length` an alias of `#size`
  • add `Chunk#bytesize` to return the number of bytes of chunk content

But this changes the meaning of `chunk.size`, and it will break compatibility for plugins.

Author: I'll create another pull request later to change `Chunk#records` and `EventStream#records` to `#num_events`.
Let me merge this right now.

raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end

def repeatable?
false
end
@@ -29,7 +33,8 @@ def each(&block)
raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end

def to_msgpack_stream
def to_msgpack_stream(time_int: false)
return to_msgpack_stream_forced_integer if time_int
Member: Is a direct `to_msgpack_stream_forced_integer` call now deprecated?

Author: IMO we can remove it right now (because it's a new v0.14 feature), and it would be better to do so.
But I'm not sure, and this pull request is not for that purpose.

out = msgpack_packer
each {|time,record|
out.write([time,record])
@@ -46,7 +51,6 @@ def to_msgpack_stream_forced_integer
end
end


class OneEventStream < EventStream
def initialize(time, record)
@time = time
@@ -57,6 +61,10 @@ def dup
OneEventStream.new(@time, @record.dup)
end

def records
1
end

def repeatable?
true
end
@@ -81,6 +89,10 @@ def dup
ArrayEventStream.new(entries)
end

def records
@entries.size
end

def repeatable?
true
end
@@ -102,7 +114,7 @@ def each(&block)
#
# Use this class as below, in loop of data-enumeration:
# 1. initialize blank stream:
# streams[tag] ||= MultiEventStream
# streams[tag] ||= MultiEventStream.new
# 2. add events
# stream[tag].add(time, record)
class MultiEventStream < EventStream
@@ -119,6 +131,10 @@ def dup
es
end

def records
@time_array.size
end

def add(time, record)
@time_array << time
@record_array << record
@@ -144,16 +160,20 @@ def each(&block)

class MessagePackEventStream < EventStream
# Keep cached_unpacker argument for existing plugins
def initialize(data, cached_unpacker = nil)
def initialize(data, records = 0, cached_unpacker = nil)
Member: Some third-party plugins use `MessagePackEventStream.new(data, @cached_unpacker)` in their code.
Should we break that code?

@tagomoris (Author), Apr 25, 2016: Ah, that's my mistake. I'll fix it with a keyword argument.

@data = data
@records = records
end

def records
@records
end

def repeatable?
true
end

def each(&block)
# TODO format check
msgpack_unpacker.feed_each(@data, &block)
nil
end
@@ -162,5 +182,21 @@ def to_msgpack_stream
@data
end
end
end

module ChunkMessagePackEventStreamer
include MessagePackFactory::Mixin
# chunk.extend(ChunkEventStreamer)
# => chunk.each{|time, record| ... }
def each(&block)
@repeatedly (Member), Apr 25, 2016: The original implementation rescues `EOFError` from `MessagePack::Unpacker`. Is removing the rescue safe?

def msgpack_each(&block)
  open do |io|
    u = msgpack_factory.unpacker(io)
    begin
      u.each(&block)
    rescue EOFError
    end
  end
end

Author: `MessagePack::Unpacker` raises `EOFError` if a data underflow occurs while unpacking. That is caused by broken or unflushed data on disk.
For broken data, this code (without the rescue) might keep raising failures (which will never resolve), but that is as it should be, because it is users' content data.
For unflushed data, this code will raise an error once, but a retry will succeed if it happens after a successful flush.

So this code shouldn't rescue and ignore the error.

open do |io|
msgpack_unpacker(io).each(&block)
end
nil
end
alias :msgpack_each :each

def to_msgpack_stream
@repeatedly (Member), Apr 25, 2016: Isn't a `time_int: false` argument needed here? The `EventStream` base class has a `time_int: false` argument.

Author: This mixin is used only for MessagePackEventStream, and the data is already packed/encoded from Ruby objects to msgpack.
There's nothing to do about time encoding here.

Member: Okay.

read
end
end
end
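The `records` accessor threaded through this diff can be summarized with simplified stand-ins. These toy classes are illustrations of the counting rules only, not the fluentd implementations:

```ruby
# Toy stand-ins for the stream types in this diff -- illustration only,
# not the fluentd classes.
class ToyOneEventStream
  def initialize(time, record)
    @time = time
    @record = record
  end

  # exactly one event by construction
  def records
    1
  end
end

class ToyMultiEventStream
  def initialize
    @time_array = []
    @record_array = []
  end

  def add(time, record)
    @time_array << time
    @record_array << record
  end

  # one event per stored (time, record) pair
  def records
    @time_array.size
  end
end

# A pre-serialized stream cannot count events without unpacking the payload,
# so the count is supplied by the producer at construction time.
class ToyMessagePackEventStream
  attr_reader :records

  def initialize(data, records = 0)
    @data = data
    @records = records
  end
end
```

The last class mirrors why `MessagePackEventStream.new` grows a `records` argument in this diff: the msgpack payload is opaque until unpacked, so the count must travel alongside the bytes.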
53 changes: 50 additions & 3 deletions lib/fluent/plugin/buffer.rb
@@ -180,7 +180,7 @@ def emit(metadata, data, force: false)
chunk.synchronize do
begin
chunk.append(data)
if !size_over?(chunk) || force
if !chunk_size_over?(chunk) || force
chunk.commit
stored = true
@stage_size += (chunk.size - original_size)
@@ -198,6 +198,49 @@ def emit(metadata, data, force: false)
emit_step_by_step(metadata, data)
end

def emit_bulk(metadata, bulk, records)
return if bulk.nil? || bulk.empty?
Member: When would `emit_bulk` receive nil bulk data? Should we care about the nil case?

Author: Just to be safe. And this method is not called in a busy loop.

raise BufferOverflowError unless storable?

stored = false
synchronize do # critical section for buffer (stage/queue)
until stored
chunk = @stage[metadata]
unless chunk
chunk = @stage[metadata] = generate_chunk(metadata)
end

chunk.synchronize do # critical section for chunk (chunk append/commit/rollback)
begin
empty_chunk = chunk.empty?
chunk.concat(bulk, records)

if chunk_size_over?(chunk)
if empty_chunk
log.warn "chunk bytes limit exceeds for a bulk event stream: #{bulk.bytesize}bytes"
else
chunk.rollback
enqueue_chunk(metadata)
next
end
end

chunk.commit
stored = true
@stage_size += bulk.bytesize
if chunk_size_full?(chunk)
enqueue_chunk(metadata)
end
rescue
chunk.rollback
raise
end
end
end
end
nil
end

def queued_records
synchronize { @queue.reduce(0){|r, chunk| r + chunk.records } }
end
@@ -310,10 +353,14 @@ def clear_queue!
end
end

def size_over?(chunk)
def chunk_size_over?(chunk)
chunk.size > @chunk_bytes_limit || (@chunk_records_limit && chunk.records > @chunk_records_limit)
end

def chunk_size_full?(chunk)
chunk.size >= @chunk_bytes_limit || (@chunk_records_limit && chunk.records >= @chunk_records_limit)
end

def emit_step_by_step(metadata, data)
attempt_records = data.size / 3

@@ -336,7 +383,7 @@ def emit_step_by_step(metadata, data)
attempt = data.slice(0, attempt_records)
chunk.append(attempt)

if size_over?(chunk)
if chunk_size_over?(chunk)
chunk.rollback

if attempt_records <= MINIMUM_APPEND_ATTEMPT_RECORDS
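The split of `size_over?` into `chunk_size_over?` (a limit was exceeded: roll back the last write) and the new `chunk_size_full?` (a limit was reached exactly: commit, then enqueue) can be sketched as standalone predicates. The limit values here are invented for illustration:

```ruby
# Sketch of the two size predicates in this diff; limits are hypothetical.
ToyChunk = Struct.new(:size, :records)   # size in bytes, records = event count

CHUNK_BYTES_LIMIT   = 8
CHUNK_RECORDS_LIMIT = 3

# true once a limit has been *exceeded*: the last append must be rolled back
def chunk_size_over?(chunk)
  chunk.size > CHUNK_BYTES_LIMIT ||
    (CHUNK_RECORDS_LIMIT && chunk.records > CHUNK_RECORDS_LIMIT)
end

# true once a limit has been *reached*: the commit stands, enqueue the chunk
def chunk_size_full?(chunk)
  chunk.size >= CHUNK_BYTES_LIMIT ||
    (CHUNK_RECORDS_LIMIT && chunk.records >= CHUNK_RECORDS_LIMIT)
end
```

With these limits, a chunk of exactly 8 bytes is full (enqueue it) but not over (keep the data), while a chunk of 9 bytes is over and the write must be rolled back.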
19 changes: 7 additions & 12 deletions lib/fluent/plugin/buffer/chunk.rb
@@ -14,9 +14,9 @@
# limitations under the License.
#

require 'fluent/msgpack_factory'
require 'fluent/plugin/buffer'
require 'fluent/unique_id'
require 'fluent/event'

require 'fileutils'
require 'monitor'
@@ -26,8 +26,8 @@ module Plugin
class Buffer # fluent/plugin/buffer is already loaded
class Chunk
include MonitorMixin
include MessagePackFactory::Mixin
include UniqueId::Mixin
include ChunkMessagePackEventStreamer

# Chunks has 2 part:
# * metadata: contains metadata which should be restored after resume (if possible)
@@ -64,6 +64,11 @@ def append(data)
raise NotImplementedError, "Implement this method in child class"
end

# for event streams which is packed or zipped (and we want not to unpack/uncompress)
def concat(bulk, records)
raise NotImplementedError, "Implement this method in child class"
end

def commit
raise NotImplementedError, "Implement this method in child class"
end
@@ -108,16 +113,6 @@ def write_to(io)
FileUtils.copy_stream(i, io)
end
end

def msgpack_each(&block)
open do |io|
u = msgpack_factory.unpacker(io)
begin
u.each(&block)
rescue EOFError
end
end
end
end
end
end
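The rescue-removal argument above can be modeled with a toy reader: truncated input raises `EOFError`, and retrying after the writer finishes succeeds. The newline framing here is invented for the example (it is not msgpack), but the failure/retry shape is the same:

```ruby
require 'stringio'

# Toy newline-framed reader modeling the EOFError discussion above.
# (Invented framing -- not msgpack.)
def read_frames(io)
  frames = []
  while (line = io.gets)
    # a frame without its terminator means the writer has not flushed it yet
    raise EOFError, "truncated frame" unless line.end_with?("\n")
    frames << line.chomp
  end
  frames
end

unflushed = StringIO.new("one\ntw")      # second frame only partially written
begin
  read_frames(unflushed)
rescue EOFError
  # surfaced to the caller instead of being silently swallowed;
  # a later retry (after a successful flush) will read everything
end

flushed = StringIO.new("one\ntwo\n")
read_frames(flushed)                      # => ["one", "two"]
```

Swallowing the `EOFError` would make truncated user data disappear without a trace; raising it lets a retry after a successful flush succeed, which is exactly the behavior argued for above.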
12 changes: 12 additions & 0 deletions lib/fluent/plugin/buffer/file_chunk.rb
@@ -16,6 +16,7 @@

require 'fluent/plugin/buffer/chunk'
require 'fluent/unique_id'
require 'fluent/msgpack_factory'

module Fluent
module Plugin
@@ -33,6 +34,7 @@ class FileChunk < Chunk
# path_suffix: path suffix string, like '.log' (or any other user specified)

include SystemConfig::Mixin
include MessagePackFactory::Mixin

FILE_PERMISSION = 0644

@@ -74,6 +76,16 @@ def append(data)
true
end

def concat(bulk, records)
raise "BUG: appending to non-staged chunk, now '#{@state}'" unless @state == :staged

bulk.force_encoding(Encoding::ASCII_8BIT)
@chunk.write bulk
@adding_bytes += bulk.bytesize
@adding_records += records
true
end

def commit
write_metadata # this should be at first: of course, this operation may fail

12 changes: 10 additions & 2 deletions lib/fluent/plugin/buffer/memory_chunk.rb
@@ -22,20 +22,28 @@ class Buffer
class MemoryChunk < Chunk
def initialize(metadata)
super
@chunk = ''.force_encoding('ASCII-8BIT')
@chunk = ''.force_encoding(Encoding::ASCII_8BIT)
@chunk_bytes = 0
@adding_bytes = 0
@adding_records = 0
end

def append(data)
adding = data.join.force_encoding('ASCII-8BIT')
adding = data.join.force_encoding(Encoding::ASCII_8BIT)
@chunk << adding
@adding_bytes += adding.bytesize
@adding_records += data.size
true
end

def concat(bulk, records)
bulk.force_encoding(Encoding::ASCII_8BIT)
@chunk << bulk
@adding_bytes += bulk.bytesize
@adding_records += records
true
end

def commit
@records += @adding_records
@chunk_bytes += @adding_bytes
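The difference between `append` (an array of serialized records, count inferred from the array) and the new `concat` (one opaque pre-packed blob, count supplied by the caller) comes down to bookkeeping. A toy chunk sketches it; this is an illustration, not the fluentd class:

```ruby
# Toy chunk showing append vs concat bookkeeping -- illustration only.
class ToyMemoryChunk
  attr_reader :adding_bytes, :adding_records

  def initialize
    @chunk = ''.dup.force_encoding(Encoding::ASCII_8BIT)
    @adding_bytes = 0
    @adding_records = 0
  end

  # data is an array of already-serialized records: the count is data.size
  def append(data)
    adding = data.join.force_encoding(Encoding::ASCII_8BIT)
    @chunk << adding
    @adding_bytes += adding.bytesize
    @adding_records += data.size
    true
  end

  # bulk is one pre-packed blob: the chunk cannot count the events inside,
  # so the caller passes the record count alongside the bytes
  def concat(bulk, records)
    bulk = bulk.dup.force_encoding(Encoding::ASCII_8BIT)
    @chunk << bulk
    @adding_bytes += bulk.bytesize
    @adding_records += records
    true
  end
end
```

This is why `emit_bulk` in the buffer diff takes a `records` argument and why `chunk.concat(bulk, records)` threads it through: the event count would otherwise be lost once the stream is serialized.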