Skip to content

Commit

Permalink
sql: make internal executor streaming
Browse files Browse the repository at this point in the history
This commit updates the internal executor to operate in a streaming
fashion by refactoring its internal logic to implement an iterator
pattern. A new method `QueryInternalEx` (and its counterpart
`QueryInternal`) is introduced (both not used currently) while all
existing methods of `InternalExecutor` interface are implemented
using the new iterator logic.

The communication between the iterator goroutine (the receiver) and the
connExecutor goroutine (the sender) is done via a buffered (of 32 size
in non-test setting) channel. The channel is closed when the
connExecutor goroutine exits its run() loop.

Care needs to be taken when closing the iterator - we need to make sure
to close the stmtBuf (so that there are no more commands for the
connExecutor goroutine to execute) and then we need to drain the channel
(since the connExecutor goroutine might be blocked on adding a row to
the channel). After that we have to wait for the connExecutor goroutine
to exit so that we can finish the tracing span. For convenience purposes,
if the iterator is fully exhausted, it will get closed automatically.

Release note: None
  • Loading branch information
yuzefovich committed Feb 8, 2021
1 parent 0e98670 commit 178239e
Show file tree
Hide file tree
Showing 7 changed files with 454 additions and 142 deletions.
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,23 @@ func (ie *wrappedInternalExecutor) QueryRow(
panic("not implemented")
}

func (ie *wrappedInternalExecutor) QueryIterator(
ctx context.Context, opName string, txn *kv.Txn, stmt string, qargs ...interface{},
) (sqlutil.InternalRows, error) {
panic("not implemented")
}

func (ie *wrappedInternalExecutor) QueryIteratorEx(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
stmt string,
qargs ...interface{},
) (sqlutil.InternalRows, error) {
panic("not implemented")
}

