Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network: TorStream should inherit IO.Stream #56

Merged
merged 3 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions NOnion.Tests/HiddenServicesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System;
using System.IO;
using System.Linq;
using System.Net.Security;
using System.Security.Authentication;
using System.Security.Cryptography;
using System.Threading.Tasks;

Expand Down Expand Up @@ -86,7 +88,7 @@ private async Task<int> ReadExact(TorStream stream, byte[] buffer, int off, int
{
if (len - off <= 0) return 0;

var bytesRead = await stream.ReceiveAsync(buffer, off, len - off);
var bytesRead = await stream.ReadAsync(buffer, off, len - off);

if (bytesRead == 0 || bytesRead == -1)
throw new Exception("Not enough data");
Expand All @@ -110,6 +112,27 @@ public void CanBrowseFacebookOverHS()
Assert.ThrowsAsync(typeof(UnsuccessfulHttpRequestException), BrowseFacebookOverHS);
}

public async Task BrowseFacebookOverHSWithTLS()
{
TorDirectory directory = await TorDirectory.BootstrapAsync(FallbackDirectorySelector.GetRandomFallbackDirectory(), new DirectoryInfo(Path.GetTempPath()));

var client = await TorServiceClient.ConnectAsync(directory, "facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd.onion:443");

var sslStream = new SslStream(client.GetStream(), true, (sender, cert, chain, sslPolicyErrors) => true);
await sslStream.AuthenticateAsClientAsync(string.Empty, null, SslProtocols.Tls12, false);

var httpClientOverSslStream = new TorHttpClient(sslStream, "www.facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd.onion");
var facebookResponse = await httpClientOverSslStream.GetAsStringAsync("/", false);
Assert.That(facebookResponse.Contains("<html"), "Response from facebook was invalid.");
}

[Test]
[Retry(TestsRetryCount)]
public void CanBrowseFacebookOverHSWithTLS()
{
Assert.DoesNotThrowAsync(BrowseFacebookOverHSWithTLS);
}

public async Task EstablishAndCommunicateOverHSConnectionOnionStyle()
{
int descriptorUploadRetryLimit = 2;
Expand All @@ -134,7 +157,7 @@ public async Task EstablishAndCommunicateOverHSConnectionOnionStyle()
Task.Run(async () => {
var stream = await host.AcceptClientAsync();
var bytesToSendWithLength = BitConverter.GetBytes(dataToSendAndReceive.Length).Concat(dataToSendAndReceive).ToArray();
await stream.SendDataAsync(bytesToSendWithLength);
await stream.WriteAsync(bytesToSendWithLength, 0, bytesToSendWithLength.Length);
await stream.EndAsync();
});

Expand Down
2 changes: 0 additions & 2 deletions NOnion/Constants.fs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ module Constants =
// Time limit used for receving data in stream
let internal StreamReceiveTimeout = TimeSpan.FromSeconds 1.

let internal HttpClientBufferSize = 1024

let internal DefaultHttpHost = "127.0.0.1"

// NTor Handshake Constants
Expand Down
8 changes: 4 additions & 4 deletions NOnion/Directory/TorDirectory.fs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type TorDirectory =
)

let circuit = TorCircuit(guard)
let stream = TorStream(circuit)
use stream = new TorStream(circuit)

(*
* We always use FastCreate authentication because privacy is not important for mono-hop
Expand Down Expand Up @@ -216,7 +216,7 @@ type TorDirectory =
)

let circuit = TorCircuit(guard)
let stream = TorStream(circuit)
use stream = new TorStream(circuit)

(*
* We always use FastCreate authentication because privacy is not important for mono-hop
Expand Down Expand Up @@ -252,7 +252,7 @@ type TorDirectory =
circuit.Create CircuitNodeDetail.FastCreate
|> Async.Ignore

let consensusStream = TorStream circuit
use consensusStream = new TorStream(circuit)
aarani marked this conversation as resolved.
Show resolved Hide resolved
do! consensusStream.ConnectToDirectory() |> Async.Ignore

let consensusHttpClient =
Expand Down Expand Up @@ -340,7 +340,7 @@ type TorDirectory =
(digestsChunk: array<string>)
=
async {
let descriptorsStream = TorStream circuit
use descriptorsStream = new TorStream(circuit)

do!
descriptorsStream.ConnectToDirectory()
Expand Down
35 changes: 12 additions & 23 deletions NOnion/Http/TorHttpClient.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,11 @@ open NOnion
open NOnion.Network
open NOnion.Utility

type TorHttpClient(stream: TorStream, host: string) =
type TorHttpClient(stream: Stream, host: string) =

// Receives all the data stream until it reaches EOF (until stream receive a RELAY_END)
let rec ReceiveAll(memStream: MemoryStream) =
async {
let buffer = Array.zeroCreate Constants.HttpClientBufferSize

// Try to fill the buffer
let! bytesRead =
stream.Receive buffer 0 Constants.HttpClientBufferSize

if bytesRead > 0 then
memStream.Write(buffer, 0, bytesRead)
return! ReceiveAll memStream
}
let ReceiveAll memStream =
stream.CopyToAsync memStream |> Async.AwaitTask

member __.GetAsString (path: string) (forceUncompressed: bool) =
async {
Expand All @@ -42,10 +32,11 @@ type TorHttpClient(stream: TorStream, host: string) =
|> List.map(fun (k, v) -> sprintf "%s: %s\r\n" k v)
|> String.concat String.Empty

do!
let buffer =
sprintf "GET %s HTTP/1.0\r\n%s\r\n" path headers
|> Encoding.UTF8.GetBytes
|> stream.SendData
|> Encoding.ASCII.GetBytes

do! stream.AsyncWrite(buffer, 0, buffer.Length)

use memStream = new MemoryStream()

Expand Down Expand Up @@ -99,9 +90,7 @@ type TorHttpClient(stream: TorStream, host: string) =
|> Map.ofArray

match headersMap.TryGetValue "Content-Encoding" with
| false, _ ->
return
failwith "GetAsString: Content-Encoding header is missing"
| false, _
| true, "identity" -> return body |> Encoding.UTF8.GetString
| true, "deflate" ->
// DeflateStream needs the zlib header to be chopped off first
Expand Down Expand Up @@ -136,10 +125,11 @@ type TorHttpClient(stream: TorStream, host: string) =
|> List.map(fun (k, v) -> sprintf "%s: %s\r\n" k v)
|> String.concat String.Empty

do!
let buffer =
sprintf "POST %s HTTP/1.0\r\n%s\r\n%s" path headers payload
|> Encoding.ASCII.GetBytes
|> stream.SendData

do! stream.AsyncWrite(buffer, 0, buffer.Length)

use memStream = new MemoryStream()

Expand Down Expand Up @@ -188,8 +178,7 @@ type TorHttpClient(stream: TorStream, host: string) =

match headersMap.TryGetValue "Content-Encoding" with
| false, _ when body.Length = 0 -> return String.Empty
| false, _ ->
return failwith "PostString: Content-Encoding header is missing"
| false, _
| true, "identity" -> return body |> Encoding.UTF8.GetString
| true, "deflate" ->
// DeflateStream needs the zlib header to be chopped off first
Expand Down
114 changes: 89 additions & 25 deletions NOnion/Network/TorStream.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
namespace NOnion.Network

open System
open System.IO
open System.Threading
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow

Expand All @@ -22,7 +24,9 @@ type private StreamReceiveMessage =
type private StreamControlMessage =
| End of replyChannel: AsyncReplyChannel<OperationResult<unit>>
| Send of
array<byte> *
data: array<byte> *
offset: int *
length: int *
replyChannel: AsyncReplyChannel<OperationResult<unit>>
| StartServiceConnectionProcess of
port: int *
Expand All @@ -44,6 +48,7 @@ type private StreamControlMessage =
| SendSendMe of replyChannel: AsyncReplyChannel<OperationResult<unit>>

type TorStream(circuit: TorCircuit) =
inherit Stream()

let mutable streamState: StreamState = StreamState.Initialized

Expand Down Expand Up @@ -79,12 +84,15 @@ type TorStream(circuit: TorCircuit) =
| _ -> failwith "Unexpected state when trying to end the stream"
}

let safeSend(data: array<byte>) =
let safeSend (data: array<byte>) (offset: int) (length: int) =
async {
match streamState with
| Connected streamId ->
let dataChunks =
SeqUtils.Chunk Constants.MaximumRelayPayloadLength data
data
|> Seq.skip offset
|> Seq.take length
|> Seq.chunkBySize Constants.MaximumRelayPayloadLength

let rec sendChunks dataChunks =
async {
Expand All @@ -98,7 +106,7 @@ type TorStream(circuit: TorCircuit) =
circuit.SendRelayCell
streamId
(head
|> Array.ofSeq
|> Seq.toArray
|> RelayData.RelayData)
None

Expand Down Expand Up @@ -224,9 +232,9 @@ type TorStream(circuit: TorCircuit) =
match command with
| End replyChannel ->
do! safeEnd() |> TryExecuteAsyncAndReplyAsResult replyChannel
| Send(data, replyChannel) ->
| Send(data, offset, length, replyChannel) ->
do!
safeSend data
safeSend data offset length
|> TryExecuteAsyncAndReplyAsResult replyChannel
| StartServiceConnectionProcess(port, streamObj, replyChannel) ->
do!
Expand Down Expand Up @@ -348,7 +356,7 @@ type TorStream(circuit: TorCircuit) =
do! refillBufferIfNeeded()

if isEOF then
return -1
return 0
else
let rec tryRead bytesRead bytesRemaining =
async {
Expand Down Expand Up @@ -401,9 +409,31 @@ type TorStream(circuit: TorCircuit) =
let streamReceiveMailBox =
MailboxProcessor.Start StreamReceiveMailBoxProcessor

override _.CanRead = not isEOF
override _.CanWrite = not isEOF

override _.CanSeek = false

override _.Length = failwith "Length is not supported"

override _.SetLength _ =
failwith "SetLength is not supported"

override _.Position
with get () = failwith "No seek, GetPosition is not supported"
and set _position = failwith "No seek, SetPosition is not supported"

override _.Seek(_, _) =
failwith "No seek, Seek is not supported"

override _.Flush() =
()

static member Accept (streamId: uint16) (circuit: TorCircuit) =
async {
let stream = TorStream circuit
// We can't use the "use" keyword since this stream needs
// to outlive this function.
let stream = new TorStream(circuit)
do! stream.RegisterIncomingStream streamId

do! circuit.SendRelayCell streamId (RelayConnected Array.empty) None
Expand All @@ -428,20 +458,6 @@ type TorStream(circuit: TorCircuit) =
member self.EndAsync() =
self.End() |> Async.StartAsTask


member __.SendData(data: array<byte>) =
async {
let! sendResult =
streamControlMailBox.PostAndAsyncReply(fun replyChannel ->
StreamControlMessage.Send(data, replyChannel)
)

return UnwrapResult sendResult
}

member self.SendDataAsync data =
self.SendData data |> Async.StartAsTask

member self.ConnectToService(port: int) =
async {
let! completionTaskRes =
Expand Down Expand Up @@ -500,7 +516,34 @@ type TorStream(circuit: TorCircuit) =
return UnwrapResult registerationResult
}

member self.Receive (buffer: array<byte>) (offset: int) (length: int) =
override _.Read(buffer: array<byte>, offset: int, length: int) =
let receiveResult =
streamReceiveMailBox.PostAndReply(fun replyChannel ->
{
StreamBuffer = buffer
BufferOffset = offset
BufferLength = length
ReplyChannel = replyChannel
}
)

UnwrapResult receiveResult

override _.Write(buffer: array<byte>, offset: int, length: int) =
let sendResult =
streamControlMailBox.PostAndReply(fun replyChannel ->
StreamControlMessage.Send(buffer, offset, length, replyChannel)
)

UnwrapResult sendResult

override _.ReadAsync
(
buffer: array<byte>,
offset: int,
length: int,
_cancelToken: CancellationToken
) =
async {
let! receiveResult =
streamReceiveMailBox.PostAndAsyncReply(fun replyChannel ->
Expand All @@ -514,9 +557,30 @@ type TorStream(circuit: TorCircuit) =

return UnwrapResult receiveResult
}
|> Async.StartAsTask

override _.WriteAsync
(
buffer: array<byte>,
offset: int,
length: int,
_cancelToken: CancellationToken
) =
async {
let! sendResult =
streamControlMailBox.PostAndAsyncReply(fun replyChannel ->
StreamControlMessage.Send(
buffer,
offset,
length,
replyChannel
)
)

member self.ReceiveAsync(buffer: array<byte>, offset: int, length: int) =
self.Receive buffer offset length |> Async.StartAsTask
return UnwrapResult sendResult
knocte marked this conversation as resolved.
Show resolved Hide resolved
}
|> Async.StartAsTask
:> Task

interface ITorStream with
member __.HandleDestroyedCircuit() =
Expand Down
6 changes: 4 additions & 2 deletions NOnion/Services/TorServiceClient.fs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type TorServiceClient =
circuit.Extend hsDirectoryNode
|> Async.Ignore

let dirStream = TorStream circuit
use dirStream = new TorStream(circuit)

do!
dirStream.ConnectToDirectory()
Expand Down Expand Up @@ -493,7 +493,9 @@ type TorServiceClient =
Async.Parallel [ introduceJob; rendezvousJoin ]
|> Async.Ignore

let serviceStream = TorStream rendezvousCircuit
// We can't use the "use" keyword since this stream needs
// to outlive this function.
let serviceStream = new TorStream(rendezvousCircuit)
do! serviceStream.ConnectToService port |> Async.Ignore

return
Expand Down
Loading