Skip to content

Commit

Permalink
rework propagators to use a composite propagator
Browse files Browse the repository at this point in the history
instead of propagators always being a list that are run in order
this PR requires a composite propagator be created if a multiple
propagators are to be used.
  • Loading branch information
tsloughter committed Sep 8, 2021
1 parent 6c6d75d commit 620eb77
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 133 deletions.
5 changes: 2 additions & 3 deletions apps/opentelemetry/test/otel_propagation_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ override_propagators(_Config) ->
%% drop baggage with bad value
otel_baggage:set(<<"key-3">>, value3),

Headers = otel_propagator_text_map:inject([{<<"existing-header">>, <<"I exist">>}],
#{propagators => [otel_propagator_baggage]}),
Headers = otel_propagator_text_map:inject({otel_propagator_baggage, []}, [{<<"existing-header">>, <<"I exist">>}]),

%% the manually set propagators does not include trace_context or b3
%% so header must only have the existing-header and the baggage
Expand All @@ -152,7 +151,7 @@ override_propagators(_Config) ->

%% make header keys uppercase to validate the extractor is case insensitive
BinaryHeaders = [{string:uppercase(Key), iolist_to_binary(Value)} || {Key, Value} <- Headers],
otel_propagator_text_map:extract(BinaryHeaders, #{propagators => [otel_propagator_baggage]}),
otel_propagator_text_map:extract({otel_propagator_baggage, []}, BinaryHeaders),

?assertEqual(#{<<"key-1">> => {<<"value=1">>, []},
<<"key-2">> => {<<"value-2">>, [<<"metadata">>, {<<"md-k-1">>, <<"md-v-1">>}]}},
Expand Down
34 changes: 22 additions & 12 deletions apps/opentelemetry_api/src/opentelemetry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
get_tracer/1,
set_text_map_propagators/1,
set_text_map_extractors/1,
get_text_map_extractors/0,
set_text_map_extractor/1,
get_text_map_extractor/0,
set_text_map_injectors/1,
get_text_map_injectors/0,
set_text_map_injector/1,
get_text_map_injector/0,
timestamp/0,
timestamp_to_nano/1,
convert_timestamp/2,
Expand Down Expand Up @@ -153,28 +155,36 @@ get_tracer(Name) ->
%% setting the propagators is the same as setting the same list for
%% injectors and extractors
set_text_map_propagators(List) when is_list(List) ->
set_text_map_injectors(List),
set_text_map_extractors(List);
CompositeInjector = otel_propagator_text_map_composite:create(List),
CompositeExtractor = otel_propagator_text_map_composite:create(List),
set_text_map_injector(CompositeInjector),
set_text_map_extractor(CompositeExtractor);
set_text_map_propagators(_) ->
ok.

set_text_map_extractors(List) when is_list(List) ->
ParsedList = otel_propagator:builtins_to_modules(List),
persistent_term:put({?MODULE, text_map_extractors}, ParsedList);
CompositeExtractor = otel_propagator_text_map_composite:create(List),
set_text_map_extractor(CompositeExtractor);
set_text_map_extractors(_) ->
ok.

set_text_map_extractor(Propagator) ->
persistent_term:put({?MODULE, text_map_extractor}, Propagator).

set_text_map_injectors(List) when is_list(List) ->
ParsedList = otel_propagator:builtins_to_modules(List),
persistent_term:put({?MODULE, text_map_injectors}, ParsedList);
CompositeInjector = otel_propagator_text_map_composite:create(List),
set_text_map_extractor(CompositeInjector);
set_text_map_injectors(_) ->
ok.

get_text_map_extractors() ->
persistent_term:get({?MODULE, text_map_extractors}, []).
set_text_map_injector(Propagator) ->
persistent_term:put({?MODULE, text_map_injector}, Propagator).

get_text_map_extractor() ->
persistent_term:get({?MODULE, text_map_extractor}, otel_propagator_text_map_noop).

get_text_map_injectors() ->
persistent_term:get({?MODULE, text_map_injectors}, []).
get_text_map_injector() ->
persistent_term:get({?MODULE, text_map_injector}, otel_propagator_text_map_noop).

%% @doc A monotonically increasing time provided by the Erlang runtime system in the native time unit.
%% This value is the most accurate and precise timestamp available from the Erlang runtime and
Expand Down
22 changes: 9 additions & 13 deletions apps/opentelemetry_api/src/otel_propagator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,13 @@
builtin_to_module/1]).

