Skip to content

Commit

Permalink
Keep transport thread signaling in PCM structure
Browse files Browse the repository at this point in the history
  • Loading branch information
arkq committed Dec 26, 2023
1 parent c36b944 commit 234819f
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 104 deletions.
66 changes: 62 additions & 4 deletions src/ba-transport-pcm.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ int transport_pcm_init(
pcm->fd = -1;
pcm->fd_bt = -1;
pcm->active = true;
pcm->pipe[0] = -1;
pcm->pipe[1] = -1;

/* link PCM and transport thread */
pcm->th = th;
Expand All @@ -89,6 +91,9 @@ int transport_pcm_init(
pthread_mutex_init(&pcm->client_mtx, NULL);
pthread_cond_init(&pcm->cond, NULL);

if (pipe(pcm->pipe) == -1)
return -1;

pcm->delay_adjustments = g_hash_table_new(NULL, NULL);

pcm->ba_dbus_path = g_strdup_printf("%s/%s/%s",
Expand All @@ -110,6 +115,11 @@ void transport_pcm_free(
pthread_mutex_destroy(&pcm->client_mtx);
pthread_cond_destroy(&pcm->cond);

if (pcm->pipe[0] != -1)
close(pcm->pipe[0]);
if (pcm->pipe[1] != -1)
close(pcm->pipe[1]);

g_hash_table_unref(pcm->delay_adjustments);
g_free(pcm->ba_dbus_path);

Expand Down Expand Up @@ -304,7 +314,7 @@ int ba_transport_pcm_pause(struct ba_transport_pcm *pcm) {
pcm->active = false;
pthread_mutex_unlock(&pcm->mutex);

return ba_transport_thread_signal_send(pcm->th, BA_TRANSPORT_THREAD_SIGNAL_PCM_PAUSE);
return ba_transport_pcm_signal_send(pcm, BA_TRANSPORT_PCM_SIGNAL_PAUSE);
}

int ba_transport_pcm_resume(struct ba_transport_pcm *pcm) {
Expand All @@ -314,7 +324,7 @@ int ba_transport_pcm_resume(struct ba_transport_pcm *pcm) {
pcm->active = true;
pthread_mutex_unlock(&pcm->mutex);

return ba_transport_thread_signal_send(pcm->th, BA_TRANSPORT_THREAD_SIGNAL_PCM_RESUME);
return ba_transport_pcm_signal_send(pcm, BA_TRANSPORT_PCM_SIGNAL_RESUME);
}

int ba_transport_pcm_drain(struct ba_transport_pcm *pcm) {
Expand All @@ -329,7 +339,7 @@ int ba_transport_pcm_drain(struct ba_transport_pcm *pcm) {
debug("PCM drain: %d", pcm->fd);

pcm->synced = false;
ba_transport_thread_signal_send(pcm->th, BA_TRANSPORT_THREAD_SIGNAL_PCM_SYNC);
ba_transport_pcm_signal_send(pcm, BA_TRANSPORT_PCM_SIGNAL_SYNC);

while (!pcm->synced)
pthread_cond_wait(&pcm->cond, &pcm->mutex);
Expand Down Expand Up @@ -361,13 +371,61 @@ int ba_transport_pcm_drop(struct ba_transport_pcm *pcm) {
if (io_pcm_flush(pcm) == -1)
return -1;

int rv = ba_transport_thread_signal_send(pcm->th, BA_TRANSPORT_THREAD_SIGNAL_PCM_DROP);
int rv = ba_transport_pcm_signal_send(pcm, BA_TRANSPORT_PCM_SIGNAL_DROP);
if (rv == -1 && errno == ESRCH)
rv = 0;

return rv;
}

int ba_transport_pcm_signal_send(
struct ba_transport_pcm *pcm,
enum ba_transport_pcm_signal signal) {

struct ba_transport_thread *th = pcm->th;
int ret = -1;

pthread_mutex_lock(&th->mutex);

if (th->state != BA_TRANSPORT_THREAD_STATE_RUNNING) {
errno = ESRCH;
goto fail;
}

if (write(pcm->pipe[1], &signal, sizeof(signal)) != sizeof(signal)) {
warn("Couldn't write transport PCM signal: %s", strerror(errno));
goto fail;
}

ret = 0;

fail:
pthread_mutex_unlock(&th->mutex);
return ret;
}

/**
* Receive signal sent by ba_transport_pcm_signal_send().
*
* @note
* In case of error, this function will return -1 instead of signal value. */
enum ba_transport_pcm_signal ba_transport_pcm_signal_recv(
struct ba_transport_pcm *pcm) {

enum ba_transport_pcm_signal signal;
ssize_t ret;

while ((ret = read(pcm->pipe[0], &signal, sizeof(signal))) == -1 &&
errno == EINTR)
continue;

if (ret == sizeof(signal))
return signal;

warn("Couldn't read transport PCM signal: %s", strerror(errno));
return -1;
}

bool ba_transport_pcm_is_active(const struct ba_transport_pcm *pcm) {
pthread_mutex_lock(MUTABLE(&pcm->mutex));
bool active = pcm->fd != -1 && pcm->active;
Expand Down
18 changes: 18 additions & 0 deletions src/ba-transport-pcm.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ struct ba_transport_pcm_volume {
double scale;
};

enum ba_transport_pcm_signal {
BA_TRANSPORT_PCM_SIGNAL_OPEN,
BA_TRANSPORT_PCM_SIGNAL_CLOSE,
BA_TRANSPORT_PCM_SIGNAL_PAUSE,
BA_TRANSPORT_PCM_SIGNAL_RESUME,
BA_TRANSPORT_PCM_SIGNAL_SYNC,
BA_TRANSPORT_PCM_SIGNAL_DROP,
};

struct ba_transport_thread;

struct ba_transport_pcm {
Expand Down Expand Up @@ -111,6 +120,9 @@ struct ba_transport_pcm {
/* new PCM client mutex */
pthread_mutex_t client_mtx;

/* notification PIPE */
int pipe[2];

/* exported PCM D-Bus API */
char *ba_dbus_path;
bool ba_dbus_exported;
Expand Down Expand Up @@ -152,6 +164,12 @@ int ba_transport_pcm_resume(struct ba_transport_pcm *pcm);
int ba_transport_pcm_drain(struct ba_transport_pcm *pcm);
int ba_transport_pcm_drop(struct ba_transport_pcm *pcm);

int ba_transport_pcm_signal_send(
struct ba_transport_pcm *pcm,
enum ba_transport_pcm_signal signal);
enum ba_transport_pcm_signal ba_transport_pcm_signal_recv(
struct ba_transport_pcm *pcm);

bool ba_transport_pcm_is_active(const struct ba_transport_pcm *pcm);

int ba_transport_pcm_volume_level_to_range(int value, int max);
Expand Down
51 changes: 0 additions & 51 deletions src/ba-transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,10 @@ static int transport_thread_init(

th->t = t;
th->state = BA_TRANSPORT_THREAD_STATE_TERMINATED;
th->pipe[0] = -1;
th->pipe[1] = -1;

pthread_mutex_init(&th->mutex, NULL);
pthread_cond_init(&th->cond, NULL);

if (pipe(th->pipe) == -1)
return -1;

return 0;
}

Expand Down Expand Up @@ -176,10 +171,6 @@ static void transport_thread_cancel(struct ba_transport_thread *th) {
* Release transport thread resources. */
static void transport_thread_free(
struct ba_transport_thread *th) {
if (th->pipe[0] != -1)
close(th->pipe[0]);
if (th->pipe[1] != -1)
close(th->pipe[1]);
pthread_mutex_destroy(&th->mutex);
pthread_cond_destroy(&th->cond);
}
Expand Down Expand Up @@ -272,48 +263,6 @@ int ba_transport_thread_state_wait(
return -1;
}

int ba_transport_thread_signal_send(
struct ba_transport_thread *th,
enum ba_transport_thread_signal signal) {

int ret = -1;

pthread_mutex_lock(&th->mutex);

if (th->state != BA_TRANSPORT_THREAD_STATE_RUNNING) {
errno = ESRCH;
goto fail;
}

if (write(th->pipe[1], &signal, sizeof(signal)) != sizeof(signal)) {
warn("Couldn't write transport thread signal: %s", strerror(errno));
goto fail;
}

ret = 0;

fail:
pthread_mutex_unlock(&th->mutex);
return ret;
}

int ba_transport_thread_signal_recv(
struct ba_transport_thread *th,
enum ba_transport_thread_signal *signal) {

ssize_t ret;
while ((ret = read(th->pipe[0], signal, sizeof(*signal))) == -1 &&
errno == EINTR)
continue;

if (ret == sizeof(*signal))
return 0;

warn("Couldn't read transport thread signal: %s", strerror(errno));
*signal = BA_TRANSPORT_THREAD_SIGNAL_PING;
return -1;
}

static void transport_threads_cancel(struct ba_transport *t) {

transport_thread_cancel(&t->thread_enc);
Expand Down
19 changes: 0 additions & 19 deletions src/ba-transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@ enum ba_transport_thread_state {
BA_TRANSPORT_THREAD_STATE_TERMINATED,
};

enum ba_transport_thread_signal {
BA_TRANSPORT_THREAD_SIGNAL_PING,
BA_TRANSPORT_THREAD_SIGNAL_PCM_OPEN,
BA_TRANSPORT_THREAD_SIGNAL_PCM_CLOSE,
BA_TRANSPORT_THREAD_SIGNAL_PCM_PAUSE,
BA_TRANSPORT_THREAD_SIGNAL_PCM_RESUME,
BA_TRANSPORT_THREAD_SIGNAL_PCM_SYNC,
BA_TRANSPORT_THREAD_SIGNAL_PCM_DROP,
};

struct ba_transport_thread {

/* backward reference to transport */
Expand All @@ -66,8 +56,6 @@ struct ba_transport_thread {
pthread_t id;
/* indicates a master thread */
bool master;
/* notification PIPE */
int pipe[2];

};

Expand Down Expand Up @@ -102,13 +90,6 @@ int ba_transport_thread_state_wait(
#define ba_transport_thread_state_wait_terminated(th) \
ba_transport_thread_state_wait(th, BA_TRANSPORT_THREAD_STATE_TERMINATED)

int ba_transport_thread_signal_send(
struct ba_transport_thread *th,
enum ba_transport_thread_signal signal);
int ba_transport_thread_signal_recv(
struct ba_transport_thread *th,
enum ba_transport_thread_signal *signal);

enum ba_transport_thread_manager_command {
BA_TRANSPORT_THREAD_MANAGER_TERMINATE = 0,
BA_TRANSPORT_THREAD_MANAGER_CANCEL_THREADS,
Expand Down
4 changes: 2 additions & 2 deletions src/bluealsa-dbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ static gboolean bluealsa_pcm_controller(GIOChannel *ch, GIOCondition condition,
case G_IO_STATUS_EOF:
pthread_mutex_lock(&pcm->mutex);
ba_transport_pcm_release(pcm);
ba_transport_thread_signal_send(pcm->th, BA_TRANSPORT_THREAD_SIGNAL_PCM_CLOSE);
ba_transport_pcm_signal_send(pcm, BA_TRANSPORT_PCM_SIGNAL_CLOSE);
pthread_mutex_unlock(&pcm->mutex);
/* Check whether we've just closed the last PCM client and in
* such a case schedule transport IO threads termination. */
Expand Down Expand Up @@ -528,7 +528,7 @@ static void bluealsa_pcm_open(GDBusMethodInvocation *inv, void *userdata) {
g_io_channel_unref(ch);

/* notify our audio thread that the FIFO is ready */
ba_transport_thread_signal_send(th, BA_TRANSPORT_THREAD_SIGNAL_PCM_OPEN);
ba_transport_pcm_signal_send(pcm, BA_TRANSPORT_PCM_SIGNAL_OPEN);

int fds[2] = { pcm_fds[is_sink ? 1 : 0], pcm_fds[3] };
GUnixFDList *fd_list = g_unix_fd_list_new_from_array(fds, 2);
Expand Down
28 changes: 11 additions & 17 deletions src/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ ssize_t io_pcm_write(
return ret;
}

static enum ba_transport_thread_signal io_poll_signal_filter_none(
enum ba_transport_thread_signal signal,
static enum ba_transport_pcm_signal io_poll_signal_filter_none(
enum ba_transport_pcm_signal signal,
void *userdata) {
(void)userdata;
return signal;
Expand All @@ -282,9 +282,8 @@ ssize_t io_poll_and_read_bt(
void *buffer,
size_t count) {

struct ba_transport_thread *th = pcm->th;
struct pollfd fds[2] = {
{ th->pipe[0], POLLIN, 0 },
{ pcm->pipe[0], POLLIN, 0 },
{ pcm->fd_bt, POLLIN, 0 }};

repoll:
Expand All @@ -303,9 +302,7 @@ ssize_t io_poll_and_read_bt(
/* dispatch incoming event */
io_poll_signal_filter *filter = io->signal.filter != NULL ?
io->signal.filter : io_poll_signal_filter_none;
enum ba_transport_thread_signal signal;
ba_transport_thread_signal_recv(th, &signal);
switch (filter(signal, io->signal.userdata)) {
switch (filter(ba_transport_pcm_signal_recv(pcm), io->signal.userdata)) {
default:
goto repoll;
}
Expand All @@ -325,9 +322,8 @@ ssize_t io_poll_and_read_pcm(
void *buffer,
size_t samples) {

struct ba_transport_thread *th = pcm->th;
struct pollfd fds[2] = {
{ th->pipe[0], POLLIN, 0 },
{ pcm->pipe[0], POLLIN, 0 },
{ -1, POLLIN, 0 }};

repoll:
Expand Down Expand Up @@ -360,21 +356,19 @@ ssize_t io_poll_and_read_pcm(
/* dispatch incoming event */
io_poll_signal_filter *filter = io->signal.filter != NULL ?
io->signal.filter : io_poll_signal_filter_none;
enum ba_transport_thread_signal signal;
ba_transport_thread_signal_recv(th, &signal);
switch (filter(signal, io->signal.userdata)) {
case BA_TRANSPORT_THREAD_SIGNAL_PCM_OPEN:
case BA_TRANSPORT_THREAD_SIGNAL_PCM_RESUME:
switch (filter(ba_transport_pcm_signal_recv(pcm), io->signal.userdata)) {
case BA_TRANSPORT_PCM_SIGNAL_OPEN:
case BA_TRANSPORT_PCM_SIGNAL_RESUME:
io->asrs.frames = 0;
io->timeout = -1;
goto repoll;
case BA_TRANSPORT_THREAD_SIGNAL_PCM_CLOSE:
case BA_TRANSPORT_PCM_SIGNAL_CLOSE:
/* reuse PCM read disconnection logic */
break;
case BA_TRANSPORT_THREAD_SIGNAL_PCM_SYNC:
case BA_TRANSPORT_PCM_SIGNAL_SYNC:
io->timeout = 100;
goto repoll;
case BA_TRANSPORT_THREAD_SIGNAL_PCM_DROP:
case BA_TRANSPORT_PCM_SIGNAL_DROP:
/* Notify caller that the PCM FIFO has been dropped. This will give
* the caller a chance to reinitialize its internal state. */
errno = ESTALE;
Expand Down
4 changes: 2 additions & 2 deletions src/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

/**
* Callback function for thread signal filtering. */
typedef enum ba_transport_thread_signal io_poll_signal_filter(
enum ba_transport_thread_signal signal,
typedef enum ba_transport_pcm_signal io_poll_signal_filter(
enum ba_transport_pcm_signal signal,
void *userdata);

/**
Expand Down
Loading

0 comments on commit 234819f

Please sign in to comment.