Skip to content

Commit

Permalink
fix offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
vadiminshakov committed Jan 6, 2025
1 parent 16afd92 commit 3654d5b
Showing 1 changed file with 7 additions and 37 deletions.
44 changes: 7 additions & 37 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func removeCorruptedSegments(segmentNumbers []int, basePath string) ([]string, e

// loadSegment loads segment info (file descriptor, name, size, etc) and index from segment file.
func loadSegment(path string) (fd *os.File, checksumFd *os.File, lastOffset int64, index map[uint64]msg, err error) {
fd, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0755)
fd, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE, 0755)
if err != nil {
return nil, nil, 0, nil, errors.Wrap(err, "failed to open log segment file")
}
Expand Down Expand Up @@ -136,53 +136,23 @@ func loadSegment(path string) (fd *os.File, checksumFd *os.File, lastOffset int6
}

func calculateLastOffset(fd *os.File) (int64, error) {
var lastOffset int64

decoder := gob.NewDecoder(fd)
for {
var msg msg
offset, err := fd.Seek(0, io.SeekCurrent)
if err != nil {
return 0, errors.Wrap(err, "failed to get current offset")
}

if err = decoder.Decode(&msg); err != nil {
if err == io.EOF {
break
}

// set offset to the beginning of the file
if _, seekErr := fd.Seek(0, io.SeekStart); seekErr != nil {
return 0, errors.Wrap(seekErr, "failed to seek to the beginning of the file after error")
}

return lastOffset, nil
}

if _, err = fd.Seek(0, io.SeekStart); err != nil {
return 0, errors.Wrap(err, "failed to seek to the beginning of the file")
}

lastOffset = offset
}

// set offset to the beginning of the file
if _, err := fd.Seek(lastOffset, io.SeekStart); err != nil {
return 0, errors.Wrap(err, "failed to seek to the beginning of the file")
offset, err := fd.Seek(0, io.SeekCurrent)
if err != nil {
return 0, errors.Wrap(err, "failed to get current offset")
}

return lastOffset, nil
return offset, nil
}

// handleCorruptedSegment checks the checksum and removes the segment and checksum files if corrupted.
func handleCorruptedSegment(segmentPath string) (bool, error) {
file, err := os.OpenFile(segmentPath, os.O_RDWR|os.O_CREATE, 0755)
file, err := os.OpenFile(segmentPath, os.O_RDWR, 0755)
if err != nil {
return false, errors.Wrap(err, "failed to open segment file")
}
defer file.Close()

checksumFile, err := os.OpenFile(segmentPath+checkSumPostfix, os.O_RDWR|os.O_CREATE, 0755)
checksumFile, err := os.OpenFile(segmentPath+checkSumPostfix, os.O_RDWR, 0755)
if err != nil {
return false, errors.Wrap(err, "failed to open checksum file")
}
Expand Down

0 comments on commit 3654d5b

Please sign in to comment.