diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bd6dc17..135b3ceb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). --- -## Version 4.0.32, 12/6/2024 +## Version 4.0.32, 12/9/2024 ### Added @@ -21,8 +21,8 @@ N/A ### Changed -Update Chapter-4 for the new AND/OR type mapping feature and the additional metadata -in the state machine of a flow instance. +1. Update Chapter-4 for the new AND/OR type mapping feature +2. Consistent custom HTTP headers for event over http protocol and streaming content --- ## Version 4.0.31, 12/5/2024 diff --git a/guides/APPENDIX-I.md b/guides/APPENDIX-I.md index 4d685628..f09be054 100644 --- a/guides/APPENDIX-I.md +++ b/guides/APPENDIX-I.md @@ -240,6 +240,6 @@ X-Raw-Xml=true ```
-| Chapter-9 | Home | Appendix-II | -|:----------------------------:|:-----------------------------------------:|:--------------------------------------:| -| [API Overview](CHAPTER-9.md) | [Table of Contents](TABLE-OF-CONTENTS.md) | [Reserved route names](APPENDIX-II.md) | +| Chapter-9 | Home | Appendix-II | +|:----------------------------:|:-----------------------------------------:|:--------------------------------------------:| +| [API Overview](CHAPTER-9.md) | [Table of Contents](TABLE-OF-CONTENTS.md) | [Reserved names and headers](APPENDIX-II.md) | diff --git a/guides/APPENDIX-II.md b/guides/APPENDIX-II.md index 0c85cf2f..5aadf346 100644 --- a/guides/APPENDIX-II.md +++ b/guides/APPENDIX-II.md @@ -1,4 +1,8 @@ -# Reserved route names +# Reserved names + +The system reserves some route names and headers for routing purpose. + +## System route names The Mercury foundation code is written using the same core API and each function has a route name. @@ -72,6 +76,30 @@ PostOffice po = new PostOffice(headers, instance); // Kotlin val fastRPC = FastRPC(headers); ``` + +## Reserved HTTP header names + +| Header | Purpose | +|:-------------------------|:----------------------------------------------------------------------------| +| X-Stream-Id | Temporal route name for streaming content | +| X-TTL | Time to live in milliseconds for a streaming content | +| X-Small-Payload-As-Bytes | This header, if set to true, tells system to render stream content as bytes | +| X-Event-Api | The system uses this header to indicate that the request is sent over HTTP | +| X-Async | This header, if set to true, indicates it is a drop-n-forget request | +| X-Trace-Id | This allows the system to propagate trace ID | +| X-Correlation-Id | Alternative to X-Trace-Id | +| X-Content-Length | If present, it is the expected length of a streaming content | +| X-Raw-Xml | This header, if set to true, tells to system to skip XML rendering | +| X-Flow-Id | This tells the event manager to select a flow configuration by ID | +| X-App-Instance | This header is used by some protected actuator REST endpoints | + +To support traceId that is stored in X-Correlation-Id HTTP header, set this in application.properties. + +```properties +# list of supported traceId headers where the first one is the default label +trace.http.header=X-Correlation-Id, X-Trace-Id +``` +
| Appendix-I | Home | Appendix-III | diff --git a/guides/APPENDIX-III.md b/guides/APPENDIX-III.md index 5dfb6e48..bd3d5100 100644 --- a/guides/APPENDIX-III.md +++ b/guides/APPENDIX-III.md @@ -193,30 +193,23 @@ If content length is not given, the response body would arrive as a stream. Your application should check if the HTTP response header "stream" exists. Its value is the input "streamId". -For simplicity and readability, we recommend using the PostOffice's "request" API to read the input byte-array stream. - -It looks like this: +You can process the input stream using the FluxConsumer class like this: ```java -PostOffice po = PostOffice(headers, instance); -EventEnvelope req = new EventEnvelope().setTo(streamId).setHeader("type", "read"); -while (true) { - EventEnvelope event = po.request(req, 5000).get(); - if (event.getStatus() == 400) { - // handle input stream timeout - } - if ("eof".equals(event.getHeader("type"))) { - log.info("Closing {}", streamId); - po.send(streamId, new Kv("type", "close")); - break; - } - if ("data".equals(event.getHeader("type"))) { - Object block = event.getBody(); - if (block instanceof byte[] b) { - // handle the byte array "b" - } - } -} +String streamId = headers.get("stream"); +long ttl = 10000; // anticipated time in milliseconds to stream the content +FluxConsumer> fc = new FluxConsumer<>(streamId, ttl); +fc.consume( + data -> { + // handle incoming message + }, + e -> { + // handle exception where e is a Throwable + }, + () -> { + // handle stream completion + } +); ``` By default, a user function is executed in a virtual thread which effectively is an "async" function and @@ -351,6 +344,6 @@ For other flow adapters, you may use different set of key-values.
-| Appendix-II | Home | -|:--------------------------------------:|:-----------------------------------------:| -| [Reserved route names](APPENDIX-II.md) | [Table of Contents](TABLE-OF-CONTENTS.md) | +| Appendix-II | Home | +|:--------------------------------------------:|:-----------------------------------------:| +| [Reserved names and headers](APPENDIX-II.md) | [Table of Contents](TABLE-OF-CONTENTS.md) | diff --git a/guides/CHAPTER-4.md b/guides/CHAPTER-4.md index c2a5e441..9c15dc2f 100644 --- a/guides/CHAPTER-4.md +++ b/guides/CHAPTER-4.md @@ -53,14 +53,14 @@ flow: first.task: 'greeting.demo' tasks: - - input: - - 'input.path_parameter.user -> user' - process: 'greeting.demo' - output: - - 'text(application/json) -> output.header.content-type' - - 'result -> output.body' - description: 'Hello World' - execution: end + - input: + - 'input.path_parameter.user -> user' + process: 'greeting.demo' + output: + - 'text(application/json) -> output.header.content-type' + - 'result -> output.body' + description: 'Hello World' + execution: end ``` In the application.properties, you can specify the following parameter: @@ -84,7 +84,7 @@ Then, you can add a new REST endpoint in the "rest.yaml" configuration file like ```yaml - service: "http.flow.adapter" methods: ['GET'] - url: "/api/greeting/{user}" + url: "/api/greetings/{user}" flow: 'greetings' timeout: 10s cors: cors_1 @@ -96,7 +96,7 @@ input arguments (headers and body) in your function. Now you can write your new "greeting.demo". Please copy-n-paste the following into a Java class called "Greetings" and save in the package under "my.organization.tasks" in the source project. -> "my.organization" package name is an example. Please replace it with your organization package path. +> Note: "my.organization" package name is an example. Please replace it with your organization package path. ```java @PreLoad(route="greeting.demo", instances=10, isPrivate = false) @@ -129,7 +129,7 @@ web.component.scan=my.organization To test your new REST endpoint, flow configuration and function, please point your browser to ```text -http://127.0.0.1:8100/api/greeting/my_name +http://127.0.0.1:8100/api/greetings/my_name ``` You can replace "my_name" with your first name to see the response to the browser. @@ -156,7 +156,7 @@ the incoming event. The configuration file contains a list of task entries where each task is defined by "input", "process", "output" and "execution" type. In the above example, the execution type is "end", meaning that it is the end of a transaction -and its result set can be sent to the user. +and its result set will be delivered to the user. ## Underlying Event System @@ -171,7 +171,8 @@ to make any API calls to the underlying event system. The most common transaction entry point is a REST endpoint. The event flow may look like this: ```text -Request -> "http.request" -> "task.executor" -> user defined tasks -> "async.http.response" -> Response +Request -> "http.request" -> "task.executor" -> user defined tasks + -> "async.http.response" -> Response ``` REST automation is part of the Mercury platform-core library. It contains a non-blocking HTTP server that converts @@ -223,11 +224,11 @@ For easy matching, keys of headers, cookies, query and path parameters are case- Regular API uses JSON and XML and they will be converted to a hashmap in the event's body. For special use cases like file upload/download, your application logic may invoke a streaming API to retrieve -the binary payload. Please refer to the following mercury 3.0 guide for details. +the binary payload. Please refer to the following sections for details. -> https://accenture.github.io/mercury-composable/guides/APPENDIX-III/#send-http-request-body-as-a-stream +https://accenture.github.io/mercury-composable/guides/APPENDIX-III/#send-http-request-body-as-a-stream -> https://accenture.github.io/mercury-composable/guides/APPENDIX-III/#read-http-response-body-stream +https://accenture.github.io/mercury-composable/guides/APPENDIX-III/#read-http-response-body-stream ## Task and its corresponding function @@ -507,7 +508,7 @@ The "decision" value is also saved to the state machine (`model`) for subsequent ### Metadata for each flow instance For each flow instance, the state machine in the "model" namespace provides the following metadata that -you can use in the input/output data mapping. For example, you can set this for an exceptional handler to +you can use in the input/output data mapping. For example, you can set this for an exception handler to log additional information. | Type | Keyword | Comment | diff --git a/guides/CHAPTER-7.md b/guides/CHAPTER-7.md index 6877cdfe..ba9364aa 100644 --- a/guides/CHAPTER-7.md +++ b/guides/CHAPTER-7.md @@ -130,6 +130,7 @@ public Future request(final EventEnvelope event, long timeout, suspend fun awaitRequest(request: EventEnvelope?, timeout: Long, headers: Map, eventEndpoint: String, rpc: Boolean): EventEnvelope +} ``` Optionally, you may add security headers in the "headers" argument. e.g. the "Authorization" header. @@ -248,6 +249,6 @@ before passing to the Event API service. You can plug in your own authentication Please refer to [Chapter-3 - REST automation](CHAPTER-3.md) for details.
-| Chapter-6 | Home | Chapter-8 | -|:---------------------------:|:-----------------------------------------:|:----------------------------:| -| [Spring Boot](CHAPTER-6.md) | [Table of Contents](TABLE-OF-CONTENTS.md) | [Service Mesh](CHAPTER-8.md) | +| Chapter-6 | Home | Chapter-8 | +|:---------------------------:|:-----------------------------------------:|:---------------------------------------:| +| [Spring Boot](CHAPTER-6.md) | [Table of Contents](TABLE-OF-CONTENTS.md) | [Minimalist Service Mesh](CHAPTER-8.md) | diff --git a/guides/CHAPTER-8.md b/guides/CHAPTER-8.md index 9232dea6..77a2da7c 100644 --- a/guides/CHAPTER-8.md +++ b/guides/CHAPTER-8.md @@ -1,4 +1,4 @@ -# Service Mesh +# Minimalist Service Mesh Service mesh is a dedicated infrastructure layer to facilitate inter-container communication using "sidecar" and "control plane". @@ -7,7 +7,7 @@ Service mesh systems require additional administrative containers (PODs) for "co The additional infrastructure requirements vary among products. -# Using kafka as a service mesh +# Using kafka as a minimalist service mesh We will discuss using Kafka as a minimalist service mesh. diff --git a/guides/CHAPTER-9.md b/guides/CHAPTER-9.md index e1b7347e..6b471b2e 100644 --- a/guides/CHAPTER-9.md +++ b/guides/CHAPTER-9.md @@ -668,6 +668,6 @@ For enterprise clients, optional technical support is available. Please contact for details.
-| Chapter-8 | Home | -|:----------------------------:|:-----------------------------------------:| -| [Service Mesh](CHAPTER-8.md) | [Table of Contents](TABLE-OF-CONTENTS.md) | +| Chapter-8 | Home | +|:---------------------------------------:|:-----------------------------------------:| +| [Minimalist Service Mesh](CHAPTER-8.md) | [Table of Contents](TABLE-OF-CONTENTS.md) | diff --git a/guides/TABLE-OF-CONTENTS.md b/guides/TABLE-OF-CONTENTS.md index 75192750..63377dbb 100644 --- a/guides/TABLE-OF-CONTENTS.md +++ b/guides/TABLE-OF-CONTENTS.md @@ -16,12 +16,12 @@ Mercury Composable is a software development toolkit for writing composable appl [Chapter 7 - Event over HTTP](CHAPTER-7.md) -[Chapter 8 - Service Mesh](CHAPTER-8.md) +[Chapter 8 - Minimalist Service Mesh](CHAPTER-8.md) [Chapter 9 - API Overview](CHAPTER-9.md) [Appendix I - application.properties](APPENDIX-I.md) -[Appendix II - Reserved route names](APPENDIX-II.md) +[Appendix II - Reserved names and headers](APPENDIX-II.md) -[Appendix III - Actuators and HTTP client](APPENDIX-III.md) +[Appendix III - Actuators, HTTP client and More](APPENDIX-III.md) diff --git a/system/platform-core/src/main/java/org/platformlambda/automation/http/AsyncHttpClient.java b/system/platform-core/src/main/java/org/platformlambda/automation/http/AsyncHttpClient.java index 538a30e2..e20bdc55 100644 --- a/system/platform-core/src/main/java/org/platformlambda/automation/http/AsyncHttpClient.java +++ b/system/platform-core/src/main/java/org/platformlambda/automation/http/AsyncHttpClient.java @@ -21,7 +21,6 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; -import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.file.AsyncFile; import io.vertx.core.file.FileSystem; @@ -92,14 +91,15 @@ public class AsyncHttpClient implements TypedLambdaFunction private static final String DELETE = "DELETE"; private static final String OPTIONS = "OPTIONS"; private static final String HEAD = "HEAD"; - private static final String STREAM = "stream"; + private static final String X_STREAM_ID = "x-stream-id"; + private static final String X_TTL = "x-ttl"; private static final String STREAM_PREFIX = "stream."; private static final String INPUT_STREAM_SUFFIX = ".in"; private static final String CONTENT_TYPE = "content-type"; private static final String CONTENT_LENGTH = "content-length"; private static final String X_CONTENT_LENGTH = "X-Content-Length"; - private static final String TIMEOUT = "timeout"; private static final String USER_AGENT_NAME = "async-http-client"; + private static final int DEFAULT_TTL_SECONDS = 30; // 30 seconds /* * Some headers must be dropped because they are not relevant for HTTP relay * e.g. "content-encoding" and "transfer-encoding" will break HTTP response rendering. @@ -415,7 +415,8 @@ private void handleUpload(EventEnvelope input, OutputStreamQueue queue, String streamId = request.getStreamRoute(); String contentType = request.getHeader(CONTENT_TYPE); String method = request.getMethod(); - objectStream2file(streamId, request.getTimeoutSeconds()) + int timeout = request.getTimeoutSeconds(); + objectStream2file(streamId, timeout > 0? timeout : DEFAULT_TTL_SECONDS) .onSuccess(temp -> { final Future> future; int contentLen = request.getContentLength(); @@ -471,11 +472,9 @@ private Future objectStream2file(String streamId, int timeoutSeconds) { Utility util = Utility.getInstance(); File temp = getTempFile(streamId); try { - // FluxConsumer is reactive and the file output stream will be closed at completion, - // thus no need to do "auto-close". + // No auto-close because FluxConsumer is reactive and the file output stream will close at completion FileOutputStream out = new FileOutputStream(temp); - long timeout = Math.max(5000, timeoutSeconds * 1000L); - FluxConsumer flux = new FluxConsumer<>(streamId, timeout); + FluxConsumer flux = new FluxConsumer<>(streamId, Math.max(5000L, timeoutSeconds * 1000L)); flux.consume(data -> { try { if (data instanceof byte[] b && b.length > 0) { @@ -551,7 +550,8 @@ public HttpResponseHandler(EventEnvelope input, AsyncHttpRequest request, Output this.input = input; this.request = request; this.queue = queue; - this.timeoutSeconds = Math.max(8, request.getTimeoutSeconds()); + int timeout = request.getTimeoutSeconds(); + this.timeoutSeconds = timeout > 0? timeout : DEFAULT_TTL_SECONDS; } @Override @@ -652,8 +652,8 @@ public void handle(HttpResponse res) { } } if (publisher != null) { - response.setHeader(STREAM, publisher.getStreamId()) - .setHeader(TIMEOUT, timeoutSeconds * 1000) + response.setHeader(X_STREAM_ID, publisher.getStreamId()) + .setHeader(X_TTL, timeoutSeconds * 1000) .setHeader(X_CONTENT_LENGTH, len); publisher.publishCompletion(); } diff --git a/system/platform-core/src/main/java/org/platformlambda/automation/models/StreamHolder.java b/system/platform-core/src/main/java/org/platformlambda/automation/models/StreamHolder.java index a2f74d0b..f5ea045d 100644 --- a/system/platform-core/src/main/java/org/platformlambda/automation/models/StreamHolder.java +++ b/system/platform-core/src/main/java/org/platformlambda/automation/models/StreamHolder.java @@ -19,12 +19,8 @@ package org.platformlambda.automation.models; import org.platformlambda.core.system.EventPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class StreamHolder { - private static final Logger log = LoggerFactory.getLogger(StreamHolder.class); - private final EventPublisher publisher; public StreamHolder(int timeoutSeconds) { @@ -42,5 +38,4 @@ public String getInputStreamId() { public void close() { publisher.publishCompletion(); } - } diff --git a/system/platform-core/src/main/java/org/platformlambda/automation/services/ServiceGateway.java b/system/platform-core/src/main/java/org/platformlambda/automation/services/ServiceGateway.java index 9a1a584b..e51a05b3 100644 --- a/system/platform-core/src/main/java/org/platformlambda/automation/services/ServiceGateway.java +++ b/system/platform-core/src/main/java/org/platformlambda/automation/services/ServiceGateway.java @@ -58,6 +58,8 @@ public class ServiceGateway { private static final String AUTH_HANDLER = "http.auth.handler"; private static final String CONTENT_TYPE = "Content-Type"; private static final String CONTENT_LEN = "Content-Length"; + private static final String X_NO_STREAM = "x-small-payload-as-bytes"; + private static final String X_TTL = "x-ttl"; private static final String APPLICATION_FORM_URLENCODED = "application/x-www-form-urlencoded"; private static final String MULTIPART_FORM_DATA = "multipart/form-data"; private static final String APPLICATION_JSON = "application/json"; @@ -674,7 +676,8 @@ private void routeRequest(String requestId, AssignedRoute route, AsyncContextHol * For large payload, it is better to deliver as a stream. */ int contentLen = util.str2int(request.getHeader(CONTENT_LEN)); - if (contentLen > 0 && contentLen <= route.info.threshold) { + boolean noStream = "true".equals(request.getHeader(X_NO_STREAM)); + if (noStream || (contentLen > 0 && contentLen <= route.info.threshold)) { request.bodyHandler(block -> { byte[] b = block.getBytes(0, block.length()); requestBody.write(b, 0, b.length); @@ -696,7 +699,8 @@ private void routeRequest(String requestId, AssignedRoute route, AsyncContextHol int size = total.get(); req.setContentLength(size); if (size > 0) { - req.setStreamRoute(stream.getInputStreamId()); + req.setStreamRoute(stream.getInputStreamId()) + .setHeader(X_TTL, String.valueOf(stream.getPublisher().getTimeToLive())); stream.close(); } sendRequestToService(request, requestEvent.setHttpRequest(req)); diff --git a/system/platform-core/src/main/java/org/platformlambda/automation/services/ServiceResponseHandler.java b/system/platform-core/src/main/java/org/platformlambda/automation/services/ServiceResponseHandler.java index 64102dc1..124120fc 100644 --- a/system/platform-core/src/main/java/org/platformlambda/automation/services/ServiceResponseHandler.java +++ b/system/platform-core/src/main/java/org/platformlambda/automation/services/ServiceResponseHandler.java @@ -50,10 +50,10 @@ public class ServiceResponseHandler implements TypedLambdaFunction 0? timeout : contextTimeout; } @Override @@ -108,10 +105,10 @@ public Void handleEvent(Map headers, EventEnvelope event, int in * 1. "stream" and "timeout" are reserved as stream ID and read timeout in seconds * 2. "trace_id" and "trace_path" should be dropped from HTTP response headers */ - if (STREAM.equals(key) && value.startsWith(STREAM_PREFIX) && + if (X_STREAM_ID.equals(key) && value.startsWith(STREAM_PREFIX) && value.contains(INPUT_STREAM_SUFFIX)) { streamId = value; - } else if (TIMEOUT.equalsIgnoreCase(key)) { + } else if (X_TTL.equalsIgnoreCase(key)) { streamTimeout = value; } else if (CONTENT_TYPE.equalsIgnoreCase(key)) { if (!httpHead) { diff --git a/system/platform-core/src/main/java/org/platformlambda/core/models/AsyncHttpRequest.java b/system/platform-core/src/main/java/org/platformlambda/core/models/AsyncHttpRequest.java index 15dd1cae..6abba055 100644 --- a/system/platform-core/src/main/java/org/platformlambda/core/models/AsyncHttpRequest.java +++ b/system/platform-core/src/main/java/org/platformlambda/core/models/AsyncHttpRequest.java @@ -18,7 +18,6 @@ package org.platformlambda.core.models; -import io.netty.handler.codec.http.cookie.ServerCookieEncoder; import org.platformlambda.core.serializers.PayloadMapper; import org.platformlambda.core.serializers.SimpleMapper; import org.platformlambda.core.util.Utility; @@ -28,11 +27,9 @@ import java.util.*; public class AsyncHttpRequest { - private static final String HTTP_HEADERS = "headers"; private static final String HTTP_METHOD = "method"; private static final String IP_ADDRESS = "ip"; - private static final String TIMEOUT = "timeout"; private static final String HTTP_SESSION = "session"; private static final String PARAMETERS = "parameters"; private static final String HTTP_PROTOCOL = "http://"; @@ -44,13 +41,13 @@ public class AsyncHttpRequest { private static final String URL_LABEL = "url"; private static final String HTTP_BODY = "body"; private static final String FILE_UPLOAD = "upload"; - private static final String STREAM = "stream"; private static final String FILE_NAME = "filename"; private static final String CONTENT_LENGTH = "size"; private static final String TRUST_ALL_CERT = "trust_all_cert"; private static final String TARGET_HOST = "host"; private static final String FILE = "file"; - + private static final String X_STREAM_ID = "x-stream-id"; + private static final String X_TTL = "x-ttl"; private String method; private String queryString; private String url; @@ -62,13 +59,11 @@ public class AsyncHttpRequest { private Map cookies = new HashMap<>(); private Map session = new HashMap<>(); private Object body; - private String streamRoute; private String fileName; private String targetHost; private boolean trustAllCert = false; private boolean https = false; private int contentLength = -1; - private int timeoutSeconds = -1; public AsyncHttpRequest() { } @@ -183,18 +178,18 @@ public AsyncHttpRequest setBody(Object body) { } public String getStreamRoute() { - return streamRoute; + return getHeader(X_STREAM_ID); } public AsyncHttpRequest setStreamRoute(String streamRoute) { if (streamRoute != null) { - this.streamRoute = streamRoute; + setHeader(X_STREAM_ID, streamRoute); } return this; } public boolean isStream() { - return streamRoute != null; + return getStreamRoute() != null; } public String getFileName() { @@ -213,11 +208,15 @@ public boolean isFile() { } public int getTimeoutSeconds() { - return Math.max(0, timeoutSeconds); + String timeout = getHeader(X_TTL); + if (timeout == null) { + return -1; + } + return Math.max(1, Utility.getInstance().str2int(timeout)) / 1000; } public AsyncHttpRequest setTimeoutSeconds(int timeoutSeconds) { - this.timeoutSeconds = Math.max(0, timeoutSeconds); + setHeader(X_TTL, String.valueOf(Math.max(1, timeoutSeconds) * 1000)); return this; } @@ -401,7 +400,7 @@ public List getQueryParameters(String key) { return Collections.emptyList(); } - @SuppressWarnings({"unchecked"}) + @SuppressWarnings({"rawtypes", "unchecked"}) public AsyncHttpRequest setQueryParameter(String key, Object value) { if (key != null) { switch (value) { @@ -434,7 +433,7 @@ public AsyncHttpRequest removeQueryParameter(String key) { /** * The set methods and toMap method are used for manually construct an HTTP request object * that are typically used for Unit Test or for a service to emulate a REST browser. - * + *

