Skip to content

Commit

Permalink
#13 also make reading process async.
Browse files Browse the repository at this point in the history
  • Loading branch information
CXuesong committed Jun 20, 2020
1 parent 6452693 commit 4c66294
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 55 deletions.
120 changes: 87 additions & 33 deletions JsonRpc.AspNetCore/AspNetCoreRpcServerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.Http.Headers;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -104,51 +105,94 @@ public virtual async Task ProcessRequestAsync(HttpContext context)
if (context == null) throw new ArgumentNullException(nameof(context));
if (!HttpMethods.IsPost(context.Request.Method) && !HttpMethods.IsGet(context.Request.Method))
{
await WriteResponseWithStatusCodeHintAsync(context.Response,
new ResponseMessage(MessageId.Empty, new ResponseError(JsonRpcErrorCode.InvalidRequest, "The request method is not allowed.", StatusCodes.Status405MethodNotAllowed)));
await WriteResponseWithStatusCodeHintAsync(context,
new ResponseError(JsonRpcErrorCode.InvalidRequest, "The request method is not allowed."), StatusCodes.Status405MethodNotAllowed);
return;
}
if (!context.Request.ContentType.StartsWith("application/json", StringComparison.OrdinalIgnoreCase))
// {"method":""} // 13 characters
if (context.Request.ContentLength < 12)
{
await WriteResponseWithStatusCodeHintAsync(context.Response,
new ResponseMessage(MessageId.Empty, new ResponseError(JsonRpcErrorCode.InvalidRequest, "The request payload type is not supported.", StatusCodes.Status415UnsupportedMediaType)));
await WriteResponseWithStatusCodeHintAsync(context,
new ResponseError(JsonRpcErrorCode.InvalidRequest, "The request body is too short."), 0);
return;
}
// {"method":""} // 13 characters
if (context.Request.ContentLength < 12)
if (context.Request.ContentType == null || !MediaTypeHeaderValue.TryParse(context.Request.ContentType, out var contentType))
{
await WriteResponseWithStatusCodeHintAsync(context,
new ResponseError(JsonRpcErrorCode.InvalidRequest, "The request payload type cannot not be parsed."),
StatusCodes.Status415UnsupportedMediaType);
return;
}
if (!contentType.MediaType.StartsWith("application/json", StringComparison.OrdinalIgnoreCase))
{
await WriteResponseWithStatusCodeHintAsync(context,
new ResponseError(JsonRpcErrorCode.InvalidRequest, "The request payload type is not supported."),
StatusCodes.Status415UnsupportedMediaType);
return;
}
Encoding encoding;
try
{
await WriteResponseWithStatusCodeHintAsync(context.Response,
new ResponseMessage(MessageId.Empty, new ResponseError(JsonRpcErrorCode.InvalidRequest, "The request body is too short.")));
encoding = string.IsNullOrEmpty(contentType.CharSet) ? Encoding.UTF8 : Encoding.GetEncoding(contentType.CharSet);
}
catch (ArgumentException)
{
await WriteResponseWithStatusCodeHintAsync(context,
new ResponseError(JsonRpcErrorCode.InvalidRequest, "The request content charset is not supported."),
StatusCodes.Status415UnsupportedMediaType);
return;
}
RequestMessage message;
try
{
using (var reader = new StreamReader(context.Request.Body))
message = (RequestMessage) Message.LoadJson(reader);
var ms = Interlocked.Exchange(ref cachedMemoryStream, null) ?? new MemoryStream(4096);
try
{
// Since Newtonsoft.Json does not support async object deserialization, we need to buffer it first.
#if BCL_FEATURE_ASYNC_DISPOSABLE
await
#endif
using (context.Request.Body)
await context.Request.Body.CopyToAsync(ms, 4096, context.RequestAborted);

// Then do deserialization synchronously.
ms.Position = 0;
using (var reader = new StreamReader(ms, encoding, false, 4096, true))
message = (RequestMessage) Message.LoadJson(reader);
}
finally
{
ms.Position = 0;
ms.SetLength(0);
if (Interlocked.CompareExchange(ref cachedMemoryStream, ms, null) != null)
ms.Dispose();
}
}
catch (JsonReaderException ex)
{
await WriteResponseWithStatusCodeHintAsync(context.Response,
new ResponseMessage(MessageId.Empty, new ResponseError(JsonRpcErrorCode.InvalidRequest, ex.Message)));
await WriteResponseWithStatusCodeHintAsync(context,
new ResponseError(JsonRpcErrorCode.InvalidRequest, ex.Message), 0);
return;
}
catch (Exception ex)
{
await WriteResponseWithStatusCodeHintAsync(context.Response,
new ResponseMessage(MessageId.Empty, ResponseError.FromException(ex, false)));
await WriteResponseWithStatusCodeHintAsync(context,
new ResponseMessage(MessageId.Empty, ResponseError.FromException(ex, false)), 0);
return;
}
context.RequestAborted.ThrowIfCancellationRequested();
var response = await ProcessRequestAsync(message, context, false);
await WriteResponseWithStatusCodeHintAsync(context.Response, response);
await WriteResponseWithStatusCodeHintAsync(context, response, 0);
}

