Skip to content

Commit

Permalink
Move to std types for concurrency (VowpalWabbit#1731)
Browse files Browse the repository at this point in the history
* change to std datatypes for multithreading. Still need to fix usage in CLR mode, should probably use pimpl pattern

* Fix concurrency classes for managed cpp

* unwind recursive call and fix deadlock

* Address comments

* Add comment

* Add back required undef and define
  • Loading branch information
jackgerrits authored and Borislav Nikolov committed Mar 7, 2019
1 parent 7f3fe88 commit 08d4ab1
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 170 deletions.
3 changes: 0 additions & 3 deletions cs/cli/vw_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ void VowpalWabbitBase::InternalDispose()
try
{ if (m_vw != nullptr)
{ reset_source(*m_vw, m_vw->num_bits);
release_parser_datastructures(*m_vw);

// make sure don't try to free m_vw twice in case VW::finish throws.
vw* vw_tmp = m_vw;
Expand Down Expand Up @@ -190,8 +189,6 @@ void VowpalWabbitBase::Reload([System::Runtime::InteropServices::Optional] Strin
VW::save_predictor(*m_vw, mem_buf);
mem_buf.flush();

release_parser_datastructures(*m_vw);

// make sure don't try to free m_vw twice in case VW::finish throws.
vw* vw_tmp = m_vw;
m_vw = nullptr;
Expand Down
18 changes: 13 additions & 5 deletions vowpalwabbit/global_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ license as described in the file LICENSE.
#include <inttypes.h>
#include <climits>

// Thread cannot be used in managed C++, tell the compiler that this is unmanaged even if included in a managed project.
#ifdef _M_CEE
#pragma managed(push,off)
#undef _M_CEE
#include <thread>
#define _M_CEE 001
#pragma managed(pop)
#else
#include <thread>
#endif

#include "v_array.h"
#include "array_parameters.h"
#include "parse_primitives.h"
Expand Down Expand Up @@ -434,11 +445,8 @@ struct vw
{ shared_data* sd;

parser* p;
#ifndef _WIN32
pthread_t parse_thread;
#else
HANDLE parse_thread;
#endif
std::thread parse_thread;

AllReduceType all_reduce_type;
AllReduce* all_reduce;

Expand Down
2 changes: 1 addition & 1 deletion vowpalwabbit/parse_args.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1693,7 +1693,7 @@ void finish(vw& all, bool delete_all)
finalize_source(all.p);
all.p->parse_name.clear();
all.p->parse_name.delete_v();
free(all.p);
delete all.p;
bool seeded;
if (all.weights.seeded() > 0)
seeded = true;
Expand Down
11 changes: 3 additions & 8 deletions vowpalwabbit/parse_primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,9 @@ license as described in the file LICENSE.
#include "v_array.h"
#include "floatbits.h"

#ifdef _WIN32
#include <WinSock2.h>
#include <Windows.h>
typedef CRITICAL_SECTION MUTEX;
typedef CONDITION_VARIABLE CV;
#else
typedef pthread_mutex_t MUTEX;
typedef pthread_cond_t CV;
#ifdef _WIN32
#include <WinSock2.h>
#include <Windows.h>
#endif

struct substring
Expand Down
191 changes: 45 additions & 146 deletions vowpalwabbit/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,78 +56,6 @@ int getpid()

using namespace std;

void initialize_mutex(MUTEX * pm)
{
#ifndef _WIN32
pthread_mutex_init(pm, nullptr);
#else
::InitializeCriticalSection(pm);
#endif
}

#ifndef _WIN32
void delete_mutex(MUTEX *) { /* no operation necessary here*/ }
#else
void delete_mutex(MUTEX * pm)
{
::DeleteCriticalSection(pm);
}
#endif

void initialize_condition_variable(CV * pcv)
{
#ifndef _WIN32
pthread_cond_init(pcv, nullptr);
#else
::InitializeConditionVariable(pcv);
#endif
}

void mutex_lock(MUTEX * pm)
{
#ifndef _WIN32
pthread_mutex_lock(pm);
#else
::EnterCriticalSection(pm);
#endif
}

void mutex_unlock(MUTEX * pm)
{
#ifndef _WIN32
pthread_mutex_unlock(pm);
#else
::LeaveCriticalSection(pm);
#endif
}

void condition_variable_wait(CV * pcv, MUTEX * pm)
{
#ifndef _WIN32
pthread_cond_wait(pcv, pm);
#else
::SleepConditionVariableCS(pcv, pm, INFINITE);
#endif
}

void condition_variable_signal(CV * pcv)
{
#ifndef _WIN32
pthread_cond_signal(pcv);
#else
::WakeConditionVariable(pcv);
#endif
}

void condition_variable_signal_all(CV * pcv)
{
#ifndef _WIN32
pthread_cond_broadcast(pcv);
#else
::WakeAllConditionVariable(pcv);
#endif
}

//This should not? matter in a library mode.
bool got_sigterm;

Expand All @@ -147,7 +75,7 @@ bool is_test_only(uint32_t counter, uint32_t period, uint32_t after, bool holdou

parser* new_parser()
{
parser& ret = calloc_or_throw<parser>();
auto& ret = *(new parser());
ret.input = new io_buf;
ret.output = new io_buf;
ret.local_example_number = 0;
Expand Down Expand Up @@ -256,10 +184,11 @@ void reset_source(vw& all, size_t numbits)
if (all.daemon)
{
// wait for all predictions to be sent back to client
mutex_lock(&all.p->output_lock);
while (all.p->local_example_number != all.p->end_parsed_examples)
condition_variable_wait(&all.p->output_done, &all.p->output_lock);
mutex_unlock(&all.p->output_lock);
{
std::unique_lock<std::mutex> lock(all.p->output_lock);
all.p->output_done.wait(lock,
[&] { return all.p->local_example_number == all.p->end_parsed_examples; });
}

// close socket, erase final prediction sink and socket
io_buf::close_file_or_socket(all.p->input->files[0]);
Expand Down Expand Up @@ -668,11 +597,10 @@ void enable_sources(vw& all, bool quiet, size_t passes, input_options& input_opt

void lock_done(parser& p)
{
mutex_lock(&p.examples_lock);
std::lock_guard<std::mutex> lock(p.examples_lock);
p.done = true;
//in case get_example() is waiting for a fresh example, wake so it can realize there are no more.
condition_variable_signal_all(&p.example_available);
mutex_unlock(&p.examples_lock);
p.example_available.notify_all();
}

void set_done(vw& all)
Expand Down Expand Up @@ -767,17 +695,15 @@ example& get_unused_example(vw* all)
parser* p = all->p;
while (true)
{
mutex_lock(&p->examples_lock);
std::unique_lock<std::mutex> lock(p->examples_lock);
if (p->examples[p->begin_parsed_examples % p->ring_size].in_use == false)
{
example& ret = p->examples[p->begin_parsed_examples++ % p->ring_size];
ret.in_use = true;
mutex_unlock(&p->examples_lock);
return ret;
}
else
condition_variable_wait(&p->example_unused, &p->examples_lock);
mutex_unlock(&p->examples_lock);
p->example_unused.wait(lock);
}
}

Expand Down Expand Up @@ -983,13 +909,14 @@ void clean_example(vw& all, example& ec, bool rewind)

empty_example(all, ec);

mutex_lock(&all.p->examples_lock);
assert(ec.in_use);
ec.in_use = false;
condition_variable_signal(&all.p->example_unused);
if (all.p->done)
condition_variable_signal_all(&all.p->example_available);
mutex_unlock(&all.p->examples_lock);
{
std::lock_guard<std::mutex> lock(all.p->examples_lock);
assert(ec.in_use);
ec.in_use = false;
all.p->example_unused.notify_one();
if (all.p->done)
all.p->example_available.notify_all();
}
}

void finish_example(vw& all, multi_ex& ec_seq)
Expand All @@ -1005,62 +932,55 @@ void finish_example(vw& all, example& ec)
if (!is_ring_example(all, &ec))
return;

mutex_lock(&all.p->output_lock);
all.p->local_example_number++;
condition_variable_signal(&all.p->output_done);
mutex_unlock(&all.p->output_lock);
{
std::lock_guard<std::mutex> lock(all.p->output_lock);
all.p->local_example_number++;
all.p->output_done.notify_one();
}

clean_example(all, ec, false);
}
}

void thread_dispatch(vw& all, v_array<example*> examples)
{
mutex_lock(&all.p->examples_lock);
std::lock_guard<std::mutex> lock(all.p->examples_lock);
all.p->end_parsed_examples+=examples.size();
condition_variable_signal_all(&all.p->example_available);
mutex_unlock(&all.p->examples_lock);
all.p->example_available.notify_all();
}

#ifdef _WIN32
DWORD WINAPI main_parse_loop(LPVOID in)
#else
void *main_parse_loop(void *in)
#endif
void main_parse_loop(vw* all)
{
vw* all = (vw*)in;
parse_dispatch(*all, thread_dispatch);
return 0L;
}

namespace VW
{
example* get_example(parser* p)
{
mutex_lock(&p->examples_lock);
if (p->end_parsed_examples != p->used_index)
{
size_t ring_index = p->used_index++ % p->ring_size;
if (!(p->examples+ring_index)->in_use)
cout << "error: example should be in_use " << p->used_index << " " << p->end_parsed_examples << " " << ring_index << endl;
assert((p->examples+ring_index)->in_use);
mutex_unlock(&p->examples_lock);
return p->examples + ring_index;
}
else
std::unique_lock<std::mutex> lock(p->examples_lock);
while (true)
{
if (!p->done)
if (p->end_parsed_examples != p->used_index)
{
condition_variable_wait(&p->example_available, &p->examples_lock);
mutex_unlock(&p->examples_lock);
return get_example(p);
size_t ring_index = p->used_index++ % p->ring_size;
if (!(p->examples + ring_index)->in_use)
cout << "error: example should be in_use " << p->used_index << " " << p->end_parsed_examples << " " << ring_index << endl;
assert((p->examples + ring_index)->in_use);
return p->examples + ring_index;
}
else
{
mutex_unlock(&p->examples_lock);
return nullptr;
if (!p->done)
{
p->example_available.wait(lock);
}
else
{
return nullptr;
}
}
}
}
}

float get_topic_prediction(example* ec, size_t i)
Expand Down Expand Up @@ -1152,22 +1072,13 @@ void adjust_used_index(vw& all)
void initialize_parser_datastructures(vw& all)
{
initialize_examples(all);
initialize_mutex(&all.p->examples_lock);
initialize_condition_variable(&all.p->example_available);
initialize_condition_variable(&all.p->example_unused);
initialize_mutex(&all.p->output_lock);
initialize_condition_variable(&all.p->output_done);
}

namespace VW
{
void start_parser(vw& all)
{
#ifndef _WIN32
pthread_create(&all.parse_thread, nullptr, main_parse_loop, &all);
#else
all.parse_thread = ::CreateThread(nullptr, 0, static_cast<LPTHREAD_START_ROUTINE>(main_parse_loop), &all, 0L, nullptr);
#endif
all.parse_thread = std::thread(main_parse_loop, &all);
}
}
void free_parser(vw& all)
Expand Down Expand Up @@ -1197,23 +1108,11 @@ void free_parser(vw& all)
all.p->counts.delete_v();
}

void release_parser_datastructures(vw& all)
{
delete_mutex(&all.p->examples_lock);
delete_mutex(&all.p->output_lock);
}

namespace VW
{
void end_parser(vw& all)
{
#ifndef _WIN32
pthread_join(all.parse_thread, nullptr);
#else
::WaitForSingleObject(all.parse_thread, INFINITE);
::CloseHandle(all.parse_thread);
#endif
release_parser_datastructures(all);
all.parse_thread.join();
}

bool is_ring_example(vw& all, example* ae)
Expand Down
Loading

0 comments on commit 08d4ab1

Please sign in to comment.