diff --git a/cs/cli/vw_base.cpp b/cs/cli/vw_base.cpp index fa0b4f72d08..2d9e34d99f4 100644 --- a/cs/cli/vw_base.cpp +++ b/cs/cli/vw_base.cpp @@ -147,7 +147,6 @@ void VowpalWabbitBase::InternalDispose() try { if (m_vw != nullptr) { reset_source(*m_vw, m_vw->num_bits); - release_parser_datastructures(*m_vw); // make sure don't try to free m_vw twice in case VW::finish throws. vw* vw_tmp = m_vw; @@ -190,8 +189,6 @@ void VowpalWabbitBase::Reload([System::Runtime::InteropServices::Optional] Strin VW::save_predictor(*m_vw, mem_buf); mem_buf.flush(); - release_parser_datastructures(*m_vw); - // make sure don't try to free m_vw twice in case VW::finish throws. vw* vw_tmp = m_vw; m_vw = nullptr; diff --git a/vowpalwabbit/global_data.h b/vowpalwabbit/global_data.h index c5705688e6f..9a0eb9f0475 100644 --- a/vowpalwabbit/global_data.h +++ b/vowpalwabbit/global_data.h @@ -15,6 +15,17 @@ license as described in the file LICENSE. #include #include +// Thread cannot be used in managed C++, tell the compiler that this is unmanaged even if included in a managed project. +#ifdef _M_CEE +#pragma managed(push,off) +#undef _M_CEE +#include +#define _M_CEE 001 +#pragma managed(pop) +#else +#include +#endif + #include "v_array.h" #include "array_parameters.h" #include "parse_primitives.h" @@ -434,11 +445,8 @@ struct vw { shared_data* sd; parser* p; -#ifndef _WIN32 - pthread_t parse_thread; -#else - HANDLE parse_thread; -#endif + std::thread parse_thread; + AllReduceType all_reduce_type; AllReduce* all_reduce; diff --git a/vowpalwabbit/parse_args.cc b/vowpalwabbit/parse_args.cc index 816ddda46ac..ea612cc7c17 100644 --- a/vowpalwabbit/parse_args.cc +++ b/vowpalwabbit/parse_args.cc @@ -1693,7 +1693,7 @@ void finish(vw& all, bool delete_all) finalize_source(all.p); all.p->parse_name.clear(); all.p->parse_name.delete_v(); - free(all.p); + delete all.p; bool seeded; if (all.weights.seeded() > 0) seeded = true; diff --git a/vowpalwabbit/parse_primitives.h b/vowpalwabbit/parse_primitives.h index 23062109a41..2859be5b2c9 100644 --- a/vowpalwabbit/parse_primitives.h +++ b/vowpalwabbit/parse_primitives.h @@ -10,14 +10,9 @@ license as described in the file LICENSE. #include "v_array.h" #include "floatbits.h" -#ifdef _WIN32 -#include -#include -typedef CRITICAL_SECTION MUTEX; -typedef CONDITION_VARIABLE CV; -#else -typedef pthread_mutex_t MUTEX; -typedef pthread_cond_t CV; +#ifdef _WIN32 +#include +#include #endif struct substring diff --git a/vowpalwabbit/parser.cc b/vowpalwabbit/parser.cc index c83c20ee5c0..2ebafb74e4b 100644 --- a/vowpalwabbit/parser.cc +++ b/vowpalwabbit/parser.cc @@ -56,78 +56,6 @@ int getpid() using namespace std; -void initialize_mutex(MUTEX * pm) -{ -#ifndef _WIN32 - pthread_mutex_init(pm, nullptr); -#else - ::InitializeCriticalSection(pm); -#endif -} - -#ifndef _WIN32 -void delete_mutex(MUTEX *) { /* no operation necessary here*/ } -#else -void delete_mutex(MUTEX * pm) -{ - ::DeleteCriticalSection(pm); -} -#endif - -void initialize_condition_variable(CV * pcv) -{ -#ifndef _WIN32 - pthread_cond_init(pcv, nullptr); -#else - ::InitializeConditionVariable(pcv); -#endif -} - -void mutex_lock(MUTEX * pm) -{ -#ifndef _WIN32 - pthread_mutex_lock(pm); -#else - ::EnterCriticalSection(pm); -#endif -} - -void mutex_unlock(MUTEX * pm) -{ -#ifndef _WIN32 - pthread_mutex_unlock(pm); -#else - ::LeaveCriticalSection(pm); -#endif -} - -void condition_variable_wait(CV * pcv, MUTEX * pm) -{ -#ifndef _WIN32 - pthread_cond_wait(pcv, pm); -#else - ::SleepConditionVariableCS(pcv, pm, INFINITE); -#endif -} - -void condition_variable_signal(CV * pcv) -{ -#ifndef _WIN32 - pthread_cond_signal(pcv); -#else - ::WakeConditionVariable(pcv); -#endif -} - -void condition_variable_signal_all(CV * pcv) -{ -#ifndef _WIN32 - pthread_cond_broadcast(pcv); -#else - ::WakeAllConditionVariable(pcv); -#endif -} - //This should not? matter in a library mode. bool got_sigterm; @@ -147,7 +75,7 @@ bool is_test_only(uint32_t counter, uint32_t period, uint32_t after, bool holdou parser* new_parser() { - parser& ret = calloc_or_throw(); + auto& ret = *(new parser()); ret.input = new io_buf; ret.output = new io_buf; ret.local_example_number = 0; @@ -256,10 +184,11 @@ void reset_source(vw& all, size_t numbits) if (all.daemon) { // wait for all predictions to be sent back to client - mutex_lock(&all.p->output_lock); - while (all.p->local_example_number != all.p->end_parsed_examples) - condition_variable_wait(&all.p->output_done, &all.p->output_lock); - mutex_unlock(&all.p->output_lock); + { + std::unique_lock lock(all.p->output_lock); + all.p->output_done.wait(lock, + [&] { return all.p->local_example_number == all.p->end_parsed_examples; }); + } // close socket, erase final prediction sink and socket io_buf::close_file_or_socket(all.p->input->files[0]); @@ -668,11 +597,10 @@ void enable_sources(vw& all, bool quiet, size_t passes, input_options& input_opt void lock_done(parser& p) { - mutex_lock(&p.examples_lock); + std::lock_guard lock(p.examples_lock); p.done = true; //in case get_example() is waiting for a fresh example, wake so it can realize there are no more. - condition_variable_signal_all(&p.example_available); - mutex_unlock(&p.examples_lock); + p.example_available.notify_all(); } void set_done(vw& all) @@ -767,17 +695,15 @@ example& get_unused_example(vw* all) parser* p = all->p; while (true) { - mutex_lock(&p->examples_lock); + std::unique_lock lock(p->examples_lock); if (p->examples[p->begin_parsed_examples % p->ring_size].in_use == false) { example& ret = p->examples[p->begin_parsed_examples++ % p->ring_size]; ret.in_use = true; - mutex_unlock(&p->examples_lock); return ret; } else - condition_variable_wait(&p->example_unused, &p->examples_lock); - mutex_unlock(&p->examples_lock); + p->example_unused.wait(lock); } } @@ -983,13 +909,14 @@ void clean_example(vw& all, example& ec, bool rewind) empty_example(all, ec); - mutex_lock(&all.p->examples_lock); - assert(ec.in_use); - ec.in_use = false; - condition_variable_signal(&all.p->example_unused); - if (all.p->done) - condition_variable_signal_all(&all.p->example_available); - mutex_unlock(&all.p->examples_lock); + { + std::lock_guard lock(all.p->examples_lock); + assert(ec.in_use); + ec.in_use = false; + all.p->example_unused.notify_one(); + if (all.p->done) + all.p->example_available.notify_all(); + } } void finish_example(vw& all, multi_ex& ec_seq) @@ -1005,10 +932,11 @@ void finish_example(vw& all, example& ec) if (!is_ring_example(all, &ec)) return; - mutex_lock(&all.p->output_lock); - all.p->local_example_number++; - condition_variable_signal(&all.p->output_done); - mutex_unlock(&all.p->output_lock); + { + std::lock_guard lock(all.p->output_lock); + all.p->local_example_number++; + all.p->output_done.notify_one(); + } clean_example(all, ec, false); } @@ -1016,51 +944,43 @@ void finish_example(vw& all, example& ec) void thread_dispatch(vw& all, v_array examples) { - mutex_lock(&all.p->examples_lock); + std::lock_guard lock(all.p->examples_lock); all.p->end_parsed_examples+=examples.size(); - condition_variable_signal_all(&all.p->example_available); - mutex_unlock(&all.p->examples_lock); + all.p->example_available.notify_all(); } -#ifdef _WIN32 -DWORD WINAPI main_parse_loop(LPVOID in) -#else -void *main_parse_loop(void *in) -#endif +void main_parse_loop(vw* all) { - vw* all = (vw*)in; parse_dispatch(*all, thread_dispatch); - return 0L; } namespace VW { example* get_example(parser* p) { - mutex_lock(&p->examples_lock); - if (p->end_parsed_examples != p->used_index) - { - size_t ring_index = p->used_index++ % p->ring_size; - if (!(p->examples+ring_index)->in_use) - cout << "error: example should be in_use " << p->used_index << " " << p->end_parsed_examples << " " << ring_index << endl; - assert((p->examples+ring_index)->in_use); - mutex_unlock(&p->examples_lock); - return p->examples + ring_index; - } - else + std::unique_lock lock(p->examples_lock); + while (true) { - if (!p->done) + if (p->end_parsed_examples != p->used_index) { - condition_variable_wait(&p->example_available, &p->examples_lock); - mutex_unlock(&p->examples_lock); - return get_example(p); + size_t ring_index = p->used_index++ % p->ring_size; + if (!(p->examples + ring_index)->in_use) + cout << "error: example should be in_use " << p->used_index << " " << p->end_parsed_examples << " " << ring_index << endl; + assert((p->examples + ring_index)->in_use); + return p->examples + ring_index; } else { - mutex_unlock(&p->examples_lock); - return nullptr; + if (!p->done) + { + p->example_available.wait(lock); + } + else + { + return nullptr; + } } - } + } } float get_topic_prediction(example* ec, size_t i) @@ -1152,22 +1072,13 @@ void adjust_used_index(vw& all) void initialize_parser_datastructures(vw& all) { initialize_examples(all); - initialize_mutex(&all.p->examples_lock); - initialize_condition_variable(&all.p->example_available); - initialize_condition_variable(&all.p->example_unused); - initialize_mutex(&all.p->output_lock); - initialize_condition_variable(&all.p->output_done); } namespace VW { void start_parser(vw& all) { -#ifndef _WIN32 - pthread_create(&all.parse_thread, nullptr, main_parse_loop, &all); -#else - all.parse_thread = ::CreateThread(nullptr, 0, static_cast(main_parse_loop), &all, 0L, nullptr); -#endif + all.parse_thread = std::thread(main_parse_loop, &all); } } void free_parser(vw& all) @@ -1197,23 +1108,11 @@ void free_parser(vw& all) all.p->counts.delete_v(); } -void release_parser_datastructures(vw& all) -{ - delete_mutex(&all.p->examples_lock); - delete_mutex(&all.p->output_lock); -} - namespace VW { void end_parser(vw& all) { -#ifndef _WIN32 - pthread_join(all.parse_thread, nullptr); -#else - ::WaitForSingleObject(all.parse_thread, INFINITE); - ::CloseHandle(all.parse_thread); -#endif - release_parser_datastructures(all); + all.parse_thread.join(); } bool is_ring_example(vw& all, example* ae) diff --git a/vowpalwabbit/parser.h b/vowpalwabbit/parser.h index 350eb360bce..d9e74d1781d 100644 --- a/vowpalwabbit/parser.h +++ b/vowpalwabbit/parser.h @@ -8,6 +8,18 @@ license as described in the file LICENSE. #include "parse_primitives.h" #include "example.h" +// Mutex and CV cannot be used in managed C++, tell the compiler that this is unmanaged even if included in a managed project. +#ifdef _M_CEE +#pragma managed(push,off) +#undef _M_CEE +#include +#include +#define _M_CEE 001 +#pragma managed(pop) +#else +#include +#include +#endif struct vw; struct input_options; @@ -34,11 +46,15 @@ struct parser example* examples; uint64_t used_index; bool emptylines_separate_examples; // true if you want to have holdout computed on a per-block basis rather than a per-line basis - MUTEX examples_lock; - CV example_available; - CV example_unused; - MUTEX output_lock; - CV output_done; + + // Both example condition_variables use the same mutex. + // examples_lock protects: p->done, ec.in_use, p->begin_parsed_examples, p->end_parse_examples, and p->used_index. + std::mutex examples_lock; + std::condition_variable example_available; + std::condition_variable example_unused; + + std::mutex output_lock; + std::condition_variable output_done; bool done; v_array gram_mask; @@ -67,7 +83,6 @@ bool examples_to_finish(); //only call these from the library form: void initialize_parser_datastructures(vw& all); -void release_parser_datastructures(vw& all); void adjust_used_index(vw& all); //parser control diff --git a/vowpalwabbit/vwdll.cpp b/vowpalwabbit/vwdll.cpp index 76004043178..2bb00562f86 100644 --- a/vowpalwabbit/vwdll.cpp +++ b/vowpalwabbit/vwdll.cpp @@ -74,7 +74,6 @@ VW_DLL_MEMBER void VW_CALLING_CONV VW_Finish_Passes(VW_HANDLE handle) VW_DLL_MEMBER void VW_CALLING_CONV VW_Finish(VW_HANDLE handle) { vw * pointer = static_cast(handle); - release_parser_datastructures(*pointer); VW::finish(*pointer); }