-
Notifications
You must be signed in to change notification settings - Fork 177
/
Copy pathlwt_unix.cppo.ml
2591 lines (2168 loc) · 74.5 KB
/
lwt_unix.cppo.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)
(* [Lwt_sequence] is deprecated – we don't want users outside Lwt using it.
However, it is still used internally by Lwt. So, briefly disable warning 3
("deprecated"), and create a local, non-deprecated alias for
[Lwt_sequence] that can be referred to by the rest of the code in this
module without triggering any more warnings. *)
module Lwt_sequence = Lwt_sequence
open Lwt.Infix
(* +-----------------------------------------------------------------+
| Configuration |
+-----------------------------------------------------------------+ *)
(* Strategy used to execute a job; it is the second argument of [start_job].
The constructors correspond to the values "none", "detach" and "switch" of the
LWT_ASYNC_METHOD environment variable. With [Async_none], [run_job] executes
the job synchronously through [run_job_sync]. *)
type async_method =
| Async_none
| Async_detach
| Async_switch
(* Process-wide default async method. It starts as [Async_detach] and may be
   overridden once at start-up by the LWT_ASYNC_METHOD environment variable. *)
let default_async_method_var = ref Async_detach

(* Apply LWT_ASYNC_METHOD when it is set. An unrecognised value is reported on
   stderr and the default is left untouched; an unset variable is ignored. *)
let () =
  match Sys.getenv "LWT_ASYNC_METHOD" with
  | exception Not_found -> ()
  | "none" -> default_async_method_var := Async_none
  | "detach" -> default_async_method_var := Async_detach
  | "switch" -> default_async_method_var := Async_switch
  | str ->
    Printf.eprintf
      "%s: invalid lwt async method: '%s', must be 'none', 'detach' or 'switch'\n%!"
      (Filename.basename Sys.executable_name) str
(* [default_async_method ()] returns the current process-wide default async
   method. *)
let default_async_method () = default_async_method_var.contents
(* [set_default_async_method am] makes [am] the process-wide default async
   method. *)
let set_default_async_method am = default_async_method_var.contents <- am
(* Lwt key under which an overriding async method is stored. It is read by
[async_method] and [choose_async_method], and set by the [with_async_*]
helpers through [Lwt.with_value]. *)
let async_method_key = Lwt.new_key ()
(* [async_method ()] returns the async method stored under
   [async_method_key] if there is one, and the process-wide default
   otherwise. *)
let async_method () =
  match Lwt.get async_method_key with
  | None -> default_async_method_var.contents
  | Some chosen -> chosen
(* [with_async_none f] runs [f] with [Async_none] bound to
   [async_method_key]. *)
let with_async_none thunk =
  let override = Some Async_none in
  Lwt.with_value async_method_key override thunk
(* [with_async_detach f] runs [f] with [Async_detach] bound to
   [async_method_key]. *)
let with_async_detach thunk =
  let override = Some Async_detach in
  Lwt.with_value async_method_key override thunk
(* [with_async_switch f] runs [f] with [Async_switch] bound to
   [async_method_key]. *)
let with_async_switch thunk =
  let override = Some Async_switch in
  Lwt.with_value async_method_key override thunk
(* +-----------------------------------------------------------------+
| Notifications management |
+-----------------------------------------------------------------+ *)
(* Information about a registered notifier; values of this type are stored in
the [notifiers] table, keyed by notification id. *)
type notifier = {
notify_handler : unit -> unit;
(* The callback run by [call_notification]. *)
notify_once : bool;
(* Whether to remove the notifier from the table when the first
notification is received. *)
}
(* Hash tables keyed by notification id. Keys are plain integers, compared
   with integer equality and hashed to themselves. *)
module Notifiers = Hashtbl.Make(struct
    type t = int
    let equal : int -> int -> bool = fun a b -> a = b
    let hash : int -> int = fun id -> id
  end)
