Skip to content

Commit

Permalink
Merge pull request #41467 from asubiotto/backport19.2-41174
Browse files Browse the repository at this point in the history
release-19.2: colrpc: downgrade log messages from Error to Warning
  • Loading branch information
asubiotto committed Oct 9, 2019
2 parents 4cb25ab + 9e29761 commit 10179b7
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
12 changes: 9 additions & 3 deletions pkg/sql/colflow/colrpc/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,9 @@ func (i *Inbox) sendDrainSignal(ctx context.Context) error {
// It is safe to Send without holding the mutex because it is legal to call
// Send and Recv from different goroutines.
if err := i.streamMu.stream.Send(&execinfrapb.ConsumerSignal{DrainRequest: &execinfrapb.DrainRequest{}}); err != nil {
log.Warningf(ctx, "Inbox unable to send drain signal to Outbox: %+v", err)
if log.V(1) {
log.Warningf(ctx, "Inbox unable to send drain signal to Outbox: %+v", err)
}
return err
}
return nil
Expand Down Expand Up @@ -398,7 +400,9 @@ func (i *Inbox) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata {
defer i.closeLocked()

if err := i.maybeInitLocked(ctx); err != nil {
log.Warningf(ctx, "Inbox unable to initialize stream while draining metadata: %+v", err)
if log.V(1) {
log.Warningf(ctx, "Inbox unable to initialize stream while draining metadata: %+v", err)
}
return allMeta
}
if !drainSignalSent {
Expand All @@ -415,7 +419,9 @@ func (i *Inbox) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata {
if err == io.EOF {
break
}
log.Warningf(ctx, "Inbox Recv connection error while draining metadata: %+v", err)
if log.V(1) {
log.Warningf(ctx, "Inbox Recv connection error while draining metadata: %+v", err)
}
return allMeta
}
for _, remoteMeta := range msg.Data.Metadata {
Expand Down
14 changes: 10 additions & 4 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,14 @@ func (o *Outbox) handleStreamErr(
ctx context.Context, opName string, err error, cancelFn context.CancelFunc,
) {
if err == io.EOF {
log.Infof(ctx, "Outbox calling cancelFn after %s EOF", opName)
if log.V(1) {
log.Infof(ctx, "Outbox calling cancelFn after %s EOF", opName)
}
cancelFn()
} else {
log.Errorf(ctx, "Outbox %s connection error: %+v", opName, err)
if log.V(1) {
log.Warningf(ctx, "Outbox %s connection error: %+v", opName, err)
}
}
}

Expand Down Expand Up @@ -218,7 +222,7 @@ func (o *Outbox) sendBatches(
}

if err := execerror.CatchVectorizedRuntimeError(nextBatch); err != nil {
log.Errorf(ctx, "Outbox Next error: %+v", err)
log.Warningf(ctx, "Outbox Next error: %+v", err)
return false, err
}
if o.batch.Length() == 0 {
Expand Down Expand Up @@ -281,7 +285,9 @@ func (o *Outbox) runWithStream(
msg, err := stream.Recv()
if err != nil {
if err != io.EOF {
log.Errorf(ctx, "Outbox Recv connection error: %+v", err)
if log.V(1) {
log.Warningf(ctx, "Outbox Recv connection error: %+v", err)
}
}
break
}
Expand Down

0 comments on commit 10179b7

Please sign in to comment.