(* capnp_rpc_lwt.mli -- forked from mirage/capnp-rpc. *)
(** Cap'n Proto RPC using the Cap'n Proto serialisation and Lwt for concurrency. *)
open Capnp.RPC
include (module type of Capnp.BytesMessage)
module StructRef : sig
(** A promise for a response structure.
You can use the generated [_get_pipelined] functions on a promise to get
a promise for a capability inside the promise, and then pipeline messages
to that promise without waiting for the response struct to arrive. *)

type 'a t
(** An ['a t] is a reference to a response message (that may not have arrived yet)
with content type ['a]. It is reference-counted; see {!inc_ref} and {!dec_ref}. *)

val inc_ref : 'a t -> unit
(** [inc_ref t] increases the reference count on [t] by one. *)

val dec_ref : 'a t -> unit
(** [dec_ref t] reduces the reference count on [t] by one.
When the count reaches zero, [t] must never be used again.
If the results have not yet arrived when the count reaches zero, a
cancellation request is sent (which may or may not succeed). As soon as the
results are available, they are released. *)
end
module Capability : sig
(** A capability is a reference to an object, or to a promise for an object.
You can invoke methods on a capability even while it is still only a
promise. *)

type +'a t
(** An ['a t] is a capability reference to a service of type ['a]. *)

val broken : Capnp_rpc.Exception.t -> 'a t
(** [broken ex] is a broken capability, with problem [ex].
Any attempt to call methods on it will fail with [ex]. *)

val when_broken : (Capnp_rpc.Exception.t -> unit) -> 'a t -> unit
(** [when_broken fn x] calls [fn problem] when [x] becomes broken.
If [x] is already broken, [fn] is called immediately.
If [x] can never become broken (e.g. it is a near ref), this does nothing.
If [x]'s ref-count reaches zero without [fn] being called, it will never
be called. *)

val when_released : 'a t -> (unit -> unit) -> unit
(** [when_released t fn] will call [fn ()] when [t]'s ref-count drops to zero.
This is used for caches, to remove entries when they become invalid.
For promises, [fn] will be transferred to the resolution if resolved.
For broken caps, this function does nothing (exceptions are never released).
Note that the argument order is the reverse of {!when_broken}: the
capability comes first here. *)

val problem : 'a t -> Capnp_rpc.Exception.t option
(** [problem t] is [Some ex] if [t] is broken, or [None] if it is still
believed to be healthy. Once a capability is broken, it will never
work again and any calls made on it will fail with exception [ex]. *)

val await_settled : 'a t -> (unit, Capnp_rpc.Exception.t) Lwt_result.t
(** [await_settled t] resolves once [t] is a "settled" (non-promise) reference.
If [t] is a near, far or broken reference, this returns immediately.
If it is currently a local or remote promise, it waits until it isn't.
[await_settled] takes ownership of [t] until it returns (you must not
[dec_ref] it before then).
@return [Ok ()] on success, or [Error _] if [t] failed.
@since 1.2 *)

val await_settled_exn : 'a t -> unit Lwt.t
(** Like [await_settled], but raises an exception on error.
@since 1.2 *)

val wait_until_settled : 'a t -> unit Lwt.t
[@@deprecated "Use await_settled instead."]
(** Deprecated: use {!await_settled} instead.
NOTE(review): this has the same type as {!await_settled_exn}, so it
presumably behaves like it -- confirm against the implementation. *)

val equal : 'a t -> 'a t -> (bool, [`Unsettled]) result
(** [equal a b] indicates whether [a] and [b] designate the same settled service.
Returns [Error `Unsettled] if [a] or [b] is still a promise (and they therefore
may yet turn out to be equal when the promise resolves). *)

module Request : sig
type 'a t
(** An ['a t] is a builder for the out-going request's payload. *)

val create : ?message_size:int -> (Capnp.Message.rw Slice.t -> 'a) -> 'a t * 'a
(** [create init] is a pair [(req, contents)]: a fresh request payload [req]
and the builder [contents] for filling it in.
Use one of the generated [init_pointer] functions for [init].
@param message_size An estimate of the size of the payload. If this is too small,
additional segments will be allocated automatically, but this
is less efficient than getting the size right to start with. *)

val create_no_args : unit -> 'a t
(** [create_no_args ()] is a payload with no content. *)

val release : 'a t -> unit
(** Clear the exported refs, dropping their ref-counts. This is called automatically
when you send a message, but you might need it if you decide to abort. *)
end

val call : 't t -> ('t, 'a, 'b) MethodID.t -> 'a Request.t -> 'b StructRef.t
(** [call target m req] invokes [target#m req] and returns a promise for the result.
Messages may be sent to the capabilities that will be in the result
before the result arrives - they will be pipelined to the service
responsible for resolving the promise. The caller must call [StructRef.dec_ref]
when finished with the result (consider using one of the [call_*] functions below
instead for a simpler interface). *)

val call_and_wait : 't t -> ('t, 'a, 'b StructStorage.reader_t) MethodID.t ->
'a Request.t -> (('b StructStorage.reader_t * (unit -> unit)), [> `Capnp of Capnp_rpc.Error.t]) Lwt_result.t
(** [call_and_wait t m req] does [call t m req] and waits for the response.
This is simpler than using [call], but doesn't support pipelining
(you can't use any capabilities in the response in another message until the
response arrives).
On success, it returns [Ok (response, release_response_caps)].
Call [release_response_caps] when done with the results, to release any capabilities it might
contain that you didn't use (remembering that future versions of the protocol might add
new optional capabilities you don't know about yet).
If you don't need any capabilities from the result, consider using [call_for_value] instead.
Doing [Lwt.cancel] on the result will send a cancel message to the target
for remote calls. *)

val call_for_value : 't t -> ('t, 'a, 'b StructStorage.reader_t) MethodID.t ->
'a Request.t -> ('b StructStorage.reader_t, [> `Capnp of Capnp_rpc.Error.t]) Lwt_result.t
(** [call_for_value t m req] is similar to [call_and_wait], but automatically
releases any capabilities in the response before returning. Use this if
you aren't expecting any capabilities in the response. *)

val call_for_value_exn : 't t -> ('t, 'a, 'b StructStorage.reader_t) MethodID.t ->
'a Request.t -> 'b StructStorage.reader_t Lwt.t
(** Wrapper for [call_for_value] that turns errors into Lwt failures. *)

val call_for_unit : 't t -> ('t, 'a, 'b StructStorage.reader_t) MethodID.t ->
'a Request.t -> (unit, [> `Capnp of Capnp_rpc.Error.t]) Lwt_result.t
(** Wrapper for [call_for_value] that ignores the result. *)

val call_for_unit_exn : 't t -> ('t, 'a, 'b StructStorage.reader_t) MethodID.t ->
'a Request.t -> unit Lwt.t
(** Wrapper for [call_for_unit] that turns errors into Lwt failures. *)

