Skip to content

Commit

Permalink
Merge pull request vitessio#4592 from systay/split-clone-close-readers
Browse files Browse the repository at this point in the history
Close result readers correctly
  • Loading branch information
sougou authored Feb 6, 2019
2 parents 9094685 + add6111 commit 634b176
Showing 1 changed file with 22 additions and 18 deletions.
40 changes: 22 additions & 18 deletions go/vt/worker/split_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,25 +912,32 @@ func mergeOrSingle(readers []ResultReader, td *tabletmanagerdatapb.TableDefiniti
return sourceReader, nil
}

func closeReaders(ctx context.Context, readers []ResultReader) {
for _, reader := range readers {
if reader != nil {
reader.Close(ctx)
}
}
}

func (scw *SplitCloneWorker) getSourceResultReader(ctx context.Context, td *tabletmanagerdatapb.TableDefinition, state StatusWorkerState, chunk chunk, txID int64) (ResultReader, error) {
sourceReaders := make([]ResultReader, len(scw.sourceShards))
var readers []ResultReader
defer func() {
for _, i := range readers {
i.Close(ctx)
}
}()

for shardIndex, si := range scw.sourceShards {
var sourceResultReader ResultReader
var err error
if state == WorkerStateCloneOffline && scw.useConsistentSnapshot {
var err error
if txID < 1 {
return nil, fmt.Errorf("tried using consistent snapshot without a valid transaction")
}
tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), scw.tabletType)
sourceResultReader, err = NewTransactionalRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, false, txID)
if err != nil {
closeReaders(ctx, sourceReaders)
return nil, fmt.Errorf("NewTransactionalRestartableResultReader for source: %v failed: %v", tp.description(), err)
}
} else {
var err error
var tp tabletProvider
allowMultipleRetries := true
if state == WorkerStateCloneOffline {
Expand All @@ -946,39 +953,36 @@ func (scw *SplitCloneWorker) getSourceResultReader(ctx context.Context, td *tabl
}
sourceResultReader, err = NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, allowMultipleRetries)
if err != nil {
closeReaders(ctx, sourceReaders)
return nil, fmt.Errorf("NewRestartableResultReader for source: %v failed: %v", tp.description(), err)
}
readers = append(readers, sourceResultReader)
}
sourceReaders[shardIndex] = sourceResultReader
}
resultReader, err := mergeOrSingle(sourceReaders, td)
if err == nil {
readers = readers[:0]
if err != nil {
closeReaders(ctx, sourceReaders)
return nil, err
}
return resultReader, err
}

func (scw *SplitCloneWorker) getDestinationResultReader(ctx context.Context, td *tabletmanagerdatapb.TableDefinition, state StatusWorkerState, chunk chunk) (ResultReader, error) {
destReaders := make([]ResultReader, len(scw.destinationShards))
var readers []ResultReader
defer func() {
for _, i := range readers {
i.Close(ctx)
}
}()

for shardIndex, si := range scw.destinationShards {
tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER)
destResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, true /* allowMultipleRetries */)
if err != nil {
closeReaders(ctx, destReaders)
return nil, fmt.Errorf("NewRestartableResultReader for destination: %v failed: %v", tp.description(), err)
}
destReaders[shardIndex] = destResultReader
}
resultReader, err := mergeOrSingle(destReaders, td)
if err == nil {
readers = readers[:0]
if err != nil {
closeReaders(ctx, destReaders)
return nil, err
}
return resultReader, err
}
Expand Down

0 comments on commit 634b176

Please sign in to comment.