Skip to content
This repository has been archived by the owner on Jun 5, 2024. It is now read-only.

Commit

Permalink
Fan-out fetch and expand for installPackage (#74)
Browse files Browse the repository at this point in the history
Signed-off-by: Jon Johnson <jon.johnson@chainguard.dev>
  • Loading branch information
jonjohnsonjr authored Jul 5, 2023
1 parent 61f0000 commit ebe7903
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 48 deletions.
107 changes: 74 additions & 33 deletions pkg/apk/implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net/url"
"os"
"path/filepath"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -478,19 +479,62 @@ func (a *APK) FixateWorld(ctx context.Context, sourceDateEpoch *time.Time) error
return fmt.Errorf("cannot install due to conflict with %s", pkg)
}
}
for _, pkg := range allpkgs {
isInstalled, err := a.isInstalledPackage(pkg.Name)
if err != nil {
return fmt.Errorf("error checking if package %s is installed: %w", pkg.Name, err)
}
if isInstalled {

// TODO: Consider making this configurable option.
jobs := runtime.GOMAXPROCS(0)

var g errgroup.Group
g.SetLimit(jobs)

expanded := make([]*APKExpanded, len(allpkgs))

// First we concurrently fetch and expand all our APKs.
for i, pkg := range allpkgs {
i, pkg := i, pkg

g.Go(func() error {
isInstalled, err := a.isInstalledPackage(pkg.Name)
if err != nil {
return fmt.Errorf("error checking if package %s is installed: %w", pkg.Name, err)
}
if isInstalled {
return nil
}

rc, err := a.fetchPackage(ctx, pkg)
if err != nil {
return fmt.Errorf("fetching package %q: %w", pkg.Name, err)
}
defer rc.Close()

exp, err := ExpandApk(ctx, rc)
if err != nil {
return err
}

expanded[i] = exp

return nil
})
}

if err := g.Wait(); err != nil {
return fmt.Errorf("expanding packages: %w", err)
}

// Then we sequentially install them.
for i, pkg := range allpkgs {
exp := expanded[i]
if exp == nil {
// As above, so below.
continue
}
// get the apk file
if err := a.installPackage(ctx, pkg, sourceDateEpoch); err != nil {

if err := a.installPackage(ctx, pkg, exp, sourceDateEpoch); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -574,42 +618,35 @@ func (a *APK) fetchAlpineKeys(ctx context.Context, alpineVersions []string) erro
return nil
}

// installPkg install a single package and update installed db.
//
func (a *APK) fetchPackage(ctx context.Context, pkg *repository.RepositoryPackage) (io.ReadCloser, error) {
a.logger.Debugf("fetching %s (%s)", pkg.Name, pkg.Version)

func (a *APK) installPackage(ctx context.Context, pkg *repository.RepositoryPackage, sourceDateEpoch *time.Time) error {
a.logger.Debugf("installing %s (%s)", pkg.Name, pkg.Version)

ctx, span := otel.Tracer("go-apk").Start(ctx, "installPackage", trace.WithAttributes(attribute.String("package", pkg.Name)))
ctx, span := otel.Tracer("go-apk").Start(ctx, "fetchPackage", trace.WithAttributes(attribute.String("package", pkg.Name)))
defer span.End()

u := pkg.Url()

// Normalize the repo as a URI, so that local paths
// are translated into file:// URLs, allowing them to be parsed
// into a url.URL{}.
var (
r io.Reader
asURI uri.URI
)
var asURI uri.URI
if strings.HasPrefix(u, "https://") {
asURI, _ = uri.Parse(u)
} else {
asURI = uri.New(u)
}
asURL, err := url.Parse(string(asURI))
if err != nil {
return fmt.Errorf("failed to parse package as URI: %w", err)
return nil, fmt.Errorf("failed to parse package as URI: %w", err)
}

switch asURL.Scheme {
case "file":
f, err := os.Open(u)
if err != nil {
return fmt.Errorf("failed to read repository package apk %s: %w", u, err)
return nil, fmt.Errorf("failed to read repository package apk %s: %w", u, err)
}
defer f.Close()
r = f
return f, nil
case "https":
client := a.client
if client == nil {
Expand All @@ -620,27 +657,31 @@ func (a *APK) installPackage(ctx context.Context, pkg *repository.RepositoryPack
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return err
return nil, err
}
res, err := client.Do(req)
if err != nil {
return fmt.Errorf("unable to get package apk at %s: %w", u, err)
return nil, fmt.Errorf("unable to get package apk at %s: %w", u, err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("unable to get package apk at %s: %v", u, res.Status)
res.Body.Close()
return nil, fmt.Errorf("unable to get package apk at %s: %v", u, res.Status)
}
r = res.Body
return res.Body, nil
default:
return fmt.Errorf("repository scheme %s not supported", asURL.Scheme)
return nil, fmt.Errorf("repository scheme %s not supported", asURL.Scheme)
}
}

// installPackage installs a single package and updates installed db.
func (a *APK) installPackage(ctx context.Context, pkg *repository.RepositoryPackage, expanded *APKExpanded, sourceDateEpoch *time.Time) error {
a.logger.Debugf("installing %s (%s)", pkg.Name, pkg.Version)

ctx, span := otel.Tracer("go-apk").Start(ctx, "installPackage", trace.WithAttributes(attribute.String("package", pkg.Name)))
defer span.End()

// install the apk file
expanded, err := ExpandApk(ctx, r)
if err != nil {
return fmt.Errorf("unable to expand apk for package %s: %w", pkg.Name, err)
}
defer expanded.Close()

installedFiles, err := a.installAPKFiles(ctx, expanded.PackageData, pkg.Origin, pkg.Replaces)
if err != nil {
return fmt.Errorf("unable to install files for pkg %s: %w", pkg.Name, err)
Expand Down
28 changes: 13 additions & 15 deletions pkg/apk/implementation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"sort"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"gitlab.alpinelinux.org/alpine/go/pkg/repository"
Expand Down Expand Up @@ -60,7 +59,7 @@ func TestInitDB(t *testing.T) {
src := apkfs.NewMemFS()
apk, err := New(WithFS(src), WithIgnoreMknodErrors(ignoreMknodErrors))
require.NoError(t, err)
err = apk.InitDB(context.TODO())
err = apk.InitDB(context.Background())
require.NoError(t, err)
// check all of the contents
for _, d := range initDirectories {
Expand Down Expand Up @@ -166,7 +165,7 @@ func TestInitKeyring(t *testing.T) {
Transport: &testLocalTransport{root: testPrimaryPkgDir, basenameOnly: true},
})

require.NoError(t, a.InitKeyring(context.TODO(), keyfiles, nil))
require.NoError(t, a.InitKeyring(context.Background(), keyfiles, nil))
// InitKeyring should have copied the local key and remote key to the right place
fi, err := src.ReadDir(DefaultKeyRingPath)
// should be no error reading them
Expand All @@ -178,13 +177,13 @@ func TestInitKeyring(t *testing.T) {
keyfiles = []string{
"/liksdjlksdjlksjlksjdl",
}
require.Error(t, a.InitKeyring(context.TODO(), keyfiles, nil))
require.Error(t, a.InitKeyring(context.Background(), keyfiles, nil))

// Add an invalid url
keyfiles = []string{
"http://sldkjflskdjflklksdlksdlkjslk.net",
}
require.Error(t, a.InitKeyring(context.TODO(), keyfiles, nil))
require.Error(t, a.InitKeyring(context.Background(), keyfiles, nil))
}

func TestLoadSystemKeyring(t *testing.T) {
Expand Down Expand Up @@ -263,17 +262,16 @@ func TestLoadSystemKeyring(t *testing.T) {
}
}

func TestInstallPkg(t *testing.T) {
// func (a *APK) installPackage(pkg *repository.RepositoryPackage, sourceDateEpoch *time.Time) error {
func TestFetchPackage(t *testing.T) {
var (
now = time.Now()
repo = repository.Repository{Uri: fmt.Sprintf("%s/%s", testAlpineRepos, testArch)}
packages = []*repository.Package{&testPkg}
repoWithIndex = repo.WithIndex(&repository.ApkIndex{
Packages: packages,
})
testEtag = "testetag"
pkg = repository.NewRepositoryPackage(&testPkg, repoWithIndex)
ctx = context.Background()
)
var prepLayout = func(t *testing.T, cache string) *APK {
src := apkfs.NewMemFS()
Expand All @@ -286,7 +284,7 @@ func TestInstallPkg(t *testing.T) {
}
a, err := New(opts...)
require.NoError(t, err, "unable to create APK")
err = a.InitDB(context.TODO())
err = a.InitDB(ctx)
require.NoError(t, err)

// set a client so we use local testdata instead of heading out to the Internet each time
Expand All @@ -297,7 +295,7 @@ func TestInstallPkg(t *testing.T) {
a.SetClient(&http.Client{
Transport: &testLocalTransport{root: testPrimaryPkgDir, basenameOnly: true},
})
err := a.installPackage(context.TODO(), pkg, &now)
_, err := a.fetchPackage(ctx, pkg)
require.NoErrorf(t, err, "unable to install package")
})
t.Run("cache miss no network", func(t *testing.T) {
Expand All @@ -308,7 +306,7 @@ func TestInstallPkg(t *testing.T) {
a.SetClient(&http.Client{
Transport: &testLocalTransport{fail: true},
})
err := a.installPackage(context.TODO(), pkg, &now)
_, err := a.fetchPackage(ctx, pkg)
require.Error(t, err, "should fail when no cache and no network")
})
t.Run("cache miss network should fill cache", func(t *testing.T) {
Expand All @@ -324,7 +322,7 @@ func TestInstallPkg(t *testing.T) {
a.SetClient(&http.Client{
Transport: &testLocalTransport{root: testPrimaryPkgDir, basenameOnly: true},
})
err = a.installPackage(context.TODO(), pkg, &now)
_, err = a.fetchPackage(ctx, pkg)
require.NoErrorf(t, err, "unable to install pkg")
// check that the package file is in place
_, err = os.Stat(cacheApkFile)
Expand Down Expand Up @@ -354,7 +352,7 @@ func TestInstallPkg(t *testing.T) {
// use a different root, so we get a different file
Transport: &testLocalTransport{root: testAlternatePkgDir, basenameOnly: true, headers: map[string][]string{http.CanonicalHeaderKey("etag"): {testEtag}}},
})
err = a.installPackage(context.TODO(), pkg, &now)
_, err = a.fetchPackage(ctx, pkg)
require.NoErrorf(t, err, "unable to install pkg")
// check that the package file is in place
_, err = os.Stat(cacheApkFile)
Expand Down Expand Up @@ -384,7 +382,7 @@ func TestInstallPkg(t *testing.T) {
// use a different root, so we get a different file
Transport: &testLocalTransport{root: testAlternatePkgDir, basenameOnly: true, headers: map[string][]string{http.CanonicalHeaderKey("etag"): {testEtag}}},
})
err = a.installPackage(context.TODO(), pkg, &now)
_, err = a.fetchPackage(ctx, pkg)
require.NoErrorf(t, err, "unable to install pkg")
// check that the package file is in place
_, err = os.Stat(cacheApkFile)
Expand Down Expand Up @@ -414,7 +412,7 @@ func TestInstallPkg(t *testing.T) {
// use a different root, so we get a different file
Transport: &testLocalTransport{root: testAlternatePkgDir, basenameOnly: true, headers: map[string][]string{http.CanonicalHeaderKey("etag"): {testEtag + "abcdefg"}}},
})
err = a.installPackage(context.TODO(), pkg, &now)
_, err = a.fetchPackage(ctx, pkg)
require.NoErrorf(t, err, "unable to install pkg")
// check that the package file is in place
_, err = os.Stat(cacheApkFile)
Expand Down

0 comments on commit ebe7903

Please sign in to comment.