Skip to content

Commit

Permalink
refactor: implement common batch overload protection module and use i…
Browse files Browse the repository at this point in the history
…t in log_handler
  • Loading branch information
SergeTupchiy committed Mar 20, 2024
1 parent 9d3b62c commit d9afb2d
Show file tree
Hide file tree
Showing 15 changed files with 1,644 additions and 722 deletions.
602 changes: 602 additions & 0 deletions apps/opentelemetry/src/otel_batch_olp.erl

Large diffs are not rendered by default.

454 changes: 50 additions & 404 deletions apps/opentelemetry/src/otel_batch_processor.erl

Large diffs are not rendered by default.

58 changes: 37 additions & 21 deletions apps/opentelemetry/src/otel_exporter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,34 @@
export_traces/4,
export_metrics/4,
export_logs/4,
export/5,
shutdown/1,
report_cb/1]).

-export_type([otel_signal/0,
exporter_config/0]).

%% Do any initialization of the exporter here and return configuration
%% that will be passed along with a list of spans to the `export' function.
-callback init(term()) -> {ok, term()} | ignore.
-callback init(Config) -> {ok, ExporterState} | {error, Reason} | ignore when
Config :: term(),
ExporterState :: term(),
Reason :: term().

%% This function is called when the configured interval expires with any
%% spans that have been collected so far and the configuration returned in `init'.
%% Do whatever needs to be done to export each span here, the caller will block
%% until it returns.
-callback export(traces | metrics, ets:tab(), otel_resource:t(), term()) -> ok |
success |
failed_not_retryable |
failed_retryable.
-callback shutdown(term()) -> ok.
-callback export(otel_signal(), ets:tab(), otel_resource:t(), term()) -> ok | error | {error, term()}.

-callback shutdown(State) -> ok when State :: term().

-type otel_signal() :: traces | metrics | logs.
-type exporter_config() :: module() | {module(), Config :: term()} | undefined | none | ignore.

-include_lib("kernel/include/logger.hrl").

-spec init(exporter_config()) -> {module(), term()} | error | ignore.
init({ExporterModule, Config}) when is_atom(ExporterModule) ->
try ExporterModule:init(Config) of
{ok, ExporterState} when ExporterModule =:= opentelemetry_exporter ->
Expand All @@ -49,8 +58,12 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) ->
{ok, ExporterState} ->
?LOG_INFO("Exporter ~tp successfully initialized", [ExporterModule]),
{ExporterModule, ExporterState};
{error, Reason} ->
?LOG_ERROR("Exporter failed to initalize, error: ~p",
[ExporterModule, Reason]),

Check warning on line 63 in apps/opentelemetry/src/otel_exporter.erl

View check run for this annotation

Codecov / codecov/patch

apps/opentelemetry/src/otel_exporter.erl#L63

Added line #L63 was not covered by tests
error;
ignore ->
undefined
ignore
catch
Kind:Reason:StackTrace ->
%% logging in debug level since config argument in stacktrace could have secrets
Expand All @@ -72,14 +85,14 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) ->
%% the dependency needs to be added
try grpcbox:module_info() of
_ ->
undefined
error

Check warning on line 88 in apps/opentelemetry/src/otel_exporter.erl

View check run for this annotation

Codecov / codecov/patch

apps/opentelemetry/src/otel_exporter.erl#L88

Added line #L88 was not covered by tests
catch
_:_ ->
?LOG_WARNING("OTLP exporter failed to initialize when using the GRPC "
"protocol and `grpcbox` module is not available in the "
"code path. Verify that you have the `grpcbox` dependency "
"included and rerun.", []),
undefined
error

Check warning on line 95 in apps/opentelemetry/src/otel_exporter.erl

View check run for this annotation

Codecov / codecov/patch

apps/opentelemetry/src/otel_exporter.erl#L95

