diff --git a/api/repos.go b/api/repos.go index a5dd39d45..1e2103704 100644 --- a/api/repos.go +++ b/api/repos.go @@ -327,7 +327,7 @@ func apiReposPackageFromDir(c *gin.Context) { } processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(), - context.CollectionFactory().PackageCollection(), reporter, nil, context.CollectionFactory().ChecksumCollection()) + context.CollectionFactory().PackageCollection(), reporter, nil, context.CollectionFactory().ChecksumCollection) failedFiles = append(failedFiles, failedFiles2...) processedFiles = append(processedFiles, otherFiles...) @@ -420,7 +420,7 @@ func apiReposIncludePackageFromDir(c *gin.Context) { _, failedFiles2, err = deb.ImportChangesFiles( changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier, repoTemplateString, context.Progress(), localRepoCollection, context.CollectionFactory().PackageCollection(), - context.PackagePool(), context.CollectionFactory().ChecksumCollection(), nil, query.Parse) + context.PackagePool(), context.CollectionFactory().ChecksumCollection, nil, query.Parse) failedFiles = append(failedFiles, failedFiles2...) if err != nil { diff --git a/aptly/interfaces.go b/aptly/interfaces.go index 7a0a75626..1b529619d 100644 --- a/aptly/interfaces.go +++ b/aptly/interfaces.go @@ -7,6 +7,7 @@ import ( "io" "os" + "github.com/aptly-dev/aptly/database" "github.com/aptly-dev/aptly/utils" ) @@ -134,6 +135,9 @@ type Downloader interface { GetLength(ctx context.Context, url string) (int64, error) } +// ChecksumStorageProvider creates ChecksumStorage based on DB +type ChecksumStorageProvider func(db database.ReaderWriter) ChecksumStorage + // ChecksumStorage is stores checksums in some (persistent) storage type ChecksumStorage interface { // Get finds checksums in DB by path diff --git a/cmd/mirror_update.go b/cmd/mirror_update.go index eae5dc15a..3c3f5b6b9 100644 --- a/cmd/mirror_update.go +++ b/cmd/mirror_update.go @@ -84,7 +84,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error { context.Progress().Printf("Building download queue...\n") queue, downloadSize, err = repo.BuildDownloadQueue(context.PackagePool(), context.CollectionFactory().PackageCollection(), - context.CollectionFactory().ChecksumCollection(), skipExistingPackages) + context.CollectionFactory().ChecksumCollection(nil), skipExistingPackages) if err != nil { return fmt.Errorf("unable to update: %s", err) @@ -210,7 +210,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error { } // and import it back to the pool - task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, context.CollectionFactory().ChecksumCollection()) + task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, context.CollectionFactory().ChecksumCollection(nil)) if err != nil { return fmt.Errorf("unable to import file: %s", err) } diff --git a/cmd/repo_add.go b/cmd/repo_add.go index 0482b5a0a..38484cbfe 100644 --- a/cmd/repo_add.go +++ b/cmd/repo_add.go @@ -49,7 +49,7 @@ func aptlyRepoAdd(cmd *commander.Command, args []string) error { processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(), context.CollectionFactory().PackageCollection(), &aptly.ConsoleResultReporter{Progress: context.Progress()}, nil, - context.CollectionFactory().ChecksumCollection()) + context.CollectionFactory().ChecksumCollection) failedFiles = append(failedFiles, failedFiles2...) if err != nil { return fmt.Errorf("unable to import package files: %s", err) diff --git a/cmd/repo_include.go b/cmd/repo_include.go index 120d0244e..08bb8a0b7 100644 --- a/cmd/repo_include.go +++ b/cmd/repo_include.go @@ -56,7 +56,7 @@ func aptlyRepoInclude(cmd *commander.Command, args []string) error { _, failedFiles2, err = deb.ImportChangesFiles( changesFiles, reporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles, verifier, repoTemplateString, context.Progress(), context.CollectionFactory().LocalRepoCollection(), context.CollectionFactory().PackageCollection(), - context.PackagePool(), context.CollectionFactory().ChecksumCollection(), + context.PackagePool(), context.CollectionFactory().ChecksumCollection, uploaders, query.Parse) failedFiles = append(failedFiles, failedFiles2...) diff --git a/database/database.go b/database/database.go index 710fbf33f..709a1aa80 100644 --- a/database/database.go +++ b/database/database.go @@ -30,6 +30,12 @@ type Writer interface { Delete(key []byte) error } +// ReaderWriter combines Reader and Writer +type ReaderWriter interface { + Reader + Writer +} + // Storage is an interface to KV storage type Storage interface { Reader diff --git a/deb/changes.go b/deb/changes.go index 7794b3a36..10fc83ebc 100644 --- a/deb/changes.go +++ b/deb/changes.go @@ -294,7 +294,7 @@ func CollectChangesFiles(locations []string, reporter aptly.ResultReporter) (cha // ImportChangesFiles imports referenced files in changes files into local repository func ImportChangesFiles(changesFiles []string, reporter aptly.ResultReporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles bool, verifier pgp.Verifier, repoTemplateString string, progress aptly.Progress, localRepoCollection *LocalRepoCollection, packageCollection *PackageCollection, - pool aptly.PackagePool, checksumStorage aptly.ChecksumStorage, uploaders *Uploaders, parseQuery parseQuery) (processedFiles []string, failedFiles []string, err error) { + pool aptly.PackagePool, checksumStorageProvider aptly.ChecksumStorageProvider, uploaders *Uploaders, parseQuery parseQuery) (processedFiles []string, failedFiles []string, err error) { var repoTemplate *template.Template repoTemplate, err = template.New("repo").Parse(repoTemplateString) @@ -384,7 +384,7 @@ func ImportChangesFiles(changesFiles []string, reporter aptly.ResultReporter, ac var processedFiles2, failedFiles2 []string processedFiles2, failedFiles2, err = ImportPackageFiles(list, packageFiles, forceReplace, verifier, pool, - packageCollection, reporter, restriction, checksumStorage) + packageCollection, reporter, restriction, checksumStorageProvider) if err != nil { return nil, nil, fmt.Errorf("unable to import package files: %s", err) diff --git a/deb/changes_test.go b/deb/changes_test.go index 5e15e36bc..af56d1c29 100644 --- a/deb/changes_test.go +++ b/deb/changes_test.go @@ -123,7 +123,7 @@ func (s *ChangesSuite) TestImportChangesFiles(c *C) { processedFiles, failedFiles, err := ImportChangesFiles( append(changesFiles, "testdata/changes/notexistent.changes"), s.Reporter, true, true, false, false, &NullVerifier{}, - "test", s.progress, s.localRepoCollection, s.packageCollection, s.packagePool, s.checksumStorage, + "test", s.progress, s.localRepoCollection, s.packageCollection, s.packagePool, func(database.ReaderWriter) aptly.ChecksumStorage { return s.checksumStorage }, nil, nil) c.Assert(err, IsNil) c.Check(failedFiles, DeepEquals, append(expectedFailedFiles, "testdata/changes/notexistent.changes")) diff --git a/deb/checksum_collection.go b/deb/checksum_collection.go index dfffb5ae4..23edd240b 100644 --- a/deb/checksum_collection.go +++ b/deb/checksum_collection.go @@ -11,12 +11,12 @@ import ( // ChecksumCollection does management of ChecksumInfo in DB type ChecksumCollection struct { - db database.Storage + db database.ReaderWriter codecHandle *codec.MsgpackHandle } // NewChecksumCollection creates new ChecksumCollection and binds it to database -func NewChecksumCollection(db database.Storage) *ChecksumCollection { +func NewChecksumCollection(db database.ReaderWriter) *ChecksumCollection { return &ChecksumCollection{ db: db, codecHandle: &codec.MsgpackHandle{}, diff --git a/deb/collections.go b/deb/collections.go index 49f1e93cf..7dfe85235 100644 --- a/deb/collections.go +++ b/deb/collections.go @@ -3,6 +3,7 @@ package deb import ( "sync" + "github.com/aptly-dev/aptly/aptly" "github.com/aptly-dev/aptly/database" ) @@ -91,10 +92,14 @@ func (factory *CollectionFactory) PublishedRepoCollection() *PublishedRepoCollec } // ChecksumCollection returns (or creates) new ChecksumCollection -func (factory *CollectionFactory) ChecksumCollection() *ChecksumCollection { +func (factory *CollectionFactory) ChecksumCollection(db database.ReaderWriter) aptly.ChecksumStorage { factory.Lock() defer factory.Unlock() + if db != nil { + return NewChecksumCollection(db) + } + if factory.checksums == nil { factory.checksums = NewChecksumCollection(factory.db) } diff --git a/deb/import.go b/deb/import.go index f4172a358..53322f8d6 100644 --- a/deb/import.go +++ b/deb/import.go @@ -66,11 +66,19 @@ func CollectPackageFiles(locations []string, reporter aptly.ResultReporter) (pac // ImportPackageFiles imports files into local repository func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace bool, verifier pgp.Verifier, pool aptly.PackagePool, collection *PackageCollection, reporter aptly.ResultReporter, restriction PackageQuery, - checksumStorage aptly.ChecksumStorage) (processedFiles []string, failedFiles []string, err error) { + checksumStorageProvider aptly.ChecksumStorageProvider) (processedFiles []string, failedFiles []string, err error) { if forceReplace { list.PrepareIndex() } + transaction, err := collection.db.OpenTransaction() + if err != nil { + return nil, nil, err + } + defer transaction.Discard() + + checksumStorage := checksumStorageProvider(transaction) + for _, file := range packageFiles { var ( stanza Stanza @@ -193,7 +201,7 @@ func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace b continue } - err = collection.Update(p) + err = collection.UpdateInTransaction(p, transaction) if err != nil { reporter.Warning("Unable to save package %s: %s", p, err) failedFiles = append(failedFiles, file) @@ -219,6 +227,6 @@ func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace b processedFiles = append(processedFiles, candidateProcessedFiles...) } - err = nil + err = transaction.Commit() return } diff --git a/deb/local.go b/deb/local.go index 5b8eb2a07..bad687b24 100644 --- a/deb/local.go +++ b/deb/local.go @@ -161,17 +161,23 @@ func (collection *LocalRepoCollection) Add(repo *LocalRepo) error { // Update stores updated information about repo in DB func (collection *LocalRepoCollection) Update(repo *LocalRepo) error { - err := collection.db.Put(repo.Key(), repo.Encode()) + transaction, err := collection.db.OpenTransaction() + if err != nil { + return err + } + defer transaction.Discard() + + err = transaction.Put(repo.Key(), repo.Encode()) if err != nil { return err } if repo.packageRefs != nil { - err = collection.db.Put(repo.RefKey(), repo.packageRefs.Encode()) + err = transaction.Put(repo.RefKey(), repo.packageRefs.Encode()) if err != nil { return err } } - return nil + return transaction.Commit() } // LoadComplete loads additional information for local repo @@ -245,16 +251,28 @@ func (collection *LocalRepoCollection) Len() int { // Drop removes remote repo from collection func (collection *LocalRepoCollection) Drop(repo *LocalRepo) error { - if _, err := collection.db.Get(repo.Key()); err == database.ErrNotFound { - panic("local repo not found!") + transaction, err := collection.db.OpenTransaction() + if err != nil { + return err } + defer transaction.Discard() delete(collection.cache, repo.UUID) - err := collection.db.Delete(repo.Key()) - if err != nil { + if _, err = transaction.Get(repo.Key()); err != nil { + if err == database.ErrNotFound { + return errors.New("local repo not found") + } + return err + } + + if err = transaction.Delete(repo.Key()); err != nil { + return err + } + + if err = transaction.Delete(repo.RefKey()); err != nil { return err } - return collection.db.Delete(repo.RefKey()) + return transaction.Commit() } diff --git a/deb/local_test.go b/deb/local_test.go index 2dd34313d..80087edb3 100644 --- a/deb/local_test.go +++ b/deb/local_test.go @@ -199,5 +199,5 @@ func (s *LocalRepoCollectionSuite) TestDrop(c *C) { r2, _ := collection.ByName("local2") c.Check(r2.String(), Equals, repo2.String()) - c.Check(func() { s.collection.Drop(repo1) }, Panics, "local repo not found!") + c.Check(s.collection.Drop(repo1), ErrorMatches, "local repo not found") } diff --git a/deb/package_collection.go b/deb/package_collection.go index f2ec6dc37..c9c92d028 100644 --- a/deb/package_collection.go +++ b/deb/package_collection.go @@ -201,8 +201,23 @@ func (collection *PackageCollection) loadContents(p *Package, packagePool aptly. return contents } -// Update adds or updates information about package in DB checking for conficts first +// Update adds or updates information about package in DB func (collection *PackageCollection) Update(p *Package) error { + transaction, err := collection.db.OpenTransaction() + if err != nil { + return err + } + defer transaction.Discard() + + if err = collection.UpdateInTransaction(p, transaction); err != nil { + return err + } + + return transaction.Commit() +} + +// UpdateInTransaction updates/creates package info in the context of the outer transaction +func (collection *PackageCollection) UpdateInTransaction(p *Package, transaction database.Transaction) error { var encodeBuffer bytes.Buffer encoder := codec.NewEncoder(&encodeBuffer, collection.codecHandle) @@ -210,12 +225,11 @@ func (collection *PackageCollection) Update(p *Package) error { encodeBuffer.Reset() encodeBuffer.WriteByte(0xc1) encodeBuffer.WriteByte(0x1) - err := encoder.Encode(p) - if err != nil { + if err := encoder.Encode(p); err != nil { return err } - err = collection.db.Put(p.Key(""), encodeBuffer.Bytes()) + err := transaction.Put(p.Key(""), encodeBuffer.Bytes()) if err != nil { return err } @@ -228,7 +242,7 @@ func (collection *PackageCollection) Update(p *Package) error { return err } - err = collection.db.Put(p.Key("xF"), encodeBuffer.Bytes()) + err = transaction.Put(p.Key("xF"), encodeBuffer.Bytes()) if err != nil { return err } @@ -241,7 +255,7 @@ func (collection *PackageCollection) Update(p *Package) error { return err } - err = collection.db.Put(p.Key("xD"), encodeBuffer.Bytes()) + err = transaction.Put(p.Key("xD"), encodeBuffer.Bytes()) if err != nil { return err } @@ -256,7 +270,7 @@ func (collection *PackageCollection) Update(p *Package) error { return err } - err = collection.db.Put(p.Key("xE"), encodeBuffer.Bytes()) + err = transaction.Put(p.Key("xE"), encodeBuffer.Bytes()) if err != nil { return err } diff --git a/deb/publish.go b/deb/publish.go index ad9c6d3fc..81b8bec4c 100644 --- a/deb/publish.go +++ b/deb/publish.go @@ -914,21 +914,27 @@ func (collection *PublishedRepoCollection) CheckDuplicate(repo *PublishedRepo) * } // Update stores updated information about repo in DB -func (collection *PublishedRepoCollection) Update(repo *PublishedRepo) (err error) { - err = collection.db.Put(repo.Key(), repo.Encode()) +func (collection *PublishedRepoCollection) Update(repo *PublishedRepo) error { + transaction, err := collection.db.OpenTransaction() if err != nil { - return + return err + } + defer transaction.Discard() + + err = transaction.Put(repo.Key(), repo.Encode()) + if err != nil { + return err } if repo.SourceKind == SourceLocalRepo { for component, item := range repo.sourceItems { - err = collection.db.Put(repo.RefKey(component), item.packageRefs.Encode()) + err = transaction.Put(repo.RefKey(component), item.packageRefs.Encode()) if err != nil { - return + return err } } } - return + return transaction.Commit() } // LoadComplete loads additional information for remote repo @@ -1170,6 +1176,13 @@ func (collection *PublishedRepoCollection) Remove(publishedStorageProvider aptly storage, prefix, distribution string, collectionFactory *CollectionFactory, progress aptly.Progress, force, skipCleanup bool) error { + transaction, err := collection.db.OpenTransaction() + if err != nil { + return err + } + defer transaction.Discard() + + // TODO: load via transaction collection.loadList() repo, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) @@ -1221,17 +1234,17 @@ func (collection *PublishedRepoCollection) Remove(publishedStorageProvider aptly } } - err = collection.db.Delete(repo.Key()) + err = transaction.Delete(repo.Key()) if err != nil { return err } for _, component := range repo.Components() { - err = collection.db.Delete(repo.RefKey(component)) + err = transaction.Delete(repo.RefKey(component)) if err != nil { return err } } - return nil + return transaction.Commit() } diff --git a/deb/remote.go b/deb/remote.go index 57d61058d..7ecd3e755 100644 --- a/deb/remote.go +++ b/deb/remote.go @@ -617,34 +617,44 @@ func (repo *RemoteRepo) BuildDownloadQueue(packagePool aptly.PackagePool, packag // FinalizeDownload swaps for final value of package refs func (repo *RemoteRepo) FinalizeDownload(collectionFactory *CollectionFactory, progress aptly.Progress) error { + transaction, err := collectionFactory.PackageCollection().db.OpenTransaction() + if err != nil { + return err + } + defer transaction.Discard() + repo.LastDownloadDate = time.Now() if progress != nil { - progress.InitBar(int64(repo.packageList.Len()), true) + progress.InitBar(int64(repo.packageList.Len()), false) } var i int // update all the packages in collection - err := repo.packageList.ForEach(func(p *Package) error { + err = repo.packageList.ForEach(func(p *Package) error { i++ if progress != nil { progress.SetBar(i) } // download process might have updated checksums p.UpdateFiles(p.Files()) - return collectionFactory.PackageCollection().Update(p) + return collectionFactory.PackageCollection().UpdateInTransaction(p, transaction) }) - repo.packageRefs = NewPackageRefListFromPackageList(repo.packageList) + if err == nil { + repo.packageRefs = NewPackageRefListFromPackageList(repo.packageList) + repo.packageList = nil + } if progress != nil { progress.ShutdownBar() } - repo.packageList = nil - - return err + if err != nil { + return err + } + return transaction.Commit() } // Encode does msgpack encoding of RemoteRepo @@ -795,17 +805,24 @@ func (collection *RemoteRepoCollection) Add(repo *RemoteRepo) error { // Update stores updated information about repo in DB func (collection *RemoteRepoCollection) Update(repo *RemoteRepo) error { - err := collection.db.Put(repo.Key(), repo.Encode()) + transaction, err := collection.db.OpenTransaction() + if err != nil { + return err + } + defer transaction.Discard() + + err = transaction.Put(repo.Key(), repo.Encode()) if err != nil { return err } if repo.packageRefs != nil { - err = collection.db.Put(repo.RefKey(), repo.packageRefs.Encode()) + err = transaction.Put(repo.RefKey(), repo.packageRefs.Encode()) if err != nil { return err } } - return nil + + return transaction.Commit() } // LoadComplete loads additional information for remote repo @@ -878,16 +895,29 @@ func (collection *RemoteRepoCollection) Len() int { // Drop removes remote repo from collection func (collection *RemoteRepoCollection) Drop(repo *RemoteRepo) error { - if _, err := collection.db.Get(repo.Key()); err == database.ErrNotFound { - panic("repo not found!") + transaction, err := collection.db.OpenTransaction() + if err != nil { + return err + } + defer transaction.Discard() + + if _, err = transaction.Get(repo.Key()); err != nil { + if err == database.ErrNotFound { + return errors.New("repo not found") + } + + return err } delete(collection.cache, repo.UUID) - err := collection.db.Delete(repo.Key()) - if err != nil { + if err = transaction.Delete(repo.Key()); err != nil { + return err + } + + if err = transaction.Delete(repo.RefKey()); err != nil { return err } - return collection.db.Delete(repo.RefKey()) + return transaction.Commit() } diff --git a/deb/remote_test.go b/deb/remote_test.go index 894865f5b..1ea1d558d 100644 --- a/deb/remote_test.go +++ b/deb/remote_test.go @@ -774,7 +774,7 @@ func (s *RemoteRepoCollectionSuite) TestDrop(c *C) { r2, _ := collection.ByName("tyndex") c.Check(r2.String(), Equals, repo2.String()) - c.Check(func() { s.collection.Drop(repo1) }, Panics, "repo not found!") + c.Check(s.collection.Drop(repo1), ErrorMatches, "repo not found") } const exampleReleaseFile = `Origin: LP-PPA-agenda-developers-daily diff --git a/deb/snapshot.go b/deb/snapshot.go index 46fb438f2..a2ffbcb57 100644 --- a/deb/snapshot.go +++ b/deb/snapshot.go @@ -216,14 +216,22 @@ func (collection *SnapshotCollection) Add(snapshot *Snapshot) error { // Update stores updated information about snapshot in DB func (collection *SnapshotCollection) Update(snapshot *Snapshot) error { - err := collection.db.Put(snapshot.Key(), snapshot.Encode()) + transaction, err := collection.db.OpenTransaction() + if err != nil { + return err + } + defer transaction.Discard() + + err = transaction.Put(snapshot.Key(), snapshot.Encode()) if err != nil { return err } if snapshot.packageRefs != nil { - return collection.db.Put(snapshot.RefKey(), snapshot.packageRefs.Encode()) + if err = transaction.Put(snapshot.RefKey(), snapshot.packageRefs.Encode()); err != nil { + return err + } } - return nil + return transaction.Commit() } // LoadComplete loads additional information about snapshot @@ -379,18 +387,31 @@ func (collection *SnapshotCollection) Len() int { // Drop removes snapshot from collection func (collection *SnapshotCollection) Drop(snapshot *Snapshot) error { - if _, err := collection.db.Get(snapshot.Key()); err == database.ErrNotFound { - panic("snapshot not found!") + transaction, err := collection.db.OpenTransaction() + if err != nil { + return err + } + defer transaction.Discard() + + if _, err = transaction.Get(snapshot.Key()); err != nil { + if err == database.ErrNotFound { + return errors.New("snapshot not found") + } + + return err } delete(collection.cache, snapshot.UUID) - err := collection.db.Delete(snapshot.Key()) - if err != nil { + if err = transaction.Delete(snapshot.Key()); err != nil { + return err + } + + if err = transaction.Delete(snapshot.RefKey()); err != nil { return err } - return collection.db.Delete(snapshot.RefKey()) + return transaction.Commit() } // Snapshot sorting methods diff --git a/deb/snapshot_test.go b/deb/snapshot_test.go index 1eeb0b49a..d27c42269 100644 --- a/deb/snapshot_test.go +++ b/deb/snapshot_test.go @@ -280,5 +280,5 @@ func (s *SnapshotCollectionSuite) TestDrop(c *C) { _, err = collection.ByUUID(s.snapshot1.UUID) c.Check(err, ErrorMatches, "snapshot .* not found") - c.Check(func() { s.collection.Drop(s.snapshot1) }, Panics, "snapshot not found!") + c.Check(s.collection.Drop(s.snapshot1), ErrorMatches, "snapshot not found") }