Skip to content

Commit

Permalink
add retry mechanism to write_backlog (#605)
Browse files Browse the repository at this point in the history
* add retry_throttle decorator to write_backlog
* update documentation

---------

Co-authored-by: Jörg Zimmermann <101292599+ekneg54@users.noreply.github.com>
  • Loading branch information
dtrai2 and ekneg54 committed Jun 12, 2024
1 parent d2f22ee commit 4887391
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 24 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* `timestamper` now writes `_timestamper_missing_field_warning` tag to event tags instead of `_timestamper_failure` in case of missing fields
* rename `--thread_count` parameter to `--thread-count` in http generator
* removed `--report` parameter and feature from http generator
* when using `extend_target_list` in the `field manager`the ordering of the given source fields is now preserved
* when using `extend_target_list` in the `field manager`the ordering of the given source fields is now preserved
* logprep now exits with a negative exit code if pipeline restart fails 5 times
* this was implemented because further restart behavior should be configured on level of a system init service or container orchestrating service like k8s
* the `restart_count` parameter is configurable. If you want the old behavior back, you can set this parameter to a negative number
Expand All @@ -31,12 +31,13 @@
* add pseudonymization tools to logprep -> see: `logprep pseudo --help`
* add `restart_count` parameter to configuration
* add option `mode` to `pseudonymizer` processor and to pseudonymization tools to chose the AES Mode for encryption and decryption
* add retry mechanism to opensearch parallel bulk, if opensearch returns 429 `rejected_execution_exception`

### Improvements

* remove logger from Components and Factory signatures
* align processor architecture to use methods like `write_to_target`, `add_field_to` and `get_dotted_field_value` when reading and writing from and to events
* required substantial refactoring of the `hyperscan_resolver`, `generic_resolver` and `template_replacer`
* required substantial refactoring of the `hyperscan_resolver`, `generic_resolver` and `template_replacer`
* change `pseudonymizer`, `pre_detector`, `selective_extractor` processors and `pipeline` to handle `extra_data` the same way
* refactor `clusterer`, `pre_detector` and `pseudonymizer` processors and change `rule_tree` so that the processor do not require `process` override
* required substantial refactoring of the `clusterer`
Expand Down
3 changes: 1 addition & 2 deletions logprep/abc/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""

from abc import abstractmethod
from logging import Logger
from typing import Optional

from attrs import define, field, validators
Expand Down Expand Up @@ -39,7 +38,7 @@ def __init__(self, output, message, raw_input):


class FatalOutputError(OutputError):
"""Must not be catched."""
"""Must not be caught."""


class Output(Connector):
Expand Down
6 changes: 4 additions & 2 deletions logprep/connector/elasticsearch/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ class Config(Output.Config):
"""(Optional) Timeout for the connection (default is 500ms)."""
max_retries: int = field(validator=validators.instance_of(int), default=0)
"""(Optional) Maximum number of retries for documents rejected with code 429 (default is 0).
Increases backoff time by 2 seconds per try, but never exceeds 600 seconds."""
Increases backoff time by 2 seconds per try, but never exceeds 600 seconds. When using
parallel_bulk in the opensearch connector then the backoff time starts with 1 second. With
each consecutive retry 500 to 1000 ms will be added to the delay, chosen randomly """
user: Optional[str] = field(validator=validators.instance_of(str), default="")
"""(Optional) User used for authentication."""
secret: Optional[str] = field(validator=validators.instance_of(str), default="")
Expand Down Expand Up @@ -439,7 +441,7 @@ def _message_exceeds_max_size_error(self, error):
return True

if error.error == "rejected_execution_exception":
reason = error.info.get("error", {}).get("reason", {})
reason = error.info.get("error", {}).get("reason", "")
match = self._size_error_pattern.match(reason)
if match and int(match.group("size")) >= int(match.group("max_size")):
return True
Expand Down
45 changes: 30 additions & 15 deletions logprep/connector/opensearch/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@
"""

import logging
import random
import time
from functools import cached_property

import opensearchpy as search
from attrs import define, field, validators
from opensearchpy import helpers
from opensearchpy.serializer import JSONSerializer

from logprep.abc.output import Output
from logprep.abc.output import Output, FatalOutputError
from logprep.connector.elasticsearch.output import ElasticsearchOutput

logger = logging.getLogger("OpenSearchOutput")
Expand Down Expand Up @@ -87,12 +89,10 @@ class Config(ElasticsearchOutput.Config):
default=4, validator=[validators.instance_of(int), validators.gt(1)]
)
"""Number of threads to use for bulk requests."""

