Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: support spill offset into disk when spilling #34212

Merged
merged 11 commits into from
May 5, 2022
169 changes: 117 additions & 52 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"io"
"os"
"strconv"
"sync"

errors2 "github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
Expand All @@ -32,27 +31,73 @@ import (

// ListInDisk represents a slice of chunks stored on temporary disk.
type ListInDisk struct {
fieldTypes []*types.FieldType
// offsets stores the offsets in disk of all RowPtr,
// the offset of one RowPtr is offsets[RowPtr.ChkIdx][RowPtr.RowIdx].
offsets [][]int64
fieldTypes []*types.FieldType
numRowsOfEachChunk []int
rowNumOfEachChunkFirstRow []int
totalNumRows int
diskTracker *disk.Tracker // track disk usage.

dataFile diskFileReaderWriter
offsetFile diskFileReaderWriter
}

// diskFileReaderWriter represents a Reader and a Writer for the temporary disk file.
type diskFileReaderWriter struct {
disk *os.File
w io.WriteCloser
// offWrite is the current offset for writing.
offWrite int64

disk *os.File
w io.WriteCloser
bufFlushMutex sync.RWMutex
diskTracker *disk.Tracker // track disk usage.
numRowsInDisk int

checksumWriter *checksum.Writer
cipherWriter *encrypt.Writer
cipherWriter *encrypt.Writer // cipherWriter is only enable when config SpilledFileEncryptionMethod is "aes128-ctr"

// ctrCipher stores the key and nonce used by the AES encryption I/O layer.
ctrCipher *encrypt.CtrCipher
}

// initWithFileName creates a temporary file under the configured
// TempStoragePath, using fileName as the name pattern passed to os.CreateTemp,
// and builds the writer stack on top of it: an optional encrypting writer
// (used whenever SpilledFileEncryptionMethod is not plaintext) wrapped by a
// checksum writer, which is stored in l.w.
//
// On failure nothing is left behind: if the cipher cannot be created, the
// temporary file is closed and removed and l.disk is reset to nil, so code
// that uses "disk == nil" as its "not initialized yet" test never observes a
// half-initialized value whose writer is still nil.
func (l *diskFileReaderWriter) initWithFileName(fileName string) (err error) {
	cfg := config.GetGlobalConfig()
	l.disk, err = os.CreateTemp(cfg.TempStoragePath, fileName)
	if err != nil {
		return errors2.Trace(err)
	}
	var underlying io.WriteCloser = l.disk
	if cfg.Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext {
		// The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr"
		l.ctrCipher, err = encrypt.NewCtrCipher()
		if err != nil {
			// Best-effort cleanup: the cipher error is the one worth reporting,
			// so failures to close or remove the temp file are deliberately
			// dropped here.
			name := l.disk.Name()
			_ = l.disk.Close()
			_ = os.Remove(name)
			l.disk = nil
			l.ctrCipher = nil
			return err
		}
		l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher)
		underlying = l.cipherWriter
	}
	l.checksumWriter = checksum.NewWriter(underlying)
	l.w = l.checksumWriter
	return nil
}

