Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trace-subscriber #32

Merged
merged 12 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ jobs:
- run: opam exec -- dune runtest -p trace-tef,trace-fuchsia

# with depopts
- run: opam install hmap
- run: opam install hmap mtime
- run: opam exec -- dune build '@install' -p trace,trace-tef,trace-fuchsia

2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ clean:

test:
@dune runtest $(DUNE_OPTS)
test-autopromote:
@dune runtest $(DUNE_OPTS) --auto-promote

doc:
@dune build $(DUNE_OPTS) @doc
Expand Down
73 changes: 49 additions & 24 deletions dune-project
Original file line number Diff line number Diff line change
@@ -1,64 +1,89 @@
(lang dune 2.9)

(name trace)

(generate_opam_files true)

(version 0.7)

(source
(github c-cube/ocaml-trace))

(authors "Simon Cruanes")

(maintainers "Simon Cruanes")

(license MIT)

;(documentation https://url/to/documentation)

(package
(name trace)
(synopsis "A stub for tracing/observability, agnostic in how data is collected")
(synopsis
"A stub for tracing/observability, agnostic in how data is collected")
(depends
(ocaml (>= 4.08))
dune)
(ocaml
(>= 4.08))
dune)
(depopts
hmap
(mtime (>= 2.0)))
hmap
(mtime
(>= 2.0)))
(tags
(trace tracing observability profiling)))

(package
(name ppx_trace)
(synopsis "A ppx-based preprocessor for trace")
(depends
(ocaml (>= 4.12)) ; we use __FUNCTION__
(ppxlib (>= 0.28))
(trace (= :version))
(trace-tef (and (= :version) :with-test))
dune)
(ocaml
(>= 4.12)) ; we use __FUNCTION__
(ppxlib
(>= 0.28))
(trace
(= :version))
(trace-tef
(and
(= :version)
:with-test))
dune)
(depopts
(mtime (>= 2.0)))
(tags
(trace ppx)))

(package
(name trace-tef)
(synopsis "A simple backend for trace, emitting Catapult/TEF JSON into a file")
(synopsis
"A simple backend for trace, emitting Catapult/TEF JSON into a file")
(depends
(ocaml (>= 4.08))
(trace (= :version))
(mtime (>= 2.0))
base-unix
dune)
(ocaml
(>= 4.08))
(trace
(= :version))
(mtime
(>= 2.0))
base-unix
dune)
(tags
(trace tracing catapult TEF chrome-format)))

(package
(name trace-fuchsia)
(synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file")
(synopsis
"A high-performance backend for trace, emitting a Fuchsia trace into a file")
(depends
(ocaml (>= 4.08))
(trace (= :version))
(mtime (>= 2.0))
(thread-local-storage (>= 0.2))
base-bigarray
base-unix
dune)
(ocaml
(>= 4.08))
(trace
(= :version))
(mtime
(>= 2.0))
(thread-local-storage
(>= 0.2))
base-bigarray
base-unix
dune)
(tags
(trace tracing fuchsia)))

