-
Notifications
You must be signed in to change notification settings - Fork 177
/
Copy pathlwt_io.ml
1568 lines (1368 loc) · 51.8 KB
/
lwt_io.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
(* OCaml promise library
* http://www.ocsigen.org/lwt
* Copyright (C) 2009 Jérémie Dimino
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, with linking exceptions;
* either version 2.1 of the License, or (at your option) any later
* version. See COPYING file for details.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
* 02111-1307, USA.
*)
open Lwt.Infix
exception Channel_closed of string
(* Minimum size for buffers: *)
let min_buffer_size = 16
let check_buffer_size fun_name buffer_size =
if buffer_size < min_buffer_size then
Printf.ksprintf invalid_arg "Lwt_io.%s: too small buffer size" fun_name
else if buffer_size > Sys.max_string_length then
Printf.ksprintf invalid_arg "Lwt_io.%s: too big buffer size" fun_name
else
()
let check_buffer fun_name buffer =
check_buffer_size fun_name (Lwt_bytes.length buffer)
let default_buffer_size = ref 4096
(* +-----------------------------------------------------------------+
| Types |
+-----------------------------------------------------------------+ *)
type input
type output
type 'a mode =
| Input : input mode
| Output : output mode
let input : input mode = Input
let output : output mode = Output
(* A channel state *)
type 'mode state =
| Busy_primitive
(* A primitive is running on the channel *)
| Busy_atomic of 'mode channel
(* An atomic operations is being performed on the channel. The
argument is the temporary atomic wrapper. *)
| Waiting_for_busy
(* A queued operation has not yet started. *)
| Idle
(* The channel is unused *)
| Closed
(* The channel has been closed *)
| Invalid
(* The channel is a temporary channel created for an atomic
operation which has terminated. *)
(* A wrapper, which ensures that io operations are atomic: *)
and 'mode channel = {
mutable state : 'mode state;
channel : 'mode _channel;
(* The real channel *)
mutable queued : unit Lwt.u Lwt_sequence.t;
(* Queued operations *)
}
and 'mode _channel = {
mutable buffer : Lwt_bytes.t;
mutable length : int;
mutable ptr : int;
(* Current position *)
mutable max : int;
(* Position of the end of data int the buffer. It is equal to
[length] for output channels. *)
abort_waiter : int Lwt.t;
(* Thread which is wakeup with an exception when the channel is
closed. *)
abort_wakener : int Lwt.u;
mutable auto_flushing : bool;
(* Wether the auto-flusher is currently running or not *)
main : 'mode channel;
(* The main wrapper *)
close : unit Lwt.t Lazy.t;
(* Close function *)
mode : 'mode mode;
(* The channel mode *)
mutable offset : int64;
(* Number of bytes really read/written *)
typ : typ;
(* Type of the channel. *)
}
and typ =
| Type_normal of (Lwt_bytes.t -> int -> int -> int Lwt.t) * (int64 -> Unix.seek_command -> int64 Lwt.t)
(* The channel has been created with [make]. The first argument
is the refill/flush function and the second is the seek
function. *)
| Type_bytes
(* The channel has been created with [of_bytes]. *)
type input_channel = input channel
type output_channel = output channel
type direct_access = {
da_buffer : Lwt_bytes.t;
mutable da_ptr : int;
mutable da_max : int;
da_perform : unit -> int Lwt.t;
}
let mode wrapper = wrapper.channel.mode
(* +-----------------------------------------------------------------+
| Creations, closing, locking, ... |
+-----------------------------------------------------------------+ *)
(* This strange hash function is fine because Lwt_io only ever:
- adds distinct channels to the hash set,
- folds over the hash set.
Lwt_io never looks up individual elements. The constant function is not
suitable, because then all channels will end up in the same hash bucket.
A weak hash set is used instead of a weak array to avoid having to include
resizing and compaction code in Lwt_io. *)
let hash_output_channel =
let index = ref 0 in
fun () ->
index := !index + 1;
!index
module Outputs = Weak.Make(struct
type t = output_channel
let hash _ = hash_output_channel ()
let equal = ( == )
end)
(* Table of all opened output channels. On exit they are all
flushed: *)
let outputs = Outputs.create 32
let position : type mode. mode channel -> int64 = fun wrapper ->
let ch = wrapper.channel in
match ch.mode with
| Input ->
Int64.sub ch.offset (Int64.of_int (ch.max - ch.ptr))
| Output ->
Int64.add ch.offset (Int64.of_int ch.ptr)
let name : type mode. mode _channel -> string = fun ch ->
match ch.mode with
| Input -> "input"
| Output -> "output"
let closed_channel ch = Channel_closed(name ch)
let invalid_channel ch = Failure(Printf.sprintf "temporary atomic channel %s no more valid" (name ch))
let is_busy ch =
match ch.state with
| Invalid ->
raise (invalid_channel ch.channel)
| Idle | Closed ->
false
| Busy_primitive | Busy_atomic _ | Waiting_for_busy ->
true
(* Flush/refill the buffer. No race condition could happen because
this function is always called atomically: *)
let perform_io : type mode. mode _channel -> int Lwt.t = fun ch -> match ch.main.state with
| Busy_primitive | Busy_atomic _ -> begin
match ch.typ with
| Type_normal(perform_io, _) ->
let ptr, len = match ch.mode with
| Input ->
(* Size of data in the buffer *)
let size = ch.max - ch.ptr in
(* If there are still data in the buffer, keep them: *)
if size > 0 then Lwt_bytes.unsafe_blit ch.buffer ch.ptr ch.buffer 0 size;
(* Update positions: *)
ch.ptr <- 0;
ch.max <- size;
(size, ch.length - size)
| Output ->
(0, ch.ptr) in
Lwt.pick [ch.abort_waiter;
if Sys.win32 then
Lwt.catch
(fun () -> perform_io ch.buffer ptr len)
(function
| Unix.Unix_error (Unix.EPIPE, _, _) ->
Lwt.return 0
| exn -> Lwt.fail exn) [@ocaml.warning "-4"]
else
perform_io ch.buffer ptr len
] >>= fun n ->
(* Never trust user functions... *)
if n < 0 || n > len then
Lwt.fail (Failure (Printf.sprintf "Lwt_io.perform_io: invalid result of the [%s] function"
(match ch.mode with Input -> "read" | Output -> "write")))
else begin
(* Update the global offset: *)
ch.offset <- Int64.add ch.offset (Int64.of_int n);
(* Update buffer positions: *)
begin match ch.mode with
| Input ->
ch.max <- ch.max + n
| Output ->
(* Shift remaining data: *)
let len = len - n in
Lwt_bytes.unsafe_blit ch.buffer n ch.buffer 0 len;
ch.ptr <- len
end;
Lwt.return n
end
| Type_bytes -> begin
match ch.mode with
| Input ->
Lwt.return 0
| Output ->
Lwt.fail (Failure "cannot flush a channel created with Lwt_io.of_string")
end
end
| Closed ->
Lwt.fail (closed_channel ch)
| Invalid ->
Lwt.fail (invalid_channel ch)
| Idle | Waiting_for_busy ->
assert false
let refill = perform_io
let flush_partial = perform_io
let rec flush_total oc =
if oc.ptr > 0 then
flush_partial oc >>= fun _ ->
flush_total oc
else
Lwt.return_unit
let safe_flush_total oc =
Lwt.catch
(fun () -> flush_total oc)
(fun _ -> Lwt.return_unit)
let deepest_wrapper ch =
let rec loop wrapper =
match wrapper.state with
| Busy_atomic wrapper ->
loop wrapper
| Busy_primitive | Waiting_for_busy | Idle | Closed | Invalid ->
wrapper
in
loop ch.main
let auto_flush oc =
Lwt.pause () >>= fun () ->
let wrapper = deepest_wrapper oc in
match wrapper.state with
| Busy_primitive | Waiting_for_busy ->
(* The channel is used, cancel auto flushing. It will be
restarted when the channel Lwt.returns to the [Idle] state: *)
oc.auto_flushing <- false;
Lwt.return_unit
| Busy_atomic _ ->
(* Cannot happen since we took the deepest wrapper: *)
assert false
| Idle ->
oc.auto_flushing <- false;
wrapper.state <- Busy_primitive;
safe_flush_total oc >>= fun () ->
if wrapper.state = Busy_primitive then
wrapper.state <- Idle;
if not (Lwt_sequence.is_empty wrapper.queued) then
Lwt.wakeup_later (Lwt_sequence.take_l wrapper.queued) ();
Lwt.return_unit
| Closed | Invalid ->
Lwt.return_unit
(* A ``locked'' channel is a channel in the state [Busy_primitive] or
[Busy_atomic] *)
let unlock : type m. m channel -> unit = fun wrapper -> match wrapper.state with
| Busy_primitive | Busy_atomic _ ->
if Lwt_sequence.is_empty wrapper.queued then
wrapper.state <- Idle
else begin
wrapper.state <- Waiting_for_busy;
Lwt.wakeup_later (Lwt_sequence.take_l wrapper.queued) ()
end;
(* Launches the auto-flusher: *)
let ch = wrapper.channel in
if (* Launch the auto-flusher only if the channel is not busy: *)
(wrapper.state = Idle &&
(* Launch the auto-flusher only for output channel: *)
(match ch.mode with Input -> false | Output -> true) &&
(* Do not launch two auto-flusher: *)
not ch.auto_flushing &&
(* Do not launch the auto-flusher if operations are queued: *)
Lwt_sequence.is_empty wrapper.queued) then begin
ch.auto_flushing <- true;
ignore (auto_flush ch)
end
| Closed | Invalid ->
(* Do not change channel state if the channel has been closed *)
if not (Lwt_sequence.is_empty wrapper.queued) then
Lwt.wakeup_later (Lwt_sequence.take_l wrapper.queued) ()
| Idle | Waiting_for_busy ->
(* We must never unlock an unlocked channel *)
assert false
(* Wrap primitives into atomic io operations: *)
let primitive f wrapper = match wrapper.state with
| Idle ->
wrapper.state <- Busy_primitive;
Lwt.finalize
(fun () -> f wrapper.channel)
(fun () ->
unlock wrapper;
Lwt.return_unit)
| Busy_primitive | Busy_atomic _ | Waiting_for_busy ->
Lwt.add_task_r wrapper.queued >>= fun () ->
begin match wrapper.state with
| Closed ->
(* The channel has been closed while we were waiting *)
unlock wrapper;
Lwt.fail (closed_channel wrapper.channel)
| Idle | Waiting_for_busy ->
wrapper.state <- Busy_primitive;
Lwt.finalize
(fun () -> f wrapper.channel)
(fun () ->
unlock wrapper;
Lwt.return_unit)
| Invalid ->
Lwt.fail (invalid_channel wrapper.channel)
| Busy_primitive | Busy_atomic _ ->
assert false
end
| Closed ->
Lwt.fail (closed_channel wrapper.channel)
| Invalid ->
Lwt.fail (invalid_channel wrapper.channel)
(* Wrap a sequence of io operations into an atomic operation: *)
let atomic f wrapper = match wrapper.state with
| Idle ->
let tmp_wrapper = { state = Idle;
channel = wrapper.channel;
queued = Lwt_sequence.create () } in
wrapper.state <- Busy_atomic tmp_wrapper;
Lwt.finalize
(fun () -> f tmp_wrapper)
(fun () ->
(* The temporary wrapper is no more valid: *)
tmp_wrapper.state <- Invalid;
unlock wrapper;
Lwt.return_unit)
| Busy_primitive | Busy_atomic _ | Waiting_for_busy ->
Lwt.add_task_r wrapper.queued >>= fun () ->
begin match wrapper.state with
| Closed ->
(* The channel has been closed while we were waiting *)
unlock wrapper;
Lwt.fail (closed_channel wrapper.channel)
| Idle | Waiting_for_busy ->
let tmp_wrapper = { state = Idle;
channel = wrapper.channel;
queued = Lwt_sequence.create () } in
wrapper.state <- Busy_atomic tmp_wrapper;
Lwt.finalize
(fun () -> f tmp_wrapper)
(fun () ->
tmp_wrapper.state <- Invalid;
unlock wrapper;
Lwt.return_unit)
| Invalid ->
Lwt.fail (invalid_channel wrapper.channel)
| Busy_primitive | Busy_atomic _ ->
assert false
end
| Closed ->
Lwt.fail (closed_channel wrapper.channel)
| Invalid ->
Lwt.fail (invalid_channel wrapper.channel)
let rec abort wrapper = match wrapper.state with
| Busy_atomic tmp_wrapper ->
(* Close the depest opened wrapper: *)
abort tmp_wrapper
| Closed ->
(* Double close, just returns the same thing as before *)
Lazy.force wrapper.channel.close
| Invalid ->
Lwt.fail (invalid_channel wrapper.channel)
| Idle | Busy_primitive | Waiting_for_busy ->
wrapper.state <- Closed;
(* Abort any current real reading/writing operation on the
channel: *)
Lwt.wakeup_exn wrapper.channel.abort_wakener (closed_channel wrapper.channel);
Lazy.force wrapper.channel.close
let close : type mode. mode channel -> unit Lwt.t = fun wrapper ->
let channel = wrapper.channel in
if channel.main != wrapper then
Lwt.fail (Failure "Lwt_io.close: cannot close a channel obtained via Lwt_io.atomic")
else
match channel.mode with
| Input ->
(* Just close it now: *)
abort wrapper
| Output ->
Lwt.catch
(fun () ->
(* Performs all pending actions, flush the buffer, then close it: *)
primitive (fun channel ->
safe_flush_total channel >>= fun () -> abort wrapper) wrapper)
(fun _ ->
abort wrapper)
let is_closed wrapper =
match wrapper.state with
| Closed -> true
| Busy_primitive | Busy_atomic _ | Waiting_for_busy | Idle | Invalid -> false
let flush_all () =
let wrappers = Outputs.fold (fun x l -> x :: l) outputs [] in
Lwt_list.iter_p
(fun wrapper ->
Lwt.catch
(fun () -> primitive safe_flush_total wrapper)
(fun _ -> Lwt.return_unit))
wrappers
let () =
(* Flush all opened ouput channels on exit: *)
Lwt_main.at_exit flush_all
let no_seek _pos _cmd =
Lwt.fail (Failure "Lwt_io.seek: seek not supported on this channel")
let make :
type m.
?buffer : Lwt_bytes.t ->
?close : (unit -> unit Lwt.t) ->
?seek : (int64 -> Unix.seek_command -> int64 Lwt.t) ->
mode : m mode ->
(Lwt_bytes.t -> int -> int -> int Lwt.t) ->
m channel = fun ?buffer ?(close=Lwt.return) ?(seek=no_seek) ~mode perform_io ->
let (buffer, size) =
match buffer with
| Some buffer ->
check_buffer "Lwt_io.make" buffer;
(buffer, Lwt_bytes.length buffer)
| None ->
let size = !default_buffer_size in
(Lwt_bytes.create size, size)
in
let abort_waiter, abort_wakener = Lwt.wait () in
let rec ch = {
buffer = buffer;
length = size;
ptr = 0;
max = (match mode with
| Input -> 0
| Output -> size);
close = lazy(Lwt.catch close Lwt.fail);
abort_waiter = abort_waiter;
abort_wakener = abort_wakener;
main = wrapper;
auto_flushing = false;
mode = mode;
offset = 0L;
typ = Type_normal(perform_io, fun pos cmd -> try seek pos cmd with e -> Lwt.fail e);
} and wrapper = {
state = Idle;
channel = ch;
queued = Lwt_sequence.create ();
} in
(match mode with
| Input -> ()
| Output -> Outputs.add outputs wrapper);
wrapper
let of_bytes ~mode bytes =
let length = Lwt_bytes.length bytes in
let abort_waiter, abort_wakener = Lwt.wait () in
let rec ch = {
buffer = bytes;
length = length;
ptr = 0;
max = length;
close = lazy(Lwt.return_unit);
abort_waiter = abort_waiter;
abort_wakener = abort_wakener;
main = wrapper;
(* Auto flush is set to [true] to prevent writing functions from
trying to launch the auto-fllushed. *)
auto_flushing = true;
mode = mode;
offset = 0L;
typ = Type_bytes;
} and wrapper = {
state = Idle;
channel = ch;
queued = Lwt_sequence.create ();
} in
wrapper
let of_fd : type m. ?buffer : Lwt_bytes.t -> ?close : (unit -> unit Lwt.t) -> mode : m mode -> Lwt_unix.file_descr -> m channel = fun ?buffer ?close ~mode fd ->
let perform_io = match mode with
| Input -> Lwt_bytes.read fd
| Output -> Lwt_bytes.write fd
in
make
?buffer
~close:(match close with
| Some f -> f
| None -> (fun () -> Lwt_unix.close fd))
~seek:(fun pos cmd -> Lwt_unix.LargeFile.lseek fd pos cmd)
~mode
perform_io
let of_unix_fd : type m. ?buffer : Lwt_bytes.t -> ?close : (unit -> unit Lwt.t) -> mode : m mode -> Unix.file_descr -> m channel = fun ?buffer ?close ~mode fd ->
of_fd ?buffer ?close ~mode (Lwt_unix.of_unix_file_descr fd)
let buffered : type m. m channel -> int = fun ch ->
match ch.channel.mode with
| Input -> ch.channel.max - ch.channel.ptr
| Output -> ch.channel.ptr
let buffer_size ch = ch.channel.length
let resize_buffer : type m. m channel -> int -> unit Lwt.t = fun wrapper len ->
if len < min_buffer_size then invalid_arg "Lwt_io.resize_buffer: buffer size too small";
match wrapper.channel.typ with
| Type_bytes ->
Lwt.fail (Failure "Lwt_io.resize_buffer: cannot resize the buffer of a channel created with Lwt_io.of_string")
| Type_normal _ ->
let f : type m. m _channel -> unit Lwt.t = fun ch ->
match ch.mode with
| Input ->
let unread_count = ch.max - ch.ptr in
(* Fail if we want to decrease the buffer size and there is
too much unread data in the buffer: *)
if len < unread_count then
Lwt.fail (Failure "Lwt_io.resize_buffer: cannot decrease buffer size, too much unread data")
else begin
let buffer = Lwt_bytes.create len in
Lwt_bytes.unsafe_blit ch.buffer ch.ptr buffer 0 unread_count;
ch.buffer <- buffer;
ch.length <- len;
ch.ptr <- 0;
ch.max <- unread_count;
Lwt.return_unit
end
| Output ->
(* If we decrease the buffer size, flush the buffer until
the number of buffered bytes fits into the new buffer: *)
let rec loop () =
if ch.ptr > len then
flush_partial ch >>= fun _ ->
loop ()
else
Lwt.return_unit
in
loop () >>= fun () ->
let buffer = Lwt_bytes.create len in
Lwt_bytes.unsafe_blit ch.buffer 0 buffer 0 ch.ptr;
ch.buffer <- buffer;
ch.length <- len;
ch.max <- len;
Lwt.return_unit
in
primitive f wrapper
(* +-----------------------------------------------------------------+
| Byte-order |
+-----------------------------------------------------------------+ *)
module ByteOrder =
struct
module type S = sig
val pos16_0 : int
val pos16_1 : int
val pos32_0 : int
val pos32_1 : int
val pos32_2 : int
val pos32_3 : int
val pos64_0 : int
val pos64_1 : int
val pos64_2 : int
val pos64_3 : int
val pos64_4 : int
val pos64_5 : int
val pos64_6 : int
val pos64_7 : int
end
module LE =
struct
let pos16_0 = 0
let pos16_1 = 1
let pos32_0 = 0
let pos32_1 = 1
let pos32_2 = 2
let pos32_3 = 3
let pos64_0 = 0
let pos64_1 = 1
let pos64_2 = 2
let pos64_3 = 3
let pos64_4 = 4
let pos64_5 = 5
let pos64_6 = 6
let pos64_7 = 7
end
module BE =
struct
let pos16_0 = 1
let pos16_1 = 0
let pos32_0 = 3
let pos32_1 = 2
let pos32_2 = 1
let pos32_3 = 0
let pos64_0 = 7
let pos64_1 = 6
let pos64_2 = 5
let pos64_3 = 4
let pos64_4 = 3
let pos64_5 = 2
let pos64_6 = 1
let pos64_7 = 0
end
end
module Primitives =
struct
(* This module contains all primitives operations. The operates
without protection regarding locking, they are wrapped after into
safe operations. *)
(* +---------------------------------------------------------------+
| Reading |
+---------------------------------------------------------------+ *)
let rec read_char ic =
let ptr = ic.ptr in
if ptr = ic.max then
refill ic >>= function
| 0 -> Lwt.fail End_of_file
| _ -> read_char ic
else begin
ic.ptr <- ptr + 1;
Lwt.return (Lwt_bytes.unsafe_get ic.buffer ptr)
end
let read_char_opt ic =
Lwt.catch
(fun () -> read_char ic >|= fun ch -> Some ch)
(function
| End_of_file -> Lwt.return_none
| exn -> Lwt.fail exn)
let read_line ic =
let buf = Buffer.create 128 in
let rec loop cr_read =
Lwt.try_bind (fun _ -> read_char ic)
(function
| '\n' ->
Lwt.return(Buffer.contents buf)
| '\r' ->
if cr_read then Buffer.add_char buf '\r';
loop true
| ch ->
if cr_read then Buffer.add_char buf '\r';
Buffer.add_char buf ch;
loop false)
(function
| End_of_file ->
if cr_read then Buffer.add_char buf '\r';
Lwt.return(Buffer.contents buf)
| exn ->
Lwt.fail exn)
in
read_char ic >>= function
| '\r' -> loop true
| '\n' -> Lwt.return ""
| ch -> Buffer.add_char buf ch; loop false
let read_line_opt ic =
Lwt.catch
(fun () -> read_line ic >|= fun ch -> Some ch)
(function
| End_of_file -> Lwt.return_none
| exn -> Lwt.fail exn)
let unsafe_read_into ic buf ofs len =
let avail = ic.max - ic.ptr in
if avail > 0 then begin
let len = min len avail in
Lwt_bytes.unsafe_blit_to_bytes ic.buffer ic.ptr buf ofs len;
ic.ptr <- ic.ptr + len;
Lwt.return len
end else begin
refill ic >>= fun n ->
let len = min len n in
Lwt_bytes.unsafe_blit_to_bytes ic.buffer 0 buf ofs len;
ic.ptr <- len;
ic.max <- n;
Lwt.return len
end
let read_into ic buf ofs len =
if ofs < 0 || len < 0 || ofs + len > Bytes.length buf then
Lwt.fail (Invalid_argument "Lwt_io.read_into")
else begin
if len = 0 then
Lwt.return 0
else
unsafe_read_into ic buf ofs len
end
let rec unsafe_read_into_exactly ic buf ofs len =
unsafe_read_into ic buf ofs len >>= function
| 0 ->
Lwt.fail End_of_file
| n ->
let len = len - n in
if len = 0 then
Lwt.return_unit
else
unsafe_read_into_exactly ic buf (ofs + n) len
let read_into_exactly ic buf ofs len =
if ofs < 0 || len < 0 || ofs + len > Bytes.length buf then
Lwt.fail (Invalid_argument "Lwt_io.read_into_exactly")
else begin
if len = 0 then
Lwt.return_unit
else
unsafe_read_into_exactly ic buf ofs len
end
let rev_concat len l =
let buf = Bytes.create len in
let _ =
List.fold_left
(fun ofs str ->
let len = String.length str in
let ofs = ofs - len in
String.unsafe_blit str 0 buf ofs len;
ofs)
len l
in
buf
let rec read_all ic total_len acc =
let len = ic.max - ic.ptr in
let buf = Bytes.create len in
Lwt_bytes.unsafe_blit_to_bytes ic.buffer ic.ptr buf 0 len;
let str = Bytes.unsafe_to_string buf in
ic.ptr <- ic.max;
refill ic >>= function
| 0 ->
Lwt.return (rev_concat (len + total_len) (str :: acc))
| _ ->
read_all ic (len + total_len) (str :: acc)
let read count ic =
match count with
| None ->
read_all ic 0 [] >|= Bytes.unsafe_to_string
| Some len ->
let buf = Bytes.create len in
unsafe_read_into ic buf 0 len >>= fun real_len ->
if real_len < len then
Lwt.return Bytes.(sub buf 0 real_len |> unsafe_to_string)
else
Lwt.return (Bytes.unsafe_to_string buf)
let read_value ic =
let header = Bytes.create 20 in
unsafe_read_into_exactly ic header 0 20 >>= fun () ->
let bsize = Marshal.data_size header 0 in
let buffer = Bytes.create (20 + bsize) in
Bytes.unsafe_blit header 0 buffer 0 20;
unsafe_read_into_exactly ic buffer 20 bsize >>= fun () ->
(* Marshal.from_bytes should be used here, but we want 4.01
compat. *)
Lwt.return (Marshal.from_string (Bytes.unsafe_to_string buffer) 0)
(* +---------------------------------------------------------------+
| Writing |
+---------------------------------------------------------------+ *)
let flush = flush_total
let rec write_char oc ch =
let ptr = oc.ptr in
if ptr < oc.length then begin
oc.ptr <- ptr + 1;
Lwt_bytes.unsafe_set oc.buffer ptr ch;
Lwt.return_unit
end else
flush_partial oc >>= fun _ ->
write_char oc ch
let rec unsafe_write_from oc str ofs len =
let avail = oc.length - oc.ptr in
if avail >= len then begin
Lwt_bytes.unsafe_blit_from_bytes str ofs oc.buffer oc.ptr len;
oc.ptr <- oc.ptr + len;
Lwt.return 0
end else begin
Lwt_bytes.unsafe_blit_from_bytes str ofs oc.buffer oc.ptr avail;
oc.ptr <- oc.length;
flush_partial oc >>= fun _ ->
let len = len - avail in
if oc.ptr = 0 then begin
if len = 0 then
Lwt.return 0
else
(* Everything has been written, try to write more: *)
unsafe_write_from oc str (ofs + avail) len
end else
(* Not everything has been written, just what is
remaining: *)
Lwt.return len
end
let write_from oc buf ofs len =
if ofs < 0 || len < 0 || ofs + len > Bytes.length buf then
Lwt.fail (Invalid_argument "Lwt_io.write_from")
else begin
if len = 0 then
Lwt.return 0
else
unsafe_write_from oc buf ofs len >>= fun remaining -> Lwt.return (len - remaining)
end
let write_from_string oc buf ofs len =
let buf = Bytes.unsafe_of_string buf in
write_from oc buf ofs len
let rec unsafe_write_from_exactly oc buf ofs len =
unsafe_write_from oc buf ofs len >>= function
| 0 ->
Lwt.return_unit
| n ->
unsafe_write_from_exactly oc buf (ofs + len - n) n
let write_from_exactly oc buf ofs len =
if ofs < 0 || len < 0 || ofs + len > Bytes.length buf then
Lwt.fail (Invalid_argument "Lwt_io.write_from_exactly")
else begin
if len = 0 then
Lwt.return_unit
else
unsafe_write_from_exactly oc buf ofs len
end
let write_from_string_exactly oc buf ofs len =
let buf = Bytes.unsafe_of_string buf in
write_from_exactly oc buf ofs len
let write oc str =
let buf = Bytes.unsafe_of_string str in
unsafe_write_from_exactly oc buf 0 (Bytes.length buf)
let write_line oc str =
let buf = Bytes.unsafe_of_string str in
unsafe_write_from_exactly oc buf 0 (Bytes.length buf) >>= fun () ->
write_char oc '\n'
let write_value oc ?(flags=[]) x =
write oc (Marshal.to_string x flags)
(* +---------------------------------------------------------------+
| Low-level access |
+---------------------------------------------------------------+ *)
let rec read_block_unsafe ic size f =
if ic.max - ic.ptr < size then
refill ic >>= function
| 0 ->
Lwt.fail End_of_file
| _ ->
read_block_unsafe ic size f
else begin
let ptr = ic.ptr in
ic.ptr <- ptr + size;
f ic.buffer ptr
end
let rec write_block_unsafe oc size f =
if oc.max - oc.ptr < size then
flush_partial oc >>= fun _ ->
write_block_unsafe oc size f
else begin
let ptr = oc.ptr in
oc.ptr <- ptr + size;
f oc.buffer ptr
end
let block : type m. m _channel -> int -> (Lwt_bytes.t -> int -> 'a Lwt.t) -> 'a Lwt.t = fun ch size f ->
if size < 0 || size > min_buffer_size then
Lwt.fail (Invalid_argument "Lwt_io.block")
else
if ch.max - ch.ptr >= size then begin
let ptr = ch.ptr in
ch.ptr <- ptr + size;
f ch.buffer ptr
end else
match ch.mode with
| Input ->
read_block_unsafe ch size f
| Output ->
write_block_unsafe ch size f
let perform token da ch =
if !token then begin
if da.da_max <> ch.max || da.da_ptr < ch.ptr || da.da_ptr > ch.max then
Lwt.fail (Invalid_argument "Lwt_io.direct_access.da_perform")
else begin
ch.ptr <- da.da_ptr;
perform_io ch >>= fun count ->
da.da_ptr <- ch.ptr;
da.da_max <- ch.max;
Lwt.return count
end
end else
Lwt.fail (Failure "Lwt_io.perform: this function can not be called outside Lwt_io.direct_access")
let direct_access ch f =
let token = ref true in
let rec da = {
da_ptr = ch.ptr;
da_max = ch.max;
da_buffer = ch.buffer;
da_perform = (fun _ -> perform token da ch);
} in
f da >>= fun x ->
token := false;
if da.da_max <> ch.max || da.da_ptr < ch.ptr || da.da_ptr > ch.max then
Lwt.fail (Failure "Lwt_io.direct_access: invalid result of [f]")
else begin
ch.ptr <- da.da_ptr;
Lwt.return x