Skip to content

Commit

Permalink
server: allow specifying max number of stalls, issue #172
Browse files Browse the repository at this point in the history
For some scenarios, stalling may be undesirable, or may be wanted in a
different amount than the hardwired default.  Therefore, allow
controlling the number of stalls performed before metrics are dropped.
A setting of 0 is allowed, and disables stalling altogether.
  • Loading branch information
grobian committed May 8, 2016
1 parent c858635 commit 4b40175
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 11 deletions.
3 changes: 3 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
### New Features
* **router** `include` directive was added to add content of another
file, see also [Issue #165](https://github.com/grobian/carbon-c-relay/issues/165). The include can also use glob patterns, see [Pull #174](https://github.com/grobian/carbon-c-relay/pull/174)
* **server** the number of stalls performed on writes can now be
controlled (and also disabled) using the `-L` flag.
[Issue #172](https://github.com/grobian/carbon-c-relay/issues/172)

### Breaking Changes

Expand Down
19 changes: 15 additions & 4 deletions relay.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ unsigned char mode = 0;
static char *config = NULL;
static int batchsize = 2500;
static int queuesize = 25000;
static int maxstalls = 4;
static unsigned short iotimeout = 600;
static dispatcher **workers = NULL;
static char workercnt = 0;
Expand Down Expand Up @@ -165,7 +166,7 @@ hup_handler(int sig)

logout("reloading config from '%s'\n", config);
if ((newrtr = router_readconfig(NULL, config,
queuesize, batchsize, iotimeout)) == NULL)
queuesize, batchsize, maxstalls, iotimeout)) == NULL)
{
logerr("failed to read configuration '%s', aborting reload\n", config);
return;
Expand Down Expand Up @@ -289,6 +290,7 @@ do_usage(int exitcode)
printf(" -w use <workers> worker threads, defaults to %d\n", get_cores());
printf(" -b server send batch size, defaults to 2500\n");
printf(" -q server queue size, defaults to 25000\n");
printf(" -L server max stalls, defaults to %d\n", maxstalls);
printf(" -S statistics sending interval in seconds, defaults to 60\n");
printf(" -B connection listen backlog, defaults to 3\n");
printf(" -T IO timeout in milliseconds for server connections, defaults to 600\n");
Expand Down Expand Up @@ -328,7 +330,7 @@ main(int argc, char * const argv[])
if (gethostname(relay_hostname, sizeof(relay_hostname)) < 0)
snprintf(relay_hostname, sizeof(relay_hostname), "127.0.0.1");

while ((ch = getopt(argc, argv, ":hvdmstf:i:l:p:w:b:q:S:T:c:H:B:DP:")) != -1) {
while ((ch = getopt(argc, argv, ":hvdmstf:i:l:p:w:b:q:L:S:T:c:H:B:DP:")) != -1) {
switch (ch) {
case 'v':
do_version();
Expand Down Expand Up @@ -382,6 +384,14 @@ main(int argc, char * const argv[])
do_usage(1);
}
break;
case 'L':
maxstalls = atoi(optarg);
if (maxstalls < 0 || maxstalls >= (1 << SERVER_STALL_BITS)) {
fprintf(stderr, "error: maxium stalls needs to be a number "
"between 0 and %d\n", (1 << SERVER_STALL_BITS) - 1);
do_usage(1);
}
break;
case 'S':
collector_interval = atoi(optarg);
if (collector_interval <= 0) {
Expand Down Expand Up @@ -577,6 +587,7 @@ main(int argc, char * const argv[])
fprintf(relay_stdout, " workers = %d\n", workercnt);
fprintf(relay_stdout, " send batch size = %d\n", batchsize);
fprintf(relay_stdout, " server queue size = %d\n", queuesize);
fprintf(relay_stdout, " server max stalls = %d\n", maxstalls);
fprintf(relay_stdout, " statistics submission interval = %ds\n",
collector_interval);
fprintf(relay_stdout, " listen backlog = %u\n", listenbacklog);
Expand All @@ -595,7 +606,7 @@ main(int argc, char * const argv[])
fprintf(relay_stdout, "\n");

if ((rtr = router_readconfig(NULL, config,
queuesize, batchsize, iotimeout)) == NULL)
queuesize, batchsize, maxstalls, iotimeout)) == NULL)
{
logerr("failed to read configuration '%s'\n", config);
return 1;
Expand Down Expand Up @@ -701,7 +712,7 @@ main(int argc, char * const argv[])
/* server used for delivering metrics produced inside the relay,
* that is, the collector (statistics) */
if ((internal_submission = server_new("internal", listenport, CON_PIPE,
NULL, 3000, batchsize, iotimeout)) == NULL)
NULL, 3000, batchsize, maxstalls, iotimeout)) == NULL)
{
logerr("failed to create internal submission queue, shutting down\n");
keep_running = 0;
Expand Down
7 changes: 4 additions & 3 deletions router.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ router_readconfig(router *orig,
const char *path,
size_t queuesize,
size_t batchsize,
int maxstalls,
unsigned short iotimeout)
{
FILE *cnf;
Expand Down Expand Up @@ -811,7 +812,7 @@ router_readconfig(router *orig,
*proto == 'f' ? CON_FILE :
*proto == 'u' ? CON_UDP : CON_TCP,
walk == (void *)1 ? NULL : walk,
queuesize, batchsize, iotimeout);
queuesize, batchsize, maxstalls, iotimeout);
if (newserver == NULL) {
logerr("failed to add server %s:%d (%s) "
"to cluster %s: %s\n", ip, port, proto,
Expand Down Expand Up @@ -1897,7 +1898,7 @@ router_readconfig(router *orig,
for (i = 0; i < globbuf.gl_matchc; i++) {
globpath = globbuf.gl_pathv[i];
ret = router_readconfig(ret, globpath, queuesize,
batchsize, iotimeout);
batchsize, maxstalls, iotimeout);
if (ret == NULL)
break;
}
Expand All @@ -1908,7 +1909,7 @@ router_readconfig(router *orig,
} else {
/* include path is a regular file path */
ret = router_readconfig(ret, name, queuesize,
batchsize, iotimeout);
batchsize, maxstalls, iotimeout);
}
if (ret == NULL)
/* router_readconfig already barked and freed ret */
Expand Down
2 changes: 1 addition & 1 deletion router.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ typedef struct _router router;

#define RE_MAX_MATCHES 64

router *router_readconfig(router *orig, const char *path, size_t queuesize, size_t batchsize, unsigned short iotimeout);
router *router_readconfig(router *orig, const char *path, size_t queuesize, size_t batchsize, int maxstalls, unsigned short iotimeout);
void router_optimise(router *r);
char router_printdiffs(router *old, router *new, FILE *out);
void router_transplant_queues(router *new, router *old);
Expand Down
8 changes: 5 additions & 3 deletions server.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct _server {
queue *queue;
size_t bsize;
short iotimeout;
unsigned char maxstalls:SERVER_STALL_BITS;
const char **batch;
serv_ctype ctype;
pthread_t tid;
Expand All @@ -55,7 +56,7 @@ struct _server {
char failure:5;
char running:1;
char keep_running:1;
unsigned char stallseq:3; /* align with MAX_STALLS */
unsigned char stallseq:SERVER_STALL_BITS;
size_t metrics;
size_t dropped;
size_t stalls;
Expand Down Expand Up @@ -447,6 +448,7 @@ server_new(
struct addrinfo *saddr,
size_t qsize,
size_t bsize,
int maxstalls,
unsigned short iotimeout)
{
server *ret;
Expand All @@ -467,6 +469,7 @@ server_new(
ret->instance = NULL;
ret->bsize = bsize;
ret->iotimeout = iotimeout < 250 ? 600 : iotimeout;
ret->maxstalls = maxstalls;
if ((ret->batch = malloc(sizeof(char *) * (bsize + 1))) == NULL) {
free((char *)ret->ip);
free(ret);
Expand Down Expand Up @@ -566,8 +569,7 @@ server_send(server *s, const char *d, char force)
}
}
}
#define MAX_STALLS 4 /* 4 * ~1s = 4s */
if (failure || force || s->stallseq > MAX_STALLS) {
if (failure || force || s->stallseq == s->maxstalls) {
s->dropped++;
/* excess event will be dropped by the enqueue below */
} else {
Expand Down
3 changes: 3 additions & 0 deletions server.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include "relay.h"

#define SERVER_STALL_BITS 4 /* 0 up to 15 */

typedef struct _server server;

server *server_new(
Expand All @@ -31,6 +33,7 @@ server *server_new(
struct addrinfo *saddr,
size_t queuesize,
size_t batchsize,
int maxstalls,
unsigned short iotimeout);
char server_start(server *s);
void server_add_secondaries(server *d, server **sec, size_t cnt);
Expand Down

0 comments on commit 4b40175

Please sign in to comment.