diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index 948c859bb9f9..cd2071623a0d 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -346,7 +346,9 @@ func (i *Inbox) sendDrainSignal(ctx context.Context) error { // It is safe to Send without holding the mutex because it is legal to call // Send and Recv from different goroutines. if err := i.streamMu.stream.Send(&execinfrapb.ConsumerSignal{DrainRequest: &execinfrapb.DrainRequest{}}); err != nil { - log.Warningf(ctx, "Inbox unable to send drain signal to Outbox: %+v", err) + if log.V(1) { + log.Warningf(ctx, "Inbox unable to send drain signal to Outbox: %+v", err) + } return err } return nil @@ -398,7 +400,9 @@ func (i *Inbox) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata { defer i.closeLocked() if err := i.maybeInitLocked(ctx); err != nil { - log.Warningf(ctx, "Inbox unable to initialize stream while draining metadata: %+v", err) + if log.V(1) { + log.Warningf(ctx, "Inbox unable to initialize stream while draining metadata: %+v", err) + } return allMeta } if !drainSignalSent { @@ -415,7 +419,9 @@ func (i *Inbox) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata { if err == io.EOF { break } - log.Warningf(ctx, "Inbox Recv connection error while draining metadata: %+v", err) + if log.V(1) { + log.Warningf(ctx, "Inbox Recv connection error while draining metadata: %+v", err) + } return allMeta } for _, remoteMeta := range msg.Data.Metadata { diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index 9e36e8b97a72..62738ef24257 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -172,10 +172,14 @@ func (o *Outbox) handleStreamErr( ctx context.Context, opName string, err error, cancelFn context.CancelFunc, ) { if err == io.EOF { - log.Infof(ctx, "Outbox calling cancelFn after %s EOF", opName) + if log.V(1) { + log.Infof(ctx, "Outbox calling cancelFn after %s EOF", opName) + } cancelFn() } else { - log.Errorf(ctx, "Outbox %s connection error: %+v", opName, err) + if log.V(1) { + log.Warningf(ctx, "Outbox %s connection error: %+v", opName, err) + } } } @@ -218,7 +222,7 @@ func (o *Outbox) sendBatches( } if err := execerror.CatchVectorizedRuntimeError(nextBatch); err != nil { - log.Errorf(ctx, "Outbox Next error: %+v", err) + log.Warningf(ctx, "Outbox Next error: %+v", err) return false, err } if o.batch.Length() == 0 { @@ -281,7 +285,9 @@ func (o *Outbox) runWithStream( msg, err := stream.Recv() if err != nil { if err != io.EOF { - log.Errorf(ctx, "Outbox Recv connection error: %+v", err) + if log.V(1) { + log.Warningf(ctx, "Outbox Recv connection error: %+v", err) + } } break }