%% Sets a value into a carrier
-callback inject(carrier()) -> carrier().
-callback inject(carrier(), propagator_options()) -> carrier().
-callback inject_from(otel_ctx:t(), carrier()) -> carrier().
-callback inject_from(otel_ctx:t(), carrier(), propagator_options()) -> carrier().
-callback inject(t(), carrier()) -> carrier().
-callback inject_from(otel_ctx:t(), t(), carrier()) -> carrier().
%% extracts values from a carrier and sets them in the context
-callback extract(carrier()) -> otel_ctx:t().
-callback extract(carrier(), propagator_options()) -> otel_ctx:t().
-callback extract_to(otel_ctx:t(), carrier()) -> otel_ctx:t().
-callback extract_to(otel_ctx:t(), carrier(), propagator_options()) -> otel_ctx:t().
-callback extract(t(), carrier()) -> otel_ctx:t().
-callback extract_to(otel_ctx:t(), t(), carrier()) -> otel_ctx:t().

-type propagator_options() :: #{propagators => [t()]}.

-type t() :: builtin() | module().
-type t() :: builtin() | module() | {module(), term()}.

-type builtin() :: trace_context | tracecontext | b3. %% multib3 | jaeger

Expand Down Expand Up @@ -78,5 +72,7 @@ builtin_to_module(b3) ->
%% otel_propagator_multib3;
%% builtin_to_module(jaeger) ->
%% otel_propagator_jaeger;
builtin_to_module(Module) ->
Module.
builtin_to_module(Module) when is_atom(Module) ->
Module;
builtin_to_module(Propagator) ->
Propagator.
12 changes: 6 additions & 6 deletions apps/opentelemetry_api/src/otel_propagator_b3.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@

-behaviour(otel_propagator_text_map).

-export([fields/0,
inject/3,
extract/4]).
-export([fields/1,
inject/4,
extract/5]).

-include("opentelemetry.hrl").

Expand All @@ -65,10 +65,10 @@

-define(B3_IS_SAMPLED(S), S =:= "1" orelse S =:= <<"1">> orelse S =:= "true" orelse S =:= <<"true">>).

fields() ->
fields(_) ->
[?B3_TRACE_ID, ?B3_SPAN_ID, ?B3_SAMPLED].

inject(Ctx, Carrier, CarrierSet) ->
inject(Ctx, Carrier, CarrierSet, _Options) ->
case otel_tracer:current_span_ctx(Ctx) of
#span_ctx{trace_id=TraceId,
span_id=SpanId,
Expand All @@ -83,7 +83,7 @@ inject(Ctx, Carrier, CarrierSet) ->
Carrier
end.

extract(Ctx, Carrier, _CarrierKeysFun, CarrierGet) ->
extract(Ctx, Carrier, _CarrierKeysFun, CarrierGet, _Options) ->
try
TraceId = trace_id(Carrier, CarrierGet),
SpanId = span_id(Carrier, CarrierGet),
Expand Down
12 changes: 6 additions & 6 deletions apps/opentelemetry_api/src/otel_propagator_baggage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@

-behaviour(otel_propagator_text_map).

-export([fields/0,
inject/3,
extract/4]).
-export([fields/1,
inject/4,
extract/5]).

-include("opentelemetry.hrl").

Expand All @@ -50,10 +50,10 @@

-define(BAGGAGE_HEADER, <<"baggage">>).

fields() ->
fields(_) ->
[?BAGGAGE_HEADER].

