Skip to content

Commit

Permalink
Fix courier output plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
driskell committed Dec 11, 2014
1 parent f9041c9 commit c578706
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 76 deletions.
123 changes: 67 additions & 56 deletions lib/log-courier/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,60 @@ class ProtocolError < StandardError; end

# Describes a pending payload
class PendingPayload
attr_accessor :ack_events
attr_accessor :events
attr_accessor :nonce
attr_accessor :data
class << self
@json_adapter
def get_json_adapter
@json_adapter = MultiJson.adapter.instance if @json_adapter.nil?
return @json_adapter
end
end

attr_accessor :previous
attr_accessor :next
attr_accessor :nonce
attr_accessor :events
attr_accessor :last_sequence
attr_accessor :sequence_len
attr_accessor :payload

def initialize(options = {})
@ack_events = 0
def initialize(events, nonce)
@events = events
@nonce = nonce

generate
end

def generate
fail ArgumentError, 'Corrupt payload' if @events.length == 0

buffer = Zlib::Deflate.new

options.each do |k, v|
fail ArgumentError unless self.respond_to?(k)
instance_variable_set "@#{k}", v
# Write each event in JSON format
events.each do |event|
json_data = self.class.get_json_adapter.dump(event)
# Add length and then the data
buffer << [json_data.length].pack('N') << json_data
end

# Generate and store the payload
@payload = nonce + buffer.flush(Zlib::FINISH)
@last_sequence = 0
@sequence_len = @events.length
end

def ack(sequence)
if sequence <= @last_sequence
return 0, false
elsif sequence >= @sequence_len
lines = @sequence_len - @last_sequence
@last_sequence = sequence
@payload = nil
return lines, true
end

lines = sequence - @last_sequence
@last_sequence = sequence
@payload = nil
return lines, false
end
end

Expand All @@ -63,9 +102,6 @@ def initialize(options = {})
require 'log-courier/client_tls'
@client = ClientTls.new(@options)

# Load the json adapter
@json_adapter = MultiJson.adapter.instance

@event_queue = EventQueue.new @options[:spool_size]
@pending_payloads = {}
@first_payload = nil
Expand Down Expand Up @@ -100,7 +136,7 @@ def shutdown
@io_thread.raise ShutdownSignal
@spooler_thread.join
@io_thread.join
return
return @pending_payloads.length == 0
end

private
Expand Down Expand Up @@ -274,14 +310,9 @@ def send_ping
def send_jdat(events)
# Generate the JSON payload and compress it
nonce = generate_nonce
data = buffer_jdat_data(events, nonce)

# Save the pending payload
payload = PendingPayload.new(
:events => events,
:nonce => nonce,
:data => data
)
payload = PendingPayload.new(events, nonce)

@pending_payloads[nonce] = payload

Expand All @@ -294,34 +325,16 @@ def send_jdat(events)
end

# Send it
@client.send 'JDAT', payload.data
return
end

def buffer_jdat_data(events, nonce)
buffer = Zlib::Deflate.new

# Write each event in JSON format
events.each do |event|
buffer_jdat_data_event(buffer, event)
end

# Generate and return the message
nonce + buffer.flush(Zlib::FINISH)
end

def buffer_jdat_data_event(buffer, event)
json_data = @json_adapter.dump(event)

# Add length and then the data
buffer << [json_data.length].pack('N') << json_data
@client.send 'JDAT', payload.payload
return
end

def process_pong(message)
# Sanity
fail ProtocolError, "Unexpected data attached to pong message (#{message.length})" if message.length != 0

@logger.debug 'PONG message received' unless @logger.nil? || !@logger.debug?

# No longer pending a PONG
@ping_pending = false
return
Expand All @@ -332,29 +345,27 @@ def process_ackn(message)
fail ProtocolError, "ACKN message size invalid (#{message.length})" if message.length != 20

# Grab nonce
sequence, nonce = message[0...4].unpack('N').first, message[4..-1]
nonce, sequence = message.unpack('A16N')

if !@logger.nil? && @logger.debug?
nonce_str = nonce.each_byte.map do |b|
b.to_s(16).rjust(2, '0')
end

@logger.debug 'ACKN message received', :nonce => nonce_str.join, :sequence => sequence
end

# Find the payload - skip if we couldn't as it will just a duplicated ACK
return unless @pending_payloads.key?(nonce)

payload = @pending_payloads[nonce]
lines, complete = payload.ack(sequence)

# Full ACK?
# TODO: protocol error if sequence too large?
if sequence >= payload.events.length
@client.resume_send if @client.send_paused?

if complete
@pending_payloads.delete nonce
payload.previous.next = payload.next
else
# Partial ACK - only process if something was actually processed
if sequence > payload.ack_events
payload.ack_events = sequence
payload.events = payload.events[0...sequence]
payload.data = nil
end
@first_payload = payload.next
@client.resume_send if @client.send_paused?
end
return
end
end
end
38 changes: 19 additions & 19 deletions lib/log-courier/client_tls.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,25 @@ def send(signature, message)
return
end

def pause_send
return if @send_paused
@send_paused = true
@send_q << nil
return
end

def send_paused?
@send_paused
end

def resume_send
if @send_paused
@send_paused = false
@send_q << nil
end
return
end

private

def run_send(io_control)
Expand Down Expand Up @@ -171,25 +190,6 @@ def run_recv(io_control)
return
end

def pause_send
return if @send_paused
@send_paused = true
@send_q << nil
return
end

def send_paused
@send_paused
end

def resume_send
if @send_paused
@send_paused = false
@send_q << nil
end
return
end

def tls_connect
# TODO: Implement random selection - and don't use separate :port - remember to update post_connection_check too
address = @options[:addresses][0]
Expand Down
2 changes: 1 addition & 1 deletion spec/gem_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ def shutdown
i += 1
end

shutdown
expect(shutdown).to eq true
end
end

0 comments on commit c578706

Please sign in to comment.