Skip to content

Commit

Permalink
prepare 4.0.1 release (#83)
Browse files Browse the repository at this point in the history
* add time threshold for backoff reset

* allow endpoint to be specified as either URI or HttpUrl

* add @SInCE

* add interface for customizing requests

* javadoc fixes

* add changelog for past releases

* remove JSR305

* replace SSL-specific config method with general-purpose HTTP config method

* make helper method static

* add end-to-end EventSource tests

* spacing

* omit default header value if there's a custom value

* avoid trailing period in logger name

* add 1.x branch

* update to OkHttp 4.x and Java 8

* javadoc fixes

* remove EventSource setters, + test improvements

* update Gradle release

* enable Github Pages

* skip tests in release

* add ability to force a stream restart; improve tests so we can test this

* revert whitespace change

* bump OkHttp version to latest Java 7-compatible version

* add ability to force a stream restart; improve tests so we can test this (#29)

* update to okhttp 4.5.0

* longer timeout for cleaner shutdown of test servers

* fix Gradle scopes

* allow setting specific thread priority

* remove misleading logging & unnecessary backoff, improve tests (#34)

* known issue with onClose() - add comment, disable test assertions

* allow caller to specify a custom logger instead of SLF4J (#32)

* add method for changing base name of SLF4J logger

* enable coverage reports in CI, improve CI to test all supported Java versions

* rm inapplicable CI copy-paste

* another CI fix

* add checkstyle config

* fix jitter calculation when upper bound is a power of 2

* misc coverage + test improvements, add CI enforcement of coverage (#39)

* fix shutdown state logic, simplify code paths (#40)

* Fix Java 7 compatibility.

* add OpenJDK 7 build + fix test race condition + javadoc fix (#42)

* update Gradle to 6.8.3

* Kotlinize build script

* fix logic for shutting down after an unrecoverable error

* use newer HTTP test helpers

* use Releaser v2 config + newer CI images (#47)

* use new stream-reading implementation to support CR-only line endings

* make buffer size configurable

* rm usage that's not allowed in Java 8

* add Guava test dependency

* add code coverage ovverride

* implement contract tests (#48)

* use Gradle 7

* Bounded queues for the EventHandler thread (#58)

* Bounded queue for the EventHandler thread

The unbounded queue fronting the 'event' thread can cause trouble when the
EventHandler is unable to keep up with the workload. This can lead to heap
exhaustion, GC issues and failure modes that are generally considered "bad".

Band-aid over this with a semaphore to limit the number of tasks in the queue.
The semaphore is opt-in and disabled by default to avoid any nasty surprises
for folks upgrading.

Also add 'EventSource.awaitClosed()' to allow users to wait for underlying
thread pools to completely shut down. We can't know if it's safe to clean
up resources used by the EventHandler thread if we can't be certain that it
has completely terminated.

* Address checkstyle griping in StubServer

* Fix JavaDoc issue

* Tighten up exception handling

Co-authored-by: Eli Bishop <eli@launchdarkly.com>

* update @SInCE

* test Java 17 in CI (#51)

* improve tests for AsyncEventHandler and EventSource.awaitClosed (#52)

* add streaming data mode for very large events (#53)

* add option to ensure that expected fields are always read

* add Gradle option to suppress kotlin-stdlib in our pom

* update okhttp to 4.9.3

* use LaunchDarkly logging facade

* rm unused

* misc fixes

* improve javadoc links

* remove SLF4J dependency, use only com.launchdarkly.logging

* update com.launchdarkly.logging version

* consistently use placeholders instead of concatenation in log output

* update release metadata

* use SecureRandom instead of Random, just to make scanners happier

* use SecureRandom instead of Random, just to make scanners happier

* use SecureRandom instead of Random, just to make scanners happier

* fix release metadata

* remove usage of Duration for Android compatibility

* new synchronous EventSource implementation (#64)

* add async wrapper to emulate old EventSource (#65)

* update doc comments regarding thread behavior in Android

* update Gradle to 7.6 + fix snapshot releases

* in streaming data mode, throw an exception if stream is closed during an event (#68)

* in streaming data mode, throw an exception if stream is closed during an event

* one more unit test

Co-authored-by: Eli Bishop <eli@launchdarkly.com>
Co-authored-by: LaunchDarklyCI <dev@launchdarkly.com>
Co-authored-by: Gavin Whelan <gwhelan@launchdarkly.com>
Co-authored-by: LaunchDarklyReleaseBot <launchdarklyreleasebot@launchdarkly.com>
Co-authored-by: Tom Lee <93216+thomaslee@users.noreply.github.com>
  • Loading branch information
6 people authored Jan 10, 2023
1 parent 0551bae commit 639eb1f
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 10 deletions.
14 changes: 11 additions & 3 deletions src/main/java/com/launchdarkly/eventsource/EventParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,10 @@ public int read(byte[] b, int off, int len) throws IOException {
// (B) We must ask lineParser to give us another chunk of a not-yet-finished line.
if (!lineEnded) {
if (!canGetNextChunk()) {
return -1; // EOF
// The underlying SSE stream has run out of data while we were still trying to
// read the rest of the message. This is an abnormal condition, so we'll treat
// it as an exception, rather than just returning -1 to indicate EOF.
throw new StreamClosedWithIncompleteMessageException();
}
haveChunk = true;
continue; // We'll go to (A) in the next loop
Expand All @@ -433,10 +436,15 @@ public int read(byte[] b, int off, int len) throws IOException {
// (C) The previous line was done; ask lineParser to give us the next line (or at
// least the first chunk of it).
if (!canGetNextChunk()) {
return -1; // EOF
// See comment above about abnormal termination. Even if we just finished
// reading a complete line of data, the message is incomplete because we didn't
// see a blank line.
throw new StreamClosedWithIncompleteMessageException();
}
if (lineEnded && chunkSize == 0) {
// Blank line means end of message - close this stream and return EOF.
// Blank line means end of message - close this stream and return EOF. This is a
// normal condition: the stream of data for this message is done because the
// message is finished.
closed.set(true);
resetState();
return -1;
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/com/launchdarkly/eventsource/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -962,9 +962,10 @@ public Builder logger(LDLogger logger) {
* If you set it to {@code true}, it will instead call the handler as soon as it sees a {@code data} field--
* setting {@link MessageEvent#getDataReader()} to a {@link java.io.Reader} that reads directly from the data as
* it arrives. The EventSource will perform any necessary parsing under the covers, so that for instance if there
* are multiple {@code data:} lines in the event, the {@link java.io.Reader} will emit a newline character between
* each and will not see the "data:" field names. The {@link java.io.Reader} will report "end of stream" as soon
* as the event is terminated normally by a blank line.
* are multiple {@code data:} lines in the event, the Reader will emit a newline character between
* each and will not see the "data:" field names. The Reader will report "end of stream" as soon
* as the event is terminated normally by a blank line. If the stream is closed before normal termination of
* the event, the Reader will throw a {@link StreamClosedWithIncompleteMessageException}.
* <p>
* This mode is designed for applications that expect very large data items to be delivered over SSE. Use it
* with caution, since there are several limitations:
Expand All @@ -978,7 +979,8 @@ public Builder logger(LDLogger logger) {
* <li> The SSE protocol specifies that an event should be processed only if it is terminated by a blank line, but
* in this mode the handler will receive the event as soon as a {@code data:} field appears-- so, if the stream
* happens to cut off abnormally without a trailing blank line, technically you will be receiving an incomplete
* event that should have been ignored. </li>
* event that should have been ignored. You will know this has happened ifbecause reading from the Reader throws
* a {@link StreamClosedWithIncompleteMessageException}.</li>
* </ul>
*
* @param streamEventData true if events should be dispatched immediately with asynchronous data rather than
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/com/launchdarkly/eventsource/MessageEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,13 @@ public String getData() {
* <p>
* The method will never return {@code null}; every event has data, even if the data is empty
* (zero length).
*
* <p>
* If the stream connection is closed before a complete SSE message has been received (that is,
* before the usual blank line that would terminate a message), then instead of a normal EOF,
* the Reader will throw a {@link StreamClosedWithIncompleteMessageException}. If this happens,
* the application should generally discard the entire {@link MessageEvent} and not try to
* process it further.
*
* @return a reader for the event data
* @since 2.6.0
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.launchdarkly.eventsource;

import java.io.IOException;

/**
* @author elibishop
*
*/
@SuppressWarnings("serial")
public class StreamClosedWithIncompleteMessageException extends IOException {

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.launchdarkly.eventsource;

import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
Expand All @@ -10,6 +11,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.fail;

@SuppressWarnings("javadoc")
public class EventParserStreamingDataTest extends EventParserBaseTest {
Expand Down Expand Up @@ -179,7 +181,22 @@ public void incompletelyReadEventIsSkippedIfAnotherMessageIsRead() throws Except
}

@Test
public void streamIsClosedBeforeEndOfEvent() throws Exception {
public void streamIsClosedImmediatelyAfterEndOfEvent() throws Exception {
String streamData = "data: hello\n\n";
initParser(100, true);
processData(streamData);

MessageEvent e1 = awaitMessageEvent();
assertThat(e1.isStreamingData(), is(true));
assertThat(readUpToLimit(e1.getDataReader(), 5), equalTo("hello"));

closeStream();

assertThat(e1.getDataReader().read(), equalTo(-1)); // normal EOF, not an error
}

@Test
public void streamIsClosedBeforeEndOfEventAtEndOfLine() throws Exception {
String streamData = "data: hello\n";
initParser(100, true);
processData(streamData);
Expand All @@ -190,7 +207,28 @@ public void streamIsClosedBeforeEndOfEvent() throws Exception {

closeStream();

assertThat(e1.getDataReader().read(), equalTo(-1));
try {
e1.getDataReader().read();
fail("expected exception");
} catch (StreamClosedWithIncompleteMessageException e) {}
}

@Test
public void streamIsClosedBeforeEndOfEventWithinLine() throws Exception {
String streamData = "data: hello";
initParser(100, true);
processData(streamData);

MessageEvent e1 = awaitMessageEvent();
assertThat(e1.isStreamingData(), is(true));
assertThat(readUpToLimit(e1.getDataReader(), 5), equalTo("hello"));

closeStream();

try {
e1.getDataReader().read();
fail("expected exception");
} catch (StreamClosedWithIncompleteMessageException e) {}
}

@Test
Expand Down

0 comments on commit 639eb1f

Please sign in to comment.