Skip to content

Commit

Permalink
consistent custom HTTP headers for streaming content and event over http
Browse files Browse the repository at this point in the history
  • Loading branch information
acn-ericlaw committed Dec 9, 2024
1 parent 65577da commit a050f89
Show file tree
Hide file tree
Showing 25 changed files with 259 additions and 309 deletions.
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

---
## Version 4.0.32, 12/6/2024
## Version 4.0.32, 12/9/2024

### Added

Expand All @@ -21,8 +21,8 @@ N/A

### Changed

Update Chapter-4 for the new AND/OR type mapping feature and the additional metadata
in the state machine of a flow instance.
1. Update Chapter-4 for the new AND/OR type mapping feature
2. Consistent custom HTTP headers for event over http protocol and streaming content

---
## Version 4.0.31, 12/5/2024
Expand Down
6 changes: 3 additions & 3 deletions guides/APPENDIX-I.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,6 @@ X-Raw-Xml=true
```
<br/>
| Chapter-9 | Home | Appendix-II |
|:----------------------------:|:-----------------------------------------:|:--------------------------------------:|
| [API Overview](CHAPTER-9.md) | [Table of Contents](TABLE-OF-CONTENTS.md) | [Reserved route names](APPENDIX-II.md) |
| Chapter-9 | Home | Appendix-II |
|:----------------------------:|:-----------------------------------------:|:--------------------------------------------:|
| [API Overview](CHAPTER-9.md) | [Table of Contents](TABLE-OF-CONTENTS.md) | [Reserved names and headers](APPENDIX-II.md) |
30 changes: 29 additions & 1 deletion guides/APPENDIX-II.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# Reserved route names
# Reserved names

The system reserves some route names and headers for routing purpose.

## System route names

The Mercury foundation code is written using the same core API and each function has a route name.

Expand Down Expand Up @@ -72,6 +76,30 @@ PostOffice po = new PostOffice(headers, instance);
// Kotlin
val fastRPC = FastRPC(headers);
```

## Reserved HTTP header names

| Header | Purpose |
|:-------------------------|:----------------------------------------------------------------------------|
| X-Stream-Id | Temporal route name for streaming content |
| X-TTL | Time to live in milliseconds for a streaming content |
| X-Small-Payload-As-Bytes | This header, if set to true, tells system to render stream content as bytes |
| X-Event-Api | The system uses this header to indicate that the request is sent over HTTP |
| X-Async | This header, if set to true, indicates it is a drop-n-forget request |
| X-Trace-Id | This allows the system to propagate trace ID |
| X-Correlation-Id | Alternative to X-Trace-Id |
| X-Content-Length | If present, it is the expected length of a streaming content |
| X-Raw-Xml | This header, if set to true, tells to system to skip XML rendering |
| X-Flow-Id | This tells the event manager to select a flow configuration by ID |
| X-App-Instance | This header is used by some protected actuator REST endpoints |

To support traceId that is stored in X-Correlation-Id HTTP header, set this in application.properties.

```properties
# list of supported traceId headers where the first one is the default label
trace.http.header=X-Correlation-Id, X-Trace-Id
```

<br/>

| Appendix-I | Home | Appendix-III |
Expand Down
43 changes: 18 additions & 25 deletions guides/APPENDIX-III.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,30 +193,23 @@ If content length is not given, the response body would arrive as a stream.

Your application should check if the HTTP response header "stream" exists. Its value is the input "streamId".

For simplicity and readability, we recommend using the PostOffice's "request" API to read the input byte-array stream.

It looks like this:
You can process the input stream using the FluxConsumer class like this:

```java
PostOffice po = PostOffice(headers, instance);
EventEnvelope req = new EventEnvelope().setTo(streamId).setHeader("type", "read");
while (true) {
EventEnvelope event = po.request(req, 5000).get();
if (event.getStatus() == 400) {
// handle input stream timeout
}
if ("eof".equals(event.getHeader("type"))) {
log.info("Closing {}", streamId);
po.send(streamId, new Kv("type", "close"));
break;
}
if ("data".equals(event.getHeader("type"))) {
Object block = event.getBody();
if (block instanceof byte[] b) {
// handle the byte array "b"
}
}
}
String streamId = headers.get("stream");
long ttl = 10000; // anticipated time in milliseconds to stream the content
FluxConsumer<Map<String, Object>> fc = new FluxConsumer<>(streamId, ttl);
fc.consume(
data -> {
// handle incoming message
},
e -> {
// handle exception where e is a Throwable
},
() -> {
// handle stream completion
}
);
```