Expand Down
3 changes: 3 additions & 0 deletions ppx_trace.opam
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ depends: [
"dune" {>= "2.9"}
"odoc" {with-doc}
]
depopts: [
"mtime" {>= "2.0"}
]
build: [
["dune" "subst"] {dev}
[
Expand Down
129 changes: 129 additions & 0 deletions src/subscriber/callbacks.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
open Trace_core
open Types

(** First class module signature for callbacks *)
module type S = sig
type st
(** Type of the state passed to every callback. *)

val on_init : st -> time_ns:float -> unit
(** Called when the subscriber is initialized in a collector *)

val on_shutdown : st -> time_ns:float -> unit
(** Called when the collector is shutdown *)

val on_name_thread : st -> time_ns:float -> tid:int -> name:string -> unit
(** Current thread is being named *)

val on_name_process : st -> time_ns:float -> tid:int -> name:string -> unit
(** Current process is being named *)

val on_enter_span :
st ->
__FUNCTION__:string option ->
__FILE__:string ->
__LINE__:int ->
time_ns:float ->
tid:int ->
data:(string * user_data) list ->
name:string ->
span ->
unit
(** Enter a regular (sync) span *)

val on_exit_span : st -> time_ns:float -> tid:int -> span -> unit
(** Exit a span. This and [on_enter_span] must follow strict stack discipline *)

val on_add_data : st -> data:(string * user_data) list -> span -> unit
(** Add data to a regular span (which must be active) *)

val on_message :
st ->
time_ns:float ->
tid:int ->
span:span option ->
data:(string * user_data) list ->
string ->
unit
(** Emit a log message *)

val on_counter :
st ->
time_ns:float ->
tid:int ->
data:(string * user_data) list ->
name:string ->
float ->
unit
(** Emit the current value of a counter *)

val on_enter_manual_span :
st ->
__FUNCTION__:string option ->
__FILE__:string ->
__LINE__:int ->
time_ns:float ->
tid:int ->
parent:span option ->
data:(string * user_data) list ->
name:string ->
flavor:flavor option ->
trace_id:int ->
span ->
unit
(** Enter a manual (possibly async) span *)

val on_exit_manual_span :
st ->
time_ns:float ->
tid:int ->
name:string ->
data:(string * user_data) list ->
flavor:flavor option ->
trace_id:int ->
span ->
unit
(** Exit a manual span *)
end

type 'st t = (module S with type st = 'st)
(** Callbacks for a subscriber. There is one callback per event
in {!Trace}. The type ['st] is the state that is passed to
every single callback. *)

(** Dummy callbacks.
It can be useful to reuse some of these functions in a
real subscriber that doesn't want to handle {b all}
events, but only some of them. *)
module Dummy = struct
let on_init _ ~time_ns:_ = ()
let on_shutdown _ ~time_ns:_ = ()
let on_name_thread _ ~time_ns:_ ~tid:_ ~name:_ = ()
let on_name_process _ ~time_ns:_ ~tid:_ ~name:_ = ()
let on_message _ ~time_ns:_ ~tid:_ ~span:_ ~data:_ _msg = ()
let on_counter _ ~time_ns:_ ~tid:_ ~data:_ ~name:_ _v = ()

let on_enter_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~time_ns:_ ~tid:_
~data:_ ~name:_ _sp =
()

let on_exit_span _ ~time_ns:_ ~tid:_ _ = ()
let on_add_data _ ~data:_ _sp = ()

let on_enter_manual_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~time_ns:_
~tid:_ ~parent:_ ~data:_ ~name:_ ~flavor:_ ~trace_id:_ _sp =
()

let on_exit_manual_span _ ~time_ns:_ ~tid:_ ~name:_ ~data:_ ~flavor:_
~trace_id:_ _ =
()
end

(** Dummy callbacks, do nothing. *)
let dummy (type st) () : st t =
let module M = struct
type nonrec st = st

include Dummy
end in
(module M)
13 changes: 13 additions & 0 deletions src/subscriber/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

(library
(name trace_subscriber)
(public_name trace.subscriber)
(libraries (re_export trace.core)
(select thread_.ml from
(threads -> thread_.real.ml)
( -> thread_.dummy.ml))
(select time_.ml from
(mtime mtime.clock.os -> time_.mtime.ml)
(mtime mtime.clock.jsoo -> time_.mtime.ml)
( -> time_.dummy.ml))))

104 changes: 104 additions & 0 deletions src/subscriber/subscriber.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
(** A trace subscriber. It pairs a set of callbacks
with the state they need (which can contain a file handle,
a socket, config, etc.).

The design goal for this is that it should be possible to avoid allocations
when the trace collector calls the callbacks. *)
type t =
| Sub : {
st: 'st;
callbacks: 'st Callbacks.t;
}
-> t

(** Dummy subscriber that ignores every call. *)
let dummy : t = Sub { st = (); callbacks = Callbacks.dummy () }

open struct
module Tee_cb : Callbacks.S with type st = t * t = struct
type nonrec st = t * t

let on_init
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns =
CB1.on_init s1 ~time_ns;
CB2.on_init s2 ~time_ns

let on_shutdown
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns =
CB1.on_shutdown s1 ~time_ns;
CB2.on_shutdown s2 ~time_ns

let on_name_thread
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name =
CB1.on_name_thread s1 ~time_ns ~tid ~name;
CB2.on_name_thread s2 ~time_ns ~tid ~name

let on_name_process
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name =
CB1.on_name_process s1 ~time_ns ~tid ~name;
CB2.on_name_process s2 ~time_ns ~tid ~name

let on_enter_span
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~__FUNCTION__ ~__FILE__
~__LINE__ ~time_ns ~tid ~data ~name span =
CB1.on_enter_span s1 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data
~name span;
CB2.on_enter_span s2 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data
~name span

let on_exit_span
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid span =
CB1.on_exit_span s1 ~time_ns ~tid span;
CB2.on_exit_span s2 ~time_ns ~tid span

let on_add_data
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~data span =
CB1.on_add_data s1 ~data span;
CB2.on_add_data s2 ~data span

let on_message
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~span ~data
msg =
CB1.on_message s1 ~time_ns ~tid ~span ~data msg;
CB2.on_message s2 ~time_ns ~tid ~span ~data msg

let on_counter
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~data ~name
n =
CB1.on_counter s1 ~time_ns ~tid ~data ~name n;
CB2.on_counter s2 ~time_ns ~tid ~data ~name n

let on_enter_manual_span
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~__FUNCTION__ ~__FILE__
~__LINE__ ~time_ns ~tid ~parent ~data ~name ~flavor ~trace_id span =
CB1.on_enter_manual_span s1 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns
~tid ~parent ~data ~name ~flavor ~trace_id span;
CB2.on_enter_manual_span s2 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns
~tid ~parent ~data ~name ~flavor ~trace_id span

let on_exit_manual_span
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name ~data
~flavor ~trace_id span =
CB1.on_exit_manual_span s1 ~time_ns ~tid ~name ~data ~flavor ~trace_id
span;
CB2.on_exit_manual_span s2 ~time_ns ~tid ~name ~data ~flavor ~trace_id
span
end
end

(** [tee s1 s2] is a subscriber that forwards every
call to [s1] and [s2] both. *)
let tee (s1 : t) (s2 : t) : t =
let st = s1, s2 in
Sub { st; callbacks = (module Tee_cb) }
1 change: 1 addition & 0 deletions src/subscriber/thread_.dummy.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
let[@inline] get_tid () = 0
2 changes: 2 additions & 0 deletions src/subscriber/thread_.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
val get_tid : unit -> int
(** Get current thread ID *)
1 change: 1 addition & 0 deletions src/subscriber/thread_.real.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
let[@inline] get_tid () = Thread.id @@ Thread.self ()
1 change: 1 addition & 0 deletions src/subscriber/time_.dummy.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
let[@inline] get_time_ns () : float = 0.
Loading