Skip to content

Commit

Permalink
Merge #63677
Browse files Browse the repository at this point in the history
63677: sql/pgwire: implicitly destroy portal on txn finish  r=jordanlewis a=rafiss

fixes #42912
touches #40195 

This also includes a commit to fix an error message hint that seems to be
accidentally using `WithSafeDetails`, which is meant for sentry reporting.
(Originally added in #40197.)

Release note (sql change): Previously, committing a transaction when a
portal was suspended would cause a "multiple active portals not
supported" error. Now, the portal is automatically destroyed.

Co-authored-by: Rafi Shamim <rafi@cockroachlabs.com>
  • Loading branch information
craig[bot] and rafiss committed Apr 27, 2021
2 parents fd2ab67 + 0ce4443 commit bf5547c
Show file tree
Hide file tree
Showing 3 changed files with 580 additions and 15 deletions.
43 changes: 43 additions & 0 deletions pkg/acceptance/testdata/java/src/main/java/MainTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
import java.math.BigDecimal;
import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -339,4 +341,45 @@ public void testVirtualTableMetadata() throws Exception {
}
}
}

// Regression test for #42912: using setFetchSize in a transaction should
// not cause issues.
@Test
public void testSetFetchSize() throws Exception {
for (int fetchSize = 0; fetchSize <= 3; fetchSize++) {
testSetFetchSize(fetchSize, true);
testSetFetchSize(fetchSize, false);
}
}

private void testSetFetchSize(int fetchSize, boolean useTransaction) throws Exception {
int expectedResults = fetchSize;
if (fetchSize == 0 || fetchSize == 3) {
expectedResults = 2;
}

try (final Connection testConn = DriverManager.getConnection(getDBUrl(), "root", "")) {
testConn.setAutoCommit(!useTransaction);
try (final Statement stmt = testConn.createStatement()) {
stmt.setFetchSize(fetchSize);
ResultSet result = stmt.executeQuery("select n from generate_series(0,1) n");
for (int i = 0; i < expectedResults; i++) {
Assert.assertTrue(result.next());
Assert.assertEquals(i, result.getInt(1));
}
if (useTransaction) {
// This should implicitly close the ResultSet (i.e. portal).
testConn.commit();
if (fetchSize != 0 && fetchSize != 3) {
try {
result.next();
fail("expected portal to be closed");
} catch (PSQLException e) {
Assert.assertTrue(e.getMessage().contains("unknown portal"));
}
}
}
}
}
}
}
102 changes: 87 additions & 15 deletions pkg/sql/pgwire/command_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package pgwire

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/server/telemetry"
Expand Down Expand Up @@ -441,24 +442,15 @@ func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error {
}
switch c := cmd.(type) {
case sql.DeletePreparedStmt:
// The client wants to close a portal or statement. We
// support the case where it is exactly this
// portal. This is done by closing the portal in
// the same way implicit transactions do, but also
// rewinding the stmtBuf to still point to the portal
// close so that the state machine can do its part of
// the cleanup. We are in effect peeking to see if the
// The client wants to close a portal or statement. We support the case
// where it is exactly this portal. We are in effect peeking to see if the
// next message is a delete portal.
if c.Type != pgwirebase.PreparePortal || c.Name != r.portalName {
telemetry.Inc(sqltelemetry.InterleavedPortalRequestCounter)
return errors.WithDetail(sql.ErrLimitedResultNotSupported,
"cannot close a portal while a different one is open")
}
r.typ = noCompletionMsg
// Rewind to before the delete so the AdvanceOne in
// connExecutor.execCmd ends up back on it.
r.conn.stmtBuf.Rewind(ctx, prevPos)
return sql.ErrLimitedResultClosed
return r.rewindAndClosePortal(ctx, prevPos)
case sql.ExecPortal:
// The happy case: the client wants more rows from the portal.
if c.Name != r.portalName {
Expand All @@ -482,12 +474,92 @@ func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error {
return err
}
default:
// If the portal is immediately followed by a COMMIT, we can proceed and
// let the portal be destroyed at the end of the transaction.
if isCommit, err := r.isCommit(); err != nil {
return err
} else if isCommit {
return r.rewindAndClosePortal(ctx, prevPos)
}
// We got some other message, but we only support executing to completion.
telemetry.Inc(sqltelemetry.InterleavedPortalRequestCounter)
return errors.WithSafeDetails(sql.ErrLimitedResultNotSupported,
"cannot perform operation %T while a different portal is open",
errors.Safe(c))
return errors.WithDetail(sql.ErrLimitedResultNotSupported,
fmt.Sprintf("cannot perform operation %T while a different portal is open", c))
}
prevPos = curPos
}
}

// isCommit checks if the statement buffer has a COMMIT at the current
// position. It may either be (1) a COMMIT in the simple protocol, or (2) a
// Parse/Bind/Execute sequence for a COMMIT query.
func (r *limitedCommandResult) isCommit() (bool, error) {
cmd, _, err := r.conn.stmtBuf.CurCmd()
if err != nil {
return false, err
}
// Case 1: Check if cmd is a simple COMMIT statement.
if execStmt, ok := cmd.(sql.ExecStmt); ok {
if _, isCommit := execStmt.AST.(*tree.CommitTransaction); isCommit {
return true, nil
}
}

commitStmtName := ""
commitPortalName := ""
// Case 2a: Check if cmd is a prepared COMMIT statement.
if prepareStmt, ok := cmd.(sql.PrepareStmt); ok {
if _, isCommit := prepareStmt.AST.(*tree.CommitTransaction); isCommit {
commitStmtName = prepareStmt.Name
} else {
return false, nil
}
} else {
return false, nil
}

r.conn.stmtBuf.AdvanceOne()
cmd, _, err = r.conn.stmtBuf.CurCmd()
if err != nil {
return false, err
}
// Case 2b: The next cmd must be a bind command.
if bindStmt, ok := cmd.(sql.BindStmt); ok {
// This bind command must be for the COMMIT statement that we just saw.
if bindStmt.PreparedStatementName == commitStmtName {
commitPortalName = bindStmt.PortalName
} else {
return false, nil
}
} else {
return false, nil
}

r.conn.stmtBuf.AdvanceOne()
cmd, _, err = r.conn.stmtBuf.CurCmd()
if err != nil {
return false, err
}
// Case 2c: The next cmd must be an exec portal command.
if execPortal, ok := cmd.(sql.ExecPortal); ok {
// This exec command must be for the portal that was just bound.
if execPortal.Name == commitPortalName {
return true, nil
}
}
return false, nil
}

// rewindAndClosePortal closes the portal in the same way implicit transactions
// do, but also rewinds the stmtBuf to still point to the portal close so that
// the state machine can do its part of the cleanup.
func (r *limitedCommandResult) rewindAndClosePortal(
ctx context.Context, rewindTo sql.CmdPos,
) error {
// Don't send an CommandComplete for the portal; it got suspended.
r.typ = noCompletionMsg
// Rewind to before the delete so the AdvanceOne in connExecutor.execCmd ends
// up back on it.
r.conn.stmtBuf.Rewind(ctx, rewindTo)
return sql.ErrLimitedResultClosed
}
Loading

0 comments on commit bf5547c

Please sign in to comment.