By default, a user function is executed in a virtual thread which effectively is an "async" function and
Expand Down Expand Up @@ -351,6 +344,6 @@ For other flow adapters, you may use different set of key-values.

<br/>

| Appendix-II | Home |
|:--------------------------------------:|:-----------------------------------------:|
| [Reserved route names](APPENDIX-II.md) | [Table of Contents](TABLE-OF-CONTENTS.md) |
| Appendix-II | Home |
|:--------------------------------------------:|:-----------------------------------------:|
| [Reserved names and headers](APPENDIX-II.md) | [Table of Contents](TABLE-OF-CONTENTS.md) |
35 changes: 18 additions & 17 deletions guides/CHAPTER-4.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ flow:
first.task: 'greeting.demo'

tasks:
- input:
- 'input.path_parameter.user -> user'
process: 'greeting.demo'
output:
- 'text(application/json) -> output.header.content-type'
- 'result -> output.body'
description: 'Hello World'
execution: end
- input:
- 'input.path_parameter.user -> user'
process: 'greeting.demo'
output:
- 'text(application/json) -> output.header.content-type'
- 'result -> output.body'
description: 'Hello World'
execution: end
```
In the application.properties, you can specify the following parameter:
Expand All @@ -84,7 +84,7 @@ Then, you can add a new REST endpoint in the "rest.yaml" configuration file like
```yaml
- service: "http.flow.adapter"
methods: ['GET']
url: "/api/greeting/{user}"
url: "/api/greetings/{user}"
flow: 'greetings'
timeout: 10s
cors: cors_1
Expand All @@ -96,7 +96,7 @@ input arguments (headers and body) in your function. Now you can write your new
"greeting.demo". Please copy-n-paste the following into a Java class called "Greetings" and save in the package
under "my.organization.tasks" in the source project.
> "my.organization" package name is an example. Please replace it with your organization package path.
> Note: "my.organization" package name is an example. Please replace it with your organization package path.
```java
@PreLoad(route="greeting.demo", instances=10, isPrivate = false)
Expand Down Expand Up @@ -129,7 +129,7 @@ web.component.scan=my.organization
To test your new REST endpoint, flow configuration and function, please point your browser to

```text
http://127.0.0.1:8100/api/greeting/my_name
http://127.0.0.1:8100/api/greetings/my_name
```

You can replace "my_name" with your first name to see the response to the browser.
Expand All @@ -156,7 +156,7 @@ the incoming event.

The configuration file contains a list of task entries where each task is defined by "input", "process", "output"
and "execution" type. In the above example, the execution type is "end", meaning that it is the end of a transaction
and its result set can be sent to the user.
and its result set will be delivered to the user.

## Underlying Event System

Expand All @@ -171,7 +171,8 @@ to make any API calls to the underlying event system.
The most common transaction entry point is a REST endpoint. The event flow may look like this:

```text
Request -> "http.request" -> "task.executor" -> user defined tasks -> "async.http.response" -> Response
Request -> "http.request" -> "task.executor" -> user defined tasks
-> "async.http.response" -> Response
```

REST automation is part of the Mercury platform-core library. It contains a non-blocking HTTP server that converts
Expand Down Expand Up @@ -223,11 +224,11 @@ For easy matching, keys of headers, cookies, query and path parameters are case-
Regular API uses JSON and XML and they will be converted to a hashmap in the event's body.
For special use cases like file upload/download, your application logic may invoke a streaming API to retrieve
the binary payload. Please refer to the following mercury 3.0 guide for details.
the binary payload. Please refer to the following sections for details.
> https://accenture.github.io/mercury-composable/guides/APPENDIX-III/#send-http-request-body-as-a-stream
https://accenture.github.io/mercury-composable/guides/APPENDIX-III/#send-http-request-body-as-a-stream
> https://accenture.github.io/mercury-composable/guides/APPENDIX-III/#read-http-response-body-stream
https://accenture.github.io/mercury-composable/guides/APPENDIX-III/#read-http-response-body-stream
## Task and its corresponding function
Expand Down Expand Up @@ -507,7 +508,7 @@ The "decision" value is also saved to the state machine (`model`) for subsequent
### Metadata for each flow instance

For each flow instance, the state machine in the "model" namespace provides the following metadata that
you can use in the input/output data mapping. For example, you can set this for an exceptional handler to
you can use in the input/output data mapping. For example, you can set this for an exception handler to
log additional information.

| Type | Keyword | Comment |
Expand Down
7 changes: 4 additions & 3 deletions guides/CHAPTER-7.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public Future<EventEnvelope> request(final EventEnvelope event, long timeout,
suspend fun awaitRequest(request: EventEnvelope?, timeout: Long,
headers: Map<String, String>,
eventEndpoint: String, rpc: Boolean): EventEnvelope
}
```

