Skip to content

Commit

Permalink
Merge branch 'main' into batch-repeat-timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
tsloughter authored Nov 8, 2021
2 parents 9a80c50 + 17bc408 commit 4a05b37
Show file tree
Hide file tree
Showing 7 changed files with 431 additions and 127 deletions.
10 changes: 9 additions & 1 deletion apps/opentelemetry/src/opentelemetry_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ init([Opts]) ->
type => worker,
modules => [otel_batch_processor]},

SimpleProcessorOpts = proplists:get_value(otel_simple_processor, Processors, #{}),
SimpleProcessor = #{id => otel_simple_processor,
start => {otel_simple_processor, start_link, [SimpleProcessorOpts]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [otel_simple_processor]},

SpanSup = #{id => otel_span_sup,
start => {otel_span_sup, start_link, [Opts]},
type => supervisor,
Expand All @@ -67,6 +75,6 @@ init([Opts]) ->
%% `TracerServer' *must* start before the `BatchProcessor'
%% `BatchProcessor' relies on getting the `Resource' from
%% the `TracerServer' process
ChildSpecs = [Detectors, TracerServer, BatchProcessor, SpanSup],
ChildSpecs = [Detectors, TracerServer, BatchProcessor, SimpleProcessor, SpanSup],

{ok, {SupFlags, ChildSpecs}}.
167 changes: 45 additions & 122 deletions apps/opentelemetry/src/otel_batch_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% @doc This module has the behaviour that each exporter must implement
%% and creates the buffer of trace spans to be exported.
%%
%% The exporter process can be configured to export the current finished
%% spans based on timeouts and the size of the finished spans table.
%% @doc The Batch Span Processor implements the `otel_span_processor'
%% behaviour. It stores finished Spans in a ETS table buffer and exports
%% them on an interval or when the table reaches a maximum size.
%%
%% Timeouts:
%% exporting_timeout_ms: How long to let the exports run before killing.
Expand Down Expand Up @@ -109,7 +107,7 @@ init([Args]) ->
ScheduledDelay = maps:get(scheduled_delay_ms, Args, ?DEFAULT_SCHEDULED_DELAY_MS),
CheckTableSize = maps:get(check_table_size_ms, Args, ?DEFAULT_CHECK_TABLE_SIZE_MS),

Exporter = init_exporter(maps:get(exporter, Args, undefined)),
Exporter = otel_exporter:init(maps:get(exporter, Args, undefined)),
Resource = otel_tracer_provider:resource(),

_Tid1 = new_export_table(?TABLE_1),
Expand Down Expand Up @@ -146,11 +144,19 @@ exporting({timeout, export_spans}, export_spans, _) ->
{keep_state_and_data, [postpone]};
exporting(enter, _OldState, Data=#data{exporting_timeout_ms=ExportingTimeout,
scheduled_delay_ms=SendInterval}) ->
{OldTableName, RunnerPid} = export_spans(Data),
{keep_state, Data#data{runner_pid=RunnerPid,
handed_off_table=OldTableName},
[{state_timeout, ExportingTimeout, exporting_timeout},
{{timeout, export_spans}, SendInterval, export_spans}]};
case export_spans(Data) of
ok ->
%% in an `enter' handler we can't return a `next_state' or `next_event'
%% so we rely on a timeout to trigger the transition to `idle'
{keep_state, Data#data{runner_pid=undefined}, [{state_timeout, 0, empty_table}]};
{OldTableName, RunnerPid} ->
{keep_state, Data#data{runner_pid=RunnerPid,
handed_off_table=OldTableName},
[{state_timeout, ExportingTimeout, exporting_timeout},
{{timeout, export_spans}, SendInterval, export_spans}]}
end;
exporting(state_timeout, empty_table, Data) ->
{next_state, idle, Data};
exporting(state_timeout, exporting_timeout, Data=#data{handed_off_table=ExportingTable}) ->
%% kill current exporting process because it is taking too long
%% which deletes the exporting table, so create a new one and
Expand Down Expand Up @@ -189,8 +195,8 @@ handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_q
keep_state_and_data
end;
handle_event_(_, {call, From}, {set_exporter, Exporter}, Data=#data{exporter=OldExporter}) ->
shutdown_exporter(OldExporter),
{keep_state, Data#data{exporter=init_exporter(Exporter)}, [{reply, From, ok}]};
otel_exporter:shutdown(OldExporter),
{keep_state, Data#data{exporter=otel_exporter:init(Exporter)}, [{reply, From, ok}]};
handle_event_(_, _, _, _) ->
keep_state_and_data.

Expand Down Expand Up @@ -227,6 +233,9 @@ do_insert(Span) ->
complete_exporting(Data=#data{handed_off_table=ExportingTable})
when ExportingTable =/= undefined ->
new_export_table(ExportingTable),
{next_state, idle, Data#data{runner_pid=undefined,
handed_off_table=undefined}};
complete_exporting(Data) ->
{next_state, idle, Data#data{runner_pid=undefined,
handed_off_table=undefined}}.

Expand All @@ -247,95 +256,31 @@ new_export_table(Name) ->
%% for each instrumentation_library and export together.
{keypos, #span.instrumentation_library}]).

init_exporter(undefined) ->
undefined;
init_exporter({ExporterModule, Config}) when is_atom(ExporterModule) ->
try ExporterModule:init(Config) of
{ok, ExporterConfig} ->
{ExporterModule, ExporterConfig};
ignore ->
undefined
catch
Kind:Reason:StackTrace ->
%% logging in debug level since config argument in stacktrace could have secrets
?LOG_DEBUG(#{source => exporter,
during => init,
kind => Kind,
reason => Reason,
exporter => ExporterModule,
stacktrace => StackTrace}, #{report_cb => fun ?MODULE:report_cb/1}),

%% print a more useful message about the failure if we can discern
%% one from the failure reason and exporter used
case {Kind, Reason} of
{error, badarg} when ExporterModule =:= opentelemetry_exporter ->
case maps:get(protocol, Config, undefined) of
grpc ->
%% grpc protocol uses grpcbox which is not included by default
%% this will check if it is available so we can warn the user if
%% the dependency needs to be added
try grpcbox:module_info() of
_ ->
undefined
catch
_:_ ->
?LOG_WARNING("OTLP tracer, ~p, failed to initialize when using GRPC protocol and `grpcbox` module is not available in the code path. Verify that you have the `grpcbox` dependency included and rerun.", [ExporterModule]),
undefined
end;
_ ->
%% same as the debug log above
%% without the stacktrace and at a higher level
?LOG_WARNING(#{source => exporter,
during => init,
kind => Kind,
reason => Reason,
exporter => ExporterModule}, #{report_cb => fun ?MODULE:report_cb/1}),
undefined
end;
{error, undef} when ExporterModule =:= opentelemetry_exporter ->
?LOG_WARNING("Trace exporter module ~p not found. Verify you have included the `opentelemetry_exporter` dependency.", [ExporterModule]),
undefined;
{error, undef} ->
?LOG_WARNING("Trace exporter module ~p not found. Verify you have included the dependency that contains the exporter module.", [ExporterModule]),
undefined;
_ ->
%% same as the debug log above
%% without the stacktrace and at a higher level
?LOG_WARNING(#{source => exporter,
during => init,
kind => Kind,
reason => Reason,
exporter => ExporterModule}, #{report_cb => fun ?MODULE:report_cb/1}),
undefined
end
end;
init_exporter(ExporterModule) when is_atom(ExporterModule) ->
init_exporter({ExporterModule, []}).

shutdown_exporter(undefined) ->
ok;
shutdown_exporter({ExporterModule, Config}) ->
ExporterModule:shutdown(Config).

export_spans(#data{exporter=Exporter,
resource=Resource}) ->
CurrentTable = ?CURRENT_TABLE,
NewCurrentTable = case CurrentTable of
?TABLE_1 ->
?TABLE_2;
?TABLE_2 ->
?TABLE_1
end,

%% an atom is a single word so this does not trigger a global GC
persistent_term:put(?CURRENT_TABLES_KEY, NewCurrentTable),
%% set the table to accept inserts
enable(),
case ets:info(CurrentTable, size) of
0 ->
%% nothing to do if the table is empty
ok;
_ ->
NewCurrentTable = case CurrentTable of
?TABLE_1 ->
?TABLE_2;
?TABLE_2 ->
?TABLE_1
end,

%% an atom is a single word so this does not trigger a global GC
persistent_term:put(?CURRENT_TABLES_KEY, NewCurrentTable),
%% set the table to accept inserts
enable(),

Self = self(),
RunnerPid = erlang:spawn_link(fun() -> send_spans(Self, Resource, Exporter) end),
ets:give_away(CurrentTable, RunnerPid, export),
{CurrentTable, RunnerPid}.
Self = self(),
RunnerPid = erlang:spawn_link(fun() -> send_spans(Self, Resource, Exporter) end),
ets:give_away(CurrentTable, RunnerPid, export),
{CurrentTable, RunnerPid}
end.

%% Additional benefit of using a separate process is calls to `register` won't
%% timeout if the actual exporting takes longer than the call timeout
Expand All @@ -357,7 +302,7 @@ export({ExporterModule, Config}, Resource, SpansTid) ->
%% don't let a exporter exception crash us
%% and return true if exporter failed
try
ExporterModule:export(SpansTid, Resource, Config) =:= failed_not_retryable
otel_exporter:export(ExporterModule, SpansTid, Resource, Config) =:= failed_not_retryable
catch
Kind:Reason:StackTrace ->
?LOG_INFO(#{source => exporter,
Expand All @@ -370,33 +315,11 @@ export({ExporterModule, Config}, Resource, SpansTid) ->
end.

%% logger format functions
report_cb(#{source := exporter,
during := init,
kind := Kind,
reason := Reason,
exporter := ExporterModule,
stacktrace := StackTrace}) ->
{"OTLP tracer ~p failed to initialize: ~ts",
[ExporterModule, format_exception(Kind, Reason, StackTrace)]};
report_cb(#{source := exporter,
during := init,
kind := Kind,
reason := Reason,
exporter := ExporterModule}) ->
{"OTLP tracer ~p failed to initialize with exception ~p:~p", [ExporterModule, Kind, Reason]};
report_cb(#{source := exporter,
during := export,
kind := Kind,
reason := Reason,
exporter := ExporterModule,
stacktrace := StackTrace}) ->
{"exporter threw exception: exporter=~p ~ts",
[ExporterModule, format_exception(Kind, Reason, StackTrace)]}.

-if(?OTP_RELEASE >= 24).
format_exception(Kind, Reason, StackTrace) ->
erl_error:format_exception(Kind, Reason, StackTrace).
-else.
format_exception(Kind, Reason, StackTrace) ->
io_lib:format("~p:~p ~p", [Kind, Reason, StackTrace]).
-endif.
[ExporterModule, otel_utils:format_exception(Kind, Reason, StackTrace)]}.
95 changes: 95 additions & 0 deletions apps/opentelemetry/src/otel_exporter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
%%%-----------------------------------------------------------------------
-module(otel_exporter).

-export([init/1,
export/4,
shutdown/1,
report_cb/1]).

%% Do any initialization of the exporter here and return configuration
%% that will be passed along with a list of spans to the `export' function.
-callback init(term()) -> {ok, term()} | ignore.
Expand All @@ -30,3 +35,93 @@
failed_not_retryable |
failed_retryable.
-callback shutdown(term()) -> ok.

