diff --git a/sihl-queue/src/sihl_queue.ml b/sihl-queue/src/sihl_queue.ml index 324645b6..a3e91242 100644 --- a/sihl-queue/src/sihl_queue.ml +++ b/sihl-queue/src/sihl_queue.ml @@ -66,7 +66,7 @@ module Make (Repo : Repo.Sig) : Sihl.Contract.Queue.Sig = struct let registered_jobs : job' list ref = ref [] let stop_schedule : (unit -> unit) option ref = ref None - let dispatch ?ctx ?delay input (job : 'a job) = + let dispatch ?callback ?ctx ?delay input (job : 'a job) = let open Sihl.Contract.Queue in let config = Sihl.Configuration.read schema in let force_async = config.force_async in @@ -76,7 +76,10 @@ module Make (Repo : Repo.Sig) : Sihl.Contract.Queue.Sig = struct Logs.debug (fun m -> m "Dispatching job %s" name); let now = Ptime_clock.now () in let job_instance = create_instance ?ctx input delay now job in - Repo.enqueue ?ctx job_instance) + let%lwt () = Repo.enqueue ?ctx job_instance in + match callback with + | None -> Lwt.return () + | Some callback -> callback job_instance) else ( Logs.info (fun m -> m "Skipping queue in development environment"); match%lwt job.handle ?ctx input with @@ -86,7 +89,7 @@ module Make (Repo : Repo.Sig) : Sihl.Contract.Queue.Sig = struct Lwt.return ()) ;; - let dispatch_all ?ctx ?delay inputs job = + let dispatch_all ?callback ?ctx ?delay inputs job = let config = Sihl.Configuration.read schema in let force_async = config.force_async in if Sihl.Configuration.is_production () || force_async @@ -97,7 +100,10 @@ module Make (Repo : Repo.Sig) : Sihl.Contract.Queue.Sig = struct let job_instances = List.map (fun input -> create_instance ?ctx input delay now job) inputs in - Repo.enqueue_all ?ctx job_instances) + let%lwt () = Repo.enqueue_all ?ctx job_instances in + match callback with + | None -> Lwt.return () + | Some callback -> Lwt_list.iter_s callback job_instances) else ( Logs.info (fun m -> m "Skipping queue in development environment"); let rec loop inputs = diff --git a/sihl/src/contract_queue.ml b/sihl/src/contract_queue.ml index ae169c49..6495fcba 100644 --- a/sihl/src/contract_queue.ml +++ b/sihl/src/contract_queue.ml @@ -133,6 +133,9 @@ module type Sig = sig (** [dispatch ?ctx ?delay input job] queues [job] for later processing and returns [unit Lwt.t] once the job has been queued. + An optional [callback] function that will be called after the job has been + enqueued. + An optional [delay] determines the amount of time from now (when dispatch is called) up until the job can be run. If no delay is specified, the job is processed as soon as possible. @@ -140,7 +143,8 @@ module type Sig = sig [input] is the input of the [handle] function which is used for job processing. *) val dispatch - : ?ctx:(string * string) list + : ?callback:(instance -> unit Lwt.t) + -> ?ctx:(string * string) list -> ?delay:Ptime.span -> 'a -> 'a job @@ -154,6 +158,9 @@ module type Sig = sig If the queue backend supports transactions, [dispatch_all] guarantees that either none or all jobs are queued. + An optional [callback] function that will be called after the jobs have been + enqueued. + An optional [delay] determines the amount of time from now (when dispatch is called) up until the jobs can be run. If no delay is specified, the jobs are processed as soon as possible. @@ -161,7 +168,8 @@ module type Sig = sig [inputs] is the input of the [handle] function. It is a list of ['a], one for each ['a job] instance. *) val dispatch_all - : ?ctx:(string * string) list + : ?callback:(instance -> unit Lwt.t) + -> ?ctx:(string * string) list -> ?delay:Ptime.span -> 'a list -> 'a job diff --git a/sihl/test/web_csrf.ml b/sihl/test/web_csrf.ml index 5736beb5..563f8d66 100644 --- a/sihl/test/web_csrf.ml +++ b/sihl/test/web_csrf.ml @@ -245,7 +245,7 @@ let post_request_both_invalid_tokens_fails _ () = [ CCFun.id; Sihl.Test.Session.set_value_req [ csrf_name, "garbage" ] ] in (* Cartesian product 4 requests, invalid/empty cookie and request *) - let reqs = CCList.product CCFun.( @@ ) add_cookie requests in + let reqs = CCList.product ( @@ ) add_cookie requests in let allowed = ref 0 in let handler _ = allowed := !allowed + 1;