Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
fix: avoid firehose logger spinning the CPU
Browse files Browse the repository at this point in the history
Under no load, the Firehose logger spun the CPU because the max
of (MAX_INTERVAL-time_since_sent, 0) always resulted in 0. This
is because the time_since_sent became quite large, and the
MAX_INTERVAL was only 30 resulting in a high negative number. 0
was the max, so the loop would spin.

What was actually intended was to track every send attempt, and
wait for MAX_INTERVAL - (now - last_sent_time).
  • Loading branch information
bbangert committed Jun 16, 2017
1 parent 6319545 commit d1baa2a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
6 changes: 3 additions & 3 deletions autopush/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,9 @@ def _worker(self):
self._last_send = time.time()
while True:
time_since_sent = time.time() - self._last_send
remaining_wait = self.MAX_INTERVAL - time_since_sent
try:
record = self._records.get(
timeout=max(self.MAX_INTERVAL-time_since_sent, 0))
record = self._records.get(timeout=remaining_wait)
except Queue.Empty:
# Send the records
self._send_record_batch()
Expand All @@ -294,6 +294,7 @@ def _worker(self):
self._send_record_batch()

def _send_record_batch(self):
self._last_send = time.time()
if not self._prepped:
return

Expand All @@ -312,4 +313,3 @@ def _send_record_batch(self):

self._prepped = []
self._total_size = 0
self._last_send = time.time()
6 changes: 5 additions & 1 deletion autopush/tests/test_logging.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import os
import Queue
import sys
import StringIO

Expand Down Expand Up @@ -210,10 +211,13 @@ def test_message_max_batch(self):
def test_queue_timeout(self):
proc = FirehoseProcessor("test")
proc.MAX_INTERVAL = 0
proc._records.get = mock_get = Mock()
proc._send_record_batch = mock_send = Mock()
mock_get.side_effect = (Queue.Empty, None)

proc.start()
proc.stop()
eq_(len(self.mock_boto.mock_calls), 1)
mock_send.assert_called()

def test_batch_send_failure(self):
proc = FirehoseProcessor("test")
Expand Down

0 comments on commit d1baa2a

Please sign in to comment.