Skip to content

Commit

Permalink
Consistently use transactions to update database
Browse files Browse the repository at this point in the history
For any action which is multi-step (requires updating more than 1 DB
key), use transaction to make update atomic.

Also pack big chunks of updates (importing packages for importing and
mirror updates) into single transaction to improve aptly performance and
get some isolation.

Note that still layers up (Collections) provide some level of isolation,
so this is going to shine with the future PRs to remove collection
locks.

Spin-off of #459
  • Loading branch information
smira committed Aug 10, 2019
1 parent 67e3895 commit 77d7c38
Show file tree
Hide file tree
Showing 19 changed files with 184 additions and 65 deletions.
4 changes: 2 additions & 2 deletions api/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func apiReposPackageFromDir(c *gin.Context) {
}

processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
context.CollectionFactory().PackageCollection(), reporter, nil, context.CollectionFactory().ChecksumCollection())
context.CollectionFactory().PackageCollection(), reporter, nil, context.CollectionFactory().ChecksumCollection)
failedFiles = append(failedFiles, failedFiles2...)

processedFiles = append(processedFiles, otherFiles...)
Expand Down Expand Up @@ -420,7 +420,7 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
_, failedFiles2, err = deb.ImportChangesFiles(
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
repoTemplateString, context.Progress(), localRepoCollection, context.CollectionFactory().PackageCollection(),
context.PackagePool(), context.CollectionFactory().ChecksumCollection(), nil, query.Parse)
context.PackagePool(), context.CollectionFactory().ChecksumCollection, nil, query.Parse)
failedFiles = append(failedFiles, failedFiles2...)

if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions aptly/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"

"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/utils"
)

Expand Down Expand Up @@ -134,6 +135,9 @@ type Downloader interface {
GetLength(ctx context.Context, url string) (int64, error)
}

// ChecksumStorageProvider creates ChecksumStorage based on DB
type ChecksumStorageProvider func(db database.ReaderWriter) ChecksumStorage