// getReader builds a random-access reader over the temporary file that mirrors
// the writer stack: the raw file, then a decrypting reader when a cipher is
// configured, then a checksum reader when a checksum writer exists. Each
// wrapped layer is paired with the cache and cache offset of the matching
// writer through NewReaderWithCache (presumably so data still buffered in the
// writer can be read back — confirm against NewReaderWithCache).
func (l *diskFileReaderWriter) getReader() io.ReaderAt {
	var r io.ReaderAt = l.disk
	if l.ctrCipher != nil {
		decrypted := encrypt.NewReader(l.disk, l.ctrCipher)
		r = NewReaderWithCache(decrypted, l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
	}
	if l.checksumWriter == nil {
		return r
	}
	verified := checksum.NewReader(r)
	return NewReaderWithCache(verified, l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())
}

// getSectionReader returns a reader over the bytes from off up to the current
// write offset (l.offWrite), layered on top of getReader.
func (l *diskFileReaderWriter) getSectionReader(off int64) *io.SectionReader {
	src := l.getReader()
	remaining := l.offWrite - off
	return io.NewSectionReader(src, off, remaining)
}

// getWriter returns l.w, the writer that initWithFileName assigns (the
// checksum writer sitting on top of the optional cipher writer).
func (l *diskFileReaderWriter) getWriter() io.Writer {
return l.w
}

// Name patterns for the temporary files backing a ListInDisk: one for the
// row data and one for the row offsets.
var (
	defaultChunkListInDiskPath       = "chunk.ListInDisk"
	defaultChunkListInDiskOffsetPath = "chunk.ListInDiskOffset"
)

// NewListInDisk creates a new ListInDisk with field types.
func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
Expand All @@ -69,29 +114,17 @@ func (l *ListInDisk) initDiskFile() (err error) {
if err != nil {
return
}
l.disk, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, defaultChunkListInDiskPath+strconv.Itoa(l.diskTracker.Label()))
err = l.dataFile.initWithFileName(defaultChunkListInDiskPath + strconv.Itoa(l.diskTracker.Label()))
if err != nil {
return errors2.Trace(err)
}
var underlying io.WriteCloser = l.disk
if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext {
// The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr"
l.ctrCipher, err = encrypt.NewCtrCipher()
if err != nil {
return
}
l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher)
underlying = l.cipherWriter
return
}
l.checksumWriter = checksum.NewWriter(underlying)
l.w = l.checksumWriter
l.bufFlushMutex = sync.RWMutex{}
err = l.offsetFile.initWithFileName(defaultChunkListInDiskOffsetPath + strconv.Itoa(l.diskTracker.Label()))
return
}

// Len returns the number of rows in ListInDisk
func (l *ListInDisk) Len() int {
return l.numRowsInDisk
return l.totalNumRows
}

