Skip to content
This repository has been archived by the owner on Jun 5, 2024. It is now read-only.

Fan-out fetch and expand for installPackage #74

Merged
merged 1 commit into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 74 additions & 33 deletions pkg/apk/implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net/url"
"os"
"path/filepath"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -478,19 +479,62 @@ func (a *APK) FixateWorld(ctx context.Context, sourceDateEpoch *time.Time) error
return fmt.Errorf("cannot install due to conflict with %s", pkg)
}
}
for _, pkg := range allpkgs {
isInstalled, err := a.isInstalledPackage(pkg.Name)
if err != nil {
return fmt.Errorf("error checking if package %s is installed: %w", pkg.Name, err)
}
if isInstalled {

// TODO: Consider making this configurable option.
jobs := runtime.GOMAXPROCS(0)

var g errgroup.Group
g.SetLimit(jobs)

expanded := make([]*APKExpanded, len(allpkgs))

// First we concurrently fetch and expand all our APKs.
for i, pkg := range allpkgs {
i, pkg := i, pkg

g.Go(func() error {
isInstalled, err := a.isInstalledPackage(pkg.Name)
if err != nil {
return fmt.Errorf("error checking if package %s is installed: %w", pkg.Name, err)
}
if isInstalled {
return nil
}

rc, err := a.fetchPackage(ctx, pkg)
if err != nil {
return fmt.Errorf("fetching package %q: %w", pkg.Name, err)
}
defer rc.Close()

exp, err := ExpandApk(ctx, rc)
if err != nil {
return err
}

expanded[i] = exp

return nil
})
}

if err := g.Wait(); err != nil {
return fmt.Errorf("expanding packages: %w", err)
}

// Then we sequentially install them.
for i, pkg := range allpkgs {
exp := expanded[i]
if exp == nil {
// As above, so below.
continue
}
// get the apk file
if err := a.installPackage(ctx, pkg, sourceDateEpoch); err != nil {

if err := a.installPackage(ctx, pkg, exp, sourceDateEpoch); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -574,42 +618,35 @@ func (a *APK) fetchAlpineKeys(ctx context.Context, alpineVersions []string) erro
return nil
}

// installPkg install a single package and update installed db.
//
func (a *APK) fetchPackage(ctx context.Context, pkg *repository.RepositoryPackage) (io.ReadCloser, error) {
a.logger.Debugf("fetching %s (%s)", pkg.Name, pkg.Version)

func (a *APK) installPackage(ctx context.Context, pkg *repository.RepositoryPackage, sourceDateEpoch *time.Time) error {
a.logger.Debugf("installing %s (%s)", pkg.Name, pkg.Version)

ctx, span := otel.Tracer("go-apk").Start(ctx, "installPackage", trace.WithAttributes(attribute.String("package", pkg.Name)))
ctx, span := otel.Tracer("go-apk").Start(ctx, "fetchPackage", trace.WithAttributes(attribute.String("package", pkg.Name)))
defer span.End()

u := pkg.Url()

// Normalize the repo as a URI, so that local paths
// are translated into file:// URLs, allowing them to be parsed
// into a url.URL{}.
var (
r io.Reader
asURI uri.URI
)
var asURI uri.URI
if strings.HasPrefix(u, "https://") {
asURI, _ = uri.Parse(u)
} else {
asURI = uri.New(u)
}
asURL, err := url.Parse(string(asURI))
if err != nil {
return fmt.Errorf("failed to parse package as URI: %w", err)
return nil, fmt.Errorf("failed to parse package as URI: %w", err)
}

switch asURL.Scheme {
case "file":
f, err := os.Open(u)
if err != nil {
return fmt.Errorf("failed to read repository package apk %s: %w", u, err)
return nil, fmt.Errorf("failed to read repository package apk %s: %w", u, err)
}
defer f.Close()
r = f
return f, nil
case "https":
client := a.client
if client == nil {
Expand All @@ -620,27 +657,31 @@ func (a *APK) installPackage(ctx context.Context, pkg *repository.RepositoryPack
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return err
return nil, err
}
res, err := client.Do(req)
if err != nil {
return fmt.Errorf("unable to get package apk at %s: %w", u, err)
return nil, fmt.Errorf("unable to get package apk at %s: %w", u, err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("unable to get package apk at %s: %v", u, res.Status)
res.Body.Close()
return nil, fmt.Errorf("unable to get package apk at %s: %v", u, res.Status)
}
r = res.Body
return res.Body, nil
default:
return fmt.Errorf("repository scheme %s not supported", asURL.Scheme)
return nil, fmt.Errorf("repository scheme %s not supported", asURL.Scheme)
}
}

// installPackage installs a single package and updates installed db.
func (a *APK) installPackage(ctx context.Context, pkg *repository.RepositoryPackage, expanded *APKExpanded, sourceDateEpoch *time.Time) error {
a.logger.Debugf("installing %s (%s)", pkg.Name, pkg.Version)

ctx, span := otel.Tracer("go-apk").Start(ctx, "installPackage", trace.WithAttributes(attribute.String("package", pkg.Name)))
defer span.End()

// install the apk file
expanded, err := ExpandApk(ctx, r)
if err != nil {
return fmt.Errorf("unable to expand apk for package %s: %w", pkg.Name, err)
}
defer expanded.Close()

installedFiles, err := a.installAPKFiles(ctx, expanded.PackageData, pkg.Origin, pkg.Replaces)
if err != nil {
return fmt.Errorf("unable to install files for pkg %s: %w", pkg.Name, err)
Expand Down
28 changes: 13 additions & 15 deletions pkg/apk/implementation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"sort"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"gitlab.alpinelinux.org/alpine/go/pkg/repository"
Expand Down Expand Up @@ -60,7 +59,7 @@ func TestInitDB(t *testing.T) {
src := apkfs.NewMemFS()
apk, err := New(WithFS(src), WithIgnoreMknodErrors(ignoreMknodErrors))
require.NoError(t, err)
err = apk.InitDB(context.TODO())
err = apk.InitDB(context.Background())
require.NoError(t, err)
// check all of the contents
for _, d := range initDirectories {
Expand Down Expand Up @@ -166,7 +165,7 @@ func TestInitKeyring(t *testing.T) {
Transport: &testLocalTransport{root: testPrimaryPkgDir, basenameOnly: true},
})

require.NoError(t, a.InitKeyring(context.TODO(), keyfiles, nil))
require.NoError(t, a.InitKeyring(context.Background(), keyfiles, nil))
// InitKeyring should have copied the local key and remote key to the right place
fi, err := src.ReadDir(DefaultKeyRingPath)
// should be no error reading them
Expand All @@ -178,13 +177,13 @@ func TestInitKeyring(t *testing.T) {
keyfiles = []string{
"/liksdjlksdjlksjlksjdl",
}
require.Error(t, a.InitKeyring(context.TODO(), keyfiles, nil))
require.Error(t, a.InitKeyring(context.Background(), keyfiles, nil))

// Add an invalid url
keyfiles = []string{
"http://sldkjflskdjflklksdlksdlkjslk.net",
}
require.Error(t, a.InitKeyring(context.TODO(), keyfiles, nil))
require.Error(t, a.InitKeyring(context.Background(), keyfiles, nil))
}

func TestLoadSystemKeyring(t *testing.T) {
Expand Down Expand Up @@ -263,17 +262,16 @@ func TestLoadSystemKeyring(t *testing.T) {
}
}

func TestInstallPkg(t *testing.T) {
// func (a *APK) installPackage(pkg *repository.RepositoryPackage, sourceDateEpoch *time.Time) error {
func TestFetchPackage(t *testing.T) {
var (
now = time.Now()
repo = repository.Repository{Uri: fmt.Sprintf("%s/%s", testAlpineRepos, testArch)}
packages = []*repository.Package{&testPkg}
repoWithIndex = repo.WithIndex(&repository.ApkIndex{
Packages: packages,
})
testEtag = "testetag"
pkg = repository.NewRepositoryPackage(&testPkg, repoWithIndex)
ctx = context.Background()
)
var prepLayout = func(t *testing.T, cache string) *APK {
src := apkfs.NewMemFS()
Expand All @@ -286,7 +284,7 @@ func TestInstallPkg(t *testing.T) {
}
a, err := New(opts...)
require.NoError(t, err, "unable to create APK")
err = a.InitDB(context.TODO())
err = a.InitDB(ctx)
require.NoError(t, err)

// set a client so we use local testdata instead of heading out to the Internet each time
Expand All @@ -297,7 +295,7 @@ func TestInstallPkg(t *testing.T) {
a.SetClient(&http.Client{
Transport: &testLocalTransport{root: testPrimaryPkgDir, basenameOnly: true},
})
err := a.installPackage(context.TODO(), pkg, &now)
_, err := a.fetchPackage(ctx, pkg)
require.NoErrorf(t, err, "unable to install package")
})
t.Run("cache miss no network", func(t *testing.T) {
Expand All @@ -308,7 +306,7 @@ func TestInstallPkg(t *testing.T) {
a.SetClient(&http.Client{
Transport: &testLocalTransport{fail: true},
})
err := a.installPackage(context.TODO(), pkg, &now)
_, err := a.fetchPackage(ctx, pkg)
require.Error(t, err, "should fail when no cache and no network")
})
t.Run("cache miss network should fill cache", func(t *testing.T) {
Expand All @@ -324,7 +322,7 @@ func TestInstallPkg(t *testing.T) {
a.SetClient(&http.Client{
Transport: &testLocalTransport{root: testPrimaryPkgDir, basenameOnly: true},
})
err = a.installPackage(context.TODO(), pkg, &now)
_, err = a.fetchPackage(ctx, pkg)
require.NoErrorf(t, err, "unable to install pkg")
// check that the package file is in place
_, err = os.Stat(cacheApkFile)
Expand Down Expand Up @@ -354,7 +352,7 @@ func TestInstallPkg(t *testing.T) {
// use a different root, so we get a different file
Transport: &testLocalTransport{root: testAlternatePkgDir, basenameOnly: true, headers: map[string][]string{http.CanonicalHeaderKey("etag"): {testEtag}}},
})
err = a.installPackage(context.TODO(), pkg, &now)
_, err = a.fetchPackage(ctx, pkg)
require.NoErrorf(t, err, "unable to install pkg")
// check that the package file is in place
_, err = os.Stat(cacheApkFile)
Expand Down Expand Up @@ -384,7 +382,7 @@ func TestInstallPkg(t *testing.T) {
// use a different root, so we get a different file
Transport: &testLocalTransport{root: testAlternatePkgDir, basenameOnly: true, headers: map[string][]string{http.CanonicalHeaderKey("etag"): {testEtag}}},
})
err = a.installPackage(context.TODO(), pkg, &now)
_, err = a.fetchPackage(ctx, pkg)
require.NoErrorf(t, err, "unable to install pkg")
// check that the package file is in place
_, err = os.Stat(cacheApkFile)
Expand Down Expand Up @@ -414,7 +412,7 @@ func TestInstallPkg(t *testing.T) {
// use a different root, so we get a different file
Transport: &testLocalTransport{root: testAlternatePkgDir, basenameOnly: true, headers: map[string][]string{http.CanonicalHeaderKey("etag"): {testEtag + "abcdefg"}}},
})
err = a.installPackage(context.TODO(), pkg, &now)
_, err = a.fetchPackage(ctx, pkg)
require.NoErrorf(t, err, "unable to install pkg")
// check that the package file is in place
_, err = os.Stat(cacheApkFile)
Expand Down