func (ie *wrappedInternalExecutor) getErrFunc() func(statement string) error {
ie.mu.RLock()
defer ie.mu.RUnlock()
Expand Down
52 changes: 19 additions & 33 deletions pkg/sql/conn_executor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
buf, syncResults, finished, stopper, err := startConnExecutor(ctx)
buf, syncResults, finished, stopper, _, err := startConnExecutor(ctx)
if err != nil {
t.Fatal(err)
}
Expand All @@ -69,11 +69,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
// succeed and the 2nd one to fail (since the portal is destroyed after the
// Execute).
cmdPos := 0
stmt := mustParseOne("SELECT 1")
if err != nil {
t.Fatal(err)
}
if err = buf.Push(ctx, PrepareStmt{Name: "ps_nontxn", Statement: stmt}); err != nil {
if err = buf.Push(ctx, PrepareStmt{Name: "ps_nontxn", Statement: mustParseOne("SELECT 1")}); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -121,7 +117,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
if numResults != cmdPos+1 {
t.Fatalf("expected %d results, got: %d", cmdPos+1, len(results))
}
if err := results[successfulDescribePos].err; err != nil {
if err = results[successfulDescribePos].err; err != nil {
t.Fatalf("expected first Describe to succeed, got err: %s", err)
}
if !testutils.IsError(results[failedDescribePos].err, "unknown portal") {
Expand All @@ -134,20 +130,12 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
// after the COMMIT). The point of the SELECT is to show that the portal
// survives execution of a statement.
cmdPos++
stmt = mustParseOne("BEGIN")
if err != nil {
t.Fatal(err)
}
if err := buf.Push(ctx, ExecStmt{Statement: stmt}); err != nil {
if err = buf.Push(ctx, ExecStmt{Statement: mustParseOne("BEGIN")}); err != nil {
t.Fatal(err)
}

cmdPos++
stmt = mustParseOne("SELECT 1")
if err != nil {
t.Fatal(err)
}
if err = buf.Push(ctx, PrepareStmt{Name: "ps1", Statement: stmt}); err != nil {
if err = buf.Push(ctx, PrepareStmt{Name: "ps1", Statement: mustParseOne("SELECT 1")}); err != nil {
t.Fatal(err)
}

Expand All @@ -160,11 +148,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
}

cmdPos++
stmt = mustParseOne("SELECT 2")
if err != nil {
t.Fatal(err)
}
if err := buf.Push(ctx, ExecStmt{Statement: stmt}); err != nil {
if err = buf.Push(ctx, ExecStmt{Statement: mustParseOne("SELECT 2")}); err != nil {
t.Fatal(err)
}

Expand All @@ -178,11 +162,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
}

cmdPos++
stmt = mustParseOne("COMMIT")
if err != nil {
t.Fatal(err)
}
if err := buf.Push(ctx, ExecStmt{Statement: stmt}); err != nil {
if err = buf.Push(ctx, ExecStmt{Statement: mustParseOne("COMMIT")}); err != nil {
t.Fatal(err)
}

Expand All @@ -207,7 +187,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
t.Fatalf("expected %d results, got: %d", exp, len(results))
}
succDescIdx := successfulDescribePos - numResults
if err := results[succDescIdx].err; err != nil {
if err = results[succDescIdx].err; err != nil {
t.Fatalf("expected first Describe to succeed, got err: %s", err)
}
failDescIdx := failedDescribePos - numResults
Expand All @@ -216,7 +196,7 @@ func TestPortalsDestroyedOnTxnFinish(t *testing.T) {
}

buf.Close()
if err := <-finished; err != nil {
if err = <-finished; err != nil {
t.Fatal(err)
}
}
Expand All @@ -240,9 +220,13 @@ func mustParseOne(s string) parser.Statement {
// gets the error from closing down the executor once the StmtBuf is closed, a
// stopper that must be stopped when the test completes (this does not stop the
// executor but stops other background work).
//
// It also returns a channel that AddRow might block on which can buffer up to
// 16 items (including column types when applicable), so the caller might need
// to receive from it occasionally.
func startConnExecutor(
ctx context.Context,
) (*StmtBuf, <-chan []resWithPos, <-chan error, *stop.Stopper, error) {
) (*StmtBuf, <-chan []resWithPos, <-chan error, *stop.Stopper, <-chan ieIteratorResult, error) {
// A lot of boilerplate for creating a connExecutor.
stopper := stop.NewStopper()
clock := hlc.NewClock(hlc.UnixNano, 0 /* maxOffset */)
Expand All @@ -258,7 +242,7 @@ func startConnExecutor(
gw := gossip.MakeOptionalGossip(nil)
tempEngine, tempFS, err := storage.NewTempEngine(ctx, base.DefaultTestTempStorageConfig(st), base.DefaultTestStoreSpec)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
defer tempEngine.Close()
cfg := &ExecutorConfig{
Expand Down Expand Up @@ -305,16 +289,18 @@ func startConnExecutor(
s := NewServer(cfg, pool)
buf := NewStmtBuf()
syncResults := make(chan []resWithPos, 1)
iteratorCh := make(chan ieIteratorResult, 16)
var cc ClientComm = &internalClientComm{
sync: func(res []resWithPos) {
syncResults <- res
},
ch: iteratorCh,
}
sqlMetrics := MakeMemMetrics("test" /* endpoint */, time.Second /* histogramWindow */)

conn, err := s.SetupConn(ctx, SessionArgs{}, buf, cc, sqlMetrics)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
finished := make(chan error)

Expand All @@ -324,7 +310,7 @@ func startConnExecutor(
go func() {
finished <- s.ServeConn(ctx, conn, mon.BoundAccount{}, nil /* cancel */)
}()
return buf, syncResults, finished, stopper, nil
return buf, syncResults, finished, stopper, iteratorCh, nil
}

// Test that a client session can close without deadlocking when the closing
Expand Down
78 changes: 40 additions & 38 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,108 +858,110 @@ type resCloseType bool
const closed resCloseType = true
const discarded resCloseType = false

// bufferedCommandResult is a CommandResult that buffers rows and can call a
// provided callback when closed.
type bufferedCommandResult struct {
// streamingCommandResult is a CommandResult that streams rows on the channel
// and can call a provided callback when closed.
type streamingCommandResult struct {
ch chan ieIteratorResult
err error
rows []tree.Datums
rowsAffected int
cols colinfo.ResultColumns

// errOnly, if set, makes AddRow() panic. This can be used when the execution
// of the query is not expected to produce any results.
errOnly bool

// closeCallback, if set, is called when Close()/Discard() is called.
closeCallback func(*bufferedCommandResult, resCloseType, error)
closeCallback func(*streamingCommandResult, resCloseType)
}

var _ RestrictedCommandResult = &bufferedCommandResult{}
var _ CommandResultClose = &bufferedCommandResult{}
var _ RestrictedCommandResult = &streamingCommandResult{}
var _ CommandResultClose = &streamingCommandResult{}

// SetColumns is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) SetColumns(_ context.Context, cols colinfo.ResultColumns) {
if r.errOnly {
panic("SetColumns() called when errOnly is set")
}
r.cols = cols
func (r *streamingCommandResult) SetColumns(ctx context.Context, cols colinfo.ResultColumns) {
r.ch <- ieIteratorResult{cols: cols}
}

// BufferParamStatusUpdate is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) BufferParamStatusUpdate(key string, val string) {
func (r *streamingCommandResult) BufferParamStatusUpdate(key string, val string) {
panic("unimplemented")
}

// BufferNotice is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) BufferNotice(notice pgnotice.Notice) {
func (r *streamingCommandResult) BufferNotice(notice pgnotice.Notice) {
panic("unimplemented")
}

// ResetStmtType is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) ResetStmtType(stmt tree.Statement) {
func (r *streamingCommandResult) ResetStmtType(stmt tree.Statement) {
panic("unimplemented")
}

// AddRow is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) AddRow(ctx context.Context, row tree.Datums) error {
if r.errOnly {
panic("AddRow() called when errOnly is set")
}
func (r *streamingCommandResult) AddRow(ctx context.Context, row tree.Datums) error {
// AddRow() and IncrementRowsAffected() are never called on the same command
// result, so we will not double count the affected rows by an increment
// here.
r.rowsAffected++
rowCopy := make(tree.Datums, len(row))
copy(rowCopy, row)
r.rows = append(r.rows, rowCopy)
r.ch <- ieIteratorResult{row: rowCopy}
return nil
}

func (r *bufferedCommandResult) DisableBuffering() {
func (r *streamingCommandResult) DisableBuffering() {
panic("cannot disable buffering here")
}

// SetError is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) SetError(err error) {
func (r *streamingCommandResult) SetError(err error) {
r.err = err
// Note that we intentionally do not send the error on the channel (when it
// is present) since we might replace the error with another one later which
// is allowed by the interface. An example of this is queryDone() closure
// in execStmtInOpenState().
}

// Err is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) Err() error {
func (r *streamingCommandResult) Err() error {
return r.err
}

// IncrementRowsAffected is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) IncrementRowsAffected(n int) {
func (r *streamingCommandResult) IncrementRowsAffected(n int) {
r.rowsAffected += n
if r.ch != nil {
// streamingCommandResult might be used outside of the internal executor
// (i.e. not by rowsIterator) in which case the channel is not set.
r.ch <- ieIteratorResult{rowsAffectedIncrement: &n}
}
}

// RowsAffected is part of the RestrictedCommandResult interface.
func (r *bufferedCommandResult) RowsAffected() int {
func (r *streamingCommandResult) RowsAffected() int {
return r.rowsAffected
}

// Close is part of the CommandResultClose interface.
func (r *bufferedCommandResult) Close(context.Context, TransactionStatusIndicator) {
func (r *streamingCommandResult) Close(context.Context, TransactionStatusIndicator) {
if r.closeCallback != nil {
r.closeCallback(r, closed, nil /* err */)
r.closeCallback(r, closed)
}
}

// Discard is part of the CommandResult interface.
func (r *bufferedCommandResult) Discard() {
func (r *streamingCommandResult) Discard() {
if r.closeCallback != nil {
r.closeCallback(r, discarded, nil /* err */)
r.closeCallback(r, discarded)
}
}

// SetInferredTypes is part of the DescribeResult interface.
func (r *bufferedCommandResult) SetInferredTypes([]oid.Oid) {}
func (r *streamingCommandResult) SetInferredTypes([]oid.Oid) {}

// SetNoDataRowDescription is part of the DescribeResult interface.
func (r *bufferedCommandResult) SetNoDataRowDescription() {}
func (r *streamingCommandResult) SetNoDataRowDescription() {}

// SetPrepStmtOutput is part of the DescribeResult interface.
func (r *bufferedCommandResult) SetPrepStmtOutput(context.Context, colinfo.ResultColumns) {}
func (r *streamingCommandResult) SetPrepStmtOutput(context.Context, colinfo.ResultColumns) {}

// SetPortalOutput is part of the DescribeResult interface.
func (r *bufferedCommandResult) SetPortalOutput(
func (r *streamingCommandResult) SetPortalOutput(
context.Context, colinfo.ResultColumns, []pgwirebase.FormatCode,
) {
}
2 changes: 1 addition & 1 deletion pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func (c *copyMachine) insertRows(ctx context.Context) (retErr error) {
return err
}

var res bufferedCommandResult
var res streamingCommandResult
err := c.execInsertPlan(ctx, &c.p, &res)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 178239e

Please sign in to comment.