* In normal case, the AsyncHttpRequest map is generated by the rest-automation application. * * @return async http request object as a map @@ -459,18 +458,12 @@ public Map toMap() { if (url != null) { result.put(URL_LABEL, url); } - if (timeoutSeconds != -1) { - result.put(TIMEOUT, timeoutSeconds); - } if (fileName != null) { result.put(FILE_NAME, fileName); } if (contentLength != -1) { result.put(CONTENT_LENGTH, contentLength); } - if (streamRoute != null) { - result.put(STREAM, streamRoute); - } if (body != null) { result.put(HTTP_BODY, body); } @@ -513,10 +506,8 @@ private void fromMap(Object input) { this.method = source.method; this.ip = source.ip; this.url = source.url; - this.timeoutSeconds = source.timeoutSeconds; this.fileName = source.fileName; this.contentLength = source.contentLength; - this.streamRoute = source.streamRoute; this.body = source.body; this.queryString = source.queryString; this.https = source.https; @@ -546,18 +537,12 @@ private void fromMap(Object input) { if (map.containsKey(URL_LABEL)) { url = (String) map.get(URL_LABEL); } - if (map.containsKey(TIMEOUT)) { - timeoutSeconds = (int) map.get(TIMEOUT); - } if (map.containsKey(FILE_NAME)) { fileName = (String) map.get(FILE_NAME); } if (map.containsKey(CONTENT_LENGTH)) { contentLength = (int) map.get(CONTENT_LENGTH); } - if (map.containsKey(STREAM)) { - streamRoute = (String) map.get(STREAM); - } if (map.containsKey(HTTP_BODY)) { body = map.get(HTTP_BODY); } diff --git a/system/platform-core/src/main/java/org/platformlambda/core/services/EventApiService.java b/system/platform-core/src/main/java/org/platformlambda/core/services/EventApiService.java index 3c6f15ad..89f6cecb 100644 --- a/system/platform-core/src/main/java/org/platformlambda/core/services/EventApiService.java +++ b/system/platform-core/src/main/java/org/platformlambda/core/services/EventApiService.java @@ -43,16 +43,13 @@ public class EventApiService implements TypedLambdaFunction private static final Logger log = LoggerFactory.getLogger(EventApiService.class); private static final String EVENT_API_SERVICE = "event.api.service"; - private static final String STREAM_TO_BYTES = "stream.to.bytes"; private static final String TYPE = "type"; private static final String ASYNC = "async"; private static final String DELIVERED = "delivered"; private static final String TIME = "time"; - private static final String STREAM = "stream"; - private static final String TIMEOUT = "timeout"; private static final String CONTENT_TYPE = "content-type"; private static final String OCTET_STREAM = "application/octet-stream"; - private static final String X_TIMEOUT = "X-Timeout"; + private static final String X_TTL = "x-ttl"; private static final String X_ASYNC = "X-Async"; private static final String MISSING_ROUTING_PATH = "Missing routing path"; private static final String PRIVATE_FUNCTION = " is private"; @@ -63,36 +60,18 @@ public class EventApiService implements TypedLambdaFunction public Void handleEvent(Map headers, EventEnvelope input, int instance) throws IOException { if (input.getRawBody() instanceof Map && input.getReplyTo() != null) { Utility util = Utility.getInstance(); - AsyncHttpRequest httpRequest = new AsyncHttpRequest(input.getRawBody()); - Map sessionInfo = httpRequest.getSessionInfo(); - long timeout = Math.max(100, util.str2long(httpRequest.getHeader(X_TIMEOUT))); - boolean async = "true".equals(httpRequest.getHeader(X_ASYNC)); - String streamId = httpRequest.getStreamRoute(); - if (streamId != null) { - // read the input stream into a byte array using the "stream.to.bytes" function - EventEmitter po = EventEmitter.getInstance(); - EventEnvelope req = new EventEnvelope().setTo(STREAM_TO_BYTES) - .setHeader(STREAM, streamId).setHeader(TIMEOUT, timeout); - po.asyncRequest(req, timeout) - .onSuccess(result -> { - if (result.getRawBody() instanceof byte[] b) { - try { - handleRequest(sessionInfo, headers, instance, b, input, timeout, async); - } catch (Exception e) { - sendError(input, 400, e.getMessage()); - } - } else { - sendError(input, 500, "Corrupted input stream"); - } - }) - .onFailure(e -> sendError(input, 408, e.getMessage())); - - } else if (httpRequest.getBody() instanceof byte[] b) { + AsyncHttpRequest request = new AsyncHttpRequest(input.getRawBody()); + Map sessionInfo = request.getSessionInfo(); + long timeout = Math.max(1000, util.str2long(request.getHeader(X_TTL))); + boolean async = "true".equals(request.getHeader(X_ASYNC)); + if (request.getBody() instanceof byte[] b) { try { handleRequest(sessionInfo, headers, instance, b, input, timeout, async); } catch (Exception e) { sendError(input, 400, e.getMessage()); } + } else { + sendError(input, 500, "Invalid event-over-http data format"); } } return null; diff --git a/system/platform-core/src/main/java/org/platformlambda/core/system/EventEmitter.java b/system/platform-core/src/main/java/org/platformlambda/core/system/EventEmitter.java index cfed3715..159eeca3 100644 --- a/system/platform-core/src/main/java/org/platformlambda/core/system/EventEmitter.java +++ b/system/platform-core/src/main/java/org/platformlambda/core/system/EventEmitter.java @@ -53,14 +53,17 @@ public class EventEmitter { private static final String TYPE = "type"; private static final String ERROR = "error"; private static final String MESSAGE = "message"; + private static final String X_EVENT_API = "x-event-api"; private static final String HTTP_REQUEST = "async.http.request"; + private static final String X_NO_STREAM = "x-small-payload-as-bytes"; private static final String HTTP = "http"; private static final String HTTPS = "https"; private static final String HTTP_OR_HTTPS = "Protocol must be http or https"; private static final String POST = "POST"; private static final String CONTENT_TYPE = "content-type"; + private static final String APPLICATION_OCTET_STREAM = "application/octet-stream"; private static final String ACCEPT = "accept"; - private static final String X_TIMEOUT = "x-timeout"; + private static final String X_TTL = "x-ttl"; private static final String X_ASYNC = "x-async"; private static final String X_TRACE_ID = "x-trace-id"; private static final String ROUTE_SUBSTITUTION = "route.substitution"; @@ -674,11 +677,11 @@ public void send(final EventEnvelope event) throws IOException { } String to = substituteRouteIfAny(destination); event.setTo(to); - var targetHttp = event.getHeader("_") == null? getEventHttpTarget(to) : null; + var targetHttp = event.getHeader(X_EVENT_API) == null? getEventHttpTarget(to) : null; if (targetHttp != null) { String callback = event.getReplyTo(); event.setReplyTo(null); - EventEnvelope forwardEvent = new EventEnvelope(event.toMap()).setHeader("_", "async"); + EventEnvelope forwardEvent = new EventEnvelope(event.toMap()).setHeader(X_EVENT_API, "async"); Future response = asyncRequest(forwardEvent, ASYNC_EVENT_HTTP_TIMEOUT, getEventHttpHeaders(to), targetHttp, callback != null); response.onSuccess(evt -> { @@ -808,9 +811,10 @@ public Future asyncRequest(final EventEnvelope event, long timeou } AsyncHttpRequest req = new AsyncHttpRequest(); req.setMethod(POST); - req.setHeader(CONTENT_TYPE, "application/octet-stream"); + req.setHeader(CONTENT_TYPE, APPLICATION_OCTET_STREAM); + req.setHeader(X_NO_STREAM, "true"); req.setHeader(ACCEPT, "*/*"); - req.setHeader(X_TIMEOUT, String.valueOf(Math.max(100L, timeout))); + req.setHeader(X_TTL, String.valueOf(Math.max(100L, timeout))); if (!rpc) { req.setHeader(X_ASYNC, "true"); } @@ -918,9 +922,10 @@ public java.util.concurrent.Future request(final EventEnvelope ev } AsyncHttpRequest req = new AsyncHttpRequest(); req.setMethod(POST); - req.setHeader(CONTENT_TYPE, "application/octet-stream"); + req.setHeader(CONTENT_TYPE, APPLICATION_OCTET_STREAM); + req.setHeader(X_NO_STREAM, "true"); req.setHeader(ACCEPT, "*/*"); - req.setHeader(X_TIMEOUT, String.valueOf(Math.max(100L, timeout))); + req.setHeader(X_TTL, String.valueOf(Math.max(100L, timeout))); if (!rpc) { req.setHeader(X_ASYNC, "true"); } @@ -1057,9 +1062,9 @@ public Future asyncRequest(final EventEnvelope event, long timeou } String to = substituteRouteIfAny(destination); event.setTo(to); - var targetHttp = event.getHeader("_") == null? getEventHttpTarget(to) : null; + var targetHttp = event.getHeader(X_EVENT_API) == null? getEventHttpTarget(to) : null; if (targetHttp != null) { - EventEnvelope forwardEvent = new EventEnvelope(event.toMap()).setHeader("_", "async_request"); + EventEnvelope forwardEvent = new EventEnvelope(event.toMap()).setHeader(X_EVENT_API, "asyncRequest"); return asyncRequest(forwardEvent, timeout, getEventHttpHeaders(to), targetHttp, true); } Platform platform = Platform.getInstance(); @@ -1115,9 +1120,9 @@ public java.util.concurrent.Future request(final EventEnvelope ev } String to = substituteRouteIfAny(destination); event.setTo(to); - var targetHttp = event.getHeader("_") == null? getEventHttpTarget(to) : null; + var targetHttp = event.getHeader(X_EVENT_API) == null? getEventHttpTarget(to) : null; if (targetHttp != null) { - EventEnvelope forwardEvent = new EventEnvelope(event.toMap()).setHeader("_", "request"); + EventEnvelope forwardEvent = new EventEnvelope(event.toMap()).setHeader(X_EVENT_API, "request"); return request(forwardEvent, timeout, getEventHttpHeaders(to), targetHttp, true); } Platform platform = Platform.getInstance(); diff --git a/system/platform-core/src/main/java/org/platformlambda/core/system/EventPublisher.java b/system/platform-core/src/main/java/org/platformlambda/core/system/EventPublisher.java index 59df9527..8654db01 100644 --- a/system/platform-core/src/main/java/org/platformlambda/core/system/EventPublisher.java +++ b/system/platform-core/src/main/java/org/platformlambda/core/system/EventPublisher.java @@ -44,10 +44,12 @@ public class EventPublisher { private final ObjectStreamIO stream; private final String outStream; private final long timer; + private final long ttl; private final AtomicBoolean eof = new AtomicBoolean(false); private final AtomicBoolean expired = new AtomicBoolean(false); public EventPublisher(long ttl) { + this.ttl = ttl; this.stream = new ObjectStreamIO((int) ttl / 1000); this.outStream = this.stream.getOutputStreamId(); long expiry = this.stream.getExpirySeconds() * 1000L; @@ -68,6 +70,10 @@ public String getStreamId() { return stream.getInputStreamId(); } + public long getTimeToLive() { + return ttl; + } + public void publish(Object data) { try { EventEmitter.getInstance().send(outStream, data, new Kv(TYPE, DATA)); diff --git a/system/platform-core/src/main/kotlin/org/platformlambda/core/services/StreamToBytes.kt b/system/platform-core/src/main/kotlin/org/platformlambda/core/services/StreamToBytes.kt deleted file mode 100644 index b22c916a..00000000 --- a/system/platform-core/src/main/kotlin/org/platformlambda/core/services/StreamToBytes.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - - Copyright 2018-2024 Accenture Technology - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - */ - -package org.platformlambda.core.services - -import org.platformlambda.core.annotations.PreLoad -import org.platformlambda.core.annotations.ZeroTracing -import org.platformlambda.core.exception.AppException -import org.platformlambda.core.models.EventEnvelope -import org.platformlambda.core.models.KotlinLambdaFunction -import org.platformlambda.core.models.Kv -import org.platformlambda.core.system.EventEmitter -import org.platformlambda.core.system.FastRPC -import org.platformlambda.core.util.Utility -import java.io.ByteArrayOutputStream - -@ZeroTracing -@PreLoad(route = "stream.to.bytes", instances = 50) -class StreamToBytes : KotlinLambdaFunction { - override suspend fun handleEvent(headers: Map, input: EventEnvelope, instance: Int): ByteArray { - val fastRPC = FastRPC(headers) - val util = Utility.getInstance() - val streamId = headers[STREAM] - val timeout = headers[TIMEOUT] - val out = ByteArrayOutputStream() - - var n = 0 - if (streamId != null && timeout != null) { - val req = EventEnvelope().setTo(streamId).setHeader(TYPE, READ) - while (true) { - val event = fastRPC.awaitRequest(req, 100L.coerceAtLeast(util.str2long(timeout))) - if (event.status == 408) { - throw AppException(408, "timeout for $timeout ms") - } - if (EOF == event.headers[TYPE]) { - EventEmitter.getInstance().send(streamId, Kv(TYPE, CLOSE)) - break - } - if (DATA == event.headers[TYPE]) { - val block = event.body - if (block is ByteArray) { - n++ - out.write(block) - } - } - } - } - return out.toByteArray() - } - - companion object { - private const val TYPE = "type" - private const val READ = "read" - private const val DATA = "data" - private const val EOF = "eof" - private const val CLOSE = "close" - private const val STREAM = "stream" - private const val TIMEOUT = "timeout" - } -} \ No newline at end of file diff --git a/system/platform-core/src/main/kotlin/org/platformlambda/core/system/FastRPC.kt b/system/platform-core/src/main/kotlin/org/platformlambda/core/system/FastRPC.kt index cc448429..2bc03835 100644 --- a/system/platform-core/src/main/kotlin/org/platformlambda/core/system/FastRPC.kt +++ b/system/platform-core/src/main/kotlin/org/platformlambda/core/system/FastRPC.kt @@ -56,9 +56,9 @@ class FastRPC(headers: Map) { val dest = request.to ?: throw IllegalArgumentException(EventEmitter.MISSING_ROUTING_PATH) val to = po.substituteRouteIfAny(dest) request.to = to - val targetHttp: String? = if (request.getHeader("_") == null) po.getEventHttpTarget(to) else null + val targetHttp: String? = if (request.getHeader(X_EVENT_API) == null) po.getEventHttpTarget(to) else null if (targetHttp != null) { - val forwardEvent = EventEnvelope(request.toMap()).setHeader("_", "await_request") + val forwardEvent = EventEnvelope(request.toMap()).setHeader(X_EVENT_API, "awaitRequest") return awaitRequest(forwardEvent, timeout, po.getEventHttpHeaders(to), targetHttp, true) } propagateTrace(request) @@ -131,7 +131,7 @@ class FastRPC(headers: Map) { req.setMethod(POST) req.setHeader(CONTENT_TYPE, "application/octet-stream") req.setHeader(ACCEPT, "*/*") - req.setHeader(X_TIMEOUT, 100L.coerceAtLeast(timeout).toString()) + req.setHeader(X_TTL, 100L.coerceAtLeast(timeout).toString()) if (!rpc) { req.setHeader(X_ASYNC, "true") } @@ -149,6 +149,7 @@ class FastRPC(headers: Map) { val b: ByteArray = request.toBytes() req.setBody(b) req.setContentLength(b.size) + req.setHeader(X_NO_STREAM, "true") val remoteRequest = EventEnvelope().setTo(HTTP_REQUEST).setBody(req) // add 100 ms to make sure it does not time out earlier than the target service val response = awaitRequest(remoteRequest, 100L.coerceAtLeast(timeout) + 100L) @@ -358,7 +359,7 @@ class FastRPC(headers: Map) { private const val POST = "POST" private const val CONTENT_TYPE = "content-type" private const val ACCEPT = "accept" - private const val X_TIMEOUT = "x-timeout" + private const val X_TTL = "x-ttl" private const val X_ASYNC = "x-async" private const val X_TRACE_ID = "x-trace-id" private const val HTTP = "http" @@ -368,5 +369,7 @@ class FastRPC(headers: Map) { private const val TYPE = "type" private const val ERROR = "error" private const val MESSAGE = "message" + private const val X_NO_STREAM = "x-small-payload-as-bytes" + private const val X_EVENT_API = "x-event-api" } } \ No newline at end of file diff --git a/system/platform-core/src/test/java/org/platformlambda/automation/RestEndpointTest.java b/system/platform-core/src/test/java/org/platformlambda/automation/RestEndpointTest.java index 22331ea2..a8a25bdc 100644 --- a/system/platform-core/src/test/java/org/platformlambda/automation/RestEndpointTest.java +++ b/system/platform-core/src/test/java/org/platformlambda/automation/RestEndpointTest.java @@ -19,7 +19,6 @@ package org.platformlambda.automation; import io.vertx.core.Future; -import io.vertx.core.Promise; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.platformlambda.common.TestBase; @@ -100,6 +99,7 @@ public void optionsMethodTest() throws IOException, InterruptedException { @SuppressWarnings(value = "unchecked") @Test public void serviceTest() throws IOException, InterruptedException { + final int TTL_SECONDS = 7; final BlockingQueue bench = new ArrayBlockingQueue<>(1); EventEmitter po = EventEmitter.getInstance(); AsyncHttpRequest req = new AsyncHttpRequest(); @@ -112,6 +112,7 @@ public void serviceTest() throws IOException, InterruptedException { list.add("b"); req.setQueryParameter("x2", list); req.setTargetHost("http://127.0.0.1:"+port); + req.setTimeoutSeconds(TTL_SECONDS); EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); Future res = po.asyncRequest(request, RPC_TIMEOUT); res.onSuccess(bench::offer); @@ -124,7 +125,7 @@ public void serviceTest() throws IOException, InterruptedException { assertEquals("/api/hello/world", map.getElement("url")); assertEquals("GET", map.getElement("method")); assertEquals("127.0.0.1", map.getElement("ip")); - assertEquals(10, map.getElement("timeout")); + assertEquals(String.valueOf(TTL_SECONDS * 1000), map.getElement("headers.x-ttl")); assertEquals("y", map.getElement("parameters.query.x1")); assertEquals(list, map.getElement("parameters.query.x2")); // the HTTP request filter will not execute because the request is not a static content request @@ -215,12 +216,41 @@ public void authRoutingTest2() throws IOException, InterruptedException { assertEquals("Unauthorized", map.get("message")); } + @Test + public void uploadBytesWithPut() throws IOException, InterruptedException { + final BlockingQueue bench = new ArrayBlockingQueue<>(1); + Utility util = Utility.getInstance(); + EventEmitter po = EventEmitter.getInstance(); + int len = 0; + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + for (int i=0; i < 100; i++) { + byte[] line = util.getUTF("hello world "+i+"\n"); + bytes.write(line); + len += line.length; + } + byte[] b = bytes.toByteArray(); + AsyncHttpRequest req = new AsyncHttpRequest(); + req.setMethod("PUT"); + req.setUrl("/api/hello/world"); + req.setTargetHost("http://127.0.0.1:"+port); + req.setBody(b); + req.setContentLength(len); + EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); + Future res = po.asyncRequest(request, RPC_TIMEOUT); + res.onSuccess(bench::offer); + EventEnvelope response = bench.poll(10, TimeUnit.SECONDS); + assert response != null; + assertInstanceOf(byte[].class, response.getBody()); + assertArrayEquals(b, (byte[]) response.getBody()); + } + @Test public void uploadSmallBlockWithPut() throws IOException, InterruptedException { final BlockingQueue bench1 = new ArrayBlockingQueue<>(1); final BlockingQueue bench2 = new ArrayBlockingQueue<>(1); final Utility util = Utility.getInstance(); final EventEmitter po = EventEmitter.getInstance(); + int TTL = 9; int len = 0; ByteArrayOutputStream bytes = new ByteArrayOutputStream(); EventPublisher publisher = new EventPublisher(10000); @@ -241,6 +271,7 @@ public void uploadSmallBlockWithPut() throws IOException, InterruptedException { req.setUrl("/api/v1/hello/world"); req.setTargetHost("http://127.0.0.1:"+port); req.setStreamRoute(publisher.getStreamId()); + req.setTimeoutSeconds(TTL); req.setContentLength(len); EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); Future res = po.asyncRequest(request, RPC_TIMEOUT); @@ -249,7 +280,8 @@ public void uploadSmallBlockWithPut() throws IOException, InterruptedException { assert response != null; assertNull(response.getBody()); // async.http.request returns a stream - String streamId = response.getHeader("stream"); + String streamId = response.getHeader("X-Stream-Id"); + assertEquals(String.valueOf(TTL * 1000), response.getHeader("x-ttl")); assertNotNull(streamId); ByteArrayOutputStream result = new ByteArrayOutputStream(); FluxConsumer flux = new FluxConsumer<>(streamId, RPC_TIMEOUT); @@ -265,34 +297,6 @@ public void uploadSmallBlockWithPut() throws IOException, InterruptedException { assertArrayEquals(b, result.toByteArray()); } - @Test - public void uploadBytesWithPut() throws IOException, InterruptedException { - final BlockingQueue bench = new ArrayBlockingQueue<>(1); - Utility util = Utility.getInstance(); - EventEmitter po = EventEmitter.getInstance(); - int len = 0; - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - for (int i=0; i < 100; i++) { - byte[] line = util.getUTF("hello world "+i+"\n"); - bytes.write(line); - len += line.length; - } - byte[] b = bytes.toByteArray(); - AsyncHttpRequest req = new AsyncHttpRequest(); - req.setMethod("PUT"); - req.setUrl("/api/hello/world"); - req.setTargetHost("http://127.0.0.1:"+port); - req.setBody(b); - req.setContentLength(len); - EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); - Future res = po.asyncRequest(request, RPC_TIMEOUT); - res.onSuccess(bench::offer); - EventEnvelope response = bench.poll(10, TimeUnit.SECONDS); - assert response != null; - assertInstanceOf(byte[].class, response.getBody()); - assertArrayEquals(b, (byte[]) response.getBody()); - } - @Test public void uploadLargePayloadWithPut() throws IOException, InterruptedException { final BlockingQueue bench1 = new ArrayBlockingQueue<>(1); @@ -324,7 +328,7 @@ public void uploadLargePayloadWithPut() throws IOException, InterruptedException assert response != null; assertNull(response.getBody()); // async.http.request returns a stream - String streamId = response.getHeader("stream"); + String streamId = response.getHeader("x-stream-id"); assertNotNull(streamId); ByteArrayOutputStream result = new ByteArrayOutputStream(); FluxConsumer flux = new FluxConsumer<>(streamId, RPC_TIMEOUT); @@ -365,15 +369,18 @@ public void uploadStreamWithPut() throws IOException, InterruptedException { req.setHeader("content-type", "application/octet-stream"); req.setContentLength(len); req.setStreamRoute(publisher.getStreamId()); + req.setTimeoutSeconds((int) RPC_TIMEOUT / 1000); EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); Future res = po.asyncRequest(request, RPC_TIMEOUT); res.onSuccess(bench1::offer); EventEnvelope response = bench1.poll(10, TimeUnit.SECONDS); assert response != null; - assertNotNull(response.getHeader("stream")); - String streamId = response.getHeader("stream"); + assertNotNull(response.getHeader("x-stream-id")); + String streamId = response.getHeader("x-stream-id"); + long ttl = util.str2long(response.getHeader("x-ttl")); + assertEquals(RPC_TIMEOUT, ttl); ByteArrayOutputStream result = new ByteArrayOutputStream(); - FluxConsumer flux = new FluxConsumer<>(streamId, RPC_TIMEOUT); + FluxConsumer flux = new FluxConsumer<>(streamId, ttl); flux.consume(data -> { try { result.write(data); @@ -417,8 +424,8 @@ public void uploadMultipartWithPost() throws IOException, InterruptedException { res.onSuccess(bench1::offer); EventEnvelope response = bench1.poll(10, TimeUnit.SECONDS); assert response != null; - assertNotNull(response.getHeader("stream")); - String streamId = response.getHeader("stream"); + assertNotNull(response.getHeader("x-stream-id")); + String streamId = response.getHeader("x-stream-id"); ByteArrayOutputStream result = new ByteArrayOutputStream(); FluxConsumer flux = new FluxConsumer<>(streamId, RPC_TIMEOUT); flux.consume(data -> { @@ -449,6 +456,7 @@ public void postJson() throws IOException, InterruptedException { req.setBody(json); req.setHeader("accept", "application/json"); req.setHeader("content-type", "application/json"); + req.setTimeoutSeconds((int) RPC_TIMEOUT / 1000); EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); Future res = po.asyncRequest(request, RPC_TIMEOUT); res.onSuccess(bench::offer); @@ -462,7 +470,7 @@ public void postJson() throws IOException, InterruptedException { assertEquals("/api/hello/world", map.getElement("url")); assertEquals("POST", map.getElement("method")); assertEquals("127.0.0.1", map.getElement("ip")); - assertEquals(10, map.getElement("timeout")); + assertEquals(String.valueOf(RPC_TIMEOUT), map.getElement("headers.x-ttl")); assertInstanceOf(Map.class, map.getElement("body")); Map received = (Map) map.getElement("body"); assertEquals(data, received); @@ -483,6 +491,7 @@ public void postXmlAsMap() throws IOException, InterruptedException { data.put("test", "message"); String xml = xmlWriter.write(data); req.setBody(xml); + req.setTimeoutSeconds((int) RPC_TIMEOUT / 1000); req.setHeader("accept", "application/xml"); req.setHeader("content-type", "application/xml"); EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); @@ -498,7 +507,7 @@ public void postXmlAsMap() throws IOException, InterruptedException { assertEquals("/api/hello/world", map.getElement("url")); assertEquals("POST", map.getElement("method")); assertEquals("127.0.0.1", map.getElement("ip")); - assertEquals("10", map.getElement("timeout")); + assertEquals(String.valueOf(RPC_TIMEOUT), map.getElement("headers.x-ttl")); assertInstanceOf(Map.class, map.getElement("body")); Map received = (Map) map.getElement("body"); assertEquals(data, received); @@ -516,6 +525,7 @@ public void postXmlAsText() throws IOException, InterruptedException { req.setHeader("accept", "application/xml"); req.setHeader("content-type", "application/xml"); req.setHeader("x-raw-xml", "true"); + req.setTimeoutSeconds((int) RPC_TIMEOUT / 1000); EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); Future res = po.asyncRequest(request, RPC_TIMEOUT); res.onSuccess(bench::offer); @@ -532,7 +542,7 @@ public void postXmlAsText() throws IOException, InterruptedException { assertEquals("/api/hello/world", map.getElement("url")); assertEquals("POST", map.getElement("method")); assertEquals("127.0.0.1", map.getElement("ip")); - assertEquals("10", map.getElement("timeout")); + assertEquals(String.valueOf(RPC_TIMEOUT), map.getElement("headers.x-ttl")); assertInstanceOf(String.class, map.getElement("body")); assertEquals("hello world", map.getElement("body")); } @@ -552,6 +562,7 @@ public void postJsonMap() throws IOException, InterruptedException { req.setBody(data); req.setHeader("accept", "application/json"); req.setHeader("content-type", "application/json"); + req.setTimeoutSeconds((int) RPC_TIMEOUT / 1000); EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); Future res = po.asyncRequest(request, RPC_TIMEOUT); res.onSuccess(bench::offer); @@ -565,7 +576,7 @@ public void postJsonMap() throws IOException, InterruptedException { assertEquals("/api/hello/world", map.getElement("url")); assertEquals("POST", map.getElement("method")); assertEquals("127.0.0.1", map.getElement("ip")); - assertEquals(10, map.getElement("timeout")); + assertEquals(String.valueOf(RPC_TIMEOUT), map.getElement("headers.x-ttl")); assertInstanceOf(Map.class, map.getElement("body")); Map received = (Map) map.getElement("body"); assertEquals(data, received); @@ -586,6 +597,7 @@ public void testJsonResultList() throws IOException, InterruptedException { req.setBody(data); req.setHeader("accept", "application/json"); req.setHeader("content-type", "application/json"); + req.setTimeoutSeconds((int) RPC_TIMEOUT / 1000); EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); Future res = po.asyncRequest(request, RPC_TIMEOUT); res.onSuccess(bench::offer); @@ -602,7 +614,7 @@ public void testJsonResultList() throws IOException, InterruptedException { assertEquals("/api/hello/list", map.getElement("url")); assertEquals("POST", map.getElement("method")); assertEquals("127.0.0.1", map.getElement("ip")); - assertEquals(15, map.getElement("timeout")); + assertEquals(String.valueOf(RPC_TIMEOUT), map.getElement("headers.x-ttl")); assertInstanceOf(Map.class, map.getElement("body")); Map received = (Map) map.getElement("body"); assertEquals(data, received); @@ -622,7 +634,7 @@ public void testXmlResultList() throws IOException, InterruptedException { data.put("hello", "world"); data.put("test", "message"); String xml = xmlWriter.write(data); - req.setBody(xml); + req.setTimeoutSeconds((int) (RPC_TIMEOUT / 1000)).setBody(xml); req.setHeader("accept", "application/xml"); req.setHeader("content-type", "application/xml"); EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); @@ -639,7 +651,7 @@ public void testXmlResultList() throws IOException, InterruptedException { assertEquals("/api/hello/list", map.getElement("result.url")); assertEquals("POST", map.getElement("result.method")); assertEquals("127.0.0.1", map.getElement("result.ip")); - assertEquals("15", map.getElement("result.timeout")); + assertEquals(String.valueOf(RPC_TIMEOUT), map.getElement("result.headers.x-ttl")); assertInstanceOf(Map.class, map.getElement("result.body")); Map received = (Map) map.getElement("result.body"); assertEquals(data, received); @@ -655,6 +667,7 @@ public void sendHttpDelete() throws IOException, InterruptedException { req.setUrl("/api/hello/world"); req.setTargetHost("http://127.0.0.1:"+port); req.setHeader("accept", "application/json"); + req.setTimeoutSeconds((int) RPC_TIMEOUT / 1000); EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); Future res = po.asyncRequest(request, RPC_TIMEOUT); res.onSuccess(bench::offer); @@ -667,7 +680,7 @@ public void sendHttpDelete() throws IOException, InterruptedException { assertEquals("/api/hello/world", map.getElement("url")); assertEquals("DELETE", map.getElement("method")); assertEquals("127.0.0.1", map.getElement("ip")); - assertEquals(10, map.getElement("timeout")); + assertEquals(String.valueOf(RPC_TIMEOUT), map.getElement("headers.x-ttl")); assertNull(map.getElement("body")); } @@ -735,6 +748,7 @@ public void postXmlMap() throws IOException, InterruptedException { req.setBody(data); req.setHeader("accept", "application/json"); req.setHeader("content-type", "application/xml"); + req.setTimeoutSeconds((int) RPC_TIMEOUT / 1000); EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); Future res = po.asyncRequest(request, RPC_TIMEOUT); res.onSuccess(bench::offer); @@ -748,13 +762,12 @@ public void postXmlMap() throws IOException, InterruptedException { assertEquals("/api/hello/world", map.getElement("url")); assertEquals("POST", map.getElement("method")); assertEquals("127.0.0.1", map.getElement("ip")); - assertEquals(10, map.getElement("timeout")); + assertEquals(String.valueOf(RPC_TIMEOUT), map.getElement("headers.x-ttl")); assertInstanceOf(Map.class, map.getElement("body")); Map received = (Map) map.getElement("body"); assertEquals(data, received); } - @SuppressWarnings("unchecked") @Test public void postList() throws IOException, InterruptedException { @@ -770,6 +783,7 @@ public void postList() throws IOException, InterruptedException { req.setBody(Collections.singletonList(data)); req.setHeader("accept", "application/json"); req.setHeader("content-type", "application/json"); + req.setTimeoutSeconds((int) RPC_TIMEOUT / 1000); EventEnvelope request = new EventEnvelope().setTo(HTTP_REQUEST).setBody(req); Future res = po.asyncRequest(request, RPC_TIMEOUT); res.onSuccess(bench::offer); @@ -783,7 +797,7 @@ public void postList() throws IOException, InterruptedException { assertEquals("/api/hello/world", map.getElement("url")); assertEquals("POST", map.getElement("method")); assertEquals("127.0.0.1", map.getElement("ip")); - assertEquals(10, map.getElement("timeout")); + assertEquals(String.valueOf(RPC_TIMEOUT), map.getElement("headers.x-ttl")); assertInstanceOf(List.class, map.getElement("body")); List> received = (List>) map.getElement("body"); assertEquals(1, received.size()); diff --git a/system/platform-core/src/test/java/org/platformlambda/automation/service/MockHelloWorld.java b/system/platform-core/src/test/java/org/platformlambda/automation/service/MockHelloWorld.java index ec903803..354c4546 100644 --- a/system/platform-core/src/test/java/org/platformlambda/automation/service/MockHelloWorld.java +++ b/system/platform-core/src/test/java/org/platformlambda/automation/service/MockHelloWorld.java @@ -27,17 +27,18 @@ import java.util.Map; /** - * KernelThreadRunner is added for unit test purpose. + * KernelThreadRunner is added for demonstration purpose only. *

* Normally KernelThread should be reserved for computational intensive functions - * without RPC calls + * or legacy code that cannot be refactored into non-blocking operation. */ @KernelThreadRunner public class MockHelloWorld implements TypedLambdaFunction { + private static final String X_STREAM_ID = "x-stream-id"; + private static final String X_TTL = "x-ttl"; @Override - public Object handleEvent(Map headers, AsyncHttpRequest body, int instance) throws IOException { - AsyncHttpRequest input = new AsyncHttpRequest(body); // test AsyncHttpRequest clone feature + public Object handleEvent(Map headers, AsyncHttpRequest input, int instance) throws IOException { if ("HEAD".equals(input.getMethod())) { EventEnvelope result = new EventEnvelope().setHeader("X-Response", "HEAD request received") .setHeader("Content-Length", 100); @@ -47,14 +48,20 @@ public Object handleEvent(Map headers, AsyncHttpRequest body, in return result; } if (input.getStreamRoute() != null) { - return new EventEnvelope().setBody(input.getBody()).setHeader("stream", input.getStreamRoute()) - .setHeader("content-type", "application/octet-stream"); + var result = new EventEnvelope().setBody(input.getBody()) + .setHeader("content-type", "application/octet-stream"); + + String streamId = input.getHeader(X_STREAM_ID); + String ttl = input.getHeader(X_TTL); + if (streamId != null && ttl != null) { + result.setHeader(X_STREAM_ID, streamId).setHeader(X_TTL, ttl); + } + return result; } else if (input.getBody() instanceof byte[]) { return new EventEnvelope().setBody(input.getBody()) .setHeader("content-type", "application/octet-stream"); } else { - // returning AsyncHttpRequest is an edge case - return body; + return input.toMap(); } } } diff --git a/system/platform-core/src/test/java/org/platformlambda/core/PostOfficeTest.java b/system/platform-core/src/test/java/org/platformlambda/core/PostOfficeTest.java index 3f946e86..948d064f 100644 --- a/system/platform-core/src/test/java/org/platformlambda/core/PostOfficeTest.java +++ b/system/platform-core/src/test/java/org/platformlambda/core/PostOfficeTest.java @@ -218,6 +218,7 @@ public void httpClientRenderSmallPayloadAsBytes() throws IOException, ExecutionE @Test public void httpClientDetectStreamingContent() throws IOException, ExecutionException, InterruptedException { + Utility util = Utility.getInstance(); final String HELLO = "hello world 0123456789"; final AppConfigReader config = AppConfigReader.getInstance(); String port = config.getProperty("server.port"); @@ -235,28 +236,29 @@ public void httpClientDetectStreamingContent() throws IOException, ExecutionExce */ assertEquals(200, response.getStatus()); assertNull(response.getBody()); - String streamId = response.getHeader("stream"); + String streamId = response.getHeader("x-stream-id"); + int ttl = util.str2int(response.getHeader("x-ttl")); assertNotNull(streamId); assertTrue(streamId.startsWith("stream.")); // HTTP response header "x-content-length" is provided by AsyncHttpClient when rendering small payload as bytes assertEquals(String.valueOf(HELLO.length()), response.getHeader("x-content-length")); // Read stream content - EventEnvelope streamRequest = new EventEnvelope().setTo(streamId).setHeader("type", "read"); + BlockingQueue completion = new LinkedBlockingQueue<>(); ByteArrayOutputStream out = new ByteArrayOutputStream(); - while (true) { - EventEnvelope event = po.request(streamRequest, 5000).get(); - if ("eof".equals(event.getHeader("type"))) { - log.info("Closing {}", streamId); - po.send(streamId, new Kv("type", "close")); - break; - } - if ("data".equals(event.getHeader("type"))) { - Object block = event.getBody(); - if (block instanceof byte[] b) { - out.write(b); - } + FluxConsumer consumer = new FluxConsumer<>(streamId, ttl); + consumer.consume(b -> { + try { + out.write(b); + } catch (IOException e) { + // ok to ignore } - } + }, e -> { + log.error("unexpected error", e); + }, () -> { + completion.add(true); + }); + Boolean done = completion.take(); + assertEquals(true, done); String content = Utility.getInstance().getUTF(out.toByteArray()); assertEquals(HELLO, content); } diff --git a/system/platform-core/src/test/java/org/platformlambda/core/mock/HelloBytes.java b/system/platform-core/src/test/java/org/platformlambda/core/mock/HelloBytes.java index e77e5bbe..843168ab 100644 --- a/system/platform-core/src/test/java/org/platformlambda/core/mock/HelloBytes.java +++ b/system/platform-core/src/test/java/org/platformlambda/core/mock/HelloBytes.java @@ -31,12 +31,17 @@ public class HelloBytes implements TypedLambdaFunction { private static final String APPLICATION_OCTET_STREAM = "application/octet-stream"; + private static final String CONTENT_TYPE = "content-type"; + private static final String X_STREAM_ID = "x-stream-id"; + private static final String X_TTL = "x-ttl"; + private static final long EXPIRY = 10000; + @Override public EventEnvelope handleEvent(Map headers, AsyncHttpRequest input, int instance) { - EventPublisher publisher = new EventPublisher(10000); + EventPublisher publisher = new EventPublisher(EXPIRY); publisher.publish(Utility.getInstance().getUTF("hello world 0123456789")); publisher.publishCompletion(); - return new EventEnvelope().setHeader("stream", publisher.getStreamId()) - .setHeader("content-type", APPLICATION_OCTET_STREAM); + return new EventEnvelope().setHeader(X_STREAM_ID, publisher.getStreamId()) + .setHeader(X_TTL, EXPIRY).setHeader(CONTENT_TYPE, APPLICATION_OCTET_STREAM); } } diff --git a/system/platform-core/src/test/resources/event-over-http.yaml b/system/platform-core/src/test/resources/event-over-http.yaml index adfd885c..8939e4b5 100644 --- a/system/platform-core/src/test/resources/event-over-http.yaml +++ b/system/platform-core/src/test/resources/event-over-http.yaml @@ -8,4 +8,4 @@ event: - route: 'event.save.get' target: 'http://127.0.0.1:${server.port}/api/event' headers: - authorization: 'demo' \ No newline at end of file + authorization: 'demo' diff --git a/system/platform-core/src/test/resources/rest.yaml b/system/platform-core/src/test/resources/rest.yaml index b88c435f..fe4c69e6 100644 --- a/system/platform-core/src/test/resources/rest.yaml +++ b/system/platform-core/src/test/resources/rest.yaml @@ -28,7 +28,7 @@ rest: methods: ['GET', 'PUT', 'POST', 'HEAD', 'PATCH', 'DELETE'] url: "/api/hello/world" flow: "hello-world" - timeout: 10s + timeout: 15s # # Optional authentication service which should return a true or false result # The authentication service can also add session info in headers using EventEnvelope as a response