val call_for_caps : 't t -> ('t, 'a, 'b StructStorage.reader_t) MethodID.t ->
'a Request.t -> ('b StructRef.t -> 'c) -> 'c
(** [call_for_caps target m req extract] is a wrapper for [call] that passes the results promise to
[extract], which should extract any required capability promises from it.
In the common case where you want a single cap "foo" from the result, use
[call_for_caps target m req Results.foo_get_pipelined].
When the remote call finally returns, the result will be released automatically. *)

type 'a resolver
(** An ['a resolver] can be used to resolve a promise for an ['a]. It can only be used once. *)

val promise : unit -> 'a t * 'a resolver
(** [promise ()] returns a fresh local promise and a resolver for it.
Any calls made on the promise will be queued until it is resolved. *)

val resolve_ok : 'a resolver -> 'a t -> unit
(** [resolve_ok r x] resolves [r]'s promise to [x]. [r] takes ownership of [x]
(the caller must use [inc_ref] first if they want to continue using it). *)

val resolve_exn : 'a resolver -> Capnp_rpc.Exception.t -> unit
(** [resolve_exn r x] breaks [r]'s promise with exception [x]. *)

val inc_ref : _ t -> unit
(** [inc_ref t] increases the ref-count on [t] by one. *)

val dec_ref : _ t -> unit
(** [dec_ref t] decreases the ref-count on [t] by one. When the count reaches zero,
the capability is released. This may involve sending a notification to a remote
peer. Any time you extract a capability from a struct or struct promise,
it must eventually be freed by calling [dec_ref] on it. *)

val with_ref : 'a t -> ('a t -> 'b Lwt.t) -> 'b Lwt.t
(** [with_ref t fn] runs [fn t] and then calls [dec_ref t] (whether [fn]
succeeds or not). *)

val pp : 'a t Fmt.t
(** [pp] is a [Fmt] pretty-printer for capabilities. *)
end
module Sturdy_ref : sig
type +'a t
(** An off-line (persistent) capability reference.
A sturdy ref contains all the information necessary to get a live reference to a service:
- The network address of the hosting vat (e.g. TCP host and port)
- A way to authenticate the hosting vat (e.g. a fingerprint of the vat's public key)
- A way to identify the target service within the vat and prove permission to access it
(e.g. a "Swiss number")
*)

val connect : 'a t -> ('a Capability.t, Capnp_rpc.Exception.t) result Lwt.t
(** [connect t] returns a live reference to [t]'s service,
or [Error ex] if the connection attempt failed with [ex]. *)

val connect_exn : 'a t -> 'a Capability.t Lwt.t
(** [connect_exn] is a wrapper for [connect] that returns a failed Lwt thread on error. *)

val with_cap :
'a t ->
('a Capability.t -> ('b, [> `Capnp of Capnp_rpc.Exception.t] as 'e) Lwt_result.t) ->
('b, 'e) Lwt_result.t
(** [with_cap t f] uses [connect t] to get a live-ref [x],
then does [Capability.with_ref x f].
NOTE(review): the error variant here carries a [Capnp_rpc.Exception.t], whereas
other functions in this interface use [`Capnp of Capnp_rpc.Error.t] -- confirm
that the difference is intentional. *)

val with_cap_exn : 'a t -> ('a Capability.t -> 'b Lwt.t) -> 'b Lwt.t
(** [with_cap_exn t f] uses [connect_exn t] to get a live-ref [x],
then does [Capability.with_ref x f]. *)

val reader :
('a StructStorage.reader_t -> Capnp.MessageSig.ro Slice.t option) ->
'a StructStorage.reader_t -> Uri.t
(** [reader accessor] is a field accessor for reading a sturdy ref.
e.g. if [sr_get] is a generated field accessor for an AnyPointer field, then
[reader Reader.Struct.sr_get] is an accessor that treats it as a SturdyRef field.
TODO: This should really return a sturdy ref, not a URI, but that requires a change
to the spec to add a sturdy ref cap-descriptor table entry type. *)

val builder :
('a StructStorage.builder_t -> Capnp.MessageSig.rw Slice.t) ->
'a StructStorage.builder_t -> _ t -> unit
(** [builder setter] converts a generated AnyPointer field setter [setter] to a SturdyRef
setter. Use it to add a SturdyRef to a message with [builder Params.sr_get params sr]. *)

val cast : 'a t -> 'b t
(** [cast t] is [t] with its service type parameter changed from ['a] to ['b].
NOTE(review): the signature places no constraint relating ['a] and ['b], so
this is presumably an unchecked conversion -- confirm against the implementation. *)
end
module Service : sig
(** Functions for service implementors. *)

type ('a, 'b) method_t = 'a -> (unit -> unit) -> 'b StructRef.t
(** An [('a, 'b) method_t] is a method implementation that takes
a reader for the parameters and
a function to release the capabilities in the parameters,
and returns a promise for the results. *)

module Response : sig
type 'b t
(** A ['b t] is a builder for the out-going response. *)

val create : ?message_size:int -> (Capnp.Message.rw Slice.t -> 'a) -> 'a t * 'a
(** [create init] is a pair [(resp, contents)]: a fresh response [resp] and
the builder [contents] for filling it in.
Use one of the generated [init_pointer] functions for [init].
@param message_size NOTE(review): presumably a size estimate for the message,
as for [Capability.Request.create] -- confirm. *)

val create_empty : unit -> 'a t
(** [create_empty ()] is an empty response. *)

val release : 'a t -> unit
(** Clear the exported refs, dropping their ref-counts. This is called automatically
when you send a message, but you might need it if you decide to abort. *)
end

val return : 'a Response.t -> 'a StructRef.t
(** [return r] wraps up a simple local result as a promise. *)

val return_empty : unit -> 'a StructRef.t
(** [return_empty ()] is a promise for a response with no payload. *)

val return_lwt : (unit -> ('a Response.t, [< `Capnp of Capnp_rpc.Error.t]) Lwt_result.t) -> 'a StructRef.t
(** [return_lwt fn] is a local promise for the result of Lwt thread [fn ()].
If [fn ()] fails, the error is logged and an "Internal error" returned to the caller.
If it returns an [Error] value then that error is returned to the caller.
Note that this does not support pipelining (any messages sent to the response
will be queued locally until [fn] has produced a result), so it may be better
to return immediately a result containing a promise in some cases. *)

val fail : ?ty:Capnp_rpc.Exception.ty -> ('a, Format.formatter, unit, 'b StructRef.t) format4 -> 'a
(** [fail msg] is a results promise that has failed with an exception with
reason [msg]. [msg] is a [Format]-style format string, followed by the
arguments it requires.
@param ty NOTE(review): presumably the kind of exception reported to the
caller; the default is not visible in this interface -- confirm. *)

val fail_lwt :
?ty:Capnp_rpc.Exception.ty ->
('a, Format.formatter, unit, (_, [> `Capnp of Capnp_rpc.Error.t]) Lwt_result.t) format4 ->
'a
(** [fail_lwt msg] is like [fail msg], but produces an [Lwt_result.t] error
instead of a [StructRef.t], so it can be used with [return_lwt]. *)
end
(**/**)
module Untyped : sig
(** This module is only for use by the code generated by the capnp-ocaml
schema compiler. The generated code provides type-safe wrappers for
everything here. *)

type abstract_method_t
(** A method implementation with its parameter and result types hidden. *)

val abstract_method : ('a StructStorage.reader_t, 'b) Service.method_t -> abstract_method_t
(** [abstract_method m] is the typed method implementation [m] as an [abstract_method_t]. *)

val struct_field : 'a StructRef.t -> int -> 'b StructRef.t
(** [struct_field t i] is a promise for a struct field of the promised struct [t].
NOTE(review): [i] is presumably the field's pointer index -- confirm. *)

val capability_field : 'a StructRef.t -> int -> 'b Capability.t
(** [capability_field t i] is a capability taken from the promised struct [t].
NOTE(review): [i] is presumably the field's pointer index -- confirm. *)

class type generic_service = object
method dispatch : interface_id:Uint64.t -> method_id:int -> abstract_method_t
(** [dispatch ~interface_id ~method_id] is the implementation to use for the
method identified by [interface_id] and [method_id]. *)

method release : unit
(** NOTE(review): presumably invoked when the service is no longer
referenced -- confirm against the implementation. *)

method pp : Format.formatter -> unit
(** [pp f] writes a description of the service to [f]. *)
end
(** The untyped interface of a service implementation. *)

val local : #generic_service -> 'a Capability.t
(** [local service] is a capability for the service object [service]. *)

val get_cap : Capnp.MessageSig.attachments -> Uint32.t -> _ Capability.t
(** [get_cap attachments i] is the capability for index [i] in [attachments].
NOTE(review): whether this takes a new reference on the capability is not
visible from this interface -- confirm. *)

val add_cap : Capnp.MessageSig.attachments -> _ Capability.t -> Uint32.t
(** [add_cap attachments cap] stores [cap] in [attachments] and returns the
index that refers to it. *)

val clear_cap : Capnp.MessageSig.attachments -> Uint32.t -> unit
(** [clear_cap attachments i] clears the entry for index [i] in [attachments]. *)

val unknown_interface : interface_id:Uint64.t -> abstract_method_t
(** [unknown_interface ~interface_id] is a method implementation for use when
[interface_id] is not recognised.
NOTE(review): presumably it fails the call with an unimplemented error -- confirm. *)

val unknown_method : interface_id:Uint64.t -> method_id:int -> abstract_method_t
(** [unknown_method ~interface_id ~method_id] is a method implementation for use
when [method_id] is not recognised within interface [interface_id].
NOTE(review): presumably it fails the call with an unimplemented error -- confirm. *)
end
(* Re-exports the [Private] module as part of this interface. It sits inside
   the hidden (stop-comment) section, so it is omitted from the generated
   documentation. NOTE(review): its contents are not visible from this file. *)
module Private = Private
module Cast : sig
(** Conversions between the typed wrappers of this interface and the
underlying [Capnp_core] values. The ['a] type parameter is unconstrained in
the [*_of_raw] functions, so the caller chooses the resulting service type.
NOTE(review): presumably no run-time check is made -- confirm. *)

val cap_of_raw : Capnp_core.Core_types.cap -> 'a Capability.t
(** [cap_of_raw c] is the raw capability [c] as a typed capability. *)

val cap_to_raw : 'a Capability.t -> Capnp_core.Core_types.cap
(** [cap_to_raw c] is the typed capability [c] as a raw capability. *)

val sturdy_of_raw : Capnp_core.sturdy_ref -> 'a Sturdy_ref.t
(** [sturdy_of_raw sr] is the raw sturdy ref [sr] as a typed sturdy ref. *)

val sturdy_to_raw : 'a Sturdy_ref.t -> Capnp_core.sturdy_ref
(** [sturdy_to_raw sr] is the typed sturdy ref [sr] as a raw sturdy ref. *)
end
(**/**)
module Persistence : sig
(** Support for saving capabilities as sturdy refs, using the Cap'n Proto
persistence protocol. *)

class type ['a] persistent = object
method save : ('a Sturdy_ref.t, Capnp_rpc.Exception.t) result Lwt.t
(** [save] produces a sturdy ref for a service of type ['a], or an
exception value on failure. *)
end
(** An ['a persistent] object knows how to produce a sturdy ref for a service
of type ['a]. *)

val with_persistence :
('a #persistent) ->
('impl -> 'a Capability.t) ->
(#Untyped.generic_service as 'impl) ->
'a Capability.t
(** [with_persistence persist Service.Foo.local obj] is like [Service.Foo.local obj], but the
resulting service also handles the Cap'n Proto persistence protocol, using [persist]. *)

val with_sturdy_ref :
'a Sturdy_ref.t ->
('impl -> 'a Capability.t) ->
(#Untyped.generic_service as 'impl) ->
'a Capability.t
(** [with_sturdy_ref sr Service.Foo.local obj] is like [Service.Foo.local obj],
but responds to [save] calls by returning [sr]. *)

val save : 'a Capability.t -> (Uri.t, [> `Capnp of Capnp_rpc.Error.t]) Lwt_result.t
(** [save cap] calls the persistent [save] method on [cap].
Note that not all capabilities can be saved.
TODO: this should return an ['a Sturdy_ref.t]; see {!Sturdy_ref.reader}. *)

val save_exn : 'a Capability.t -> Uri.t Lwt.t
(** [save_exn] is a wrapper for [save] that returns a failed thread on error. *)
end