Skip to content

Commit

Permalink
issue #11458:Triple stub support async mode (#11464)
Browse files Browse the repository at this point in the history
* issue #11458:Triple stub support async mode

* issue 11458:Triple stub support async mode

* issue 11458:Triple stub support async mode
  • Loading branch information
icodening authored Feb 6, 2023
1 parent 34b92b0 commit c0b257a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
33 changes: 33 additions & 0 deletions dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ public final class {{className}} {
{{inputType}}.class, {{outputType}}.class, serviceDescriptor, MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
{{outputType}}::parseFrom);

private static final StubMethodDescriptor {{methodName}}AsyncMethod = new StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, java.util.concurrent.CompletableFuture.class, serviceDescriptor, MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
{{outputType}}::parseFrom);

private static final StubMethodDescriptor {{methodName}}ProxyAsyncMethod = new StubMethodDescriptor("{{originMethodName}}Async",
{{inputType}}.class, {{outputType}}.class, serviceDescriptor, MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
{{outputType}}::parseFrom);

{{/unaryMethods}}

{{#serverStreamingMethods}}
Expand Down Expand Up @@ -121,6 +132,10 @@ public final class {{className}} {
return StubInvocationUtil.unaryCall(invoker, {{methodName}}Method, request);
}

public CompletableFuture<{{outputType}}> {{methodName}}Async({{inputType}} request){
return StubInvocationUtil.unaryCall(invoker, {{methodName}}AsyncMethod, request);
}

{{#javaDoc}}
{{{javaDoc}}}
{{/javaDoc}}
Expand Down Expand Up @@ -163,6 +178,21 @@ public final class {{className}} {

public static abstract class {{interfaceClassName}}ImplBase implements {{interfaceClassName}}, ServerService<{{interfaceClassName}}> {
private <T, R> BiConsumer<T, StreamObserver<R>> syncToAsync(java.util.function.Function<T, R> syncFun) {
return new BiConsumer<T, StreamObserver<R>>() {
@Override
public void accept(T t, StreamObserver<R> observer) {
try {
R ret = syncFun.apply(t);
observer.onNext(ret);
observer.onCompleted();
} catch (Throwable e) {
observer.onError(e);
}
}
};
}

@Override
public final Invoker<{{interfaceClassName}}> getInvoker(URL url) {
PathResolver pathResolver = url.getOrDefaultFrameworkModel()
Expand All @@ -172,11 +202,14 @@ public final class {{className}} {
{{#methods}}
pathResolver.addNativeStub( "/" + SERVICE_NAME + "/{{originMethodName}}" );
pathResolver.addNativeStub( "/" + SERVICE_NAME + "/{{originMethodName}}Async" );
{{/methods}}

{{#unaryMethods}}
BiConsumer<{{inputType}}, StreamObserver<{{outputType}}>> {{methodName}}Func = this::{{methodName}};
handlers.put({{methodName}}Method.getMethodName(), new UnaryStubMethodHandler<>({{methodName}}Func));
BiConsumer<{{inputType}}, StreamObserver<{{outputType}}>> {{methodName}}AsyncFunc = syncToAsync(this::{{methodName}});
handlers.put({{methodName}}ProxyAsyncMethod.getMethodName(), new UnaryStubMethodHandler<>({{methodName}}AsyncFunc));
{{/unaryMethods}}

{{#serverStreamingMethods}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ private static Object call(Invoker<?> invoker, MethodDescriptor methodDescriptor
methodDescriptor.getMethodName(), invoker.getInterface().getName(),
invoker.getUrl().getProtocolServiceKey(), methodDescriptor.getParameterClasses(),
arguments);
//When there are multiple MethodDescriptors with the same method name, the return type will be wrong
rpcInvocation.setReturnType(methodDescriptor.getReturnClass());
try {
return InvocationUtil.invoke(invoker, rpcInvocation);
} catch (Throwable e) {
Expand Down

0 comments on commit c0b257a

Please sign in to comment.