Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 committed May 5, 2022
1 parent a5f296b commit 1cedc90
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 21 deletions.
41 changes: 21 additions & 20 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,17 @@ import (
type ListInDisk struct {
fieldTypes []*types.FieldType
numRowForEachChunk []int
numRowPrefixSum []int
numRowPrefixSum []int // indicates the offsets of each chunk's offsets in OffsetFile.
numRowsInDisk int
diskTracker *disk.Tracker // track disk usage.

numRowsInDisk int
diskTracker *disk.Tracker // track disk usage.

dataFile tempFileWithIOWrapper
offsetFile tempFileWithIOWrapper
dataFile diskFileReaderWriter
offsetFile diskFileReaderWriter
}

type tempFileWithIOWrapper struct {
// diskFileReaderWriter represents a Reader and a Writer for the temporary disk file,
// without considering the detail of checksum and encryption.
type diskFileReaderWriter struct {
disk *os.File
w io.WriteCloser
// offWrite is the current offset for writing.
Expand All @@ -55,7 +56,7 @@ type tempFileWithIOWrapper struct {
ctrCipher *encrypt.CtrCipher
}

func (l *tempFileWithIOWrapper) initWithFileName(fileName string) (err error) {
func (l *diskFileReaderWriter) initWithFileName(fileName string) (err error) {
l.disk, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, fileName)
if err != nil {
return errors2.Trace(err)
Expand All @@ -75,7 +76,7 @@ func (l *tempFileWithIOWrapper) initWithFileName(fileName string) (err error) {
return
}

func (l *tempFileWithIOWrapper) getReader() io.ReaderAt {
func (l *diskFileReaderWriter) getReader() io.ReaderAt {
var underlying io.ReaderAt = l.disk
if l.ctrCipher != nil {
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
Expand All @@ -86,13 +87,13 @@ func (l *tempFileWithIOWrapper) getReader() io.ReaderAt {
return underlying
}

func (l *tempFileWithIOWrapper) getSelectionReader(off int64) *io.SectionReader {
func (l *diskFileReaderWriter) getSectionReader(off int64) *io.SectionReader {
checksumReader := l.getReader()
r := io.NewSectionReader(checksumReader, off, l.offWrite-off)
return r
}

func (l *tempFileWithIOWrapper) getWriter() io.Writer {
func (l *diskFileReaderWriter) getWriter() io.Writer {
return l.w
}

Expand Down Expand Up @@ -188,7 +189,7 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
if err != nil {
return
}
r := l.dataFile.getSelectionReader(off)
r := l.dataFile.getSectionReader(off)
format := rowInDisk{numCol: len(l.fieldTypes)}
_, err = format.ReadFrom(r)
if err != nil {
Expand All @@ -201,13 +202,13 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
func (l *ListInDisk) getOffset(chkIdx uint32, rowIdx uint32) (int64, error) {
offsetInOffsetFile := l.numRowPrefixSum[chkIdx] + int(rowIdx)
b := make([]byte, 8)
reader := l.offsetFile.getSelectionReader(int64(offsetInOffsetFile) * 8)
reader := l.offsetFile.getSectionReader(int64(offsetInOffsetFile) * 8)
n, err := io.ReadFull(reader, b)
if err != nil {
return 0, err
}
if n != 8 {
return 0, errors2.New("Can not get offset from disk")
return 0, errors2.New("The file spilled is broken, can not get data offset from the disk")
}
return bytesToI64Slice(b)[0], nil
}
Expand Down Expand Up @@ -260,6 +261,12 @@ type chunkInDisk struct {

type offsetsOfRows []int64

// WriteTo serializes the offsetsOfRow, and writes to w.
func (off offsetsOfRows) WriteTo(w io.Writer) (written int64, err error) {
n, err := w.Write(i64SliceToBytes(off))
return int64(n), err
}

// WriteTo serializes the chunk into the format of chunkInDisk, and
// writes to w.
func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) {
Expand All @@ -283,12 +290,6 @@ func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) {
// getOffsetsOfRows gets the offset of each row.
func (chk *chunkInDisk) getOffsetsOfRows() offsetsOfRows { return chk.offsetsOfRows }

// WriteTo serializes the offsetsOfRow, and writes to w.
func (off offsetsOfRows) WriteTo(w io.Writer) (written int64, err error) {
n, err := w.Write(i64SliceToBytes(off))
return int64(n), err
}

// rowInDisk represents a Row in format of diskFormatRow.
type rowInDisk struct {
numCol int
Expand Down
2 changes: 1 addition & 1 deletion util/chunk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type listInDiskWriteDisk struct {
ListInDisk
}

func (l *tempFileWithIOWrapper) flushForTest() (err error) {
func (l *diskFileReaderWriter) flushForTest() (err error) {
err = l.disk.Close()
if err != nil {
return
Expand Down

0 comments on commit 1cedc90

Please sign in to comment.