Skip to content

Commit

Permalink
dispatch_connection: don't wrongly assume noexpire == udp
Browse files Browse the repository at this point in the history
Both aggregator and UDP connections have the noexpire bit set, but we
don't want to treat aggregator connections as all-in-one packet
connections.
So, instead of messing around with len (loosing the ability to say
something about whether work was done or not) and making some implicit
assumptions, introduce an isudp bit, which we can use instead to trigger
the incomplete message discard (6a17865).

This is likely the fix for issue #242, because it uses aggregations, and
the aggregator gets heavily blocked since all work the dispatchers did
were flagged as "no-work" leading to sleeps.
  • Loading branch information
grobian committed Jan 6, 2017
1 parent f485699 commit 730743b
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

* [Issue #239](https://github.com/grobian/carbon-c-relay/issues/239)
segfault when date format is incorrect
* [Issue #242](https://github.com/grobian/carbon-c-relay/issues/242)
dispatcher/aggregations broken (relay seems too slow)


# 2.4 (2017-01-03)
Expand Down
9 changes: 5 additions & 4 deletions dispatcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ typedef struct _connection {
unsigned int maxsenddelay;
char hadwork:1;
char isaggr:1;
char isudp:1;
} connection;

struct _dispatcher {
Expand Down Expand Up @@ -270,6 +271,7 @@ dispatch_addconnection(int sock)
connections[c].needmore = 0;
connections[c].noexpire = 0;
connections[c].isaggr = 0;
connections[c].isudp = 0;
connections[c].destlen = 0;
gettimeofday(&connections[c].lastwork, NULL);
connections[c].hadwork = 1; /* force first iteration before stalling */
Expand Down Expand Up @@ -316,6 +318,7 @@ dispatch_addlistener_udp(int sock)
return 1;

connections[conn].noexpire = 1;
connections[conn].isudp = 1;
acceptedconnections--;

return 0;
Expand Down Expand Up @@ -413,8 +416,6 @@ dispatch_connection(connection *conn, dispatcher *self, struct timeval start)
tracef("dispatcher %d, connfd %d, read %d bytes from socket\n",
self->id, conn->sock, len);
}
if (conn->noexpire)
len = -1; /* simulate EOF for UDP packet */

/* metrics look like this: metric_path value timestamp\n
* due to various messups we need to sanitise the
Expand Down Expand Up @@ -520,14 +521,14 @@ dispatch_connection(connection *conn, dispatcher *self, struct timeval start)
return 0;
}
}
if (len == -1 || len == 0) { /* error + EOF */
if (len == -1 || len == 0 || conn->isudp) { /* error + EOF */
/* we also disconnect the client in this case if our reading
* buffer is full, but we still need more (read returns 0 if the
* size argument is 0) -> this is good, because we can't do much
* with such client */

if (conn->noexpire) {
/* reset buffer only (UDP) and move on */
/* reset buffer only (UDP/aggregations) and move on */
conn->needmore = 1;
conn->buflen = 0;
__sync_bool_compare_and_swap(&(conn->takenby), self->id, 0);
Expand Down

0 comments on commit 730743b

Please sign in to comment.