inject(Ctx, Carrier, CarrierSet) ->
inject(Ctx, Carrier, CarrierSet, _Options) ->
Baggage = otel_baggage:get_all(Ctx),
case maps:fold(fun(Key, Value, Acc) ->
[$,, [encode_key(Key), "=", encode_value(Value)] | Acc]
Expand All @@ -64,7 +64,7 @@ inject(Ctx, Carrier, CarrierSet) ->
Carrier
end.

extract(Ctx, Carrier, _CarrierKeysFun, CarrierGet) ->
extract(Ctx, Carrier, _CarrierKeysFun, CarrierGet, _Options) ->
case CarrierGet(?BAGGAGE_HEADER, Carrier) of
undefined ->
Ctx;
Expand Down
133 changes: 64 additions & 69 deletions apps/opentelemetry_api/src/otel_propagator_text_map.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,19 @@

-behaviour(otel_propagator).

-export([inject/1,
-export([fields/1,
inject/1,
inject/2,
inject/3,
inject_from/2,
inject_from/3,
inject_from/4,
extract/1,
extract/2,
extract/4,
extract_to/2,
extract_to/3]).
extract_to/3,
extract_to/5]).

-export([default_carrier_get/2,
default_carrier_set/3,
Expand All @@ -73,13 +78,16 @@
-include_lib("kernel/include/logger.hrl").

%% Sets a value into a carrier
-callback inject(otel_ctx:t(), otel_propagator:carrier(), carrier_set()) -> otel_propagator:carrier().
-callback inject(otel_ctx:t(), otel_propagator:carrier(), carrier_set(), propagator_options()) -> otel_propagator:carrier().

%% Extracts values from a carrier and sets them in the context
-callback extract(otel_ctx:t(), otel_propagator:carrier(), carrier_keys(), carrier_get()) -> term().
-callback extract(otel_ctx:t(), otel_propagator:carrier(), carrier_keys(), carrier_get(), propagator_options()) -> term().

%% Returns all the keys the propagator sets with `inject'
-callback fields() -> [field_key()].
-callback fields(propagator_options()) -> [field_key()].

%% a textmap propagator can have any term associated with it
-type propagator_options() :: term().

-type field_key() :: unicode:latin1_binary().
-type field_value() :: unicode:latin1_binary().
Expand All @@ -93,89 +101,76 @@

-type default_text_map_carrier() :: [{unicode:latin1_binary(), unicode:latin1_binary()}].

-type inject_options() :: #{carrier_set_fun => carrier_set(),
propagators => [otel_propagator:t()]}.
%% 2-tuple form is a textmap propagator with options
%% an empty list is passed for options if the propagator is a module with no options
-type t() :: module() | {module(), propagator_options()}.

-export_type([t/0]).

-type extract_options() :: #{carrier_get_fun => carrier_get(),
carrier_keys_fun => carrier_keys(),
propagators => [otel_propagator:t()]}.
-export_type([]).
-spec fields(otel_propagator:t()) -> [field_key()].
fields(Propagator) when is_atom(Propagator) ->
Propagator:fields([]);
fields({Module, Options}) ->
Module:fields(Options).

-spec inject(otel_propagator:carrier()) -> otel_propagator:carrier().
inject(Carrier) ->
inject(Carrier, #{}).
Propagator = opentelemetry:get_text_map_injector(),
inject(Propagator, Carrier, fun default_carrier_set/3).

-spec inject(otel_propagator:t(), otel_propagator:carrier()) -> otel_propagator:carrier().
inject(Propagator, Carrier) ->
inject(Propagator, Carrier, fun default_carrier_set/3).

-spec inject(otel_propagator:carrier(), inject_options()) -> otel_propagator:carrier().
inject(Carrier, InjectOptions) ->
-spec inject(otel_propagator:t(), otel_propagator:carrier(), fun()) -> otel_propagator:carrier().
inject(Propagator, Carrier, CarrierSetFun) ->
Context = otel_ctx:get_current(),
inject_from(Context, Carrier, InjectOptions).
inject_from(Context, Propagator, Carrier, CarrierSetFun).

-spec inject_from(otel_ctx:t(), otel_propagator:carrier()) -> otel_propagator:carrier().
inject_from(Context, Carrier) ->
inject_from(Context, Carrier, #{}).
Propagator = opentelemetry:get_text_map_injector(),
inject_from(Context, Propagator, Carrier, fun default_carrier_set/3).

-spec inject_from(otel_ctx:t(), otel_propagator:carrier(), inject_options()) -> otel_propagator:carrier().
inject_from(Context, Carrier, InjectOptions) when is_map(InjectOptions)->
Injectors = maps:get(propagators, InjectOptions, opentelemetry:get_text_map_injectors()),
CarrierSetFun = maps:get(carrier_set_fun, InjectOptions, fun ?MODULE:default_carrier_set/3),
run_injectors(Context, Injectors, Carrier, CarrierSetFun);
inject_from(_Context, Carrier, InjectOptions) ->
?LOG_INFO("inject failed. InjectOptions must be a map but instead got: ~p", [InjectOptions]),
Carrier.
-spec inject_from(otel_ctx:t(), otel_propagator:t(), otel_propagator:carrier()) -> otel_propagator:carrier().
inject_from(Context, Propagator, Carrier) ->
inject_from(Context, Propagator, Carrier, fun default_carrier_set/3).

-spec inject_from(otel_ctx:t(), otel_propagator:t(), otel_propagator:carrier(), fun()) -> otel_propagator:carrier().
inject_from(Context, Module, Carrier, CarrierSetFun) when is_atom(Module) ->
Module:inject(Context, Carrier, CarrierSetFun, []);
inject_from(Context, {Module, Options}, Carrier, CarrierSetFun) ->
Module:inject(Context, Carrier, CarrierSetFun, Options).

-spec extract(otel_propagator:carrier()) -> otel_ctx:t().
extract(Carrier) ->
extract(Carrier, #{}).
Propagator = opentelemetry:get_text_map_extractor(),
extract(Propagator, Carrier, fun default_carrier_keys/1, fun default_carrier_get/2).

-spec extract(otel_propagator:t(), otel_propagator:carrier()) -> otel_ctx:t().
extract(Propagator, Carrier) ->
extract(Propagator, Carrier, fun default_carrier_keys/1, fun default_carrier_get/2).

-spec extract(otel_propagator:carrier(), extract_options()) -> otel_ctx:t().
extract(Carrier, ExtractOptions) ->
-spec extract(otel_propagator:t(), otel_propagator:carrier(), fun(), fun()) -> otel_ctx:t().
extract(Propagator, Carrier, CarrierKeysFun, CarrierGetFun) ->
Context = otel_ctx:get_current(),
Context1 = extract_to(Context, Carrier, ExtractOptions),
Context1 = extract_to(Context, Propagator, Carrier, CarrierKeysFun, CarrierGetFun),
otel_ctx:attach(Context1).

-spec extract_to(otel_ctx:t(), otel_propagator:carrier()) -> otel_ctx:t().
extract_to(Context, Carrier) ->
extract_to(Context, Carrier, #{}).

-spec extract_to(otel_ctx:t(), otel_propagator:carrier(), extract_options()) -> otel_ctx:t().
extract_to(Context, Carrier, ExtractOptions) when is_map(ExtractOptions) ->
Extractors = maps:get(propagators, ExtractOptions, opentelemetry:get_text_map_extractors()),
CarrierKeysFun = maps:get(carrier_keys_fun, ExtractOptions, fun ?MODULE:default_carrier_keys/1),
CarrierGetFun = maps:get(carrier_get_fun, ExtractOptions, fun ?MODULE:default_carrier_get/2),
run_extractors(Context, Extractors, Carrier, CarrierKeysFun, CarrierGetFun);
extract_to(Context, _Carrier, ExtractOptions) ->
?LOG_INFO("extract failed. ExtractOptions must be a map but instead got: ~p", [ExtractOptions]),
Context.

run_extractors(Context, Extractors, Carrier, CarrierKeysFun, CarrierGetFun) when is_list(Extractors) ->
lists:foldl(fun(Extract, ContextAcc) ->
try Extract:extract(ContextAcc, Carrier, CarrierKeysFun, CarrierGetFun)
catch
C:E:S ->
?LOG_INFO("text map propagator failed to extract from carrier",
#{extractor => Extract, carrier => Carrier,
class => C, exception => E, stacktrace => S}),
ContextAcc
end
end, Context, otel_propagator:builtins_to_modules(Extractors));
run_extractors(Context, Extractors, _, _, _) ->
?LOG_INFO("extract failed. Extractors must be a list but instead got: ~p", [Extractors]),
Context.

run_injectors(Context, Injectors, Carrier, Setter) when is_list(Injectors) ->
lists:foldl(fun(Inject, CarrierAcc) ->
try Inject:inject(Context, CarrierAcc, Setter)
catch
C:E:S ->
?LOG_INFO("text map propagator failed to inject to carrier",
#{injector => Inject, carrier => CarrierAcc,
class => C, exception => E, stacktrace => S}),
CarrierAcc
end
end, Carrier, otel_propagator:builtins_to_modules(Injectors));
run_injectors(_, Injectors, Carrier, _) ->
?LOG_INFO("inject failed. Injectors must be a list but instead got: ~p", [Injectors]),
Carrier.
Propagator = opentelemetry:get_text_map_extractor(),
extract_to(Context, Propagator, Carrier, fun default_carrier_keys/1, fun default_carrier_get/2).

-spec extract_to(otel_ctx:t(), otel_propagator:t(), otel_propagator:carrier()) -> otel_ctx:t().
extract_to(Context, Propagator, Carrier) ->
extract_to(Context, Propagator, Carrier, fun default_carrier_keys/1, fun default_carrier_get/2).

-spec extract_to(otel_ctx:t(), otel_propagator:t(), otel_propagator:carrier(), fun(), fun()) -> otel_ctx:t().
extract_to(Context, Module, Carrier, CarrierKeysFun, CarrierGetFun) when is_atom(Module) ->
Module:extract(Context, Carrier, CarrierKeysFun, CarrierGetFun, []);
extract_to(Context, {Module, Options}, Carrier, CarrierKeysFun, CarrierGetFun) ->
Module:extract(Context, Carrier, CarrierKeysFun, CarrierGetFun, Options).

%% case-insensitive finding of a key string in a list of ASCII strings
%% if there are multiple entries in the list for the same key the values
Expand Down
Loading

0 comments on commit 620eb77

Please sign in to comment.