Skip to content

Commit

Permalink
finish init rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
patrick-ogrady committed Apr 14, 2024
1 parent 04f9c44 commit ee027b0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 43 deletions.
1 change: 1 addition & 0 deletions vilmo/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func load(logger logging.Logger, logNum uint64, path string) (*log, map[uint64][
corrupt = fmt.Errorf("checksum mismatch expected=%d got=%d", checksum, computed)
break
}
ops = append(ops, checksum)

// Update our track for last committed
committedByte = reader.Cursor()
Expand Down
61 changes: 18 additions & 43 deletions vilmo/vilmo.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@ type Vilmo struct {
bufferSize int
historyLen int

commitLock sync.RWMutex
oldestBatch *uint64
nextBatch uint64
commitLock sync.RWMutex
nextBatch uint64
logs map[uint64]*log

keyLock sync.RWMutex
batches map[uint64]*log
keys map[string]*record
}

Expand Down Expand Up @@ -122,8 +121,11 @@ func New(
}

// Build current state from all log files
keys := make(map[string]*record, initialSize)
batches := maps.Keys(batchesRead)
var (
checksum ids.ID
keys = make(map[string]*record, initialSize)
batches = maps.Keys(batchesRead)
)
slices.Sort(batches)
for _, batch := range batches {
// There may be gaps between batches depending on which files were rewritten,
Expand All @@ -147,6 +149,8 @@ func New(
continue
}
past.log.Remove(past, o.log != past.log)
case ids.ID:
checksum = o
default:
logger.Warn("found invalid operation", zap.Uint64("batch", batch), zap.Any("op", op))
return nil, ids.Empty, errors.New("invalid operation")
Expand All @@ -161,50 +165,21 @@ func New(
bufferSize: bufferSize,
historyLen: historyLen,

keys: make(map[string]*record, initialSize),
batches: make(map[uint64]*log, historyLen+1),
}

// Replay all changes on-disk
//
// Note: this will require reading all stored files
// to reconstruct data. If any files were not fully written or have become corrupt,
// this will error.
var (
lastChecksum ids.ID
firstBatch uint64
lastBatch uint64
)
for i, file := range files {
path := filepath.Join(baseDir, strconv.FormatUint(file, 10))
if err := adb.loadBatch(path, file); err != nil {
// We chose not to fix storage if we encounter an error as it
// could be destructive.
//
// TODO: make corruption fix optional/add tool
logger.Warn("could not open batch", zap.String("path", path), zap.Error(err))
return nil, ids.Empty, err
}
if i == 0 {
firstBatch = file
}
lastBatch = file
lastChecksum = adb.batches[file].checksum
keys: keys,
logs: logs,
}
if len(adb.batches) > 0 {
adb.oldestBatch = &firstBatch
adb.nextBatch = lastBatch + 1
if len(batches) > 0 {
adb.nextBatch = batches[len(batches)-1] + 1
}
logger.Info(
"loaded batches",
zap.Int("count", len(adb.batches)),
zap.Int("count", len(adb.logs)),
zap.Int("keys", len(adb.keys)),
zap.Uint64("first batch", firstBatch),
zap.Uint64("last batch", lastBatch),
zap.Stringer("last checksum", lastChecksum),
zap.Uint64("next batch", adb.nextBatch),
zap.Stringer("last checksum", checksum),
zap.Duration("duration", time.Since(start)),
)
return adb, lastChecksum, nil
return adb, checksum, nil
}

func (a *Vilmo) get(key string) ([]byte, error) {
Expand Down

0 comments on commit ee027b0

Please sign in to comment.