Skip to content

Commit

Permalink
in streaming data mode, throw an exception if stream is closed during…
Browse files Browse the repository at this point in the history
… an event (#68)

* in streaming data mode, throw an exception if stream is closed during an event

* one more unit test
  • Loading branch information
eli-darkly authored Jan 10, 2023
1 parent c9b1e11 commit 09b5e0f
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 10 deletions.
14 changes: 11 additions & 3 deletions src/main/java/com/launchdarkly/eventsource/EventParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,10 @@ public int read(byte[] b, int off, int len) throws IOException {
// (B) We must ask lineParser to give us another chunk of a not-yet-finished line.
if (!lineEnded) {
if (!canGetNextChunk()) {
return -1; // EOF
// The underlying SSE stream has run out of data while we were still trying to
// read the rest of the message. This is an abnormal condition, so we'll treat
// it as an exception, rather than just returning -1 to indicate EOF.
throw new StreamClosedWithIncompleteMessageException();
}
haveChunk = true;
continue; // We'll go to (A) in the next loop
Expand All @@ -433,10 +436,15 @@ public int read(byte[] b, int off, int len) throws IOException {
// (C) The previous line was done; ask lineParser to give us the next line (or at
// least the first chunk of it).
if (!canGetNextChunk()) {
return -1; // EOF
// See comment above about abnormal termination. Even if we just finished
// reading a complete line of data, the message is incomplete because we didn't
// see a blank line.
throw new StreamClosedWithIncompleteMessageException();
}
if (lineEnded && chunkSize == 0) {
// Blank line means end of message - close this stream and return EOF.
// Blank line means end of message - close this stream and return EOF. This is a
// normal condition: the stream of data for this message is done because the
// message is finished.
closed.set(true);
resetState();
return -1;
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/com/launchdarkly/eventsource/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -962,9 +962,10 @@ public Builder logger(LDLogger logger) {
* If you set it to {@code true}, it will instead call the handler as soon as it sees a {@code data} field--
* setting {@link MessageEvent#getDataReader()} to a {@link java.io.Reader} that reads directly from the data as
* it arrives. The EventSource will perform any necessary parsing under the covers, so that for instance if there
* are multiple {@code data:} lines in the event, the {@link java.io.Reader} will emit a newline character between
* each and will not see the "data:" field names. The {@link java.io.Reader} will report "end of stream" as soon
* as the event is terminated normally by a blank line.
* are multiple {@code data:} lines in the event, the Reader will emit a newline character between
* each and will not see the "data:" field names. The Reader will report "end of stream" as soon
* as the event is terminated normally by a blank line. If the stream is closed before normal termination of
* the event, the Reader will throw a {@link StreamClosedWithIncompleteMessageException}.
* <p>
* This mode is designed for applications that expect very large data items to be delivered over SSE. Use it
* with caution, since there are several limitations:
Expand All @@ -978,7 +979,8 @@ public Builder logger(LDLogger logger) {
* <li> The SSE protocol specifies that an event should be processed only if it is terminated by a blank line, but
* in this mode the handler will receive the event as soon as a {@code data:} field appears-- so, if the stream
* happens to cut off abnormally without a trailing blank line, technically you will be receiving an incomplete
* event that should have been ignored. </li>
* event that should have been ignored. You will know this has happened ifbecause reading from the Reader throws
* a {@link StreamClosedWithIncompleteMessageException}.</li>
* </ul>
*
* @param streamEventData true if events should be dispatched immediately with asynchronous data rather than
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/com/launchdarkly/eventsource/MessageEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,13 @@ public String getData() {
* <p>
* The method will never return {@code null}; every event has data, even if the data is empty
* (zero length).
*
* <p>
* If the stream connection is closed before a complete SSE message has been received (that is,
* before the usual blank line that would terminate a message), then instead of a normal EOF,
* the Reader will throw a {@link StreamClosedWithIncompleteMessageException}. If this happens,
* the application should generally discard the entire {@link MessageEvent} and not try to
* process it further.
*
* @return a reader for the event data
* @since 2.6.0
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.launchdarkly.eventsource;

import java.io.IOException;

/**
* @author elibishop
*
*/
@SuppressWarnings("serial")
public class StreamClosedWithIncompleteMessageException extends IOException {

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.launchdarkly.eventsource;

import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
Expand All @@ -10,6 +11,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.fail;

@SuppressWarnings("javadoc")
public class EventParserStreamingDataTest extends EventParserBaseTest {
Expand Down Expand Up @@ -179,7 +181,22 @@ public void incompletelyReadEventIsSkippedIfAnotherMessageIsRead() throws Except
}

@Test
public void streamIsClosedBeforeEndOfEvent() throws Exception {
public void streamIsClosedImmediatelyAfterEndOfEvent() throws Exception {
String streamData = "data: hello\n\n";
initParser(100, true);
processData(streamData);

MessageEvent e1 = awaitMessageEvent();
assertThat(e1.isStreamingData(), is(true));
assertThat(readUpToLimit(e1.getDataReader(), 5), equalTo("hello"));

closeStream();

assertThat(e1.getDataReader().read(), equalTo(-1)); // normal EOF, not an error
}

@Test
public void streamIsClosedBeforeEndOfEventAtEndOfLine() throws Exception {
String streamData = "data: hello\n";
initParser(100, true);
processData(streamData);
Expand All @@ -190,7 +207,28 @@ public void streamIsClosedBeforeEndOfEvent() throws Exception {

closeStream();

assertThat(e1.getDataReader().read(), equalTo(-1));
try {
e1.getDataReader().read();
fail("expected exception");
} catch (StreamClosedWithIncompleteMessageException e) {}
}

@Test
public void streamIsClosedBeforeEndOfEventWithinLine() throws Exception {
String streamData = "data: hello";
initParser(100, true);
processData(streamData);

MessageEvent e1 = awaitMessageEvent();
assertThat(e1.isStreamingData(), is(true));
assertThat(readUpToLimit(e1.getDataReader(), 5), equalTo("hello"));

closeStream();

try {
e1.getDataReader().read();
fail("expected exception");
} catch (StreamClosedWithIncompleteMessageException e) {}
}

@Test
Expand Down

0 comments on commit 09b5e0f

Please sign in to comment.