queue_size: int = field(
default=4, validator=[validators.instance_of(int), validators.gt(1)]
)
"""Number of queue size to use for bulk requests."""

chunk_size: int = field(
default=500, validator=[validators.instance_of(int), validators.gt(1)]
)
Expand Down Expand Up @@ -137,15 +137,30 @@ def _bulk(self, client, actions, *args, **kwargs):
self._handle_transport_error(error)

def _parallel_bulk(self, client, actions, *args, **kwargs):
for success, item in helpers.parallel_bulk(
client,
actions=actions,
chunk_size=self._config.chunk_size,
queue_size=self._config.queue_size,
raise_on_error=True,
raise_on_exception=True,
):
if not success:
result = item[list(item.keys())[0]]
if "error" in result:
raise result.get("error")
bulk_delays = 1
for _ in range(self._config.max_retries + 1):
try:
for success, item in helpers.parallel_bulk(
client,
actions=actions,
chunk_size=self._config.chunk_size,
queue_size=self._config.queue_size,
raise_on_error=True,
raise_on_exception=True,
):
if not success:
result = item[list(item.keys())[0]]
if "error" in result:
raise result.get("error")
break
except search.ConnectionError as error:
raise error
except search.exceptions.TransportError as error:
if self._message_exceeds_max_size_error(error):
raise error
time.sleep(bulk_delays)
bulk_delays += random.randint(500, 1000) / 1000
else:
raise FatalOutputError(
self, "Opensearch too many requests, all parallel bulk retries failed"
)
2 changes: 1 addition & 1 deletion tests/unit/connector/test_elasticsearch_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def test_handle_bulk_index_error_calls_bulk_with_error_documents(self, fake_bulk
{"invalid": "error"},
[{"foo": "*" * 500}],
1,
TypeError,
FatalOutputError,
),
(
429,
Expand Down
36 changes: 34 additions & 2 deletions tests/unit/connector/test_opensearch_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,9 @@ def test_write_to_search_context_calls_handle_serialization_error_if_serializati

@mock.patch(
"opensearchpy.helpers.parallel_bulk",
side_effect=search.ConnectionError,
side_effect=search.ConnectionError(-1),
)
@mock.patch("time.sleep", mock.MagicMock()) # to speed up test execution
def test_write_to_search_context_calls_handle_connection_error_if_connection_error(self, _):
self.object._config.message_backlog_size = 1
self.object._handle_connection_error = mock.MagicMock()
Expand Down Expand Up @@ -269,7 +270,7 @@ def test_handle_bulk_index_error_calls_bulk_with_error_documents(self, fake_bulk
{"invalid": "error"},
[{"foo": "*" * 500}],
1,
TypeError,
FatalOutputError,
),
(
429,
Expand Down Expand Up @@ -404,3 +405,34 @@ def test_opensearch_parallel_bulk(self):
def test_setup_populates_cached_properties(self, mock_getmembers):
self.object.setup()
mock_getmembers.assert_called_with(self.object)

@mock.patch(
"opensearchpy.helpers.parallel_bulk",
side_effect=search.TransportError(429, "rejected_execution_exception", {}),
)
@mock.patch("time.sleep")
def test_write_backlog_fails_if_all_retries_are_exceeded(self, _, mock_sleep):
self.object._config.maximum_message_size_mb = 1
self.object._config.max_retries = 5
self.object._message_backlog = [{"some": "event"}]
with pytest.raises(
FatalOutputError, match="Opensearch too many requests, all parallel bulk retries failed"
):
self.object._write_backlog()
assert mock_sleep.call_count == 6 # one initial try + 5 retries
assert self.object._message_backlog == [{"some": "event"}]

@mock.patch("time.sleep")
def test_write_backlog_is_successful_after_two_retries(self, mock_sleep):
side_effects = [
search.TransportError(429, "rejected_execution_exception", {}),
search.TransportError(429, "rejected_execution_exception", {}),
[],
]
with mock.patch("opensearchpy.helpers.parallel_bulk", side_effect=side_effects):
self.object._config.maximum_message_size_mb = 1
self.object._config.max_retries = 5
self.object._message_backlog = [{"some": "event"}]
self.object._write_backlog()
assert mock_sleep.call_count == 2
assert self.object._message_backlog == []

0 comments on commit 4887391

Please sign in to comment.