Skip to content

Commit

Permalink
Stop streams on finish
Browse files Browse the repository at this point in the history
Can't use dispose (or close) as can be disposed too early by user code

Resolves #263
  • Loading branch information
benaadams authored and halter73 committed Nov 17, 2015
1 parent 8c0a170 commit f60f6c9
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 2 deletions.
9 changes: 7 additions & 2 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,10 @@ public async Task RequestProcessingAsync()
{
MessageBody = MessageBody.For(HttpVersion, _requestHeaders, this);
_keepAlive = MessageBody.RequestKeepAlive;
RequestBody = new FrameRequestStream(MessageBody);
ResponseBody = new FrameResponseStream(this);
var requestBody = new FrameRequestStream(MessageBody);
RequestBody = requestBody;
var responseBody = new FrameResponseStream(this);
ResponseBody = responseBody;
DuplexStream = new FrameDuplexStream(RequestBody, ResponseBody);

var httpContext = HttpContextFactory.Create(this);
Expand Down Expand Up @@ -236,6 +238,9 @@ public async Task RequestProcessingAsync()

// Finish reading the request body in case the app did not.
await MessageBody.Consume();

requestBody.StopAcceptingReads();
responseBody.StopAcceptingWrites();
}

terminated = !_keepAlive;
Expand Down
21 changes: 21 additions & 0 deletions src/Microsoft.AspNet.Server.Kestrel/Http/FrameRequestStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public class FrameRequestStream : Stream
{
private readonly MessageBody _body;
private bool _stopped;

public FrameRequestStream(MessageBody body)
{
Expand Down Expand Up @@ -50,12 +51,22 @@ public override void SetLength(long value)

public override int Read(byte[] buffer, int offset, int count)
{
if (_stopped)
{
throw new ObjectDisposedException("RequestStream has been disposed");
}

return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
}

#if NET451
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
if (_stopped)
{
throw new ObjectDisposedException("RequestStream has been disposed");
}

var task = ReadAsync(buffer, offset, count, CancellationToken.None, state);
if (callback != null)
{
Expand All @@ -71,6 +82,11 @@ public override int EndRead(IAsyncResult asyncResult)

private Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
if (_stopped)
{
throw new ObjectDisposedException("RequestStream has been disposed");
}

var tcs = new TaskCompletionSource<int>(state);
var task = _body.ReadAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken);
task.ContinueWith((task2, state2) =>
Expand Down Expand Up @@ -102,5 +118,10 @@ public override void Write(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}

public void StopAcceptingReads()
{
_stopped = true;
}
}
}
26 changes: 26 additions & 0 deletions src/Microsoft.AspNet.Server.Kestrel/Http/FrameResponseStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
class FrameResponseStream : Stream
{
private readonly FrameContext _context;
private bool _stopped;

public FrameResponseStream(FrameContext context)
{
Expand All @@ -35,11 +36,21 @@ public override long Length

public override void Flush()
{
if (_stopped)
{
throw new ObjectDisposedException("ResponseStream has been disposed");
}

_context.FrameControl.Flush();
}

public override Task FlushAsync(CancellationToken cancellationToken)
{
if (_stopped)
{
throw new ObjectDisposedException("ResponseStream has been disposed");
}

return _context.FrameControl.FlushAsync(cancellationToken);
}

Expand All @@ -60,12 +71,27 @@ public override int Read(byte[] buffer, int offset, int count)

public override void Write(byte[] buffer, int offset, int count)
{
if (_stopped)
{
throw new ObjectDisposedException("ResponseStream has been disposed");
}

_context.FrameControl.Write(new ArraySegment<byte>(buffer, offset, count));
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (_stopped)
{
throw new ObjectDisposedException("ResponseStream has been disposed");
}

return _context.FrameControl.WriteAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken);
}

public void StopAcceptingWrites()
{
_stopped = true;
}
}
}

0 comments on commit f60f6c9

Please sign in to comment.