Skip to content

Commit

Permalink
backupds kvlog: Address review
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Mar 10, 2021
1 parent 2b380c9 commit 3f1054d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 27 deletions.
10 changes: 5 additions & 5 deletions lib/backupds/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"go.uber.org/multierr"
"golang.org/x/xerrors"

"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -189,11 +190,10 @@ func (d *Datastore) CloseLog() error {
}

func (d *Datastore) Close() error {
if err := d.child.Close(); err != nil {
return xerrors.Errorf("closing child datastore: %w", err)
}

return d.CloseLog()
return multierr.Combine(
d.child.Close(),
d.CloseLog(),
)
}

func (d *Datastore) Batch() (datastore.Batch, error) {
Expand Down
46 changes: 24 additions & 22 deletions lib/backupds/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,32 +65,34 @@ func (d *Datastore) startLog(logdir string) error {
return xerrors.Errorf("writing new log head: %w", err)
}

go func() {
defer close(d.closed)
for {
select {
case ent := <-d.log:
if err := l.writeEntry(&ent); err != nil {
log.Errorw("failed to write log entry", "error", err)
// todo try to do something, maybe start a new log file (but not when we're out of disk space)
}

// todo: batch writes when multiple are pending; flush on a timer
if err := l.file.Sync(); err != nil {
log.Errorw("failed to sync log", "error", err)
}
case <-d.closing:
if err := l.Close(); err != nil {
log.Errorw("failed to close log", "error", err)
}
return
}
}
}()
go d.runLog(l)

return nil
}

func (d *Datastore) runLog(l *logfile) {
defer close(d.closed)
for {
select {
case ent := <-d.log:
if err := l.writeEntry(&ent); err != nil {
log.Errorw("failed to write log entry", "error", err)
// todo try to do something, maybe start a new log file (but not when we're out of disk space)
}

// todo: batch writes when multiple are pending; flush on a timer
if err := l.file.Sync(); err != nil {
log.Errorw("failed to sync log", "error", err)
}
case <-d.closing:
if err := l.Close(); err != nil {
log.Errorw("failed to close log", "error", err)
}
return
}
}
}

type logfile struct {
file *os.File
}
Expand Down

0 comments on commit 3f1054d

Please sign in to comment.