Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 committed May 5, 2022
1 parent 1cedc90 commit b636337
Showing 1 changed file with 15 additions and 16 deletions.
31 changes: 15 additions & 16 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,25 @@ import (

// ListInDisk represents a slice of chunks storing in temporary disk.
type ListInDisk struct {
fieldTypes []*types.FieldType
numRowForEachChunk []int
numRowPrefixSum []int // indicates the offsets of each chunk's offsets in OffsetFile.
numRowsInDisk int
diskTracker *disk.Tracker // track disk usage.
fieldTypes []*types.FieldType
numRowsOfEachChunk []int
rowNumOfEachChunkFirstRow []int
totalNumRows int
diskTracker *disk.Tracker // track disk usage.

dataFile diskFileReaderWriter
offsetFile diskFileReaderWriter
}

// diskFileReaderWriter represents a Reader and a Writer for the temporary disk file,
// without considering the detail of checksum and encryption.
// diskFileReaderWriter represents a Reader and a Writer for the temporary disk file.
type diskFileReaderWriter struct {
disk *os.File
w io.WriteCloser
// offWrite is the current offset for writing.
offWrite int64

checksumWriter *checksum.Writer
cipherWriter *encrypt.Writer
cipherWriter *encrypt.Writer // cipherWriter is only enable when config SpilledFileEncryptionMethod is "aes128-ctr"

// ctrCipher stores the key and nonce using by aes encrypt io layer
ctrCipher *encrypt.CtrCipher
Expand Down Expand Up @@ -125,7 +124,7 @@ func (l *ListInDisk) initDiskFile() (err error) {

// Len returns the number of rows in ListInDisk
func (l *ListInDisk) Len() int {
return l.numRowsInDisk
return l.totalNumRows
}

// GetDiskTracker returns the memory tracker of this List.
Expand Down Expand Up @@ -156,23 +155,23 @@ func (l *ListInDisk) Add(chk *Chunk) (err error) {

// Append offsets
offsetsOfRows := chkInDisk.getOffsetsOfRows()
l.numRowForEachChunk = append(l.numRowForEachChunk, len(offsetsOfRows))
l.numRowPrefixSum = append(l.numRowPrefixSum, l.numRowsInDisk)
l.numRowsOfEachChunk = append(l.numRowsOfEachChunk, len(offsetsOfRows))
l.rowNumOfEachChunkFirstRow = append(l.rowNumOfEachChunkFirstRow, l.totalNumRows)
n2, err := offsetsOfRows.WriteTo(l.offsetFile.getWriter())
l.offsetFile.offWrite += n2
if err != nil {
return
}

l.diskTracker.Consume(n + n2)
l.numRowsInDisk += chk.NumRows()
l.totalNumRows += chk.NumRows()
return
}

// GetChunk gets a Chunk from the ListInDisk by chkIdx.
func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) {
chk := NewChunkWithCapacity(l.fieldTypes, l.NumRowsOfChunk(chkIdx))
chkSize := l.numRowForEachChunk[chkIdx]
chkSize := l.numRowsOfEachChunk[chkIdx]
for rowIdx := 0; rowIdx < chkSize; rowIdx++ {
row, err := l.GetRow(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
if err != nil {
Expand Down Expand Up @@ -200,7 +199,7 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
}

func (l *ListInDisk) getOffset(chkIdx uint32, rowIdx uint32) (int64, error) {
offsetInOffsetFile := l.numRowPrefixSum[chkIdx] + int(rowIdx)
offsetInOffsetFile := l.rowNumOfEachChunkFirstRow[chkIdx] + int(rowIdx)
b := make([]byte, 8)
reader := l.offsetFile.getSectionReader(int64(offsetInOffsetFile) * 8)
n, err := io.ReadFull(reader, b)
Expand All @@ -215,12 +214,12 @@ func (l *ListInDisk) getOffset(chkIdx uint32, rowIdx uint32) (int64, error) {

// NumRowsOfChunk returns the number of rows of a chunk in the ListInDisk.
func (l *ListInDisk) NumRowsOfChunk(chkID int) int {
return l.numRowForEachChunk[chkID]
return l.numRowsOfEachChunk[chkID]
}

// NumChunks returns the number of chunks in the ListInDisk.
func (l *ListInDisk) NumChunks() int {
return len(l.numRowForEachChunk)
return len(l.numRowsOfEachChunk)
}

// Close releases the disk resource.
Expand Down

0 comments on commit b636337

Please sign in to comment.