-
Notifications
You must be signed in to change notification settings - Fork 15
/
interface.rs
1094 lines (943 loc) · 39.9 KB
/
interface.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//
// r_interface.rs
//
// Copyright (C) 2023 Posit Software, PBC. All rights reserved.
//
//
// All code in this file runs synchronously with R. We store the global
// state inside of a global `R_MAIN` singleton that implements `RMain`.
// The frontend methods called by R are forwarded to the corresponding
// `RMain` methods via `R_MAIN`.
use std::ffi::*;
use std::os::raw::c_uchar;
use std::os::raw::c_void;
use std::result::Result::Ok;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Once;
use std::time::Duration;
use std::time::SystemTime;
use amalthea::events::BusyEvent;
use amalthea::events::PositronEvent;
use amalthea::events::PromptStateEvent;
use amalthea::events::ShowMessageEvent;
use amalthea::socket::iopub::IOPubMessage;
use amalthea::wire::exception::Exception;
use amalthea::wire::execute_error::ExecuteError;
use amalthea::wire::execute_input::ExecuteInput;
use amalthea::wire::execute_reply::ExecuteReply;
use amalthea::wire::execute_reply_exception::ExecuteReplyException;
use amalthea::wire::execute_request::ExecuteRequest;
use amalthea::wire::execute_response::ExecuteResponse;
use amalthea::wire::execute_result::ExecuteResult;
use amalthea::wire::input_request::InputRequest;
use amalthea::wire::input_request::ShellInputRequest;
use amalthea::wire::jupyter_message::Status;
use amalthea::wire::originator::Originator;
use amalthea::wire::stream::Stream;
use amalthea::wire::stream::StreamOutput;
use anyhow::*;
use bus::Bus;
use crossbeam::channel::Receiver;
use crossbeam::channel::RecvTimeoutError;
use crossbeam::channel::Sender;
use harp::exec::r_source;
use harp::exec::RFunction;
use harp::exec::RFunctionExt;
use harp::interrupts::RInterruptsSuspendedScope;
use harp::lock::R_RUNTIME_LOCK;
use harp::lock::R_RUNTIME_LOCK_COUNT;
use harp::object::RObject;
use harp::r_lock;
use harp::r_symbol;
use harp::routines::r_register_routines;
use harp::session::r_poke_option_show_error_messages;
use harp::utils::r_get_option;
use harp::utils::r_is_data_frame;
use libR_sys::*;
use log::*;
use nix::sys::signal::*;
use parking_lot::ReentrantMutexGuard;
use serde_json::json;
use stdext::result::ResultOrLog;
use stdext::*;
use crate::dap::dap::DapBackendEvent;
use crate::dap::Dap;
use crate::errors;
use crate::help_proxy;
use crate::kernel::Kernel;
use crate::lsp::events::EVENTS;
use crate::modules;
use crate::plots::graphics_device;
use crate::request::debug_request_command;
use crate::request::RRequest;
extern "C" {
pub static mut R_running_as_main_program: ::std::os::raw::c_int;
pub static mut R_SignalHandlers: ::std::os::raw::c_int;
pub static mut R_Interactive: Rboolean;
pub static mut R_Consolefile: *mut FILE;
pub static mut R_Outputfile: *mut FILE;
pub static mut ptr_R_WriteConsole: ::std::option::Option<
unsafe extern "C" fn(arg1: *const ::std::os::raw::c_char, arg2: ::std::os::raw::c_int),
>;
pub static mut ptr_R_WriteConsoleEx: ::std::option::Option<
unsafe extern "C" fn(
arg1: *const ::std::os::raw::c_char,
arg2: ::std::os::raw::c_int,
arg3: ::std::os::raw::c_int,
),
>;
pub static mut ptr_R_ReadConsole: ::std::option::Option<
unsafe extern "C" fn(
arg1: *const ::std::os::raw::c_char,
arg2: *mut ::std::os::raw::c_uchar,
arg3: ::std::os::raw::c_int,
arg4: ::std::os::raw::c_int,
) -> ::std::os::raw::c_int,
>;
pub static mut ptr_R_ShowMessage:
::std::option::Option<unsafe extern "C" fn(arg1: *const ::std::os::raw::c_char)>;
pub static mut ptr_R_Busy:
::std::option::Option<unsafe extern "C" fn(arg1: ::std::os::raw::c_int)>;
pub fn R_HomeDir() -> *mut ::std::os::raw::c_char;
// NOTE: Some of these routines don't really return (or use) void pointers,
// but because we never introspect these values directly and they're always
// passed around in R as pointers, it suffices to just use void pointers.
fn R_checkActivity(usec: i32, ignore_stdin: i32) -> *const c_void;
fn R_runHandlers(handlers: *const c_void, fdset: *const c_void);
fn R_ProcessEvents();
fn run_Rmainloop();
pub static mut R_wait_usec: i32;
pub static mut R_InputHandlers: *const c_void;
pub static mut R_PolledEvents: Option<unsafe extern "C" fn()>;
}
// --- Globals ---
// These values must be global in order for them to be accessible from R
// callbacks, which do not have a facility for passing or returning context.
/// Ensures that the kernel is only ever initialized once
static INIT: Once = Once::new();
static INIT_KERNEL: Once = Once::new();
// The global state used by R callbacks.
//
// Doesn't need a mutex because it's only accessed by the R thread. Should
// not be used elsewhere than from an R frontend callback or an R function
// invoked by the REPL.
pub static mut R_MAIN: Option<RMain> = None;
/// Starts the main R thread. Doesn't return.
pub fn start_r(
r_args: Vec<String>,
startup_file: Option<String>,
kernel_mutex: Arc<Mutex<Kernel>>,
r_request_rx: Receiver<RRequest>,
input_request_tx: Sender<ShellInputRequest>,
iopub_tx: Sender<IOPubMessage>,
kernel_init_tx: Bus<KernelInfo>,
dap: Arc<Mutex<Dap>>,
) {
// Initialize global state (ensure we only do this once!)
INIT.call_once(|| unsafe {
R_MAIN = Some(RMain::new(
kernel_mutex,
r_request_rx,
input_request_tx,
iopub_tx,
kernel_init_tx,
dap,
));
});
unsafe {
// Build the argument list from the command line arguments. The default
// list is `--interactive` unless altered with the `--` passthrough
// argument.
let mut args = cargs!["ark"];
for arg in r_args {
args.push(CString::new(arg).unwrap().into_raw());
}
R_running_as_main_program = 1;
R_SignalHandlers = 0;
Rf_initialize_R(args.len() as i32, args.as_mut_ptr() as *mut *mut c_char);
// Initialize the interrupt handler.
RMain::initialize_signal_handlers();
// Disable stack checking; R doesn't know the starting point of the
// stack for threads other than the main thread. Consequently, it will
// report a stack overflow if we don't disable it. This is a problem
// on all platforms, but is most obvious on aarch64 Linux due to how
// thread stacks are allocated on that platform.
//
// See https://cran.r-project.org/doc/manuals/R-exts.html#Threading-issues
// for more information.
R_CStackLimit = usize::MAX;
// Log the value of R_HOME, so we can know if something hairy is afoot
let home = CStr::from_ptr(R_HomeDir());
trace!("R_HOME: {:?}", home);
// Mark R session as interactive
R_Interactive = 1;
// Redirect console
R_Consolefile = std::ptr::null_mut();
R_Outputfile = std::ptr::null_mut();
ptr_R_WriteConsole = None;
ptr_R_WriteConsoleEx = Some(r_write_console);
ptr_R_ReadConsole = Some(r_read_console);
ptr_R_ShowMessage = Some(r_show_message);
ptr_R_Busy = Some(r_busy);
// Listen for polled events
R_wait_usec = 10000;
R_PolledEvents = Some(r_polled_events);
// Set up main loop
setup_Rmainloop();
// Optionally run a user specified R startup script
if let Some(file) = &startup_file {
r_source(file).or_log_error(&format!("Failed to source startup file '{file}' due to"));
}
// Register embedded routines
r_register_routines();
// Initialize support functions (after routine registration)
let r_module_info = modules::initialize().unwrap();
// TODO: Should starting the R help server proxy really be here?
// Are we sure we want our own server when ark runs in a Jupyter notebook?
// Moving this requires detangling `help_server_port` from
// `modules::initialize()`, which seems doable.
// Start R help server proxy
help_proxy::start(r_module_info.help_server_port);
// Set up the global error handler (after support function initialization)
errors::initialize();
// Run the main loop -- does not return
run_Rmainloop();
}
}
pub struct RMain {
initializing: bool,
kernel_init_tx: Bus<KernelInfo>,
/// Execution requests from the frontend. Processed from `ReadConsole()`.
/// Requests for code execution provide input to that method.
r_request_rx: Receiver<RRequest>,
/// Input requests to the frontend. Processed from `ReadConsole()`
/// calls triggered by e.g. `readline()`.
input_request_tx: Sender<ShellInputRequest>,
/// IOPub channel for broadcasting outputs
iopub_tx: Sender<IOPubMessage>,
/// Active request passed to `ReadConsole()`. Contains response channel
/// the reply should be send to once computation has finished.
active_request: Option<ActiveReadConsoleRequest>,
/// Execution request counter used to populate `In[n]` and `Out[n]` prompts
execution_count: u32,
stdout: String,
stderr: String,
banner: String,
/// A lock guard, used to manage access to the R runtime. The main
/// thread holds the lock by default, but releases it at opportune
/// times to allow the LSP to access the R runtime where appropriate.
runtime_lock_guard: Option<ReentrantMutexGuard<'static, ()>>,
/// Shared reference to kernel. Currently used by the ark-execution
/// thread, the R frontend callbacks, and LSP routines called from R
pub kernel: Arc<Mutex<Kernel>>,
/// Represents whether an error occurred during R code execution.
pub error_occurred: bool,
pub error_message: String, // `evalue` in the Jupyter protocol
pub error_traceback: Vec<String>,
dap: Arc<Mutex<Dap>>,
is_debugging: bool,
/// The `show.error.messages` global option is set to `TRUE` whenever
/// we get in the browser. We save the previous value here and restore
/// it the next time we see a non-browser prompt.
old_show_error_messages: Option<bool>,
}
/// Represents the currently active execution request from the frontend. It
/// resolves at the next invocation of the `ReadConsole()` frontend method.
struct ActiveReadConsoleRequest {
exec_count: u32,
request: ExecuteRequest,
orig: Option<Originator>,
response_tx: Sender<ExecuteResponse>,
}
/// Represents kernel metadata (available after the kernel has fully started)
#[derive(Debug, Clone)]
pub struct KernelInfo {
pub version: String,
pub banner: String,
pub input_prompt: Option<String>,
pub continuation_prompt: Option<String>,
}
/// This struct represents the data that we wish R would pass to
/// `ReadConsole()` methods. We need this information to determine what kind
/// of prompt we are dealing with.
#[derive(Clone)]
pub struct PromptInfo {
/// The prompt string to be presented to the user. This does not
/// necessarily correspond to `getOption("prompt")`, for instance in
/// case of a browser prompt or a readline prompt.
input_prompt: String,
/// The continuation prompt string when user supplies incomplete
/// inputs. This always corresponds to `getOption("continue"). We send
/// it to frontends along with `prompt` because some frontends such as
/// Positron do not send incomplete inputs to Ark and take charge of
/// continuation prompts themselves.
continuation_prompt: String,
/// Whether this is a `browser()` prompt. A browser prompt can be
/// incomplete but is never a user request.
browser: bool,
/// Whether the last input didn't fully parse and R is waiting for more input
incomplete: bool,
/// Whether this is a prompt from a fresh REPL iteration (browser or
/// top level) or a prompt from some user code, e.g. via `readline()`
input_request: bool,
}
pub enum ConsoleInput {
EOF,
Input(String),
}
impl RMain {
pub fn new(
kernel: Arc<Mutex<Kernel>>,
r_request_rx: Receiver<RRequest>,
input_request_tx: Sender<ShellInputRequest>,
iopub_tx: Sender<IOPubMessage>,
kernel_init_tx: Bus<KernelInfo>,
dap: Arc<Mutex<Dap>>,
) -> Self {
// The main thread owns the R runtime lock by default, but releases
// it when appropriate to give other threads a chance to execute.
let lock_guard = unsafe { R_RUNTIME_LOCK.lock() };
Self {
initializing: true,
r_request_rx,
input_request_tx,
iopub_tx,
kernel_init_tx,
active_request: None,
execution_count: 0,
stdout: String::new(),
stderr: String::new(),
banner: String::new(),
runtime_lock_guard: Some(lock_guard),
kernel,
error_occurred: false,
error_message: String::new(),
error_traceback: Vec::new(),
dap,
is_debugging: false,
old_show_error_messages: None,
}
}
/// Completes the kernel's initialization
pub fn complete_initialization(&mut self, prompt_info: &PromptInfo) {
if self.initializing {
let version = unsafe {
let version = Rf_findVarInFrame(R_BaseNamespace, r_symbol!("R.version.string"));
RObject::new(version).to::<String>().unwrap()
};
let kernel_info = KernelInfo {
version: version.clone(),
banner: self.banner.clone(),
input_prompt: Some(prompt_info.input_prompt.clone()),
continuation_prompt: Some(prompt_info.continuation_prompt.clone()),
};
debug!("Sending kernel info: {}", version);
self.kernel_init_tx.broadcast(kernel_info);
self.initializing = false;
} else {
warn!("Initialization already complete!");
}
}
fn initialize_signal_handlers() {
// Reset the signal block.
//
// This appears to be necessary on macOS; 'sigprocmask()' specifically
// blocks the signals in _all_ threads associated with the process, even
// when called from a spawned child thread. See:
//
// https://github.com/opensource-apple/xnu/blob/0a798f6738bc1db01281fc08ae024145e84df927/bsd/kern/kern_sig.c#L1238-L1285
// https://github.com/opensource-apple/xnu/blob/0a798f6738bc1db01281fc08ae024145e84df927/bsd/kern/kern_sig.c#L796-L839
//
// and note that 'sigprocmask()' uses 'block_procsigmask()' to apply the
// requested block to all threads in the process:
//
// https://github.com/opensource-apple/xnu/blob/0a798f6738bc1db01281fc08ae024145e84df927/bsd/kern/kern_sig.c#L571-L599
//
// We may need to re-visit this on Linux later on, since 'sigprocmask()' and
// 'pthread_sigmask()' may only target the executing thread there.
//
// The behavior of 'sigprocmask()' is unspecified after all, so we're really
// just relying on what the implementation happens to do.
let mut sigset = SigSet::empty();
sigset.add(SIGINT);
sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), None).unwrap();
// Unblock signals on this thread.
pthread_sigmask(SigmaskHow::SIG_UNBLOCK, Some(&sigset), None).unwrap();
// Install an interrupt handler.
unsafe {
signal(SIGINT, SigHandler::Handler(handle_interrupt)).unwrap();
}
}
fn init_execute_request(&mut self, req: &ExecuteRequest) -> (ConsoleInput, u32) {
// Initialize stdout, stderr
self.stdout = String::new();
self.stderr = String::new();
// Increment counter if we are storing this execution in history
if req.store_history {
self.execution_count = self.execution_count + 1;
}
// If the code is not to be executed silently, re-broadcast the
// execution to all frontends
if !req.silent {
if let Err(err) = self.iopub_tx.send(IOPubMessage::ExecuteInput(ExecuteInput {
code: req.code.clone(),
execution_count: self.execution_count,
})) {
warn!(
"Could not broadcast execution input {} to all front ends: {}",
self.execution_count, err
);
}
}
// Return the code to the R console to be evaluated and the corresponding exec count
(ConsoleInput::Input(req.code.clone()), self.execution_count)
}
/// Invoked by R to read console input from the user.
///
/// * `prompt` - The prompt shown to the user
/// * `buf` - Pointer to buffer to receive the user's input (type `CONSOLE_BUFFER_CHAR`)
/// * `buflen` - Size of the buffer to receiver user's input
/// * `hist` - Whether to add the input to the history (1) or not (0)
///
/// Returns a tuple. First value is to be passed on to `ReadConsole()` and
/// indicates whether new input is available. Second value indicates whether
/// we need to call `Rf_onintr()` to process an interrupt.
fn read_console(
&mut self,
prompt: *const c_char,
buf: *mut c_uchar,
buflen: c_int,
_hist: c_int,
) -> (i32, bool) {
let info = Self::prompt_info(prompt);
debug!("R prompt: {}", info.input_prompt);
INIT_KERNEL.call_once(|| {
self.complete_initialization(&info);
trace!(
"Got initial R prompt '{}', ready for execution requests",
info.input_prompt
);
});
// TODO: Can we remove this below code?
// If the prompt begins with "Save workspace", respond with (n)
//
// NOTE: Should be able to overwrite the `Cleanup` frontend method.
// This would also help with detecting normal exits versus crashes.
if info.input_prompt.starts_with("Save workspace") {
let n = CString::new("n\n").unwrap();
unsafe {
libc::strcpy(buf as *mut c_char, n.as_ptr());
}
return (1, false);
}
// We got a prompt request marking the end of the previous
// execution. We can now send a reply to unblock the active Shell
// request.
if let Some(req) = &self.active_request {
// FIXME: The messages below are involved in a race between the
// StdIn and Shell threads and the messages might arrive out of
// order on the frontend side. This is generally well handled
// (top-level prompt is updated to become a user prompt) except
// if a response is sent very quickly before the
// `input_request` message arrives. In that case, the elements
// in the console are displayed out of order.
if info.input_request {
self.request_input(req, info.input_prompt.to_string());
}
// FIXME: Race condition between the comm and shell socket threads.
//
// Send info for the next prompt to frontend. This handles
// custom prompts set by users, e.g. `options(prompt = ,
// continue = )`, as well as debugging prompts, e.g. after a
// call to `browser()`.
if !info.input_request {
let event = PositronEvent::PromptState(PromptStateEvent {
input_prompt: info.input_prompt.clone(),
continuation_prompt: info.continuation_prompt.clone(),
});
let kernel = self.kernel.lock().unwrap();
kernel.send_event(event);
}
self.reply_execute_request(req, info.clone());
// Clear active request. This doesn't matter if we return here
// after receiving an `ExecuteCode` request (as
// `self.active_request` will be set to a fresh request), but
// we might also return here after an interrupt.
self.active_request = None;
}
// Signal prompt
EVENTS.console_prompt.emit(());
if info.browser {
// Calling handlers don't currently reach inside the
// debugger. So we temporarily reenable the
// `show.error.messages` option to let error messages
// stream to stderr.
if let None = self.old_show_error_messages {
let old = r_poke_option_show_error_messages(true);
self.old_show_error_messages = Some(old);
}
let mut dap = self.dap.lock().unwrap();
match harp::session::r_stack_info() {
Ok(stack) => {
self.is_debugging = true;
dap.start_debug(stack)
},
Err(err) => error!("ReadConsole: Can't get stack info: {err}"),
};
} else {
// We've left the `browser()` state, so we can disable the
// `show.error.messages` option again to let our global handler
// capture error messages as before.
if let Some(old) = self.old_show_error_messages {
r_poke_option_show_error_messages(old);
self.old_show_error_messages = None;
}
if self.is_debugging {
// Terminate debugging session
let mut dap = self.dap.lock().unwrap();
dap.stop_debug();
self.is_debugging = false;
}
}
// Match with a timeout. Necessary because we need to
// pump the event loop while waiting for console input.
//
// Alternatively, we could try to figure out the file
// descriptors that R has open and select() on those for
// available data?
loop {
// Release the R runtime lock while we're waiting for input.
self.runtime_lock_guard = None;
// FIXME: Race between interrupt and new code request. To fix
// this, we could manage the Shell and Control sockets on the
// common message event thread. The Control messages would need
// to be handled in a blocking way to ensure subscribers are
// notified before the next incoming message is processed.
// Wait for an execution request from the front end.
match self.r_request_rx.recv_timeout(Duration::from_millis(200)) {
Ok(req) => {
let input = match req {
RRequest::ExecuteCode(exec_req, orig, response_tx) => {
// Extract input from request
let (input, exec_count) = { self.init_execute_request(&exec_req) };
// Save `ExecuteCode` request so we can respond to it at next prompt
self.active_request = Some(ActiveReadConsoleRequest {
exec_count,
request: exec_req,
orig,
response_tx,
});
input
},
RRequest::Shutdown(_) => ConsoleInput::EOF,
RRequest::DebugCommand(cmd) => {
// Just ignore command in case we left the debugging state already
if !self.is_debugging {
continue;
}
// Translate requests from the debugger frontend to actual inputs for
// the debug interpreter
ConsoleInput::Input(debug_request_command(cmd))
},
};
// Take back the lock after we've received some console input.
unsafe { self.runtime_lock_guard = Some(R_RUNTIME_LOCK.lock()) };
if Self::process_interrupts(&info) {
return (0, true);
}
// Process events.
unsafe { Self::process_events() };
// Clear error flag
self.error_occurred = false;
match input {
ConsoleInput::Input(code) => {
// Handle commands for the debug interpreter
if self.is_debugging {
let continue_cmds = vec!["n", "f", "c", "cont"];
if continue_cmds.contains(&&code[..]) {
self.send_dap(DapBackendEvent::Continued);
}
}
Self::on_console_input(buf, buflen, code);
return (1, false);
},
ConsoleInput::EOF => return (0, false),
}
},
Err(err) => {
unsafe { self.runtime_lock_guard = Some(R_RUNTIME_LOCK.lock()) };
use RecvTimeoutError::*;
match err {
Timeout => {
if Self::process_interrupts(&info) {
return (0, true);
}
// Process events and keep waiting for console input.
unsafe { Self::process_events() };
continue;
},
Disconnected => {
return (1, false);
},
}
},
};
}
}
// We prefer to panic if there is an error while trying to determine the
// prompt type because any confusion here is prone to put the frontend in a
// bad state (e.g. causing freezes)
fn prompt_info(prompt_c: *const c_char) -> PromptInfo {
let n_frame = harp::session::r_n_frame().unwrap();
trace!("prompt_info(): n_frame = '{}'", n_frame);
let prompt_slice = unsafe { CStr::from_ptr(prompt_c) };
let prompt = prompt_slice.to_string_lossy().into_owned();
// Detect browser prompts by inspecting the `RDEBUG` flag of the
// last frame on the stack. This is not 100% infallible, for
// instance `debug(readline)` followed by `n` will instantiate a
// user request prompt that will look like a browser prompt
// according to this heuristic. However it has the advantage of
// correctly detecting that continue prompts are top-level browser
// prompts in case of incomplete inputs within `browser()`.
let frame = harp::session::r_sys_frame(n_frame).unwrap();
let browser = harp::session::r_env_is_browsed(frame).unwrap();
// If there are frames on the stack and we're not in a browser prompt,
// this means some user code is requesting input, e.g. via `readline()`
let user_request = !browser && n_frame > 0;
// The request is incomplete if we see the continue prompt, except if
// we're in a user request, e.g. `readline("+ ")`
let continuation_prompt = unsafe { r_get_option::<String>("continue").unwrap() };
let incomplete = !user_request && prompt == continuation_prompt;
if incomplete {
trace!("Got R prompt '{}', marking request incomplete", prompt);
} else if user_request {
trace!("Got R prompt '{}', asking user for input", prompt);
}
return PromptInfo {
input_prompt: prompt,
continuation_prompt,
browser,
incomplete,
input_request: user_request,
};
}
fn on_console_input(buf: *mut c_uchar, buflen: c_int, mut input: String) {
// TODO: What if the input is too large for the buffer?
input.push_str("\n");
if input.len() > buflen as usize {
info!("Error: input too large for buffer.");
return;
}
let src = CString::new(input).unwrap();
unsafe {
libc::strcpy(buf as *mut c_char, src.as_ptr());
}
}
// If we received an interrupt while the user was typing input, and we are
// at top level, we can assume the interrupt was 'handled' and so reset the
// flag. If on the other hand we are in a user request prompt,
// e.g. `readline()`, we need to propagate the interrupt to the R stack.
fn process_interrupts(prompt_info: &PromptInfo) -> bool {
unsafe {
if R_interrupts_suspended == 0 {
if R_interrupts_pending != 0 && prompt_info.input_request {
return true;
}
R_interrupts_pending = 0;
}
false
}
}
// Reply to the previously active request. The current prompt type and
// whether an error has occurred defines the response kind.
fn reply_execute_request(&self, req: &ActiveReadConsoleRequest, prompt_info: PromptInfo) {
let prompt = prompt_info.input_prompt;
let reply = if prompt_info.incomplete {
trace!("Got prompt {} signaling incomplete request", prompt);
new_incomplete_response(&req.request, req.exec_count)
} else if prompt_info.input_request {
trace!(
"Got input request for prompt {}, waiting for reply...",
prompt
);
new_execute_response(req.exec_count)
} else {
trace!("Got R prompt '{}', completing execution", prompt);
peek_execute_response(req.exec_count)
};
req.response_tx.send(reply).unwrap();
}
/// Request input from frontend in case code like `readline()` is
/// waiting for input
fn request_input(&self, req: &ActiveReadConsoleRequest, prompt: String) {
unwrap!(
self.input_request_tx
.send(ShellInputRequest {
originator: req.orig.clone(),
request: InputRequest {
prompt,
password: false,
},
}),
Err(err) => panic!("Could not send input request: {}", err)
)
}
/// Invoked by R to write output to the console.
fn write_console(&mut self, buf: *const c_char, _buflen: i32, otype: i32) {
let content = unsafe { CStr::from_ptr(buf).to_str().unwrap() };
let stream = if otype == 0 {
Stream::Stdout
} else {
Stream::Stderr
};
if self.initializing {
// During init, consider all output to be part of the startup banner
self.banner.push_str(content);
return;
}
let buffer = match stream {
Stream::Stdout => &mut self.stdout,
Stream::Stderr => &mut self.stderr,
};
// Append content to buffer.
buffer.push_str(content);
// Stream output via the IOPub channel.
let message = IOPubMessage::Stream(StreamOutput {
name: stream,
text: content.to_string(),
});
unwrap!(self.iopub_tx.send(message), Err(error) => {
log::error!("{}", error);
});
}
/// Invoked by R to change busy state
fn busy(&self, which: i32) {
// Ensure signal handlers are initialized.
//
// We perform this awkward dance because R tries to set and reset
// the interrupt signal handler here, using 'signal()':
//
// https://github.com/wch/r-source/blob/e7a21904029917a63b4717b53a173b01eeabcc7b/src/unix/sys-std.c#L171-L178
//
// However, it seems like this can cause the old interrupt handler to be
// 'moved' to a separate thread, such that interrupts end up being handled
// on a thread different from the R execution thread. At least, on macOS.
Self::initialize_signal_handlers();
// Create an event representing the new busy state
let event = PositronEvent::Busy(BusyEvent { busy: which != 0 });
// Wait for a lock on the kernel and have it deliver the event to
// the front end
let kernel = self.kernel.lock().unwrap();
kernel.send_event(event);
}
/// Invoked by R to show a message to the user.
fn show_message(&self, buf: *const c_char) {
let message = unsafe { CStr::from_ptr(buf) };
// Create an event representing the message
let event = PositronEvent::ShowMessage(ShowMessageEvent {
message: message.to_str().unwrap().to_string(),
});
// Wait for a lock on the kernel and have the kernel deliver the
// event to the front end
let kernel = self.kernel.lock().unwrap();
kernel.send_event(event);
}
/// Invoked by the R event loop
fn polled_events(&mut self) {
// Check for pending tasks.
let count = R_RUNTIME_LOCK_COUNT.load(std::sync::atomic::Ordering::Acquire);
if count == 0 {
return;
}
info!(
"{} thread(s) are waiting; the main thread is releasing the R runtime lock.",
count
);
let now = SystemTime::now();
// `bump()` does a fair unlock, giving other threads
// waiting for the lock a chance to acquire it, and then
// relocks it.
ReentrantMutexGuard::bump(self.runtime_lock_guard.as_mut().unwrap());
info!(
"The main thread re-acquired the R runtime lock after {} milliseconds.",
now.elapsed().unwrap().as_millis()
);
}
unsafe fn process_events() {
// Don't process interrupts in this scope.
let _interrupts_suspended = RInterruptsSuspendedScope::new();
// Process regular R events.
R_ProcessEvents();
// Run handlers if we have data available. This is necessary
// for things like the HTML help server, which will listen
// for requests on an open socket() which would then normally
// be handled in a select() call when reading input from stdin.
//
// https://github.com/wch/r-source/blob/4ca6439c1ffc76958592455c44d83f95d5854b2a/src/unix/sys-std.c#L1084-L1086
//
// We run this in a loop just to make sure the R help server can
// be as responsive as possible when rendering help pages.
let mut fdset = R_checkActivity(0, 1);
while fdset != std::ptr::null_mut() {
R_runHandlers(R_InputHandlers, fdset);
fdset = R_checkActivity(0, 1);
}
// Run pending finalizers. We need to do this eagerly as otherwise finalizers
// might end up being executed on the LSP thread.
// https://github.com/rstudio/positron/issues/431
R_RunPendingFinalizers();
// Check for Positron render requests
graphics_device::on_process_events();
}
fn send_dap(&self, event: DapBackendEvent) {
let dap = self.dap.lock().unwrap();
if let Some(tx) = &dap.backend_events_tx {
log_error!(tx.send(event));
}
}
}
/// Report an incomplete request to the front end
fn new_incomplete_response(req: &ExecuteRequest, exec_count: u32) -> ExecuteResponse {
ExecuteResponse::ReplyException(ExecuteReplyException {
status: Status::Error,
execution_count: exec_count,
exception: Exception {
ename: "IncompleteInput".to_string(),
evalue: format!("Code fragment is not complete: {}", req.code),
traceback: vec![],
},
})
}
// Gets response data from R state
fn peek_execute_response(exec_count: u32) -> ExecuteResponse {
let main = unsafe { R_MAIN.as_mut().unwrap() };
// Save and reset error occurred flag
let error_occurred = main.error_occurred;
main.error_occurred = false;
// Send the reply to the front end
if error_occurred {
// We don't fill out `ename` with anything meaningful because typically
// R errors don't have names. We could consider using the condition class
// here, which r-lib/tidyverse packages have been using more heavily.
let ename = String::from("");
let evalue = main.error_message.clone();
let traceback = main.error_traceback.clone();
log::info!("An R error occurred: {}", evalue);
let exception = Exception {
ename,
evalue,
traceback,
};
main.iopub_tx
.send(IOPubMessage::ExecuteError(ExecuteError {
exception: exception.clone(),
}))
.or_log_warning(&format!("Could not publish error {} on iopub", exec_count));
new_execute_error_response(exception, exec_count)
} else {
// TODO: Implement rich printing of certain outputs.
// Will we need something similar to the RStudio model,
// where we implement custom print() methods? Or can
// we make the stub below behave sensibly even when
// streaming R output?
let mut data = serde_json::Map::new();
data.insert("text/plain".to_string(), json!(""));
// Include HTML representation of data.frame
let value = r_lock! { Rf_findVarInFrame(R_GlobalEnv, r_symbol!(".Last.value")) };
if r_is_data_frame(value) {
match to_html(value) {
Ok(html) => data.insert("text/html".to_string(), json!(html)),
Err(error) => {
error!("{:?}", error);
None
},
};
}
main.iopub_tx
.send(IOPubMessage::ExecuteResult(ExecuteResult {
execution_count: exec_count,
data: serde_json::Value::Object(data),
metadata: json!({}),
}))
.or_log_warning(&format!(