Added line #L95 was not covered by tests
end;
_ ->
%% same as the debug log above
Expand All @@ -89,17 +102,17 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) ->
kind => Kind,
reason => Reason,
exporter => ExporterModule}, #{report_cb => fun ?MODULE:report_cb/1}),
undefined
error

Check warning on line 105 in apps/opentelemetry/src/otel_exporter.erl

View check run for this annotation

Codecov / codecov/patch

apps/opentelemetry/src/otel_exporter.erl#L105

Added line #L105 was not covered by tests
end;
{error, undef} when ExporterModule =:= opentelemetry_exporter ->
?LOG_WARNING("OTLP exporter module `opentelemetry_exporter` not found. "
"Verify you have included the `opentelemetry_exporter` dependency.",
[ExporterModule]),
undefined;
error;

Check warning on line 111 in apps/opentelemetry/src/otel_exporter.erl

View check run for this annotation

Codecov / codecov/patch

apps/opentelemetry/src/otel_exporter.erl#L111

Added line #L111 was not covered by tests
{error, undef} ->
?LOG_WARNING("Exporter module ~tp not found. Verify you have included "
"the dependency that contains the exporter module.", [ExporterModule]),
undefined;
error;
_ ->
%% same as the debug log above
%% without the stacktrace and at a higher level
Expand All @@ -108,22 +121,25 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) ->
kind => Kind,
reason => Reason,
exporter => ExporterModule}, #{report_cb => fun ?MODULE:report_cb/1}),
undefined
error
end
end;
init(Exporter) when Exporter =:= none ; Exporter =:= undefined ->
undefined;
init(Exporter) when Exporter =:= none; Exporter =:= undefined; Exporter =:= ignore ->
ignore;
init(ExporterModule) when is_atom(ExporterModule) ->
init({ExporterModule, []}).

export_traces(ExporterModule, SpansTid, Resource, Config) ->
ExporterModule:export(traces, SpansTid, Resource, Config).
export_traces(ExporterModule, SpansTid, Resource, ExporterState) ->
export(traces, ExporterModule, SpansTid, Resource, ExporterState).

export_metrics(ExporterModule, MetricsTid, Resource, ExporterState) ->
export(metrics, ExporterModule, MetricsTid, Resource, ExporterState).

export_metrics(ExporterModule, MetricsTid, Resource, Config) ->
ExporterModule:export(metrics, MetricsTid, Resource, Config).
export_logs(ExporterModule, LogsTidAndHandlerConfig, Resource, ExporterState) ->
export(logs, ExporterModule, LogsTidAndHandlerConfig, Resource, ExporterState).

Check warning on line 139 in apps/opentelemetry/src/otel_exporter.erl

View check run for this annotation

Codecov / codecov/patch

apps/opentelemetry/src/otel_exporter.erl#L139

Added line #L139 was not covered by tests

export_logs(ExporterModule, Batch, Resource, Config) ->
ExporterModule:export(logs, Batch, Resource, Config).
%% Generic export entry point shared by all signals: invokes the `export/4'
%% callback of `ExporterModule' with the signal name, the table, the resource
%% and the exporter state, and returns whatever that callback returns.
export(OtelSignal, ExporterModule, Tid, Resource, ExporterState) ->
ExporterModule:export(OtelSignal, Tid, Resource, ExporterState).

shutdown(undefined) ->
ok;
Expand Down
1 change: 1 addition & 0 deletions apps/opentelemetry/src/otel_exporter_pid.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export(traces, SpansTid, _Resource, Pid) ->
ets:foldl(fun(Span, _Acc) ->
Pid ! {span, Span}
end, [], SpansTid),
ets:delete_all_objects(SpansTid),
ok.

shutdown(_) ->
Expand Down
1 change: 1 addition & 0 deletions apps/opentelemetry/src/otel_exporter_stdout.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export(_, SpansTid, _Resource, _) ->
ets:foldl(fun(Span, _Acc) ->
io:format("~p~n", [Span])
end, [], SpansTid),
ets:delete_all_objects(SpansTid),

