Skip to content

Commit

Permalink
move thread_local to core; adapt to trace 0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
c-cube committed Sep 21, 2023
1 parent ccfeb07 commit fca15fc
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 34 deletions.
2 changes: 1 addition & 1 deletion catapult.opam
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ bug-reports: "https://github.com/imandra-ai/catapult/issues"
depends: [
"dune" {>= "2.7"}
"base-threads"
"trace" {>= "0.3"}
"trace" {>= "0.4"}
"odoc" {with-doc}
"ocaml" {>= "4.08"}
]
Expand Down
2 changes: 1 addition & 1 deletion dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
mtime)
(depends
base-threads
(trace (>= 0.3))
(trace (>= 0.4))
(odoc :with-doc)
(ocaml (>= "4.08"))))

Expand Down
4 changes: 2 additions & 2 deletions src/client/connection.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
open Catapult_utils
module P = Catapult
module Atomic = P.Atomic_shim_
module Atomic = Catapult.Atomic_shim_
module Thread_local = Catapult.Thread_local

let ( let@ ) = ( @@ )
let default_addr = Endpoint_address.default
Expand Down
94 changes: 76 additions & 18 deletions src/core/adapt_backend.ml
Original file line number Diff line number Diff line change
@@ -1,22 +1,41 @@
module Trace = Trace_core
module A = Atomic_shim_
module TLS = Thread_local

module type BACKEND = Backend.S
module type COLLECTOR = Trace.Collector.S

module Span_tbl = Hashtbl.Make (struct
type t = int64

let equal = Int64.equal
let hash = Hashtbl.hash
end)

type backend = (module BACKEND)

open Event_type

let pid = Unix.getpid ()
let now_ = Clock.now_us

type full_arg = [ `Float of float | Trace.user_data ]
type full_arg = Trace.user_data

(** Counter used to allocate fresh spans *)
let span_gen_ = A.make 0

let span_gen_ = Atomic_shim_.make 0
type span_info = { mutable data: (string * Trace.user_data) list }
(** Information for an in-flight span *)

let k_span_info : (string * [ `Sync | `Async ]) Trace.Meta_map.Key.t =
(** Key for accessing span info in manual spans *)
let k_span_info : (string * [ `Sync | `Async ] * span_info) Trace.Meta_map.Key.t
=
Trace.Meta_map.Key.create ()

(** per-thread table to access span info from implicit spans *)
let span_info_tbl_ : span_info Span_tbl.t TLS.t =
TLS.create ~init:(fun ~t_id:_ -> Span_tbl.create 8) ~close:ignore ()

module Mk_collector (B : BACKEND) : COLLECTOR = struct
(** actually emit an event via the backend *)
let[@inline never] emit_real_ ?ts_us ?cat ?(pid = pid)
Expand All @@ -32,45 +51,84 @@ module Mk_collector (B : BACKEND) : COLLECTOR = struct

let with_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name f =
let start = now_ () in
let sp = Trace.Collector.dummy_span in
let span = A.fetch_and_add span_gen_ 1 |> Int64.of_int in

let info_tbl = TLS.get_or_create span_info_tbl_ in
let info = { data } in
Span_tbl.add info_tbl span info;

let finally () : unit =
let now = now_ () in
let dur = now -. start in
let args =
(data
: (string * Trace_core.user_data) list
(info.data
: (string * Trace.user_data) list
:> (string * full_arg) list)
in
Span_tbl.remove info_tbl span;
emit_real_ ~args name ~ts_us:start ~dur X
in
Fun.protect ~finally (fun () -> f sp)

try
let x = f span in
finally ();
x
with e ->
let bt = Printexc.get_raw_backtrace () in
finally ();
Printexc.raise_with_backtrace e bt

let add_data_to_span span data =
if data <> [] then (
let info_tbl = TLS.get_or_create span_info_tbl_ in
match Span_tbl.find_opt info_tbl span with
| None -> ()
| Some info -> info.data <- List.rev_append data info.data
)

let enter_manual_span ~parent ~flavor ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data
name : Trace_core.explicit_span =
let span = Int64.of_int (Atomic_shim_.fetch_and_add span_gen_ 1) in
name : Trace.explicit_span =
let span = Int64.of_int (A.fetch_and_add span_gen_ 1) in
let flavor = Option.value ~default:`Sync flavor in
let args =
(data : (string * Trace_core.user_data) list :> (string * full_arg) list)
(data : (string * Trace.user_data) list :> (string * full_arg) list)
in
(match flavor with
| `Sync ->
emit_real_ ~cat:[ "async" ] ~args name ~id:(Int64.to_string span) B
| `Async ->
emit_real_ ~cat:[ "async" ] ~args name ~id:(Int64.to_string span) A_b);
let meta = Trace_core.Meta_map.(empty |> add k_span_info (name, flavor)) in
{ Trace_core.span; meta }
let meta =
(* [data] has already been emitted on entry, so we only store additional
meta data in this *)
let info = { data = [] } in
Trace.Meta_map.(empty |> add k_span_info (name, flavor, info))
in
{ Trace.span; meta }

let exit_manual_span (es : Trace_core.explicit_span) : unit =
let name, flavor = Trace_core.Meta_map.find_exn k_span_info es.meta in
let exit_manual_span (es : Trace.explicit_span) : unit =
let name, flavor, info = Trace.Meta_map.find_exn k_span_info es.meta in
(* emit data added after span creation *)
let args =
(info.data : (string * Trace.user_data) list :> (string * full_arg) list)
in
match flavor with
| `Sync -> emit_real_ name E
| `Sync -> emit_real_ ~args name E
| `Async ->
emit_real_ ~cat:[ "async" ] name ~id:(es.span |> Int64.to_string) A_e
emit_real_ ~cat:[ "async" ] ~args name
~id:(es.span |> Int64.to_string)
A_e

let add_data_to_manual_span (es : Trace.explicit_span) data =
if data <> [] then (
let _, _, info = Trace.Meta_map.find_exn k_span_info es.meta in
info.data <- List.rev_append data info.data
)

let counter_int name n : unit = emit_real_ "counter" C ~args:[ name, `Int n ]
let counter_int ~data:_ name n : unit =
emit_real_ "counter" C ~args:[ name, `Int n ]

let counter_float name n : unit =
let counter_float ~data:_ name n : unit =
emit_real_ "counter" C ~args:[ name, `Float n ]