Optionally, you may add security headers in the "headers" argument. e.g. the "Authorization" header.
Expand Down Expand Up @@ -248,6 +249,6 @@ before passing to the Event API service. You can plug in your own authentication
Please refer to [Chapter-3 - REST automation](CHAPTER-3.md) for details.
<br/>

| Chapter-6 | Home | Chapter-8 |
|:---------------------------:|:-----------------------------------------:|:----------------------------:|
| [Spring Boot](CHAPTER-6.md) | [Table of Contents](TABLE-OF-CONTENTS.md) | [Service Mesh](CHAPTER-8.md) |
| Chapter-6 | Home | Chapter-8 |
|:---------------------------:|:-----------------------------------------:|:---------------------------------------:|
| [Spring Boot](CHAPTER-6.md) | [Table of Contents](TABLE-OF-CONTENTS.md) | [Minimalist Service Mesh](CHAPTER-8.md) |
4 changes: 2 additions & 2 deletions guides/CHAPTER-8.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Service Mesh
# Minimalist Service Mesh

Service mesh is a dedicated infrastructure layer to facilitate inter-container communication using "sidecar" and
"control plane".
Expand All @@ -7,7 +7,7 @@ Service mesh systems require additional administrative containers (PODs) for "co

The additional infrastructure requirements vary among products.

# Using kafka as a service mesh
# Using kafka as a minimalist service mesh

We will discuss using Kafka as a minimalist service mesh.

Expand Down
6 changes: 3 additions & 3 deletions guides/CHAPTER-9.md
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,6 @@ For enterprise clients, optional technical support is available. Please contact
for details.
<br/>

| Chapter-8 | Home |
|:----------------------------:|:-----------------------------------------:|
| [Service Mesh](CHAPTER-8.md) | [Table of Contents](TABLE-OF-CONTENTS.md) |
| Chapter-8 | Home |
|:---------------------------------------:|:-----------------------------------------:|
| [Minimalist Service Mesh](CHAPTER-8.md) | [Table of Contents](TABLE-OF-CONTENTS.md) |
6 changes: 3 additions & 3 deletions guides/TABLE-OF-CONTENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ Mercury Composable is a software development toolkit for writing composable appl

[Chapter 7 - Event over HTTP](CHAPTER-7.md)

[Chapter 8 - Service Mesh](CHAPTER-8.md)
[Chapter 8 - Minimalist Service Mesh](CHAPTER-8.md)

[Chapter 9 - API Overview](CHAPTER-9.md)

[Appendix I - application.properties](APPENDIX-I.md)

[Appendix II - Reserved route names](APPENDIX-II.md)
[Appendix II - Reserved names and headers](APPENDIX-II.md)

