-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
util: support spill offset into disk when spilling #34212
Merged
ti-chi-bot
merged 11 commits into
pingcap:master
from
wshwsh12:spill-offset-for-list-in-disk
May 5, 2022
Merged
Changes from 8 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
bfd34ab
temp
wshwsh12 9118c71
fix test
wshwsh12 95f9735
Merge remote-tracking branch 'upstream/master' into spill-offset-for-…
wshwsh12 0bb0cc5
rename
wshwsh12 593d888
Merge branch 'master' into spill-offset-for-list-in-disk
hawkingrei 1d0b30b
Merge branch 'master' into spill-offset-for-list-in-disk
wshwsh12 a5f296b
Merge branch 'master' into spill-offset-for-list-in-disk
hawkingrei 1cedc90
address comments
wshwsh12 b636337
address comments
wshwsh12 e4564f7
Merge branch 'master' into spill-offset-for-list-in-disk
ti-chi-bot 9239cf1
Merge branch 'master' into spill-offset-for-list-in-disk
ti-chi-bot File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,6 @@ import ( | |
"io" | ||
"os" | ||
"strconv" | ||
"sync" | ||
|
||
errors2 "github.com/pingcap/errors" | ||
"github.com/pingcap/tidb/config" | ||
|
@@ -32,27 +31,74 @@ import ( | |
|
||
// ListInDisk represents a slice of chunks stored on temporary disk. | ||
type ListInDisk struct { | ||
fieldTypes []*types.FieldType | ||
// offsets stores the offsets in disk of all RowPtr, | ||
// the offset of one RowPtr is offsets[RowPtr.ChkIdx][RowPtr.RowIdx]. | ||
offsets [][]int64 | ||
fieldTypes []*types.FieldType | ||
numRowForEachChunk []int | ||
numRowPrefixSum []int // indicates the offsets of each chunk's offsets in OffsetFile. | ||
numRowsInDisk int | ||
diskTracker *disk.Tracker // track disk usage. | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: useless empty line |
||
dataFile diskFileReaderWriter | ||
offsetFile diskFileReaderWriter | ||
} | ||
|
||
// diskFileReaderWriter represents a Reader and a Writer for the temporary disk file, | ||
// without considering the detail of checksum and encryption. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: cipherWriter is only enabled when ... |
||
type diskFileReaderWriter struct { | ||
disk *os.File | ||
w io.WriteCloser | ||
// offWrite is the current offset for writing. | ||
offWrite int64 | ||
|
||
disk *os.File | ||
w io.WriteCloser | ||
bufFlushMutex sync.RWMutex | ||
diskTracker *disk.Tracker // track disk usage. | ||
numRowsInDisk int | ||
|
||
checksumWriter *checksum.Writer | ||
cipherWriter *encrypt.Writer | ||
|
||
// ctrCipher stores the key and nonce used by the AES encryption I/O layer. | ||
ctrCipher *encrypt.CtrCipher | ||
} | ||
|
||
// initWithFileName creates a temporary file under the configured TempStoragePath, using fileName as the os.CreateTemp pattern, and builds the writer chain for it. | ||
// The checksum writer is always the outermost writer; an encrypt writer is inserted between it and the file only when SpilledFileEncryptionMethod is not plaintext. | ||
// It returns the traced os.CreateTemp error, or the encrypt.NewCtrCipher error; in the latter case l.disk has already been assigned but no writer is set by this call. | ||
func (l *diskFileReaderWriter) initWithFileName(fileName string) (err error) { | ||
l.disk, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, fileName) | ||
if err != nil { | ||
return errors2.Trace(err) | ||
} | ||
// By default the checksum writer wraps the file directly. | ||
var underlying io.WriteCloser = l.disk | ||
if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext { | ||
// The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr" | ||
l.ctrCipher, err = encrypt.NewCtrCipher() | ||
if err != nil { | ||
return | ||
} | ||
// Encryption enabled: the checksum writer wraps the encrypt writer instead of the raw file. | ||
l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher) | ||
underlying = l.cipherWriter | ||
} | ||
l.checksumWriter = checksum.NewWriter(underlying) | ||
l.w = l.checksumWriter | ||
return | ||
} | ||
|
||
// getReader returns an io.ReaderAt over the temporary file that mirrors the writer chain: a decrypting reader when ctrCipher is set, wrapped by a checksum reader when checksumWriter is set. | ||
// Each layer is wrapped with NewReaderWithCache, which receives that layer's writer cache and cache data offset. | ||
// NOTE(review): presumably this lets data still buffered in the writer be read back; confirm against NewReaderWithCache. | ||
func (l *diskFileReaderWriter) getReader() io.ReaderAt { | ||
var underlying io.ReaderAt = l.disk | ||
if l.ctrCipher != nil { | ||
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) | ||
} | ||
if l.checksumWriter != nil { | ||
underlying = NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) | ||
} | ||
return underlying | ||
} | ||
|
||
// getSectionReader returns a section reader over getReader() that starts at off and spans l.offWrite-off bytes, i.e. it ends at the current write offset. | ||
func (l *diskFileReaderWriter) getSectionReader(off int64) *io.SectionReader { | ||
checksumReader := l.getReader() | ||
r := io.NewSectionReader(checksumReader, off, l.offWrite-off) | ||
return r | ||
} | ||
|
||
// getWriter returns l.w, which initWithFileName sets to the checksum writer. | ||
func (l *diskFileReaderWriter) getWriter() io.Writer { | ||
return l.w | ||
} | ||
|
||
var defaultChunkListInDiskPath = "chunk.ListInDisk" | ||
var defaultChunkListInDiskOffsetPath = "chunk.ListInDiskOffset" | ||
|
||
// NewListInDisk creates a new ListInDisk with field types. | ||
func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk { | ||
|
@@ -69,23 +115,11 @@ func (l *ListInDisk) initDiskFile() (err error) { | |
if err != nil { | ||
return | ||
} | ||
l.disk, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, defaultChunkListInDiskPath+strconv.Itoa(l.diskTracker.Label())) | ||
err = l.dataFile.initWithFileName(defaultChunkListInDiskPath + strconv.Itoa(l.diskTracker.Label())) | ||
if err != nil { | ||
return errors2.Trace(err) | ||
} | ||
var underlying io.WriteCloser = l.disk | ||
if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext { | ||
// The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr" | ||
l.ctrCipher, err = encrypt.NewCtrCipher() | ||
if err != nil { | ||
return | ||
} | ||
l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher) | ||
underlying = l.cipherWriter | ||
return | ||
} | ||
l.checksumWriter = checksum.NewWriter(underlying) | ||
l.w = l.checksumWriter | ||
l.bufFlushMutex = sync.RWMutex{} | ||
err = l.offsetFile.initWithFileName(defaultChunkListInDiskOffsetPath + strconv.Itoa(l.diskTracker.Label())) | ||
return | ||
} | ||
|
||
|
@@ -101,34 +135,45 @@ func (l *ListInDisk) GetDiskTracker() *disk.Tracker { | |
|
||
// Add adds a chunk to the ListInDisk. Caller must make sure the input chk | ||
// is not empty and not used any more and has the same field types. | ||
// Warning: do not mix Add and GetRow (always use GetRow after you have added all the chunks), and do not use Add concurrently. | ||
// Warning: Do not use Add concurrently. | ||
func (l *ListInDisk) Add(chk *Chunk) (err error) { | ||
if chk.NumRows() == 0 { | ||
return errors2.New("chunk appended to List should have at least 1 row") | ||
} | ||
if l.disk == nil { | ||
if l.dataFile.disk == nil { | ||
err = l.initDiskFile() | ||
if err != nil { | ||
return | ||
} | ||
} | ||
chk2 := chunkInDisk{Chunk: chk, offWrite: l.offWrite} | ||
n, err := chk2.WriteTo(l.w) | ||
l.offWrite += n | ||
// Append data | ||
chkInDisk := chunkInDisk{Chunk: chk, offWrite: l.dataFile.offWrite} | ||
n, err := chkInDisk.WriteTo(l.dataFile.getWriter()) | ||
l.dataFile.offWrite += n | ||
if err != nil { | ||
return | ||
} | ||
l.offsets = append(l.offsets, chk2.getOffsetsOfRows()) | ||
l.diskTracker.Consume(n) | ||
|
||
// Append offsets | ||
offsetsOfRows := chkInDisk.getOffsetsOfRows() | ||
l.numRowForEachChunk = append(l.numRowForEachChunk, len(offsetsOfRows)) | ||
l.numRowPrefixSum = append(l.numRowPrefixSum, l.numRowsInDisk) | ||
n2, err := offsetsOfRows.WriteTo(l.offsetFile.getWriter()) | ||
l.offsetFile.offWrite += n2 | ||
if err != nil { | ||
return | ||
} | ||
|
||
l.diskTracker.Consume(n + n2) | ||
l.numRowsInDisk += chk.NumRows() | ||
return | ||
} | ||
|
||
// GetChunk gets a Chunk from the ListInDisk by chkIdx. | ||
func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) { | ||
chk := NewChunkWithCapacity(l.fieldTypes, l.NumRowsOfChunk(chkIdx)) | ||
offsets := l.offsets[chkIdx] | ||
for rowIdx := range offsets { | ||
chkSize := l.numRowForEachChunk[chkIdx] | ||
for rowIdx := 0; rowIdx < chkSize; rowIdx++ { | ||
row, err := l.GetRow(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) | ||
if err != nil { | ||
return chk, err | ||
|
@@ -140,16 +185,11 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) { | |
|
||
// GetRow gets a Row from the ListInDisk by RowPtr. | ||
func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { | ||
off, err := l.getOffset(ptr.ChkIdx, ptr.RowIdx) | ||
if err != nil { | ||
return | ||
} | ||
off := l.offsets[ptr.ChkIdx][ptr.RowIdx] | ||
var underlying io.ReaderAt = l.disk | ||
if l.ctrCipher != nil { | ||
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) | ||
} | ||
checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) | ||
r := io.NewSectionReader(checksumReader, off, l.offWrite-off) | ||
r := l.dataFile.getSectionReader(off) | ||
format := rowInDisk{numCol: len(l.fieldTypes)} | ||
_, err = format.ReadFrom(r) | ||
if err != nil { | ||
|
@@ -159,22 +199,40 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { | |
return row, err | ||
} | ||
|
||
// getOffset reads from the offset file the data-file offset of row rowIdx of chunk chkIdx. | ||
// The entry index is numRowPrefixSum[chkIdx] + rowIdx, and each entry occupies 8 bytes, hence the *8 when positioning the section reader. | ||
// It returns the io.ReadFull error when 8 bytes cannot be read. | ||
func (l *ListInDisk) getOffset(chkIdx uint32, rowIdx uint32) (int64, error) { | ||
offsetInOffsetFile := l.numRowPrefixSum[chkIdx] + int(rowIdx) | ||
b := make([]byte, 8) | ||
reader := l.offsetFile.getSectionReader(int64(offsetInOffsetFile) * 8) | ||
n, err := io.ReadFull(reader, b) | ||
if err != nil { | ||
return 0, err | ||
} | ||
// Defensive check: io.ReadFull already reports an error when fewer than len(b) bytes are read. | ||
if n != 8 { | ||
return 0, errors2.New("The file spilled is broken, can not get data offset from the disk") | ||
} | ||
return bytesToI64Slice(b)[0], nil | ||
} | ||
|
||
// NumRowsOfChunk returns the number of rows of a chunk in the ListInDisk. | ||
func (l *ListInDisk) NumRowsOfChunk(chkID int) int { | ||
return len(l.offsets[chkID]) | ||
return l.numRowForEachChunk[chkID] | ||
} | ||
|
||
// NumChunks returns the number of chunks in the ListInDisk. | ||
func (l *ListInDisk) NumChunks() int { | ||
return len(l.offsets) | ||
return len(l.numRowForEachChunk) | ||
} | ||
|
||
// Close releases the disk resource. | ||
func (l *ListInDisk) Close() error { | ||
if l.disk != nil { | ||
if l.dataFile.disk != nil { | ||
l.diskTracker.Consume(-l.diskTracker.BytesConsumed()) | ||
terror.Call(l.disk.Close) | ||
terror.Log(os.Remove(l.disk.Name())) | ||
terror.Call(l.dataFile.disk.Close) | ||
terror.Log(os.Remove(l.dataFile.disk.Name())) | ||
} | ||
if l.offsetFile.disk != nil { | ||
terror.Call(l.offsetFile.disk.Close) | ||
terror.Log(os.Remove(l.offsetFile.disk.Name())) | ||
} | ||
return nil | ||
} | ||
|
@@ -198,7 +256,15 @@ type chunkInDisk struct { | |
// offWrite is the current offset for writing. | ||
offWrite int64 | ||
// offsetsOfRows stores the offset of each row. | ||
offsetsOfRows []int64 | ||
offsetsOfRows offsetsOfRows | ||
} | ||
|
||
type offsetsOfRows []int64 | ||
|
||
// WriteTo serializes the offsetsOfRows with i64SliceToBytes and writes the resulting bytes to w in a single Write call. | ||
// It returns the number of bytes written and any error reported by w.Write. | ||
func (off offsetsOfRows) WriteTo(w io.Writer) (written int64, err error) { | ||
n, err := w.Write(i64SliceToBytes(off)) | ||
return int64(n), err | ||
} | ||
|
||
// WriteTo serializes the chunk into the format of chunkInDisk, and | ||
|
@@ -222,7 +288,7 @@ func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) { | |
} | ||
|
||
// getOffsetsOfRows gets the offset of each row. | ||
func (chk *chunkInDisk) getOffsetsOfRows() []int64 { return chk.offsetsOfRows } | ||
func (chk *chunkInDisk) getOffsetsOfRows() offsetsOfRows { return chk.offsetsOfRows } | ||
|
||
// rowInDisk represents a Row in format of diskFormatRow. | ||
type rowInDisk struct { | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
numRowsOfEachChunk
rowNumOfEachChunkFirstRow
totalNumRows