# Copyright (C) 2022-2023 Hibiki AI Limited <info@hibiki-ai.com>
#
# This file is part of mirai.
#
# mirai is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# mirai is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# mirai. If not, see <https://www.gnu.org/licenses/>.
# mirai ------------------------------------------------------------------------
#' Daemon
#'
#' Implements a persistent executor for the remote process. Awaits data,
#' evaluates an expression in an environment containing the supplied data,
#' and returns the result to the host caller.
#'
#' @param url the character host or dispatcher URL to dial into, including the
#' port to connect to (and optionally for websockets, a path), e.g.
#' 'tcp://192.168.0.2:5555' or 'ws://192.168.0.2:5555/path'.
#' @param asyncdial [default FALSE] whether to perform dials asynchronously. The
#' default FALSE will error if a connection is not immediately possible
#' (e.g. \code{\link{daemons}} has yet to be called on the host, or the
#' specified port is not open etc.). Specifying TRUE continues retrying
#' (indefinitely) if not immediately successful, which is more resilient but
#' can mask potential connection issues.
#' @param output [default FALSE] logical value, to output generated stdout /
#' stderr if TRUE, or else discard if FALSE. Specify as TRUE in the '...'
#' argument to \code{\link{daemons}} or \code{\link{launch_local}} to
#' provide redirection of output to the host process. Applicable only when
#' not using dispatcher.
#' @param maxtasks [default Inf] the maximum number of tasks to execute (task
#' limit) before exiting.
#' @param idletime [default Inf] maximum idle time (in milliseconds) since
#' completion of the last task before exiting.
#' @param walltime [default Inf] soft walltime, or the minimum amount of real
#' time taken (in milliseconds) before exiting.
#' @param timerstart [default 0L] number of completed tasks after which to start
#' the timer for 'idletime' and 'walltime'. 0L implies timers are started
#' upon launch.
#' @param tls [default NULL] required for secure TLS connections over 'tls+tcp://'
#' or 'wss://'. \strong{Either} the character path to a file containing
#' X.509 certificate(s) in PEM format, comprising the certificate authority
#' certificate chain starting with the TLS certificate and ending with the
#' CA certificate, \strong{or} a length 2 character vector comprising [i]
#' the certificate authority certificate chain and [ii] the empty character
#' \code{''}.
#' @param ... reserved but not currently used.
#' @param cleanup [default 7L] integer additive bitmask controlling whether to
#' perform cleanup of the global environment (1L), reset loaded packages to
#' an initial state (2L), reset options to an initial state (4L), and
#' perform garbage collection (8L) after each evaluation. This option should
#' not normally be modified. Do not set unless you are certain you require
#' persistence across evaluations. Note: it may be an error to reset options
#' but not loaded packages if packages set options on load.
#' @param rs [default NULL] the initial value of .Random.seed. This is set
#' automatically using L'Ecuyer-CMRG RNG streams generated by the host
#' process and should not be independently supplied.
#'
#' @return Invisible NULL.
#'
#' @details The network topology is such that daemons dial into the host or
#' dispatcher, which listens at the 'url' address. In this way, network
#' resources may be added or removed dynamically and the host or
#' dispatcher automatically distributes tasks to all available daemons.
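#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' # A sketch only: the loopback address and port 5555 are illustrative
#' # assumptions. On the host, listen for daemons dialling in directly:
#' daemons(url = "tcp://127.0.0.1:5555", dispatcher = FALSE)
#'
#' # In a separate R process (daemon() blocks its session while it waits
#' # for and evaluates tasks), dial into the host URL, e.g. exiting after
#' # 10 tasks or after 60 seconds (60000 ms) of idle time:
#' # mirai::daemon("tcp://127.0.0.1:5555", maxtasks = 10L, idletime = 60000)
#' # 'cleanup' is an additive bitmask: the default 7L is 1L + 2L + 4L, and
#' # adding 8L (garbage collection) gives 15L:
#' # mirai::daemon("tcp://127.0.0.1:5555", cleanup = 15L)
#'
#' # Back on the host, a task is sent to a connected daemon. If no daemon
#' # connects within 10 seconds, it resolves to an 'errorValue' 5 (timed out)
#' m <- mirai(Sys.getpid(), .timeout = 10000)
#' call_mirai(m)$data
#'
#' # Reset daemon connections
#' daemons(0)
#' }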
#'
#' @export
#'
daemon <- function(url, asyncdial = FALSE, maxtasks = Inf, idletime = Inf,
walltime = Inf, timerstart = 0L, output = FALSE, tls = NULL,
..., cleanup = 7L, rs = NULL) {
sock <- socket(protocol = "rep")
on.exit(reap(sock))
cv <- cv()
pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE)
if (length(tls)) tls <- tls_config(client = tls)
dial_and_sync_socket(sock = sock, url = url, asyncdial = asyncdial, tls = tls)
if (is.numeric(rs)) `[[<-`(.GlobalEnv, ".Random.seed", as.integer(rs))
if (idletime > walltime) idletime <- walltime else if (idletime == Inf) idletime <- NULL
cleanup <- parse_cleanup(cleanup)
if (!output) {
devnull <- file(nullfile(), open = "w", blocking = FALSE)
sink(file = devnull)
sink(file = devnull, type = "message")
on.exit({
sink(type = "message")
sink()
close(devnull)
}, add = TRUE)
}
op <- .Options
se <- search()
count <- 0L
start <- mclock()
repeat {
ctx <- .context(sock)
aio <- recv_aio_signal(ctx, cv = cv, mode = 1L, timeout = idletime)
wait(cv) || return(invisible())
._mirai_. <- .subset2(aio, "data")
is.environment(._mirai_.) || {
count < timerstart && {
start <- mclock()
next
}
break
}
data <- tryCatch(eval(expr = ._mirai_.[[".expr"]], envir = ._mirai_., enclos = NULL),
error = mk_mirai_error, interrupt = mk_interrupt_error)
count <- count + 1L
(count >= maxtasks || count > timerstart && mclock() - start >= walltime) && {
send(ctx, data = data, mode = 3L)
data <- recv_aio_signal(sock, cv = cv, mode = 8L)
wait(cv)
break
}
send(ctx, data = data, mode = 1L)
if (cleanup[1L]) rm(list = (vars <- names(.GlobalEnv))[vars != ".Random.seed"], envir = .GlobalEnv)
if (cleanup[2L]) lapply((new <- search())[!new %in% se], detach, unload = TRUE, character.only = TRUE)
if (cleanup[3L]) options(op)
if (cleanup[4L]) gc(verbose = FALSE)
if (count <= timerstart) start <- mclock()
}
}
#' dot Daemon
#'
#' Implements an ephemeral executor for the remote process.
#'
#' @inheritParams daemon
#'
#' @return Logical TRUE, invisibly.
#'
#' @keywords internal
#' @export
#'
.daemon <- function(url) {
sock <- socket(protocol = "rep", dial = url, autostart = NA)
on.exit(reap(sock))
cv <- cv()
pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = FALSE)
._mirai_. <- recv(sock, mode = 1L, block = TRUE)
data <- tryCatch(eval(expr = ._mirai_.[[".expr"]], envir = ._mirai_., enclos = NULL),
error = mk_mirai_error, interrupt = mk_interrupt_error)
send(sock, data = data, mode = 1L, block = TRUE)
data <- recv_aio_signal(sock, cv = cv, mode = 8L)
wait(cv)
}
#' Dispatcher
#'
#' Implements a dispatcher for tasks from a host to multiple daemons for
#' processing, using a FIFO scheduling rule, queuing tasks as required.
#'
#' @inheritParams daemon
#' @param host the character host URL to dial (where tasks are sent from),
#' including the port to connect to (and optionally for websockets, a path),
#' e.g. 'tcp://192.168.0.2:5555' or 'ws://192.168.0.2:5555/path'.
#' @param url (optional) the character URL or vector of URLs dispatcher should
#' listen at, including the port to connect to (and optionally for websockets,
#' a path), e.g. 'tcp://192.168.0.2:5555' or 'ws://192.168.0.2:5555/path'.
#' Specify 'tls+tcp://' or 'wss://' to use secure TLS connections. Tasks are
#' sent to daemons dialled into these URLs. If not supplied, 'n' local
#' inter-process URLs will be assigned automatically.
#' @param n (optional) if specified, the integer number of daemons to listen for.
#' Otherwise 'n' will be inferred from the number of URLs supplied in 'url'.
#' Where a single URL is supplied and 'n' > 1, 'n' unique URLs will be
#' automatically assigned for daemons to dial into.
#' @param token [default FALSE] if TRUE, appends a unique 40-character token
#' to each URL path the dispatcher listens at (not applicable for TCP URLs
#' which do not accept a path).
#' @param lock [default FALSE] if TRUE, sockets lock once a connection has been
#' accepted, preventing further connection attempts. This provides safety
#' against more than one daemon attempting to connect to a unique URL.
#' @param tls [default NULL] (required for secure TLS connections) \strong{either}
#' the character path to a file containing the PEM-encoded TLS certificate
#' and associated private key (may contain additional certificates leading
#' to a validation chain, with the TLS certificate first), \strong{or} a
#' length 2 character vector comprising [i] the TLS certificate (optionally
#' certificate chain) and [ii] the associated private key.
#' @param pass [default NULL] (required only if the private key supplied to 'tls'
#' is encrypted with a password) For security, this should be provided
#' through a function that returns the value, rather than directly.
#' @param ... additional arguments passed through to \code{\link{daemon}} if
#' launching local daemons i.e. 'url' is not specified.
#' @param monitor (for package internal use only) do not set this parameter.
#'
#' @return Invisible NULL.
#'
#' @details The network topology is such that a dispatcher acts as a gateway
#' between the host and daemons, ensuring that tasks received from the host
#' are dispatched on a FIFO basis for processing. Tasks are queued at the
#' dispatcher to ensure tasks are only sent to daemons that can begin
#' immediate execution of the task.
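#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' # dispatcher() is normally launched in a background process by daemons()
#' # (dispatcher = TRUE by default) rather than called directly. A sketch,
#' # assuming the loopback address and port 5555: a single websocket URL
#' # with n = 2 listens at 'ws://127.0.0.1:5555/1' and 'ws://127.0.0.1:5555/2'
#' daemons(n = 2, url = "ws://127.0.0.1:5555")
#' status()
#'
#' # Each remote daemon then dials into one of these URLs, e.g. in separate
#' # R processes:
#' # mirai::daemon("ws://127.0.0.1:5555/1")
#' # mirai::daemon("ws://127.0.0.1:5555/2")
#'
#' # Reset daemon connections (dispatcher exits automatically)
#' daemons(0)
#' }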
#'
#' @export
#'
dispatcher <- function(host, url = NULL, n = NULL, asyncdial = FALSE,
token = FALSE, lock = FALSE, tls = NULL, pass = NULL, ...,
monitor = NULL, rs = NULL) {
n <- if (is.numeric(n)) as.integer(n) else length(url)
n > 0L || stop(.messages[["missing_url"]])
sock <- socket(protocol = "rep")
on.exit(reap(sock))
cv <- cv()
pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE)
dial_and_sync_socket(sock = sock, url = host, asyncdial = asyncdial)
auto <- is.null(url)
vectorised <- length(url) == n
seq_n <- seq_len(n)
basenames <- servernames <- character(n)
activestore <- instance <- complete <- assigned <- integer(n)
serverfree <- !integer(n)
active <- servers <- queue <- vector(mode = "list", length = n)
if (auto) {
dots <- parse_dots(...)
} else {
baseurl <- parse_url(url)
if (substr(baseurl[["scheme"]], 1L, 1L) == "t") {
ports <- if (baseurl[["port"]] == "0") integer(n) else seq.int(baseurl[["port"]], length.out = n)
token <- FALSE
} else {
ports <- NULL
}
if (substr(baseurl[["scheme"]], 1L, 3L) %in% c("wss", "tls") && is.null(tls)) {
tls <- get_and_reset_env("MIRAI_TEMP_FIELD1")
if (length(tls))
tls <- c(tls, get_and_reset_env("MIRAI_TEMP_FIELD2"))
}
if (length(tls)) {
if (is.null(pass))
pass <- get_and_reset_env("MIRAI_TEMP_VAR")
tls <- tls_config(server = tls, pass = pass)
pass <- NULL
}
}
envir <- ..[["default"]]
if (length(rs)) `[[<-`(envir, "stream", as.integer(rs))
for (i in seq_n) {
burl <- if (auto) .urlscheme else
if (vectorised) url[i] else
if (is.null(ports)) sprintf("%s/%d", url, i) else
sub(ports[1L], ports[i], url, fixed = TRUE)
basenames[i] <- burl
nurl <- if (auto) auto_tokenized_url() else if (token) new_tokenized_url(burl) else burl
nsock <- req_socket(NULL)
ncv <- cv()
pipe_notify(nsock, cv = ncv, cv2 = cv, flag = FALSE)
listen(nsock, url = nurl, tls = tls, error = TRUE)
lock && lock(nsock, cv = ncv)
listener <- attr(nsock, "listener")[[1L]]
if (i == 1L && !auto && parse_url(opt(listener, "url"))[["port"]] == "0") {
realport <- opt(listener, "tcp-bound-port")
servernames[i] <- sub_real_port(port = realport, url = nurl)
if (!vectorised || n == 1L) {
url <- sub_real_port(port = realport, url = url)
basenames[1L] <- sub_real_port(port = realport, url = burl)
}
} else {
servernames[i] <- opt(listener, "url")
}
auto && launch_daemon(nurl, dots, next_stream(envir))
servers[[i]] <- nsock
active[[i]] <- ncv
ctx <- .context(sock)
req <- recv_aio_signal(ctx, cv = cv, mode = 8L)
queue[[i]] <- list(ctx = ctx, req = req)
}
on.exit(lapply(servers, reap), add = TRUE, after = TRUE)
ctrchannel <- is.character(monitor)
if (ctrchannel) {
sockc <- socket(protocol = "rep")
on.exit(reap(sockc), add = TRUE, after = FALSE)
pipe_notify(sockc, cv = cv, add = FALSE, remove = TRUE, flag = TRUE)
dial_and_sync_socket(sock = sockc, url = monitor, asyncdial = asyncdial)
recv(sockc, mode = 5L, block = .timelimit) && stop(.messages[["sync_timeout"]])
send_aio(sockc, c(Sys.getpid(), servernames), mode = 2L)
cmessage <- recv_aio_signal(sockc, cv = cv, mode = 5L)
}
suspendInterrupts(
repeat {
wait(cv) || break
cv_values <- as.integer(lapply(active, cv_value))
activevec <- cv_values %% 2L
changes <- (activevec - activestore) > 0L
activestore <- activevec
if (any(changes)) {
instance[changes] <- abs(instance[changes]) + 1L
serverfree <- serverfree | changes
}
ctrchannel && !unresolved(cmessage) && {
i <- .subset2(cmessage, "data")
if (i) {
if (i > 0L && !activevec[[i]]) {
reap(attr(servers[[i]], "listener")[[1L]])
attr(servers[[i]], "listener") <- NULL
data <- servernames[i] <- if (auto) auto_tokenized_url() else new_tokenized_url(basenames[i])
instance[i] <- -abs(instance[i])
listen(servers[[i]], url = data, tls = tls, error = TRUE)
} else if (i < 0L) {
i <- -i
reap(servers[[i]])
servers[[i]] <- nsock <- req_socket(NULL)
pipe_notify(nsock, cv = active[[i]], cv2 = cv, flag = FALSE)
data <- servernames[i] <- if (auto) auto_tokenized_url() else new_tokenized_url(basenames[i])
instance[i] <- -abs(instance[i])
listen(nsock, url = data, tls = tls, error = TRUE)
lock && lock(nsock, cv = active[[i]])
} else {
data <- ""
}
} else {
data <- as.integer(c(seq_n, activevec, instance, assigned, complete))
}
send_aio(sockc, data = data, mode = 2L)
cmessage <- recv_aio_signal(sockc, cv = cv, mode = 5L)
next
}
for (i in seq_n)
if (length(queue[[i]]) > 2L && !unresolved(queue[[i]][["res"]])) {
req <- .subset2(queue[[i]][["res"]], "value")
if (is.object(req)) req <- serialize(req, NULL)
send(queue[[i]][["ctx"]], data = req, mode = 2L)
q <- queue[[i]][["daemon"]]
if (req[1L] == .next_format_identifier) {
ctx <- .context(servers[[q]])
send_aio(ctx, data = .next_format_identifier, mode = 2L)
reap(ctx)
} else {
serverfree[q] <- TRUE
}
complete[q] <- complete[q] + 1L
ctx <- .context(sock)
req <- recv_aio_signal(ctx, cv = cv, mode = 8L)
queue[[i]] <- list(ctx = ctx, req = req)
}
free <- which(serverfree & activevec)
if (length(free))
for (q in free)
for (i in seq_n) {
if (length(queue[[i]]) == 2L && !unresolved(queue[[i]][["req"]])) {
queue[[i]][["res"]] <- request_signal(.context(servers[[q]]), data = queue[[i]][["req"]], cv = cv, send_mode = 2L, recv_mode = 8L)
queue[[i]][["daemon"]] <- q
serverfree[q] <- FALSE
assigned[q] <- assigned[q] + 1L
break
}
serverfree[q] || break
}
}
)
}
#' mirai (Evaluate Async)
#'
#' Evaluate an expression asynchronously in a new background R process or
#' persistent daemon (local or remote). This function will return
#' immediately with a 'mirai', which will resolve to the evaluated result
#' once complete.
#'
#' @param .expr an expression to evaluate asynchronously (of arbitrary length,
#' wrapped in \{\} if necessary), \strong{or} a language object passed by
#' \link{name}.
#' @param ... (optional) named arguments (name = value pairs) specifying
#' objects referenced in '.expr'. Used in addition to, and taking precedence
#' over, any arguments specified via '.args'.
#' @param .args (optional) \strong{either} a list of objects to be passed by
#' \link{name} (found in the current scope), \strong{or else} a list of
#' name = value pairs, as in '...'.
#' @param .timeout [default NULL] for no timeout, or an integer value in
#' milliseconds. A mirai will resolve to an 'errorValue' 5 (timed out) if
#' evaluation exceeds this limit.
#' @param .compute [default 'default'] character value for the compute profile
#' to use when sending the mirai.
#'
#' @return A 'mirai' object.
#'
#' @details This function will return a 'mirai' object immediately.
#'
#' The value of a mirai may be accessed at any time at \code{$data}, and
#' if yet to resolve, an 'unresolved' logical NA will be returned instead.
#'
#' \code{\link{unresolved}} may be used on a mirai, returning TRUE if a
#' 'mirai' has yet to resolve and FALSE otherwise. This is suitable for use
#' in control flow statements such as \code{while} or \code{if}.
#'
#' Alternatively, to call (and wait for) the result, use \code{\link{call_mirai}}
#' on the returned mirai. This will block until the result is returned
#' (although interruptible with e.g. ctrl+c).
#'
#' The expression '.expr' will be evaluated in a separate R process in a
#' clean environment, which is not the global environment, consisting only
#' of the named objects passed as '...' and/or the list supplied to '.args'.
#'
#' If an error occurs in evaluation, the error message is returned as a
#' character string of class 'miraiError' and 'errorValue'.
#' \code{\link{is_mirai_error}} may be used to test for this.
#'
#' \code{\link{is_error_value}} tests for all error conditions including
#' 'mirai' errors, interrupts, and timeouts.
#'
#' Specify '.compute' to send the mirai using a specific compute profile (if
#' previously created by \code{\link{daemons}}), otherwise leave as 'default'.
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' # specifying objects via '...'
#' n <- 3
#' m <- mirai(x + y + 2, x = 2, y = n)
#' m
#' m$data
#' Sys.sleep(0.2)
#' m$data
#'
#' # passing existing objects by name via '.args'
#' df1 <- data.frame(a = 1, b = 2)
#' df2 <- data.frame(a = 3, b = 1)
#' m <- mirai(as.matrix(rbind(df1, df2)), .args = list(df1, df2), .timeout = 1000)
#' call_mirai(m)$data
#'
#' # using unresolved()
#' m <- mirai(
#' {
#' res <- rnorm(n)
#' res / rev(res)
#' },
#' n = 1e6
#' )
#' while (unresolved(m)) {
#' cat("unresolved\n")
#' Sys.sleep(0.1)
#' }
#' str(m$data)
#'
#' # evaluating scripts using source(local = TRUE) in '.expr'
#' n <- 10L
#' file <- tempfile()
#' cat("r <- rnorm(n)", file = file)
#' m <- mirai({source(file, local = TRUE); r}, .args = list(file, n))
#' call_mirai(m)[["data"]]
#' unlink(file)
#'
#' # specifying global variables using list2env(envir = .GlobalEnv) in '.expr'
#' n <- 10L
#' file <- tempfile()
#' cat("r <- rnorm(n)", file = file)
#' globals <- list(file = file, n = n)
#' m <- mirai(
#' {
#' list2env(globals, envir = .GlobalEnv)
#' source(file)
#' r
#' },
#' globals = globals
#' )
#' call_mirai(m)[["data"]]
#' unlink(file)
#'
#' # passing a language object to '.expr' and a named list to '.args'
#' expr <- quote(a + b + 2)
#' args <- list(a = 2, b = 3)
#' m <- mirai(.expr = expr, .args = args)
#' call_mirai(m)$data
#'
#' }
#'
#' @export
#'
mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = "default") {
missing(.expr) && stop(.messages[["missing_expression"]])
expr <- substitute(.expr)
arglist <- list(..., .expr = if (is.symbol(expr) && is.language(get0(expr, envir = sys.frame(-1L)))) .expr else expr)
if (length(.args)) {
is.list(.args) || stop(.messages[["requires_list"]])
arglist <- if (length(names(.args))) c(.args, arglist) else
c(`names<-`(.args, `storage.mode<-`(substitute(.args)[-1L], "character")), arglist)
}
envir <- list2env(arglist, envir = NULL, parent = .GlobalEnv)
if (length(..[[.compute]][["sock"]])) {
aio <- request(.context(..[[.compute]][["sock"]]), data = envir, send_mode = 1L, recv_mode = 1L, timeout = .timeout)
} else {
url <- auto_tokenized_url()
sock <- req_socket(url, resend = 0L)
if (length(.timeout)) launch_and_sync_daemon(sock = sock, url) else launch_daemon(url)
aio <- request(.context(sock), data = envir, send_mode = 1L, recv_mode = 1L, timeout = .timeout, ack = TRUE)
`attr<-`(.subset2(aio, "aio"), "sock", sock)
}
`class<-`(aio, c("mirai", "recvAio"))
}
#' Daemons (Configure Persistent Processes)
#'
#' Set 'daemons' or persistent background processes receiving \code{\link{mirai}}
#' requests. These are by default created on the local machine.
#' Alternatively, for distributing tasks across the network, a host URL
#' may be specified to receive connections from remote daemons started with
#' \code{\link{daemon}}. Daemons may use either the dispatcher, which
#' ensures tasks are assigned to daemons efficiently on a FIFO basis, or
#' else the low-level approach of distributing tasks to daemons equally.
#'
#' @inheritParams dispatcher
#' @param n integer number of daemons to set.
#' @param url [default NULL] if specified, the character URL or vector of URLs
#' on the host for remote daemons to dial into, including a port accepting
#' incoming connections (and optionally for websockets, a path), e.g.
#' 'tcp://192.168.0.2:5555' or 'ws://192.168.0.2:5555/path'. Specify a URL
#' starting 'tls+tcp://' or 'wss://' to use secure TLS connections.
#' @param dispatcher [default TRUE] logical value whether to use dispatcher.
#' Dispatcher is a local background process that connects to daemons on
#' behalf of the host and ensures FIFO scheduling, queueing tasks if
#' necessary (see Dispatcher section below).
#' @param seed [default NULL] (optional) supply a random seed (single value,
#' interpreted as an integer). This is used to initialise the L'Ecuyer-CMRG
#' RNG streams sent to each daemon. Note that reproducible results can be
#' expected only for 'dispatcher = FALSE', as the unpredictable timing of
#' task completions would otherwise influence the tasks sent to each daemon.
#' Even for 'dispatcher = FALSE', reproducibility is not guaranteed if the
#' order in which tasks are sent is not deterministic.
#' @param tls [default NULL] (optional for secure TLS connections) if not
#' supplied, zero-configuration single-use keys and certificates are
#' automatically generated. If supplied, \strong{either} the character path
#' to a file containing the PEM-encoded TLS certificate and associated
#' private key (may contain additional certificates leading to a validation
#' chain, with the TLS certificate first), \strong{or} a length 2 character
#' vector comprising [i] the TLS certificate (optionally certificate chain)
#' and [ii] the associated private key.
#' @param ... additional arguments passed through to \code{\link{dispatcher}} if
#' using dispatcher and/or \code{\link{daemon}} if launching local daemons.
#' @param .compute [default 'default'] character compute profile to use for
#' creating the daemons (each compute profile has its own set of daemons for
#' connecting to different resources).
#'
#' @return Depending on the arguments supplied:
#'
#' \itemize{
#' \item{using dispatcher: integer number of daemons set.}
#' \item{or else launching local daemons: integer number of daemons launched.}
#' \item{otherwise: the character host URL.}
#' }
#'
#' @details Use \code{daemons(0)} to reset daemon connections:
#' \itemize{
#' \item{A reset is required before revising settings for the same compute
#' profile, otherwise changes are not registered.}
#' \item{All connected daemons and/or dispatchers exit automatically.}
#' \item{\pkg{mirai} reverts to the default behaviour of creating a new
#' background process for each request.}
#' \item{Any unresolved 'mirai' will return an 'errorValue' 7 (Object
#' closed) after a reset.}
#' }
#'
#' If the host session ends, for whatever reason, all connected dispatcher
#' and daemon processes automatically exit as soon as their connections are
#' dropped. If a daemon is processing a task, it will exit as soon as the
#' task is complete.
#'
#' For historical reasons, \code{daemons()} with no arguments returns the
#' value of \code{\link{status}}.
#'
#' @section Dispatcher:
#'
#' By default \code{dispatcher = TRUE}. This launches a background process
#' running \code{\link{dispatcher}}. Dispatcher connects to daemons on
#' behalf of the host and queues tasks until a daemon is able to begin
#' immediate execution of that task, ensuring FIFO scheduling. Dispatcher
#' uses synchronisation primitives from \code{nanonext}, waiting rather than
#' polling for tasks, which is efficient both in consuming no resources
#' while waiting and in being fully synchronised with events (having no
#' latency).
#'
#' By specifying \code{dispatcher = FALSE}, daemons connect to the host
#' directly rather than through dispatcher. The host sends tasks to
#' connected daemons immediately in an evenly-distributed fashion. However,
#' optimal scheduling is not guaranteed as the duration of tasks cannot be
#' known \emph{a priori}, such that tasks can be queued at a daemon behind
#' a long-running task while other daemons remain idle. Nevertheless, this
#' provides a resource-light approach suited to working with similar-length
#' tasks, or where concurrent tasks typically do not exceed available daemons.
#'
#' @section Local Daemons:
#'
#' Daemons provide a potentially more efficient solution for asynchronous
#' operations as new processes no longer need to be created on an \emph{ad
#' hoc} basis.
#'
#' Supply the argument 'n' to set the number of daemons. New background
#' \code{\link{daemon}} processes are automatically created on the local
#' machine connecting back to the host process, either directly or via a
#' dispatcher.
#'
#' @section Distributed Computing:
#'
#' Specifying 'url' allows tasks to be distributed across the network.
#'
#' The host URL should be a character string such as 'tcp://192.168.0.2:5555',
#' to which daemon processes started using \code{\link{daemon}} should
#' connect. The full shell command to deploy on remote machines may be
#' generated by \code{\link{launch_remote}}.
#'
#' IPv6 addresses are also supported and must be enclosed in square brackets
#' [ ] to avoid confusion with the final colon separating the port. For
#' example, port 5555 on the IPv6 loopback address ::1 would be specified
#' as 'tcp://[::1]:5555'.
#'
#' Alternatively, to listen to port 5555 on all interfaces on the local host,
#' specify either 'tcp://:5555', 'tcp://*:5555' or 'tcp://0.0.0.0:5555'.
#'
#' Specifying the wildcard value zero for the port number e.g. 'tcp://:0' or
#' 'ws://:0' will automatically assign a free ephemeral port. Use
#' \code{\link{status}} to inspect the actual assigned port at any time.
#'
#' \strong{With Dispatcher}
#'
#' When using dispatcher, it is recommended to use a websocket URL rather
#' than TCP, as this requires only one port to connect to all daemons: a
#' websocket URL supports a path after the port number, which can be made
#' unique for each daemon.
#'
#' Specifying a single host URL such as 'ws://192.168.0.2:5555' with
#' \code{n = 6} will automatically append a sequence to the path, listening
#' to the URLs 'ws://192.168.0.2:5555/1' through 'ws://192.168.0.2:5555/6'.
#'
#' Alternatively, specify a vector of URLs to listen to arbitrary port
#' numbers / paths. In this case it is optional to supply 'n', as this can
#' be inferred from the length of the vector supplied.
#'
#' Individual \code{\link{daemon}} instances should then be started on the
#' remote resource, which dial in to each of these host URLs. At most one
#' daemon should be dialled into each URL at any given time.
#'
#' Dispatcher automatically adjusts to the number of daemons actually
#' connected. Hence it is possible to dynamically scale up or down the
#' number of daemons as required, subject to the maximum number initially
#' specified.
#'
#' Alternatively, supplying a single TCP URL will listen at a block of 'n'
#' URLs, with ports starting from the supplied port number and incrementing
#' by one, e.g. the host URL 'tcp://192.168.0.2:5555' with \code{n = 6}
#' listens to the contiguous block of ports 5555 through 5560.
#'
#' \strong{Without Dispatcher}
#'
#' A TCP URL may be used in this case as the host listens at only one
#' address, utilising a single port.
#'
#' The network topology is such that daemons (started with \code{\link{daemon}})
#' or indeed dispatchers (started with \code{\link{dispatcher}}) dial into
#' the same host URL.
#'
#' 'n' is not required in this case, and disregarded if supplied, as network
#' resources may be added or removed at any time. The host automatically
#' distributes tasks to all connected daemons and dispatchers.
#'
#' @section Compute Profiles:
#'
#' By default, the 'default' compute profile is used. Providing a character
#' value for '.compute' creates a new compute profile with the name
#' specified. Each compute profile retains its own daemons settings, and
#' profiles may be operated independently of one another. Some usage
#' examples follow:
#'
#' \strong{local / remote} daemons may be set up by supplying a host URL and
#' specifying '.compute' as 'remote', which creates a new compute profile.
#' Subsequent mirai calls may then be sent for local computation by leaving
#' their '.compute' argument unspecified, or for remote computation to
#' connected daemons by specifying their '.compute' argument as 'remote'.
#'
#' \strong{cpu / gpu} some tasks may require access to different types of
#' daemon, such as those with GPUs. In this case, \code{daemons()} may be
#' called twice to set up host URLs for CPU-only daemons and for those
#' with GPUs, specifying the '.compute' argument as 'cpu' and 'gpu'
#' respectively. By supplying the '.compute' argument to subsequent mirai
#' calls, tasks may be sent to either 'cpu' or 'gpu' daemons as appropriate.
#'
#' Note: further actions such as resetting daemons via \code{daemons(0)}
#' should be carried out with the desired '.compute' argument specified.
#'
#' @section Timeouts:
#'
#' Specifying the \code{.timeout} argument in \code{\link{mirai}} will ensure
#' that the 'mirai' always resolves.
#'
#' However, the task may not have completed and may still be ongoing in the
#' daemon process. In such situations, dispatcher ensures that queued tasks
#' are not assigned to the busy process; however, overall performance may
#' still be degraded if such processes remain in use. If a process hangs and
#' cannot be restarted manually, \code{\link{saisei}} specifying
#' \code{force = TRUE} may be used to cancel the task and regenerate the
#' relevant URL for a new \code{\link{daemon}} to connect to.
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' # Create 2 local daemons (using dispatcher)
#' daemons(2)
#' status()
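#' # Sketch of a timeout: the task below outlasts its 500ms '.timeout', so
#' # the 'mirai' resolves to an 'errorValue' even though the daemon may
#' # still be busy running Sys.sleep()
#' m <- mirai(Sys.sleep(2), .timeout = 500)
#' call_mirai(m)$data
#' is_error_value(m$data)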
#' # Reset to zero
#' daemons(0)
#'
#' # Create 2 local daemons (not using dispatcher)
#' daemons(2, dispatcher = FALSE)
#' status()
#' # Reset to zero
#' daemons(0)
#'
#' # 2 remote daemons via dispatcher (using zero wildcard)
#' daemons(2, url = "ws://:0")
#' status()
#' # Reset to zero
#' daemons(0)
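#'
#' # Sketch of explicit dispatcher URLs (assumes ports 5555 and 5560-5561
#' # are free on localhost): a single websocket URL has the paths /1 and /2
#' # appended automatically
#' daemons(n = 2, url = "ws://127.0.0.1:5555")
#' status()
#' daemons(0)
#'
#' # A single TCP URL instead listens at a block of ports (5560 and 5561)
#' daemons(n = 2, url = "tcp://127.0.0.1:5560")
#' status()
#' daemons(0)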
#'
#' # Set host URL for remote daemons to dial into (using zero wildcard)
#' daemons(url = "tcp://:0", dispatcher = FALSE)
#' status()
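#' # Launch one daemon on this machine to dial into the host URL (the index
#' # 1L refers to the host URL when not using dispatcher)
#' launch_local(1L)
#' Sys.sleep(1L)
#' status()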
#' # Reset to zero
#' daemons(0)
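#'
#' # Sketch of separate compute profiles: 'cpu' and 'gpu' are arbitrary
#' # profile names chosen for illustration, each with its own local daemon
#' daemons(1, .compute = "cpu")
#' daemons(1, .compute = "gpu")
#' m <- mirai(Sys.getpid(), .compute = "gpu")
#' call_mirai(m)$data
#' status(.compute = "cpu")
#' # Each profile must be reset individually
#' daemons(0, .compute = "cpu")
#' daemons(0, .compute = "gpu")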
#'
#' }
#'
#' @export
#'
daemons <- function(n, url = NULL, dispatcher = TRUE, seed = NULL, tls = NULL, pass = NULL, ..., .compute = "default") {
missing(n) && missing(url) && return(status(.compute))
envir <- ..[[.compute]]
if (is.null(envir))
envir <- `[[<-`(.., .compute, new.env(hash = FALSE, parent = environment(daemons)))[[.compute]]
if (is.character(url)) {
if (is.null(envir[["sock"]])) {
purl <- parse_url(url)
if (substr(purl[["scheme"]], 1L, 3L) %in% c("wss", "tls") && is.null(tls)) {
tls <- write_cert(cn = purl[["hostname"]])
envir[["tls"]] <- tls[["client"]]
tls <- tls[["server"]]
}
create_stream(n = n, seed = seed, envir = envir)
if (dispatcher) {
n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(.messages[["n_one"]])
if (length(tls)) tls_config(server = tls, pass = pass)
urld <- auto_tokenized_url()
urlc <- strcat(urld, "c")
sock <- req_socket(urld)
sockc <- req_socket(urlc, resend = 0L)
launch_and_sync_daemon(sock = sock, urld, parse_dots(...), url, n, urlc, tls = tls, pass = pass)
init_monitor(sockc = sockc, envir = envir)
} else {
sock <- req_socket(url, tls = if (length(tls)) tls_config(server = tls, pass = pass))
listener <- attr(sock, "listener")[[1L]]
n <- opt(listener, "url")
if (parse_url(n)[["port"]] == "0")
n <- sub_real_port(port = opt(listener, "tcp-bound-port"), url = n)
`[[<-`(envir, "urls", n)
}
`[[<-`(`[[<-`(envir, "sock", sock), "n", n)
}
} else {
is.numeric(n) || stop(.messages[["numeric_n"]])
n <- as.integer(n)
if (n == 0L) {
length(envir[["n"]]) || return(0L)
reap(envir[["sock"]])
length(envir[["sockc"]]) && reap(envir[["sockc"]])
envir <- NULL
`[[<-`(.., .compute, new.env(hash = FALSE))
} else if (is.null(envir[["sock"]])) {
n > 0L || stop(.messages[["n_zero"]])
urld <- auto_tokenized_url()
create_stream(n = n, seed = seed, envir = envir)
if (dispatcher) {
sock <- req_socket(urld, resend = 0L)
urlc <- strcat(urld, "c")
sockc <- req_socket(urlc, resend = 0L)
launch_and_sync_daemon(sock = sock, urld, parse_dots(...), n, urlc, rs = envir[["stream"]])
for (i in seq_len(n)) next_stream(envir)
init_monitor(sockc = sockc, envir = envir)
} else {
sock <- req_socket(urld)
if (is.null(seed) || n == 1L) {
for (i in seq_len(n))
launch_daemon(urld, parse_dots(...), next_stream(envir))
} else {
for (i in seq_len(n))
launch_and_sync_daemon(sock = sock, urld, parse_dots(...), next_stream(envir))
}
`[[<-`(envir, "urls", urld)
}
`[[<-`(`[[<-`(envir, "sock", sock), "n", n)
}
}
if (length(envir[["n"]])) envir[["n"]] else 0L
}
#' Saisei (Regenerate Token)
#'
#' When using daemons with dispatcher, regenerates the token for the URL a
#' dispatcher socket listens at.
#'
#' @param i integer index number of the URL to regenerate at dispatcher.
#' @param force [default FALSE] logical value whether to regenerate the URL even
#' when there is an existing active connection.
#' @param .compute [default 'default'] character compute profile (each compute
#' profile has its own set of daemons for connecting to different resources).
#'
#' @return The regenerated character URL upon success, or else NULL.
#'
#' @details When a URL is regenerated, the listener at the specified socket is
#' closed and replaced immediately, hence this function will only be
#' successful if there are no existing connections at the socket (i.e.
#' 'online' status shows 0), unless the argument 'force' is specified as TRUE.
#'
#' If 'force' is specified as TRUE, the socket is immediately closed and
#' regenerated. If this happens while a mirai is still ongoing, it will be
#' returned as an errorValue 7 'Object closed'. This may be used to cancel a
#' task that consistently hangs or crashes to prevent it from failing
#' repeatedly when new daemons connect.
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' daemons(1L)
#' Sys.sleep(1L)
#' status()
#' saisei(i = 1L, force = TRUE)
#' status()
#'
#' daemons(0)
#'
#' }
#'
#' @export
#'
saisei <- function(i, force = FALSE, .compute = "default") {
envir <- ..[[.compute]]
i <- as.integer(`length<-`(i, 1L))
length(envir[["sockc"]]) && i > 0L && i <= envir[["n"]] && substr(envir[["urls"]][i], 1L, 1L) != "t" || return()
r <- query_dispatcher(sock = envir[["sockc"]], command = if (force) -i else i, mode = 9L)
is.character(r) && nzchar(r) || return()
envir[["urls"]][i] <- r
r
}
#' Status Information
#'
#' Retrieve status information for the specified compute profile, comprising
#' current connections and daemons status.
#'
#' @inheritParams saisei
#'
#' @return A named list comprising:
#' \itemize{
#' \item{\strong{connections}} {- integer number of active connections.
#' \cr Using dispatcher: Always 1L as there is a single connection to
#' dispatcher, which connects to the daemons in turn.}
#' \item{\strong{daemons}} {- of variable type.
#' \cr Using dispatcher: a status matrix (see Status Matrix section below),
#' or else an integer 'errorValue' if communication with dispatcher failed.
#' \cr Not using dispatcher: the character host URL.
#' \cr Not set: 0L.}
#' }
#'
#' @section Status Matrix:
#'
#' When using dispatcher, \code{$daemons} comprises an integer matrix with
#' the following columns:
#' \itemize{
#' \item{\strong{i}} {- integer index number.}
#' \item{\strong{online}} {- shows as 1 when there is an active connection,
#' or else 0 if a daemon has yet to connect or has disconnected.}
#' \item{\strong{instance}} {- increments by 1 every time there is a new
#' connection at a URL. This counter is designed to track new daemon
#' instances connecting after previous ones have ended (due to time-outs
#' etc.). The count becomes negative immediately after a URL is regenerated
#' by \code{\link{saisei}}, but increments again once a new daemon connects.}
#' \item{\strong{assigned}} {- shows the cumulative number of tasks assigned
#' to the daemon.}
#' \item{\strong{complete}} {- shows the cumulative number of tasks
#' completed by the daemon.}
#' }
#' The dispatcher URLs are stored as the row names of the matrix.
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' status()
#' daemons(n = 2L, url = "wss://[::1]:0")
#' status()
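#' # Extract selected columns of the status matrix (column names as listed
#' # in the Status Matrix section above)
#' status()$daemons[, c("online", "assigned", "complete")]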
#' daemons(0)
#'
#' }
#'
#' @export
#'
status <- function(.compute = "default") {
envir <- ..[[.compute]]
sock <- envir[["sock"]]
list(connections = if (is.null(sock)) 0L else as.integer(stat(sock, "pipes")),
daemons = if (length(envir[["sockc"]])) query_status(envir) else if (length(envir[["urls"]])) envir[["urls"]] else 0L)
}
#' Launch Daemon
#'
#' \code{launch_local} spawns a new background \code{Rscript} process calling
#' \code{\link{daemon}} with the specified arguments. May be used to
#' re-launch daemons that have timed out on the local machine.
#'
#' @inheritParams saisei
#' @param url the character host URL or vector of host URLs, including the port
#' to connect to (and optionally for websockets, a path), e.g.
#' 'tcp://192.168.0.2:5555' or 'ws://192.168.0.2:5555/path'
#'
#' \strong{or} integer index value, or vector of index values, of the
#' dispatcher URLs, or 1L for the host URL (when not using dispatcher).
#' @param ... (optional) additional arguments passed to \code{\link{daemon}}
#' (see 'additional arguments' section below).
#' @param tls [default NULL] required for secure TLS connections over tls+tcp or
#' wss. Zero-configuration TLS certificates generated by \code{\link{daemons}}
#' are automatically passed to the daemon, without needing to be specified
#' here. Otherwise, supply \strong{either} the character path to a file
#' containing X.509 certificate(s) in PEM format, comprising the certificate
#' authority certificate chain, \strong{or} a length 2 character vector
#' comprising [i] the certificate authority certificate chain and [ii] the
#' empty character ''.
#'
#' @return For \strong{launch_local}: Invisible NULL.
#'
#' @section Additional arguments:
#'
#' Additional arguments may be specified as part of '\code{...}' to be
#' passed on to \code{\link{daemon}}:
#'
#' \itemize{
#' \item{\strong{asyncdial}} {[default FALSE] whether to perform dials