Skip to content

Commit

Permalink
#8: replace busy polling with pthread_cond_wait
Browse files Browse the repository at this point in the history
  • Loading branch information
ballle98 committed Sep 14, 2020
1 parent 42586e5 commit 633fdff
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 40 deletions.
101 changes: 62 additions & 39 deletions aq_programmer.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ unsigned char _pgm_command = NUL;

bool _last_sent_was_cmd = false;

static pthread_mutex_t _pgm_command_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t _pgm_command_sent_cond = PTHREAD_COND_INITIALIZER;

// External view of adding to queue
void aq_send_cmd(unsigned char cmd) {
push_aq_cmd(cmd);
Expand Down Expand Up @@ -146,9 +149,12 @@ unsigned char pop_aq_cmd(struct aqualinkdata *aq_data)
if (in_programming_mode(aq_data) && ( in_ot_programming_mode(aq_data) == false && in_iaqt_programming_mode(aq_data) == false )) {
//if (aq_data->active_thread.thread_id != 0) {
if ( _pgm_command != NUL && aq_data->last_packet_type == CMD_STATUS) {
pthread_mutex_lock(&_pgm_command_mutex);
cmd = _pgm_command;
_pgm_command = NUL;
LOG(PROG_LOG, LOG_DEBUG_SERIAL, "RS SEND cmd '0x%02hhx' (programming)\n", cmd);
pthread_cond_signal(&_pgm_command_sent_cond);
pthread_mutex_unlock(&_pgm_command_mutex);
} else if (_pgm_command != NUL) {
LOG(PROG_LOG, LOG_DEBUG_SERIAL, "RS Waiting to send cmd '0x%02hhx' (programming)\n", _pgm_command);
} else {
Expand Down Expand Up @@ -900,39 +906,33 @@ void aq_programmer(program_type r_type, char *args, struct aqualinkdata *aq_data

void waitForSingleThreadOrTerminate(struct programmingThreadCtrl *threadCtrl, program_type type)
{
//static int tries = 120;
int tries = 120;
static int waitTime = 1;
int i=0;

i = 0;
while (get_aq_cmd_length() > 0 && ( i++ <= tries) ) {
LOG(PROG_LOG, LOG_DEBUG, "Thread %p (%s) sleeping, waiting command queue to empty\n", &threadCtrl->thread_id, ptypeName(type));
sleep(waitTime);
}
if (i >= tries) {
LOG(PROG_LOG, LOG_ERR, "Thread %p (%s) timeout waiting, ending\n",&threadCtrl->thread_id,ptypeName(type));
free(threadCtrl);
pthread_exit(0);
}
int ret = 0;
struct timespec max_wait;
clock_gettime(CLOCK_REALTIME, &max_wait);
max_wait.tv_sec += 30;

while ( (threadCtrl->aq_data->active_thread.thread_id != 0) && ( i++ <= tries) ) {
//LOG(PROG_LOG, LOG_DEBUG, "Thread %d sleeping, waiting for thread %d to finish\n", threadCtrl->thread_id, threadCtrl->aq_data->active_thread.thread_id);
LOG(PROG_LOG, LOG_DEBUG, "Thread %p (%s) sleeping, waiting for thread %p (%s) to finish\n",
&threadCtrl->thread_id, ptypeName(type),
threadCtrl->aq_data->active_thread.thread_id, ptypeName(threadCtrl->aq_data->active_thread.ptype));
sleep(waitTime);
}

if (i >= tries) {
//LOG(PROG_LOG, LOG_ERR, "Thread %d timeout waiting, ending\n",threadCtrl->thread_id);
LOG(PROG_LOG, LOG_ERR, "Thread %d,%p timeout waiting for thread %d,%p to finish\n",
type, &threadCtrl->thread_id, threadCtrl->aq_data->active_thread.ptype,
threadCtrl->aq_data->active_thread.thread_id);
free(threadCtrl);
pthread_exit(0);
}

pthread_mutex_lock(&threadCtrl->aq_data->mutex);
while (threadCtrl->aq_data->active_thread.thread_id != 0)
{
LOG(PROG_LOG, LOG_DEBUG, "Thread %d,%p (%s) sleeping, waiting for thread %d,%p (%s) to finish\n",
type, &threadCtrl->thread_id, ptypeName(type),
threadCtrl->aq_data->active_thread.ptype, threadCtrl->aq_data->active_thread.thread_id, ptypeName(threadCtrl->aq_data->active_thread.ptype));
if ((ret = pthread_cond_timedwait(&threadCtrl->aq_data->thread_finished_cond,
&threadCtrl->aq_data->mutex, &max_wait)))
{
LOG(PROG_LOG, LOG_ERR, "Thread %d,%p err %s waiting for thread %d,%p to finish\n",
type, &threadCtrl->thread_id, strerror(ret),
threadCtrl->aq_data->active_thread.ptype,
threadCtrl->aq_data->active_thread.thread_id);

if ((ret = pthread_mutex_unlock(&threadCtrl->aq_data->mutex)))
{
LOG(PROG_LOG, LOG_ERR, "waitForSingleThreadOrTerminate mutex unlock ret %s\n", strerror(ret));
}
free(threadCtrl);
pthread_exit(0);
}
}
// Clear out any messages to the UI.
threadCtrl->aq_data->last_display_message[0] = '\0';
threadCtrl->aq_data->active_thread.thread_id = &threadCtrl->thread_id;
Expand All @@ -948,10 +948,12 @@ void waitForSingleThreadOrTerminate(struct programmingThreadCtrl *threadCtrl, pr
threadCtrl->aq_data->active_thread.ptype,
threadCtrl->aq_data->active_thread.thread_id,
ptypeName(threadCtrl->aq_data->active_thread.ptype));
pthread_mutex_unlock(&threadCtrl->aq_data->mutex);
}

void cleanAndTerminateThread(struct programmingThreadCtrl *threadCtrl)
{
pthread_mutex_lock(&threadCtrl->aq_data->mutex);
#ifndef AQ_DEBUG
LOG(PROG_LOG, LOG_DEBUG, "Thread %d,%p (%s) finished\n",threadCtrl->aq_data->active_thread.ptype, threadCtrl->thread_id,ptypeName(threadCtrl->aq_data->active_thread.ptype));
#else
Expand All @@ -965,10 +967,11 @@ void cleanAndTerminateThread(struct programmingThreadCtrl *threadCtrl)
elapsed.tv_sec, elapsed.tv_nsec / 1000000L);
#endif

// Quick delay to allow for last message to be sent.
delay(500);
threadCtrl->aq_data->active_thread.thread_id = 0;
threadCtrl->aq_data->active_thread.ptype = AQP_NULL;
pthread_cond_signal(&threadCtrl->aq_data->thread_finished_cond);
pthread_mutex_unlock(&threadCtrl->aq_data->mutex);

threadCtrl->thread_id = 0;
// Force update, change display message
threadCtrl->aq_data->updated = true;
Expand Down Expand Up @@ -1996,14 +1999,34 @@ void longwaitfor_queue2empty()
_waitfor_queue2empty(true);
}

void send_cmd(unsigned char cmd)
bool send_cmd(unsigned char cmd)
{
waitfor_queue2empty();

_pgm_command = cmd;
//delay(200);
bool ret=true;
int pret = 0;
struct timespec max_wait;

clock_gettime(CLOCK_REALTIME, &max_wait);
max_wait.tv_sec += 5;

pthread_mutex_lock(&_pgm_command_mutex);
_pgm_command = cmd;
LOG(PROG_LOG, LOG_INFO, "Queue send '0x%02hhx' to controller (programming)\n", _pgm_command);
while (_pgm_command != NUL)
{
if ((pret = pthread_cond_timedwait(&_pgm_command_sent_cond,
&_pgm_command_mutex, &max_wait)))
{
LOG(PROG_LOG, LOG_ERR, "send_cmd 0x%02hhx err %s\n",
cmd, strerror(pret));
ret = false;
break;
}
}
if (ret) {
LOG(PROG_LOG, LOG_INFO, "sent '0x%02hhx' to controller\n", _pgm_command);
}
pthread_mutex_unlock(&_pgm_command_mutex);
return ret;
}

/*
Expand Down
2 changes: 1 addition & 1 deletion aq_programmer.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ const char *ptypeName(program_type type);
const char *programtypeDisplayName(program_type type);

// These shouldn't be here, but just for the PDA AQ PROGRAMMER
void send_cmd(unsigned char cmd);
bool send_cmd(unsigned char cmd);
bool push_aq_cmd(unsigned char cmd);
void waitForSingleThreadOrTerminate(struct programmingThreadCtrl *threadCtrl, program_type type);
void cleanAndTerminateThread(struct programmingThreadCtrl *threadCtrl);
Expand Down
2 changes: 2 additions & 0 deletions aqualink.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ struct aqualinkdata
struct timespec last_active_time;
struct timespec start_active_time;
#endif
pthread_mutex_t mutex;
pthread_cond_t thread_finished_cond;
};


Expand Down
3 changes: 3 additions & 0 deletions aqualinkd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,9 @@ void main_loop()
_aqualink_data.swg_ppm = 0;
}

pthread_mutex_init(&_aqualink_data.mutex, NULL);
pthread_cond_init(&_aqualink_data.thread_finished_cond, NULL);

if (!start_net_services(&mgr, &_aqualink_data))
{
LOG(AQUA_LOG,LOG_ERR, "Can not start webserver on port %s.\n", _aqconfig_.socket_port);
Expand Down

0 comments on commit 633fdff

Please sign in to comment.