// GetDiskTracker returns the memory tracker of this List.
Expand All @@ -101,34 +134,45 @@ func (l *ListInDisk) GetDiskTracker() *disk.Tracker {

// Add adds a chunk to the ListInDisk. Caller must make sure the input chk
// is not empty and not used any more and has the same field types.
// Warning: do not mix Add and GetRow (always use GetRow after you have added all the chunks), and do not use Add concurrently.
// Warning: Do not use Add concurrently.
func (l *ListInDisk) Add(chk *Chunk) (err error) {
if chk.NumRows() == 0 {
return errors2.New("chunk appended to List should have at least 1 row")
}
if l.disk == nil {
if l.dataFile.disk == nil {
err = l.initDiskFile()
if err != nil {
return
}
}
chk2 := chunkInDisk{Chunk: chk, offWrite: l.offWrite}
n, err := chk2.WriteTo(l.w)
l.offWrite += n
// Append data
chkInDisk := chunkInDisk{Chunk: chk, offWrite: l.dataFile.offWrite}
n, err := chkInDisk.WriteTo(l.dataFile.getWriter())
l.dataFile.offWrite += n
if err != nil {
return
}
l.offsets = append(l.offsets, chk2.getOffsetsOfRows())
l.diskTracker.Consume(n)
l.numRowsInDisk += chk.NumRows()

// Append offsets
offsetsOfRows := chkInDisk.getOffsetsOfRows()
l.numRowsOfEachChunk = append(l.numRowsOfEachChunk, len(offsetsOfRows))
l.rowNumOfEachChunkFirstRow = append(l.rowNumOfEachChunkFirstRow, l.totalNumRows)
n2, err := offsetsOfRows.WriteTo(l.offsetFile.getWriter())
l.offsetFile.offWrite += n2
if err != nil {
return
}

l.diskTracker.Consume(n + n2)
l.totalNumRows += chk.NumRows()
return
}

// GetChunk gets a Chunk from the ListInDisk by chkIdx.
func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) {
chk := NewChunkWithCapacity(l.fieldTypes, l.NumRowsOfChunk(chkIdx))
offsets := l.offsets[chkIdx]
for rowIdx := range offsets {
chkSize := l.numRowsOfEachChunk[chkIdx]
for rowIdx := 0; rowIdx < chkSize; rowIdx++ {
row, err := l.GetRow(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
if err != nil {
return chk, err
Expand All @@ -140,16 +184,11 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) {

// GetRow gets a Row from the ListInDisk by RowPtr.
func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
off, err := l.getOffset(ptr.ChkIdx, ptr.RowIdx)
if err != nil {
return
}
off := l.offsets[ptr.ChkIdx][ptr.RowIdx]
var underlying io.ReaderAt = l.disk
if l.ctrCipher != nil {
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
}
checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())
r := io.NewSectionReader(checksumReader, off, l.offWrite-off)
r := l.dataFile.getSectionReader(off)
format := rowInDisk{numCol: len(l.fieldTypes)}
_, err = format.ReadFrom(r)
if err != nil {
Expand All @@ -159,22 +198,40 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
return row, err
}

// getOffset returns the offset in the data file of the row identified by
// (chkIdx, rowIdx), read back from the offset file.
//
// The entry of a row is looked up at (global row number * 8) in the offset
// file, where the global row number is rowNumOfEachChunkFirstRow[chkIdx] +
// rowIdx, i.e. one 8-byte entry per row in the order the rows were added.
//
// A truncated or unreadable offset file is reported through the error returned
// by io.ReadFull (io.EOF or io.ErrUnexpectedEOF for a short read).
func (l *ListInDisk) getOffset(chkIdx uint32, rowIdx uint32) (int64, error) {
	const offsetSize = 8 // size in bytes of one serialized int64 offset

	rowNum := l.rowNumOfEachChunkFirstRow[chkIdx] + int(rowIdx)
	b := make([]byte, offsetSize)
	reader := l.offsetFile.getSectionReader(int64(rowNum) * offsetSize)
	// io.ReadFull returns a nil error if and only if it filled b completely,
	// so a separate check on the number of bytes read could never fire.
	if _, err := io.ReadFull(reader, b); err != nil {
		return 0, err
	}
	return bytesToI64Slice(b)[0], nil
}

// NumRowsOfChunk returns the number of rows of a chunk in the ListInDisk.
func (l *ListInDisk) NumRowsOfChunk(chkID int) int {
return len(l.offsets[chkID])
return l.numRowsOfEachChunk[chkID]
}

// NumChunks returns the number of chunks in the ListInDisk.
func (l *ListInDisk) NumChunks() int {
return len(l.offsets)
return len(l.numRowsOfEachChunk)
}

// Close releases the disk resource.
func (l *ListInDisk) Close() error {
if l.disk != nil {
if l.dataFile.disk != nil {
l.diskTracker.Consume(-l.diskTracker.BytesConsumed())
terror.Call(l.disk.Close)
terror.Log(os.Remove(l.disk.Name()))
terror.Call(l.dataFile.disk.Close)
terror.Log(os.Remove(l.dataFile.disk.Name()))
}
if l.offsetFile.disk != nil {
terror.Call(l.offsetFile.disk.Close)
terror.Log(os.Remove(l.offsetFile.disk.Name()))
}
return nil
}
Expand All @@ -198,7 +255,15 @@ type chunkInDisk struct {
// offWrite is the current offset for writing.
offWrite int64
// offsetsOfRows stores the offset of each row.
offsetsOfRows []int64
offsetsOfRows offsetsOfRows
}

// offsetsOfRows holds the offset of each row of a chunk as int64 values; its
// WriteTo method serializes them to a writer.
type offsetsOfRows []int64

// WriteTo converts the offsets to bytes with i64SliceToBytes and writes them
// to w in a single Write call. It returns the number of bytes written together
// with any error reported by w.
func (off offsetsOfRows) WriteTo(w io.Writer) (written int64, err error) {
	var n int
	n, err = w.Write(i64SliceToBytes(off))
	written = int64(n)
	return written, err
}

// WriteTo serializes the chunk into the format of chunkInDisk, and
Expand All @@ -222,7 +287,7 @@ func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) {
}

// getOffsetsOfRows gets the offset of each row.
func (chk *chunkInDisk) getOffsetsOfRows() []int64 { return chk.offsetsOfRows }
func (chk *chunkInDisk) getOffsetsOfRows() offsetsOfRows { return chk.offsetsOfRows }

// rowInDisk represents a Row in format of diskFormatRow.
type rowInDisk struct {
Expand Down
Loading