From 2e84dccaf57b4fa803149884f2149dfa9e923593 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Thu, 29 Feb 2024 18:44:01 -0600 Subject: [PATCH] *Pipeline.getResults should close pipeline on error Otherwise, it might be possible to panic when closing the pipeline if it tries to read a connection that should be closed but still has a fatal error on the wire. https://github.com/jackc/pgx/issues/1920 --- pgconn/pgconn.go | 2 ++ pgconn/pgconn_test.go | 83 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/pgconn/pgconn.go b/pgconn/pgconn.go index b287e0205..ad81ec60a 100644 --- a/pgconn/pgconn.go +++ b/pgconn/pgconn.go @@ -2094,6 +2094,8 @@ func (p *Pipeline) getResults() (results any, err error) { for { msg, err := p.conn.receiveMessage() if err != nil { + p.closed = true + p.err = err p.conn.asyncClose() return nil, normalizeTimeoutError(p.ctx, err) } diff --git a/pgconn/pgconn_test.go b/pgconn/pgconn_test.go index 7ca3992e0..f04fa79ad 100644 --- a/pgconn/pgconn_test.go +++ b/pgconn/pgconn_test.go @@ -3389,3 +3389,86 @@ func TestSNISupport(t *testing.T) { }) } } + +// https://github.com/jackc/pgx/issues/1920 +func TestFatalErrorReceivedInPipelineMode(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + steps := pgmock.AcceptUnauthenticatedConnRequestSteps() + steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Parse{})) + steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Describe{})) + steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Parse{})) + steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Describe{})) + steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Parse{})) + steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Describe{})) + steps = append(steps, pgmock.SendMessage(&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{ + {Name: []byte("mock")}, + }})) + steps = append(steps, pgmock.SendMessage(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01"})) + // We shouldn't get anything after the first fatal error. But the reported issue was with PgBouncer so maybe that + // causes the issue. Anyway, a FATAL error after the connection had already been killed could cause a panic. + steps = append(steps, pgmock.SendMessage(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01"})) + + script := &pgmock.Script{Steps: steps} + + ln, err := net.Listen("tcp", "127.0.0.1:") + require.NoError(t, err) + defer ln.Close() + + serverKeepAlive := make(chan struct{}) + defer close(serverKeepAlive) + + serverErrChan := make(chan error, 1) + go func() { + defer close(serverErrChan) + + conn, err := ln.Accept() + if err != nil { + serverErrChan <- err + return + } + defer conn.Close() + + err = conn.SetDeadline(time.Now().Add(59 * time.Second)) + if err != nil { + serverErrChan <- err + return + } + + err = script.Run(pgproto3.NewBackend(conn, conn)) + if err != nil { + serverErrChan <- err + return + } + + <-serverKeepAlive + }() + + parts := strings.Split(ln.Addr().String(), ":") + host := parts[0] + port := parts[1] + connStr := fmt.Sprintf("sslmode=disable host=%s port=%s", host, port) + + ctx, cancel = context.WithTimeout(ctx, 59*time.Second) + defer cancel() + conn, err := pgconn.Connect(ctx, connStr) + require.NoError(t, err) + + pipeline := conn.StartPipeline(ctx) + pipeline.SendPrepare("s1", "select 1", nil) + pipeline.SendPrepare("s2", "select 2", nil) + pipeline.SendPrepare("s3", "select 3", nil) + err = pipeline.Sync() + require.NoError(t, err) + + _, err = pipeline.GetResults() + require.NoError(t, err) + _, err = pipeline.GetResults() + require.Error(t, err) + + err = pipeline.Close() + require.Error(t, err) +}