Skip to content

Commit

Permalink
fix: grpc timeout segment data loss (#116)
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-pytel authored Jan 29, 2021
1 parent 8e9ea9d commit 6ed3043
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 8 deletions.
9 changes: 4 additions & 5 deletions skywalking/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#

import atexit
from queue import Queue
from queue import Queue, Full
from threading import Thread, Event
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -109,8 +109,7 @@ def connected():


def archive(segment: 'Segment'):
    """Put a segment on the agent queue without blocking the caller.

    The segment is handed to ``__queue`` with a single non-blocking ``put``.
    If the queue is already at capacity, the segment is dropped and a warning
    is logged; nothing is raised to the caller.

    :param segment: the segment to enqueue.
    """
    # One put(block=False) instead of "check full(), then put()": another
    # thread could fill the queue between the check and the insert, which
    # would leave the blocking put() stuck. A single call is atomic.
    try:
        __queue.put(segment, block=False)
    except Full:
        logger.warning('the queue is full, the segment will be abandoned')
18 changes: 16 additions & 2 deletions skywalking/agent/protocol/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import logging
from skywalking.loggings import logger
import traceback
from queue import Queue, Empty
from queue import Queue, Empty, Full
from time import time

import grpc

Expand Down Expand Up @@ -68,10 +69,16 @@ def on_error(self):
self.channel.subscribe(self._cb, try_to_connect=True)

def report(self, queue: Queue, block: bool = True):
start = time()
segment = None

def generator():
nonlocal segment

while True:
try:
segment = queue.get(block=block) # type: Segment
timeout = max(0, config.QUEUE_TIMEOUT - int(time() - start)) # type: int
segment = queue.get(block=block, timeout=timeout) # type: Segment
except Empty:
return

Expand Down Expand Up @@ -120,5 +127,12 @@ def generator():

try:
self.traces_reporter.report(generator())

except grpc.RpcError:
self.on_error()

if segment:
try:
queue.put(segment, block=False)
except Full:
pass
2 changes: 1 addition & 1 deletion skywalking/client/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,4 @@ def __init__(self, channel: grpc.Channel):
self.report_stub = TraceSegmentReportServiceStub(channel)

def report(self, generator):
    """Send the items produced by ``generator`` through the report stub.

    The call is bounded by ``config.GRPC_TIMEOUT`` so that it cannot block
    forever.

    :param generator: iterable of messages passed to ``report_stub.collect``.
    """
    # Exactly one collect() call, and always with a deadline: a generator
    # can only be consumed once, so a second call would see nothing.
    self.report_stub.collect(generator, timeout=config.GRPC_TIMEOUT)
5 changes: 5 additions & 0 deletions skywalking/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
if TYPE_CHECKING:
from typing import List

# In order to prevent timeouts and possible segment loss, make sure QUEUE_TIMEOUT is always at least a few seconds lower
# than GRPC_TIMEOUT.
GRPC_TIMEOUT = 300 # type: int
QUEUE_TIMEOUT = 240 # type: int

RE_IGNORE_PATH = re.compile('^$') # type: re.Pattern

service_name = os.getenv('SW_AGENT_NAME') or 'Python Service Name' # type: str
Expand Down

0 comments on commit 6ed3043

Please sign in to comment.