[Appendix III - Actuators and HTTP client](APPENDIX-III.md)
[Appendix III - Actuators, HTTP client and More](APPENDIX-III.md)
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.FileSystem;
Expand Down Expand Up @@ -92,14 +91,15 @@ public class AsyncHttpClient implements TypedLambdaFunction<EventEnvelope, Void>
private static final String DELETE = "DELETE";
private static final String OPTIONS = "OPTIONS";
private static final String HEAD = "HEAD";
private static final String STREAM = "stream";
private static final String X_STREAM_ID = "x-stream-id";
private static final String X_TTL = "x-ttl";
private static final String STREAM_PREFIX = "stream.";
private static final String INPUT_STREAM_SUFFIX = ".in";
private static final String CONTENT_TYPE = "content-type";
private static final String CONTENT_LENGTH = "content-length";
private static final String X_CONTENT_LENGTH = "X-Content-Length";
private static final String TIMEOUT = "timeout";
private static final String USER_AGENT_NAME = "async-http-client";
private static final int DEFAULT_TTL_SECONDS = 30; // 30 seconds
/*
* Some headers must be dropped because they are not relevant for HTTP relay
* e.g. "content-encoding" and "transfer-encoding" will break HTTP response rendering.
Expand Down Expand Up @@ -415,7 +415,8 @@ private void handleUpload(EventEnvelope input, OutputStreamQueue queue,
String streamId = request.getStreamRoute();
String contentType = request.getHeader(CONTENT_TYPE);
String method = request.getMethod();
objectStream2file(streamId, request.getTimeoutSeconds())
int timeout = request.getTimeoutSeconds();
objectStream2file(streamId, timeout > 0? timeout : DEFAULT_TTL_SECONDS)
.onSuccess(temp -> {
final Future<HttpResponse<Void>> future;
int contentLen = request.getContentLength();
Expand Down Expand Up @@ -471,11 +472,9 @@ private Future<File> objectStream2file(String streamId, int timeoutSeconds) {
Utility util = Utility.getInstance();
File temp = getTempFile(streamId);
try {
// FluxConsumer is reactive and the file output stream will be closed at completion,
// thus no need to do "auto-close".
// No auto-close because FluxConsumer is reactive and the file output stream will close at completion
FileOutputStream out = new FileOutputStream(temp);
long timeout = Math.max(5000, timeoutSeconds * 1000L);
FluxConsumer<Object> flux = new FluxConsumer<>(streamId, timeout);
FluxConsumer<Object> flux = new FluxConsumer<>(streamId, Math.max(5000L, timeoutSeconds * 1000L));
flux.consume(data -> {
try {
if (data instanceof byte[] b && b.length > 0) {
Expand Down Expand Up @@ -551,7 +550,8 @@ public HttpResponseHandler(EventEnvelope input, AsyncHttpRequest request, Output
this.input = input;
this.request = request;
this.queue = queue;
this.timeoutSeconds = Math.max(8, request.getTimeoutSeconds());
int timeout = request.getTimeoutSeconds();
this.timeoutSeconds = timeout > 0? timeout : DEFAULT_TTL_SECONDS;
}

@Override
Expand Down Expand Up @@ -652,8 +652,8 @@ public void handle(HttpResponse<Void> res) {
}
}
if (publisher != null) {
response.setHeader(STREAM, publisher.getStreamId())
.setHeader(TIMEOUT, timeoutSeconds * 1000)
response.setHeader(X_STREAM_ID, publisher.getStreamId())
.setHeader(X_TTL, timeoutSeconds * 1000)
.setHeader(X_CONTENT_LENGTH, len);
publisher.publishCompletion();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@
package org.platformlambda.automation.models;

import org.platformlambda.core.system.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamHolder {
private static final Logger log = LoggerFactory.getLogger(StreamHolder.class);

private final EventPublisher publisher;

public StreamHolder(int timeoutSeconds) {
Expand All @@ -42,5 +38,4 @@ public String getInputStreamId() {
public void close() {
publisher.publishCompletion();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class ServiceGateway {
private static final String AUTH_HANDLER = "http.auth.handler";
private static final String CONTENT_TYPE = "Content-Type";
private static final String CONTENT_LEN = "Content-Length";
private static final String X_NO_STREAM = "x-small-payload-as-bytes";
private static final String X_TTL = "x-ttl";
private static final String APPLICATION_FORM_URLENCODED = "application/x-www-form-urlencoded";
private static final String MULTIPART_FORM_DATA = "multipart/form-data";
private static final String APPLICATION_JSON = "application/json";
Expand Down Expand Up @@ -674,7 +676,8 @@ private void routeRequest(String requestId, AssignedRoute route, AsyncContextHol
* For large payload, it is better to deliver as a stream.
*/
int contentLen = util.str2int(request.getHeader(CONTENT_LEN));
if (contentLen > 0 && contentLen <= route.info.threshold) {
boolean noStream = "true".equals(request.getHeader(X_NO_STREAM));
if (noStream || (contentLen > 0 && contentLen <= route.info.threshold)) {
request.bodyHandler(block -> {
byte[] b = block.getBytes(0, block.length());
requestBody.write(b, 0, b.length);
Expand All @@ -696,7 +699,8 @@ private void routeRequest(String requestId, AssignedRoute route, AsyncContextHol
int size = total.get();
req.setContentLength(size);
if (size > 0) {
req.setStreamRoute(stream.getInputStreamId());
req.setStreamRoute(stream.getInputStreamId())
.setHeader(X_TTL, String.valueOf(stream.getPublisher().getTimeToLive()));
stream.close();
}
sendRequestToService(request, requestEvent.setHttpRequest(req));
Expand Down
Loading

0 comments on commit a050f89

Please sign in to comment.