Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add graceful shutdowns for fixed-size streams #43

Merged
merged 2 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/tgen-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ TGenIOResponse tgenserver_onEvent(TGenServer* server, gint descriptor, TGenEvent
g_assert((events & TGEN_EVENT_READ) && descriptor == server->socketD);

gboolean blocked = FALSE;
#ifdef DEBUG
gint acceptedCount = 0;

#endif
/* accept as many connections as we have available, until we get EWOULDBLOCK error */
while(!blocked) {
gint result = _tgenserver_acceptPeer(server);
Expand All @@ -86,7 +87,9 @@ TGenIOResponse tgenserver_onEvent(TGenServer* server, gint descriptor, TGenEvent
server->socketD, result, errno, g_strerror(errno));
}
} else {
#ifdef DEBUG
acceptedCount++;
#endif
}
}

Expand Down
121 changes: 115 additions & 6 deletions src/tgen-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

/* an auth password so we know both sides understand tgen */
#define TGEN_AUTH_PW "T8nNx9L95LATtckJkR5n"
#define TGEN_PROTO_VERS_MAJ 1
#define TGEN_PROTO_VERS_MAJ 2
#define TGEN_PROTO_VERS_MIN 0

/* the various states the read side of the connection can take */
Expand All @@ -31,6 +31,7 @@ typedef enum _TGenStreamRecvState {
TGEN_STREAM_RECV_MODEL,
TGEN_STREAM_RECV_PAYLOAD,
TGEN_STREAM_RECV_CHECKSUM,
TGEN_STREAM_RECV_FOOTER,
TGEN_STREAM_RECV_SUCCESS,
TGEN_STREAM_RECV_ERROR,
} TGenStreamRecvState;
Expand All @@ -42,6 +43,7 @@ typedef enum _TGenStreamSendState {
TGEN_STREAM_SEND_RESPONSE,
TGEN_STREAM_SEND_PAYLOAD,
TGEN_STREAM_SEND_CHECKSUM,
TGEN_STREAM_SEND_FOOTER,
TGEN_STREAM_SEND_FLUSH,
TGEN_STREAM_SEND_SUCCESS,
TGEN_STREAM_SEND_ERROR,
Expand All @@ -59,6 +61,7 @@ typedef enum _TGenStreamErrorType {
TGEN_STREAM_ERR_HEADER_MODELSIZE,
TGEN_STREAM_ERR_MODEL,
TGEN_STREAM_ERR_CHECKSUM,
TGEN_STREAM_ERR_FOOTER,
TGEN_STREAM_ERR_READ,
TGEN_STREAM_ERR_WRITE,
TGEN_STREAM_ERR_READEOF,
Expand Down Expand Up @@ -184,9 +187,11 @@ struct _TGenStream {
gint64 firstPayloadByteRecv;
gint64 lastPayloadByteRecv;
gint64 checksumRecv;
gint64 footerRecv;
gint64 firstPayloadByteSend;
gint64 lastPayloadByteSend;
gint64 checksumSend;
gint64 footerSend;
gint64 lastBytesStatusReport;
gint64 lastTimeStatusReport;
gint64 lastTimeErrorReport;
Expand Down Expand Up @@ -224,6 +229,9 @@ static const gchar* _tgenstream_recvStateToString(TGenStreamRecvState state) {
case TGEN_STREAM_RECV_CHECKSUM: {
return "RECV_CHECKSUM";
}
case TGEN_STREAM_RECV_FOOTER: {
return "RECV_FOOTER";
}
/* success and error are terminal states */
case TGEN_STREAM_RECV_SUCCESS: {
return "RECV_SUCCESS";
Expand Down Expand Up @@ -253,6 +261,9 @@ static const gchar* _tgenstream_sendStateToString(TGenStreamSendState state) {
case TGEN_STREAM_SEND_CHECKSUM: {
return "SEND_CHECKSUM";
}
case TGEN_STREAM_SEND_FOOTER: {
return "SEND_FOOTER";
}
case TGEN_STREAM_SEND_FLUSH: {
return "SEND_FLUSH";
}
Expand Down Expand Up @@ -299,6 +310,9 @@ static const gchar* _tgenstream_errorToString(TGenStreamErrorType error) {
case TGEN_STREAM_ERR_CHECKSUM: {
return "CHECKSUM";
}
case TGEN_STREAM_ERR_FOOTER: {
return "FOOTER";
}
case TGEN_STREAM_ERR_READ: {
return "READ";
}
Expand Down Expand Up @@ -1037,6 +1051,42 @@ static gboolean _tgenstream_readChecksum(TGenStream* stream) {
return isSuccess;
}

static gboolean _tgenstream_readFooter(TGenStream *stream) {
TGEN_ASSERT(stream);

if (stream->recv.requestedBytes == 0 && !stream->recv.requestedZero) {
/* we don't handle footers if we are using Markov models and
* don't know the total size, so just move on. */
tgen_debug("Ignoring footer on stream with no requested bytes");
return TRUE;
}

GString *line = _tgenstream_getLine(stream);
if (!line) {
return FALSE;
}

/* we have read the entire footer from the other end */
stream->time.footerRecv = _tgenstream_getTime(stream);

gboolean isSuccess = FALSE;
if (g_ascii_strncasecmp(line->str, "FIN", 3) == 0) {
tgen_info("transport %s stream %s received proper FIN, graceful shutdown succeeded.",
tgentransport_toString(stream->transport), _tgenstream_toString(stream));
isSuccess = TRUE;
} else {
tgen_message("transport %s stream %s received malformed FIN, graceful shutdown failed.",
tgentransport_toString(stream->transport), _tgenstream_toString(stream));
_tgenstream_changeRecvState(stream, TGEN_STREAM_RECV_ERROR);
_tgenstream_changeError(stream, TGEN_STREAM_ERR_FOOTER);
isSuccess = FALSE;
}

g_string_free(line, TRUE);

return isSuccess;
}

static void _tgenstream_onReadable(TGenStream* stream) {
TGEN_ASSERT(stream);

Expand Down Expand Up @@ -1083,6 +1133,12 @@ static void _tgenstream_onReadable(TGenStream* stream) {

if(stream->recv.state == TGEN_STREAM_RECV_CHECKSUM) {
if(_tgenstream_readChecksum(stream)) {
_tgenstream_changeRecvState(stream, TGEN_STREAM_RECV_FOOTER);
}
}

if (stream->recv.state == TGEN_STREAM_RECV_FOOTER) {
if (_tgenstream_readFooter(stream)) {
/* yay, now we are done! */
_tgenstream_changeRecvState(stream, TGEN_STREAM_RECV_SUCCESS);
}
Expand Down Expand Up @@ -1438,6 +1494,48 @@ static gboolean _tgenstream_writeChecksum(TGenStream* stream) {
}
}

static gboolean _tgenstream_writeFooter(TGenStream *stream) {
TGEN_ASSERT(stream);

if (stream->send.requestedBytes == 0 && !stream->send.requestedZero) {
/* we don't handle footers if we are using Markov models and
* don't know the total size, so just move on. */
tgen_debug("Ignoring footer on stream with no requested bytes");
return TRUE;
}

/* The goal is to send the footer after we have sent everything we want to send
* AND we have received everything that we expect the other side to have sent us
* (including their checksum but not including their footer). Our footer is
* informing the other side that we have received all of their pre-footer bytes.
* Thus, we only proceed to write our footer if we received everything we expect
* from the other side and we are waiting for their footer, or we also got their
* footer and our receive side is successful. */
if (stream->recv.state != TGEN_STREAM_RECV_FOOTER &&
stream->recv.state != TGEN_STREAM_RECV_SUCCESS) {
return FALSE;
}

/* buffer the footer if we have not done that yet */
if (!stream->send.buffer) {
stream->send.buffer = g_string_new(NULL);
g_string_printf(stream->send.buffer, "FIN");
g_string_append_c(stream->send.buffer, '\n');
tgen_debug("Sending footer '%s'", stream->send.buffer->str);
}

_tgenstream_flushOut(stream);

if (!stream->send.buffer) {
/* we were able to send all of the footer */
stream->time.footerSend = _tgenstream_getTime(stream);
return TRUE;
} else {
/* unable to send entire footer, wait for next chance to write */
return FALSE;
}
}

static void _tgenstream_onWritable(TGenStream* stream) {
TGEN_ASSERT(stream);

Expand Down Expand Up @@ -1486,6 +1584,13 @@ static void _tgenstream_onWritable(TGenStream* stream) {

if(stream->send.state == TGEN_STREAM_SEND_CHECKSUM) {
if(_tgenstream_writeChecksum(stream)) {
/* now we just need to make sure we finished flushing */
_tgenstream_changeSendState(stream, TGEN_STREAM_SEND_FOOTER);
}
}

if (stream->send.state == TGEN_STREAM_SEND_FOOTER) {
if (_tgenstream_writeFooter(stream)) {
robgjansen marked this conversation as resolved.
Show resolved Hide resolved
/* now we just need to make sure we finished flushing */
_tgenstream_changeSendState(stream, TGEN_STREAM_SEND_FLUSH);
}
Expand Down Expand Up @@ -1753,11 +1858,15 @@ static TGenEvent _tgenstream_computeWantedEvents(TGenStream* stream) {
wantedEvents |= TGEN_EVENT_READ;
}
if(!sendDone && stream->send.state != TGEN_STREAM_SEND_NONE) {
/* check if we should defer writes */
if(stream->send.deferBarrierMicros > 0) {
wantedEvents |= TGEN_EVENT_WRITE_DEFERRED;
} else {
wantedEvents |= TGEN_EVENT_WRITE;
/* we don't want to write if we need to send the footer but have not
* yet got everything we expected to receive yet. */
if (stream->send.state != TGEN_STREAM_SEND_FOOTER || stream->time.checksumRecv > 0) {
/* check if we should defer writes */
if (stream->send.deferBarrierMicros > 0) {
wantedEvents |= TGEN_EVENT_WRITE_DEFERRED;
} else {
wantedEvents |= TGEN_EVENT_WRITE;
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions test/test-markovmodel.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ static void generate(TGenMarkovModel* mmodel) {
}
}
}

tgen_info("%d server packets and %d origin packets", numServerPackets, numOriginPackets);
}

gint main(gint argc, gchar *argv[]) {
Expand Down