private Task WriteResponseWithStatusCodeHintAsync(HttpResponse httpResponse, ResponseMessage response, int statusCodeHint = 0)
private Task WriteResponseWithStatusCodeHintAsync(HttpContext httpContext, ResponseError error, int statusCodeHint)
=> WriteResponseWithStatusCodeHintAsync(httpContext, new ResponseMessage(MessageId.Empty, error), statusCodeHint);

private Task WriteResponseWithStatusCodeHintAsync(HttpContext httpContext, ResponseMessage response, int statusCodeHint)
{
if (statusCodeHint == 0)
statusCodeHint = response == null ? StatusCodes.Status204NoContent : StatusCodes.Status200OK;
return WriteResponseAsync(httpResponse, response, GetStatusCodeFromResponse(response, statusCodeHint));
return WriteResponseAsync(httpContext.Response, response, GetStatusCodeFromResponse(response, statusCodeHint), httpContext.RequestAborted);
}

/// <summary>
Expand All @@ -157,36 +201,46 @@ private Task WriteResponseWithStatusCodeHintAsync(HttpResponse httpResponse, Res
/// <param name="httpResponse">HTTP response object.</param>
/// <param name="response">JSON-RPC response, or <c>null</c> if the response is empty.</param>
/// <param name="statusCode">the HTTP status code.</param>
protected async Task WriteResponseAsync(HttpResponse httpResponse, ResponseMessage response, int statusCode)
/// <param name="cancellationToken">a token used to cancel the operation.</param>
protected async Task WriteResponseAsync(HttpResponse httpResponse, ResponseMessage response,
int statusCode, CancellationToken cancellationToken)
{
if (httpResponse == null) throw new ArgumentNullException(nameof(httpResponse));
cancellationToken.ThrowIfCancellationRequested();

httpResponse.StatusCode = statusCode;
if (response == null) return;

if (fullContentType != null)
httpResponse.ContentType = fullContentType;

// For notification, we don't wait for the task.
// Buffer content synchronously.
var ms = Interlocked.Exchange(ref cachedMemoryStream, null) ?? new MemoryStream(1024);
Debug.Assert(ms.Position == 0);
using (var writer = new StreamWriter(ms, Encoding, 4096, true))
response.WriteJson(writer);
httpResponse.ContentLength = ms.Length;

// Write content asynchronously.
ms.Position = 0;
try
{
Debug.Assert(ms.Position == 0);
using (var writer = new StreamWriter(ms, Encoding, 4096, true))
response.WriteJson(writer);
httpResponse.ContentLength = ms.Length;

// Write content asynchronously.
ms.Position = 0;
#if BCL_FEATURE_ASYNC_DISPOSABLE
await
await
#endif
using (httpResponse.Body)
using (httpResponse.Body)
{
await ms.CopyToAsync(httpResponse.Body, 4096, cancellationToken);
}
}
finally
{
await ms.CopyToAsync(httpResponse.Body, 4096);
ms.Position = 0;
ms.SetLength(0);
if (Interlocked.CompareExchange(ref cachedMemoryStream, ms, null) != null)
ms.Dispose();
}
ms.Position = 0;
ms.SetLength(0);
Interlocked.CompareExchange(ref cachedMemoryStream, ms, null);
}

/// <summary>
Expand Down
15 changes: 14 additions & 1 deletion JsonRpc.AspNetCore/JsonRpcAspNetExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
#if NETCOREAPP
using Microsoft.AspNetCore.Routing;
#endif

namespace JsonRpc.AspNetCore
{
Expand Down Expand Up @@ -32,6 +35,15 @@ public static IJsonRpcBuilder AddJsonRpc(this IServiceCollection serviceCollecti
return builder;
}

// TODO support IEndpointRouteBuilder when we can add conditional FrameworkReference without breaking the whole project.
#if NETCOREAPP
public static IApplicationBuilder MapJsonRpc(this IEndpointRouteBuilder builder, string requestPath,
Func<HttpContext, AspNetCoreRpcServerHandler> serverHandlerFactory)
{

}
#endif

/// <summary>
/// Uses <see cref="AspNetCoreRpcServerHandler"/> to handle the JSON RPC requests on certain URL.
/// </summary>
Expand All @@ -49,7 +61,7 @@ public static IApplicationBuilder UseJsonRpc(this IApplicationBuilder builder, s
{
if (context.Request.Path.Value == requestPath)
{
if (context.Request.Method != "POST")
if (!HttpMethods.IsPost(context.Request.Method))
{
context.Response.StatusCode = 405;
context.Response.ContentType = "text/plain;charset=utf-8";
Expand Down Expand Up @@ -101,5 +113,6 @@ public static HttpContext GetHttpContext(this RequestContext requestContext)
if (requestContext == null) throw new ArgumentNullException(nameof(requestContext));
return requestContext.Features.Get<IAspNetCoreFeature>()?.HttpContext;
}

}
}
3 changes: 1 addition & 2 deletions JsonRpc.Commons/JsonRpc.Commons.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
<Import Project="../Shared/JsonRpc.Packages.props" />

<PropertyGroup>
<TargetFrameworks>netstandard1.1;net45;netstandard2.1</TargetFrameworks>
<Version>0.5.3</Version>
<TargetFrameworks>net45;netstandard1.1;netstandard2.1</TargetFrameworks>
<PackageId>CXuesong.JsonRpc.Commons</PackageId>
<Description>An asynchronous JSON RPC server &amp; client library.</Description>
<PackageTags>json rpc json-rpc json-rpc-server json-rpc-client</PackageTags>
Expand Down
42 changes: 23 additions & 19 deletions WebTestApplication/wwwroot/js/jsonrpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,23 @@ function JsonRpcClient(/**@type string*/ endpointUrl) {
/**@type JsonRpcClient */
var _this = this;
this._ws.addEventListener("message",
function(e) {
if (typeof (e.data) === "string") {
var data = JSON.parse(e.data);
if (data.jsonrpc !== "2.0" || data.id == null) {
console.warn("Ignored invalid JSON-RPC message from server.", data);
}
var d = _this._impendingResponses[data.id];
delete _this._impendingResponses[data.id];
if (data.error) {
d.reject(data.error);
} else {
d.resolve(data.result);
}
} else {
console.warn("Received non-JSON-RPC message from server.", e.data);
}
});
function(e) {
if (typeof (e.data) === "string") {
var data = JSON.parse(e.data);
if (data.jsonrpc !== "2.0" || data.id == null) {
console.warn("Ignored invalid JSON-RPC message from server.", data);
}
var d = _this._impendingResponses[data.id];
delete _this._impendingResponses[data.id];
if (data.error) {
d.reject(data.error);
} else {
d.resolve(data.result);
}
} else {
console.warn("Received non-JSON-RPC message from server.", e.data);
}
});
}
}

Expand All @@ -52,13 +52,17 @@ JsonRpcClient.prototype.send = function (methodName, parameters, id) {
d.resolve();
this._ws.send(JSON.stringify(body));
} else {
$.post(this._endpointUrl, JSON.stringify(body)).done(function (response, status, xhr) {
$.post({
url: this._endpointUrl,
contentType: "application/json",
data: JSON.stringify(body),
}).done(function(response, status, xhr) {
if (response.error) {
d.reject(response.error, xhr.status);
} else {
d.resolve(response.result, xhr.status);
}
}).fail(function (xhr, status, error) {
}).fail(function(xhr, status, error) {
var response = xhr.responseJSON;
d.reject(response.error, xhr.status);
});
Expand Down

0 comments on commit 4c66294

Please sign in to comment.