Skip to content

Commit

Permalink
Make workers spawn rather than fork (5 of 5)
Browse files Browse the repository at this point in the history
Summary:
gabelevi (aka gabelevi) has split
facebook/flow#1184 up into multiple diffs
for to make testing and reviewing easier.

This is the last piece of the pull request and is one of the scariest bits. It
makes the workers use `Daemon.spawn` instead of fork. Now suddenly the workers
don't have access to the global state they had before!

To get all the tests passing I had to make sure to save and restore logging
state.

Reviewed By: int3

Differential Revision: D2796468

Pulled By: gabelevi

fbshipit-source-id: 73eb9dac0431360148dc520601c53702f5598dfb
  • Loading branch information
hnrgrgr authored and Hhvm Bot committed May 24, 2016
1 parent 0230153 commit b80c3da
Show file tree
Hide file tree
Showing 28 changed files with 664 additions and 458 deletions.
2 changes: 1 addition & 1 deletion hphp/hack/src/build.ocp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ begin library "hh-find"
end

begin library "hh-heap"
requires += [ "hh-third-party" "hh-utils" "hh-stubs" ]
requires += [ "hh-third-party" "hh-utils" "hh-stubs" "hh-globals" ]

files = [
"heap/prefix.ml"
Expand Down
52 changes: 37 additions & 15 deletions hphp/hack/src/heap/hh_shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ static uintptr_t early_counter = 1;
static char** heap;

/* Useful to add assertions */
static pid_t master_pid;
static pid_t* master_pid;
static pid_t my_pid;

/* Where the heap started (bottom) */
Expand Down Expand Up @@ -539,9 +539,12 @@ static void define_globals(char * shared_mem_init) {
assert (CACHE_LINE_SIZE >= sizeof(uintptr_t));
counter = (uintptr_t*)(mem + 2*CACHE_LINE_SIZE);

assert (CACHE_LINE_SIZE >= sizeof(pid_t));
master_pid = (pid_t*)(mem + 3*CACHE_LINE_SIZE);

mem += page_size;
// Just checking that the page is large enough.
assert(page_size > 2*CACHE_LINE_SIZE + (int)sizeof(int));
assert(page_size > 3*CACHE_LINE_SIZE + (int)sizeof(int));
/* END OF THE SMALL OBJECTS PAGE */

/* Global storage initialization */
Expand Down Expand Up @@ -615,11 +618,11 @@ CAMLprim value hh_shared_init(

// Keeping the pids around to make asserts.
#ifdef _WIN32
master_pid = 0;
my_pid = master_pid;
*master_pid = 0;
my_pid = *master_pid;
#else
master_pid = getpid();
my_pid = master_pid;
*master_pid = getpid();
my_pid = *master_pid;
#endif

#ifdef MADV_DONTDUMP
Expand Down Expand Up @@ -664,8 +667,8 @@ void hh_shared_reset() {
}

/* Must be called by every worker before any operation is performed */
value hh_worker_init(value connector) {
CAMLparam1(connector);
value hh_connect(value connector, value is_master) {
CAMLparam2(connector, is_master);
memfd = Handle_val(Field(connector, 0));
global_size_b = Long_val(Field(connector, 1));
heap_size = Long_val(Field(connector, 2));
Expand All @@ -676,6 +679,12 @@ value hh_worker_init(value connector) {
#endif
char *shared_mem_init = memfd_map(get_shared_mem_size());
define_globals(shared_mem_init);

if (Bool_val(is_master)) {
fprintf(stderr, "Reconnecting as master %d", my_pid);
*master_pid = my_pid;
}

CAMLreturn(Val_unit);
}

Expand Down Expand Up @@ -706,6 +715,18 @@ CAMLprim value hh_counter_next(void) {
CAMLreturn(result);
}

/*****************************************************************************/
/* There are a bunch of operations that only the designated master thread is
* allowed to do. This assert will fail if the current process is not the master
* process
*/
/*****************************************************************************/
void assert_master() {
assert(my_pid == *master_pid);
}

/*****************************************************************************/

/*****************************************************************************/
/* Global storage */
/*****************************************************************************/
Expand All @@ -714,7 +735,7 @@ void hh_shared_store(value data) {
CAMLparam1(data);
size_t size = caml_string_length(data);

assert(my_pid == master_pid); // only the master can store
assert_master(); // only the master can store
assert(global_storage[0] == 0); // Is it clear?
assert(size < global_size_b - sizeof(value)); // Do we have enough space?

Expand Down Expand Up @@ -746,7 +767,7 @@ CAMLprim value hh_shared_load(void) {
}

void hh_shared_clear(void) {
assert(my_pid == master_pid);
assert_master();
global_storage[0] = 0;
}

Expand Down Expand Up @@ -958,7 +979,7 @@ void hh_collect(value aggressive_val) {

tmp_heap = temp_memory_map();
dest = tmp_heap;
assert(my_pid == master_pid); // Comes from the master
assert_master(); // Comes from the master

// Walking the table
size_t i;
Expand Down Expand Up @@ -1115,6 +1136,7 @@ static unsigned int find_slot(value key) {
*/
/*****************************************************************************/
value hh_mem(value key) {
CAMLparam1(key);
unsigned int slot = find_slot(key);
if(hashtbl[slot].hash == get_hash(key) &&
hashtbl[slot].addr != NULL) {
Expand All @@ -1127,9 +1149,9 @@ value hh_mem(value key) {
asm volatile("pause" : : : "memory");
#endif
}
return Val_bool(1);
CAMLreturn(Val_bool(1));
}
return Val_bool(0);
CAMLreturn(Val_bool(0));
}

/*****************************************************************************/
Expand Down Expand Up @@ -1159,7 +1181,7 @@ void hh_move(value key1, value key2) {
unsigned int slot1 = find_slot(key1);
unsigned int slot2 = find_slot(key2);

assert(my_pid == master_pid);
assert_master();
assert(hashtbl[slot1].hash == get_hash(key1));
assert(hashtbl[slot2].addr == NULL);
hashtbl[slot2].hash = get_hash(key2);
Expand All @@ -1175,7 +1197,7 @@ void hh_move(value key1, value key2) {
void hh_remove(value key) {
unsigned int slot = find_slot(key);

assert(my_pid == master_pid);
assert_master();
assert(hashtbl[slot].hash == get_hash(key));
hashtbl[slot].addr = NULL;
}
Expand Down
16 changes: 4 additions & 12 deletions hphp/hack/src/heap/sharedMem.ml
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,16 @@ external hh_shared_init
: global_size:int -> heap_size:int -> shm_dir:string -> handle
= "hh_shared_init"

let handle = ref None

let init config =
handle := Some (hh_shared_init
hh_shared_init
~global_size:config.global_size
~heap_size:config.heap_size
~shm_dir:config.shm_dir)
~shm_dir:config.shm_dir

let init_default () =
let init_default () : handle =
init default_config

external hh_worker_init : handle -> unit = "hh_worker_init"

let connect () =
match !handle with
| Some handle -> hh_worker_init handle
| None ->
failwith "Worker tried to connect before SharedMem was initialized!"
external connect : handle -> is_master:bool -> unit = "hh_connect"

external reset: unit -> unit = "hh_shared_reset"

Expand Down
12 changes: 9 additions & 3 deletions hphp/hack/src/heap/sharedMem.mli
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,26 @@ type config = {

val default_config : config

type handle = private {
h_fd: Unix.file_descr;
h_global_size: int;
h_heap_size: int;
}

exception Out_of_shared_memory

(*****************************************************************************)
(* Initializes the shared memory. Must be called before forking! *)
(*****************************************************************************)

val init: config -> unit
val init_default: unit -> unit
val init: config -> handle
val init_default: unit -> handle

(*****************************************************************************)
(* Connect a slave to the shared heap *)
(*****************************************************************************)

val connect: unit -> unit
val connect: handle -> is_master:bool -> unit

(*****************************************************************************)
(* Resets the initialized and used memory to the state right after
Expand Down
14 changes: 7 additions & 7 deletions hphp/hack/src/hh_format.ml
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ let debug () fnl =
()
end

let debug_directory dir =
let debug_directory ~handle dir =
let path = Path.make dir in
let next = compose
(List.map ~f:Path.make)
(Find.make_next_files ~filter:FindUtils.is_php path) in
let workers = Worker.make GlobalConfig.nbr_procs GlobalConfig.gc_control in
let workers = ServerWorker.make GlobalConfig.gc_control handle in
MultiWorker.call
(Some workers)
~job:debug
Expand Down Expand Up @@ -215,12 +215,12 @@ let job_in_place modes acc fnl =
| Some err -> err :: acc
end ~init:acc

let directory modes dir =
let directory modes ~handle dir =
let path = Path.make dir in
let next = compose
(List.map ~f:Path.make)
(Find.make_next_files ~filter:FindUtils.is_php path) in
let workers = Worker.make GlobalConfig.nbr_procs GlobalConfig.gc_control in
let workers = ServerWorker.make GlobalConfig.gc_control handle in
let messages =
MultiWorker.call
(Some workers)
Expand Down Expand Up @@ -274,7 +274,7 @@ let format_stdin modes from to_ =
(*****************************************************************************)

let () =
let _handle = SharedMem.init_default () in
let handle = SharedMem.init_default () in
PidLog.log_oc := Some (open_out (Path.to_string Path.null_path));
let files, from, to_, apply_mode, debug, diff, modes, root, test =
parse_args() in
Expand Down Expand Up @@ -302,8 +302,8 @@ let () =
| [] -> format_stdin modes from to_
| [dir] when Sys.is_directory dir ->
if debug
then debug_directory dir
else directory modes dir
then debug_directory ~handle dir
else directory modes ~handle dir
| [filename] ->
let filepath = Path.make filename in
(match apply_mode with
Expand Down
10 changes: 5 additions & 5 deletions hphp/hack/src/hh_match.ml
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,12 @@ let hh_match_job

(* Handle a directory by bucketing the contained php files and sending
them off to worker threads *)
let directory dir fn =
let directory ~handle dir fn =
let path = Path.make dir in
let next = Utils.compose
(List.map ~f:Path.make)
(Find.make_next_files ~filter:FindUtils.is_php path) in
let workers = Worker.make GlobalConfig.nbr_procs GlobalConfig.gc_control in
let workers = ServerWorker.make GlobalConfig.gc_control handle in
(* Information for the patcher to figure out what transformations to do *)
let fileschanged =
MultiWorker.call
Expand Down Expand Up @@ -294,7 +294,7 @@ let match_job pat_info acc fnl =
(*****************************************************************************)

let () =
let _handle = SharedMem.init_default () in
let handle = SharedMem.init_default () in
PidLog.log_oc := Some (open_out Path.(to_string null_path));
let files, pattern, target, format_patches, verbose, showpatch, expr_mode,
stmt_mode = parse_args() in
Expand All @@ -307,7 +307,7 @@ let () =
match files with
| [dir] when Sys.is_directory dir ->
let pat_info = (preproc_match pattern), verbose, expr_mode, stmt_mode in
directory dir (match_job pat_info)
directory ~handle dir (match_job pat_info)
| [filename] ->
let filepath = Path.make filename in
let pat_info = (preproc_match pattern), verbose, expr_mode, stmt_mode in
Expand Down Expand Up @@ -339,7 +339,7 @@ let () =
else
match files with
| [dir] when Sys.is_directory dir ->
directory dir (patch_job pat_info)
directory ~handle dir (patch_job pat_info)
| [filename] ->
let filepath = Path.make filename in
ignore(patch_file pat_info filepath)
Expand Down
2 changes: 1 addition & 1 deletion hphp/hack/src/monitor/serverMonitor.ml
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ let start_monitoring ~waiting_client monitor_config monitor_starter =
* can be notified when the monitor socket is ready. The FD number was passed
* in program args. *)
Option.iter waiting_client begin fun fd ->
let oc = Handle.to_out_channel fd in
let oc = Unix.out_channel_of_descr fd in
try
output_string oc (ServerMonitorUtils.ready^"\n");
close_out oc;
Expand Down
2 changes: 1 addition & 1 deletion hphp/hack/src/monitor/serverMonitor.mli
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
*)

val start_monitoring:
waiting_client:int option ->
waiting_client:Unix.file_descr option ->
ServerMonitorUtils.monitor_config ->
ServerMonitorUtils.monitor_starter -> 'a
Loading

0 comments on commit b80c3da

Please sign in to comment.