Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PROTON-2818: Move epoll proctor connection logic to a task thread. #427

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions c/src/proactor/epoll-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ typedef struct pconnection_t {
bool server; /* accept, not connect */
bool tick_pending;
bool queued_disconnect; /* deferred from pn_proactor_disconnect() */
bool first_schedule;
pn_condition_t *disconnect_condition;
// Following values only changed by (sole) working task:
uint32_t current_arm; // active epoll io events
Expand Down
57 changes: 36 additions & 21 deletions c/src/proactor/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
pc->wbuf_current = NULL;
pc->hog_count = 0;
pc->batch.next_event = pconnection_batch_next;
pc->first_schedule = false;

if (server) {
pn_transport_set_server(pc->driver.transport);
Expand Down Expand Up @@ -1122,6 +1123,7 @@ static void write_flush(pconnection_t *pc) {

static void pconnection_connected_lh(pconnection_t *pc);
static void pconnection_maybe_connect_lh(pconnection_t *pc);
static bool pconnection_first_connect_lh(pconnection_t *pc);

static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_ready, bool topup) {
bool waking = false;
Expand All @@ -1139,6 +1141,17 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
}
if (sched_ready) schedule_done(&pc->task);

if (pc->first_schedule) {
pc->first_schedule = false;
assert(!topup && !events);
if (!pc->queued_disconnect) {
if (pconnection_first_connect_lh(pc)) {
unlock(&pc->task.mutex);
return NULL;
}
}
}

if (topup) {
// Only called by the batch owner. Does not loop, just "tops up"
// once. May be back depending on hog_count.
Expand Down Expand Up @@ -1396,6 +1409,7 @@ static void pconnection_maybe_connect_lh(pconnection_t *pc) {

int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
{
// NOTE: getaddrinfo can block on DNS lookup (PROTON-2812).
struct addrinfo hints = { 0 };
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
Expand All @@ -1416,7 +1430,27 @@ bool schedule_if_inactive(pn_proactor_t *p) {
return false;
}

// Call from pconnection_process with task lock held.
// Return true if the socket is connecting and there are no Proton events to deliver.
static bool pconnection_first_connect_lh(pconnection_t *pc) {
unlock(&pc->task.mutex);
// TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups.
int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
lock(&pc->task.mutex);

if (!gai_error) {
pc->ai = pc->addrinfo;
pconnection_maybe_connect_lh(pc); /* Start connection attempts */
if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect && !pni_task_wake_pending(&pc->task))
return true;
} else {
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
}
return false;
}

void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
// Called from an arbitrary thread. Do setup prior to getaddrinfo, then switch to a worker thread.
size_t addrlen = strlen(addr);
pconnection_t *pc = (pconnection_t*) malloc(sizeof(pconnection_t)+addrlen);
assert(pc); // TODO: memory safety
Expand All @@ -1430,27 +1464,8 @@ void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *
lock(&pc->task.mutex);
proactor_add(&pc->task);
pn_connection_open(pc->driver.connection); /* Auto-open */

bool notify = false;

if (pc->disconnected) {
notify = schedule(&pc->task); /* Error during initialization */
} else {
int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
if (!gai_error) {
pn_connection_open(pc->driver.connection); /* Auto-open */
pc->ai = pc->addrinfo;
pconnection_maybe_connect_lh(pc); /* Start connection attempts */
if (pc->disconnected) notify = schedule(&pc->task);
} else {
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
notify = schedule(&pc->task);
lock(&p->task.mutex);
notify |= schedule_if_inactive(p);
unlock(&p->task.mutex);
}
}
/* We need to issue INACTIVE on immediate failure */
pc->first_schedule = true; // Resume connection setup when next scheduled.
bool notify = schedule(&pc->task);
unlock(&pc->task.mutex);
if (notify) notify_poller(p);
}
Expand Down
69 changes: 47 additions & 22 deletions c/src/proactor/epoll_raw_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ struct praw_connection_t {
bool disconnected;
bool hup_detected;
bool read_check;
bool first_schedule;
char *taddr;
};

static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
Expand Down Expand Up @@ -145,6 +147,8 @@ static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_ra

prc->connected = false;
prc->disconnected = false;
prc->first_schedule = false;
prc->taddr = NULL;
prc->batch.next_event = pni_raw_batch_next;

pmutex_init(&prc->rearm_mutex);
Expand All @@ -163,6 +167,7 @@ static void praw_connection_cleanup(praw_connection_t *prc) {
task_finalize(&prc->task);
if (prc->addrinfo)
freeaddrinfo(prc->addrinfo);
free(prc->taddr);
free(prc);
}
// else proactor_disconnect logic owns prc and its final free
Expand All @@ -177,39 +182,48 @@ pn_raw_connection_t *pn_raw_connection(void) {
return &conn->raw_connection;
}

void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
assert(rc);
praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
praw_connection_init(prc, p, rc);
// TODO: check case of proactor shutting down

lock(&prc->task.mutex);
proactor_add(&prc->task);

bool notify = false;

// Call from pconnection_process with task lock held.
// Return true if the socket is connecting and there are no Proton events to deliver.
static bool praw_connection_first_connect_lh(praw_connection_t *prc) {
const char *host;
const char *port;
size_t addrlen = strlen(addr);
char *addr_buf = (char*) alloca(addrlen+1);
pni_parse_addr(addr, addr_buf, addrlen+1, &host, &port);

unlock(&prc->task.mutex);
size_t addrlen = strlen(prc->taddr);
char *addr_buf = (char*) alloca(addrlen+1);
pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port);
// TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups.
int gai_error = pgetaddrinfo(host, port, 0, &prc->addrinfo);
lock(&prc->task.mutex);

if (!gai_error) {
prc->ai = prc->addrinfo;
praw_connection_maybe_connect_lh(prc); /* Start connection attempts */
if (prc->disconnected) notify = schedule(&prc->task);
if (prc->psocket.epoll_io.fd != -1 && !pni_task_wake_pending(&prc->task))
return true;
} else {
psocket_gai_error(prc, gai_error, "connect to ", addr);
prc->disconnected = true;
notify = schedule(&prc->task);
lock(&p->task.mutex);
notify |= schedule_if_inactive(p);
unlock(&p->task.mutex);
psocket_gai_error(prc, gai_error, "connect to ", prc->taddr);
}
return false;
}

/* We need to issue INACTIVE on immediate failure */
void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
// Called from an arbitrary thread. Do setup prior to getaddrinfo, then switch to a worker thread.
assert(rc);
praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
praw_connection_init(prc, p, rc);
// TODO: check case of proactor shutting down

lock(&prc->task.mutex);
size_t addrlen = strlen(addr);
prc->taddr = (char*) malloc(addrlen+1);
assert(prc->taddr); // TODO: memory safety
memcpy(prc->taddr, addr, addrlen+1);
prc->first_schedule = true; // Resume connection setup when next scheduled.
proactor_add(&prc->task);
bool notify = schedule(&prc->task);
unlock(&prc->task.mutex);

if (notify) notify_poller(p);
}

Expand Down Expand Up @@ -394,6 +408,16 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
}
int events = io_events;
int fd = rc->psocket.epoll_io.fd;

if (rc->first_schedule) {
rc->first_schedule = false;
assert(!events); // No socket yet.
assert(!rc->connected);
if (praw_connection_first_connect_lh(rc)) {
unlock(&rc->task.mutex);
return NULL;
}
}
if (!rc->connected) {
if (events & (EPOLLHUP | EPOLLERR)) {
praw_connection_maybe_connect_lh(rc);
Expand All @@ -413,6 +437,7 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
}
if (events & EPOLLOUT)
praw_connection_connected_lh(rc);

unlock(&rc->task.mutex);
return &rc->batch;
}
Expand Down
1 change: 0 additions & 1 deletion c/tests/raw_wake_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ TEST_CASE("proactor_raw_connection_wake") {
pn_proactor_raw_connect(pn_listener_proactor(l), rc, addr.c_str());


REQUIRE_RUN(p, PN_LISTENER_ACCEPT);
REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
CHECK(pn_proactor_get(p) == NULL); /* idle */
Expand Down
Loading