diff --git a/pkg/apk/implementation.go b/pkg/apk/implementation.go index d004ea5..4d248d4 100644 --- a/pkg/apk/implementation.go +++ b/pkg/apk/implementation.go @@ -27,6 +27,7 @@ import ( "net/url" "os" "path/filepath" + "runtime" "strings" "time" @@ -478,19 +479,62 @@ func (a *APK) FixateWorld(ctx context.Context, sourceDateEpoch *time.Time) error return fmt.Errorf("cannot install due to conflict with %s", pkg) } } - for _, pkg := range allpkgs { - isInstalled, err := a.isInstalledPackage(pkg.Name) - if err != nil { - return fmt.Errorf("error checking if package %s is installed: %w", pkg.Name, err) - } - if isInstalled { + + // TODO: Consider making this configurable option. + jobs := runtime.GOMAXPROCS(0) + + var g errgroup.Group + g.SetLimit(jobs) + + expanded := make([]*APKExpanded, len(allpkgs)) + + // First we concurrently fetch and expand all our APKs. + for i, pkg := range allpkgs { + i, pkg := i, pkg + + g.Go(func() error { + isInstalled, err := a.isInstalledPackage(pkg.Name) + if err != nil { + return fmt.Errorf("error checking if package %s is installed: %w", pkg.Name, err) + } + if isInstalled { + return nil + } + + rc, err := a.fetchPackage(ctx, pkg) + if err != nil { + return fmt.Errorf("fetching package %q: %w", pkg.Name, err) + } + defer rc.Close() + + exp, err := ExpandApk(ctx, rc) + if err != nil { + return err + } + + expanded[i] = exp + + return nil + }) + } + + if err := g.Wait(); err != nil { + return fmt.Errorf("expanding packages: %w", err) + } + + // Then we sequentially install them. + for i, pkg := range allpkgs { + exp := expanded[i] + if exp == nil { + // As above, so below. continue } - // get the apk file - if err := a.installPackage(ctx, pkg, sourceDateEpoch); err != nil { + + if err := a.installPackage(ctx, pkg, exp, sourceDateEpoch); err != nil { return err } } + return nil } @@ -574,13 +618,10 @@ func (a *APK) fetchAlpineKeys(ctx context.Context, alpineVersions []string) erro return nil } -// installPkg install a single package and update installed db. -// +func (a *APK) fetchPackage(ctx context.Context, pkg *repository.RepositoryPackage) (io.ReadCloser, error) { + a.logger.Debugf("fetching %s (%s)", pkg.Name, pkg.Version) -func (a *APK) installPackage(ctx context.Context, pkg *repository.RepositoryPackage, sourceDateEpoch *time.Time) error { - a.logger.Debugf("installing %s (%s)", pkg.Name, pkg.Version) - - ctx, span := otel.Tracer("go-apk").Start(ctx, "installPackage", trace.WithAttributes(attribute.String("package", pkg.Name))) + ctx, span := otel.Tracer("go-apk").Start(ctx, "fetchPackage", trace.WithAttributes(attribute.String("package", pkg.Name))) defer span.End() u := pkg.Url() @@ -588,10 +629,7 @@ func (a *APK) installPackage(ctx context.Context, pkg *repository.RepositoryPack // Normalize the repo as a URI, so that local paths // are translated into file:// URLs, allowing them to be parsed // into a url.URL{}. - var ( - r io.Reader - asURI uri.URI - ) + var asURI uri.URI if strings.HasPrefix(u, "https://") { asURI, _ = uri.Parse(u) } else { @@ -599,17 +637,16 @@ func (a *APK) installPackage(ctx context.Context, pkg *repository.RepositoryPack } asURL, err := url.Parse(string(asURI)) if err != nil { - return fmt.Errorf("failed to parse package as URI: %w", err) + return nil, fmt.Errorf("failed to parse package as URI: %w", err) } switch asURL.Scheme { case "file": f, err := os.Open(u) if err != nil { - return fmt.Errorf("failed to read repository package apk %s: %w", u, err) + return nil, fmt.Errorf("failed to read repository package apk %s: %w", u, err) } - defer f.Close() - r = f + return f, nil case "https": client := a.client if client == nil { @@ -620,27 +657,31 @@ func (a *APK) installPackage(ctx context.Context, pkg *repository.RepositoryPack } req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) if err != nil { - return err + return nil, err } res, err := client.Do(req) if err != nil { - return fmt.Errorf("unable to get package apk at %s: %w", u, err) + return nil, fmt.Errorf("unable to get package apk at %s: %w", u, err) } - defer res.Body.Close() if res.StatusCode != http.StatusOK { - return fmt.Errorf("unable to get package apk at %s: %v", u, res.Status) + res.Body.Close() + return nil, fmt.Errorf("unable to get package apk at %s: %v", u, res.Status) } - r = res.Body + return res.Body, nil default: - return fmt.Errorf("repository scheme %s not supported", asURL.Scheme) + return nil, fmt.Errorf("repository scheme %s not supported", asURL.Scheme) } +} + +// installPackage installs a single package and updates installed db. +func (a *APK) installPackage(ctx context.Context, pkg *repository.RepositoryPackage, expanded *APKExpanded, sourceDateEpoch *time.Time) error { + a.logger.Debugf("installing %s (%s)", pkg.Name, pkg.Version) + + ctx, span := otel.Tracer("go-apk").Start(ctx, "installPackage", trace.WithAttributes(attribute.String("package", pkg.Name))) + defer span.End() - // install the apk file - expanded, err := ExpandApk(ctx, r) - if err != nil { - return fmt.Errorf("unable to expand apk for package %s: %w", pkg.Name, err) - } defer expanded.Close() + installedFiles, err := a.installAPKFiles(ctx, expanded.PackageData, pkg.Origin, pkg.Replaces) if err != nil { return fmt.Errorf("unable to install files for pkg %s: %w", pkg.Name, err) diff --git a/pkg/apk/implementation_test.go b/pkg/apk/implementation_test.go index 6e92c20..0a9b116 100644 --- a/pkg/apk/implementation_test.go +++ b/pkg/apk/implementation_test.go @@ -26,7 +26,6 @@ import ( "sort" "strings" "testing" - "time" "github.com/stretchr/testify/require" "gitlab.alpinelinux.org/alpine/go/pkg/repository" @@ -60,7 +59,7 @@ func TestInitDB(t *testing.T) { src := apkfs.NewMemFS() apk, err := New(WithFS(src), WithIgnoreMknodErrors(ignoreMknodErrors)) require.NoError(t, err) - err = apk.InitDB(context.TODO()) + err = apk.InitDB(context.Background()) require.NoError(t, err) // check all of the contents for _, d := range initDirectories { @@ -166,7 +165,7 @@ func TestInitKeyring(t *testing.T) { Transport: &testLocalTransport{root: testPrimaryPkgDir, basenameOnly: true}, }) - require.NoError(t, a.InitKeyring(context.TODO(), keyfiles, nil)) + require.NoError(t, a.InitKeyring(context.Background(), keyfiles, nil)) // InitKeyring should have copied the local key and remote key to the right place fi, err := src.ReadDir(DefaultKeyRingPath) // should be no error reading them @@ -178,13 +177,13 @@ func TestInitKeyring(t *testing.T) { keyfiles = []string{ "/liksdjlksdjlksjlksjdl", } - require.Error(t, a.InitKeyring(context.TODO(), keyfiles, nil)) + require.Error(t, a.InitKeyring(context.Background(), keyfiles, nil)) // Add an invalid url keyfiles = []string{ "http://sldkjflskdjflklksdlksdlkjslk.net", } - require.Error(t, a.InitKeyring(context.TODO(), keyfiles, nil)) + require.Error(t, a.InitKeyring(context.Background(), keyfiles, nil)) } func TestLoadSystemKeyring(t *testing.T) { @@ -263,10 +262,8 @@ func TestLoadSystemKeyring(t *testing.T) { } } -func TestInstallPkg(t *testing.T) { - // func (a *APK) installPackage(pkg *repository.RepositoryPackage, sourceDateEpoch *time.Time) error { +func TestFetchPackage(t *testing.T) { var ( - now = time.Now() repo = repository.Repository{Uri: fmt.Sprintf("%s/%s", testAlpineRepos, testArch)} packages = []*repository.Package{&testPkg} repoWithIndex = repo.WithIndex(&repository.ApkIndex{ @@ -274,6 +271,7 @@ func TestInstallPkg(t *testing.T) { }) testEtag = "testetag" pkg = repository.NewRepositoryPackage(&testPkg, repoWithIndex) + ctx = context.Background() ) var prepLayout = func(t *testing.T, cache string) *APK { src := apkfs.NewMemFS() @@ -286,7 +284,7 @@ func TestInstallPkg(t *testing.T) { } a, err := New(opts...) require.NoError(t, err, "unable to create APK") - err = a.InitDB(context.TODO()) + err = a.InitDB(ctx) require.NoError(t, err) // set a client so we use local testdata instead of heading out to the Internet each time @@ -297,7 +295,7 @@ func TestInstallPkg(t *testing.T) { a.SetClient(&http.Client{ Transport: &testLocalTransport{root: testPrimaryPkgDir, basenameOnly: true}, }) - err := a.installPackage(context.TODO(), pkg, &now) + _, err := a.fetchPackage(ctx, pkg) require.NoErrorf(t, err, "unable to install package") }) t.Run("cache miss no network", func(t *testing.T) { @@ -308,7 +306,7 @@ func TestInstallPkg(t *testing.T) { a.SetClient(&http.Client{ Transport: &testLocalTransport{fail: true}, }) - err := a.installPackage(context.TODO(), pkg, &now) + _, err := a.fetchPackage(ctx, pkg) require.Error(t, err, "should fail when no cache and no network") }) t.Run("cache miss network should fill cache", func(t *testing.T) { @@ -324,7 +322,7 @@ func TestInstallPkg(t *testing.T) { a.SetClient(&http.Client{ Transport: &testLocalTransport{root: testPrimaryPkgDir, basenameOnly: true}, }) - err = a.installPackage(context.TODO(), pkg, &now) + _, err = a.fetchPackage(ctx, pkg) require.NoErrorf(t, err, "unable to install pkg") // check that the package file is in place _, err = os.Stat(cacheApkFile) @@ -354,7 +352,7 @@ func TestInstallPkg(t *testing.T) { // use a different root, so we get a different file Transport: &testLocalTransport{root: testAlternatePkgDir, basenameOnly: true, headers: map[string][]string{http.CanonicalHeaderKey("etag"): {testEtag}}}, }) - err = a.installPackage(context.TODO(), pkg, &now) + _, err = a.fetchPackage(ctx, pkg) require.NoErrorf(t, err, "unable to install pkg") // check that the package file is in place _, err = os.Stat(cacheApkFile) @@ -384,7 +382,7 @@ func TestInstallPkg(t *testing.T) { // use a different root, so we get a different file Transport: &testLocalTransport{root: testAlternatePkgDir, basenameOnly: true, headers: map[string][]string{http.CanonicalHeaderKey("etag"): {testEtag}}}, }) - err = a.installPackage(context.TODO(), pkg, &now) + _, err = a.fetchPackage(ctx, pkg) require.NoErrorf(t, err, "unable to install pkg") // check that the package file is in place _, err = os.Stat(cacheApkFile) @@ -414,7 +412,7 @@ func TestInstallPkg(t *testing.T) { // use a different root, so we get a different file Transport: &testLocalTransport{root: testAlternatePkgDir, basenameOnly: true, headers: map[string][]string{http.CanonicalHeaderKey("etag"): {testEtag + "abcdefg"}}}, }) - err = a.installPackage(context.TODO(), pkg, &now) + _, err = a.fetchPackage(ctx, pkg) require.NoErrorf(t, err, "unable to install pkg") // check that the package file is in place _, err = os.Stat(cacheApkFile)