(* Table of all currently registered notifiers, keyed by notification id.
1024 is only the initial size hint passed to the hash table. *)
let notifiers = Notifiers.create 1024
(* Last notification id handed out by [make_notification]. The initial value
sits 1000 below 0x7FFFFFFF.
NOTE(review): presumably chosen so that ids beyond the 32-bit range are
reached quickly - confirm against the linked discussion.
See https://github.com/ocsigen/lwt/issues/277 and
https://github.com/ocsigen/lwt/pull/278. *)
let current_notification_id = ref (0x7FFFFFFF - 1000)
(* [find_free_id id] returns the first id, counting upwards from [id], that
   is not currently bound in [notifiers]. *)
let rec find_free_id id =
  match Notifiers.mem notifiers id with
  | false -> id
  | true -> find_free_id (succ id)
(* [make_notification ?once f] registers [f] as a notification handler and
   returns the id allocated for it. The id is the first free one after the
   previously allocated id. When [once] is [true] (default [false]) the
   notifier is removed the first time it is called. *)
let make_notification ?(once=false) f =
  let fresh_id = find_free_id (succ !current_notification_id) in
  let entry = { notify_handler = f; notify_once = once } in
  current_notification_id := fresh_id;
  Notifiers.add notifiers fresh_id entry;
  fresh_id
(* [stop_notification id] unregisters the notifier bound to [id], if any. *)
let stop_notification id =
  id |> Notifiers.remove notifiers
(* [set_notification id f] replaces the handler of notifier [id] with [f],
   keeping its [notify_once] setting.
   Raises [Not_found] if [id] is not registered. *)
let set_notification id f =
  let { notify_once; _ } = Notifiers.find notifiers id in
  Notifiers.replace notifiers id { notify_once; notify_handler = f }
(* [call_notification id] runs the handler registered under [id]. A one-shot
   notifier is removed from the table before its handler runs. Unknown ids
   are silently ignored. *)
let call_notification id =
  match Notifiers.find notifiers id with
  | { notify_once; notify_handler } ->
    if notify_once then Notifiers.remove notifiers id;
    notify_handler ()
  | exception Not_found -> ()
(* +-----------------------------------------------------------------+
| Sleepers |
+-----------------------------------------------------------------+ *)
(* [sleep delay] returns a cancelable promise that is resolved by an engine
   timer set up with [delay]. The timer event is stopped when it fires and
   also when the promise is canceled. *)
let sleep delay =
  let waiter, wakener = Lwt.task () in
  let on_expire ev =
    Lwt_engine.stop_event ev;
    Lwt.wakeup wakener ()
  in
  let timer = Lwt_engine.on_timer delay false on_expire in
  Lwt.on_cancel waiter (fun () -> Lwt_engine.stop_event timer);
  waiter
(* [yield] is an alias for [Lwt.pause]. *)
let yield = Lwt.pause
(* [auto_yield timeout] returns a function that calls [yield] only when at
   least [timeout] seconds (wall-clock, per [Unix.gettimeofday]) have passed
   since the previous yield or since creation; otherwise it returns an
   already-resolved promise. *)
let auto_yield timeout =
  let deadline = ref (Unix.gettimeofday () +. timeout) in
  let maybe_yield () =
    let now = Unix.gettimeofday () in
    match now >= !deadline with
    | false -> Lwt.return_unit
    | true ->
      (* Re-arm the deadline before yielding. *)
      deadline := now +. timeout;
      yield ()
  in
  maybe_yield
(* [auto_pause timeout] returns a function that calls [Lwt.pause] only when
   at least [timeout] seconds (wall-clock, per [Unix.gettimeofday]) have
   passed since the previous pause or since creation; otherwise it returns an
   already-resolved promise. *)
let auto_pause timeout =
  let deadline = ref (Unix.gettimeofday () +. timeout) in
  let maybe_pause () =
    let now = Unix.gettimeofday () in
    match now >= !deadline with
    | false -> Lwt.return_unit
    | true ->
      (* Re-arm the deadline before pausing. *)
      deadline := now +. timeout;
      Lwt.pause ()
  in
  maybe_pause
(* Raised by the promise returned by [timeout] once its delay has elapsed. *)
exception Timeout
(* [timeout d] sleeps for [d] and then fails with [Timeout]. *)
let timeout d =
  Lwt.bind (sleep d) (fun () -> raise Timeout)
(* [with_timeout d f] races [f ()] against [timeout d] using [Lwt.pick]:
   whichever settles first determines the result and the other promise is
   canceled. *)
let with_timeout d f =
  [timeout d; Lwt.apply f ()] |> Lwt.pick
(* +-----------------------------------------------------------------+
| Jobs |
+-----------------------------------------------------------------+ *)
(* Abstract handle to a job. Values are produced by the [*_job] externals in
this file; ['a] is the type of the job's result. *)
type 'a job
external start_job : 'a job -> async_method -> bool = "lwt_unix_start_job"
(* Starts the given job using the given async method. It returns [true]
if the job has already terminated by the time the call returns. *)
external check_job : 'a job -> int -> bool = "lwt_unix_check_job" "noalloc"
(* Checks whether a job has terminated. If it has not yet terminated, it is
marked so that it will send a notification when it finishes. The integer
argument is the notification id (see its use in [run_job_aux]).
NOTE(review): warning 3 is presumably disabled below because the
string form of "noalloc" is deprecated - confirm. *)
[@@ocaml.warning "-3"]
(* For every running job: a promise that resolves when the job finishes, and
a function that aborts the job with a given exception. *)
let jobs = Lwt_sequence.create ()
(* [abort_jobs exn] drains [jobs] from the left, calling each job's abort
   function with [exn], until the sequence is empty. *)
let rec abort_jobs exn =
  match Lwt_sequence.take_opt_l jobs with
  | None -> ()
  | Some (_, abort) ->
    abort exn;
    abort_jobs exn
(* [cancel_jobs ()] aborts every running job with [Lwt.Canceled]. *)
let cancel_jobs () =
  let reason = Lwt.Canceled in
  abort_jobs reason
(* [wait_for_jobs ()] returns a promise that resolves once every job
   currently in [jobs] has finished. *)
let wait_for_jobs () =
  let collect (waiter, _) acc = waiter :: acc in
  Lwt_sequence.fold_l collect jobs [] |> Lwt.join
(* [wrap_result f x] evaluates [f x] and captures the outcome as a
   [Result.t]. Only exceptions accepted by [Lwt.Exception_filter.run] are
   captured; others propagate. *)
let wrap_result f x =
  match f x with
  | value -> Result.Ok value
  | exception exn when Lwt.Exception_filter.run exn -> Result.Error exn
(* [run_job_aux async_method job result] starts [job] and returns a promise
   for its outcome, where [result job] extracts that outcome as a
   [Result.t].

   If the job has already terminated when [start_job] returns, the result is
   read immediately. Otherwise the job is recorded in [jobs] (so it can be
   aborted or waited for) and a one-shot notification is registered to
   deliver the result when the job completes. *)
let run_job_aux async_method job result =
  if start_job job async_method then
    (* The job has already terminated: read and return the result now. *)
    Lwt.of_result (result job)
  else begin
    (* Promise for the job. *)
    let waiter, wakener = Lwt.wait () in
    let still_pending () = Lwt.state waiter = Lwt.Sleep in
    (* Abort function stored alongside the job; a no-op once resolved. *)
    let abort exn = if still_pending () then Lwt.wakeup_exn wakener exn in
    let finished = waiter >>= fun _ -> Lwt.return_unit in
    (* Add the job to the sequence of all jobs. *)
    let node = Lwt_sequence.add_l (finished, abort) jobs in
    (* Handler for the asynchronous wakeup. The result is always read, even
       when the promise was already resolved by an abort. *)
    let on_notification () =
      Lwt_sequence.remove node;
      let outcome = result job in
      if still_pending () then Lwt.wakeup_result wakener outcome
    in
    let id = make_notification ~once:true on_notification in
    (* Give the job some time before falling back to asynchronous
       notification: after one pause, deliver the result directly if the job
       has terminated by then. *)
    let poll_after_pause =
      Lwt.pause () >>= fun () ->
      if check_job job id then call_notification id;
      Lwt.return_unit
    in
    ignore poll_after_pause;
    waiter
  end
(* [choose_async_method opt] resolves an optional explicit async method:
   an explicit choice wins; otherwise the value stored under
   [async_method_key] is used, and failing that the process-wide default. *)
let choose_async_method = function
  | Some explicit -> explicit
  | None -> async_method ()
(* [execute_job ?async_method ~job ~result ~free] runs [job] and returns a
   promise for [result job]. [free job] is called after the result has been
   read, including when [result] raised an exception that was captured by
   [wrap_result]. *)
let execute_job ?async_method ~job ~result ~free =
  let async_method = choose_async_method async_method in
  let read_then_free j =
    let outcome = wrap_result result j in
    free j;
    outcome
  in
  run_job_aux async_method job read_then_free
[@@ocaml.warning "-16"]
external self_result : 'a job -> 'a = "lwt_unix_self_result"
(* Returns the result of a job using the [result] field of the C
job structure. *)
external run_job_sync : 'a job -> 'a = "lwt_unix_run_job_sync"
(* Executes a job synchronously and returns its result. *)
(* Shadows the [self_result] external with a version that captures the
   outcome as a [Result.t]. Only exceptions accepted by
   [Lwt.Exception_filter.run] are captured. *)
let self_result job =
  match self_result job with
  | value -> Result.Ok value
  | exception exn when Lwt.Exception_filter.run exn -> Result.Error exn
(* Set to [true] by [retained] and reset to [false] by the next call to
[run_job], which also triggers a full major collection at that point. *)
let in_retention_test = ref false
(* [retained o] starts a retention test on [o]. It returns a reference that
   is initially [true] and is set to [false] if [o] is finalised while the
   retention test is still in progress. *)
let retained o =
  let still_retained = ref true in
  let on_finalise _ =
    if !in_retention_test then still_retained := false
  in
  Gc.finalise on_finalise o;
  in_retention_test := true;
  still_retained
(* [run_job ?async_method job] runs [job] and returns a promise for its
   result. With [Async_none] the job is executed synchronously; otherwise it
   goes through [run_job_aux]. If a retention test is in progress, a full
   major GC is forced first and the test is ended. *)
let run_job ?async_method job =
  if !in_retention_test then begin
    Gc.full_major ();
    in_retention_test := false
  end;
  match choose_async_method async_method with
  | Async_none ->
    begin match run_job_sync job with
      | value -> Lwt.return value
      | exception exn when Lwt.Exception_filter.run exn -> Lwt.fail exn
    end
  | (Async_detach | Async_switch) as async_method ->
    run_job_aux async_method job self_result
(* +-----------------------------------------------------------------+
| File descriptor wrappers |
+-----------------------------------------------------------------+ *)
(* Life-cycle state of a wrapped file descriptor. [check_descriptor] accepts
[Opened], raises [EBADF] for [Closed], and re-raises the carried exception
for [Aborted]. *)
type state = Opened | Closed | Aborted of exn
(* Wrapper around a Unix file descriptor, together with the state needed to
wait for it to become readable or writable. *)
type file_descr = {
fd : Unix.file_descr;
(* The underlying unix file descriptor *)
mutable state: state;
(* The state of the file descriptor *)
mutable set_flags : bool;
(* Whether to set file flags *)
mutable blocking : bool Lwt.t Lazy.t;
(* Whether the file descriptor is in blocking ([true]) or non-blocking
([false]) mode. Computed lazily; see [is_blocking]. *)
mutable event_readable : Lwt_engine.event option;
(* The event used to check the file descriptor for readability. *)
mutable event_writable : Lwt_engine.event option;
(* The event used to check the file descriptor for writability. *)
hooks_readable : (unit -> unit) Lwt_sequence.t;
(* Hooks to call when the file descriptor becomes readable. *)
hooks_writable : (unit -> unit) Lwt_sequence.t;
(* Hooks to call when the file descriptor becomes writable. *)
}
(* C stub reporting whether a file descriptor is a socket. In this file it is
only consulted on Windows (see [is_blocking]). *)
external is_socket : Unix.file_descr -> bool = "lwt_unix_is_socket" "noalloc"
[@@ocaml.warning "-3"]
external guess_blocking_job : Unix.file_descr -> bool job = "lwt_unix_guess_blocking_job"

(* [guess_blocking fd] runs the [guess_blocking_job] stub on [fd] as a job.
   [is_blocking] uses the result as the blocking mode when the caller did not
   specify one. *)
let guess_blocking fd =
  let job = guess_blocking_job fd in
  run_job job
(* [is_blocking ?blocking ?set_flags fd] builds the lazy computation stored
   in the [blocking] field of a [file_descr]. Nothing is done to [fd] until
   the lazy value is forced.

   - [blocking]: the requested mode, if the caller knows it.
   - [set_flags] (default [true]): whether forcing the value should also put
     [fd] in the resulting mode with [Unix.set_nonblock] /
     [Unix.clear_nonblock].

   When [blocking] is absent the mode is guessed with [guess_blocking] on
   non-Windows systems; on Windows, sockets are treated as non-blocking and
   everything else as blocking. On Windows non-sockets, flags are never
   touched. *)
let is_blocking ?blocking ?(set_flags=true) fd =
  (* Put [fd] in the given mode and report that mode. *)
  let apply_mode want_blocking =
    if want_blocking then begin
      Unix.clear_nonblock fd;
      Lwt.return_true
    end else begin
      Unix.set_nonblock fd;
      Lwt.return_false
    end
  in
  if not Sys.win32 then begin
    match blocking, set_flags with
    | Some state, false -> lazy (Lwt.return state)
    | Some state, true -> lazy (apply_mode state)
    | None, false -> lazy (guess_blocking fd)
    | None, true -> lazy (guess_blocking fd >>= apply_mode)
  end else if is_socket fd then begin
    match blocking, set_flags with
    | Some state, false -> lazy (Lwt.return state)
    | Some state, true -> lazy (apply_mode state)
    | None, false -> lazy Lwt.return_false
    | None, true -> lazy (apply_mode false)
  end else begin
    match blocking with
    | Some state -> lazy (Lwt.return state)
    | None -> lazy Lwt.return_true
  end
(* [mk_ch ?blocking ?set_flags fd] wraps the Unix descriptor [fd] into a
   fresh [file_descr] in the [Opened] state, with no engine events registered
   and empty hook sequences. [blocking] and [set_flags] are passed on to
   [is_blocking]. *)
let mk_ch ?blocking ?(set_flags=true) fd =
  let blocking = is_blocking ?blocking ~set_flags fd in
  {
    fd;
    state = Opened;
    set_flags;
    blocking;
    event_readable = None;
    event_writable = None;
    hooks_readable = Lwt_sequence.create ();
    hooks_writable = Lwt_sequence.create ();
  }
(* [check_descriptor ch] returns [()] if [ch] is open.
   Raises [Unix.Unix_error (EBADF, "check_descriptor", "")] if [ch] is
   closed, and the stored exception if [ch] was aborted. *)
let check_descriptor ch =
  match ch.state with
  | Closed -> raise (Unix.Unix_error (Unix.EBADF, "check_descriptor", ""))
  | Aborted e -> raise e
  | Opened -> ()
(* [state ch] returns the current life-cycle state of [ch]. *)
let state { state; _ } = state
(* [blocking ch] forces and returns the blocking-mode computation of [ch].
   Raises as [check_descriptor] does if [ch] is closed or aborted. *)
let blocking ch =
  check_descriptor ch;
  let (lazy mode) = ch.blocking in
  mode
(* [set_blocking ?set_flags ch blocking] records [blocking] as the mode of
   [ch]. The change is lazy: the descriptor flags (if [set_flags], default
   [true]) are only touched when the mode is next forced.
   Raises as [check_descriptor] does if [ch] is closed or aborted. *)
let set_blocking ?(set_flags=true) ch blocking =
  check_descriptor ch;
  ch.set_flags <- set_flags;
  let mode = is_blocking ~blocking ~set_flags ch.fd in
  ch.blocking <- mode
(* C stubs used by [unix_readable] and [unix_writable] on non-Windows systems
to test whether a descriptor is currently readable / writable. *)
external unix_stub_readable : Unix.file_descr -> bool = "lwt_unix_readable"
external unix_stub_writable : Unix.file_descr -> bool = "lwt_unix_writable"
(* [unix_readable fd] tests, without waiting, whether [fd] is readable. On
   Windows this is a zero-timeout [Unix.select]; elsewhere it is the C stub.
   The test is retried if it is interrupted by [EINTR]. *)
let rec unix_readable fd =
  match
    if Sys.win32 then Unix.select [fd] [] [] 0.0 <> ([], [], [])
    else unix_stub_readable fd
  with
  | ready -> ready
  | exception Unix.Unix_error (Unix.EINTR, _, _) -> unix_readable fd
(* [unix_writable fd] tests, without waiting, whether [fd] is writable. On
   Windows this is a zero-timeout [Unix.select]; elsewhere it is the C stub.
   The test is retried if it is interrupted by [EINTR]. *)
let rec unix_writable fd =
  match
    if Sys.win32 then Unix.select [] [fd] [] 0.0 <> ([], [], [])
    else unix_stub_writable fd
  with
  | ready -> ready
  | exception Unix.Unix_error (Unix.EINTR, _, _) -> unix_writable fd
(* [readable ch] tests whether [ch] is currently readable.
   Raises as [check_descriptor] does if [ch] is closed or aborted. *)
let readable ch =
  let () = check_descriptor ch in
  unix_readable ch.fd
(* [writable ch] tests whether [ch] is currently writable.
   Raises as [check_descriptor] does if [ch] is closed or aborted. *)
let writable ch =
  let () = check_descriptor ch in
  unix_writable ch.fd
(* [set_state ch st] overwrites the life-cycle state of [ch] with [st]. No
   validation is performed. *)
let set_state ch new_state =
  ch.state <- new_state
(* [clear_events ch] removes and runs every pending readable hook, then every
   pending writable hook, and finally stops and forgets the engine events of
   [ch]. Callers change the state of [ch] first, so that the hooks observe
   the new state. *)
let clear_events ch =
  (* Detach each hook from its sequence, then run it. *)
  let flush hooks =
    Lwt_sequence.iter_node_l
      (fun node ->
         Lwt_sequence.remove node;
         Lwt_sequence.get node ())
      hooks
  in
  let stop = function
    | None -> ()
    | Some ev -> Lwt_engine.stop_event ev
  in
  flush ch.hooks_readable;
  flush ch.hooks_writable;
  (* Each field is cleared before its event is stopped. *)
  let readable_event = ch.event_readable in
  ch.event_readable <- None;
  stop readable_event;
  let writable_event = ch.event_writable in
  ch.event_writable <- None;
  stop writable_event
(* [abort ch e] marks [ch] as aborted with exception [e] and flushes its
   pending hooks and events. It has no effect on a closed descriptor. *)
let abort ch e =
  match ch.state with
  | Closed -> ()
  | Opened | Aborted _ ->
    set_state ch (Aborted e);
    clear_events ch
(* [unix_file_descr ch] returns the underlying Unix file descriptor of [ch].
   The state of [ch] is not checked. *)
let unix_file_descr { fd; _ } = fd
(* [of_unix_file_descr] is [mk_ch]: it wraps a Unix file descriptor, with the
same optional [?blocking] and [?set_flags] arguments. *)
let of_unix_file_descr = mk_ch
(* Wrappers for the three standard descriptors. They are declared as blocking
and their flags are never modified ([~set_flags:false]). *)
let stdin = of_unix_file_descr ~set_flags:false ~blocking:true Unix.stdin
let stdout = of_unix_file_descr ~set_flags:false ~blocking:true Unix.stdout
let stderr = of_unix_file_descr ~set_flags:false ~blocking:true Unix.stderr
(* +-----------------------------------------------------------------+
| Actions on file descriptors |
+-----------------------------------------------------------------+ *)
(* The kind of readiness an action waits for: [Read] actions are queued on
[hooks_readable], [Write] actions on [hooks_writable]. *)
type io_event = Read | Write
(* Exceptions an action may raise to ask for being queued again (see
[retry_syscall] and [wrap_syscall]): [Retry] requeues on the event the action
was registered for, [Retry_write] on writability and [Retry_read] on
readability. *)
exception Retry
exception Retry_write
exception Retry_read
(* Result of one attempt at running a queued action in [retry_syscall]: it
returned a value, it failed with an exception, or it must be queued again
waiting for the given event. *)
type 'a outcome =
| Success of 'a
| Exn of exn
| Requeued of io_event
(* [stop_events ch] waits for one [Lwt.pause], then stops the engine events
   of [ch] that are no longer used, i.e. those whose hook sequence is
   empty. *)
let stop_events ch =
  let release_unused () =
    begin match ch.event_readable with
      | Some ev when Lwt_sequence.is_empty ch.hooks_readable ->
        ch.event_readable <- None;
        Lwt_engine.stop_event ev
      | Some _ | None -> ()
    end;
    begin match ch.event_writable with
      | Some ev when Lwt_sequence.is_empty ch.hooks_writable ->
        ch.event_writable <- None;
        Lwt_engine.stop_event ev
      | Some _ | None -> ()
    end
  in
  Lwt.on_success (Lwt.pause ()) release_unused
(* [register_readable ch] makes sure an engine event watches [ch] for
   readability. When the event fires, every hook in [hooks_readable] is run.
   Does nothing if such an event is already registered. *)
let register_readable ch =
  match ch.event_readable with
  | Some _ -> ()
  | None ->
    let run_hooks _ =
      Lwt_sequence.iter_l (fun hook -> hook ()) ch.hooks_readable
    in
    ch.event_readable <- Some (Lwt_engine.on_readable ch.fd run_hooks)
(* [register_writable ch] makes sure an engine event watches [ch] for
   writability. When the event fires, every hook in [hooks_writable] is run.
   Does nothing if such an event is already registered. *)
let register_writable ch =
  match ch.event_writable with
  | Some _ -> ()
  | None ->
    let run_hooks _ =
      Lwt_sequence.iter_l (fun hook -> hook ()) ch.hooks_writable
    in
    ch.event_writable <- Some (Lwt_engine.on_writable ch.fd run_hooks)
(* [retry_syscall node event ch wakener action] retries a queued action.

   - [node] references the hook node currently holding this retry.
   - [event] is the event the action is currently queued on.
   - [wakener] resolves the caller's promise once the action succeeds or
     fails.

   If the action asks to be retried on the same event, it simply stays
   queued. If it asks for the other event, it is moved to the other hook
   sequence. *)
let rec retry_syscall node event ch wakener action =
  (* Remove the action from its queue and release unused engine events. *)
  let detach () =
    Lwt_sequence.remove !node;
    stop_events ch
  in
  (* Queue the action on [hooks], waiting for [wanted]. *)
  let requeue_on wanted hooks register =
    node :=
      Lwt_sequence.add_r
        (fun () -> retry_syscall node wanted ch wakener action)
        hooks;
    register ch
  in
  let attempt =
    match (check_descriptor ch; action ()) with
    | value -> Success value
    | exception
        ( Retry
        | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _)
        | Sys_blocked_io ) ->
      (* EINTR because we are catching SIG_CHLD, hence the system call
         might be interrupted to handle the signal; this lets us restart
         the system call eventually. *)
      Requeued event
    | exception Retry_read -> Requeued Read
    | exception Retry_write -> Requeued Write
    | exception e when Lwt.Exception_filter.run e -> Exn e
  in
  match attempt with
  | Success value ->
    detach ();
    Lwt.wakeup wakener value
  | Exn e ->
    detach ();
    Lwt.wakeup_exn wakener e
  | Requeued wanted when wanted = event ->
    (* Same event: the action is already queued where it should be. *)
    ()
  | Requeued Read ->
    detach ();
    requeue_on Read ch.hooks_readable register_readable
  | Requeued Write ->
    detach ();
    requeue_on Write ch.hooks_writable register_writable
(* Placeholder node living in a throw-away sequence. [register_action] uses
it to initialise its [node] reference before the real node exists. *)
let dummy = Lwt_sequence.add_r ignore (Lwt_sequence.create ())
(* [register_action event ch action] queues [action] to run when [ch] becomes
   ready for [event], and returns a cancelable promise for its result.
   Canceling the promise removes the action from its queue and releases
   engine events that are no longer used. *)
let register_action event ch action =
  let waiter, wakener = Lwt.task () in
  let node = ref dummy in
  let hooks, register =
    match event with
    | Read -> ch.hooks_readable, register_readable
    | Write -> ch.hooks_writable, register_writable
  in
  node :=
    Lwt_sequence.add_r
      (fun () -> retry_syscall node event ch wakener action)
      hooks;
  Lwt.on_cancel waiter (fun () ->
    Lwt_sequence.remove !node;
    stop_events ch);
  register ch;
  waiter
(* [wrap_syscall event ch action] wraps a system call. [action] is run
   immediately when [ch] is in non-blocking mode, or when it is in blocking
   mode but already ready for [event]. Otherwise, or if the immediate attempt
   asks for a retry, [action] is queued with [register_action].
   Raises as [check_descriptor] does if [ch] is closed or aborted. *)
let wrap_syscall event ch action =
  check_descriptor ch;
  Lazy.force ch.blocking >>= fun blocking ->
  let ready () =
    match event with
    | Read -> unix_readable ch.fd
    | Write -> unix_writable ch.fd
  in
  match
    if not blocking || ready () then
      Lwt.return (action ())
    else
      register_action event ch action
  with
  | promise -> promise
  | exception
      ( Retry
      | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _)
      | Sys_blocked_io ) ->
    (* The action could not be completed immediately, register it: *)
    register_action event ch action
  | exception Retry_read -> register_action Read ch action
  | exception Retry_write -> register_action Write ch action
  | exception e when Lwt.Exception_filter.run e -> Lwt.reraise e
