Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PROTON-2818: Move epoll proactor connection logic to a task thread. #427

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions c/src/proactor/epoll-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ typedef struct pconnection_t {
bool server; /* accept, not connect */
bool tick_pending;
bool queued_disconnect; /* deferred from pn_proactor_disconnect() */
bool first_schedule;
pn_condition_t *disconnect_condition;
// Following values only changed by (sole) working task:
uint32_t current_arm; // active epoll io events
Expand Down
57 changes: 36 additions & 21 deletions c/src/proactor/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
pc->wbuf_current = NULL;
pc->hog_count = 0;
pc->batch.next_event = pconnection_batch_next;
pc->first_schedule = false;

if (server) {
pn_transport_set_server(pc->driver.transport);
Expand Down Expand Up @@ -1122,6 +1123,7 @@ static void write_flush(pconnection_t *pc) {

static void pconnection_connected_lh(pconnection_t *pc);
static void pconnection_maybe_connect_lh(pconnection_t *pc);
static void pconnection_first_connect_lh(pconnection_t *pc);

static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_ready, bool topup) {
bool waking = false;
Expand All @@ -1139,6 +1141,21 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
}
if (sched_ready) schedule_done(&pc->task);

if (pc->first_schedule) {
// Normal case: resumed logic from pn_proactor_connect2.
// But possible tie: pn_connection_wake() or pn_proactor_disconnect().
Copy link
Member

@astitcher astitcher Apr 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo? "possibility of"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a typo, but obviously not clear. While the original logic is preserved as much as possible, dropping the task lock and switching context allows races with competing threads that were not possible prior to this change. Either of those two calls is possible from an arbitrary thread between the setting of first_schedule and arriving at this code.

A comment which doesn't make sense on its own is obviously not helpful. I will try to rework the comments and code structure so that each is clear on its own.

// Respect the latter. Check former after connect attempt.
pc->first_schedule = false;
assert(!topup && !events);
if (!pc->queued_disconnect) {
pconnection_first_connect_lh(pc); // Drops and reaquires lock. Wake or disconnect state may change.
if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect && !pni_task_wake_pending(&pc->task)) {
unlock(&pc->task.mutex);
return NULL;
}
}
}

if (topup) {
// Only called by the batch owner. Does not loop, just "tops up"
// once. May be back depending on hog_count.
Expand Down Expand Up @@ -1396,6 +1413,7 @@ static void pconnection_maybe_connect_lh(pconnection_t *pc) {

int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
{
// NOTE: getaddrinfo can block on DNS lookup (PROTON-2812).
struct addrinfo hints = { 0 };
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
Expand All @@ -1416,7 +1434,23 @@ bool schedule_if_inactive(pn_proactor_t *p) {
return false;
}

// Call from pconnection_process with task lock held.
//
// Performs the address lookup for pc->host / pc->port and then either starts
// the connection attempts or reports the lookup failure.
//
// The task mutex is released for the duration of pgetaddrinfo() and re-taken
// before any further pconnection state is touched, so state guarded by the
// task mutex may have been changed by other threads by the time this returns.
// The function returns with the task mutex held again.
static void pconnection_first_connect_lh(pconnection_t *pc) {
// Release the task lock around the lookup: getaddrinfo can block on DNS.
unlock(&pc->task.mutex);
// TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups.
int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
lock(&pc->task.mutex);

if (!gai_error) {
// Lookup succeeded: begin with the first returned address.
pc->ai = pc->addrinfo;
pconnection_maybe_connect_lh(pc); /* Start connection attempts */
} else {
// Lookup failed: hand the getaddrinfo error code to psocket_gai_error().
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
}
}

void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
// Called from an arbitrary thread. Do setup prior to getaddrinfo, then switch to a worker thread.
size_t addrlen = strlen(addr);
pconnection_t *pc = (pconnection_t*) malloc(sizeof(pconnection_t)+addrlen);
assert(pc); // TODO: memory safety
Expand All @@ -1430,27 +1464,8 @@ void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *
lock(&pc->task.mutex);
proactor_add(&pc->task);
pn_connection_open(pc->driver.connection); /* Auto-open */

bool notify = false;

if (pc->disconnected) {
notify = schedule(&pc->task); /* Error during initialization */
} else {
int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
if (!gai_error) {
pn_connection_open(pc->driver.connection); /* Auto-open */
pc->ai = pc->addrinfo;
pconnection_maybe_connect_lh(pc); /* Start connection attempts */
if (pc->disconnected) notify = schedule(&pc->task);
} else {
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
notify = schedule(&pc->task);
lock(&p->task.mutex);
notify |= schedule_if_inactive(p);
unlock(&p->task.mutex);
}
}
/* We need to issue INACTIVE on immediate failure */
pc->first_schedule = true; // Resume connection setup when next scheduled.
bool notify = schedule(&pc->task);
unlock(&pc->task.mutex);
if (notify) notify_poller(p);
}
Expand Down
66 changes: 44 additions & 22 deletions c/src/proactor/epoll_raw_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ struct praw_connection_t {
bool disconnected;
bool hup_detected;
bool read_check;
bool first_schedule;
char *taddr;
};

static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
Expand Down Expand Up @@ -145,6 +147,8 @@ static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_ra

prc->connected = false;
prc->disconnected = false;
prc->first_schedule = false;
prc->taddr = NULL;
prc->batch.next_event = pni_raw_batch_next;

pmutex_init(&prc->rearm_mutex);
Expand All @@ -163,6 +167,7 @@ static void praw_connection_cleanup(praw_connection_t *prc) {
task_finalize(&prc->task);
if (prc->addrinfo)
freeaddrinfo(prc->addrinfo);
free(prc->taddr);
free(prc);
}
// else proactor_disconnect logic owns prc and its final free
Expand All @@ -177,39 +182,43 @@ pn_raw_connection_t *pn_raw_connection(void) {
return &conn->raw_connection;
}

void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
assert(rc);
praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
praw_connection_init(prc, p, rc);
// TODO: check case of proactor shutting down

lock(&prc->task.mutex);
proactor_add(&prc->task);

bool notify = false;

// Call from pconnection_process with task lock held.
static void praw_connection_first_connect_lh(praw_connection_t *prc) {
const char *host;
const char *port;
size_t addrlen = strlen(addr);
char *addr_buf = (char*) alloca(addrlen+1);
pni_parse_addr(addr, addr_buf, addrlen+1, &host, &port);

unlock(&prc->task.mutex);
size_t addrlen = strlen(prc->taddr);
char *addr_buf = (char*) alloca(addrlen+1);
pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port);
int gai_error = pgetaddrinfo(host, port, 0, &prc->addrinfo);
lock(&prc->task.mutex);

if (!gai_error) {
prc->ai = prc->addrinfo;
praw_connection_maybe_connect_lh(prc); /* Start connection attempts */
if (prc->disconnected) notify = schedule(&prc->task);
} else {
psocket_gai_error(prc, gai_error, "connect to ", addr);
prc->disconnected = true;
notify = schedule(&prc->task);
lock(&p->task.mutex);
notify |= schedule_if_inactive(p);
unlock(&p->task.mutex);
psocket_gai_error(prc, gai_error, "connect to ", prc->taddr);
}
}

void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
// Called from an arbitrary thread. Do setup prior to getaddrinfo, then switch to a worker thread.
assert(rc);
praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
praw_connection_init(prc, p, rc);
// TODO: check case of proactor shutting down

/* We need to issue INACTIVE on immediate failure */
lock(&prc->task.mutex);
size_t addrlen = strlen(addr);
prc->taddr = (char*) malloc(addrlen+1);
assert(prc->taddr); // TODO: memory safety
memcpy(prc->taddr, addr, addrlen+1);
prc->first_schedule = true; // Resume connection setup when next scheduled.
proactor_add(&prc->task);
bool notify = schedule(&prc->task);
unlock(&prc->task.mutex);

if (notify) notify_poller(p);
}

Expand Down Expand Up @@ -413,6 +422,19 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
}
if (events & EPOLLOUT)
praw_connection_connected_lh(rc);
if (rc->first_schedule) {
// Normal case: resumed logic from pn_proactor_raw_connect.
// But possible tie: pn_raw_connection_wake()
// Defer wake check until getaddrinfo is done.
rc->first_schedule = false;
assert(!events); // No socket yet.
praw_connection_first_connect_lh(rc); // Drops and reacquires lock.
if (rc->psocket.epoll_io.fd != -1 && !pni_task_wake_pending(&rc->task)) {
unlock(&rc->task.mutex);
return NULL;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this code should be the first piece of code in the possibilities, as it is the first that should happen. Currently the logic to kick off the connect is first, but now the logic to do the lookup must be earlier in the lifecycle of the connection, so for clarity it should be the first condition in the sequence (unless the semantics mean that this doesn't work for some reason).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully addressed in the reworked version.

Agreed about the lifecycle state issue. This proposed fix strives for minimal changes to the code logic while moving the blocking activity to a different thread. The subsequent "real fix" for the parent JIRA will necessarily introduce a new state (presumably with an early cancel option, compared to the current blocked-until-done). Initiating the getaddrinfo call will also presumably be sensibly moved back to the pn_xxx_connect call to avoid a pointless thread switch, and the first_schedule boolean will then have no purpose.


unlock(&rc->task.mutex);
return &rc->batch;
}
Expand Down
1 change: 0 additions & 1 deletion c/tests/raw_wake_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ TEST_CASE("proactor_raw_connection_wake") {
pn_proactor_raw_connect(pn_listener_proactor(l), rc, addr.c_str());


REQUIRE_RUN(p, PN_LISTENER_ACCEPT);
REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
CHECK(pn_proactor_get(p) == NULL); /* idle */
Expand Down