diff --git a/lib/log-courier/client.rb b/lib/log-courier/client.rb index 0a443a0c..b77eb1a8 100644 --- a/lib/log-courier/client.rb +++ b/lib/log-courier/client.rb @@ -30,21 +30,60 @@ class ProtocolError < StandardError; end # Describes a pending payload class PendingPayload - attr_accessor :ack_events - attr_accessor :events - attr_accessor :nonce - attr_accessor :data + class << self + @json_adapter + def get_json_adapter + @json_adapter = MultiJson.adapter.instance if @json_adapter.nil? + return @json_adapter + end + end - attr_accessor :previous attr_accessor :next + attr_accessor :nonce + attr_accessor :events + attr_accessor :last_sequence + attr_accessor :sequence_len + attr_accessor :payload - def initialize(options = {}) - @ack_events = 0 + def initialize(events, nonce) + @events = events + @nonce = nonce + + generate + end + + def generate + fail ArgumentError, 'Corrupt payload' if @events.length == 0 + + buffer = Zlib::Deflate.new - options.each do |k, v| - fail ArgumentError unless self.respond_to?(k) - instance_variable_set "@#{k}", v + # Write each event in JSON format + events.each do |event| + json_data = self.class.get_json_adapter.dump(event) + # Add length and then the data + buffer << [json_data.length].pack('N') << json_data end + + # Generate and store the payload + @payload = nonce + buffer.flush(Zlib::FINISH) + @last_sequence = 0 + @sequence_len = @events.length + end + + def ack(sequence) + if sequence <= @last_sequence + return 0, false + elsif sequence >= @sequence_len + lines = @sequence_len - @last_sequence + @last_sequence = sequence + @payload = nil + return lines, true + end + + lines = sequence - @last_sequence + @last_sequence = sequence + @payload = nil + return lines, false end end @@ -63,9 +102,6 @@ def initialize(options = {}) require 'log-courier/client_tls' @client = ClientTls.new(@options) - # Load the json adapter - @json_adapter = MultiJson.adapter.instance - @event_queue = EventQueue.new @options[:spool_size] @pending_payloads = {} @first_payload = nil @@ -100,7 +136,7 @@ def shutdown @io_thread.raise ShutdownSignal @spooler_thread.join @io_thread.join - return + return @pending_payloads.length == 0 end private @@ -274,14 +310,9 @@ def send_ping def send_jdat(events) # Generate the JSON payload and compress it nonce = generate_nonce - data = buffer_jdat_data(events, nonce) # Save the pending payload - payload = PendingPayload.new( - :events => events, - :nonce => nonce, - :data => data - ) + payload = PendingPayload.new(events, nonce) @pending_payloads[nonce] = payload @@ -294,27 +325,7 @@ def send_jdat(events) end # Send it - @client.send 'JDAT', payload.data - return - end - - def buffer_jdat_data(events, nonce) - buffer = Zlib::Deflate.new - - # Write each event in JSON format - events.each do |event| - buffer_jdat_data_event(buffer, event) - end - - # Generate and return the message - nonce + buffer.flush(Zlib::FINISH) - end - - def buffer_jdat_data_event(buffer, event) - json_data = @json_adapter.dump(event) - - # Add length and then the data - buffer << [json_data.length].pack('N') << json_data + @client.send 'JDAT', payload.payload return end @@ -322,6 +333,8 @@ def process_pong(message) # Sanity fail ProtocolError, "Unexpected data attached to pong message (#{message.length})" if message.length != 0 + @logger.debug 'PONG message received' unless @logger.nil? || !@logger.debug? + # No longer pending a PONG @ping_pending = false return @@ -332,29 +345,27 @@ def process_ackn(message) fail ProtocolError, "ACKN message size invalid (#{message.length})" if message.length != 20 # Grab nonce - sequence, nonce = message[0...4].unpack('N').first, message[4..-1] + nonce, sequence = message.unpack('A16N') + + if !@logger.nil? && @logger.debug? + nonce_str = nonce.each_byte.map do |b| + b.to_s(16).rjust(2, '0') + end + + @logger.debug 'ACKN message received', :nonce => nonce_str.join, :sequence => sequence + end # Find the payload - skip if we couldn't as it will just a duplicated ACK return unless @pending_payloads.key?(nonce) payload = @pending_payloads[nonce] + lines, complete = payload.ack(sequence) - # Full ACK? - # TODO: protocol error if sequence too large? - if sequence >= payload.events.length - @client.resume_send if @client.send_paused? - + if complete @pending_payloads.delete nonce - payload.previous.next = payload.next - else - # Partial ACK - only process if something was actually processed - if sequence > payload.ack_events - payload.ack_events = sequence - payload.events = payload.events[0...sequence] - payload.data = nil - end + @first_payload = payload.next + @client.resume_send if @client.send_paused? end - return end end end diff --git a/lib/log-courier/client_tls.rb b/lib/log-courier/client_tls.rb index c2122685..67439e7e 100644 --- a/lib/log-courier/client_tls.rb +++ b/lib/log-courier/client_tls.rb @@ -89,6 +89,25 @@ def send(signature, message) return end + def pause_send + return if @send_paused + @send_paused = true + @send_q << nil + return + end + + def send_paused? + @send_paused + end + + def resume_send + if @send_paused + @send_paused = false + @send_q << nil + end + return + end + private def run_send(io_control) @@ -171,25 +190,6 @@ def run_recv(io_control) return end - def pause_send - return if @send_paused - @send_paused = true - @send_q << nil - return - end - - def send_paused - @send_paused - end - - def resume_send - if @send_paused - @send_paused = false - @send_q << nil - end - return - end - def tls_connect # TODO: Implement random selection - and don't use separate :port - remember to update post_connection_check too address = @options[:addresses][0] diff --git a/spec/gem_spec.rb b/spec/gem_spec.rb index a642749a..aeb3da2c 100644 --- a/spec/gem_spec.rb +++ b/spec/gem_spec.rb @@ -64,6 +64,6 @@ def shutdown i += 1 end - shutdown + expect(shutdown).to eq true end end