-include_lib("kernel/include/logger.hrl").

init(undefined) ->
undefined;
init({ExporterModule, Config}) when is_atom(ExporterModule) ->
try ExporterModule:init(Config) of
{ok, ExporterConfig} ->
{ExporterModule, ExporterConfig};
ignore ->
undefined
catch
Kind:Reason:StackTrace ->
%% logging in debug level since config argument in stacktrace could have secrets
?LOG_DEBUG(#{source => exporter,
during => init,
kind => Kind,
reason => Reason,
exporter => ExporterModule,
stacktrace => StackTrace}, #{report_cb => fun ?MODULE:report_cb/1}),

%% print a more useful message about the failure if we can discern
%% one from the failure reason and exporter used
case {Kind, Reason} of
{error, badarg} when ExporterModule =:= opentelemetry_exporter ->
case maps:get(protocol, Config, undefined) of
grpc ->
%% grpc protocol uses grpcbox which is not included by default
%% this will check if it is available so we can warn the user if
%% the dependency needs to be added
try grpcbox:module_info() of
_ ->
undefined
catch
_:_ ->
?LOG_WARNING("OTLP tracer, ~p, failed to initialize when using GRPC protocol and `grpcbox` module is not available in the code path. Verify that you have the `grpcbox` dependency included and rerun.", [ExporterModule]),
undefined
end;
_ ->
%% same as the debug log above
%% without the stacktrace and at a higher level
?LOG_WARNING(#{source => exporter,
during => init,
kind => Kind,
reason => Reason,
exporter => ExporterModule}, #{report_cb => fun ?MODULE:report_cb/1}),
undefined
end;
{error, undef} when ExporterModule =:= opentelemetry_exporter ->
?LOG_WARNING("Trace exporter module ~p not found. Verify you have included the `opentelemetry_exporter` dependency.", [ExporterModule]),
undefined;
{error, undef} ->
?LOG_WARNING("Trace exporter module ~p not found. Verify you have included the dependency that contains the exporter module.", [ExporterModule]),
undefined;
_ ->
%% same as the debug log above
%% without the stacktrace and at a higher level
?LOG_WARNING(#{source => exporter,
during => init,
kind => Kind,
reason => Reason,
exporter => ExporterModule}, #{report_cb => fun ?MODULE:report_cb/1}),
undefined
end
end;
init(ExporterModule) when is_atom(ExporterModule) ->
init({ExporterModule, []}).

export(ExporterModule, SpansTid, Resource, Config) ->
ExporterModule:export(SpansTid, Resource, Config).

shutdown(undefined) ->
ok;
shutdown({ExporterModule, Config}) ->
ExporterModule:shutdown(Config).

report_cb(#{source := exporter,
during := init,
kind := Kind,
reason := Reason,
exporter := ExporterModule,
stacktrace := StackTrace}) ->
{"OTLP tracer ~p failed to initialize: ~ts",
[ExporterModule, otel_utils:format_exception(Kind, Reason, StackTrace)]};
report_cb(#{source := exporter,
during := init,
kind := Kind,
reason := Reason,
exporter := ExporterModule}) ->
{"OTLP tracer ~p failed to initialize with exception ~p:~p", [ExporterModule, Kind, Reason]}.
Loading

0 comments on commit 4a05b37

Please sign in to comment.