
transport: handler server transport continuously reads from streams without waiting for application #7261

Open
wmxhw opened this issue May 23, 2024 · 13 comments
Labels
Area: Transport, fixit, P2, Type: Bug

Comments

@wmxhw

wmxhw commented May 23, 2024

What version of gRPC are you using?

grpc v1.51.0

What version of Go are you using (go version)?

go version go1.21.7 windows/amd64

What operating system (Linux, Windows, …) and version?

Windows 11

What did you do?

My client has a large amount of data to send, but the server's processing performance (such as writing to disk) may not keep up. Here is test code that exhibits the same issue.

test proto

service HelloService {
    rpc SayHelloStream(stream SayHelloRequest) returns (google.protobuf.Empty){}
}

message SayHelloRequest {
    string hello = 1;
}

server code

func (s helloServer) SayHelloStream(stream hello.HelloService_SayHelloStreamServer) error {
    for {
        r, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        fmt.Println(r.Hello)
        time.Sleep(10 * time.Second) // Print once every 10 seconds.
    }
    return nil
}

client code

func main() {
	// cc is assumed to be an established *grpc.ClientConn (dialing omitted).
	clt := hello.NewHelloServiceClient(cc)

	stream, err := clt.SayHelloStream(context.Background())
	if err != nil {
		panic(err)
	}

	body := bytes.NewBuffer(nil) // Simulate a large amount of data (~1 MB).
	for i := 0; i < 1e6; i++ {
		body.WriteString("n")
	}
	for {
		err = stream.Send(&hello.SayHelloRequest{
			Hello: body.String(),
		})
		if err != nil {
			panic(err)
		}
		time.Sleep(10 * time.Millisecond) // Send once every 10 milliseconds.
	}
}

What did you expect to see?

Even if the client sends a large amount of traffic, the server should not experience unbounded memory growth.
gRPC is expected to apply flow control, so there should be no memory leak.

What did you see instead?

There is a memory leak occurring in the HandleStreams function at internal/transport/handler_server.go.

func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*Stream)) {
    ....
	go func() {
		defer close(readerDone)

		// TODO: minimize garbage, optimize recvBuffer code/ownership
		const readSize = 8196
		for buf := make([]byte, readSize); ; {
			n, err := req.Body.Read(buf)
			if n > 0 {
				s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
				buf = buf[n:]
			}
			if err != nil {
				s.buf.put(recvMsg{err: mapRecvMsgError(err)})
				return
			}
			if len(buf) == 0 {
				buf = make([]byte, readSize)     // Does this piece of code have flow control?
			}
		}
	}()
    ....
}
@wmxhw
Author

wmxhw commented May 23, 2024

[attached image: leak]

@arjan-bal arjan-bal self-assigned this May 23, 2024
@arjan-bal
Contributor

Hi wmxhw@, can you try the same repro on a more recent grpc-go version (v1.61+) and share the profiler's output?

The server transport code has changed significantly since 1.51 was released in 2022.

@wmxhw
Author

wmxhw commented May 24, 2024

@arjan-bal
Thank you for your response. Even after switching to gRPC v1.64.0, I am still encountering the same problem.

@arjan-bal
Contributor

Can you share the graph generated by the profiler for 1.64?

@wmxhw
Author

wmxhw commented May 24, 2024

Here are the test code and screenshots of its pprof results:

grpc_client_stream_example.zip

[pprof screenshots attached]

@wmxhw
Author

wmxhw commented May 24, 2024

@arjan-bal

@arjan-bal
Contributor

wmxhw@, the server code sleeps for 10 seconds after reading one message, while the client sleeps for 10 milliseconds after sending each message. This means the client can send 1000 messages before the server reads one. Since each message is about 1 MB, gRPC needs to buffer roughly 1 GB of messages until the application calls stream.Recv(). That buffering explains why the server's memory consumption increases by more than 100 MB each second. After removing the sleep on the server side, memory usage stayed around 50 MB.

If your aim is only to restrict the logging interval, you can do something like this:

{
	logInterval := 10 * time.Second
	lastLogTime := time.Time{}
	for {
		recv, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		now := time.Now()
		if now.Sub(lastLogTime) >= logInterval {
			lastLogTime = now
			fmt.Printf("Len: %d\n", len(recv.GetHello()))
		}
	}
	return nil
}

@wmxhw
Author

wmxhw commented May 26, 2024

Perhaps I didn't make it clear earlier. The example code above was just for testing. The actual scenario is that the client sends a large amount of data through gRPC client streaming, and the server needs to write the received data to disk. Because the disk writes are slow, the memory growth described above occurred.

Am I correct in understanding that gRPC's streaming mode does not limit how much data it buffers, and that traffic can only be controlled through negotiation between the client and server at the application layer?

For example, using bidirectional streaming mode, the server can send an acknowledgment to the client when it completes writing to the hard disk, and the client can then send more data.
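For illustration, a minimal sketch of that acknowledgment pattern, assuming a hypothetical bidirectional RPC rpc SayHelloBidi(stream SayHelloRequest) returns (stream Ack) and its generated stubs (the Ack message, the method name, and writeToDisk are placeholders, not part of the proto above):

// Server side: write each request to disk, then send an Ack so the
// client knows it may send the next chunk.
func (s helloServer) SayHelloBidi(stream hello.HelloService_SayHelloBidiServer) error {
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if err := writeToDisk(req.GetHello()); err != nil { // slow application work (hypothetical helper)
			return err
		}
		if err := stream.Send(&hello.Ack{}); err != nil {
			return err
		}
	}
}

// Client side: send one chunk, then block on Recv until the server
// acknowledges it, so at most one chunk is in flight at a time.
func sendAll(stream hello.HelloService_SayHelloBidiClient, chunks []string) error {
	for _, c := range chunks {
		if err := stream.Send(&hello.SayHelloRequest{Hello: c}); err != nil {
			return err
		}
		if _, err := stream.Recv(); err != nil {
			return err
		}
	}
	return stream.CloseSend()
}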

Do you have any suggestions for this? @arjan-bal


@arjan-bal
Contributor

arjan-bal commented May 26, 2024

Can you try letting grpc manage the listener?

l, err := net.Listen("tcp", ":12345")
if err != nil {
	panic(err)
}
defer l.Close()

server := grpc.NewServer()

hello.RegisterHelloServiceServer(server, helloServer{})
if err := server.Serve(l); err != nil {
	// handle the error
}

This allows grpc to implement flow control. I tried this and the server memory remained low even when the server was slow to read (due to the 10 sec delay).
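For completeness, a rough sketch of how the HTTP/2 flow-control window sizes could be tuned via server options if you ever want the transport to buffer more per stream or per connection (the values below are arbitrary examples, not recommendations):

server := grpc.NewServer(
	// Per-stream and per-connection HTTP/2 flow-control windows.
	// Larger windows let the server buffer more unread data per stream/connection.
	grpc.InitialWindowSize(1<<20),     // 1 MiB per stream (example value)
	grpc.InitialConnWindowSize(1<<21), // 2 MiB per connection (example value)
)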

If you want an HTTP server for other applications, such as pprof, you can use a different port; see the sketch below.
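For example, the pprof endpoints can live on their own port while gRPC keeps its own listener (the port and function name here are arbitrary choices for illustration):

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* handlers on http.DefaultServeMux
)

func startPprofServer() {
	go func() {
		// pprof gets its own HTTP port; gRPC continues to serve on its own TCP listener.
		if err := http.ListenAndServe("localhost:6060", nil); err != nil {
			log.Printf("pprof server stopped: %v", err)
		}
	}()
}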

@wmxhw
Author

wmxhw commented May 27, 2024

Thanks, I now know that ServerTransport has two implementations.

Why is serverHandlerTransport asynchronous and what are the restrictions?

@arjan-bal
Contributor

arjan-bal commented May 29, 2024

I discussed this issue with the team. The serverHandlerTransport is an experimental adapter that makes gRPC work with the standard library's HTTP server. It is missing many features, including flow control. Users are encouraged to use the http2Server transport instead; that is the transport used when calling server.Serve(listener).

Having said that, both transports buffer HTTP/2 frames to ensure the server doesn't waste time waiting for more packets to arrive. In the case of serverHandlerTransport, there is no bound on the size of the buffer; it keeps reading frames until the connection is closed. This is a bug.

As serverHandlerTransport is an experimental transport and isn't a high-priority feature, we will not be fixing it immediately, but PRs are welcome from contributors.
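For reference, a rough sketch of the two setups under discussion: Serve(listener) uses the full HTTP/2 server transport with flow control, while mounting the *grpc.Server as an http.Handler on the standard library's HTTP server goes through the handler transport (the addresses and certificate paths are placeholders):

s := grpc.NewServer()
hello.RegisterHelloServiceServer(s, helloServer{})

// 1) Full HTTP/2 server transport: gRPC-level flow control applies.
lis, err := net.Listen("tcp", ":12345")
if err != nil {
	log.Fatal(err)
}
go s.Serve(lis)

// 2) Handler-based transport: gRPC served through the standard library's
// HTTP/2 server via the *grpc.Server's ServeHTTP method. As noted above,
// this path currently has no bound on its read buffering. TLS (or an h2c
// wrapper) is needed so the standard server speaks HTTP/2.
httpSrv := &http.Server{Addr: ":8443", Handler: s}
log.Fatal(httpSrv.ListenAndServeTLS("server.crt", "server.key"))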

@arjan-bal arjan-bal removed their assignment May 29, 2024
@wmxhw
Author

wmxhw commented May 31, 2024

Thank you very much for your answer. I have now switched to letting gRPC manage the listener.

@purnesh42H purnesh42H removed the P2 label Jun 4, 2024
@easwars easwars added the P2 label Jun 4, 2024
@arvindbr8 arvindbr8 added the fixit label Jun 4, 2024
@arvindbr8
Member

We can pick this up during a fixit in the coming quarters. We should definitely be able to come up with a plan to resolve this.

@dfawley dfawley changed the title It seems that there is a memory leak issue in the HandleStreams function. transport: handler server transport continuously reads from streams without waiting for application Jul 3, 2024
@purnesh42H purnesh42H added the Area: Transport label Jan 12, 2025