let message ?span:_ ~data msg : unit =
Expand Down
2 changes: 1 addition & 1 deletion src/core/backend.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
if no backend is installed, the tracing functions will do nothing.
*)

type arg = [ `Float of float | Trace_core.user_data ]
type arg = Trace_core.user_data

module type S = sig
val emit :
Expand Down
1 change: 1 addition & 0 deletions src/core/catapult.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ let trace_collector_of_backend : backend -> Trace_core.collector =

module Atomic_shim_ = Atomic_shim_
module Clock = Clock
module Thread_local = Thread_local

(**/**)
3 changes: 1 addition & 2 deletions src/utils/thread_local.ml → src/core/thread_local.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
module P = Catapult
module Atomic = P.Atomic_shim_
module Atomic = Atomic_shim_

(* emulate thread local storage *)
module Int_map = Map.Make (struct
Expand Down
File renamed without changes.
11 changes: 6 additions & 5 deletions src/sqlite/backend.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
open Catapult_utils
module Atomic = Catapult.Atomic_shim_
module Clock = Catapult.Clock
module TLS = Catapult.Thread_local

type event = Ser.Event.t

Expand Down Expand Up @@ -45,19 +46,19 @@ module Make (A : ARG) : Catapult.BACKEND = struct
)

(* per-thread buffer *)
let buf : local_buf Thread_local.t =
Thread_local.create
let buf : local_buf TLS.t =
TLS.create
~init:(fun ~t_id ->
{ t_id; buf = Buffer.create 1024; n_evs = 0; evs = [] })
~close:flush_batch ()

let teardown () =
Thread_local.clear buf;
TLS.clear buf;
Writer.close writer

let tick () =
let now = Clock.now_us () in
Thread_local.iter buf ~f:(check_batch ~now)
TLS.iter buf ~f:(check_batch ~now)

module Out = Catapult_utils.Json_out

Expand All @@ -79,7 +80,7 @@ module Make (A : ARG) : Catapult.BACKEND = struct
let emit ~id ~name ~ph ~tid ~pid ~cat ~ts_us ~args ~stack ~dur ?extra () :
unit =
(* access local buffer to write and add to batch *)
let lbuf = Thread_local.get_or_create buf in
let lbuf = TLS.get_or_create buf in

let j =
let buf = lbuf.buf in
Expand Down
2 changes: 1 addition & 1 deletion src/utils/catapult_utils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ module Ev_to_json = Ev_to_json
module Gc_stats = Gc_stats
module Json_out = Json_out
module Ser = Ser
module Thread_local = Thread_local
module Thread_local = Catapult.Thread_local
3 changes: 2 additions & 1 deletion tests/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

(tests
(names t_thread_local)
(libraries threads catapult catapult.utils))
(package catapult)
(libraries threads catapult))
3 changes: 1 addition & 2 deletions tests/t_thread_local.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
open Catapult_utils
module TL = Thread_local
module TL = Catapult.Thread_local

let tl : int ref TL.t =
TL.create ~init:(fun ~t_id:_ -> ref 0) ~close:(fun _ -> ()) ()
Expand Down

0 comments on commit fca15fc

Please sign in to comment.