Check warning on line 34 in apps/opentelemetry/src/otel_exporter_stdout.erl

View check run for this annotation

Codecov / codecov/patch

apps/opentelemetry/src/otel_exporter_stdout.erl#L34

Added line #L34 was not covered by tests
ok.

shutdown(_) ->
Expand Down
1 change: 1 addition & 0 deletions apps/opentelemetry/src/otel_exporter_tab.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export(traces, SpansTid, _Resource, Tid) ->
ets:foldl(fun(Span, _Acc) ->
ets:insert(Tid, Span)
end, [], SpansTid),
ets:delete_all_objects(SpansTid),
ok.

shutdown(_) ->
Expand Down
2 changes: 1 addition & 1 deletion apps/opentelemetry/src/otel_simple_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ export({ExporterModule, Config}, Resource, SpansTid) ->
%% don't let a exporter exception crash us
%% and return true if exporter failed
try
otel_exporter:export_traces(ExporterModule, SpansTid, Resource, Config) =:= failed_not_retryable
otel_exporter:export_traces(ExporterModule, SpansTid, Resource, Config)
catch
Kind:Reason:StackTrace ->
?LOG_INFO(#{source => exporter,
Expand Down
3 changes: 1 addition & 2 deletions apps/opentelemetry/src/otel_span_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@

-callback on_end(opentelemetry:span(), processor_config()) -> true |
dropped |
{error, invalid_span} |
{error, no_export_buffer}.
{error, term()}.

-callback force_flush(processor_config()) -> ok |
{error, term()}.
Expand Down
30 changes: 18 additions & 12 deletions apps/opentelemetry/test/opentelemetry_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ init_per_testcase(disabled_sdk, Config) ->
Config;
init_per_testcase(no_exporter, Config) ->
application:set_env(opentelemetry, processors,
[{otel_batch_processor, #{scheduled_delay_ms => 1}}]),
[{otel_batch_processor, #{scheduled_delay_ms => 1, exporter => none}}]),
{ok, _} = application:ensure_all_started(opentelemetry),
Config;
init_per_testcase(disable_auto_creation, Config) ->
Expand Down Expand Up @@ -119,6 +119,9 @@ init_per_testcase(too_many_attributes, Config) ->
{ok, _} = application:ensure_all_started(opentelemetry),
Config1;
init_per_testcase(tracer_instrumentation_scope, Config) ->
%% Note: this actually mutes a bug / design drawback, as
%% persistent terms are not cleaned / properly refreshed and may keep stale data.
cleanup_persistent_terms(opentelemetry),
Config1 = set_batch_tab_processor(Config),
{ok, _} = application:ensure_all_started(opentelemetry),
Config1;
Expand All @@ -128,7 +131,8 @@ init_per_testcase(multiple_tracer_providers, Config) ->
{ok, _} = application:ensure_all_started(opentelemetry),
Config;
init_per_testcase(multiple_processors, Config) ->
application:set_env(opentelemetry, processors, [{otel_batch_processor, #{scheduled_delay_ms => 1,
application:set_env(opentelemetry, processors, [{otel_batch_processor, #{name => first,
scheduled_delay_ms => 1,
exporter => {otel_exporter_pid, self()}}},
{otel_batch_processor, #{name => second,
scheduled_delay_ms => 1,
Expand Down Expand Up @@ -269,7 +273,7 @@ logger_metadata(_Config) ->
ok.

%% logger metadata will either be undefined, or a map without hex_span_ctx_keys:
%% [otel_trace_id, otel_span_id, ,otel_trace_flags]
%% [otel_trace_id, otel_span_id, otel_trace_flags]
empty_metadata() ->
case logger:get_process_metadata() of
undefined ->
Expand Down Expand Up @@ -589,7 +593,6 @@ update_span_data(Config) ->

ok.


tracer_instrumentation_scope(Config) ->
Tid = ?config(tid, Config),

Expand All @@ -606,7 +609,6 @@ tracer_instrumentation_scope(Config) ->
otel_span:end_span(SpanCtx1),

[Span1] = assert_exported(Tid, SpanCtx1),

?assertMatch({instrumentation_scope,<<"tracer1">>,<<"1.0.0">>,<<"http://schema.org/myschema">>},
Span1#span.instrumentation_scope).

Expand Down Expand Up @@ -1080,21 +1082,16 @@ disabled_sdk(_Config) ->

no_exporter(_Config) ->
SpanCtx1 = ?start_span(<<"span-1">>),

%% set_exporter will enable the export table even if the exporter ends
%% up being undefined to ensure no spans are lost. so briefly spans
%% will be captured
otel_batch_processor:set_exporter(none),
otel_span:end_span(SpanCtx1),

%% once the exporter is "initialized" the table is cleared and disabled
%% future spans are not added
?UNTIL([] =:= otel_batch_processor:current_tab_to_list(otel_batch_processor_global)),
?UNTIL([] =:= otel_batch_olp:current_tab_to_list(otel_batch_processor_global)),

SpanCtx2 = ?start_span(<<"span-2">>),
otel_span:end_span(SpanCtx2),

?assertEqual([], otel_batch_processor:current_tab_to_list(otel_batch_processor_global)),
?assertEqual([], otel_batch_olp:current_tab_to_list(otel_batch_processor_global)),

ok.

Expand All @@ -1118,3 +1115,12 @@ assert_not_exported(Tid, #span_ctx{trace_id=TraceId,
span_id=SpanId,
_='_'})).

%% Erases every persistent term whose key is a tuple with `Module' as its
%% first element; all other persistent terms are left untouched.
%% Always returns `ok'.
cleanup_persistent_terms(Module) ->
    lists:foreach(
      fun({Key, _Value}) when is_tuple(Key),
                              tuple_size(Key) > 0,
                              element(1, Key) =:= Module ->
              persistent_term:erase(Key);
         ({_Key, _Value}) ->
              %% Non-tuple keys and the empty tuple end up here. Doing the
              %% check in a guard (rather than in the function body) means
              %% `element/2' can never raise `badarg' on a `{}' key.
              ok
      end,
      persistent_term:get()).
72 changes: 37 additions & 35 deletions apps/opentelemetry/test/otel_batch_processor_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

all() ->
[exporting_timeout_test,
check_table_size_test,
exporting_runner_timeout_test].
exporting_runner_timeout_test,
check_table_size_test].

%% verifies that after the runner has to be killed for taking too long
%% that everything is still functional and the exporter does not crash
Expand All @@ -33,48 +33,20 @@ exporting_timeout_test(_Config) ->
ok
end.

check_table_size_test(_Config) ->
MaxQueueSize = 10,
CheckTableSizeMs = 1,
{ok, _Pid, #{reg_name := RegName}} = otel_batch_processor:start_link(
#{name => test_processor_check_size_test,
resource => otel_resource:create([]),
exporter => ?MODULE,
exporting_timeout_ms => timer:minutes(10),
%% long enough, so that it never happens during the test
scheduled_delay_ms => timer:minutes(10),
check_table_size_ms => CheckTableSizeMs,
max_queue_size => MaxQueueSize}
),
%% max_queue_size limit is not reached
true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}),
lists:foreach(fun(_) ->
otel_batch_processor:on_end(generate_span(), #{reg_name => RegName})
end,
lists:seq(1, MaxQueueSize)),
%% Wait for more than CheckTablesizeMS to be sure check timeout occurred
timer:sleep(CheckTableSizeMs * 5),
dropped = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}),

otel_batch_processor:force_flush(#{reg_name => RegName}),
%% force_flush is async, have to wait for some long enough time again,
timer:sleep(CheckTableSizeMs * 10),
true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}).

exporting_runner_timeout_test(_Config) ->
process_flag(trap_exit, true),

{ok, Pid, #{reg_name := RegName}} = otel_batch_processor:start_link(
#{name => test_processor1,
{ok, Pid, State} = otel_batch_processor:start_link(
#{name => test_processor,
resource => otel_resource:create([]),
exporter => ?MODULE,
exporting_timeout_ms => 1,
scheduled_delay_ms => 1}),

%% Insert a few spans to make sure runner process will be spawned and killed
%% because it hangs for 10 minutes (see export/4 below)
true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}),
true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}),
true = otel_batch_processor:on_end(generate_span(), State),
true = otel_batch_processor:on_end(generate_span(), State),

