deleteState(DeleteStateRequest request);
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClientBuilder.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClientBuilder.java
new file mode 100644
index 0000000000..ae51d48e61
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClientBuilder.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.client.reactor;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.mosn.layotto.v1.config.Properties;
+import io.mosn.layotto.v1.serializer.DefaultObjectSerializer;
+import io.mosn.layotto.v1.serializer.LayottoObjectSerializer;
+import io.mosn.layotto.v1.value.LayottoApiProtocol;
+import spec.proto.runtime.v1.RuntimeGrpc;
+
+import java.io.Closeable;
+
+/**
+ * A builder for the LayottoClient, Currently only gRPC Client will be supported.
+ */
+public class LayottoReactorClientBuilder {
+
+ /**
+ * Determine if this builder will create GRPC clients instead of HTTP clients.
+ */
+ private final LayottoApiProtocol apiProtocol;
+
+ /**
+ * Serializer used for request and response objects in LayottoClient.
+ */
+ private LayottoObjectSerializer objectSerializer;
+
+ /**
+ * Serializer used for state objects in LayottoClient.
+ */
+ private LayottoObjectSerializer stateSerializer;
+
+ /**
+ * Creates a constructor for LayottoClient.
+ *
+ * {@link DefaultObjectSerializer} is used for object and state serializers by default but is not recommended
+ * for production scenarios.
+ */
+ public LayottoReactorClientBuilder() {
+ this.objectSerializer = new DefaultObjectSerializer();
+ this.stateSerializer = new DefaultObjectSerializer();
+ this.apiProtocol = Properties.API_PROTOCOL.get();
+ }
+
+ /**
+ * Sets the serializer for objects to be sent and received from Layotto.
+ * See {@link DefaultObjectSerializer} as possible serializer for non-production scenarios.
+ *
+ * @param objectSerializer Serializer for objects to be sent and received from Layotto.
+ * @return This instance.
+ */
+ public LayottoReactorClientBuilder withObjectSerializer(LayottoObjectSerializer objectSerializer) {
+ if (objectSerializer == null) {
+ throw new IllegalArgumentException("Object serializer is required");
+ }
+
+ if (objectSerializer.getContentType() == null || objectSerializer.getContentType().isEmpty()) {
+ throw new IllegalArgumentException("Content Type should not be null or empty");
+ }
+
+ this.objectSerializer = objectSerializer;
+ return this;
+ }
+
+ /**
+ * Sets the serializer for objects to be persisted.
+ * See {@link DefaultObjectSerializer} as possible serializer for non-production scenarios.
+ *
+ * @param stateSerializer Serializer for objects to be persisted.
+ * @return This instance.
+ */
+ public LayottoReactorClientBuilder withStateSerializer(LayottoObjectSerializer stateSerializer) {
+ if (stateSerializer == null) {
+ throw new IllegalArgumentException("State serializer is required");
+ }
+
+ this.stateSerializer = stateSerializer;
+ return this;
+ }
+
+ /**
+ * Build an instance of the Client based on the provided setup.
+ *
+ * @return an instance of the setup Client
+ * @throws java.lang.IllegalStateException if any required field is missing
+ */
+ public LayottoReactorClient build() {
+ return buildLayottoClient(this.apiProtocol);
+ }
+
+ /**
+ * Creates an instance of a Layotto Client based on the chosen protocol.
+ *
+ * @param protocol Layotto API's protocol.
+ * @return the GRPC Client.
+ * @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number.
+ */
+ private LayottoReactorClient buildLayottoClient(LayottoApiProtocol protocol) {
+ if (protocol == null) {
+ throw new IllegalStateException("Protocol is required.");
+ }
+
+ switch (protocol) {
+ case GRPC:
+ return buildLayottoClientGrpc();
+ default:
+ throw new IllegalStateException("Unsupported protocol: " + protocol.name());
+ }
+ }
+
+ /**
+ * Creates an instance of the GPRC Client.
+ *
+ * @return the GRPC Client.
+ * @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number.
+ */
+ private LayottoReactorClient buildLayottoClientGrpc() {
+ int port = Properties.GRPC_PORT.get();
+ if (port <= 0) {
+ throw new IllegalArgumentException("Invalid port.");
+ }
+ ManagedChannel channel = ManagedChannelBuilder
+ .forAddress(Properties.SIDECAR_IP.get(), port)
+ .usePlaintext()
+ .build();
+ Closeable closeableChannel = () -> {
+ if (channel != null && !channel.isShutdown()) {
+ channel.shutdown();
+ }
+ };
+ RuntimeGrpc.RuntimeStub asyncStub = RuntimeGrpc.newStub(channel);
+ return new LayottoReactorClientGrpc(this.objectSerializer, this.stateSerializer, closeableChannel, asyncStub);
+ }
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClientGrpc.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClientGrpc.java
new file mode 100644
index 0000000000..996502c4ed
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClientGrpc.java
@@ -0,0 +1,624 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.client.reactor;
+
+import com.google.common.base.Strings;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.stub.StreamObserver;
+import io.mosn.layotto.v1.config.Properties;
+import io.mosn.layotto.v1.exceptions.LayottoException;
+import io.mosn.layotto.v1.serializer.LayottoObjectSerializer;
+import io.mosn.layotto.v1.utils.GrpcWrapper;
+import io.mosn.layotto.v1.utils.NetworkUtils;
+import io.mosn.layotto.v1.value.Headers;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
+import reactor.util.context.Context;
+import spec.proto.runtime.v1.RuntimeGrpc;
+import spec.proto.runtime.v1.RuntimeProto;
+import spec.sdk.reactor.v1.domain.core.configuration.ConfigurationItem;
+import spec.sdk.reactor.v1.domain.core.configuration.ConfigurationRequestItem;
+import spec.sdk.reactor.v1.domain.core.configuration.SaveConfigurationRequest;
+import spec.sdk.reactor.v1.domain.core.configuration.SubConfigurationResp;
+import spec.sdk.reactor.v1.domain.core.invocation.HttpExtension;
+import spec.sdk.reactor.v1.domain.core.invocation.InvokeMethodRequest;
+import spec.sdk.reactor.v1.domain.core.pubsub.PublishEventRequest;
+import spec.sdk.reactor.v1.domain.core.state.DeleteStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.ExecuteStateTransactionRequest;
+import spec.sdk.reactor.v1.domain.core.state.GetBulkStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.GetStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.SaveStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.State;
+import spec.sdk.reactor.v1.domain.core.state.StateOptions;
+import spec.sdk.reactor.v1.domain.core.state.TransactionalStateOperation;
+import spec.sdk.reactor.v1.utils.TypeRef;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class LayottoReactorClientGrpc extends AbstractLayottoReactorClient {
+
+ /**
+ * The GRPC managed channel to be used.
+ */
+ private final Closeable channel;
+
+ /**
+ * The async gRPC stub.
+ */
+ private final RuntimeGrpc.RuntimeStub asyncStub;
+
+ /**
+ * Default access level constructor, in order to create an instance of this class.
+ *
+ * @param closeableChannel A closeable for a Managed GRPC channel
+ * @param asyncStub async gRPC stub
+ */
+ LayottoReactorClientGrpc(LayottoObjectSerializer objectSerializer,
+ LayottoObjectSerializer stateSerializer,
+ Closeable closeableChannel,
+ RuntimeGrpc.RuntimeStub asyncStub) {
+ super(objectSerializer, stateSerializer);
+ this.channel = closeableChannel;
+ this.asyncStub = intercept(asyncStub);
+ }
+
+ @Override
+ public Mono>> getConfiguration(ConfigurationRequestItem configurationRequestItem,
+ TypeRef type) {
+ // TODO: 2021/9/26
+ return null;
+ }
+
+ @Override
+ public Mono saveConfiguration(SaveConfigurationRequest saveConfigurationRequest) {
+ // TODO: 2021/9/26
+ return null;
+ }
+
+ @Override
+ public Mono deleteConfiguration(ConfigurationRequestItem configurationRequestItem) {
+ // TODO: 2021/9/26
+ return null;
+ }
+
+ @Override
+ public Flux> subscribeConfiguration(ConfigurationRequestItem configurationRequestItem,
+ TypeRef type) {
+ // TODO: 2021/9/26
+ return null;
+ }
+
+ @Override
+ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef type) {
+ try {
+ String appId = invokeMethodRequest.getAppId();
+ String method = invokeMethodRequest.getMethod();
+ Object body = invokeMethodRequest.getBody();
+ HttpExtension httpExtension = invokeMethodRequest.getHttpExtension();
+ RuntimeProto.InvokeServiceRequest envelope = this.buildInvokeServiceRequest(
+ httpExtension,
+ appId,
+ method,
+ body);
+ // Regarding missing metadata in method invocation for gRPC:
+ // gRPC to gRPC does not handle metadata in Layotto runtime proto.
+ // gRPC to HTTP does not map correctly in Layotto runtime as per https://github.com/layotto/layotto/issues/2342
+
+ return Mono.subscriberContext().flatMap(
+ context -> this.createMono(
+ it -> intercept(context, asyncStub).invokeService(envelope, it)
+ )
+ ).flatMap(
+ it -> {
+ try {
+ return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type));
+ } catch (IOException e) {
+ throw LayottoException.propagate(e);
+ }
+ }
+ );
+ } catch (Exception ex) {
+ return LayottoException.wrapMono(ex);
+ }
+ }
+
+ @Override
+ public Mono publishEvent(PublishEventRequest request) {
+ try {
+ String pubsubName = request.getPubsubName();
+ String topic = request.getTopic();
+ Object data = request.getData();
+ RuntimeProto.PublishEventRequest.Builder envelopeBuilder = RuntimeProto.PublishEventRequest.newBuilder()
+ .setTopic(topic)
+ .setPubsubName(pubsubName)
+ .setData(ByteString.copyFrom(objectSerializer.serialize(data)));
+
+ // Content-type can be overwritten on a per-request basis.
+ // It allows CloudEvents to be handled differently, for example.
+ String contentType = request.getContentType();
+ if (contentType == null || contentType.isEmpty()) {
+ contentType = objectSerializer.getContentType();
+ }
+ envelopeBuilder.setDataContentType(contentType);
+
+ Map metadata = request.getMetadata();
+ if (metadata != null) {
+ envelopeBuilder.putAllMetadata(metadata);
+ }
+
+ return Mono.subscriberContext().flatMap(
+ context ->
+ this.createMono(
+ it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
+ )
+ ).then();
+ } catch (Exception ex) {
+ return LayottoException.wrapMono(ex);
+ }
+ }
+
+ @Override
+ public Mono> getState(GetStateRequest request, TypeRef type) {
+ try {
+ final String stateStoreName = request.getStoreName();
+ final String key = request.getKey();
+ final StateOptions options = request.getStateOptions();
+ final Map metadata = request.getMetadata();
+
+ if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("State store name cannot be null or empty.");
+ }
+ if ((key == null) || (key.trim().isEmpty())) {
+ throw new IllegalArgumentException("Key cannot be null or empty.");
+ }
+ RuntimeProto.GetStateRequest.Builder builder = RuntimeProto.GetStateRequest.newBuilder()
+ .setStoreName(stateStoreName)
+ .setKey(key);
+ if (metadata != null) {
+ builder.putAllMetadata(metadata);
+ }
+ if (options != null && options.getConsistency() != null) {
+ builder.setConsistency(getGrpcStateConsistency(options));
+ }
+
+ RuntimeProto.GetStateRequest envelope = builder.build();
+
+ return Mono.subscriberContext().flatMap(
+ context ->
+ this.createMono(
+ it -> intercept(context, asyncStub).getState(envelope, it)
+ )
+ ).map(
+ it -> {
+ try {
+ return buildStateKeyValue(it, key, options, type);
+ } catch (IOException ex) {
+ throw LayottoException.propagate(ex);
+ }
+ }
+ );
+ } catch (Exception ex) {
+ return LayottoException.wrapMono(ex);
+ }
+ }
+
+ @Override
+ public Mono>> getBulkState(GetBulkStateRequest request, TypeRef type) {
+ try {
+ final String stateStoreName = request.getStoreName();
+ final List keys = request.getKeys();
+ final int parallelism = request.getParallelism();
+ final Map metadata = request.getMetadata();
+ if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("State store name cannot be null or empty.");
+ }
+ if (keys == null || keys.isEmpty()) {
+ throw new IllegalArgumentException("Key cannot be null or empty.");
+ }
+
+ if (parallelism < 0) {
+ throw new IllegalArgumentException("Parallelism cannot be negative.");
+ }
+ RuntimeProto.GetBulkStateRequest.Builder builder = RuntimeProto.GetBulkStateRequest.newBuilder()
+ .setStoreName(stateStoreName)
+ .addAllKeys(keys)
+ .setParallelism(parallelism);
+ if (metadata != null) {
+ builder.putAllMetadata(metadata);
+ }
+
+ RuntimeProto.GetBulkStateRequest envelope = builder.build();
+
+ return Mono.subscriberContext().flatMap(
+ context -> this.createMono(it -> intercept(context, asyncStub)
+ .getBulkState(envelope, it)
+ )
+ ).map(
+ it ->
+ it
+ .getItemsList()
+ .stream()
+ .map(b -> {
+ try {
+ return buildStateKeyValue(b, type);
+ } catch (Exception e) {
+ throw LayottoException.propagate(e);
+ }
+ })
+ .collect(Collectors.toList())
+ );
+ } catch (Exception ex) {
+ return LayottoException.wrapMono(ex);
+ }
+ }
+
+ @Override
+ public Mono executeStateTransaction(ExecuteStateTransactionRequest request) {
+ try {
+ final String stateStoreName = request.getStateStoreName();
+ final List> operations = request.getOperations();
+ final Map metadata = request.getMetadata();
+ if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("State store name cannot be null or empty.");
+ }
+ RuntimeProto.ExecuteStateTransactionRequest.Builder builder = RuntimeProto.ExecuteStateTransactionRequest
+ .newBuilder();
+ builder.setStoreName(stateStoreName);
+ if (metadata != null) {
+ builder.putAllMetadata(metadata);
+ }
+ for (TransactionalStateOperation> operation : operations) {
+ RuntimeProto.TransactionalStateOperation.Builder operationBuilder = RuntimeProto.TransactionalStateOperation
+ .newBuilder();
+ operationBuilder.setOperationType(operation.getOperation().toString().toLowerCase());
+ operationBuilder.setRequest(buildStateRequest(operation.getRequest()).build());
+ builder.addOperations(operationBuilder.build());
+ }
+ RuntimeProto.ExecuteStateTransactionRequest req = builder.build();
+
+ return Mono.subscriberContext().flatMap(
+ context -> this.createMono(it -> intercept(context, asyncStub).executeStateTransaction(req, it))
+ ).then();
+ } catch (Exception e) {
+ return LayottoException.wrapMono(e);
+ }
+ }
+
+ @Override
+ public Mono saveBulkState(SaveStateRequest request) {
+ try {
+ final String stateStoreName = request.getStoreName();
+ final List> states = request.getStates();
+ if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("State store name cannot be null or empty.");
+ }
+ RuntimeProto.SaveStateRequest.Builder builder = RuntimeProto.SaveStateRequest.newBuilder();
+ builder.setStoreName(stateStoreName);
+ for (State> state : states) {
+ builder.addStates(buildStateRequest(state).build());
+ }
+ RuntimeProto.SaveStateRequest req = builder.build();
+
+ return Mono.subscriberContext().flatMap(
+ context -> this.createMono(it -> intercept(context, asyncStub).saveState(req, it))
+ ).then();
+ } catch (Exception ex) {
+ return LayottoException.wrapMono(ex);
+ }
+ }
+
+ @Override
+ public Mono deleteState(DeleteStateRequest request) {
+ try {
+ final String stateStoreName = request.getStateStoreName();
+ final String key = request.getKey();
+ final StateOptions options = request.getStateOptions();
+ final String etag = request.getEtag();
+ final Map metadata = request.getMetadata();
+
+ if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("State store name cannot be null or empty.");
+ }
+ if ((key == null) || (key.trim().isEmpty())) {
+ throw new IllegalArgumentException("Key cannot be null or empty.");
+ }
+
+ RuntimeProto.StateOptions.Builder optionBuilder = null;
+ if (options != null) {
+ optionBuilder = RuntimeProto.StateOptions.newBuilder();
+ if (options.getConcurrency() != null) {
+ optionBuilder.setConcurrency(getGrpcStateConcurrency(options));
+ }
+ if (options.getConsistency() != null) {
+ optionBuilder.setConsistency(getGrpcStateConsistency(options));
+ }
+ }
+ RuntimeProto.DeleteStateRequest.Builder builder = RuntimeProto.DeleteStateRequest.newBuilder()
+ .setStoreName(stateStoreName)
+ .setKey(key);
+ if (metadata != null) {
+ builder.putAllMetadata(metadata);
+ }
+ if (etag != null) {
+ builder.setEtag(RuntimeProto.Etag.newBuilder().setValue(etag).build());
+ }
+
+ if (optionBuilder != null) {
+ builder.setOptions(optionBuilder.build());
+ }
+
+ RuntimeProto.DeleteStateRequest req = builder.build();
+
+ return Mono.subscriberContext().flatMap(
+ context -> this.createMono(it -> intercept(context, asyncStub).deleteState(req, it))
+ ).then();
+ } catch (Exception ex) {
+ return LayottoException.wrapMono(ex);
+ }
+ }
+
+ /**
+ * Builds the object io.layotto.{@link RuntimeProto.InvokeServiceRequest} to be send based on the parameters.
+ *
+ * @param httpExtension Object for HttpExtension
+ * @param appId The application id to be invoked
+ * @param method The application method to be invoked
+ * @param body The body of the request to be send as part of the invocation
+ * @param The Type of the Body
+ * @return The object to be sent as part of the invocation.
+ * @throws IOException If there's an issue serializing the request.
+ */
+ private RuntimeProto.InvokeServiceRequest buildInvokeServiceRequest(
+ HttpExtension httpExtension,
+ String appId,
+ String method,
+ K body) throws IOException {
+ if (httpExtension == null) {
+ throw new IllegalArgumentException("HttpExtension cannot be null. Use HttpExtension.NONE instead.");
+ }
+ RuntimeProto.CommonInvokeRequest.Builder requestBuilder = RuntimeProto.CommonInvokeRequest.newBuilder();
+ requestBuilder.setMethod(method);
+ if (body != null) {
+ byte[] byteRequest = objectSerializer.serialize(body);
+ Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteRequest)).build();
+ requestBuilder.setData(data);
+ } else {
+ requestBuilder.setData(Any.newBuilder().build());
+ }
+ RuntimeProto.HTTPExtension.Builder httpExtensionBuilder = RuntimeProto.HTTPExtension.newBuilder();
+
+ httpExtensionBuilder.setVerb(RuntimeProto.HTTPExtension.Verb.valueOf(httpExtension.getMethod().toString()))
+ .setQuerystring(httpExtension.encodeQueryString());
+ requestBuilder.setHttpExtension(httpExtensionBuilder.build());
+
+ requestBuilder.setContentType(objectSerializer.getContentType());
+
+ RuntimeProto.InvokeServiceRequest.Builder envelopeBuilder = RuntimeProto.InvokeServiceRequest.newBuilder()
+ .setId(appId)
+ .setMessage(requestBuilder.build());
+ return envelopeBuilder.build();
+ }
+
+ private State buildStateKeyValue(
+ RuntimeProto.BulkStateItem item,
+ TypeRef type) throws IOException {
+ String key = item.getKey();
+ String error = item.getError();
+ if (!Strings.isNullOrEmpty(error)) {
+ return new State<>(key, error);
+ }
+
+ ByteString payload = item.getData();
+ byte[] data = payload == null ? null : payload.toByteArray();
+ T value = stateSerializer.deserialize(data, type);
+ String etag = item.getEtag();
+ if (etag.equals("")) {
+ etag = null;
+ }
+ return new State<>(key, value, etag, item.getMetadataMap(), null);
+ }
+
+ private State buildStateKeyValue(
+ RuntimeProto.GetStateResponse response,
+ String requestedKey,
+ StateOptions stateOptions,
+ TypeRef type) throws IOException {
+ ByteString payload = response.getData();
+ byte[] data = payload == null ? null : payload.toByteArray();
+ T value = stateSerializer.deserialize(data, type);
+ String etag = response.getEtag();
+ if (etag.equals("")) {
+ etag = null;
+ }
+ return new State<>(requestedKey, value, etag, response.getMetadataMap(), stateOptions);
+ }
+
+ private RuntimeProto.StateItem.Builder buildStateRequest(State state) throws IOException {
+ byte[] bytes = stateSerializer.serialize(state.getValue());
+
+ RuntimeProto.StateItem.Builder stateBuilder = RuntimeProto.StateItem.newBuilder();
+ if (state.getEtag() != null) {
+ stateBuilder.setEtag(RuntimeProto.Etag.newBuilder().setValue(state.getEtag()).build());
+ }
+ if (state.getMetadata() != null) {
+ stateBuilder.putAllMetadata(state.getMetadata());
+ }
+ if (bytes != null) {
+ stateBuilder.setValue(ByteString.copyFrom(bytes));
+ }
+ stateBuilder.setKey(state.getKey());
+ RuntimeProto.StateOptions.Builder optionBuilder = null;
+ if (state.getOptions() != null) {
+ StateOptions options = state.getOptions();
+ optionBuilder = RuntimeProto.StateOptions.newBuilder();
+ if (options.getConcurrency() != null) {
+ optionBuilder.setConcurrency(getGrpcStateConcurrency(options));
+ }
+ if (options.getConsistency() != null) {
+ optionBuilder.setConsistency(getGrpcStateConsistency(options));
+ }
+ }
+ if (optionBuilder != null) {
+ stateBuilder.setOptions(optionBuilder.build());
+ }
+ return stateBuilder;
+ }
+
+ private RuntimeProto.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) {
+ switch (options.getConsistency()) {
+ case EVENTUAL:
+ return RuntimeProto.StateOptions.StateConsistency.CONSISTENCY_EVENTUAL;
+ case STRONG:
+ return RuntimeProto.StateOptions.StateConsistency.CONSISTENCY_STRONG;
+ default:
+ throw new IllegalArgumentException("Missing Consistency mapping to gRPC Consistency enum");
+ }
+ }
+
+ private RuntimeProto.StateOptions.StateConcurrency getGrpcStateConcurrency(StateOptions options) {
+ switch (options.getConcurrency()) {
+ case FIRST_WRITE:
+ return RuntimeProto.StateOptions.StateConcurrency.CONCURRENCY_FIRST_WRITE;
+ case LAST_WRITE:
+ return RuntimeProto.StateOptions.StateConcurrency.CONCURRENCY_LAST_WRITE;
+ default:
+ throw new IllegalArgumentException("Missing StateConcurrency mapping to gRPC Concurrency enum");
+ }
+ }
+
+ // -- Lifecycle Functions
+
+ @Override
+ public Mono waitForSidecar(int timeoutInMilliseconds) {
+ return Mono.fromRunnable(() -> {
+ try {
+ NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.GRPC_PORT.get(), timeoutInMilliseconds);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Override
+ public Mono shutdown() {
+ return Mono.subscriberContext()
+ // FIXME: 2021/9/26 Refer to Dapr
+ // .flatMap(context ->
+ // this.createMono(it ->
+ // intercept(context, asyncStub)
+ // .shutdown(Empty.getDefaultInstance(), it)))
+ .then();
+ }
+
+ private Mono createMono(Consumer> consumer) {
+ return Mono.create(sink ->
+ LayottoException
+ .wrap(() -> consumer.accept(createStreamObserver(sink)))
+ .run());
+ }
+
+ private StreamObserver createStreamObserver(MonoSink sink) {
+ return new StreamObserver() {
+ @Override
+ public void onNext(T value) {
+ sink.success(value);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ sink.error(LayottoException.propagate(new ExecutionException(t)));
+ }
+
+ @Override
+ public void onCompleted() {
+ sink.success();
+ }
+ };
+ }
+
+ /**
+ * Closes the ManagedChannel for GRPC.
+ *
+ * @throws IOException on exception.
+ * @see io.grpc.ManagedChannel#shutdown()
+ */
+ @Override
+ public void close() throws Exception {
+ if (channel != null) {
+ LayottoException
+ .wrap(() -> {
+ channel.close();
+ return true;
+ })
+ .call();
+ }
+ }
+
+ /**
+ * Populates GRPC client with interceptors for telemetry.
+ *
+ * @param context Reactor's context.
+ * @param client GRPC client for Layotto.
+ * @return Client after adding interceptors.
+ */
+ private static RuntimeGrpc.RuntimeStub intercept(Context context, RuntimeGrpc.RuntimeStub client) {
+ return GrpcWrapper.intercept(context, client);
+ }
+
+ /**
+ * Populates GRPC client with interceptors.
+ *
+ * @param client GRPC client for Layotto.
+ * @return Client after adding interceptors.
+ */
+ private static RuntimeGrpc.RuntimeStub intercept(RuntimeGrpc.RuntimeStub client) {
+ ClientInterceptor interceptor = new ClientInterceptor() {
+ @Override
+ public ClientCall interceptCall(MethodDescriptor methodDescriptor,
+ CallOptions callOptions,
+ Channel channel) {
+ ClientCall clientCall = channel.newCall(methodDescriptor, callOptions);
+ return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) {
+ @Override
+ public void start(final Listener responseListener, final Metadata metadata) {
+ String layottoApiToken = Properties.API_TOKEN.get();
+ if (layottoApiToken != null) {
+ metadata.put(Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER),
+ layottoApiToken);
+ }
+ super.start(responseListener, metadata);
+ }
+ };
+ }
+ };
+ return client.withInterceptors(interceptor);
+ }
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/BooleanProperty.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/BooleanProperty.java
new file mode 100644
index 0000000000..d2211cda62
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/BooleanProperty.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.config;
+
+/**
+ * Boolean configuration property.
+ */
+public class BooleanProperty extends Property {
+
+ /**
+ * {@inheritDoc}
+ */
+ BooleanProperty(String name, String envName, Boolean defaultValue) {
+ super(name, envName, defaultValue);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected Boolean parse(String value) {
+ return Boolean.valueOf(value);
+ }
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/GenericProperty.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/GenericProperty.java
new file mode 100644
index 0000000000..3c060b0f83
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/GenericProperty.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.config;
+
+import java.util.function.Function;
+
+/**
+ * Configuration property for any type.
+ */
+public class GenericProperty extends Property {
+
+ private final Function parser;
+
+ /**
+ * {@inheritDoc}
+ */
+ GenericProperty(String name, String envName, T defaultValue, Function parser) {
+ super(name, envName, defaultValue);
+ this.parser = parser;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected T parse(String value) {
+ return parser.apply(value);
+ }
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/IntegerProperty.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/IntegerProperty.java
new file mode 100644
index 0000000000..8eb6e0be25
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/IntegerProperty.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.config;
+
+/**
+ * Integer configuration property.
+ */
+public class IntegerProperty extends Property {
+
+ /**
+ * {@inheritDoc}
+ */
+ IntegerProperty(String name, String envName, Integer defaultValue) {
+ super(name, envName, defaultValue);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected Integer parse(String value) {
+ return Integer.valueOf(value);
+ }
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/Properties.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/Properties.java
new file mode 100644
index 0000000000..78b0caa062
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/Properties.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.config;
+
+
+import io.mosn.layotto.v1.value.LayottoApiProtocol;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Global properties for Layotto's SDK, using Supplier so they are dynamically resolved.
+ */
+public class Properties {
+
+ /**
+ * Layotto's default IP for gRPC communication.
+ */
+ private static final String DEFAULT_SIDECAR_IP = "127.0.0.1";
+
+ /**
+ * Layotto's default gRPC port.
+ */
+ private static final Integer DEFAULT_GRPC_PORT = 34904;
+
+ /**
+ * Layotto's default use of gRPC.
+ */
+ private static final LayottoApiProtocol DEFAULT_API_PROTOCOL = LayottoApiProtocol.GRPC;
+
+ /**
+ * Layotto's default String encoding: UTF-8.
+ */
+ private static final Charset DEFAULT_STRING_CHARSET = StandardCharsets.UTF_8;
+
+ /**
+ * IP for Layotto's sidecar.
+ */
+ public static final Property SIDECAR_IP = new StringProperty(
+ "layotto.sidecar.ip",
+ "LAYOTTO_SIDECAR_IP",
+ DEFAULT_SIDECAR_IP);
+
+ /**
+ * GRPC port for Layotto after checking system property and environment variable.
+ */
+ public static final Property GRPC_PORT = new IntegerProperty(
+ "layotto.grpc.port",
+ "LAYOTTO_GRPC_PORT",
+ DEFAULT_GRPC_PORT);
+
+ /**
+ * Determines if Layotto client will use gRPC to talk to Layotto's sidecar.
+ */
+ public static final Property API_PROTOCOL = new GenericProperty<>(
+ "layotto.api.protocol",
+ "LAYOTTO_API_PROTOCOL",
+ DEFAULT_API_PROTOCOL,
+ (s) -> LayottoApiProtocol.valueOf(s.toUpperCase()));
+
+ /**
+ * API token for authentication between App and Layotto's sidecar.
+ */
+ public static final Property API_TOKEN = new StringProperty(
+ "layotto.api.token",
+ "LAYOTTO_API_TOKEN",
+ null);
+
+ /**
+ * Determines which string encoding is used in Layotto's Java SDK.
+ */
+ public static final Property STRING_CHARSET = new GenericProperty<>(
+ "layotto.string.charset",
+ "LAYOTTO_STRING_CHARSET",
+ DEFAULT_STRING_CHARSET,
+ (s) -> Charset.forName(s));
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/Property.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/Property.java
new file mode 100644
index 0000000000..f9801e1e3e
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/Property.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A configuration property in the Layotto's SDK.
+ */
+public abstract class Property {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Property.class.getName());
+
+ /**
+ * Property's name as a Java Property.
+ */
+ private final String name;
+
+ /**
+ * Property's name as a environment variable.
+ */
+ private final String envName;
+
+ /**
+ * Default value.
+ */
+ private final T defaultValue;
+
+ /**
+ * Instantiates a new configuration property.
+ *
+ * @param name Java property name.
+ * @param envName Environment variable name.
+ * @param defaultValue Default value.
+ */
+ Property(String name, String envName, T defaultValue) {
+ this.name = name;
+ this.envName = envName;
+ this.defaultValue = defaultValue;
+ }
+
+ /**
+ * Gets the Java property's name.
+ *
+ * @return Name.
+ */
+ public String getName() {
+ return this.name;
+ }
+
+ /**
+ * Gets the environment variable's name.
+ *
+ * @return Name.
+ */
+ public String getEnvName() {
+ return this.envName;
+ }
+
+ /**
+ * Gets the value defined by system property first, then env variable or sticks to default.
+ *
+ * @return Value from system property (1st) or env variable (2nd) or default (last).
+ */
+ public T get() {
+ String propValue = System.getProperty(this.name);
+ if (propValue != null && !propValue.trim().isEmpty()) {
+ try {
+ return this.parse(propValue);
+ } catch (IllegalArgumentException e) {
+ LOGGER.warn(String.format("Invalid value in property: %s", this.name));
+ // OK, we tried. Falling back to system environment variable.
+ }
+ }
+
+ String envValue = System.getenv(this.envName);
+ if (envValue != null && !envValue.trim().isEmpty()) {
+ try {
+ return this.parse(envValue);
+ } catch (IllegalArgumentException e) {
+ LOGGER.warn(String.format("Invalid value in environment variable: %s", this.envName));
+ // OK, we tried. Falling back to default.
+ }
+ }
+
+ return this.defaultValue;
+ }
+
+ /**
+ * Parses the value to the specific type.
+ *
+ * @param value String value to be parsed.
+ * @return Value in the specific type.
+ */
+ protected abstract T parse(String value);
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/StringProperty.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/StringProperty.java
new file mode 100644
index 0000000000..34939aa047
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/StringProperty.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.config;
+
+/**
+ * String configuration property.
+ */
+public class StringProperty extends Property {
+
+ /**
+ * {@inheritDoc}
+ */
+ StringProperty(String name, String envName, String defaultValue) {
+ super(name, envName, defaultValue);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected String parse(String value) {
+ return value;
+ }
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/domain/CloudEvent.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/domain/CloudEvent.java
new file mode 100644
index 0000000000..312c36d5b3
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/domain/CloudEvent.java
@@ -0,0 +1,306 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.domain;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A cloud event in Layotto.
+ *
+ * @param The type of the payload.
+ */
+public final class CloudEvent {
+
+ /**
+ * Mime type used for CloudEvent.
+ */
+ public static final String CONTENT_TYPE = "application/cloudevents+json";
+
+ /**
+ * Shared Json serializer/deserializer as per Jackson's documentation.
+ */
+ protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
+ false)
+ .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+ /**
+ * Identifier of the message being processed.
+ */
+ private String id;
+
+ /**
+ * Event's source.
+ */
+ private String source;
+
+ /**
+ * Envelope type.
+ */
+ private String type;
+
+ /**
+ * Version of the specification.
+ */
+ private String specversion;
+
+ /**
+ * Type of the data's content.
+ */
+ private String datacontenttype;
+
+ /**
+ * Cloud event specs says data can be a JSON object or string.
+ */
+ private T data;
+
+ /**
+ * Cloud event specs says binary data should be in data_base64.
+ */
+ @JsonProperty("data_base64")
+ private byte[] binaryData;
+
+ /**
+ * Instantiates a CloudEvent.
+ */
+ public CloudEvent() {
+ }
+
+ /**
+ * Instantiates a CloudEvent.
+ *
+ * @param id Identifier of the message being processed.
+ * @param source Source for this event.
+ * @param type Type of event.
+ * @param specversion Version of the event spec.
+ * @param datacontenttype Type of the payload.
+ * @param data Payload.
+ */
+ public CloudEvent(
+ String id,
+ String source,
+ String type,
+ String specversion,
+ String datacontenttype,
+ T data) {
+ this.id = id;
+ this.source = source;
+ this.type = type;
+ this.specversion = specversion;
+ this.datacontenttype = datacontenttype;
+ this.data = data;
+ }
+
+ /**
+ * Instantiates a CloudEvent.
+ *
+ * @param id Identifier of the message being processed.
+ * @param source Source for this event.
+ * @param type Type of event.
+ * @param specversion Version of the event spec.
+ * @param binaryData Payload.
+ */
+ public CloudEvent(
+ String id,
+ String source,
+ String type,
+ String specversion,
+ byte[] binaryData) {
+ this.id = id;
+ this.source = source;
+ this.type = type;
+ this.specversion = specversion;
+ this.datacontenttype = "application/octet-stream";
+ this.binaryData = binaryData == null ? null : Arrays.copyOf(binaryData, binaryData.length);
+ ;
+ }
+
+ /**
+ * Deserialize a message topic from Layotto.
+ *
+ * @param payload Payload sent from Layotto.
+ * @return Message (can be null if input is null)
+ * @throws IOException If cannot parse.
+ */
+ public static CloudEvent> deserialize(byte[] payload) throws IOException {
+ if (payload == null) {
+ return null;
+ }
+
+ return OBJECT_MAPPER.readValue(payload, CloudEvent.class);
+ }
+
+ /**
+ * Gets the identifier of the message being processed.
+ *
+ * @return Identifier of the message being processed.
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Sets the identifier of the message being processed.
+ *
+ * @param id Identifier of the message being processed.
+ */
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ /**
+ * Gets the event's source.
+ *
+ * @return Event's source.
+ */
+ public String getSource() {
+ return source;
+ }
+
+ /**
+ * Sets the event's source.
+ *
+ * @param source Event's source.
+ */
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ /**
+ * Gets the envelope type.
+ *
+ * @return Envelope type.
+ */
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * Sets the envelope type.
+ *
+ * @param type Envelope type.
+ */
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ /**
+ * Gets the version of the specification.
+ *
+ * @return Version of the specification.
+ */
+ public String getSpecversion() {
+ return specversion;
+ }
+
+ /**
+ * Sets the version of the specification.
+ *
+ * @param specversion Version of the specification.
+ */
+ public void setSpecversion(String specversion) {
+ this.specversion = specversion;
+ }
+
+ /**
+ * Gets the type of the data's content.
+ *
+ * @return Type of the data's content.
+ */
+ public String getDatacontenttype() {
+ return datacontenttype;
+ }
+
+ /**
+ * Sets the type of the data's content.
+ *
+ * @param datacontenttype Type of the data's content.
+ */
+ public void setDatacontenttype(String datacontenttype) {
+ this.datacontenttype = datacontenttype;
+ }
+
+ /**
+ * Gets the cloud event data.
+ *
+ * @return Cloud event's data. As per specs, data can be a JSON object or string.
+ */
+ public T getData() {
+ return data;
+ }
+
+ /**
+ * Sets the cloud event data. As per specs, data can be a JSON object or string.
+ *
+ * @param data Cloud event's data. As per specs, data can be a JSON object or string.
+ */
+ public void setData(T data) {
+ this.data = data;
+ }
+
+ /**
+ * Gets the cloud event's binary data.
+ *
+ * @return Cloud event's binary data.
+ */
+ public byte[] getBinaryData() {
+ return this.binaryData == null ? null : Arrays.copyOf(this.binaryData, this.binaryData.length);
+ }
+
+ /**
+ * Sets the cloud event's binary data.
+ *
+ * @param binaryData Cloud event's binary data.
+ */
+ public void setBinaryData(byte[] binaryData) {
+ this.binaryData = binaryData == null ? null : Arrays.copyOf(binaryData, binaryData.length);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CloudEvent> that = (CloudEvent>) o;
+ return Objects.equals(id, that.id)
+ && Objects.equals(source, that.source)
+ && Objects.equals(type, that.type)
+ && Objects.equals(specversion, that.specversion)
+ && Objects.equals(datacontenttype, that.datacontenttype)
+ && Objects.equals(data, that.data)
+ && Arrays.equals(binaryData, that.binaryData);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, source, type, specversion, datacontenttype, data, binaryData);
+ }
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/exceptions/LayottoError.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/exceptions/LayottoError.java
new file mode 100644
index 0000000000..c7ddaa0f10
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/exceptions/LayottoError.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.exceptions;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import io.grpc.Status;
+
+/**
+ * Represents an error message from Layotto.
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
+public class LayottoError {
+
+ /**
+ * Error code.
+ */
+ private String errorCode;
+
+ /**
+ * Error Message.
+ */
+ private String message;
+
+ /**
+ * Error code from gRPC.
+ */
+ private Integer code;
+
+ /**
+ * Gets the error code.
+ *
+ * @return Error code.
+ */
+ public String getErrorCode() {
+ if ((errorCode == null) && (code != null)) {
+ return Status.fromCodeValue(code).getCode().name();
+ }
+ return errorCode;
+ }
+
+ /**
+ * Sets the error code.
+ *
+ * @param errorCode Error code.
+ * @return This instance.
+ */
+ public LayottoError setErrorCode(String errorCode) {
+ this.errorCode = errorCode;
+ return this;
+ }
+
+ /**
+ * Gets the error message.
+ *
+ * @return Error message.
+ */
+ public String getMessage() {
+ return message;
+ }
+
+ /**
+ * Sets the error message.
+ *
+ * @param message Error message.
+ * @return This instance.
+ */
+ public LayottoError setMessage(String message) {
+ this.message = message;
+ return this;
+ }
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/exceptions/LayottoException.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/exceptions/LayottoException.java
new file mode 100644
index 0000000000..dedadef353
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/exceptions/LayottoException.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.exceptions;
+
+import io.grpc.StatusRuntimeException;
+import reactor.core.Exceptions;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A Layotto's specific exception.
+ */
+public class LayottoException extends RuntimeException {
+
+ /**
+ * Layotto's error code for this exception.
+ */
+ private final String errorCode;
+
+ /**
+ * New exception from a server-side generated error code and message.
+ *
+ * @param daprError Server-side error.
+ */
+ public LayottoException(LayottoError daprError) {
+ this(daprError.getErrorCode(), daprError.getMessage());
+ }
+
+ /**
+ * New exception from a server-side generated error code and message.
+ *
+ * @param daprError Client-side error.
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A {@code null} value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public LayottoException(LayottoError daprError, Throwable cause) {
+ this(daprError.getErrorCode(), daprError.getMessage(), cause);
+ }
+
+ /**
+ * Wraps an exception into a LayottoException.
+ *
+ * @param exception the exception to be wrapped.
+ */
+ public LayottoException(Throwable exception) {
+ this("UNKNOWN", exception.getMessage(), exception);
+ }
+
+ /**
+ * New Exception from a client-side generated error code and message.
+ *
+ * @param errorCode Client-side error code.
+ * @param message Client-side error message.
+ */
+ public LayottoException(String errorCode, String message) {
+ super(String.format("%s: %s", errorCode, message));
+ this.errorCode = errorCode;
+ }
+
+ /**
+ * New exception from a server-side generated error code and message.
+ *
+ * @param errorCode Client-side error code.
+ * @param message Client-side error message.
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A {@code null} value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public LayottoException(String errorCode, String message, Throwable cause) {
+ super(String.format("%s: %s", errorCode, emptyIfNull(message)), cause);
+ this.errorCode = errorCode;
+ }
+
+ /**
+ * Returns the exception's error code.
+ *
+ * @return Error code.
+ */
+ public String getErrorCode() {
+ return this.errorCode;
+ }
+
+ /**
+ * Wraps an exception into LayottoException (if not already LayottoException).
+ *
+ * @param exception Exception to be wrapped.
+ */
+ public static void wrap(Throwable exception) {
+ if (exception == null) {
+ return;
+ }
+
+ throw propagate(exception);
+ }
+
+ /**
+ * Wraps a callable with a try-catch to throw LayottoException.
+ *
+ * @param callable callable to be invoked.
+ * @param type to be returned
+ * @return object of type T.
+ */
+ public static Callable wrap(Callable callable) {
+ return () -> {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ wrap(e);
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Wraps a runnable with a try-catch to throw LayottoException.
+ *
+ * @param runnable runnable to be invoked.
+ * @return object of type T.
+ */
+ public static Runnable wrap(Runnable runnable) {
+ return () -> {
+ try {
+ runnable.run();
+ } catch (Exception e) {
+ wrap(e);
+ }
+ };
+ }
+
+ /**
+ * Wraps an exception into LayottoException (if not already LayottoException).
+ *
+ * @param exception Exception to be wrapped.
+ * @param Mono's response type.
+ * @return Mono containing LayottoException.
+ */
+ public static Mono wrapMono(Exception exception) {
+ try {
+ wrap(exception);
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+
+ return Mono.empty();
+ }
+
+ /**
+ * Wraps an exception into LayottoException (if not already LayottoException).
+ *
+ * @param exception Exception to be wrapped.
+ * @return wrapped RuntimeException
+ */
+ public static RuntimeException propagate(Throwable exception) {
+ Exceptions.throwIfFatal(exception);
+
+ if (exception instanceof LayottoException) {
+ return (LayottoException) exception;
+ }
+
+ Throwable e = exception;
+ while (e != null) {
+ if (e instanceof StatusRuntimeException) {
+ StatusRuntimeException statusRuntimeException = (StatusRuntimeException) e;
+ return new LayottoException(
+ statusRuntimeException.getStatus().getCode().toString(),
+ statusRuntimeException.getStatus().getDescription(),
+ exception);
+ }
+
+ e = e.getCause();
+ }
+
+ if (exception instanceof IllegalArgumentException) {
+ return (IllegalArgumentException) exception;
+ }
+
+ return new LayottoException(exception);
+ }
+
+ private static String emptyIfNull(String str) {
+ if (str == null) {
+ return "";
+ }
+
+ return str;
+ }
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/serializer/DefaultObjectSerializer.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/serializer/DefaultObjectSerializer.java
new file mode 100644
index 0000000000..e049d96212
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/serializer/DefaultObjectSerializer.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.serializer;
+
+import spec.sdk.reactor.v1.utils.TypeRef;
+
+import java.io.IOException;
+
+/**
+ * Default serializer/deserializer for request/response objects and for state objects too.
+ */
+public class DefaultObjectSerializer extends ObjectSerializer implements LayottoObjectSerializer {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] serialize(Object o) throws IOException {
+ return super.serialize(o);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public T deserialize(byte[] data, TypeRef type) throws IOException {
+ return super.deserialize(data, type);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getContentType() {
+ return "application/json";
+ }
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/serializer/LayottoObjectSerializer.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/serializer/LayottoObjectSerializer.java
new file mode 100644
index 0000000000..6d74a6315f
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/serializer/LayottoObjectSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.serializer;
+
+import spec.sdk.reactor.v1.utils.TypeRef;
+
+import java.io.IOException;
+
+/**
+ * Serializes and deserializes application's objects.
+ */
+public interface LayottoObjectSerializer {
+
+ /**
+ * Serializes the given object as byte[].
+ *
+ * @param o Object to be serialized.
+ * @return Serialized object.
+ * @throws IOException If cannot serialize.
+ */
+ byte[] serialize(Object o) throws IOException;
+
+ /**
+ * Deserializes the given byte[] into a object.
+ *
+ * @param data Data to be deserialized.
+ * @param type Type of object to be deserialized.
+ * @param Type of object to be deserialized.
+ * @return Deserialized object.
+ * @throws IOException If cannot deserialize object.
+ */
+ T deserialize(byte[] data, TypeRef type) throws IOException;
+
+ /**
+ * Returns the content type of the request.
+ *
+ * @return content type of the request
+ */
+ String getContentType();
+}
diff --git a/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/serializer/ObjectSerializer.java b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/serializer/ObjectSerializer.java
new file mode 100644
index 0000000000..f09ace378c
--- /dev/null
+++ b/sdk/java-sdk/sdk-reactor/src/main/java/io/mosn/layotto/v1/serializer/ObjectSerializer.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.serializer;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.MessageLite;
+import io.mosn.layotto.v1.domain.CloudEvent;
+import spec.sdk.reactor.v1.utils.TypeRef;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+/**
+ * Serializes and deserializes an internal object.
+ */
+public class ObjectSerializer {
+
+ /**
+ * Shared Json serializer/deserializer as per Jackson's documentation.
+ */
+ protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
+ false)
+ .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+ /**
+ * Default constructor to avoid class from being instantiated outside package but still inherited.
+ */
+ protected ObjectSerializer() {
+ }
+
+ /**
+ * Serializes a given state object into byte array.
+ *
+ * @param state State object to be serialized.
+ * @return Array of bytes[] with the serialized content.
+ * @throws IOException In case state cannot be serialized.
+ */
+ public byte[] serialize(Object state) throws IOException {
+ if (state == null) {
+ return null;
+ }
+
+ if (state.getClass() == Void.class) {
+ return null;
+ }
+
+ // Have this check here to be consistent with deserialization (see deserialize() method below).
+ if (state instanceof byte[]) {
+ return (byte[]) state;
+ }
+
+ // Proto buffer class is serialized directly.
+ if (state instanceof MessageLite) {
+ return ((MessageLite) state).toByteArray();
+ }
+
+ // Not string, not primitive, so it is a complex type: we use JSON for that.
+ return OBJECT_MAPPER.writeValueAsBytes(state);
+ }
+
+ /**
+ * Deserializes the byte array into the original object.
+ *
+ * @param content Content to be parsed.
+ * @param type Type of the object being deserialized.
+ * @param Generic type of the object being deserialized.
+ * @return Object of type T.
+ * @throws IOException In case content cannot be deserialized.
+ */
+ public T deserialize(byte[] content, TypeRef type) throws IOException {
+ return deserialize(content, OBJECT_MAPPER.constructType(type.getType()));
+ }
+
+ /**
+ * Deserializes the byte array into the original object.
+ *
+ * @param content Content to be parsed.
+ * @param clazz Type of the object being deserialized.
+ * @param