From 447444bcef1d512a6943c436fc71e22242e7b05d Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Wed, 23 Nov 2022 20:32:15 -0500 Subject: [PATCH 1/2] BenchmarkPingPong: use net.Pipe() Which seems to fix #344. This indicates there's a bug in transport.NewPipe(), but I kinda want to phase that out anyway, so... --- rpc/bench_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rpc/bench_test.go b/rpc/bench_test.go index ebab4b11..1918eebc 100644 --- a/rpc/bench_test.go +++ b/rpc/bench_test.go @@ -2,18 +2,18 @@ package rpc_test import ( "context" + "net" "testing" "capnproto.org/go/capnp/v3" "capnproto.org/go/capnp/v3/rpc" testcp "capnproto.org/go/capnp/v3/rpc/internal/testcapnp" - "capnproto.org/go/capnp/v3/rpc/transport" ) func BenchmarkPingPong(b *testing.B) { - p1, p2 := transport.NewPipe(1) + p1, p2 := net.Pipe() srv := testcp.PingPong_ServerToClient(pingPongServer{}) - conn1 := rpc.NewConn(rpc.NewTransport(p2), &rpc.Options{ + conn1 := rpc.NewConn(rpc.NewStreamTransport(p2), &rpc.Options{ ErrorReporter: testErrorReporter{tb: b}, BootstrapClient: capnp.Client(srv), }) @@ -23,7 +23,7 @@ func BenchmarkPingPong(b *testing.B) { b.Error("conn1.Close:", err) } }() - conn2 := rpc.NewConn(rpc.NewTransport(p1), &rpc.Options{ + conn2 := rpc.NewConn(rpc.NewStreamTransport(p1), &rpc.Options{ ErrorReporter: testErrorReporter{tb: b}, }) defer func() { From f8581d99824d7ebfb03a111008c0cdfdc9f4cc42 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Wed, 23 Nov 2022 20:58:13 -0500 Subject: [PATCH 2/2] Add a benchmark for streaming calls --- rpc/bench_test.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/rpc/bench_test.go b/rpc/bench_test.go index 1918eebc..47cc7792 100644 --- a/rpc/bench_test.go +++ b/rpc/bench_test.go @@ -6,10 +6,56 @@ import ( "testing" "capnproto.org/go/capnp/v3" + "capnproto.org/go/capnp/v3/flowcontrol" "capnproto.org/go/capnp/v3/rpc" testcp "capnproto.org/go/capnp/v3/rpc/internal/testcapnp" + "capnproto.org/go/capnp/v3/std/capnp/stream" ) +func BenchmarkStreaming(b *testing.B) { + ctx := context.Background() + p1, p2 := net.Pipe() + srv := testcp.StreamTest_ServerToClient(nullStream{}) + conn1 := rpc.NewConn(rpc.NewStreamTransport(p1), &rpc.Options{ + BootstrapClient: capnp.Client(srv), + }) + defer conn1.Close() + conn2 := rpc.NewConn(rpc.NewStreamTransport(p2), nil) + defer conn2.Close() + bootstrap := testcp.StreamTest(conn2.Bootstrap(ctx)) + defer bootstrap.Release() + var ( + futures []stream.StreamResult_Future + releaseFuncs []capnp.ReleaseFunc + ) + bootstrap.SetFlowLimiter(flowcontrol.NewFixedLimiter(1 << 9)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := 0; j < 1<<16; j++ { + fut, rel := bootstrap.Push(ctx, nil) + futures = append(futures, fut) + releaseFuncs = append(releaseFuncs, rel) + } + } + for i, fut := range futures { + _, err := fut.Struct() + if err != nil { + b.Errorf("Error waiting on future #%v: %v", i, err) + } + } + for _, rel := range releaseFuncs { + rel() + } +} + +// nullStream implements testcp.StreamTest, ignoring the data it is sent. +type nullStream struct { +} + +func (nullStream) Push(context.Context, testcp.StreamTest_push) error { + return nil +} + func BenchmarkPingPong(b *testing.B) { p1, p2 := net.Pipe() srv := testcp.PingPong_ServerToClient(pingPongServer{})