receive
{'EXIT', Pid, _} ->
Expand All @@ -85,9 +57,35 @@ exporting_runner_timeout_test(_Config) ->
ok
end.

%% Starts a batch processor with `max_queue_size' set to 10 and ten-minute
%% export timeout / scheduled delay, inserts `MaxQueueSize' spans twice (with a
%% short pause in between) and asserts that the next `on_end/2' call returns
%% `dropped'.
check_table_size_test(_Config) ->
MaxQueueSize = 10,
{ok, _Pid, State} = otel_batch_processor:start_link(
#{name => test_processor_check_size_test,
resource => otel_resource:create([]),
exporter => ?MODULE,
exporting_timeout_ms => timer:minutes(10),
%% long enough, so that it never happens during the test
scheduled_delay_ms => timer:minutes(10),
max_queue_size => MaxQueueSize}
),
%% max_queue_size limit is not reached yet, so the first span must be accepted
true = otel_batch_processor:on_end(generate_span(), State),

insert_spans(State, MaxQueueSize),

%% Wait a little to give the handler time to transition to the export state
timer:sleep(30),

%% Insert the same number again, this time into the next table, as the previous one is being exported;
%% the exporter is slow (see init_per_testcase), so we can be sure that we will go to the drop mode,
%% with no chance to switch the table this time.
insert_spans(State, MaxQueueSize),

