Skip to content

Commit

Permalink
Merge pull request #8 from eyadmba/main
Browse files Browse the repository at this point in the history
Add support for data streams
  • Loading branch information
vduseev authored Oct 22, 2023
2 parents b13cc68 + ef9975d commit ccd29c5
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion opensearch_logger/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
flush_frequency: float = 1.0,
extra_fields: Optional[Dict[str, Any]] = None,
raise_on_index_exc: bool = False,
is_data_stream: bool = False,
**kwargs: Any,
):
"""Initialize OpenSearch logging handler.
Expand Down Expand Up @@ -144,6 +145,8 @@ def __init__(
self.index_date_format = index_date_format
self.index_name_sep = index_name_sep

self.is_data_stream = is_data_stream

if extra_fields is None:
extra_fields = {}
self.extra_fields = copy.deepcopy(extra_fields.copy())
Expand Down Expand Up @@ -197,7 +200,15 @@ def flush(self) -> None:
self._buffer = []

index = self._get_index()
actions = [{'_index': index, '_source': record} for record in logs_buffer]
actions = [
{
'_index': index,
'_source': record,
# op_type must be explicitly set to 'create' for bulk operations
# on data streams. See issue #7.
'_op_type': 'create' if self.is_data_stream else 'index'
} for record in logs_buffer
]

helpers.bulk(
client=self._get_opensearch_client(),
Expand Down Expand Up @@ -243,6 +254,10 @@ def _schedule_flush(self) -> None:
self._timer.start()

def _get_index(self) -> str:
if self.is_data_stream:
# index rotation is irrelevant when using data streams
return self._get_never_index_name()

if RotateFrequency.DAILY == self.index_rotate:
return self._get_daily_index_name()
elif RotateFrequency.WEEKLY == self.index_rotate: # pragma: no cover
Expand Down

0 comments on commit ccd29c5

Please sign in to comment.