(* +-----------------------------------------------------------------+
| Basic file input/output |
+-----------------------------------------------------------------+ *)
(* Re-export of [Unix.open_flag], so that its constructors can be used
through this module. *)
type open_flag =
Unix.open_flag =
| O_RDONLY
| O_WRONLY
| O_RDWR
| O_NONBLOCK
| O_APPEND
| O_CREAT
| O_TRUNC
| O_EXCL
| O_NOCTTY
| O_DSYNC
| O_SYNC
| O_RSYNC
| O_SHARE_DELETE
| O_CLOEXEC
| O_KEEPEXEC
external open_job : string -> Unix.open_flag list -> int -> (Unix.file_descr * bool) job = "lwt_unix_open_job"

(* [openfile name flags perms] opens [name] and wraps the resulting
   descriptor. On Windows the file is opened synchronously with
   [Unix.openfile]. Elsewhere it is opened by a job, which also reports the
   blocking mode to record for the new descriptor. *)
let openfile name flags perms =
  match Sys.win32 with
  | true ->
    Lwt.return (of_unix_file_descr (Unix.openfile name flags perms))
  | false ->
    Lwt.bind (run_job (open_job name flags perms)) (fun (fd, blocking) ->
      Lwt.return (of_unix_file_descr ~blocking fd))