// ChecksumStorage is stores checksums in some (persistent) storage
type ChecksumStorage interface {
// Get finds checksums in DB by path
Expand Down
4 changes: 2 additions & 2 deletions cmd/mirror_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {

context.Progress().Printf("Building download queue...\n")
queue, downloadSize, err = repo.BuildDownloadQueue(context.PackagePool(), context.CollectionFactory().PackageCollection(),
context.CollectionFactory().ChecksumCollection(), skipExistingPackages)
context.CollectionFactory().ChecksumCollection(nil), skipExistingPackages)

if err != nil {
return fmt.Errorf("unable to update: %s", err)
Expand Down Expand Up @@ -210,7 +210,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
}

// and import it back to the pool
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, context.CollectionFactory().ChecksumCollection())
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, context.CollectionFactory().ChecksumCollection(nil))
if err != nil {
return fmt.Errorf("unable to import file: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/repo_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func aptlyRepoAdd(cmd *commander.Command, args []string) error {

processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
context.CollectionFactory().PackageCollection(), &aptly.ConsoleResultReporter{Progress: context.Progress()}, nil,
context.CollectionFactory().ChecksumCollection())
context.CollectionFactory().ChecksumCollection)
failedFiles = append(failedFiles, failedFiles2...)
if err != nil {
return fmt.Errorf("unable to import package files: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/repo_include.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func aptlyRepoInclude(cmd *commander.Command, args []string) error {
_, failedFiles2, err = deb.ImportChangesFiles(
changesFiles, reporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles, verifier, repoTemplateString,
context.Progress(), context.CollectionFactory().LocalRepoCollection(), context.CollectionFactory().PackageCollection(),
context.PackagePool(), context.CollectionFactory().ChecksumCollection(),
context.PackagePool(), context.CollectionFactory().ChecksumCollection,
uploaders, query.Parse)
failedFiles = append(failedFiles, failedFiles2...)

Expand Down
6 changes: 6 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ type Writer interface {
Delete(key []byte) error
}

// ReaderWriter combines Reader and Writer
type ReaderWriter interface {
Reader
Writer
}

// Storage is an interface to KV storage
type Storage interface {
Reader
Expand Down
4 changes: 2 additions & 2 deletions deb/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func CollectChangesFiles(locations []string, reporter aptly.ResultReporter) (cha
// ImportChangesFiles imports referenced files in changes files into local repository
func ImportChangesFiles(changesFiles []string, reporter aptly.ResultReporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles bool,
verifier pgp.Verifier, repoTemplateString string, progress aptly.Progress, localRepoCollection *LocalRepoCollection, packageCollection *PackageCollection,
pool aptly.PackagePool, checksumStorage aptly.ChecksumStorage, uploaders *Uploaders, parseQuery parseQuery) (processedFiles []string, failedFiles []string, err error) {
pool aptly.PackagePool, checksumStorageProvider aptly.ChecksumStorageProvider, uploaders *Uploaders, parseQuery parseQuery) (processedFiles []string, failedFiles []string, err error) {

var repoTemplate *template.Template
repoTemplate, err = template.New("repo").Parse(repoTemplateString)
Expand Down Expand Up @@ -384,7 +384,7 @@ func ImportChangesFiles(changesFiles []string, reporter aptly.ResultReporter, ac
var processedFiles2, failedFiles2 []string

processedFiles2, failedFiles2, err = ImportPackageFiles(list, packageFiles, forceReplace, verifier, pool,
packageCollection, reporter, restriction, checksumStorage)
packageCollection, reporter, restriction, checksumStorageProvider)

if err != nil {
return nil, nil, fmt.Errorf("unable to import package files: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion deb/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *ChangesSuite) TestImportChangesFiles(c *C) {
processedFiles, failedFiles, err := ImportChangesFiles(
append(changesFiles, "testdata/changes/notexistent.changes"),
s.Reporter, true, true, false, false, &NullVerifier{},
"test", s.progress, s.localRepoCollection, s.packageCollection, s.packagePool, s.checksumStorage,
"test", s.progress, s.localRepoCollection, s.packageCollection, s.packagePool, func(database.ReaderWriter) aptly.ChecksumStorage { return s.checksumStorage },
nil, nil)
c.Assert(err, IsNil)
c.Check(failedFiles, DeepEquals, append(expectedFailedFiles, "testdata/changes/notexistent.changes"))
Expand Down
4 changes: 2 additions & 2 deletions deb/checksum_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (

// ChecksumCollection does management of ChecksumInfo in DB
type ChecksumCollection struct {
db database.Storage
db database.ReaderWriter
codecHandle *codec.MsgpackHandle
}

// NewChecksumCollection creates new ChecksumCollection and binds it to database
func NewChecksumCollection(db database.Storage) *ChecksumCollection {
func NewChecksumCollection(db database.ReaderWriter) *ChecksumCollection {
return &ChecksumCollection{
db: db,
codecHandle: &codec.MsgpackHandle{},
Expand Down
7 changes: 6 additions & 1 deletion deb/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package deb
import (
"sync"

"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/database"
)

Expand Down Expand Up @@ -91,10 +92,14 @@ func (factory *CollectionFactory) PublishedRepoCollection() *PublishedRepoCollec
}

// ChecksumCollection returns (or creates) new ChecksumCollection
func (factory *CollectionFactory) ChecksumCollection() *ChecksumCollection {
func (factory *CollectionFactory) ChecksumCollection(db database.ReaderWriter) aptly.ChecksumStorage {
factory.Lock()
defer factory.Unlock()

if db != nil {
return NewChecksumCollection(db)
}

if factory.checksums == nil {
factory.checksums = NewChecksumCollection(factory.db)
}
Expand Down
14 changes: 11 additions & 3 deletions deb/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,19 @@ func CollectPackageFiles(locations []string, reporter aptly.ResultReporter) (pac
// ImportPackageFiles imports files into local repository
func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace bool, verifier pgp.Verifier,
pool aptly.PackagePool, collection *PackageCollection, reporter aptly.ResultReporter, restriction PackageQuery,
checksumStorage aptly.ChecksumStorage) (processedFiles []string, failedFiles []string, err error) {
checksumStorageProvider aptly.ChecksumStorageProvider) (processedFiles []string, failedFiles []string, err error) {
if forceReplace {
list.PrepareIndex()
}

transaction, err := collection.db.OpenTransaction()
if err != nil {
return nil, nil, err
}
defer transaction.Discard()

checksumStorage := checksumStorageProvider(transaction)

for _, file := range packageFiles {
var (
stanza Stanza
Expand Down Expand Up @@ -193,7 +201,7 @@ func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace b
continue
}

err = collection.Update(p)
err = collection.UpdateInTransaction(p, transaction)
if err != nil {
reporter.Warning("Unable to save package %s: %s", p, err)
failedFiles = append(failedFiles, file)
Expand All @@ -219,6 +227,6 @@ func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace b
processedFiles = append(processedFiles, candidateProcessedFiles...)
}

err = nil
err = transaction.Commit()
return
}
34 changes: 26 additions & 8 deletions deb/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,23 @@ func (collection *LocalRepoCollection) Add(repo *LocalRepo) error {

// Update stores updated information about repo in DB
func (collection *LocalRepoCollection) Update(repo *LocalRepo) error {
err := collection.db.Put(repo.Key(), repo.Encode())
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()

err = transaction.Put(repo.Key(), repo.Encode())
if err != nil {
return err
}
if repo.packageRefs != nil {
err = collection.db.Put(repo.RefKey(), repo.packageRefs.Encode())
err = transaction.Put(repo.RefKey(), repo.packageRefs.Encode())
if err != nil {
return err
}
}
return nil
return transaction.Commit()
}

// LoadComplete loads additional information for local repo
Expand Down Expand Up @@ -245,16 +251,28 @@ func (collection *LocalRepoCollection) Len() int {

// Drop removes remote repo from collection
func (collection *LocalRepoCollection) Drop(repo *LocalRepo) error {
if _, err := collection.db.Get(repo.Key()); err == database.ErrNotFound {
panic("local repo not found!")
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()

delete(collection.cache, repo.UUID)

err := collection.db.Delete(repo.Key())
if err != nil {
if _, err = transaction.Get(repo.Key()); err != nil {
if err == database.ErrNotFound {
return errors.New("local repo not found")
}
return err
}

if err = transaction.Delete(repo.Key()); err != nil {
return err
}

if err = transaction.Delete(repo.RefKey()); err != nil {
return err
}

return collection.db.Delete(repo.RefKey())
return transaction.Commit()
}
2 changes: 1 addition & 1 deletion deb/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,5 +199,5 @@ func (s *LocalRepoCollectionSuite) TestDrop(c *C) {
r2, _ := collection.ByName("local2")
c.Check(r2.String(), Equals, repo2.String())

c.Check(func() { s.collection.Drop(repo1) }, Panics, "local repo not found!")
c.Check(s.collection.Drop(repo1), ErrorMatches, "local repo not found")
}
28 changes: 21 additions & 7 deletions deb/package_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,21 +201,35 @@ func (collection *PackageCollection) loadContents(p *Package, packagePool aptly.
return contents
}

// Update adds or updates information about package in DB checking for conficts first
// Update adds or updates information about package in DB
func (collection *PackageCollection) Update(p *Package) error {
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()

if err = collection.UpdateInTransaction(p, transaction); err != nil {
return err
}

return transaction.Commit()
}

// UpdateInTransaction updates/creates package info in the context of the outer transaction
func (collection *PackageCollection) UpdateInTransaction(p *Package, transaction database.Transaction) error {
var encodeBuffer bytes.Buffer

encoder := codec.NewEncoder(&encodeBuffer, collection.codecHandle)

encodeBuffer.Reset()
encodeBuffer.WriteByte(0xc1)
encodeBuffer.WriteByte(0x1)
err := encoder.Encode(p)
if err != nil {
if err := encoder.Encode(p); err != nil {
return err
}

err = collection.db.Put(p.Key(""), encodeBuffer.Bytes())
err := transaction.Put(p.Key(""), encodeBuffer.Bytes())
if err != nil {
return err
}
Expand All @@ -228,7 +242,7 @@ func (collection *PackageCollection) Update(p *Package) error {
return err
}

err = collection.db.Put(p.Key("xF"), encodeBuffer.Bytes())
err = transaction.Put(p.Key("xF"), encodeBuffer.Bytes())
if err != nil {
return err
}
Expand All @@ -241,7 +255,7 @@ func (collection *PackageCollection) Update(p *Package) error {
return err
}

err = collection.db.Put(p.Key("xD"), encodeBuffer.Bytes())
err = transaction.Put(p.Key("xD"), encodeBuffer.Bytes())
if err != nil {
return err
}
Expand All @@ -256,7 +270,7 @@ func (collection *PackageCollection) Update(p *Package) error {
return err
}

err = collection.db.Put(p.Key("xE"), encodeBuffer.Bytes())
err = transaction.Put(p.Key("xE"), encodeBuffer.Bytes())
if err != nil {
return err
}
Expand Down
31 changes: 22 additions & 9 deletions deb/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,21 +914,27 @@ func (collection *PublishedRepoCollection) CheckDuplicate(repo *PublishedRepo) *
}

// Update stores updated information about repo in DB
func (collection *PublishedRepoCollection) Update(repo *PublishedRepo) (err error) {
err = collection.db.Put(repo.Key(), repo.Encode())
func (collection *PublishedRepoCollection) Update(repo *PublishedRepo) error {
transaction, err := collection.db.OpenTransaction()
if err != nil {
return
return err
}
defer transaction.Discard()

err = transaction.Put(repo.Key(), repo.Encode())
if err != nil {
return err
}

if repo.SourceKind == SourceLocalRepo {
for component, item := range repo.sourceItems {
err = collection.db.Put(repo.RefKey(component), item.packageRefs.Encode())
err = transaction.Put(repo.RefKey(component), item.packageRefs.Encode())
if err != nil {
return
return err
}
}
}
return
return transaction.Commit()
}

// LoadComplete loads additional information for remote repo
Expand Down Expand Up @@ -1170,6 +1176,13 @@ func (collection *PublishedRepoCollection) Remove(publishedStorageProvider aptly
storage, prefix, distribution string, collectionFactory *CollectionFactory, progress aptly.Progress,
force, skipCleanup bool) error {

transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()

// TODO: load via transaction
collection.loadList()

repo, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
Expand Down Expand Up @@ -1221,17 +1234,17 @@ func (collection *PublishedRepoCollection) Remove(publishedStorageProvider aptly
}
}

err = collection.db.Delete(repo.Key())
err = transaction.Delete(repo.Key())
if err != nil {
return err
}

for _, component := range repo.Components() {
err = collection.db.Delete(repo.RefKey(component))
err = transaction.Delete(repo.RefKey(component))
if err != nil {
return err
}
}

return nil
return transaction.Commit()
}
Loading

0 comments on commit 77d7c38

Please sign in to comment.