From 3c5b406f6b154c086a2cf23194d55cdbfd16d7e1 Mon Sep 17 00:00:00 2001
From: Jane Xing
Date: Mon, 27 Mar 2023 23:20:06 -0500
Subject: [PATCH 1/8] pgwire: update comments for `limitedCommandResult`

With the introduction of pausable portals, the comment for
`limitedCommandResult` needs to be updated to reflect the current behavior.

Release note: None
---
 pkg/sql/pgwire/command_result.go | 15 ---------------
 1 file changed, 15 deletions(-)

diff --git a/pkg/sql/pgwire/command_result.go b/pkg/sql/pgwire/command_result.go
index 37aa6d030625..716b280b81e7 100644
--- a/pkg/sql/pgwire/command_result.go
+++ b/pkg/sql/pgwire/command_result.go
@@ -460,21 +460,6 @@ func (c *conn) newMiscResult(pos sql.CmdPos, typ completionMsgType) *commandResu
 // to AddRow will block until the associated client connection asks for more
 // rows. It essentially implements the "execute portal with limit" part of the
 // Postgres protocol.
-//
-// This design is known to be flawed. It only supports a specific subset of the
-// protocol. We only allow a portal suspension in an explicit transaction where
-// the suspended portal is completely exhausted before any other pgwire command
-// is executed, otherwise an error is produced. You cannot, for example,
-// interleave portal executions (a portal must be executed to completion before
-// another can be executed). It also breaks the software layering by adding an
-// additional state machine here, instead of teaching the state machine in the
-// sql package about portals.
-//
-// This has been done because refactoring the executor to be able to correctly
-// suspend a portal will require a lot of work, and we wanted to move
-// forward. The work included is things like auditing all of the defers and
-// post-execution stuff (like stats collection) to have it only execute once
-// per statement instead of once per portal.
 type limitedCommandResult struct {
 	*commandResult
 	portalName string

From 688d398bfdef90ae2918d9f6533abd696db35295 Mon Sep 17 00:00:00 2001
From: Jane Xing
Date: Wed, 29 Mar 2023 20:36:27 -0500
Subject: [PATCH 2/8] sql: add session variable for multiple active portals

This change introduces a new session variable for a preview feature. When set
to `true`, all non-internal portals with read-only `SELECT` queries without
sub-queries or post-queries can be paused and resumed in an interleaved
manner, but are executed with a local plan.

Release note (SQL change): Added the session variable
`multiple_active_portals_enabled`. This setting is only for a preview
feature. When set to `true`, it allows multiple portals to be open at the
same time, with their execution interleaved with each other. In other words,
these portals can be paused. The underlying statement for a pausable portal
must be a read-only `SELECT` query without sub-queries or post-queries, and
such a portal is always executed with a local plan.
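As an illustrative client-side sketch (not part of the patch; the connection
string, database name, and the lib/pq driver are assumptions), a session opts
into the preview feature like so. A dedicated connection is pinned because the
variable is session-scoped and database/sql otherwise hands statements to
arbitrary pooled connections:

    package main

    import (
        "context"
        "database/sql"
        "fmt"
        "log"

        _ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
    )

    func main() {
        ctx := context.Background()
        // Assumption: a local, insecure single-node CockroachDB cluster.
        db, err := sql.Open("postgres",
            "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // Pin a single connection so the SET below applies to the same
        // session as the subsequent statements.
        conn, err := db.Conn(ctx)
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        if _, err := conn.ExecContext(ctx,
            "SET multiple_active_portals_enabled = true"); err != nil {
            log.Fatal(err)
        }
        var v string
        if err := conn.QueryRowContext(ctx,
            "SHOW multiple_active_portals_enabled").Scan(&v); err != nil {
            log.Fatal(err)
        }
        fmt.Println("multiple_active_portals_enabled =", v) // expected: on
    }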
--- .../generated/settings/settings-for-tenants.txt | 1 - docs/generated/settings/settings.html | 1 - pkg/server/BUILD.bazel | 1 - pkg/server/server_sql.go | 8 -------- pkg/sql/distsql_running.go | 6 +++--- pkg/sql/exec_util.go | 12 ++++-------- pkg/sql/execinfra/base.go | 5 +++-- .../testdata/logic_test/information_schema | 3 +++ .../logictest/testdata/logic_test/pg_catalog | 5 +++-- .../logictest/testdata/logic_test/show_source | 2 +- pkg/sql/pgwire/testdata/pgtest/portals_crbugs | 4 ++-- pkg/sql/prepared_stmt.go | 17 ++++++++++------- .../sessiondatapb/local_only_session_data.proto | 5 +++++ pkg/sql/sqltelemetry/pgwire.go | 8 +++++--- pkg/sql/vars.go | 17 +++++++++++++++++ 15 files changed, 56 insertions(+), 39 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index c376c084e63a..9865a8e72109 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -250,7 +250,6 @@ sql.multiple_modifications_of_table.enabled boolean false if true, allow stateme sql.multiregion.drop_primary_region.enabled boolean true allows dropping the PRIMARY REGION of a database if it is the last region sql.notices.enabled boolean true enable notices in the server/client protocol being sent sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability -sql.pgwire.multiple_active_portals.enabled boolean false if true, portals with read-only SELECT query without sub/post queries can be executed in interleaving manner, but with local execution plan sql.schema.telemetry.recurrence string @weekly cron-tab recurrence for SQL schema telemetry job sql.show_ranges_deprecated_behavior.enabled boolean true if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES. sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 1cbdd6b609c3..09384a4f8eff 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -201,7 +201,6 @@
 sql.multiregion.drop_primary_region.enabled | boolean | true | allows dropping the PRIMARY REGION of a database if it is the last region
 sql.notices.enabled | boolean | true | enable notices in the server/client protocol being sent
 sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled | boolean | false | if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability
-sql.pgwire.multiple_active_portals.enabled | boolean | false | if true, portals with read-only SELECT query without sub/post queries can be executed in interleaving manner, but with local execution plan
 sql.schema.telemetry.recurrence | string | @weekly | cron-tab recurrence for SQL schema telemetry job
 sql.show_ranges_deprecated_behavior.enabled | boolean | true | if set, SHOW RANGES and crdb_internal.ranges{_no_leases} behave with deprecated pre-v23.1 semantics. NB: the new SHOW RANGES interface has richer WITH options than pre-v23.1 SHOW RANGES.
 sql.spatial.experimental_box2d_comparison_operators.enabled | boolean | false | enables the use of certain experimental box2d comparison operators
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index fc03ce4c7f81..5cecaed24f56 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -251,7 +251,6 @@ go_library(
         "//pkg/sql/sqlstats/insights",
         "//pkg/sql/sqlstats/persistedsqlstats",
         "//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
-        "//pkg/sql/sqltelemetry",
         "//pkg/sql/stats",
         "//pkg/sql/stmtdiagnostics",
         "//pkg/sql/syntheticprivilege",
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 262e4d953964..1989f016dcdf 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -59,7 +59,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
 	"github.com/cockroachdb/cockroach/pkg/server/status"
 	"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher"
-	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
 	"github.com/cockroachdb/cockroach/pkg/server/tracedumper"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -103,7 +102,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
-	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 	"github.com/cockroachdb/cockroach/pkg/sql/stats"
 	"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
 	"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache"
@@ -1353,12 +1351,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	vmoduleSetting.SetOnChange(&cfg.Settings.SV, fn)
 	fn(ctx)

-	sql.EnableMultipleActivePortals.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
-		if sql.EnableMultipleActivePortals.Get(&cfg.Settings.SV) {
-			telemetry.Inc(sqltelemetry.MultipleActivePortalCounter)
-		}
-	})
-
 	return &SQLServer{
 		ambientCtx: cfg.BaseConfig.AmbientCtx,
 		stopper:    cfg.stopper,
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index 5c89251d216e..34ea495ad917 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -1549,16 +1549,16 @@ func (r *DistSQLReceiver) PushBatch(
 var (
 	// ErrLimitedResultNotSupported is an error produced by pgwire
 	// indicating the user attempted to have multiple active portals but
-	// either without setting sql.pgwire.multiple_active_portals.enabled to
-	// true or the underlying query does not satisfy the restriction.
+	// either without setting the session variable
+	// multiple_active_portals_enabled to true or with an underlying query
+	// that does not satisfy the restrictions.
 	ErrLimitedResultNotSupported = unimplemented.NewWithIssue(
 		40195,
 		"multiple active portals not supported, "+
-			"please set sql.pgwire.multiple_active_portals.enabled to true. "+
+			"please set session variable multiple_active_portals_enabled to true. "+
 			"Note: this feature is in preview",
 	)
-	// ErrStmtNotSupportedForPausablePortal is returned when the user have set
-	// sql.pgwire.multiple_active_portals.enabled to true but set an unsupported
+	// ErrStmtNotSupportedForPausablePortal is returned when the user has set
+	// the session variable multiple_active_portals_enabled to true but set an unsupported
 	// statement for a portal.
 	ErrStmtNotSupportedForPausablePortal = unimplemented.NewWithIssue(
 		98911,
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 4aebccb2298e..aa7db94e1826 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -733,14 +733,6 @@ var overrideAlterPrimaryRegionInSuperRegion = settings.RegisterBoolSetting(
 	false,
 ).WithPublic()

-var EnableMultipleActivePortals = settings.RegisterBoolSetting(
-	settings.TenantWritable,
-	"sql.pgwire.multiple_active_portals.enabled",
-	"if true, portals with read-only SELECT query without sub/post queries "+
-		"can be executed in interleaving manner, but with local execution plan",
-	false,
-).WithPublic()
-
 var errNoTransactionInProgress = errors.New("there is no transaction in progress")
 var errTransactionInProgress = errors.New("there is already a transaction in progress")

@@ -3536,6 +3528,10 @@ func (m *sessionDataMutator) SetStreamerEnabled(val bool) {
 	m.data.StreamerEnabled = val
 }

+func (m *sessionDataMutator) SetMultipleActivePortalsEnabled(val bool) {
+	m.data.MultipleActivePortalsEnabled = val
+}
+
 // Utility functions related to scrubbing sensitive information on SQL Stats.

 // quantizeCounts ensures that the Count field in the
diff --git a/pkg/sql/execinfra/base.go b/pkg/sql/execinfra/base.go
index 496e6bce51fa..737302352526 100644
--- a/pkg/sql/execinfra/base.go
+++ b/pkg/sql/execinfra/base.go
@@ -43,8 +43,8 @@ const (
 	NeedMoreRows ConsumerStatus = iota
-	// SwitchToAnotherPortal indicates that the we received exec command for
-	// a different portal, and may come back to continue executing the current
-	// portal later. If the cluster setting sql.pgwire.multiple_active_portals.enabled
-	// is set to be true, we do nothing and return the control to the connExecutor.
+	// SwitchToAnotherPortal indicates that we received an exec command for
+	// a different portal, and may come back to continue executing the current
+	// portal later. If the session variable multiple_active_portals_enabled
+	// is set to true, we do nothing and return control to the connExecutor.
 	SwitchToAnotherPortal
 	// DrainRequested indicates that the consumer will not process any more data
 	// rows, but will accept trailing metadata from the producer.
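To make the SwitchToAnotherPortal contract above concrete, here is a
self-contained toy sketch (the identifiers mirror execinfra's ConsumerStatus,
but this is illustrative code, not the real interfaces): the producer stops
pushing rows without draining, so the paused portal's flow stays intact for a
later resume.

    package main

    import "fmt"

    // ConsumerStatus mirrors execinfra.ConsumerStatus for illustration.
    type ConsumerStatus int

    const (
        NeedMoreRows ConsumerStatus = iota
        SwitchToAnotherPortal
        DrainRequested
        ConsumerClosed
    )

    // pushRows drives a toy producer until the consumer pauses or stops. It
    // returns true if the portal was paused (flow left intact for a resume).
    func pushRows(rows []int, push func(int) ConsumerStatus) (paused bool) {
        for _, r := range rows {
            switch push(r) {
            case NeedMoreRows:
                continue
            case SwitchToAnotherPortal:
                // Hand control back to the connExecutor without draining;
                // the portal's flow is retained for a later resume.
                return true
            case DrainRequested, ConsumerClosed:
                return false
            }
        }
        return false
    }

    func main() {
        pushed := 0
        paused := pushRows([]int{1, 2, 3, 4}, func(r int) ConsumerStatus {
            pushed++
            if pushed == 2 {
                // An exec command for a different portal arrived.
                return SwitchToAnotherPortal
            }
            return NeedMoreRows
        })
        fmt.Println("rows pushed:", pushed, "paused:", paused) // 2, true
    }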
diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index 1e1852679121..ec2c19bd3d8c 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -5175,6 +5175,7 @@ WHERE 'distsql_workmem', 'copy_fast_path_enabled', 'direct_columnar_scans_enabled' + 'multiple_active_portals_enabled' ); ---- variable value @@ -5207,6 +5208,7 @@ default_transaction_read_only off default_transaction_use_follower_reads off default_with_oids off descriptor_validation on +direct_columnar_scans_enabled on disable_drop_tenant off disable_hoist_projection_in_join_limitation off disable_partially_distributed_plans off @@ -5265,6 +5267,7 @@ lock_timeout 0 log_timezone UTC max_identifier_length 128 max_index_keys 32 +multiple_active_portals_enabled off node_id 1 null_ordered_last off on_update_rehome_row_enabled on diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index e6e79c34cffe..c343895e445d 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -2669,7 +2669,7 @@ SELECT FROM pg_catalog.pg_settings WHERE - name NOT IN ('optimizer', 'crdb_version', 'session_id', 'distsql_workmem', 'copy_fast_path_enabled', 'direct_columnar_scans_enabled') + name NOT IN ('optimizer', 'crdb_version', 'session_id', 'distsql_workmem', 'copy_fast_path_enabled', 'direct_columnar_scans_enabled', 'multiple_active_portals_enabled') ---- name setting category short_desc extra_desc vartype allow_ordinal_column_references off NULL NULL NULL string @@ -2822,7 +2822,7 @@ SELECT FROM pg_catalog.pg_settings WHERE - name NOT IN ('optimizer', 'crdb_version', 'session_id', 'distsql_workmem', 'copy_fast_path_enabled', 'direct_columnar_scans_enabled') + name NOT IN ('optimizer', 'crdb_version', 'session_id', 'distsql_workmem', 'copy_fast_path_enabled', 'direct_columnar_scans_enabled', 'multiple_active_portals_enabled') ---- name setting unit context enumvals boot_val reset_val allow_ordinal_column_references off NULL user NULL off off @@ -3058,6 +3058,7 @@ lock_timeout NULL NULL NULL log_timezone NULL NULL NULL NULL NULL max_identifier_length NULL NULL NULL NULL NULL max_index_keys NULL NULL NULL NULL NULL +multiple_active_portals_enabled NULL NULL NULL NULL NULL node_id NULL NULL NULL NULL NULL null_ordered_last NULL NULL NULL NULL NULL on_update_rehome_row_enabled NULL NULL NULL NULL NULL diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source index 5222c85b06c7..60f610cd5e99 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_source +++ b/pkg/sql/logictest/testdata/logic_test/show_source @@ -23,7 +23,7 @@ UTF8 1 query TT colnames SELECT * FROM [SHOW ALL] -WHERE variable NOT IN ('optimizer', 'crdb_version', 'session_id', 'distsql_workmem', 'copy_fast_path_enabled', 'direct_columnar_scans_enabled') +WHERE variable NOT IN ('optimizer', 'crdb_version', 'session_id', 'distsql_workmem', 'copy_fast_path_enabled', 'direct_columnar_scans_enabled', 'multiple_active_portals_enabled') ---- variable value allow_ordinal_column_references off diff --git a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs index 1afeb7f1788f..14211a72b413 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs +++ b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs @@ -29,7 +29,7 @@ ReadyForQuery 
{"Type":"BindComplete"} {"Type":"DataRow","Values":[{"text":"1"}]} {"Type":"PortalSuspended"} -{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported, please set sql.pgwire.multiple_active_portals.enabled to true. Note: this feature is in preview"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported, please set session variable multiple_active_portals_enabled to true. Note: this feature is in preview"} {"Type":"ReadyForQuery","TxStatus":"E"} {"Type":"ReadyForQuery","TxStatus":"E"} @@ -70,7 +70,7 @@ ReadyForQuery {"Type":"BindComplete"} {"Type":"DataRow","Values":[{"text":"1"}]} {"Type":"PortalSuspended"} -{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported, please set sql.pgwire.multiple_active_portals.enabled to true. Note: this feature is in preview"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported, please set session variable multiple_active_portals_enabled to true. Note: this feature is in preview"} {"Type":"ReadyForQuery","TxStatus":"E"} send diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go index 2f3834bda37c..aa3115adc405 100644 --- a/pkg/sql/prepared_stmt.go +++ b/pkg/sql/prepared_stmt.go @@ -15,11 +15,13 @@ import ( "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase" "github.com/cockroachdb/cockroach/pkg/sql/querycache" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -127,15 +129,15 @@ type PortalPausablity int64 const ( // PortalPausabilityDisabled is the default status of a portal when - // sql.pgwire.multiple_active_portals.enabled is false. + // the session variable multiple_active_portals_enabled is false. PortalPausabilityDisabled PortalPausablity = iota - // PausablePortal is set when sql.pgwire.multiple_active_portals.enabled is - // set to true and the underlying statement is a read-only SELECT query with - // no sub-queries or post-queries. + // PausablePortal is set when the session variable multiple_active_portals_enabled + // is set to true and the underlying statement is a read-only SELECT query + // with no sub-queries or post-queries. PausablePortal // NotPausablePortalForUnsupportedStmt is used when the cluster setting - // sql.pgwire.multiple_active_portals.enabled is set to true, while we don't - // support underlying statement. + // the session variable multiple_active_portals_enabled is set to true, while + // we don't support underlying statement. 
 	NotPausablePortalForUnsupportedStmt
 )

@@ -180,7 +182,8 @@ func (ex *connExecutor) makePreparedPortal(
 		OutFormats: outFormats,
 	}

-	if EnableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV) {
+	if ex.sessionData().MultipleActivePortalsEnabled {
+		telemetry.Inc(sqltelemetry.StmtsTriedWithPausablePortals)
 		portal.pauseInfo = &portalPauseInfo{}
 		portal.portalPausablity = PausablePortal
 	}
diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto
index 9a5fd41d163d..19914953017a 100644
--- a/pkg/sql/sessiondatapb/local_only_session_data.proto
+++ b/pkg/sql/sessiondatapb/local_only_session_data.proto
@@ -369,6 +369,11 @@ message LocalOnlySessionData {
   // DisableDropTenant causes errors when the client
   // attempts to drop tenants or tenant records.
   bool disable_drop_tenant = 99;
+  // MultipleActivePortalsEnabled determines whether the pgwire portal
+  // execution for certain queries can be paused. If true, portals with
+  // read-only SELECT queries without sub/post-queries can be executed in an
+  // interleaved manner, but with a local execution plan.
+  bool multiple_active_portals_enabled = 100;

   ///////////////////////////////////////////////////////////////////////////
   // WARNING: consider whether a session parameter you're adding needs to  //
diff --git a/pkg/sql/sqltelemetry/pgwire.go b/pkg/sql/sqltelemetry/pgwire.go
index 18a7ef68e867..3cacbfad4cdd 100644
--- a/pkg/sql/sqltelemetry/pgwire.go
+++ b/pkg/sql/sqltelemetry/pgwire.go
@@ -69,6 +69,8 @@ var CloseRequestCounter = telemetry.GetCounterOnce("pgwire.command.close")
 // is made.
 var FlushRequestCounter = telemetry.GetCounterOnce("pgwire.command.flush")

-// MultipleActivePortalCounter is to be incremented every time the cluster setting
-// sql.pgwire.multiple_active_portals.enabled is set true.
-var MultipleActivePortalCounter = telemetry.GetCounterOnce("pgwire.multiple_active_portals")
+// StmtsTriedWithPausablePortals is to be incremented every time a
+// non-internal statement is executed with a pgwire portal while the session
+// variable multiple_active_portals_enabled is set to true.
+// The statement might not satisfy the restrictions for a pausable portal.
+var StmtsTriedWithPausablePortals = telemetry.GetCounterOnce("pgwire.pausable_portal_stmts")
diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go
index e5560b65ee2c..0b9025e3cf24 100644
--- a/pkg/sql/vars.go
+++ b/pkg/sql/vars.go
@@ -2648,6 +2648,23 @@ var varGen = map[string]sessionVar{
 			return formatBoolAsPostgresSetting(execinfra.UseStreamerEnabled.Get(sv))
 		},
 	},
+
+	// CockroachDB extension.
+	`multiple_active_portals_enabled`: {
+		GetStringVal: makePostgresBoolGetStringValFn(`multiple_active_portals_enabled`),
+		Set: func(_ context.Context, m sessionDataMutator, s string) error {
+			b, err := paramparse.ParseBoolVar("multiple_active_portals_enabled", s)
+			if err != nil {
+				return err
+			}
+			m.SetMultipleActivePortalsEnabled(b)
+			return nil
+		},
+		Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
+			return formatBoolAsPostgresSetting(evalCtx.SessionData().MultipleActivePortalsEnabled), nil
+		},
+		GlobalDefault: globalFalse,
+	},
 }

 // We want test coverage for this on and off so make it metamorphic.

From 0b556627c447353e45810b1af88a924c605859cf Mon Sep 17 00:00:00 2001
From: Jane Xing
Date: Wed, 22 Mar 2023 15:03:42 -0700
Subject: [PATCH 3/8] sql: add clean-up steps for pausable portals to ensure
 proper resource persistence

This commit is part of the implementation of multiple active portals.
To execute portals in an interleaved manner, certain resources need to be
persisted, and their clean-up must be delayed until the portal is closed.
These resources also don't need to be set up again when the portal is
re-executed. To achieve this, we store the clean-up steps in the `cleanup`
function stacks in `portalPauseInfo`, and they are called when any of the
following events occurs:

1. The SQL transaction is committed or rolled back.
2. The connection executor is closed.
3. An error is encountered when executing the portal.
4. The portal is explicitly closed by the user.

The cleanup functions should be called in the original order of a normal
portal. Since a portal's execution follows the
`execPortal() -> execStmtInOpenState() -> dispatchToExecutionEngine() -> flow.Run()`
function flow, we categorize the cleanup functions into 4 "layers", which are
stored accordingly in `PreparedPortal.pauseInfo`. The cleanup is always LIFO,
following the

- `resumableFlow.cleanup`
- `dispatchToExecutionEngine.cleanup`
- `execStmtInOpenState.cleanup`
- `exhaustPortal.cleanup`

order. Additionally, if an error occurs in any layer, we clean up the current
layer and the layers beneath it (those deeper in the call chain). For example,
if an error occurs in `execStmtInOpenState()`, we perform
`resumableFlow.cleanup` and `dispatchToExecutionEngine.cleanup` (the deeper
layers) and then `execStmtInOpenState.cleanup` (the current layer) before
returning the error to `execPortal()`, where `exhaustPortal.cleanup` will
eventually be called. This is to preserve the previous clean-up process for
portals as much as possible. (A toy sketch of this ordering follows the
conn_executor.go diff below.)

We also pass the `PreparedPortal` as a reference to the planner in
`execStmtInOpenState()`, so that the portal's flow can be set and reused.

Release note: None
---
 pkg/sql/apply_join.go            |   4 +
 pkg/sql/conn_executor.go         |  87 +++++-
 pkg/sql/conn_executor_exec.go    | 461 ++++++++++++++++++++++++-------
 pkg/sql/conn_io.go               |  10 +
 pkg/sql/distsql_running.go       |  39 ++-
 pkg/sql/pgwire/command_result.go |   5 +
 pkg/sql/planner.go               |   6 +-
 pkg/sql/prepared_stmt.go         | 154 ++++++++++-
 8 files changed, 636 insertions(+), 130 deletions(-)

diff --git a/pkg/sql/apply_join.go b/pkg/sql/apply_join.go
index e170277690c4..d22e84a0e375 100644
--- a/pkg/sql/apply_join.go
+++ b/pkg/sql/apply_join.go
@@ -320,6 +320,10 @@ func runPlanInsidePlan(
 	// Make a copy of the EvalContext so it can be safely modified.
 	evalCtx := params.p.ExtendedEvalContextCopy()
 	plannerCopy := *params.p
+	// If we reach this part when re-executing a pausable portal, we don't want
+	// to resume the flow bound to it. The inner plan should have its own
+	// lifecycle for its flow.
+	plannerCopy.pausablePortal = nil
 	distributePlan := getPlanDistribution(
 		ctx, plannerCopy.Descriptors().HasUncommittedTypes(),
 		plannerCopy.SessionData().DistSQLMode, plan.main,
diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index 6b23641e4b76..ae2d55648178 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -1138,6 +1144,14 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
 		txnEvType = txnRollback
 	}

+	// Close all portals; otherwise there will be leftover bytes.
+	ex.extraTxnState.prepStmtsNamespace.closeAllPortals(
+		ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
+	)
+	ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
+		ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
+	)
+
 	if closeType == normalClose {
 		// We'll cleanup the SQL txn by creating a non-retriable (commit:true) event.
 		// This event is guaranteed to be accepted in every state.
@@ -1760,6 +1768,26 @@ func (ns *prepStmtNamespace) touchLRUEntry(name string) { ns.addLRUEntry(name, 0) } +func (ns *prepStmtNamespace) closeAllPortals( + ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount, +) { + for name, p := range ns.portals { + p.close(ctx, prepStmtsNamespaceMemAcc, name) + delete(ns.portals, name) + } +} + +func (ns *prepStmtNamespace) closeAllPausablePortals( + ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount, +) { + for name, p := range ns.portals { + if p.pauseInfo != nil { + p.close(ctx, prepStmtsNamespaceMemAcc, name) + delete(ns.portals, name) + } + } +} + // MigratablePreparedStatements returns a mapping of all prepared statements. func (ns *prepStmtNamespace) MigratablePreparedStatements() []sessiondatapb.MigratableSession_PreparedStatement { ret := make([]sessiondatapb.MigratableSession_PreparedStatement, 0, len(ns.prepStmts)) @@ -1836,10 +1864,7 @@ func (ns *prepStmtNamespace) resetTo( for name := range ns.prepStmtsLRU { delete(ns.prepStmtsLRU, name) } - for name, p := range ns.portals { - p.close(ctx, prepStmtsNamespaceMemAcc, name) - delete(ns.portals, name) - } + ns.closeAllPortals(ctx, prepStmtsNamespaceMemAcc) for name, ps := range to.prepStmts { ps.incRef(ctx) @@ -1880,10 +1905,9 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) { } // Close all portals. - for name, p := range ex.extraTxnState.prepStmtsNamespace.portals { - p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name) - delete(ex.extraTxnState.prepStmtsNamespace.portals, name) - } + ex.extraTxnState.prepStmtsNamespace.closeAllPortals( + ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, + ) // Close all cursors. if err := ex.extraTxnState.sqlCursors.closeAll(false /* errorOnWithHold */); err != nil { @@ -1894,10 +1918,9 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) { switch ev.eventType { case txnCommit, txnRollback: - for name, p := range ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals { - p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name) - delete(ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals, name) - } + ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals( + ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, + ) ex.extraTxnState.savepoints.clear() ex.onTxnFinish(ctx, ev) case txnRestart: @@ -2044,7 +2067,6 @@ func (ex *connExecutor) run( return err } } - } // errDrainingComplete is returned by execCmd when the connExecutor previously got @@ -2116,7 +2138,7 @@ func (ex *connExecutor) execCmd() (retErr error) { (tcmd.LastInBatchBeforeShowCommitTimestamp || tcmd.LastInBatch || !implicitTxnForBatch) ev, payload, err = ex.execStmt( - ctx, tcmd.Statement, nil /* prepared */, nil /* pinfo */, stmtRes, canAutoCommit, + ctx, tcmd.Statement, nil /* portal */, nil /* pinfo */, stmtRes, canAutoCommit, ) return err @@ -2186,6 +2208,9 @@ func (ex *connExecutor) execCmd() (retErr error) { ex.implicitTxn(), portal.portalPausablity, ) + if portal.pauseInfo != nil { + portal.pauseInfo.curRes = stmtRes + } res = stmtRes // In the extended protocol, autocommit is not always allowed. The postgres @@ -2204,6 +2229,8 @@ func (ex *connExecutor) execCmd() (retErr error) { // - ex.statsCollector merely contains a copy of the times, that // was created when the statement started executing (via the // reset() method). + // TODO(sql-sessions): fix the phase time for pausable portals. 
+	// https://github.com/cockroachdb/cockroach/issues/99410
 	ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.SessionQueryServiced, timeutil.Now())
 	if err != nil {
 		return err
 	}
@@ -2313,9 +2340,26 @@
 	var advInfo advanceInfo

+	// We close all pausable portals when we encounter an error payload;
+	// otherwise there will be leftover bytes.
+	shouldClosePausablePortals := func(payload fsm.EventPayload) bool {
+		switch payload.(type) {
+		case eventNonRetriableErrPayload, eventRetriableErrPayload:
+			return true
+		default:
+			return false
+		}
+	}
+
 	// If an event was generated, feed it to the state machine.
 	if ev != nil {
 		var err error
+		if shouldClosePausablePortals(payload) {
+			// We need this because otherwise there will be leftover bytes when
+			// txnState.finishSQLTxn() is called, since the underlying resources
+			// of pausable portals haven't been cleared yet.
+			ex.extraTxnState.prepStmtsNamespace.closeAllPausablePortals(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc)
+		}
 		advInfo, err = ex.txnStateTransitionsApplyWrapper(ev, payload, res, pos)
 		if err != nil {
 			return err
 		}
@@ -2366,6 +2410,16 @@
 			res.SetError(pe.errorCause())
 		}
 	}
+	// For a pausable portal, we don't log the affected rows until we close
+	// the portal. However, we update the result for each execution. Thus, we
+	// need to accumulate the number of affected rows before closing the
+	// result.
+	if tcmd, ok := cmd.(*ExecPortal); ok {
+		if portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[tcmd.Name]; ok {
+			if portal.pauseInfo != nil {
+				portal.pauseInfo.dispatchToExecutionEngine.rowsAffected += res.(RestrictedCommandResult).RowsAffected()
+			}
+		}
+	}
 	res.Close(ctx, stateToTxnStatusIndicator(ex.machine.CurState()))
 } else {
 	res.Discard()
 }
@@ -3598,6 +3652,11 @@
 		fallthrough
 	case txnRollback:
 		ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
+		// Since we're doing a complete rollback, there's no need to keep the
+		// prepared stmts for a txn rewind.
+		ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
+			ex.Ctx(), &ex.extraTxnState.prepStmtsNamespaceMemAcc,
+		)
 	case txnRestart:
 		// In addition to resetting the extraTxnState, the restart event may
 		// also need to reset the sqlliveness.Session.
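As promised in the commit message above, here is a self-contained toy sketch
(illustrative only; these are not the patch's actual types) of the layered,
LIFO cleanup ordering when an error surfaces in execStmtInOpenState():

    package main

    import "fmt"

    // layer is a toy stand-in for one of the four cleanup function stacks
    // kept in portalPauseInfo.
    type layer struct {
        funcs []func()
    }

    func (l *layer) append(f func()) { l.funcs = append(l.funcs, f) }

    func (l *layer) run() {
        for _, f := range l.funcs {
            f()
        }
        l.funcs = nil
    }

    func main() {
        newLayer := func(name string) *layer {
            l := &layer{}
            l.append(func() { fmt.Println("cleanup:", name) })
            return l
        }
        exhaustPortal := newLayer("exhaustPortal")
        execStmtInOpenState := newLayer("execStmtInOpenState")
        dispatchToExecutionEngine := newLayer("dispatchToExecutionEngine")
        resumableFlow := newLayer("resumableFlow")

        // An error in execStmtInOpenState: the deeper layers run first,
        // then the current layer; exhaustPortal runs last, back in
        // execPortal(). Output order:
        //   cleanup: resumableFlow
        //   cleanup: dispatchToExecutionEngine
        //   cleanup: execStmtInOpenState
        //   cleanup: exhaustPortal
        resumableFlow.run()
        dispatchToExecutionEngine.run()
        execStmtInOpenState.run()
        exhaustPortal.run()
    }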
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index 2977fc2e9fc1..afd21af818d3 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -95,7 +95,7 @@ const numTxnRetryErrors = 3
 func (ex *connExecutor) execStmt(
 	ctx context.Context,
 	parserStmt parser.Statement,
-	prepared *PreparedStatement,
+	portal *PreparedPortal,
 	pinfo *tree.PlaceholderInfo,
 	res RestrictedCommandResult,
 	canAutoCommit bool,
@@ -133,8 +133,12 @@ func (ex *connExecutor) execStmt(
 		ev, payload = ex.execStmtInNoTxnState(ctx, ast, res)

 	case stateOpen:
-		err = ex.execWithProfiling(ctx, ast, prepared, func(ctx context.Context) error {
-			ev, payload, err = ex.execStmtInOpenState(ctx, parserStmt, prepared, pinfo, res, canAutoCommit)
+		var preparedStmt *PreparedStatement
+		if portal != nil {
+			preparedStmt = portal.Stmt
+		}
+		err = ex.execWithProfiling(ctx, ast, preparedStmt, func(ctx context.Context) error {
+			ev, payload, err = ex.execStmtInOpenState(ctx, parserStmt, portal, pinfo, res, canAutoCommit)
 			return err
 		})
 		switch ev.(type) {
@@ -201,7 +205,23 @@ func (ex *connExecutor) execPortal(
 	stmtRes CommandResult,
 	pinfo *tree.PlaceholderInfo,
 	canAutoCommit bool,
-) (ev fsm.Event, payload fsm.EventPayload, err error) {
+) (ev fsm.Event, payload fsm.EventPayload, retErr error) {
+	defer func() {
+		if portal.isPausable() {
+			if !portal.pauseInfo.exhaustPortal.cleanup.isComplete {
+				portal.pauseInfo.exhaustPortal.cleanup.appendFunc(namedFunc{fName: "exhaust portal", f: func() {
+					ex.exhaustPortal(portalName)
+				}})
+				portal.pauseInfo.exhaustPortal.cleanup.isComplete = true
+			}
+			// If we encountered an error when executing a pausable portal, clean up
+			// the retained resources.
+			if retErr != nil {
+				portal.pauseInfo.cleanupAll()
+			}
+		}
+	}()
+
 	switch ex.machine.CurState().(type) {
 	case stateOpen:
 		// We're about to execute the statement in an open state which
@@ -223,23 +243,19 @@ func (ex *connExecutor) execPortal(
 		if portal.exhausted {
 			return nil, nil, nil
 		}
-		ev, payload, err = ex.execStmt(ctx, portal.Stmt.Statement, portal.Stmt, pinfo, stmtRes, canAutoCommit)
-		// Portal suspension is supported via a "side" state machine
-		// (see pgwire.limitedCommandResult for details), so when
-		// execStmt returns, we know for sure that the portal has been
-		// executed to completion, thus, it is exhausted.
-		// Note that the portal is considered exhausted regardless of
-		// the fact whether an error occurred or not - if it did, we
-		// still don't want to re-execute the portal from scratch.
+		ev, payload, retErr = ex.execStmt(ctx, portal.Stmt.Statement, &portal, pinfo, stmtRes, canAutoCommit)
+		// A non-pausable portal is considered exhausted regardless of whether
+		// an error occurred or not - if it did, we still don't want to
+		// re-execute the portal from scratch.
 		// The current statement may have just closed and deleted the portal,
 		// so only exhaust it if it still exists.
-		if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok {
-			ex.exhaustPortal(portalName)
+		if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok && !portal.isPausable() {
+			defer ex.exhaustPortal(portalName)
 		}
-		return ev, payload, err
+		return ev, payload, retErr

 	default:
-		return ex.execStmt(ctx, portal.Stmt.Statement, portal.Stmt, pinfo, stmtRes, canAutoCommit)
+		return ex.execStmt(ctx, portal.Stmt.Statement, &portal, pinfo, stmtRes, canAutoCommit)
 	}
 }

@@ -259,17 +275,59 @@ func (ex *connExecutor) execPortal(
 func (ex *connExecutor) execStmtInOpenState(
 	ctx context.Context,
 	parserStmt parser.Statement,
-	prepared *PreparedStatement,
+	portal *PreparedPortal,
 	pinfo *tree.PlaceholderInfo,
 	res RestrictedCommandResult,
 	canAutoCommit bool,
 ) (retEv fsm.Event, retPayload fsm.EventPayload, retErr error) {
-	ctx, sp := tracing.EnsureChildSpan(ctx, ex.server.cfg.AmbientCtx.Tracer, "sql query")
-	// TODO(andrei): Consider adding the placeholders as tags too.
-	sp.SetTag("statement", attribute.StringValue(parserStmt.SQL))
-	defer sp.Finish()
+	// We need this to be a function rather than a static bool, because a
+	// portal's "pausability" can be revoked in `dispatchToExecutionEngine()`
+	// if the underlying statement contains sub/post queries. Thus, we should
+	// evaluate whether a portal is pausable when executing the cleanup step.
+	isPausablePortal := func() bool { return portal != nil && portal.isPausable() }
+	// For pausable portals, we delay the clean-up until closing the portal by
+	// adding the function to the execStmtInOpenStateCleanup.
+	// Otherwise, perform the clean-up step within every execution.
+	processCleanupFunc := func(fName string, f func()) {
+		if !isPausablePortal() {
+			f()
+		} else if !portal.pauseInfo.execStmtInOpenState.cleanup.isComplete {
+			portal.pauseInfo.execStmtInOpenState.cleanup.appendFunc(namedFunc{
+				fName: fName,
+				f:     f,
+			})
+		}
+	}
+	defer func() {
+		// This is the first defer, so it will always be called after any
+		// cleanup funcs added to the stack by the defers below.
+		if isPausablePortal() && !portal.pauseInfo.execStmtInOpenState.cleanup.isComplete {
+			portal.pauseInfo.execStmtInOpenState.cleanup.isComplete = true
+		}
+		// If there's any error, do the cleanup right here.
+		if (retErr != nil || payloadHasError(retPayload)) && isPausablePortal() {
+			portal.pauseInfo.resumableFlow.cleanup.run()
+			portal.pauseInfo.dispatchToExecutionEngine.cleanup.run()
+			portal.pauseInfo.execStmtInOpenState.cleanup.run()
+		}
+	}()

 	ast := parserStmt.AST
-	ctx = withStatement(ctx, ast)
+	var sp *tracing.Span
+	if !isPausablePortal() || !portal.pauseInfo.execStmtInOpenState.cleanup.isComplete {
+		ctx, sp = tracing.EnsureChildSpan(ctx, ex.server.cfg.AmbientCtx.Tracer, "sql query")
+		// TODO(andrei): Consider adding the placeholders as tags too.
+ sp.SetTag("statement", attribute.StringValue(parserStmt.SQL)) + ctx = withStatement(ctx, ast) + if isPausablePortal() { + portal.pauseInfo.execStmtInOpenState.spCtx = ctx + } + defer func() { + processCleanupFunc("cleanup span", sp.Finish) + }() + } else { + ctx = portal.pauseInfo.execStmtInOpenState.spCtx + } makeErrEvent := func(err error) (fsm.Event, fsm.EventPayload, error) { ev, payload := ex.makeErrEvent(err, ast) @@ -277,7 +335,17 @@ func (ex *connExecutor) execStmtInOpenState( } var stmt Statement - queryID := ex.generateID() + var queryID clusterunique.ID + + if isPausablePortal() { + if !portal.pauseInfo.isQueryIDSet() { + portal.pauseInfo.execStmtInOpenState.queryID = ex.generateID() + } + queryID = portal.pauseInfo.execStmtInOpenState.queryID + } else { + queryID = ex.generateID() + } + // Update the deadline on the transaction based on the collections. err := ex.extraTxnState.descCollection.MaybeUpdateDeadline(ctx, ex.state.mu.txn) if err != nil { @@ -285,39 +353,56 @@ func (ex *connExecutor) execStmtInOpenState( } os := ex.machine.CurState().(stateOpen) - isExtendedProtocol := prepared != nil + isExtendedProtocol := portal != nil && portal.Stmt != nil if isExtendedProtocol { - stmt = makeStatementFromPrepared(prepared, queryID) + stmt = makeStatementFromPrepared(portal.Stmt, queryID) } else { stmt = makeStatement(parserStmt, queryID) } - ex.incrementStartedStmtCounter(ast) - defer func() { - if retErr == nil && !payloadHasError(retPayload) { - ex.incrementExecutedStmtCounter(ast) - } - }() - - func(st *txnState) { - st.mu.Lock() - defer st.mu.Unlock() - st.mu.stmtCount++ - }(&ex.state) - var queryTimeoutTicker *time.Timer var txnTimeoutTicker *time.Timer queryTimedOut := false txnTimedOut := false - // queryDoneAfterFunc and txnDoneAfterFunc will be allocated only when // queryTimeoutTicker or txnTimeoutTicker is non-nil. var queryDoneAfterFunc chan struct{} var txnDoneAfterFunc chan struct{} var cancelQuery context.CancelFunc - ctx, cancelQuery = contextutil.WithCancel(ctx) - ex.addActiveQuery(parserStmt, pinfo, queryID, cancelQuery) + addActiveQuery := func() { + ctx, cancelQuery = contextutil.WithCancel(ctx) + ex.incrementStartedStmtCounter(ast) + func(st *txnState) { + st.mu.Lock() + defer st.mu.Unlock() + st.mu.stmtCount++ + }(&ex.state) + ex.addActiveQuery(parserStmt, pinfo, queryID, cancelQuery) + } + + // For pausable portal, the active query needs to be set up only when + // the portal is executed for the first time. + if !isPausablePortal() || !portal.pauseInfo.execStmtInOpenState.cleanup.isComplete { + addActiveQuery() + if isPausablePortal() { + portal.pauseInfo.execStmtInOpenState.cancelQueryFunc = cancelQuery + portal.pauseInfo.execStmtInOpenState.cancelQueryCtx = ctx + } + defer func() { + processCleanupFunc( + "increment executed stmt cnt", + func() { + if retErr == nil && !payloadHasError(retPayload) { + ex.incrementExecutedStmtCounter(ast) + } + }, + ) + }() + } else { + ctx = portal.pauseInfo.execStmtInOpenState.cancelQueryCtx + cancelQuery = portal.pauseInfo.execStmtInOpenState.cancelQueryFunc + } // Make sure that we always unregister the query. It also deals with // overwriting res.Error to a more user-friendly message in case of query @@ -338,25 +423,47 @@ func (ex *connExecutor) execStmtInOpenState( } } - // Detect context cancelation and overwrite whatever error might have been - // set on the result before. 
-	// The idea is that once the query's context is
-	// canceled, all sorts of actors can detect the cancelation and set all
-	// sorts of errors on the result. Rather than trying to impose discipline
-	// in that jungle, we just overwrite them all here with an error that's
-	// nicer to look at for the client.
-	if res != nil && ctx.Err() != nil && res.Err() != nil {
-		// Even in the cases where the error is a retryable error, we want to
-		// intercept the event and payload returned here to ensure that the query
-		// is not retried.
-		retEv = eventNonRetriableErr{
-			IsCommit: fsm.FromBool(isCommit(ast)),
-		}
-		res.SetError(cancelchecker.QueryCanceledError)
-		retPayload = eventNonRetriableErrPayload{err: cancelchecker.QueryCanceledError}
-	}
+	processCleanupFunc("cancel query", func() {
+		cancelQueryCtx := ctx
+		if isPausablePortal() {
+			cancelQueryCtx = portal.pauseInfo.execStmtInOpenState.cancelQueryCtx
+		}
+		resToPushErr := res
+		// For pausable portals, we retain the query but update the result for
+		// each execution. When the query context is canceled and we're in the
+		// middle of a portal execution, push the error to the current result.
+		if isPausablePortal() {
+			resToPushErr = portal.pauseInfo.curRes
+		}
+		// Detect context cancelation and overwrite whatever error might have been
+		// set on the result before. The idea is that once the query's context is
+		// canceled, all sorts of actors can detect the cancelation and set all
+		// sorts of errors on the result. Rather than trying to impose discipline
+		// in that jungle, we just overwrite them all here with an error that's
+		// nicer to look at for the client.
+		if resToPushErr != nil && cancelQueryCtx.Err() != nil && resToPushErr.ErrAllowReleased() != nil {
+			// Even in the cases where the error is a retryable error, we want to
+			// intercept the event and payload returned here to ensure that the query
+			// is not retried.
+			retEv = eventNonRetriableErr{
+				IsCommit: fsm.FromBool(isCommit(ast)),
+			}
+			errToPush := cancelchecker.QueryCanceledError
+			// For a pausable portal, we can arrive here after encountering a
+			// timeout error and then performing a query-cleanup step. In this
+			// case, we don't want to override the original timeout error with
+			// the query-canceled error.
+			if isPausablePortal() && (errors.Is(resToPushErr.Err(), sqlerrors.QueryTimeoutError) ||
+				errors.Is(resToPushErr.Err(), sqlerrors.TxnTimeoutError)) {
+				errToPush = resToPushErr.Err()
+			}
+			resToPushErr.SetError(errToPush)
+			retPayload = eventNonRetriableErrPayload{err: errToPush}
+		}
+		ex.removeActiveQuery(queryID, ast)
+		cancelQuery()
+	})

 	if ex.executorType != executorTypeInternal {
 		ex.metrics.EngineMetrics.SQLActiveStatements.Dec(1)
 	}
@@ -392,6 +499,9 @@
 		ex.metrics.EngineMetrics.SQLActiveStatements.Inc(1)
 	}

+	// TODO(sql-sessions): persist the planner for a pausable portal, and reuse
+	// it for each re-execution.
+	// https://github.com/cockroachdb/cockroach/issues/99625
 	p := &ex.planner
 	stmtTS := ex.server.cfg.Clock.PhysicalTime()
 	ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
@@ -478,25 +588,60 @@
 	}

 	var needFinish bool
-	ctx, needFinish = ih.Setup(
-		ctx, ex.server.cfg, ex.statsCollector, p, ex.stmtDiagnosticsRecorder,
-		stmt.StmtNoConstants, os.ImplicitTxn.Get(), ex.extraTxnState.shouldCollectTxnExecutionStats,
-	)
+	// For a pausable portal, the instrumentation helper needs to be set up
+	// only when the portal is executed for the first time.
+	if !isPausablePortal() || portal.pauseInfo.execStmtInOpenState.ihWrapper == nil {
+		ctx, needFinish = ih.Setup(
+			ctx, ex.server.cfg, ex.statsCollector, p, ex.stmtDiagnosticsRecorder,
+			stmt.StmtNoConstants, os.ImplicitTxn.Get(), ex.extraTxnState.shouldCollectTxnExecutionStats,
+		)
+	} else {
+		ctx = portal.pauseInfo.execStmtInOpenState.ihWrapper.ctx
+	}
+	// For pausable portals, we need to persist the instrumentationHelper as it
+	// shares the ctx with the underlying flow. If it got cleaned up before we
+	// clean up the flow, we will hit `span used after finished` whenever we log
+	// an event when cleaning up the flow.
+	// We need this seemingly weird wrapper here because we set the planner's ih
+	// with its pointer. However, for a pausable portal, we'd like to persist
+	// the ih and reuse it for all re-executions. So the planner's ih and the
+	// portal's ih should never have the same address; otherwise changing the
+	// former will change the latter, and we will never be able to persist it.
+	if isPausablePortal() {
+		if portal.pauseInfo.execStmtInOpenState.ihWrapper == nil {
+			portal.pauseInfo.execStmtInOpenState.ihWrapper = &instrumentationHelperWrapper{
+				ctx: ctx,
+				ih:  *ih,
+			}
+		} else {
+			p.instrumentation = portal.pauseInfo.execStmtInOpenState.ihWrapper.ih
+		}
+	}
 	if needFinish {
 		sql := stmt.SQL
 		defer func() {
-			retErr = ih.Finish(
-				ex.server.cfg,
-				ex.statsCollector,
-				&ex.extraTxnState.accumulatedStats,
-				ih.collectExecStats,
-				p,
-				ast,
-				sql,
-				res,
-				retPayload,
-				retErr,
-			)
+			processCleanupFunc("finish instrumentation helper", func() {
+				// We need this indirection to make sure we finish the
+				// instrumentation helper that belongs to the paused portal.
+				ihToFinish := ih
+				curRes := res
+				if isPausablePortal() {
+					ihToFinish = &portal.pauseInfo.execStmtInOpenState.ihWrapper.ih
+					curRes = portal.pauseInfo.curRes
+				}
+				retErr = ihToFinish.Finish(
+					ex.server.cfg,
+					ex.statsCollector,
+					&ex.extraTxnState.accumulatedStats,
+					ihToFinish.collectExecStats,
+					p,
+					ast,
+					sql,
+					curRes,
+					retPayload,
+					retErr,
+				)
+			})
 		}()
 	}
@@ -562,6 +707,7 @@
 	if retEv != nil || retErr != nil {
 		return
 	}
+	// As portals are from the extended protocol, we don't auto-commit for them.
 	if canAutoCommit && !isExtendedProtocol {
 		retEv, retPayload = ex.handleAutoCommit(ctx, ast)
 	}
@@ -660,8 +806,13 @@
 	// For regular statements (the ones that get to this point), we
 	// don't return any event unless an error happens.

-	if err := ex.handleAOST(ctx, ast); err != nil {
-		return makeErrEvent(err)
+	// For a portal (prepared stmt), since handleAOST() is called when preparing
+	// the statement, and that function is idempotent, we don't need to
+	// call it again during execution.
+	if portal == nil {
+		if err := ex.handleAOST(ctx, ast); err != nil {
+			return makeErrEvent(err)
+		}
 	}

 	// The first order of business is to ensure proper sequencing
@@ -709,7 +860,9 @@
 	p.extendedEvalCtx.Placeholders = &p.semaCtx.Placeholders
 	p.extendedEvalCtx.Annotations = &p.semaCtx.Annotations
 	p.stmt = stmt
-	p.cancelChecker.Reset(ctx)
+	if isPausablePortal() {
+		p.pausablePortal = portal
+	}

 	// Auto-commit is disallowed during statement execution if we previously
-	// executed any DDL. This is because may potentially create jobs and do other
+	// executed any DDL. This is because it may potentially create jobs and do other
@@ -723,6 +876,9 @@
 	var stmtThresholdSpan *tracing.Span
 	alreadyRecording := ex.transitionCtx.sessionTracing.Enabled()
+	// TODO(sql-sessions): fix the stmtTraceThreshold for pausable portals, so
+	// that it records all executions.
+	// https://github.com/cockroachdb/cockroach/issues/99404
 	stmtTraceThreshold := TraceStmtThreshold.Get(&ex.planner.execCfg.Settings.SV)
 	var stmtCtx context.Context
 	// TODO(andrei): I think we should do this even if alreadyRecording == true.
@@ -736,6 +892,8 @@
 	var r *tree.ReleaseSavepoint
 	enforceHomeRegion := p.EnforceHomeRegion()
 	_, isSelectStmt := stmt.AST.(*tree.Select)
+	// TODO(sql-sessions): ensure this is not broken for pausable portals.
+	// https://github.com/cockroachdb/cockroach/issues/99408
 	if enforceHomeRegion && ex.state.mu.txn.IsOpen() && isSelectStmt {
 		// Create a savepoint at a point before which rows were read so that we can
 		// roll back to it, which will allow the txn to be modified with a
@@ -1103,6 +1261,8 @@ func (ex *connExecutor) commitSQLTransactionInternal(ctx context.Context) error
 		return err
 	}

+	ex.extraTxnState.prepStmtsNamespace.closeAllPortals(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc)
+
 	// We need to step the transaction before committing if it has stepping
 	// enabled. If it doesn't have stepping enabled, then we just set the
 	// stepping mode back to what it was.
@@ -1191,6 +1351,9 @@ func (ex *connExecutor) rollbackSQLTransaction(
 	if err := ex.extraTxnState.sqlCursors.closeAll(false /* errorOnWithHold */); err != nil {
 		return ex.makeErrEvent(err, stmt)
 	}
+
+	ex.extraTxnState.prepStmtsNamespace.closeAllPortals(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc)
+
 	if err := ex.state.mu.txn.Rollback(ctx); err != nil {
 		log.Warningf(ctx, "txn rollback failed: %s", err)
 	}
@@ -1213,9 +1376,29 @@
 // producing an appropriate state machine event.
 func (ex *connExecutor) dispatchToExecutionEngine(
 	ctx context.Context, planner *planner, res RestrictedCommandResult,
-) error {
+) (retErr error) {
+	getPausablePortalInfo := func() *portalPauseInfo {
+		if planner != nil && planner.pausablePortal != nil {
+			return planner.pausablePortal.pauseInfo
+		}
+		return nil
+	}
+	defer func() {
+		if ppInfo := getPausablePortalInfo(); ppInfo != nil {
+			if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
+				ppInfo.dispatchToExecutionEngine.cleanup.isComplete = true
+			}
+			if retErr != nil || res.Err() != nil {
+				ppInfo.resumableFlow.cleanup.run()
+				ppInfo.dispatchToExecutionEngine.cleanup.run()
+			}
+		}
+	}()
+
 	stmt := planner.stmt
 	ex.sessionTracing.TracePlanStart(ctx, stmt.AST.StatementTag())
+	// TODO(sql-sessions): fix the phase time for pausable portals.
+ // https://github.com/cockroachdb/cockroach/issues/99410 ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.PlannerStartLogicalPlan, timeutil.Now()) if multitenant.TenantRUEstimateEnabled.Get(ex.server.cfg.SV()) { @@ -1241,10 +1424,25 @@ func (ex *connExecutor) dispatchToExecutionEngine( ex.extraTxnState.hasAdminRoleCache.IsSet = true } } - // Prepare the plan. Note, the error is processed below. Everything - // between here and there needs to happen even if there's an error. - err := ex.makeExecPlan(ctx, planner) - defer planner.curPlan.close(ctx) + + var err error + if ppInfo := getPausablePortalInfo(); ppInfo != nil { + if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete { + err = ex.makeExecPlan(ctx, planner) + ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan + ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{ + fName: "close planTop", + f: func() { ppInfo.dispatchToExecutionEngine.planTop.close(ctx) }, + }) + } else { + planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop + } + } else { + // Prepare the plan. Note, the error is processed below. Everything + // between here and there needs to happen even if there's an error. + err = ex.makeExecPlan(ctx, planner) + defer planner.curPlan.close(ctx) + } // Include gist in error reports. ctx = withPlanGist(ctx, planner.instrumentation.planGist.String()) @@ -1262,17 +1460,56 @@ func (ex *connExecutor) dispatchToExecutionEngine( var stats topLevelQueryStats defer func() { var bulkJobId uint64 - // Note that for bulk job query (IMPORT, BACKUP and RESTORE), we don't - // use this numRows entry. We emit the number of changed rows when the job - // completes. (see the usages of logutil.LogJobCompletion()). - nonBulkJobNumRows := res.RowsAffected() - switch planner.stmt.AST.(type) { - case *tree.Import, *tree.Restore, *tree.Backup: - bulkJobId = res.GetBulkJobId() - } - planner.maybeLogStatement(ctx, ex.executorType, false, int(ex.state.mu.autoRetryCounter), ex.extraTxnState.txnCounter, nonBulkJobNumRows, bulkJobId, res.Err(), ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived), &ex.extraTxnState.hasAdminRoleCache, ex.server.TelemetryLoggingMetrics, stmtFingerprintID, &stats) + if ppInfo := getPausablePortalInfo(); ppInfo != nil && !ppInfo.dispatchToExecutionEngine.cleanup.isComplete { + ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{ + fName: "log statement", + f: func() { + planner.maybeLogStatement( + ctx, + ex.executorType, + false, /* isCopy */ + int(ex.state.mu.autoRetryCounter), + ex.extraTxnState.txnCounter, + ppInfo.dispatchToExecutionEngine.rowsAffected, + bulkJobId, + ppInfo.curRes.ErrAllowReleased(), + ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived), + &ex.extraTxnState.hasAdminRoleCache, + ex.server.TelemetryLoggingMetrics, + ppInfo.dispatchToExecutionEngine.stmtFingerprintID, + ppInfo.dispatchToExecutionEngine.queryStats, + ) + }, + }) + } else { + // Note that for bulk job query (IMPORT, BACKUP and RESTORE), we don't + // use this numRows entry. We emit the number of changed rows when the job + // completes. (see the usages of logutil.LogJobCompletion()). 
+ nonBulkJobNumRows := res.RowsAffected() + switch planner.stmt.AST.(type) { + case *tree.Import, *tree.Restore, *tree.Backup: + bulkJobId = res.GetBulkJobId() + } + planner.maybeLogStatement( + ctx, + ex.executorType, + false, /* isCopy */ + int(ex.state.mu.autoRetryCounter), + ex.extraTxnState.txnCounter, + nonBulkJobNumRows, + bulkJobId, + res.Err(), + ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived), + &ex.extraTxnState.hasAdminRoleCache, + ex.server.TelemetryLoggingMetrics, + stmtFingerprintID, + &stats, + ) + } }() + // TODO(sql-sessions): fix the phase time for pausable portals. + // https://github.com/cockroachdb/cockroach/issues/99410 ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.PlannerEndLogicalPlan, timeutil.Now()) ex.sessionTracing.TracePlanEnd(ctx, err) @@ -1303,6 +1540,8 @@ func (ex *connExecutor) dispatchToExecutionEngine( ex.server.cfg.TestingKnobs.BeforeExecute(ctx, stmt.String(), planner.Descriptors()) } + // TODO(sql-sessions): fix the phase time for pausable portals. + // https://github.com/cockroachdb/cockroach/issues/99410 ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.PlannerStartExecStmt, timeutil.Now()) progAtomic, err := func() (*uint64, error) { @@ -1351,6 +1590,12 @@ func (ex *connExecutor) dispatchToExecutionEngine( stats, err = ex.execWithDistSQLEngine( ctx, planner, stmt.AST.StatementReturnType(), res, distribute, progAtomic, ) + if ppInfo := getPausablePortalInfo(); ppInfo != nil { + // For pausable portals, we log the stats when closing the portal, so we need + // to aggregate the stats for all executions. + ppInfo.dispatchToExecutionEngine.queryStats.add(&stats) + } + if res.Err() == nil { isSetOrShow := stmt.AST.StatementTag() == "SET" || stmt.AST.StatementTag() == "SHOW" if ex.sessionData().InjectRetryErrorsEnabled && !isSetOrShow && @@ -1365,14 +1610,14 @@ func (ex *connExecutor) dispatchToExecutionEngine( } } ex.sessionTracing.TraceExecEnd(ctx, res.Err(), res.RowsAffected()) + // TODO(sql-sessions): fix the phase time for pausable portals. + // https://github.com/cockroachdb/cockroach/issues/99410 ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.PlannerEndExecStmt, timeutil.Now()) ex.extraTxnState.rowsRead += stats.rowsRead ex.extraTxnState.bytesRead += stats.bytesRead ex.extraTxnState.rowsWritten += stats.rowsWritten - populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector) - // The transaction (from planner.txn) may already have been committed at this point, // due to one-phase commit optimization or an error. Since we use that transaction // on the optimizer, check if is still open before generating index recommendations. @@ -1384,12 +1629,28 @@ func (ex *connExecutor) dispatchToExecutionEngine( planner.instrumentation.SetIndexRecommendations(ctx, ex.server.idxRecommendationsCache, planner, isInternal) } - // Record the statement summary. This also closes the plan if the - // plan has not been closed earlier. - stmtFingerprintID = ex.recordStatementSummary( - ctx, planner, - int(ex.state.mu.autoRetryCounter), res.RowsAffected(), res.Err(), stats, - ) + if ppInfo := getPausablePortalInfo(); ppInfo != nil && !ppInfo.dispatchToExecutionEngine.cleanup.isComplete { + // We need to ensure that we're using the planner bound to the first-time + // execution of a portal. 
+ curPlanner := *planner + ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{ + fName: "populate query level stats and regions", + f: func() { + populateQueryLevelStatsAndRegions(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector) + ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary( + ctx, &curPlanner, + int(ex.state.mu.autoRetryCounter), ppInfo.dispatchToExecutionEngine.rowsAffected, ppInfo.curRes.ErrAllowReleased(), *ppInfo.dispatchToExecutionEngine.queryStats, + ) + }, + }) + } else { + populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector) + stmtFingerprintID = ex.recordStatementSummary( + ctx, planner, + int(ex.state.mu.autoRetryCounter), res.RowsAffected(), res.Err(), stats, + ) + } + if ex.server.cfg.TestingKnobs.AfterExecute != nil { ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err()) } @@ -1455,7 +1716,7 @@ func populateQueryLevelStatsAndRegions( trace := ih.sp.GetRecording(tracingpb.RecordingStructured) var err error queryLevelStats, err := execstats.GetQueryLevelStats( - trace, p.execCfg.TestingKnobs.DeterministicExplain, flowsMetadata) + trace, cfg.TestingKnobs.DeterministicExplain, flowsMetadata) queryLevelStatsWithErr := execstats.MakeQueryLevelStatsWithErr(queryLevelStats, err) ih.queryLevelStatsWithErr = &queryLevelStatsWithErr if err != nil { diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go index c5fc74d58656..7656884005e5 100644 --- a/pkg/sql/conn_io.go +++ b/pkg/sql/conn_io.go @@ -808,6 +808,11 @@ type RestrictedCommandResult interface { // GetBulkJobId returns the id of the job for the query, if the query is // IMPORT, BACKUP or RESTORE. GetBulkJobId() uint64 + + // ErrAllowReleased returns the error without asserting the result is not + // released yet. It should be used only in clean-up stages of a pausable + // portal. + ErrAllowReleased() error } // DescribeResult represents the result of a Describe command (for either @@ -976,6 +981,11 @@ type streamingCommandResult struct { var _ RestrictedCommandResult = &streamingCommandResult{} var _ CommandResultClose = &streamingCommandResult{} +// ErrAllowReleased is part of the sql.RestrictedCommandResult interface. +func (r *streamingCommandResult) ErrAllowReleased() error { + return r.err +} + // SetColumns is part of the RestrictedCommandResult interface. 
func (r *streamingCommandResult) SetColumns(ctx context.Context, cols colinfo.ResultColumns) { // The interface allows for cols to be nil, yet the iterator result expects diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 34ea495ad917..988608eb3536 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -46,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -856,15 +857,16 @@ func (dsp *DistSQLPlanner) Run( var flow flowinfra.Flow var err error - if i := planCtx.getPortalPauseInfo(); i != nil && i.flow != nil { - flow = i.flow + if i := planCtx.getPortalPauseInfo(); i != nil && i.resumableFlow.flow != nil { + flow = i.resumableFlow.flow } else { ctx, flow, err = dsp.setupFlows( ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL, ) if i != nil { - i.flow = flow - i.outputTypes = plan.GetResultTypes() + // TODO(yuzefovich): add a check that this flow runs in a single goroutine. + i.resumableFlow.flow = flow + i.resumableFlow.outputTypes = plan.GetResultTypes() } } @@ -1610,7 +1612,15 @@ func (dsp *DistSQLPlanner) PlanAndRunAll( planner *planner, recv *DistSQLReceiver, evalCtxFactory func(usedConcurrently bool) *extendedEvalContext, -) error { +) (retErr error) { + defer func() { + if ppInfo := planCtx.getPortalPauseInfo(); ppInfo != nil && !ppInfo.resumableFlow.cleanup.isComplete { + ppInfo.resumableFlow.cleanup.isComplete = true + } + if retErr != nil && planCtx.getPortalPauseInfo() != nil { + planCtx.getPortalPauseInfo().resumableFlow.cleanup.run() + } + }() if len(planner.curPlan.subqueryPlans) != 0 { // Create a separate memory account for the results of the subqueries. // Note that we intentionally defer the closure of the account until we @@ -1640,6 +1650,25 @@ func (dsp *DistSQLPlanner) PlanAndRunAll( ctx, evalCtx, planCtx, planner.txn, planner.curPlan.main, recv, finishedSetupFn, ) }() + + if p := planCtx.getPortalPauseInfo(); p != nil { + if buildutil.CrdbTestBuild && p.resumableFlow.flow == nil { + checkErr := errors.AssertionFailedf("flow for portal %s cannot be found", planner.pausablePortal.Name) + if recv.commErr != nil { + recv.commErr = errors.CombineErrors(recv.commErr, checkErr) + } else { + return checkErr + } + } + if !p.resumableFlow.cleanup.isComplete { + p.resumableFlow.cleanup.appendFunc(namedFunc{ + fName: "cleanup flow", f: func() { + p.resumableFlow.flow.Cleanup(ctx) + }, + }) + } + } + if recv.commErr != nil || recv.getError() != nil { return recv.commErr } diff --git a/pkg/sql/pgwire/command_result.go b/pkg/sql/pgwire/command_result.go index 716b280b81e7..459fd27bbfaa 100644 --- a/pkg/sql/pgwire/command_result.go +++ b/pkg/sql/pgwire/command_result.go @@ -197,6 +197,11 @@ func (r *commandResult) Err() error { return r.err } +// ErrAllowReleased is part of the sql.RestrictedCommandResult interface. +func (r *commandResult) ErrAllowReleased() error { + return r.err +} + // SetError is part of the sql.RestrictedCommandResult interface. 
 //
 // We're not going to write any bytes to the buffer in order to support future
diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go
index 555a3db7eb39..2434b2246ba9 100644
--- a/pkg/sql/planner.go
+++ b/pkg/sql/planner.go
@@ -272,7 +272,7 @@ type planner struct {
 // hasFlowForPausablePortal returns true if the planner is for re-executing a
 // portal. We reuse the flow stored in p.pausablePortal.pauseInfo.
 func (p *planner) hasFlowForPausablePortal() bool {
-	return p.pausablePortal != nil && p.pausablePortal.pauseInfo != nil && p.pausablePortal.pauseInfo.flow != nil
+	return p.pausablePortal != nil && p.pausablePortal.pauseInfo != nil && p.pausablePortal.pauseInfo.resumableFlow.flow != nil
 }
 
 // resumeFlowForPausablePortal is called when re-executing a portal. We reuse
@@ -282,8 +282,8 @@ func (p *planner) resumeFlowForPausablePortal(recv *DistSQLReceiver) error {
 		return errors.AssertionFailedf("no flow found for pausable portal")
 	}
 	recv.discardRows = p.instrumentation.ShouldDiscardRows()
-	recv.outputTypes = p.pausablePortal.pauseInfo.outputTypes
-	p.pausablePortal.pauseInfo.flow.Resume(recv)
+	recv.outputTypes = p.pausablePortal.pauseInfo.resumableFlow.outputTypes
+	p.pausablePortal.pauseInfo.resumableFlow.flow.Resume(recv)
 	return recv.commErr
 }
 
diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go
index aa3115adc405..6713a751c26a 100644
--- a/pkg/sql/prepared_stmt.go
+++ b/pkg/sql/prepared_stmt.go
@@ -16,6 +16,8 @@ import (
 	"unsafe"
 
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
+	"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
+	"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
 	"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
@@ -185,6 +187,7 @@ func (ex *connExecutor) makePreparedPortal(
 	if ex.sessionData().MultipleActivePortalsEnabled {
 		telemetry.Inc(sqltelemetry.StmtsTriedWithPausablePortals)
 		portal.pauseInfo = &portalPauseInfo{}
+		portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{}
 		portal.portalPausablity = PausablePortal
 	}
 	return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
@@ -209,22 +212,157 @@ func (p *PreparedPortal) close(
 ) {
 	prepStmtsNamespaceMemAcc.Shrink(ctx, p.size(portalName))
 	p.Stmt.decRef(ctx)
+	if p.pauseInfo != nil {
+		p.pauseInfo.cleanupAll()
+		p.pauseInfo = nil
+	}
 }
 
 func (p *PreparedPortal) size(portalName string) int64 {
 	return int64(uintptr(len(portalName)) + unsafe.Sizeof(p))
 }
 
+func (p *PreparedPortal) isPausable() bool {
+	return p.pauseInfo != nil
+}
+
+// cleanupFuncStack stores cleanup functions for a portal. The cleanup
+// functions are added during the first-time execution of a portal. When the
+// first-time execution is finished, we set isComplete to true.
+type cleanupFuncStack struct {
+	stack      []namedFunc
+	isComplete bool
+}
+
+func (n *cleanupFuncStack) appendFunc(f namedFunc) {
+	n.stack = append(n.stack, f)
+}
+
+func (n *cleanupFuncStack) run() {
+	for i := 0; i < len(n.stack); i++ {
+		n.stack[i].f()
+	}
+	*n = cleanupFuncStack{}
+}
+
+// namedFunc is a function with a name, which makes debugging easier. It is
+// used only for the cleanup functions of a pausable portal.
+type namedFunc struct {
+	fName string
+	f     func()
+}
+
+// instrumentationHelperWrapper wraps the instrumentation helper.
+// We need to maintain it for a paused portal.
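+// It stores the context and the helper from the portal's first execution so
+// that they can be reused when the portal is re-executed.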
+type instrumentationHelperWrapper struct {
+	ctx context.Context
+	ih  instrumentationHelper
+}
+
 // portalPauseInfo stores info that enables the pause of a portal. After
 // pausing the portal, other statements can be executed, and the portal can
 // then be re-executed or closed.
 type portalPauseInfo struct {
-	// outputTypes are the types of the result columns produced by the physical plan.
-	// We need this as when re-executing the portal, we are reusing the flow
-	// with the new receiver, but not re-generating the physical plan.
-	outputTypes []*types.T
-	// We need to store the flow for a portal so that when re-executing it, we
-	// continue from the previous execution. It lives along with the portal, and
-	// will be cleaned-up when the portal is closed.
-	flow flowinfra.Flow
+	// curRes is the result channel of the current (or last, if the portal is
+	// closing) portal execution. It is assumed to be a pgwire.limitedCommandResult.
+	curRes RestrictedCommandResult
+	// The following 4 structs store information to persist for a portal's
+	// execution, as well as cleanup functions to be called when the portal is
+	// closed. These structs correspond to a function call chain for a query's
+	// execution:
+	// - connExecutor.execPortal()
+	// - connExecutor.execStmtInOpenStateCleanup()
+	// - connExecutor.dispatchToExecutionEngine()
+	// - DistSQLPlanner.Run()
+	//
+	// Each struct has two main parts:
+	// 1. Fields that need to be retained for a resumption of a portal's execution.
+	// 2. A cleanup function stack that will be called when the portal's execution
+	// has to be terminated or when the portal is to be closed. Each stack is
+	// defined as a closure in its corresponding function.
+	//
+	// When closing a portal, we need to follow the reverse order of its execution,
+	// which means running the cleanup functions of the four structs in the
+	// following order:
+	// - exhaustPortal.cleanup
+	// - execStmtInOpenState.cleanup
+	// - dispatchToExecutionEngine.cleanup
+	// - resumableFlow.cleanup
+	//
+	// If an error occurs in any of these functions, we run the cleanup functions of
+	// this layer and its child layers, and propagate the error to the parent
+	// layer. For example, if an error occurs in execStmtInOpenStateCleanup(), we
+	// run the cleanup functions in the following order:
+	// - execStmtInOpenState.cleanup
+	// - dispatchToExecutionEngine.cleanup
+	// - resumableFlow.cleanup
+	//
+	// When exiting connExecutor.execStmtInOpenState(), we finally run the
+	// exhaustPortal.cleanup function in connExecutor.execPortal().
+	exhaustPortal struct {
+		cleanup cleanupFuncStack
+	}
+
+	// TODO(sql-session): replace certain fields here with planner.
+	// https://github.com/cockroachdb/cockroach/issues/99625
+	execStmtInOpenState struct {
+		spCtx context.Context
+		// queryID stores the id of the query that this portal is bound to. When we
+		// re-execute an existing portal, we should use the same query id.
+		queryID clusterunique.ID
+		// ihWrapper stores the instrumentation helper that should be reused for
+		// each execution of the portal.
+		ihWrapper *instrumentationHelperWrapper
+		// cancelQueryFunc will be called to cancel the context of the query when
+		// the portal is closed.
+		cancelQueryFunc context.CancelFunc
+		// cancelQueryCtx is the context to be canceled when closing the portal.
+		cancelQueryCtx context.Context
+		cleanup        cleanupFuncStack
+	}
+
+	// TODO(sql-session): replace certain fields here with planner.
+	// https://github.com/cockroachdb/cockroach/issues/99625
+	dispatchToExecutionEngine struct {
+		// rowsAffected is the number of rows queried with this portal. It is
+		// accumulated from each portal execution, and will be logged when this portal
+		// is closed.
+		rowsAffected int
+		// stmtFingerprintID is the fingerprint id of the underlying statement.
+		stmtFingerprintID appstatspb.StmtFingerprintID
+		// planTop collects the properties of the current plan being prepared.
+		// We reuse it when re-executing the portal.
+		// TODO(yuzefovich): consider refactoring distSQLFlowInfos from planTop to
+		// avoid storing the planTop here.
+		planTop planTop
+		// queryStats stores statistics on query execution. It is accumulated over
+		// each execution of the portal.
+		queryStats *topLevelQueryStats
+		cleanup    cleanupFuncStack
+	}
+
+	resumableFlow struct {
+		// We need to store the flow for a portal so that when re-executing it, we
+		// continue from the previous execution. It lives along with the portal, and
+		// will be cleaned-up when the portal is closed.
+		flow flowinfra.Flow
+		// outputTypes are the types of the result columns produced by the physical plan.
+		// We need this as when re-executing the portal, we are reusing the flow
+		// with the new receiver, but not re-generating the physical plan.
+		outputTypes []*types.T
+		cleanup     cleanupFuncStack
+	}
+}
+
+// cleanupAll runs all the cleanup layers.
+func (pm *portalPauseInfo) cleanupAll() {
+	pm.resumableFlow.cleanup.run()
+	pm.dispatchToExecutionEngine.cleanup.run()
+	pm.execStmtInOpenState.cleanup.run()
+	pm.exhaustPortal.cleanup.run()
+}
+
+// isQueryIDSet returns true if the query id for the portal is set.
+func (pm *portalPauseInfo) isQueryIDSet() bool {
+	return !pm.execStmtInOpenState.queryID.Equal(clusterunique.ID{}.Uint128)
+}

From e445917e845b06ccbbf85a8ac0dcee461e182ba6 Mon Sep 17 00:00:00 2001
From: Jane Xing
Date: Tue, 28 Mar 2023 00:52:54 -0500
Subject: [PATCH 4/8] sql: store retErr and retPayload for pausable portals

When executing or cleaning up a pausable portal, we may encounter an
error and need to run the corresponding clean-up steps, where we need
to check the latest `retErr` and `retPayload` rather than the ones
evaluated when creating the cleanup functions.

To address this, we use portal.pauseInfo.retErr and .retPayload to
record the latest error and payload. They need to be updated for each
execution. Specifically,

1. If the error happens during portal execution, we ensure
`portal.pauseInfo` records the error by adding the following code to the
main body of `execStmtInOpenState()`:

``` go
defer func() {
	updateRetErrAndPayload(retErr, retPayload)
}()
```

Note that this defer must always be registered _after_ the defer that
runs the cleanups.

2. If the error occurs during a certain cleanup step for the pausable
portal, we ensure that cleanup steps after it can see the error by
always having `updateRetErrAndPayload(retErr, retPayload)` run at the
end of each cleanup step.

Release note: None
---
 pkg/sql/conn_executor_exec.go | 37 ++++++++++++++++++++++++++++++++++-
 pkg/sql/prepared_stmt.go      |  7 +++++++
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index afd21af818d3..68d8ff70b07d 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -285,6 +285,17 @@ func (ex *connExecutor) execStmtInOpenState(
 	// underlying statement contains sub/post queries. Thus, we should evaluate
 	// whether a portal is pausable when executing the cleanup step.
 	isPausablePortal := func() bool { return portal != nil && portal.isPausable() }
+	// updateRetErrAndPayload ensures that the latest event payload and error
+	// are always recorded by portal.pauseInfo.
+	// TODO(janexing): add test for this.
+	updateRetErrAndPayload := func(err error, payload fsm.EventPayload) {
+		retPayload = payload
+		retErr = err
+		if isPausablePortal() {
+			portal.pauseInfo.execStmtInOpenState.retPayload = payload
+			portal.pauseInfo.execStmtInOpenState.retErr = err
+		}
+	}
 	// For pausable portals, we delay the clean-up until closing the portal by
 	// adding the function to the execStmtInOpenStateCleanup.
 	// Otherwise, perform the clean-up step within every execution.
@@ -294,7 +305,12 @@ func (ex *connExecutor) execStmtInOpenState(
 		} else if !portal.pauseInfo.execStmtInOpenState.cleanup.isComplete {
 			portal.pauseInfo.execStmtInOpenState.cleanup.appendFunc(namedFunc{
 				fName: fName,
-				f:     f,
+				f: func() {
+					f()
+					// Some cleanup steps modify the retErr and retPayload. We need to
+					// ensure that cleanup steps after them can see the update.
+					updateRetErrAndPayload(retErr, retPayload)
+				},
 			})
 		}
 	}
@@ -306,12 +322,23 @@ func (ex *connExecutor) execStmtInOpenState(
 		}
 		// If there's any error, do the cleanup right here.
 		if (retErr != nil || payloadHasError(retPayload)) && isPausablePortal() {
+			updateRetErrAndPayload(retErr, retPayload)
 			portal.pauseInfo.resumableFlow.cleanup.run()
 			portal.pauseInfo.dispatchToExecutionEngine.cleanup.run()
 			portal.pauseInfo.execStmtInOpenState.cleanup.run()
 		}
 	}()
 
+	// We need this so that, when we check whether to increment the executed
+	// statement count, we check the latest error and payload. Otherwise, we
+	// would be checking the ones evaluated at the portal's first-time
+	// execution.
+	defer func() {
+		if isPausablePortal() {
+			updateRetErrAndPayload(retErr, retPayload)
+		}
+	}()
+
 	ast := parserStmt.AST
 	var sp *tracing.Span
 	if !isPausablePortal() || !portal.pauseInfo.execStmtInOpenState.cleanup.isComplete {
@@ -393,6 +420,12 @@ func (ex *connExecutor) execStmtInOpenState(
 	processCleanupFunc(
 		"increment executed stmt cnt",
 		func() {
+			// We need to check the latest error and payload rather than the ones
+			// evaluated when this closure was created.
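+			// For a pausable portal this closure may run long after it was created,
+			// so we refresh retErr and retPayload from pauseInfo first.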
+ if isPausablePortal() { + retErr = portal.pauseInfo.execStmtInOpenState.retErr + retPayload = portal.pauseInfo.execStmtInOpenState.retPayload + } if retErr == nil && !payloadHasError(retPayload) { ex.incrementExecutedStmtCounter(ast) } @@ -628,6 +661,8 @@ func (ex *connExecutor) execStmtInOpenState( if isPausablePortal() { ihToFinish = &portal.pauseInfo.execStmtInOpenState.ihWrapper.ih curRes = portal.pauseInfo.curRes + retErr = portal.pauseInfo.execStmtInOpenState.retErr + retPayload = portal.pauseInfo.execStmtInOpenState.retPayload } retErr = ihToFinish.Finish( ex.server.cfg, diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go index 6713a751c26a..6f8cea3469c2 100644 --- a/pkg/sql/prepared_stmt.go +++ b/pkg/sql/prepared_stmt.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/fsm" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" ) @@ -318,6 +319,12 @@ type portalPauseInfo struct { cancelQueryFunc context.CancelFunc // cancelQueryCtx is the context to be canceled when closing the portal. cancelQueryCtx context.Context + // retPayload is needed for the cleanup steps as we will have to check the + // latest encountered error, so this field should be updated for each execution. + retPayload fsm.EventPayload + // retErr is needed for the cleanup steps as we will have to check the latest + // encountered error, so this field should be updated for each execution. + retErr error cleanup cleanupFuncStack } From 71f6bd3518f384f3784f6818ceb18c038df84fcd Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Sun, 12 Mar 2023 22:41:57 -0400 Subject: [PATCH 5/8] sql: add restrictions to pausable portals This commit adds several restrictions to pausable portals to ensure that they work properly with the current changes to the consumer-receiver model. Specifically, pausable portals must meet the following criteria: 1. Not be internal queries 2. Be read-only queries 3. Not contain sub-queries or post-queries 4. Only use local plans These restrictions are necessary because the current changes to the consumer-receiver model only consider the local push-based case. Release note: None --- pkg/sql/conn_executor.go | 6 ++++++ pkg/sql/conn_executor_exec.go | 25 ++++++++++++++++++++++++- pkg/sql/conn_io.go | 10 ++++++++++ pkg/sql/pgwire/command_result.go | 13 +++++++++++++ pkg/sql/prepared_stmt.go | 15 +++++++++++---- pkg/sql/sem/tree/stmt.go | 24 ++++++++++++++++++++++++ pkg/sql/sqltelemetry/pgwire.go | 12 ++++++++++++ 7 files changed, 100 insertions(+), 5 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index ae2d55648178..896455d1e5e3 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2195,6 +2195,12 @@ func (ex *connExecutor) execCmd() (retErr error) { Values: portal.Qargs, } + // If this is the first-time execution of a portal without a limit set, + // it means all rows will be exhausted, so no need to pause this portal. 
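+	// (A nil curRes means the portal has not been executed before.)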
+	if tcmd.Limit == 0 && portal.pauseInfo != nil && portal.pauseInfo.curRes == nil {
+		portal.pauseInfo = nil
+	}
+
 	stmtRes := ex.clientComm.CreateStatementResult(
 		portal.Stmt.AST,
 		// The client is using the extended protocol, so no row description is
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index 68d8ff70b07d..3f4bc0f0d037 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -1565,9 +1565,32 @@ func (ex *connExecutor) dispatchToExecutionEngine(
 	}
 
 	ex.sessionTracing.TracePlanCheckStart(ctx)
+
+	distSQLMode := ex.sessionData().DistSQLMode
+	if planner.pausablePortal != nil {
+		if len(planner.curPlan.subqueryPlans) == 0 &&
+			len(planner.curPlan.cascades) == 0 &&
+			len(planner.curPlan.checkPlans) == 0 {
+			// We only allow a non-distributed plan for pausable portals.
+			distSQLMode = sessiondatapb.DistSQLOff
+		} else {
+			telemetry.Inc(sqltelemetry.SubOrPostQueryStmtsTriedWithPausablePortals)
+			// We don't allow sub- or post-queries for a pausable portal. Set it back
+			// to an un-pausable (normal) portal.
+			// With pauseInfo set to nil, no cleanup functions will be added to the
+			// stack, and all cleanup steps will be performed as for normal portals.
+			planner.pausablePortal.pauseInfo = nil
+			// We need this so that the result consumption for this portal cannot be
+			// paused either.
+			if err := res.RevokePortalPausability(); err != nil {
+				res.SetError(err)
+				return nil
+			}
+		}
+	}
 	distributePlan := getPlanDistribution(
 		ctx, planner.Descriptors().HasUncommittedTypes(),
-		ex.sessionData().DistSQLMode, planner.curPlan.main,
+		distSQLMode, planner.curPlan.main,
 	)
 	ex.sessionTracing.TracePlanCheckEnd(ctx, nil, distributePlan.WillDistribute())
 
diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go
index 7656884005e5..ab76542887e5 100644
--- a/pkg/sql/conn_io.go
+++ b/pkg/sql/conn_io.go
@@ -813,6 +813,11 @@ type RestrictedCommandResult interface {
 	// released yet. It should be used only in clean-up stages of a pausable
 	// portal.
 	ErrAllowReleased() error
+
+	// RevokePortalPausability makes a portal un-pausable. It is called when we
+	// find that the underlying query is not supported for a pausable portal.
+	// This method is implemented only by pgwire.limitedCommandResult.
+	RevokePortalPausability() error
 }
 
 // DescribeResult represents the result of a Describe command (for either
@@ -986,6 +991,11 @@ func (r *streamingCommandResult) ErrAllowReleased() error {
 	return r.err
 }
 
+// RevokePortalPausability is part of the sql.RestrictedCommandResult interface.
+func (r *streamingCommandResult) RevokePortalPausability() error {
+	return errors.AssertionFailedf("forPausablePortal is for limitedCommandResult only")
+}
+
 // SetColumns is part of the RestrictedCommandResult interface.
 func (r *streamingCommandResult) SetColumns(ctx context.Context, cols colinfo.ResultColumns) {
 	// The interface allows for cols to be nil, yet the iterator result expects
diff --git a/pkg/sql/pgwire/command_result.go b/pkg/sql/pgwire/command_result.go
index 459fd27bbfaa..254e589ea2d0 100644
--- a/pkg/sql/pgwire/command_result.go
+++ b/pkg/sql/pgwire/command_result.go
@@ -118,6 +118,11 @@ type paramStatusUpdate struct {
 
 var _ sql.CommandResult = &commandResult{}
 
+// RevokePortalPausability is part of the sql.RestrictedCommandResult interface.
+func (r *commandResult) RevokePortalPausability() error {
+	return errors.AssertionFailedf("RevokePortalPausability is only implemented by limitedCommandResult")
+}
+
 // Close is part of the sql.RestrictedCommandResult interface.
func (r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndicator) {
 	r.assertNotReleased()
@@ -478,6 +483,8 @@ type limitedCommandResult struct {
 	portalPausablity sql.PortalPausablity
 }
 
+var _ sql.RestrictedCommandResult = &limitedCommandResult{}
+
 // AddRow is part of the sql.RestrictedCommandResult interface.
 func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) error {
 	if err := r.commandResult.AddRow(ctx, row); err != nil {
@@ -507,6 +514,12 @@ func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) erro
 	return nil
 }
 
+// RevokePortalPausability is part of the sql.RestrictedCommandResult interface.
+func (r *limitedCommandResult) RevokePortalPausability() error {
+	r.portalPausablity = sql.NotPausablePortalForUnsupportedStmt
+	return nil
+}
+
 // SupportsAddBatch is part of the sql.RestrictedCommandResult interface.
 // TODO(yuzefovich): implement limiting behavior for AddBatch.
 func (r *limitedCommandResult) SupportsAddBatch() bool {
diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go
index 6f8cea3469c2..16ca739a4f55 100644
--- a/pkg/sql/prepared_stmt.go
+++ b/pkg/sql/prepared_stmt.go
@@ -185,11 +185,18 @@ func (ex *connExecutor) makePreparedPortal(
 		OutFormats: outFormats,
 	}
 
-	if ex.sessionData().MultipleActivePortalsEnabled {
+	if ex.sessionData().MultipleActivePortalsEnabled && ex.executorType != executorTypeInternal {
 		telemetry.Inc(sqltelemetry.StmtsTriedWithPausablePortals)
-		portal.pauseInfo = &portalPauseInfo{}
-		portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{}
-		portal.portalPausablity = PausablePortal
+		if tree.IsAllowedToPause(stmt.AST) {
+			portal.pauseInfo = &portalPauseInfo{}
+			portal.pauseInfo.dispatchToExecutionEngine.queryStats = &topLevelQueryStats{}
+			portal.portalPausablity = PausablePortal
+		} else {
+			telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
+			// We have set the session variable multiple_active_portals_enabled to
+			// true, but we don't support the underlying query for a pausable portal.
+			portal.portalPausablity = NotPausablePortalForUnsupportedStmt
+		}
 	}
 	return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
 }
diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go
index 723fb01f5b3d..686ccd1d1b0c 100644
--- a/pkg/sql/sem/tree/stmt.go
+++ b/pkg/sql/sem/tree/stmt.go
@@ -121,6 +121,30 @@ type canModifySchema interface {
 	modifiesSchema() bool
 }
 
+// IsAllowedToPause returns true if the stmt can neither modify the schema nor
+// write data.
+// This function gates the queries allowed for pausable portals.
+// TODO(janexing): We should be more accurate about the stmt selection here.
+// Now we only allow SELECT, but is it too strict? And how to filter out
+// SELECT with data writes / schema changes?
+func IsAllowedToPause(stmt Statement) bool {
+	if stmt != nil && !CanModifySchema(stmt) && !CanWriteData(stmt) {
+		switch t := stmt.(type) {
+		case *Select:
+			if t.With != nil {
+				ctes := t.With.CTEList
+				for _, cte := range ctes {
+					if !IsAllowedToPause(cte.Stmt) {
+						return false
+					}
+				}
+			}
+			return true
+		}
+	}
+	return false
+}
+
 // CanModifySchema returns true if the statement can modify
 // the database schema.
func CanModifySchema(stmt Statement) bool {
diff --git a/pkg/sql/sqltelemetry/pgwire.go b/pkg/sql/sqltelemetry/pgwire.go
index 3cacbfad4cdd..fd766c88bbba 100644
--- a/pkg/sql/sqltelemetry/pgwire.go
+++ b/pkg/sql/sqltelemetry/pgwire.go
@@ -74,3 +74,15 @@ var FlushRequestCounter = telemetry.GetCounterOnce("pgwire.command.flush")
 // multiple_active_portals_enabled has been set to true.
 // The statement might not satisfy the restriction for a pausable portal.
 var StmtsTriedWithPausablePortals = telemetry.GetCounterOnce("pgwire.pausable_portal_stmts")
+
+// NotReadOnlyStmtsTriedWithPausablePortals is to be incremented every time
+// a non-internal, non-read-only statement is executed with a pgwire portal
+// and the session variable multiple_active_portals_enabled has been set to
+// true. In this case the execution cannot be paused.
+var NotReadOnlyStmtsTriedWithPausablePortals = telemetry.GetCounterOnce("pgwire.pausable_portal_not_read_only_stmts")
+
+// SubOrPostQueryStmtsTriedWithPausablePortals is to be incremented every time
+// a non-internal statement with sub- or post-queries is executed with a
+// pgwire portal and the session variable multiple_active_portals_enabled has
+// been set to true. In this case the execution cannot be paused.
+var SubOrPostQueryStmtsTriedWithPausablePortals = telemetry.GetCounterOnce("pgwire.pausable_portal_stmts_with_sub_or_post_queries")

From e0c6ced3a994e29173d7d326f9d3e57bec472a74 Mon Sep 17 00:00:00 2001
From: Jane Xing
Date: Mon, 3 Apr 2023 15:39:38 -0400
Subject: [PATCH 6/8] sql: correctly set inner plan for leafTxn when resuming
 flow for portal

When resuming a portal, we always reset the planner. However, we still
need the planner to respect the outer txn's situation, as we did in
#98120.

Release note: None
---
 pkg/sql/planner.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go
index 2434b2246ba9..e6076bd1dc27 100644
--- a/pkg/sql/planner.go
+++ b/pkg/sql/planner.go
@@ -283,7 +283,11 @@ func (p *planner) resumeFlowForPausablePortal(recv *DistSQLReceiver) error {
 	}
 	recv.discardRows = p.instrumentation.ShouldDiscardRows()
 	recv.outputTypes = p.pausablePortal.pauseInfo.resumableFlow.outputTypes
-	p.pausablePortal.pauseInfo.resumableFlow.flow.Resume(recv)
+	flow := p.pausablePortal.pauseInfo.resumableFlow.flow
+	finishedSetupFn, cleanup := getFinishedSetupFn(p)
+	finishedSetupFn(flow)
+	defer cleanup()
+	flow.Resume(recv)
 	return recv.commErr
 }
 
From 20292aaa133fc2cbce5ef46c9915a0ac4c775c9d Mon Sep 17 00:00:00 2001
From: Jane Xing
Date: Thu, 6 Apr 2023 16:21:00 -0400
Subject: [PATCH 7/8] pgwire: add tests for multiple active portals

Release note: None
---
 .../testdata/logic_test/information_schema    |    4 +-
 .../testdata/pgtest/multiple_active_portals   | 1250 +++++++++++++++++
 pkg/sql/pgwire/testdata/pgtest/portals        |   68 +-
 pkg/sql/pgwire/testdata/pgtest/portals_crbugs |   83 --
 4 files changed, 1282 insertions(+), 123 deletions(-)
 create mode 100644 pkg/sql/pgwire/testdata/pgtest/multiple_active_portals

diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema
index ec2c19bd3d8c..47d4154fc759 100644
--- a/pkg/sql/logictest/testdata/logic_test/information_schema
+++ b/pkg/sql/logictest/testdata/logic_test/information_schema
@@ -5174,7 +5174,7 @@ WHERE
     'use_declarative_schema_changer',
     'distsql_workmem',
     'copy_fast_path_enabled',
-    'direct_columnar_scans_enabled'
+    'direct_columnar_scans_enabled',
'multiple_active_portals_enabled' ); ---- @@ -5208,7 +5208,6 @@ default_transaction_read_only off default_transaction_use_follower_reads off default_with_oids off descriptor_validation on -direct_columnar_scans_enabled on disable_drop_tenant off disable_hoist_projection_in_join_limitation off disable_partially_distributed_plans off @@ -5267,7 +5266,6 @@ lock_timeout 0 log_timezone UTC max_identifier_length 128 max_index_keys 32 -multiple_active_portals_enabled off node_id 1 null_ordered_last off on_update_rehome_row_enabled on diff --git a/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals b/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals new file mode 100644 index 000000000000..2f77613a307f --- /dev/null +++ b/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals @@ -0,0 +1,1250 @@ +send crdb_only +Query {"String": "SET multiple_active_portals_enabled = true"} +---- + +until crdb_only ignore=NoticeResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest select_from_individual_resources + +send +Query {"String": "BEGIN"} +Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(1,20)"} +Parse {"Name": "q2", "Query": "SELECT * FROM generate_series(1,20)"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Sync +---- + +until +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Query {"String": "COMMIT"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"COMMIT"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end + + +subtest select_from_same_table + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "BEGIN"} +Query {"String": "CREATE TABLE mytable (x int)"} +Query {"String": "INSERT INTO mytable VALUES (1),(2),(3)"} +Parse {"Name": "q1", "Query": "SELECT * FROM mytable"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Parse {"Name": "q2", "Query": "SELECT * FROM mytable"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Sync +---- + + +until +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"CommandComplete","CommandTag":"INSERT 0 3"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} 
+{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +subtest end + + +subtest bind_to_an_existing_active_portal + +send +Parse {"Name": "q3", "Query": "SELECT * FROM mytable"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q3"} +Execute {"Portal": "p2", "MaxRows": 2} +Sync +---- + + +until keepErrMessage +ErrorResponse +---- +{"Type":"ParseComplete"} +{"Type":"ErrorResponse","Code":"42P03","Message":"portal \"p2\" already exists"} + +send +Query {"String": "COMMIT"} +Sync +---- + +# Rollback +until +ReadyForQuery +ReadyForQuery +---- +{"Type":"ReadyForQuery","TxStatus":"E"} +{"Type":"CommandComplete","CommandTag":"ROLLBACK"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end + + +subtest not_in_explicit_transaction + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(1,20)"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Parse {"Name": "q2", "Query": "SELECT * FROM generate_series(1,20)"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Sync +---- + + +until +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Execute {"Portal": "p2", "MaxRows": 2} +Sync +---- + +# p2 doesn't exist, as it is closed when the implicit txn is committed. 
+until keepErrMessage +ErrorResponse +ReadyForQuery +---- +{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"p2\""} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end + + +subtest drop_table_when_there_are_dependent_active_portals + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "BEGIN"} +Query {"String": "CREATE TABLE mytable (x int)"} +Query {"String": "INSERT INTO mytable VALUES (1),(2),(3)"} +Parse {"Name": "q1", "Query": "SELECT * FROM mytable"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Execute {"Portal": "p1", "MaxRows": 1} +Sync +---- + + +until +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"CommandComplete","CommandTag":"INSERT 0 3"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Query {"String": "DROP TABLE mytable"} +---- + + +until noncrdb_only keepErrMessage +ErrorResponse +ReadyForQuery +---- +{"Type":"ErrorResponse","Code":"55006","Message":"cannot DROP TABLE \"mytable\" because it is being used by active queries in this session"} +{"Type":"ReadyForQuery","TxStatus":"E"} + +# For cursor we have `cannot run schema change in a transaction with open DECLARE cursors`. +# We should have something similar for portals as well. +# https://github.com/cockroachdb/cockroach/issues/99085 +until crdb_only +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Query {"String": "COMMIT"} +---- + +until noncrdb_only +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"ROLLBACK"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +until crdb_only +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"COMMIT"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end + + +subtest different_portals_bind_to_the_same_statement + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "BEGIN"} +Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(1,20)"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Sync +---- + +until +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Execute {"Portal": "p1", "MaxRows": 1} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Query {"String": "COMMIT"} +Sync +---- + +until +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"COMMIT"} 
+{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end + + +subtest more_complicated_stmts + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "BEGIN; DROP TABLE IF EXISTS ta; DROP TABLE IF EXISTS tb; CREATE TABLE ta (x int, y int); CREATE TABLE tb (x int, z int); INSERT INTO ta VALUES (1,1), (2,2), (3,3), (4,4); INSERT INTO tb VALUES (1,2), (2,3), (3,4), (4,5)"} +Parse {"Name": "q1", "Query": "SELECT z as myz FROM ta JOIN tb ON ta.x = tb.x ORDER BY myz"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 2} +Sync +---- + +until +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"CommandComplete","CommandTag":"INSERT 0 4"} +{"Type":"CommandComplete","CommandTag":"INSERT 0 4"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"3"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"3"}]} +{"Type":"DataRow","Values":[{"text":"4"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Query {"String": "COMMIT"} +Sync +---- + +until +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"COMMIT"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end + + +subtest not_supported_statements + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send crdb_only +Query {"String": "BEGIN; DROP TABLE IF EXISTS t; CREATE TABLE t (x int)"} +Parse {"Name": "q1", "Query": "WITH t AS (INSERT INTO t(x) VALUES (1), (2), (3) RETURNING x) SELECT * FROM t;"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Sync +---- + +until crdb_only keepErrMessage +ReadyForQuery +ErrorResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries"} +{"Type":"ReadyForQuery","TxStatus":"E"} + +send crdb_only +Query {"String": "COMMIT"} +---- + 
+until crdb_only +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"ROLLBACK"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send crdb_only +Query {"String": "BEGIN"} +Parse {"Name": "q1", "Query": "SELECT EXISTS (SELECT 1 FROM crdb_internal.zones WHERE target = 'hello')"} +Parse {"Name": "q2", "Query": "SELECT EXISTS (SELECT 1 FROM crdb_internal.zones WHERE target = 'hello')"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Sync +---- + +until crdb_only keepErrMessage +ReadyForQuery +ErrorResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"f"}]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries"} +{"Type":"ReadyForQuery","TxStatus":"E"} + +send crdb_only +Query {"String": "COMMIT"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"ROLLBACK"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send crdb_only +Query {"String": "BEGIN; DROP TABLE IF EXISTS t; CREATE TABLE t (x int); INSERT INTO t VALUES (1), (2), (3)"} +Parse {"Name": "q1", "Query": "UPDATE t SET x = 10 WHERE true RETURNING x;"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Sync +---- + +until crdb_only keepErrMessage +ReadyForQuery +ErrorResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"CommandComplete","CommandTag":"INSERT 0 3"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"10"}]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries"} +{"Type":"ReadyForQuery","TxStatus":"E"} + +send crdb_only +Query {"String": "COMMIT"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"ROLLBACK"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send crdb_only +Query {"String": "BEGIN; DROP TABLE IF EXISTS t; CREATE TABLE t (x int)"} +Parse {"Name": "q1", "Query": "INSERT INTO t VALUES (1), (2), (3) RETURNING x;"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"} +Execute 
{"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Sync +---- + + +until crdb_only keepErrMessage +ReadyForQuery +ErrorResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"CommandComplete","CommandTag":"DROP TABLE"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries"} +{"Type":"ReadyForQuery","TxStatus":"E"} + +send crdb_only +Query {"String": "COMMIT"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"ROLLBACK"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end + + +subtest query_timeout +# https://github.com/cockroachdb/cockroach/issues/99140 + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "BEGIN"} +Query {"String": "SET statement_timeout='2s'"} +Parse {"Name": "q1", "Query": "SELECT i, pg_sleep(0.5) FROM generate_series(1, 6) AS g(i)"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Sync +---- + +# The output for pg_sleep differ between cockroach and postgres: +# https://github.com/cockroachdb/cockroach/issues/98913 +until crdb_only keepErrMessage +ReadyForQuery +ReadyForQuery +ErrorResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"},{"text":"t"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"},{"text":"t"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"3"},{"text":"t"}]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"57014","Message":"query execution canceled due to statement timeout"} +{"Type":"ReadyForQuery","TxStatus":"E"} + +until noncrdb_only keepErrMessage +ReadyForQuery +ReadyForQuery +ErrorResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"},null]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"},null]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"3"},null]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"4"},null]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"57014","Message":"canceling statement due to statement timeout"} +{"Type":"ReadyForQuery","TxStatus":"E"} + +send +Query {"String": "COMMIT"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"ROLLBACK"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery 
+---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# timeout test with another query interleaving the portal execution. +send +Query {"String": "SET statement_timeout='2s'"} +Parse {"Name": "q1", "Query": "SELECT i, pg_sleep(0.5) FROM generate_series(1, 6) AS g(i)"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p1", "MaxRows": 1} +Query {"String": "SELECT pg_sleep(2)"} +Execute {"Portal": "p1", "MaxRows": 1} +Sync +---- + +until crdb_only keepErrMessage +ReadyForQuery +ErrorResponse +ReadyForQuery +ErrorResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"},{"text":"t"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"},{"text":"t"}]} +{"Type":"PortalSuspended"} +{"Type":"RowDescription","Fields":[{"Name":"pg_sleep","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":16,"DataTypeSize":1,"TypeModifier":-1,"Format":0}]} +{"Type":"ErrorResponse","Code":"57014","Message":"query execution canceled due to statement timeout"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"p1\""} +{"Type":"ReadyForQuery","TxStatus":"I"} + +until noncrdb_only keepErrMessage +ReadyForQuery +ErrorResponse +ReadyForQuery +ErrorResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"},null]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"},null]} +{"Type":"PortalSuspended"} +{"Type":"RowDescription","Fields":[{"Name":"pg_sleep","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":2278,"DataTypeSize":4,"TypeModifier":-1,"Format":0}]} +{"Type":"ErrorResponse","Code":"57014","Message":"canceling statement due to statement timeout"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"ErrorResponse","Code":"34000","Message":"portal \"p1\" does not exist"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "RESET statement_timeout"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"RESET"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end + +subtest cancel_query_bug + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send crdb_only +Query {"String": "BEGIN"} +Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(11,21)"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Execute {"Portal": "p1", "MaxRows": 1} +Sync +---- + +until crdb_only +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"11"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +# We check the status of q1, cancel it, and recheck the status. 
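+# Note that the statement text is matched in its parsed and re-formatted form,
+# hence the ROWS FROM spelling below.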
+send crdb_only +Query {"String": "WITH x AS (SHOW CLUSTER STATEMENTS) SELECT query, phase FROM x WHERE query = 'SELECT * FROM ROWS FROM (generate_series(11, 21))'"} +Query {"String": "CANCEL QUERY (WITH x AS (SHOW CLUSTER STATEMENTS) SELECT query_id FROM x WHERE query = 'SELECT * FROM ROWS FROM (generate_series(11, 21))');"} +Query {"String": "WITH x AS (SHOW CLUSTER STATEMENTS) SELECT query, phase FROM x WHERE query = 'SELECT * FROM ROWS FROM (generate_series(11, 21))'"} +Sync +---- + +# TODO(janexing): the query should have been cancelled but it still shows +# `executing` status. It should be in `cancelled` status. +until crdb_only ignore=RowDescription +ReadyForQuery +ReadyForQuery +ReadyForQuery +ReadyForQuery +---- +{"Type":"DataRow","Values":[{"text":"SELECT * FROM ROWS FROM (generate_series(11, 21))"},{"text":"executing"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"CommandComplete","CommandTag":"CANCEL QUERIES 1"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"DataRow","Values":[{"text":"SELECT * FROM ROWS FROM (generate_series(11, 21))"},{"text":"executing"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 1"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +# q1 has been cancelled, so portals bound to it cannot be further executed. +send crdb_only +Execute {"Portal": "p1", "MaxRows": 1} +Sync +---- + +until crdb_only ignore=RowDescription keepErrMessage +ErrorResponse +ReadyForQuery +---- +{"Type":"ErrorResponse","Code":"57014","Message":"query execution canceled"} +{"Type":"ReadyForQuery","TxStatus":"E"} + +send crdb_only +Query {"String": "COMMIT"} +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"ROLLBACK"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end + + +subtest ingest_non_retriable_error + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# We force a non-retriable error here, and check that in this case we close +# all pausable portals. 
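+# crdb_internal.force_error('', 'foo') raises error "foo"; with an empty code
+# it surfaces as the internal error code XXUUU.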
+send crdb_only +Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(1,20)"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Execute {"Portal": "p1", "MaxRows": 1} +Query {"String": "SELECT crdb_internal.force_error('','foo')"} +Execute {"Portal": "p1", "MaxRows": 1} +Sync +---- + +until crdb_only keepErrMessage ignore=RowDescription +ErrorResponse +ReadyForQuery +ErrorResponse +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"XXUUU","Message":"foo"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"p1\""} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end + + +subtest interleave_with_unpausable_portal + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "BEGIN; CREATE TABLE t (x int); INSERT INTO t VALUES (1), (2), (3)"} +Parse {"Name": "q1", "Query": "SELECT * FROM t"} +Parse {"Name": "q2", "Query": "UPDATE t SET x = 10 RETURNING x"} +Parse {"Name": "q3", "Query": "INSERT INTO t VALUES (5), (6), (7)"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Bind {"DestinationPortal": "p3", "PreparedStatement": "q3"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p3", "MaxRows": 1} +Sync +---- + +until keepErrMessage +ReadyForQuery +ErrorResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"CommandComplete","CommandTag":"INSERT 0 3"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"ParseComplete"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"10"}]} +{"Type":"PortalSuspended"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: the statement for a pausable portal must be a read-only SELECT query with no sub-queries or post-queries"} +{"Type":"ReadyForQuery","TxStatus":"E"} + +send +Query {"String": "COMMIT"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"ROLLBACK"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end + + +subtest pausable_portals_with_virtual_tables + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "BEGIN; CREATE TABLE a (id int UNIQUE, name TEXT); CREATE TABLE t1(a int primary key, b int); CREATE TABLE t2(a int primary key, b int); CREATE TABLE t3(a int primary key, b int); CREATE TABLE t4(a int primary key, b int); CREATE TABLE t5(a int primary key, b int); CREATE TABLE t6(a int primary key, b int); CREATE TABLE t7(a int primary key, b int); CREATE TABLE t8(a int primary key, b int);"} +Parse {"Name": "q", "Query": "SELECT a.attname, format_type(a.atttypid, a.atttypmod), pg_get_expr(d.adbin, d.adrelid), a.attnotnull, a.atttypid, a.atttypmod FROM pg_attribute a LEFT JOIN pg_attrdef d ON a.attrelid = d.adrelid AND a.attnum = d.adnum WHERE a.attrelid = 'a'::regclass AND a.attnum > 0 AND NOT a.attisdropped ORDER BY 
a.attnum;"} +Bind {"DestinationPortal": "p", "PreparedStatement": "q"} +Execute {"Portal": "p", "MaxRows": 1} +Sync +---- + +until +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"id"},{"text":"bigint"},null,{"text":"f"},{"text":"20"},{"text":"-1"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Execute {"Portal": "p", "MaxRows": 1} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"DataRow","Values":[{"text":"name"},{"text":"text"},null,{"text":"f"},{"text":"25"},{"text":"-1"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Parse {"Name": "q2", "Query": "SELECT a.attname AS column_name, NOT (a.attnotnull OR ((t.typtype = 'd') AND t.typnotnull)) AS is_nullable, pg_get_expr(ad.adbin, ad.adrelid) AS column_default FROM pg_attribute AS a LEFT JOIN pg_attrdef AS ad ON (a.attrelid = ad.adrelid) AND (a.attnum = ad.adnum) JOIN pg_type AS t ON a.atttypid = t.oid JOIN pg_class AS c ON a.attrelid = c.oid JOIN pg_namespace AS n ON c.relnamespace = n.oid WHERE ( ( (c.relkind IN ('f', 'm', 'p', 'r', 'v')) AND (c.relname ILIKE '%t%') ) AND (n.nspname NOT IN ('pg_catalog', 'pg_toast')) ) AND pg_table_is_visible(c.oid) ORDER BY 1, 2;"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Execute {"Portal": "p2", "MaxRows": 3} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]} +{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]} +{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Execute {"Portal": "p", "MaxRows": 1} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"DataRow","Values":[{"text":"rowid"},{"text":"bigint"},{"text":"unique_rowid()"},{"text":"t"},{"text":"20"},{"text":"-1"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Execute {"Portal": "p2", "MaxRows": 5} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]} +{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]} +{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]} +{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]} +{"Type":"DataRow","Values":[{"text":"a"},{"text":"f"},null]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send crdb_only +Execute {"Portal": "p2"} +Sync +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"DataRow","Values":[{"text":"auth_name"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"auth_srid"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]} 
+{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"b"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"coord_dimension"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"f_geometry_column"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"f_table_catalog"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"f_table_name"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"f_table_schema"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"proj4text"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"rowid"},{"text":"f"},{"text":"unique_rowid()"}]} +{"Type":"DataRow","Values":[{"text":"rowid"},{"text":"f"},{"text":"unique_rowid()"}]} +{"Type":"DataRow","Values":[{"text":"srid"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"srid"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"srtext"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"type"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"x"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"x"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"y"},{"text":"t"},null]} +{"Type":"DataRow","Values":[{"text":"z"},{"text":"t"},null]} +{"Type":"CommandComplete","CommandTag":"SELECT 26"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Parse {"Name": "q3", "Query": "WITH RECURSIVE cte(a, b) AS (SELECT 0, 0 UNION ALL SELECT a+1, b+10 FROM cte WHERE a < 5) SELECT * FROM cte;"} +Parse {"Name": "q4", "Query": "WITH RECURSIVE cte(a, b) AS (SELECT 0, 0 UNION ALL SELECT a+1, b+10 FROM cte WHERE a < 5) SELECT * FROM cte;"} +Bind {"DestinationPortal": "p3", "PreparedStatement": "q3"} +Bind {"DestinationPortal": "p4", "PreparedStatement": "q4"} +Execute {"Portal": "p3", "MaxRows": 1} +Execute {"Portal": "p4", "MaxRows": 1} +Execute {"Portal": "p4", "MaxRows": 1} +Execute {"Portal": "p3", "MaxRows": 1} +Sync +---- + +until crdb_only +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"0"},{"text":"0"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"0"},{"text":"0"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"1"},{"text":"10"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"1"},{"text":"10"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Query {"String": "COMMIT"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"COMMIT"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end + + +subtest deallocating_prepared_stmt_should_not_interrupt_portal_execution + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "BEGIN"} +Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(1,20)"} +Parse {"Name": "q2", "Query": "SELECT * FROM generate_series(1,20)"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute {"Portal": "p2", "MaxRows": 1} +Query {"String": "DEALLOCATE q1"} +Execute {"Portal": "p1", "MaxRows": 1} +Execute 
{"Portal": "p2", "MaxRows": 1} +Sync +---- + +until +ReadyForQuery +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"CommandComplete","CommandTag":"DEALLOCATE"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +send +Query {"String": "COMMIT"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"COMMIT"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end + + +subtest close_conn_executor_without_committing + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "BEGIN"} +Parse {"Name": "q1", "Query": "SELECT * FROM generate_series(1,5)"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Execute {"Portal": "p1", "MaxRows": 1} +Parse {"Name": "q2", "Query": "SELECT * FROM generate_series(8,10)"} +Bind {"DestinationPortal": "p2", "PreparedStatement": "q2"} +Execute {"Portal": "p2", "MaxRows": 1} +Execute {"Portal": "p1"} +Sync +---- + +until +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"BEGIN"} +{"Type":"ReadyForQuery","TxStatus":"T"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"8"}]} +{"Type":"PortalSuspended"} +{"Type":"DataRow","Values":[{"text":"2"}]} +{"Type":"DataRow","Values":[{"text":"3"}]} +{"Type":"DataRow","Values":[{"text":"4"}]} +{"Type":"DataRow","Values":[{"text":"5"}]} +{"Type":"CommandComplete","CommandTag":"SELECT 4"} +{"Type":"ReadyForQuery","TxStatus":"T"} + +subtest end diff --git a/pkg/sql/pgwire/testdata/pgtest/portals b/pkg/sql/pgwire/testdata/pgtest/portals index 3f0b9a472bc9..8e8f57c1c606 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals +++ b/pkg/sql/pgwire/testdata/pgtest/portals @@ -21,11 +21,11 @@ Execute Sync ---- -until +until keepErrMessage ErrorResponse ReadyForQuery ---- -{"Type":"ErrorResponse","Code":"34000"} +{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"\""} {"Type":"ReadyForQuery","TxStatus":"I"} # Verify that closing a bound portal prevents execution. 
@@ -38,14 +38,14 @@ Execute {"Portal": "p"} Sync ---- -until +until keepErrMessage ErrorResponse ReadyForQuery ---- {"Type":"ParseComplete"} {"Type":"BindComplete"} {"Type":"CloseComplete"} -{"Type":"ErrorResponse","Code":"34000"} +{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"p\""} {"Type":"ReadyForQuery","TxStatus":"I"} # The spec says that closing a prepared statement also closes its portals, @@ -337,11 +337,11 @@ Execute Sync ---- -until +until keepErrMessage ErrorResponse ReadyForQuery ---- -{"Type":"ErrorResponse","Code":"34000"} +{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"\""} {"Type":"ReadyForQuery","TxStatus":"I"} send @@ -384,12 +384,12 @@ Execute Sync ---- -until +until keepErrMessage ErrorResponse ReadyForQuery ---- {"Type":"CloseComplete"} -{"Type":"ErrorResponse","Code":"34000"} +{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"\""} {"Type":"ReadyForQuery","TxStatus":"E"} send @@ -928,6 +928,16 @@ ReadyForQuery {"Type":"CommandComplete","CommandTag":"BEGIN"} {"Type":"ReadyForQuery","TxStatus":"T"} +send crdb_only +Query {"String": "SET multiple_active_portals_enabled = true"} +---- + +until crdb_only ignore=NoticeResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"T"} + # 'S' for Statement # 49 = ASCII '1' # ParameterFormatCodes = [0] for text format @@ -959,7 +969,7 @@ Execute {"Portal": "por"} Sync ---- -until noncrdb_only +until ReadyForQuery ---- {"Type":"ParseComplete"} @@ -971,33 +981,17 @@ ReadyForQuery {"Type":"CommandComplete","CommandTag":"SELECT 2"} {"Type":"ReadyForQuery","TxStatus":"T"} -until crdb_only -ErrorResponse -ReadyForQuery ----- -{"Type":"ErrorResponse","Code":"0A000"} -{"Type":"ReadyForQuery","TxStatus":"E"} - -send noncrdb_only +send Execute {"Portal": "pc4"} Sync ---- -until noncrdb_only +until ReadyForQuery ---- {"Type":"CommandComplete","CommandTag":"COMMIT"} {"Type":"ReadyForQuery","TxStatus":"I"} -send crdb_only -Query {"String": "ROLLBACK"} ----- - -until crdb_only -ReadyForQuery ----- -{"Type":"CommandComplete","CommandTag":"ROLLBACK"} -{"Type":"ReadyForQuery","TxStatus":"I"} # Test that we can use a portal with MaxRows from an implicit transaction. 
send @@ -1185,7 +1179,7 @@ Execute Sync ---- -until ignore_constraint_name +until keepErrMessage ignore_constraint_name ErrorResponse ReadyForQuery ---- @@ -1194,7 +1188,7 @@ ReadyForQuery {"Type":"CommandComplete","CommandTag":"INSERT 0 1"} {"Type":"BindComplete"} {"Type":"NoData"} -{"Type":"ErrorResponse","Code":"23514"} +{"Type":"ErrorResponse","Code":"23514","Message":"failed to satisfy CHECK constraint (a \u003e 1.0:::FLOAT8)"} {"Type":"ReadyForQuery","TxStatus":"I"} send @@ -1230,11 +1224,11 @@ Execute {"Portal": "p16"} Sync ---- -until +until keepErrMessage ErrorResponse ReadyForQuery ---- -{"Type":"ErrorResponse","Code":"34000"} +{"Type":"ErrorResponse","Code":"34000","Message":"unknown portal \"p16\""} {"Type":"ReadyForQuery","TxStatus":"I"} # Verify that it's not possible to DECLARE a cursor of the same name as an open @@ -1265,7 +1259,7 @@ Execute Sync ---- -until +until keepErrMessage ErrorResponse ReadyForQuery ---- @@ -1275,7 +1269,7 @@ ReadyForQuery {"Type":"BindComplete"} {"Type":"ParseComplete"} {"Type":"BindComplete"} -{"Type":"ErrorResponse","Code":"42P03"} +{"Type":"ErrorResponse","Code":"42P03","Message":"cursor \"p\" already exists as portal"} {"Type":"ReadyForQuery","TxStatus":"E"} send @@ -1309,13 +1303,13 @@ Execute {"Portal": "fnord"} Sync ---- -until +until keepErrMessage ErrorResponse ReadyForQuery ---- {"Type":"ParseComplete"} {"Type":"BindComplete"} -{"Type":"ErrorResponse","Code":"42P03"} +{"Type":"ErrorResponse","Code":"42P03","Message":"cursor \"fnord\" already exists as portal"} {"Type":"ReadyForQuery","TxStatus":"E"} # Test the converse case: ensure that a portal cannot be created with the same @@ -1364,12 +1358,12 @@ Execute {"Portal": "sqlcursor"} Sync ---- -until +until keepErrMessage ErrorResponse ReadyForQuery ---- {"Type":"ParseComplete"} -{"Type":"ErrorResponse","Code":"42P03"} +{"Type":"ErrorResponse","Code":"42P03","Message":"portal \"sqlcursor\" already exists as cursor"} {"Type":"ReadyForQuery","TxStatus":"E"} send diff --git a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs index 14211a72b413..8c682a98e330 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs +++ b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs @@ -5,89 +5,6 @@ only crdb ---- -# More behavior that differs from postgres. Try executing a new query -# when a portal is suspended. Cockroach errors. - -send -Query {"String": "BEGIN"} -Parse {"Query": "SELECT * FROM generate_series(1, 2)"} -Bind -Execute {"MaxRows": 1} -Query {"String": "SELECT 1"} -Sync ----- - -until keepErrMessage -ReadyForQuery -ErrorResponse -ReadyForQuery -ReadyForQuery ----- -{"Type":"CommandComplete","CommandTag":"BEGIN"} -{"Type":"ReadyForQuery","TxStatus":"T"} -{"Type":"ParseComplete"} -{"Type":"BindComplete"} -{"Type":"DataRow","Values":[{"text":"1"}]} -{"Type":"PortalSuspended"} -{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported, please set session variable multiple_active_portals_enabled to true. 
Note: this feature is in preview"}
-{"Type":"ReadyForQuery","TxStatus":"E"}
-{"Type":"ReadyForQuery","TxStatus":"E"}
-
-send
-Query {"String": "ROLLBACK"}
-Query {"String": "SELECT 'here'"}
----
-
-until ignore=RowDescription
-ReadyForQuery
-ReadyForQuery
----
-{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
-{"Type":"ReadyForQuery","TxStatus":"I"}
-{"Type":"DataRow","Values":[{"text":"here"}]}
-{"Type":"CommandComplete","CommandTag":"SELECT 1"}
-{"Type":"ReadyForQuery","TxStatus":"I"}
-
-# Also try binding another portal during suspension.
-
-send
-Query {"String": "BEGIN"}
-Parse {"Query": "SELECT * FROM generate_series(1, 2)"}
-Bind
-Execute {"MaxRows": 1}
-Bind
-Sync
----
-
-until keepErrMessage
-ReadyForQuery
-ErrorResponse
-ReadyForQuery
----
-{"Type":"CommandComplete","CommandTag":"BEGIN"}
-{"Type":"ReadyForQuery","TxStatus":"T"}
-{"Type":"ParseComplete"}
-{"Type":"BindComplete"}
-{"Type":"DataRow","Values":[{"text":"1"}]}
-{"Type":"PortalSuspended"}
-{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported, please set session variable multiple_active_portals_enabled to true. Note: this feature is in preview"}
-{"Type":"ReadyForQuery","TxStatus":"E"}
-
-send
-Query {"String": "ROLLBACK"}
-Query {"String": "SELECT 'here'"}
----
-
-until ignore=RowDescription
-ReadyForQuery
-ReadyForQuery
----
-{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
-{"Type":"ReadyForQuery","TxStatus":"I"}
-{"Type":"DataRow","Values":[{"text":"here"}]}
-{"Type":"CommandComplete","CommandTag":"SELECT 1"}
-{"Type":"ReadyForQuery","TxStatus":"I"}
-
 ##############################################################################
 # Deviations from Postgres in how we handle portals' suspension and attempts #
 # to execute exhausted portals.                                              #

From 9da2397a326fcb4d7950fb526c52ab6fde692fec Mon Sep 17 00:00:00 2001
From: Jane Xing
Date: Thu, 6 Apr 2023 14:40:22 -0400
Subject: [PATCH 8/8] sql: disable multiple active portals for TestDistSQLReceiverDrainsMeta.

We currently only support multiple active portals with local plans, so
explicitly disable the feature for this test for now.

Release note: None
---
 pkg/sql/distsql_running_test.go | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go
index 0dcf5fe0d5dd..9e04579e7c9a 100644
--- a/pkg/sql/distsql_running_test.go
+++ b/pkg/sql/distsql_running_test.go
@@ -639,6 +639,14 @@ func TestDistSQLReceiverDrainsMeta(t *testing.T) {
 	p, err := pgtest.NewPGTest(ctx, tc.Server(0).ServingSQLAddr(), username.RootUser)
 	require.NoError(t, err)
 
+	// We disable multiple active portals here, as the feature only supports local plans.
+	// TODO(sql-sessions): remove this line when we finish
+	// https://github.com/cockroachdb/cockroach/issues/100822.
+	require.NoError(t, p.SendOneLine(`Query {"String": "SET multiple_active_portals_enabled = false"}`))
+	until := pgtest.ParseMessages("ReadyForQuery")
+	_, err = p.Until(false /* keepErrMsg */, until...)
+	require.NoError(t, err)
+
 	// Execute the test query asking for at most 25 rows.
 	require.NoError(t, p.SendOneLine(`Query {"String": "USE test"}`))
 	require.NoError(t, p.SendOneLine(fmt.Sprintf(`Parse {"Query": "%s"}`, testQuery)))
@@ -649,7 +657,7 @@ func TestDistSQLReceiverDrainsMeta(t *testing.T) {
 	// Retrieve all of the results. We need to receive until two 'ReadyForQuery'
 	// messages are returned (the first one for "USE test" query and the second
 	// one is for the limited portal execution).
- until := pgtest.ParseMessages("ReadyForQuery\nReadyForQuery") + until = pgtest.ParseMessages("ReadyForQuery\nReadyForQuery") msgs, err := p.Until(false /* keepErrMsg */, until...) require.NoError(t, err) received := pgtest.MsgsToJSONWithIgnore(msgs, &datadriven.TestData{})