external close_job : Unix.file_descr -> unit job = "lwt_unix_close_job"

(* [close ch] marks [ch] as closed, flushes its pending hooks and events, and
   closes the underlying descriptor (synchronously on Windows, with a job
   elsewhere).
   Raises [Unix.Unix_error (EBADF, _, _)] if [ch] is already closed. An
   aborted descriptor can still be closed. *)
let close ch =
  begin match ch.state with
    | Closed -> check_descriptor ch
    | Opened | Aborted _ -> ()
  end;
  set_state ch Closed;
  clear_events ch;
  match Sys.win32 with
  | true -> Lwt.return (Unix.close ch.fd)
  | false -> run_job (close_job ch.fd)
(* One-dimensional, C-layout bigarray of bytes, used as an I/O buffer by the
[*_bigarray] functions and by [IO_vectors]. *)
type bigarray =
(char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
(* [wait_read ch] resolves once [ch] is readable: immediately if it already
   is, otherwise when the engine reports readability. Exceptions raised
   while checking [ch] are turned into a rejected promise. *)
let wait_read ch =
  let attempt () =
    match readable ch with
    | true -> Lwt.return_unit
    | false -> register_action Read ch ignore
  in
  Lwt.catch attempt Lwt.reraise
(* C stubs for [read] and [pread]. The [stub_*] variants perform the call
directly and are used on non-blocking descriptors; the [*_job] variants
create a job and are used on blocking descriptors. The two trailing integers
are the position in the buffer and the length, in that order (see the call
sites in [read] and [pread]). *)
external stub_read : Unix.file_descr -> Bytes.t -> int -> int -> int = "lwt_unix_read"
external read_job : Unix.file_descr -> Bytes.t -> int -> int -> int job = "lwt_unix_read_job"
external stub_pread :
Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int =
"lwt_unix_pread"
external pread_job :
Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int job =
"lwt_unix_pread_job"
(* [read ch buf pos len] reads up to [len] bytes from [ch] into [buf]
   starting at [pos], and resolves to the value returned by the underlying
   read. On a blocking descriptor it waits for readability and then runs a
   job; on a non-blocking one it goes through [wrap_syscall].
   Raises [Invalid_argument "Lwt_unix.read"] if [pos] and [len] do not
   designate a valid range of [buf]. *)
let read ch buf pos len =
  let in_bounds = pos >= 0 && len >= 0 && pos <= Bytes.length buf - len in
  if not in_bounds then
    invalid_arg "Lwt_unix.read"
  else
    Lwt.bind (Lazy.force ch.blocking) (fun blocking_mode ->
      if blocking_mode then
        Lwt.bind (wait_read ch) (fun () ->
          run_job (read_job ch.fd buf pos len))
      else
        wrap_syscall Read ch (fun () -> stub_read ch.fd buf pos len))
(* [pread ch buf ~file_offset pos len] is like [read], but passes an explicit
   [file_offset] to the underlying stub.
   Raises [Invalid_argument "Lwt_unix.pread"] if [pos] and [len] do not
   designate a valid range of [buf]. *)
let pread ch buf ~file_offset pos len =
  let in_bounds = pos >= 0 && len >= 0 && pos <= Bytes.length buf - len in
  if not in_bounds then
    invalid_arg "Lwt_unix.pread"
  else
    Lwt.bind (Lazy.force ch.blocking) (fun blocking_mode ->
      if blocking_mode then
        Lwt.bind (wait_read ch) (fun () ->
          run_job (pread_job ch.fd buf ~file_offset pos len))
      else
        wrap_syscall Read ch (fun () ->
          stub_pread ch.fd buf ~file_offset pos len))
(* C stubs for reading into a bigarray: a direct call for non-blocking
descriptors and a job for blocking ones (see [read_bigarray]). *)
external stub_read_bigarray :
Unix.file_descr -> bigarray -> int -> int -> int = "lwt_unix_bytes_read"
external read_bigarray_job :
Unix.file_descr -> bigarray -> int -> int -> int job =
"lwt_unix_bytes_read_job"
(* [read_bigarray function_name fd buf pos len] reads up to [len] bytes from
   [fd] into the bigarray [buf] starting at [pos]. [function_name] is only
   used as the message of the [Invalid_argument] raised when [pos] and [len]
   do not designate a valid range of [buf]. *)
let read_bigarray function_name fd buf pos len =
  let in_bounds =
    pos >= 0 && len >= 0 && pos <= Bigarray.Array1.dim buf - len
  in
  if not in_bounds then
    invalid_arg function_name
  else
    Lwt.bind (blocking fd) (fun blocking_mode ->
      if blocking_mode then
        Lwt.bind (wait_read fd) (fun () ->
          run_job (read_bigarray_job (unix_file_descr fd) buf pos len))
      else
        wrap_syscall Read fd (fun () ->
          stub_read_bigarray (unix_file_descr fd) buf pos len))
(* [wait_write ch] resolves once [ch] is writable: immediately if it already
   is, otherwise when the engine reports writability. Exceptions raised
   while checking [ch] are turned into a rejected promise. *)
let wait_write ch =
  let attempt () =
    match writable ch with
    | true -> Lwt.return_unit
    | false -> register_action Write ch ignore
  in
  Lwt.catch attempt Lwt.reraise
(* C stubs for [write] and [pwrite]. The [stub_*] variants perform the call
directly and are used on non-blocking descriptors; the [*_job] variants
create a job and are used on blocking descriptors. The two trailing integers
are the position in the buffer and the length, in that order (see the call
sites in [write] and [pwrite]). *)
external stub_write : Unix.file_descr -> Bytes.t -> int -> int -> int = "lwt_unix_write"
external write_job : Unix.file_descr -> Bytes.t -> int -> int -> int job = "lwt_unix_write_job"
external stub_pwrite :
Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int =
"lwt_unix_pwrite"
external pwrite_job :
Unix.file_descr -> Bytes.t -> file_offset:int -> int -> int -> int job =
"lwt_unix_pwrite_job"
(* [write ch buf pos len] writes up to [len] bytes of [buf], starting at
   [pos], to [ch], and resolves to the value returned by the underlying
   write. On a blocking descriptor it waits for writability and then runs a
   job; on a non-blocking one it goes through [wrap_syscall].
   Raises [Invalid_argument "Lwt_unix.write"] if [pos] and [len] do not
   designate a valid range of [buf]. *)
let write ch buf pos len =
  let in_bounds = pos >= 0 && len >= 0 && pos <= Bytes.length buf - len in
  if not in_bounds then
    invalid_arg "Lwt_unix.write"
  else
    Lwt.bind (Lazy.force ch.blocking) (fun blocking_mode ->
      if blocking_mode then
        Lwt.bind (wait_write ch) (fun () ->
          run_job (write_job ch.fd buf pos len))
      else
        wrap_syscall Write ch (fun () -> stub_write ch.fd buf pos len))
(* [pwrite ch buf ~file_offset pos len] is like [write], but passes an
   explicit [file_offset] to the underlying stub.
   Raises [Invalid_argument "Lwt_unix.pwrite"] if [pos] and [len] do not
   designate a valid range of [buf]. *)
let pwrite ch buf ~file_offset pos len =
  let in_bounds = pos >= 0 && len >= 0 && pos <= Bytes.length buf - len in
  if not in_bounds then
    invalid_arg "Lwt_unix.pwrite"
  else
    Lwt.bind (Lazy.force ch.blocking) (fun blocking_mode ->
      if blocking_mode then
        Lwt.bind (wait_write ch) (fun () ->
          run_job (pwrite_job ch.fd buf ~file_offset pos len))
      else
        wrap_syscall Write ch (fun () ->
          stub_pwrite ch.fd buf ~file_offset pos len))
(* [write_string ch buf pos len] is [write] for a string. The string is
   viewed as bytes without copying; this relies on [write] only reading from
   its buffer. *)
let write_string ch buf pos len =
  write ch (Bytes.unsafe_of_string buf) pos len
(* [pwrite_string ch buf ~file_offset pos len] is [pwrite] for a string. The
   string is viewed as bytes without copying; this relies on [pwrite] only
   reading from its buffer. *)
let pwrite_string ch buf ~file_offset pos len =
  pwrite ch (Bytes.unsafe_of_string buf) ~file_offset pos len
(* C stubs for writing from a bigarray: a direct call for non-blocking
descriptors and a job for blocking ones (see [write_bigarray]). *)
external stub_write_bigarray :
Unix.file_descr -> bigarray -> int -> int -> int = "lwt_unix_bytes_write"
external write_bigarray_job :
Unix.file_descr -> bigarray -> int -> int -> int job =
"lwt_unix_bytes_write_job"
(* [write_bigarray function_name fd buf pos len] writes up to [len] bytes of
   the bigarray [buf], starting at [pos], to [fd]. [function_name] is only
   used as the message of the [Invalid_argument] raised when [pos] and [len]
   do not designate a valid range of [buf]. *)
let write_bigarray function_name fd buf pos len =
  let in_bounds =
    pos >= 0 && len >= 0 && pos <= Bigarray.Array1.dim buf - len
  in
  if not in_bounds then
    invalid_arg function_name
  else
    Lwt.bind (blocking fd) (fun blocking_mode ->
      if blocking_mode then
        Lwt.bind (wait_write fd) (fun () ->
          run_job (write_bigarray_job (unix_file_descr fd) buf pos len))
      else
        wrap_syscall Write fd (fun () ->
          stub_write_bigarray (unix_file_descr fd) buf pos len))
(* Sequences of buffer slices ("I/O vectors") for scatter/gather I/O with
   [readv] and [writev]. *)
module IO_vectors =
struct
  type _bigarray = bigarray

  (* Storage backing one I/O vector. *)
  type buffer =
    | Bytes of bytes
    | Bigarray of _bigarray

  (* The slice of [buffer] that starts at [offset] and is [length] bytes
     long. [offset] and [length] are mutable so that [drop] can shrink a
     slice in place. *)
  type io_vector =
    {buffer : buffer;
     mutable offset : int;
     mutable length : int}

  (* This representation does not give constant amortized time append across
     all possible operation sequences, but it does for expected typical usage,
     in which some number of append operations is followed by some number of
     flatten operations.

     The vectors are [prefix] followed by the reverse of [reversed_suffix];
     [count] is the total number of vectors. *)
  type t =
    {mutable prefix : io_vector list;
     mutable reversed_suffix : io_vector list;
     mutable count : int}

  (* [create ()] returns an empty sequence of I/O vectors. *)
  let create () = {prefix = []; reversed_suffix = []; count = 0}

  (* [byte_count io_vectors] is the sum of the lengths of all the slices. *)
  let byte_count {prefix; reversed_suffix; _} =
    let count_buff = List.fold_left (fun acc {length; _} -> acc + length) 0 in
    count_buff prefix + count_buff reversed_suffix

  (* [append io_vectors io_vector] adds [io_vector] at the end. *)
  let append io_vectors io_vector =
    io_vectors.reversed_suffix <- io_vector::io_vectors.reversed_suffix;
    io_vectors.count <- io_vectors.count + 1

  (* [append_bytes io_vectors buffer offset length] appends a slice of a
     [bytes] buffer. Bounds are not checked here; see [check]. *)
  let append_bytes io_vectors buffer offset length =
    append io_vectors {buffer = Bytes buffer; offset; length}

  (* [append_bigarray io_vectors buffer offset length] appends a slice of a
     bigarray buffer. Bounds are not checked here; see [check]. *)
  let append_bigarray io_vectors buffer offset length =
    append io_vectors {buffer = Bigarray buffer; offset; length}

  (* [flatten io_vectors] moves every pending vector from [reversed_suffix]
     to the end of [prefix], so that [prefix] alone holds all the vectors in
     order. *)
  let flatten io_vectors =
    match io_vectors.reversed_suffix with
    | [] -> ()
    | _ ->
      io_vectors.prefix <-
        io_vectors.prefix @ (List.rev io_vectors.reversed_suffix);
      io_vectors.reversed_suffix <- []

  (* [drop io_vectors count] removes the first [count] bytes: slices that are
     entirely covered are removed, and the first remaining slice is shrunk in
     place. A [count] that is zero or negative drops nothing; a [count]
     larger than the total removes everything. *)
  let drop io_vectors count =
    flatten io_vectors;
    let rec loop count prefix =
      if count <= 0 then prefix
      else
        match prefix with
        | [] -> []
        | {length; _}::rest when length <= count ->
          io_vectors.count <- io_vectors.count - 1;
          loop (count - length) rest
        | first::_ ->
          first.offset <- first.offset + count;
          first.length <- first.length - count;
          prefix
    in
    io_vectors.prefix <- loop count io_vectors.prefix

  (* [is_empty io_vectors] is [true] when no slice has a non-zero length. *)
  let is_empty io_vectors =
    flatten io_vectors;
    let rec loop = function
      | [] -> true
      | {length = 0; _}::rest -> loop rest
      | _ -> false
    in
    loop io_vectors.prefix

  external stub_iov_max : unit -> int option = "lwt_unix_iov_max"

  (* Maximum number of vectors usable in one call, as reported by the C stub;
     [None] on Windows, where the stub is not consulted. *)
  let system_limit =
    if Sys.win32 then None
    else stub_iov_max ()

  (* [check tag io_vector] validates that the slice lies within its buffer.
     Raises [Invalid_argument tag] otherwise.

     The upper bound is tested as [offset > buffer_length - length] rather
     than [offset + length > buffer_length]: the sum can wrap around for very
     large values and wrongly accept an out-of-range slice. The subtraction
     cannot overflow, because [length] is known to be non-negative by the
     time it is evaluated. *)
  let check tag io_vector =
    let buffer_length =
      match io_vector.buffer with
      | Bytes s -> Bytes.length s
      | Bigarray a -> Bigarray.Array1.dim a
    in
    if io_vector.length < 0 ||
       io_vector.offset < 0 ||
       io_vector.offset > buffer_length - io_vector.length then
      invalid_arg tag
end
(* [check_io_vectors function_name io_vectors] flattens the I/O vectors into
   a single list, checks the bounds of each of them (raising
   [Invalid_argument function_name] on failure), and evaluates to the minimum
   of the number of vectors and the system's IOV_MAX, when the latter is
   known. *)
let check_io_vectors function_name io_vectors =
  IO_vectors.flatten io_vectors;
  List.iter (IO_vectors.check function_name) io_vectors.IO_vectors.prefix;
  let { IO_vectors.count; _ } = io_vectors in
  match IO_vectors.system_limit with
  | None -> count
  | Some limit -> if count > limit then limit else count
(* C stubs for [readv]: a direct call taking the flattened vector list, and a
job taking the whole [IO_vectors.t]. In both, the final integer is the number
of vectors to use, as computed by [check_io_vectors]. *)
external stub_readv :
Unix.file_descr -> IO_vectors.io_vector list -> int -> int =
"lwt_unix_readv"
external readv_job : Unix.file_descr -> IO_vectors.t -> int -> int job =
"lwt_unix_readv_job"
(* [readv fd io_vectors] reads from [fd] into the slices of [io_vectors] and
   resolves to the value returned by the underlying read.
   On Windows only the first slice is used, through [read] or
   [read_bigarray]; an empty sequence yields [0].
   Raises [Invalid_argument "Lwt_unix.readv"] if a slice is out of bounds. *)
let readv fd io_vectors =
  let count = check_io_vectors "Lwt_unix.readv" io_vectors in
  if Sys.win32 then begin
    match io_vectors.IO_vectors.prefix with
    | [] ->
      Lwt.return 0
    | { IO_vectors.buffer = IO_vectors.Bytes bytes; offset; length } :: _ ->
      read fd bytes offset length
    | { IO_vectors.buffer = IO_vectors.Bigarray array; offset; length } :: _ ->
      read_bigarray "Lwt_unix.readv" fd array offset length
  end else
    Lwt.bind (Lazy.force fd.blocking) (fun blocking_mode ->
      if blocking_mode then
        Lwt.bind (wait_read fd) (fun () ->
          run_job (readv_job fd.fd io_vectors count))
      else
        wrap_syscall Read fd (fun () ->
          stub_readv fd.fd io_vectors.IO_vectors.prefix count))
(* C stubs for [writev]: a direct call taking the flattened vector list, and
a job taking the whole [IO_vectors.t]. In both, the final integer is the
number of vectors to use, as computed by [check_io_vectors]. *)
external stub_writev :
Unix.file_descr -> IO_vectors.io_vector list -> int -> int =
"lwt_unix_writev"
external writev_job : Unix.file_descr -> IO_vectors.t -> int -> int job =
"lwt_unix_writev_job"
(* [writev fd io_vectors] writes the slices of [io_vectors] to [fd] and
   resolves to the value returned by the underlying write.
   On Windows only the first slice is used, through [write] or
   [write_bigarray]; an empty sequence yields [0].
   Raises [Invalid_argument "Lwt_unix.writev"] if a slice is out of bounds. *)
let writev fd io_vectors =
  let count = check_io_vectors "Lwt_unix.writev" io_vectors in
  if Sys.win32 then begin
    match io_vectors.IO_vectors.prefix with
    | [] ->
      Lwt.return 0
    | { IO_vectors.buffer = IO_vectors.Bytes bytes; offset; length } :: _ ->
      write fd bytes offset length
    | { IO_vectors.buffer = IO_vectors.Bigarray array; offset; length } :: _ ->
      write_bigarray "Lwt_unix.writev" fd array offset length
  end else
    Lwt.bind (Lazy.force fd.blocking) (fun blocking_mode ->
      if blocking_mode then
        Lwt.bind (wait_write fd) (fun () ->
          run_job (writev_job fd.fd io_vectors count))
      else
        wrap_syscall Write fd (fun () ->
          stub_writev fd.fd io_vectors.IO_vectors.prefix count))
(* +-----------------------------------------------------------------+
| Seeking and truncating |
+-----------------------------------------------------------------+ *)
(* Re-export of [Unix.seek_command], so that its constructors can be used
through this module. *)
type seek_command =
Unix.seek_command =
| SEEK_SET
| SEEK_CUR
| SEEK_END
external lseek_job :
  Unix.file_descr -> int -> Unix.seek_command -> int job = "lwt_unix_lseek_job"

(* [lseek ch offset whence] repositions [ch], synchronously with
   [Unix.lseek] on Windows and with a job elsewhere.
   Raises as [check_descriptor] does if [ch] is closed or aborted. *)
let lseek ch offset whence =
  check_descriptor ch;
  match Sys.win32 with
  | true -> Lwt.return (Unix.lseek ch.fd offset whence)
  | false -> run_job (lseek_job ch.fd offset whence)
external truncate_job : string -> int -> unit job = "lwt_unix_truncate_job"

(* [truncate name offset] truncates the file [name] to [offset],
   synchronously with [Unix.truncate] on Windows and with a job elsewhere. *)
let truncate name offset =
  match Sys.win32 with
  | true -> Lwt.return (Unix.truncate name offset)
  | false -> run_job (truncate_job name offset)
external ftruncate_job :
  Unix.file_descr -> int -> unit job = "lwt_unix_ftruncate_job"

(* [ftruncate ch offset] truncates the file behind [ch] to [offset],
   synchronously with [Unix.ftruncate] on Windows and with a job elsewhere.
   Raises as [check_descriptor] does if [ch] is closed or aborted. *)
let ftruncate ch offset =
  check_descriptor ch;
  match Sys.win32 with
  | true -> Lwt.return (Unix.ftruncate ch.fd offset)
  | false -> run_job (ftruncate_job ch.fd offset)
(* +-----------------------------------------------------------------+
| File system synchronisation |
+-----------------------------------------------------------------+ *)
external fdatasync_job : Unix.file_descr -> unit job = "lwt_unix_fdatasync_job"

(* [fdatasync ch] runs the fdatasync job on the descriptor of [ch].
   Raises as [check_descriptor] does if [ch] is closed or aborted. *)
let fdatasync ch =
  check_descriptor ch;
  let job = fdatasync_job ch.fd in
  run_job job
external fsync_job : Unix.file_descr -> unit job = "lwt_unix_fsync_job"

(* [fsync ch] runs the fsync job on the descriptor of [ch].
   Raises as [check_descriptor] does if [ch] is closed or aborted. *)
let fsync ch =
  check_descriptor ch;
  let job = fsync_job ch.fd in
  run_job job
(* +-----------------------------------------------------------------+
| File status |
+-----------------------------------------------------------------+ *)
(* Alias of [Unix.file_perm]. *)
type file_perm = Unix.file_perm
(* Re-export of [Unix.file_kind], so that its constructors can be used
through this module. *)
type file_kind =
Unix.file_kind =
| S_REG
| S_DIR
| S_CHR
| S_BLK
| S_LNK
| S_FIFO
| S_SOCK
(* Re-export of [Unix.stats], so that its fields can be used through this
module. *)
type stats =
Unix.stats =
{
st_dev : int;
st_ino : int;
st_kind : file_kind;
st_perm : file_perm;
st_nlink : int;
st_uid : int;
st_gid : int;
st_rdev : int;
st_size : int;
st_atime : float;
st_mtime : float;
st_ctime : float;
}
(* C stub creating a job that takes a string and yields a [Unix.stats].
NOTE(review): the string is presumably the path of the file to stat -
confirm against the caller. *)
external stat_job : string -> Unix.stats job = "lwt_unix_stat_job"