dropped = otel_batch_processor:on_end(generate_span(), State).

%% exporter behaviour

init(_) ->
init(_OtelSignal, _ExporterId, _) ->
{ok, []}.

export(_, _, _, _) ->
Expand All @@ -98,6 +96,10 @@ shutdown(_) ->

%% helpers

%% Calls `otel_batch_processor:on_end/2' `N' times, each time with a freshly
%% generated span and the given processor `State'. The individual results are
%% ignored; the function returns `ok'.
insert_spans(State, N) ->
    [otel_batch_processor:on_end(generate_span(), State) || _Index <- lists:seq(1, N)],
    ok.

generate_span() ->
#span{trace_id = otel_id_generator:generate_trace_id(),
span_id = otel_id_generator:generate_span_id(),
Expand Down
11 changes: 10 additions & 1 deletion apps/opentelemetry_api/src/otel_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
format_binary_string/2,
format_binary_string/3,
assert_to_binary/1,
unicode_to_binary/1]).
unicode_to_binary/1,
stack_without_args/1]).

-if(?OTP_RELEASE >= 24).
format_exception(Kind, Reason, StackTrace) ->
Expand Down Expand Up @@ -56,3 +57,11 @@ unicode_to_binary(String) ->
_ ->
{error, bad_binary_conversion}
end.

%% Returns the stacktrace with every argument list replaced by its arity.
%% Frames of the form `{M, F, Args, Info}' where `Args' is a list become
%% `{M, F, length(Args), Info}'; any other frame is kept unchanged.
%% Argument values are stripped because they may contain sensitive data.
stack_without_args([]) ->
    [];
stack_without_args([Frame | Rest]) ->
    [strip_frame_args(Frame) | stack_without_args(Rest)].

%% Replaces the argument list of a single stack frame with its length.
strip_frame_args({M, F, Args, Info}) when is_list(Args) ->
    {M, F, length(Args), Info};
strip_frame_args(Frame) ->
    Frame.
Loading

0 comments on commit d9afb2d

Please sign in to comment.