Skip to content

Commit

Permalink
stream intel: update kotlin public interface to align with swift (#2013)
Browse files Browse the repository at this point in the history
Description: For terminal callbacks (onError, onCancel, onComplete) changes public interface to provide a single FinalStreamIntel struct that extends from the base StreamIntel available on other callbacks.
Risk Level: Low
Testing: Local & CI

Signed-off-by: Mike Schore <mike.schore@gmail.com>
  • Loading branch information
goaway authored Jan 25, 2022
1 parent 4f1ce29 commit f202ea5
Show file tree
Hide file tree
Showing 24 changed files with 67 additions and 70 deletions.
2 changes: 1 addition & 1 deletion examples/java/hello_world/MainActivity.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private void makeRequest() {
}
return Unit.INSTANCE;
})
.setOnError((error, ignored, also_ignored) -> {
.setOnError((error, ignored) -> {
String message = "failed with error after " + error.getAttemptCount() +
" attempts: " + error.getMessage();
Log.d("MainActivity", message);
Expand Down
5 changes: 2 additions & 3 deletions examples/kotlin/hello_world/AsyncDemoFilter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,15 @@ class AsyncDemoFilter : AsyncResponseFilter {
@Suppress("EmptyFunctionBlock")
override fun onError(
error: EnvoyError,
streamIntel: StreamIntel,
finalStreamIntel: FinalStreamIntel
) {
}

@Suppress("EmptyFunctionBlock")
override fun onCancel(streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) {
override fun onCancel(finalStreamIntel: FinalStreamIntel) {
}

@Suppress("EmptyFunctionBlock")
override fun onComplete(streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) {
override fun onComplete(finalStreamIntel: FinalStreamIntel) {
}
}
5 changes: 2 additions & 3 deletions examples/kotlin/hello_world/BufferDemoFilter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,15 @@ class BufferDemoFilter : ResponseFilter {
@Suppress("EmptyFunctionBlock")
override fun onError(
error: EnvoyError,
streamIntel: StreamIntel,
finalStreamIntel: FinalStreamIntel
) {
}

@Suppress("EmptyFunctionBlock")
override fun onCancel(streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) {
override fun onCancel(finalStreamIntel: FinalStreamIntel) {
}

@Suppress("EmptyFunctionBlock")
override fun onComplete(streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) {
override fun onComplete(finalStreamIntel: FinalStreamIntel) {
}
}
5 changes: 2 additions & 3 deletions examples/kotlin/hello_world/DemoFilter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,16 @@ class DemoFilter : ResponseFilter {

override fun onError(
error: EnvoyError,
streamIntel: StreamIntel,
finalStreamIntel: FinalStreamIntel
) {
Log.d("DemoFilter", "On error!")
}

override fun onCancel(streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) {
override fun onCancel(finalStreamIntel: FinalStreamIntel) {
Log.d("DemoFilter", "On cancel!")
}

@Suppress("EmptyFunctionBlock")
override fun onComplete(streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) {
override fun onComplete(finalStreamIntel: FinalStreamIntel) {
}
}
2 changes: 1 addition & 1 deletion examples/kotlin/hello_world/MainActivity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class MainActivity : Activity() {
recyclerView.post { viewAdapter.add(Failure(message)) }
}
}
.setOnError { error, _, _ ->
.setOnError { error, _ ->
val attemptCount = error.attemptCount ?: -1
val message = "failed with error after $attemptCount attempts: ${error.message}"
Log.d("MainActivity", message)
Expand Down
9 changes: 7 additions & 2 deletions library/kotlin/io/envoyproxy/envoymobile/FinalStreamIntel.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.envoyproxy.envoymobile

import io.envoyproxy.envoymobile.engine.types.EnvoyFinalStreamIntel
import io.envoyproxy.envoymobile.engine.types.EnvoyStreamIntel

/**
* Exposes one time HTTP stream metrics, context, and other details.
Expand Down Expand Up @@ -29,6 +30,9 @@ import io.envoyproxy.envoymobile.engine.types.EnvoyFinalStreamIntel
*/
@Suppress("LongParameterList")
class FinalStreamIntel constructor(
streamId: Long,
connectionId: Long,
attemptCount: Long,
val requestStartMs: Long,
val dnsStartMs: Long,
val dnsEndMs: Long,
Expand All @@ -43,8 +47,9 @@ class FinalStreamIntel constructor(
val socketReused: Boolean,
val sentByteCount: Long,
val receivedByteCount: Long
) {
constructor(base: EnvoyFinalStreamIntel) : this(
) : StreamIntel(streamId, connectionId, attemptCount) {
constructor(superBase: EnvoyStreamIntel, base: EnvoyFinalStreamIntel) : this(
superBase.streamId, superBase.connectionId, superBase.attemptCount,
base.requestStartMs, base.dnsStartMs,
base.dnsEndMs, base.connectStartMs,
base.connectEndMs, base.sslStartMs,
Expand Down
13 changes: 6 additions & 7 deletions library/kotlin/io/envoyproxy/envoymobile/StreamCallbacks.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ internal class StreamCallbacks {
)? = null
var onData: ((data: ByteBuffer, endStream: Boolean, streamIntel: StreamIntel) -> Unit)? = null
var onTrailers: ((trailers: ResponseTrailers, streamIntel: StreamIntel) -> Unit)? = null
var onCancel: ((streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) -> Unit)? = null
var onCancel: ((finalStreamIntel: FinalStreamIntel) -> Unit)? = null
var onError: (
(error: EnvoyError, streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) -> Unit
(error: EnvoyError, finalStreamIntel: FinalStreamIntel) -> Unit
)? = null
var onSendWindowAvailable: ((streamIntel: StreamIntel) -> Unit)? = null
var onComplete: ((streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) -> Unit)? = null
var onComplete: ((finalStreamIntel: FinalStreamIntel) -> Unit)? = null
}

/**
Expand Down Expand Up @@ -63,20 +63,19 @@ internal class EnvoyHTTPCallbacksAdapter(
) {
callbacks.onError?.invoke(
EnvoyError(errorCode, message, attemptCount),
StreamIntel(streamIntel),
FinalStreamIntel(finalStreamIntel)
FinalStreamIntel(streamIntel, finalStreamIntel)
)
}

override fun onCancel(streamIntel: EnvoyStreamIntel, finalStreamIntel: EnvoyFinalStreamIntel) {
callbacks.onCancel?.invoke(StreamIntel(streamIntel), FinalStreamIntel(finalStreamIntel))
callbacks.onCancel?.invoke(FinalStreamIntel(streamIntel, finalStreamIntel))
}

override fun onSendWindowAvailable(streamIntel: EnvoyStreamIntel) {
callbacks.onSendWindowAvailable?.invoke(StreamIntel(streamIntel))
}

override fun onComplete(streamIntel: EnvoyStreamIntel, finalStreamIntel: EnvoyFinalStreamIntel) {
callbacks.onComplete?.invoke(StreamIntel(streamIntel), FinalStreamIntel(finalStreamIntel))
callbacks.onComplete?.invoke(FinalStreamIntel(streamIntel, finalStreamIntel))
}
}
2 changes: 1 addition & 1 deletion library/kotlin/io/envoyproxy/envoymobile/StreamIntel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import io.envoyproxy.envoymobile.engine.types.EnvoyStreamIntel
* @param attemptCount The number of internal attempts to carry out a request/operation. 0 if
* not set.
*/
class StreamIntel constructor(
open class StreamIntel constructor(
val streamId: Long,
val connectionId: Long,
val attemptCount: Long
Expand Down
5 changes: 2 additions & 3 deletions library/kotlin/io/envoyproxy/envoymobile/StreamPrototype.kt
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ open class StreamPrototype(private val engine: EnvoyEngine) {
fun setOnError(
closure: (
error: EnvoyError,
streamIntel: StreamIntel,
finalStreamIntel: FinalStreamIntel
) -> Unit
): StreamPrototype {
Expand All @@ -129,7 +128,7 @@ open class StreamPrototype(private val engine: EnvoyEngine) {
* @return This stream, for chaining syntax.
*/
fun setOnComplete(
closure: (streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) -> Unit
closure: (finalStreamIntel: FinalStreamIntel) -> Unit
): StreamPrototype {
callbacks.onComplete = closure
return this
Expand All @@ -143,7 +142,7 @@ open class StreamPrototype(private val engine: EnvoyEngine) {
* @return This stream, for chaining syntax.
*/
fun setOnCancel(
closure: (streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) -> Unit
closure: (finalStreamIntel: FinalStreamIntel) -> Unit
): StreamPrototype {
callbacks.onCancel = closure
return this
Expand Down
6 changes: 3 additions & 3 deletions library/kotlin/io/envoyproxy/envoymobile/filters/Filter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,19 @@ internal class EnvoyHTTPFilterAdapter(

override fun onError(errorCode: Int, message: String, attemptCount: Int, streamIntel: EnvoyStreamIntel, finalStreamIntel: EnvoyFinalStreamIntel) {
(filter as? ResponseFilter)?.let { responseFilter ->
responseFilter.onError(EnvoyError(errorCode, message, attemptCount), StreamIntel(streamIntel), FinalStreamIntel(finalStreamIntel))
responseFilter.onError(EnvoyError(errorCode, message, attemptCount), FinalStreamIntel(streamIntel, finalStreamIntel))
}
}

override fun onCancel(streamIntel: EnvoyStreamIntel, finalStreamIntel: EnvoyFinalStreamIntel) {
(filter as? ResponseFilter)?.let { responseFilter ->
responseFilter.onCancel(StreamIntel(streamIntel), FinalStreamIntel(finalStreamIntel))
responseFilter.onCancel(FinalStreamIntel(streamIntel, finalStreamIntel))
}
}

override fun onComplete(streamIntel: EnvoyStreamIntel, finalStreamIntel: EnvoyFinalStreamIntel) {
(filter as? ResponseFilter)?.let { responseFilter ->
responseFilter.onComplete(StreamIntel(streamIntel), FinalStreamIntel(finalStreamIntel))
responseFilter.onComplete(FinalStreamIntel(streamIntel, finalStreamIntel))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ interface ResponseFilter : Filter {
* `stopIteration{...}`.
*
* @param error: The error that occurred within Envoy.
* @param streamIntel: Internal HTTP stream metrics, context, and other details.
* @param finalStreamIntel: Final internal HTTP stream metrics, context, and other details.
*/
fun onError(error: EnvoyError, streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel)
fun onError(error: EnvoyError, finalStreamIntel: FinalStreamIntel)

/**
* Called at most once when the client cancels the stream.
Expand All @@ -67,10 +66,9 @@ interface ResponseFilter : Filter {
* This should be considered a terminal state, and invalidates any previous attempts to
* `stopIteration{...}`.
*
* @param streamIntel: Internal HTTP stream metrics, context, and other details.
* @param finalStreamIntel: Final internal HTTP stream metrics, context, and other details.
*/
fun onCancel(streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel)
fun onCancel(finalStreamIntel: FinalStreamIntel)

/**
* Called at most once when the stream completes gracefully.
Expand All @@ -79,8 +77,7 @@ interface ResponseFilter : Filter {
* This should be considered a terminal state, and invalidates any previous attempts to
* `stopIteration{...}`.
*
* @param streamIntel: Internal HTTP stream metrics, context, and other details.
* @param finalStreamIntel: Final internal HTTP stream metrics, context, and other details.
*/
fun onComplete(streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel)
fun onComplete(finalStreamIntel: FinalStreamIntel)
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class GRPCStreamPrototype(
* @return This stream, for chaining syntax.
*/
fun setOnError(
closure: (error: EnvoyError, streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) -> Unit
closure: (error: EnvoyError, finalStreamIntel: FinalStreamIntel) -> Unit
): GRPCStreamPrototype {
underlyingStream.setOnError(closure)
return this
Expand All @@ -103,7 +103,7 @@ class GRPCStreamPrototype(
* @return This stream, for chaining syntax.
*/
fun setOnCancel(
closure: (streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) -> Unit
closure: (finalStreamIntel: FinalStreamIntel) -> Unit
): GRPCStreamPrototype {
underlyingStream.setOnCancel(closure)
return this
Expand Down
12 changes: 6 additions & 6 deletions test/java/integration/AndroidEnvoyExplicitFlowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -500,22 +500,22 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception {
response.get().addStreamIntel(streamIntel);
return null;
})
.setOnError((error, streamIntel, finalStreamIntel) -> {
.setOnError((error, finalStreamIntel) -> {
response.get().setEnvoyError(error);
response.get().addStreamIntel(streamIntel);
response.get().addStreamIntel(finalStreamIntel);
response.get().setFinalStreamIntel(finalStreamIntel);
latch.countDown();
return null;
})
.setOnCancel((streamIntel, finalStreamIntel) -> {
.setOnCancel((finalStreamIntel) -> {
response.get().setCancelled();
response.get().addStreamIntel(streamIntel);
response.get().addStreamIntel(finalStreamIntel);
response.get().setFinalStreamIntel(finalStreamIntel);
latch.countDown();
return null;
})
.setOnComplete((streamIntel, finalStreamIntel) -> {
response.get().addStreamIntel(streamIntel);
.setOnComplete((finalStreamIntel) -> {
response.get().addStreamIntel(finalStreamIntel);
response.get().setFinalStreamIntel(finalStreamIntel);
latch.countDown();
return null;
Expand Down
4 changes: 2 additions & 2 deletions test/java/integration/AndroidEnvoyFlowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,12 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception {
latch.countDown();
return null;
})
.setOnError((error, ignored, also_ignored) -> {
.setOnError((error, ignored) -> {
response.get().setEnvoyError(error);
latch.countDown();
return null;
})
.setOnCancel((ignored, also_ignored) -> {
.setOnCancel((ignored) -> {
response.get().setCancelled();
latch.countDown();
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,17 @@ private QuicTestServerTest.Response sendRequest(RequestScenario requestScenario)
response.get().setTrailers(trailers);
return null;
})
.setOnError((error, ignored1, ignored2) -> {
.setOnError((error, ignored) -> {
response.get().setEnvoyError(error);
latch.countDown();
return null;
})
.setOnCancel((ignored1, ignored2) -> {
.setOnCancel((ignored) -> {
response.get().setCancelled();
latch.countDown();
return null;
})
.setOnComplete((ignored1, ignored2) -> {
.setOnComplete((ignored) -> {
latch.countDown();
return null;
})
Expand Down
8 changes: 4 additions & 4 deletions test/kotlin/integration/CancelStreamTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ class CancelStreamTest {
return FilterTrailersStatus.Continue(trailers)
}

override fun onError(error: EnvoyError, streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) {}
override fun onComplete(streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) {}
override fun onError(error: EnvoyError, finalStreamIntel: FinalStreamIntel) {}
override fun onComplete(finalStreamIntel: FinalStreamIntel) {}

override fun onCancel(streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) {
override fun onCancel(finalStreamIntel: FinalStreamIntel) {
latch.countDown()
}
}
Expand All @@ -139,7 +139,7 @@ class CancelStreamTest {
.build()

client.newStreamPrototype()
.setOnCancel { _, _ ->
.setOnCancel { _ ->
runExpectation.countDown()
}
.start(Executors.newSingleThreadExecutor())
Expand Down
4 changes: 2 additions & 2 deletions test/kotlin/integration/DrainConnectionsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class DrainConnectionsTest {
resultEndStream1 = endStream
headersExpectation.countDown()
}
.setOnError { _, _, _ -> fail("Unexpected error") }
.setOnError { _, _ -> fail("Unexpected error") }
.start()
.sendHeaders(requestHeaders, true)

Expand All @@ -101,7 +101,7 @@ class DrainConnectionsTest {
resultEndStream2 = endStream
headersExpectation.countDown()
}
.setOnError { _, _, _ -> fail("Unexpected error") }
.setOnError { _, _ -> fail("Unexpected error") }
.start()
.sendHeaders(requestHeaders, true)

Expand Down
10 changes: 5 additions & 5 deletions test/kotlin/integration/GRPCReceiveErrorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ class GRPCReceiveErrorTest {
return FilterTrailersStatus.Continue(trailers)
}

override fun onError(error: EnvoyError, streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) {
override fun onError(error: EnvoyError, finalStreamIntel: FinalStreamIntel) {
receivedError.countDown()
}
override fun onComplete(streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) {}
override fun onComplete(finalStreamIntel: FinalStreamIntel) {}

override fun onCancel(streamIntel: StreamIntel, finalStreamIntel: FinalStreamIntel) {
override fun onCancel(finalStreamIntel: FinalStreamIntel) {
notCancelled.countDown()
}
}
Expand All @@ -126,10 +126,10 @@ class GRPCReceiveErrorTest {
.newGRPCStreamPrototype()
.setOnResponseHeaders { _, _, _ -> }
.setOnResponseMessage { _, _ -> }
.setOnError { _, _, _ ->
.setOnError { _, _ ->
callbackReceivedError.countDown()
}
.setOnCancel { _, _ ->
.setOnCancel { _ ->
fail("Unexpected call to onCancel response callback")
}
.start()
Expand Down
2 changes: 1 addition & 1 deletion test/kotlin/integration/ReceiveDataTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ReceiveDataTest {
body = data
dataExpectation.countDown()
}
.setOnError { _, _, _ -> fail("Unexpected error") }
.setOnError { _, _ -> fail("Unexpected error") }
.start()
.sendHeaders(requestHeaders, true)

Expand Down
Loading

0 comments on commit f202ea5

Please sign in to comment.