diff --git a/cmd/porter/installations.go b/cmd/porter/installations.go index 914af86438..eb322f031b 100644 --- a/cmd/porter/installations.go +++ b/cmd/porter/installations.go @@ -139,6 +139,9 @@ You can use the show command to create the initial file: "Force the bundle to be executed when no changes are detected.") f.BoolVar(&opts.DryRun, "dry-run", false, "Evaluate if the bundle would be executed based on the changes in the file.") + f.StringVarP(&opts.RawFormat, "output", "o", "plaintext", + "Specify an output format. Allowed values: plaintext, json, yaml") + return &cmd } diff --git a/docs/content/cli/installations_apply.md b/docs/content/cli/installations_apply.md index 8ef9bea1c9..e883380d78 100644 --- a/docs/content/cli/installations_apply.md +++ b/docs/content/cli/installations_apply.md @@ -37,6 +37,7 @@ porter installations apply FILE [flags] --force Force the bundle to be executed when no changes are detected. -h, --help help for apply -n, --namespace string Namespace in which the installation is defined. Defaults to the namespace defined in the file. + -o, --output string Specify an output format. Allowed values: plaintext, json, yaml (default "plaintext") ``` ### Options inherited from parent commands diff --git a/go.mod b/go.mod index 0acbc47514..73409aedbf 100644 --- a/go.mod +++ b/go.mod @@ -60,6 +60,7 @@ require ( github.com/spf13/viper v1.10.0 github.com/stretchr/testify v1.8.1 github.com/xeipuuv/gojsonschema v1.2.0 + github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 go.mongodb.org/mongo-driver v1.7.1 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.4 go.opentelemetry.io/otel v1.11.1 diff --git a/go.sum b/go.sum index a1383ca282..abd63ab3e8 100644 --- a/go.sum +++ b/go.sum @@ -1616,6 +1616,8 @@ github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8/go.mod h1:HUYIGzjTL3rfEspMx github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 h1:7v7L5lsfw4w8iqBBXETukHo4IPltmD+mWoLRYUmeGN8= +github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869/go.mod h1:Rfzr+sqaDreiCaoQbFCu3sTXxeFq/9kXRuyOoSlGQHE= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/pkg/cnab/dependencies/v2/bundle_graph.go b/pkg/cnab/dependencies/v2/bundle_graph.go new file mode 100644 index 0000000000..de5a4b00df --- /dev/null +++ b/pkg/cnab/dependencies/v2/bundle_graph.go @@ -0,0 +1,129 @@ +package v2 + +import ( + "sort" + + "get.porter.sh/porter/pkg/cnab" + depsv2ext "get.porter.sh/porter/pkg/cnab/extensions/dependencies/v2" + "github.com/yourbasic/graph" +) + +// BundleGraph is a directed acyclic graph of a bundle and its dependencies +// (which may be other bundles, or installations) It is used to resolve the +// dependency order in which the bundles must be executed. +type BundleGraph struct { + // nodeKeys is a map from the node key to its index in nodes + nodeKeys map[string]int + nodes []Node +} + +func NewBundleGraph() *BundleGraph { + return &BundleGraph{ + nodeKeys: make(map[string]int), + } +} + +// RegisterNode adds the specified node to the graph +// returning true if the node is already present. +func (g *BundleGraph) RegisterNode(node Node) bool { + _, exists := g.nodeKeys[node.GetKey()] + if !exists { + nodeIndex := len(g.nodes) + g.nodes = append(g.nodes, node) + g.nodeKeys[node.GetKey()] = nodeIndex + } + return exists +} + +func (g *BundleGraph) Sort() ([]Node, bool) { + dag := graph.New(len(g.nodes)) + for nodeIndex, node := range g.nodes { + for _, depKey := range node.GetRequires() { + depIndex, ok := g.nodeKeys[depKey] + if !ok { + panic("oops") + } + dag.Add(nodeIndex, depIndex) + } + } + + indices, ok := graph.TopSort(dag) + if !ok { + return nil, false + } + + // Reverse the sort so that items with no dependencies are listed first + count := len(indices) + results := make([]Node, count) + for i, nodeIndex := range indices { + results[count-i-1] = g.nodes[nodeIndex] + } + return results, true +} + +func (g *BundleGraph) GetNode(key string) (Node, bool) { + if nodeIndex, ok := g.nodeKeys[key]; ok { + return g.nodes[nodeIndex], true + } + return nil, false +} + +// Node in a BundleGraph. +type Node interface { + GetRequires() []string + GetKey() string +} + +var _ Node = BundleNode{} +var _ Node = InstallationNode{} + +// BundleNode is a Node in a BundleGraph that represents a dependency on a bundle +// that has not yet been installed. +type BundleNode struct { + Key string + ParentKey string + Reference cnab.BundleReference + Requires []string + + // TODO(PEP003): DO we need to store this? Can we do it somewhere else or hold a reference to the dep and add more to the Node interface? + Credentials map[string]depsv2ext.DependencySource + Parameters map[string]depsv2ext.DependencySource +} + +func (d BundleNode) GetKey() string { + return d.Key +} + +func (d BundleNode) GetParentKey() string { + return d.ParentKey +} + +func (d BundleNode) GetRequires() []string { + sort.Strings(d.Requires) + return d.Requires +} + +func (d BundleNode) IsRoot() bool { + return d.Key == "root" +} + +// InstallationNode is a Node in a BundleGraph that represents a dependency on an +// installed bundle (installation). +type InstallationNode struct { + Key string + ParentKey string + Namespace string + Name string +} + +func (d InstallationNode) GetKey() string { + return d.Key +} + +func (d InstallationNode) GetParentKey() string { + return d.ParentKey +} + +func (d InstallationNode) GetRequires() []string { + return nil +} diff --git a/pkg/cnab/dependencies/v2/bundle_graph_test.go b/pkg/cnab/dependencies/v2/bundle_graph_test.go new file mode 100644 index 0000000000..a61278ed1f --- /dev/null +++ b/pkg/cnab/dependencies/v2/bundle_graph_test.go @@ -0,0 +1,70 @@ +package v2 + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/stretchr/testify/require" +) + +func TestEngine_DependOnInstallation(t *testing.T) { + /* + A -> B (installation) + A -> C (bundle) + c.parameters.connstr <- B.outputs.connstr + */ + + b := InstallationNode{Key: "b"} + c := BundleNode{ + Key: "c", + Requires: []string{"b"}, + } + a := BundleNode{ + Key: "root", + Requires: []string{"b", "c"}, + } + + g := NewBundleGraph() + g.RegisterNode(a) + g.RegisterNode(b) + g.RegisterNode(c) + sortedNodes, ok := g.Sort() + require.True(t, ok, "graph should not be cyclic") + + gotOrder := make([]string, len(sortedNodes)) + for i, node := range sortedNodes { + gotOrder[i] = node.GetKey() + } + wantOrder := []string{ + "b", + "c", + "root", + } + assert.Equal(t, wantOrder, gotOrder) +} + +/* +✅ need to represent new dependency structure on an extended bundle wrapper +(put in cnab-go later) + +need to read a bundle and make a BundleGraph +? how to handle a param that isn't a pure assignment, e.g. connstr: ${bundle.deps.VM.outputs.ip}:${bundle.deps.SVC.outputs.port} +? when are templates evaluated as the graph is executed (for simplicity, first draft no composition / templating) + +need to resolve dependencies in the graph +* lookup against existing installations +* lookup against semver tags in registry +* lookup against bundle index? when would we look here? (i.e. preferred/registered implementations of interfaces) + +need to turn the sorted nodes into an execution plan +execution plan needs: +* bundle to execute and the installation it will become +* parameters and credentials to pass + * sources: + root parameters/creds + installation outputs + +need to write something that can run an execution plan +* knows how to grab sources and pass them into the bundle +*/ diff --git a/pkg/cnab/dependencies/v2/bundle_puller.go b/pkg/cnab/dependencies/v2/bundle_puller.go new file mode 100644 index 0000000000..e263033356 --- /dev/null +++ b/pkg/cnab/dependencies/v2/bundle_puller.go @@ -0,0 +1,17 @@ +package v2 + +import ( + "context" + + "get.porter.sh/porter/pkg/cache" + "get.porter.sh/porter/pkg/cnab" +) + +// BundlePuller can query and pull bundles. +type BundlePuller interface { + // GetBundle retrieves a bundle definition. + GetBundle(ctx context.Context, ref cnab.OCIReference) (cache.CachedBundle, error) + + // ListTags retrieves all tags defined for a bundle. + ListTags(ctx context.Context, ref cnab.OCIReference) ([]string, error) +} diff --git a/pkg/cnab/dependencies/v2/composite_resolver.go b/pkg/cnab/dependencies/v2/composite_resolver.go new file mode 100644 index 0000000000..623a7ebd9a --- /dev/null +++ b/pkg/cnab/dependencies/v2/composite_resolver.go @@ -0,0 +1,200 @@ +package v2 + +import ( + "context" + "fmt" + + "get.porter.sh/porter/pkg/cnab" + depsv2ext "get.porter.sh/porter/pkg/cnab/extensions/dependencies/v2" + "get.porter.sh/porter/pkg/storage" + "github.com/Masterminds/semver/v3" +) + +var _ DependencyResolver = CompositeResolver{} +var _ BundleGraphResolver = CompositeResolver{} + +// CompositeResolver combines multiple resolution strategies into a single +// resolver that applies each strategy in the proper order to resolve a +// Dependency to an action in an ExecutionPlan. +type CompositeResolver struct { + namespace string + resolvers []DependencyResolver +} + +func NewCompositeResolver(namespace string, puller BundlePuller, store storage.InstallationProvider) CompositeResolver { + instResolver := InstallationResolver{ + store: store, + namespace: namespace, + } + versionResolver := VersionResolver{ + puller: puller, + } + return CompositeResolver{ + namespace: namespace, + resolvers: []DependencyResolver{ + instResolver, + versionResolver, + DefaultBundleResolver{puller: puller}, + }, + } +} + +func (r CompositeResolver) ResolveDependency(ctx context.Context, dep Dependency) (Node, bool, error) { + // pull the default bundle if set, and verify that it meets the interface. It's a problem if it doesn't + // We should stop early if it doesn't work because most likely the interface is defined incorrectly + // We can check at build time that the bundle will work with all the defaults + // don't do this at runtime, assume the bundle has been checked + + // build an interface + // config setting to reuse existing installations + + for _, resolver := range r.resolvers { + depNode, resolved, err := resolver.ResolveDependency(ctx, dep) + if err != nil { + return nil, false, err + } + if resolved { + return depNode, true, nil + } + } + + return nil, false, nil +} + +func (r CompositeResolver) ResolveDependencyGraph(ctx context.Context, bun cnab.ExtendedBundle) (*BundleGraph, error) { + g := NewBundleGraph() + + // Add the root bundle + root := BundleNode{ + Key: "root", + Reference: cnab.BundleReference{Definition: bun}, + } + + err := r.addBundleToGraph(ctx, g, root) + return g, err +} + +func (r CompositeResolver) addBundleToGraph(ctx context.Context, g *BundleGraph, node BundleNode) error { + if _, exists := g.GetNode(node.Key); exists { + // We have already processed this bundle, return to avoid an infinite loop + return nil + } + + // Process dependencies, if it has any + bun := node.Reference.Definition + if !bun.HasDependenciesV2() { + // No deps so let's move on + g.RegisterNode(node) + return nil + } + + deps, err := bun.ReadDependenciesV2() + if err != nil { + return err + } + + node.Requires = make([]string, 0, len(deps.Requires)) + for _, dep := range deps.Requires { + // Resolve the dependency + resolved, err := r.resolveDependency(ctx, node.Key, dep) + if err != nil { + return err + } + + // Update the node to track its dependencies + node.Requires = append(node.Requires, resolved.GetKey()) + + // + // Add the dependency to the graph + // + depNode, ok := resolved.(BundleNode) + if !ok { + // installations don't have any dependencies so there's nothing left to do + g.RegisterNode(resolved) + continue + } + + // Make connections between the dependency and any outputs of other dependencies that it requires + requireOutput := func(source depsv2ext.DependencySource) { + if source.Output == "" { + return + } + + outputRequires := node.Key + if source.Dependency != "" { + // PEP(003): How do we ensure that these keys are unique in deep graphs where root + current dep key is unique? + outputRequires = MakeDependencyKey(node.Key, source.Dependency) + } + depNode.Requires = append(depNode.Requires, outputRequires) + } + for _, source := range dep.Parameters { + requireOutput(source) + } + for _, source := range dep.Credentials { + requireOutput(source) + } + r.addBundleToGraph(ctx, g, depNode) + } + + g.RegisterNode(node) + return nil +} + +func (r CompositeResolver) resolveDependency(ctx context.Context, parentKey string, dep depsv2ext.Dependency) (Node, error) { + unresolved := Dependency{ + ParentKey: parentKey, + Key: MakeDependencyKey(parentKey, dep.Name), + Parameters: dep.Parameters, + Credentials: dep.Credentials, + } + if dep.Bundle != "" { + ref, err := cnab.ParseOCIReference(dep.Bundle) + if err != nil { + return nil, fmt.Errorf("invalid bundle for dependency %s: %w", unresolved.Key, err) + } + unresolved.DefaultBundle = &BundleReferenceSelector{ + Reference: ref, + } + if dep.Version != "" { + unresolved.DefaultBundle.Version, err = semver.NewConstraint(dep.Version) + if err != nil { + return nil, err + } + } + } + + if dep.Interface != nil { + // TODO: convert the interface document into a BundleInterfaceSelector + } + + if dep.Installation != nil { + unresolved.InstallationSelector = &InstallationSelector{} + + matchNamespaces := make([]string, 0, 2) + if !dep.Installation.Criteria.IgnoreLabels { + unresolved.InstallationSelector.Labels = dep.Installation.Labels + } + + matchNamespaces = append(matchNamespaces, r.namespace) + if !dep.Installation.Criteria.MatchNamespace && r.namespace != "" { + // Include the global namespace + matchNamespaces = append(matchNamespaces, "") + } + unresolved.InstallationSelector.Namespaces = matchNamespaces + + if !dep.Installation.Criteria.MatchInterface { + unresolved.InstallationSelector.Bundle = unresolved.DefaultBundle + } + } + + depNode, resolved, err := r.ResolveDependency(ctx, unresolved) + if err != nil { + return nil, err + } + + if !resolved { + return nil, fmt.Errorf("could not resolve dependency %s", unresolved.Key) + } + + return depNode, nil +} diff --git a/pkg/cnab/dependencies/v2/composite_resolver_test.go b/pkg/cnab/dependencies/v2/composite_resolver_test.go new file mode 100644 index 0000000000..2b3f039989 --- /dev/null +++ b/pkg/cnab/dependencies/v2/composite_resolver_test.go @@ -0,0 +1,64 @@ +package v2 + +import ( + "context" + "testing" + + "get.porter.sh/porter/pkg/cnab" + configadapter "get.porter.sh/porter/pkg/cnab/config-adapter" + "get.porter.sh/porter/pkg/config" + "get.porter.sh/porter/pkg/experimental" + "get.porter.sh/porter/pkg/manifest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCompositeResolver_ResolveDependencyGraph(t *testing.T) { + c := config.NewTestConfig(t) + c.SetExperimentalFlags(experimental.FlagDependenciesV2) + c.TestContext.UseFilesystem() + ctx := context.Background() + + // load our test porter.yaml into a cnab bundle + m, err := manifest.ReadManifest(c.Context, "testdata/porter.yaml") + require.NoError(t, err) + converter := configadapter.NewManifestConverter(c.Config, m, nil, nil) + bun, err := converter.ToBundle(ctx) + require.NoError(t, err) + + r := CompositeResolver{ + resolvers: []DependencyResolver{TestResolver{ + Mocks: map[string]Node{ + "root/load-balancer": InstallationNode{Key: "root/load-balancer"}, + "root/mysql": BundleNode{Key: "root/mysql", Reference: cnab.BundleReference{Definition: cnab.ExtendedBundle{}}}, + }}}} + g, err := r.ResolveDependencyGraph(ctx, bun) + require.NoError(t, err) + + sortedNodes, ok := g.Sort() + require.True(t, ok, "graph should not have a cycle") + + gotOrder := make([]string, len(sortedNodes)) + for i, node := range sortedNodes { + gotOrder[i] = node.GetKey() + } + wantOrder := []string{ + "root/load-balancer", + "root/mysql", + "root", + } + assert.Equal(t, wantOrder, gotOrder) + + // Check the dependencies of each node + rootNode, _ := g.GetNode("root") + require.IsType(t, BundleNode{}, rootNode, "expected the root node to be a bundle") + require.Equal(t, []string{"root/load-balancer", "root/mysql"}, rootNode.GetRequires(), "expected the root bundle to depend on the load balancer and mysql") + + mysqlNode, _ := g.GetNode("root/mysql") + require.IsType(t, BundleNode{}, mysqlNode, "expected the mysql node to be a bundle") + require.Equal(t, []string{"root/load-balancer"}, mysqlNode.GetRequires(), "expected mysql to depend only on the load balancer") + + loadBalancerNode, _ := g.GetNode("root/load-balancer") + require.IsType(t, InstallationNode{}, loadBalancerNode, "expected the load balancer node to be an installation") + require.Empty(t, loadBalancerNode.GetRequires(), "expected the load balancer to have no dependencies") +} diff --git a/pkg/cnab/dependencies/v2/default_bundle_resolver.go b/pkg/cnab/dependencies/v2/default_bundle_resolver.go new file mode 100644 index 0000000000..28e542aca9 --- /dev/null +++ b/pkg/cnab/dependencies/v2/default_bundle_resolver.go @@ -0,0 +1,33 @@ +package v2 + +import ( + "context" +) + +var _ DependencyResolver = DefaultBundleResolver{} + +// DefaultBundleResolver resolves the default bundle defined on the dependency. +type DefaultBundleResolver struct { + puller BundlePuller +} + +func (d DefaultBundleResolver) ResolveDependency(ctx context.Context, dep Dependency) (Node, bool, error) { + if dep.DefaultBundle == nil { + return nil, false, nil + } + + cb, err := d.puller.GetBundle(ctx, dep.DefaultBundle.Reference) + if err != nil { + // wrap not found error and indicate that we could resolve anything + return nil, false, err + } + + return BundleNode{ + Key: dep.Key, + ParentKey: dep.ParentKey, + Reference: cb.BundleReference, + // TODO(PEP003): Do we have to duplicate this? Can't we get it from the bundle def when we need it? + Parameters: dep.Parameters, + Credentials: dep.Credentials, + }, true, nil +} diff --git a/pkg/cnab/dependencies/v2/dependency.go b/pkg/cnab/dependencies/v2/dependency.go new file mode 100644 index 0000000000..32f7eeca60 --- /dev/null +++ b/pkg/cnab/dependencies/v2/dependency.go @@ -0,0 +1,174 @@ +package v2 + +import ( + "context" + "fmt" + + depsv2ext "get.porter.sh/porter/pkg/cnab/extensions/dependencies/v2" + + "get.porter.sh/porter/pkg/cnab" + "get.porter.sh/porter/pkg/storage" + "get.porter.sh/porter/pkg/tracing" + "github.com/Masterminds/semver/v3" + "github.com/cnabio/cnab-go/bundle" + "go.opentelemetry.io/otel/attribute" +) + +// Dependency is a fully hydrated representation of a bundle dependency with +// sufficient information to be resolved using a DependencyResolver to an action +// that can be used in an Execution Plan. +// TODO: can we come up with a better name, e.g. unresolve dependency, dependency selector, etc +type Dependency struct { + Key string + ParentKey string + DefaultBundle *BundleReferenceSelector + Interface *BundleInterfaceSelector + InstallationSelector *InstallationSelector + Requires []string + Parameters map[string]depsv2ext.DependencySource + Credentials map[string]depsv2ext.DependencySource +} + +// BundleReferenceSelector evaluates the bundle criteria of a Dependency. +type BundleReferenceSelector struct { + // Reference to a bundle, optionally including a default tag or digest. + Reference cnab.OCIReference + + // Version specifies the range of allowed versions that Porter may select from + // when resolving this bundle to a specific reference. When a version is not + // specified or cannot be resolved, the tag/digest specified on the Reference is + // used as a default. + Version *semver.Constraints +} + +// IsMatch determines if the specified installation satisfies a Dependency's bundle criteria. +func (s *BundleReferenceSelector) IsMatch(ctx context.Context, inst storage.Installation) bool { + log := tracing.LoggerFromContext(ctx) + log.Debug("Evaluating installation bundle definition") + + if inst.Status.BundleReference == "" { + log.Debug("Installation does not match because it does not have an associated bundle") + return false + } + + ref, err := cnab.ParseOCIReference(inst.Status.BundleReference) + if err != nil { + log.Warn("Could not evaluate installation because the BundleReference is invalid", + attribute.String("reference", inst.Status.BundleReference)) + return false + } + + // If no selector is defined, consider it a match + if s == nil { + return true + } + + // If a version range is specified, ignore the version on the selector and apply the range + // otherwise match the tag or digest + if s.Version != nil { + if inst.Status.BundleVersion == "" { + log.Debug("Installation does not match because it does not have an associated bundle version") + return false + } + + // First check that the repository is the same + gotRepo := ref.Repository() + wantRepo := s.Reference.Repository() + if gotRepo != wantRepo { + log.Warn("Installation does not match because the bundle repository is incorrect", + attribute.String("installation-bundle-repository", gotRepo), + attribute.String("dependency-bundle-repository", wantRepo), + ) + return false + } + + gotVersion, err := semver.NewVersion(inst.Status.BundleVersion) + if err != nil { + log.Warn("Installation does not match because the bundle version is invalid", + attribute.String("installation-bundle-version", inst.Status.BundleVersion), + ) + return false + } + + if s.Version.Check(gotVersion) { + log.Debug("Installation matches because the bundle version is in range", + attribute.String("installation-bundle-version", inst.Status.BundleVersion), + attribute.String("dependency-bundle-version", s.Version.String()), + ) + return true + } else { + log.Debug("Installation does not match because the bundle version is incorrect", + attribute.String("installation-bundle-version", inst.Status.BundleVersion), + attribute.String("dependency-bundle-version", s.Version.String()), + ) + return false + } + } else { + gotRef := ref.String() + wantRef := s.Reference.String() + if gotRef == wantRef { + log.Warn("Installation matches because the bundle reference is correct", + attribute.String("installation-bundle-reference", gotRef), + attribute.String("dependency-bundle-reference", wantRef), + ) + return true + } else { + log.Warn("Installation does not match because the bundle reference is incorrect", + attribute.String("installation-bundle-reference", gotRef), + attribute.String("dependency-bundle-reference", wantRef), + ) + return false + } + } +} + +// InstallationSelector evaluates the installation criteria of a Dependency. +type InstallationSelector struct { + // Bundle is the criteria used for evaluating if a bundle satisfies a Dependency. + Bundle *BundleReferenceSelector + + // Interface is the criteria used for evaluating if an installation or bundle + // satisfies a Dependency. + Interface *BundleInterfaceSelector + + // Labels is the set of labels used to find an existing installation that may be + // used to satisfy a Dependency. + Labels map[string]string + + // Namespaces is the set of namespaces used when searching for an existing + // installation that may be used to satisfy a Dependency. + Namespaces []string +} + +// IsMatch determines if the specified installation satisfies a Dependency's installation criteria. +func (s InstallationSelector) IsMatch(ctx context.Context, inst storage.Installation) bool { + // Skip checking labels and namespaces, those were used to query the set of + // installations that we are checking + + bundleMatches := s.Bundle.IsMatch(ctx, inst) + if !bundleMatches { + return false + } + + interfaceMatches := s.Interface.IsMatch(ctx, inst) + return interfaceMatches +} + +// BundleInterfaceSelector defines how a bundle is going to be used. +// It is not the same as the bundle definition. +// It works like go interfaces where its defined by its consumer. +type BundleInterfaceSelector struct { + Parameters []bundle.Parameter + Credentials []bundle.Credential + Outputs []bundle.Output +} + +// IsMatch determines if the specified installation satisfies a Dependency's bundle interface criteria. +func (s BundleInterfaceSelector) IsMatch(ctx context.Context, inst storage.Installation) bool { + // TODO: implement + return true +} + +func MakeDependencyKey(parent string, dep string) string { + return fmt.Sprintf("%s/%s", parent, dep) +} diff --git a/pkg/cnab/dependencies/v2/dependency_resolver.go b/pkg/cnab/dependencies/v2/dependency_resolver.go new file mode 100644 index 0000000000..b49d2bc967 --- /dev/null +++ b/pkg/cnab/dependencies/v2/dependency_resolver.go @@ -0,0 +1,17 @@ +package v2 + +import ( + "context" + + "get.porter.sh/porter/pkg/cnab" +) + +// DependencyResolver is an interface for various strategies of resolving a +// Dependency to an action in an ExecutionPlan. +type DependencyResolver interface { + ResolveDependency(ctx context.Context, dep Dependency) (Node, bool, error) +} + +type BundleGraphResolver interface { + ResolveDependencyGraph(ctx context.Context, bun cnab.ExtendedBundle) (*BundleGraph, error) +} diff --git a/pkg/cnab/dependencies/v2/doc.go b/pkg/cnab/dependencies/v2/doc.go new file mode 100644 index 0000000000..fe08036eef --- /dev/null +++ b/pkg/cnab/dependencies/v2/doc.go @@ -0,0 +1,3 @@ +// Package v2 contains the implementation of the v2 Porter Dependency +// specification, sh.porter.dependencies.v2. +package v2 diff --git a/pkg/cnab/dependencies/v2/helpers.go b/pkg/cnab/dependencies/v2/helpers.go new file mode 100644 index 0000000000..4ad34e6bec --- /dev/null +++ b/pkg/cnab/dependencies/v2/helpers.go @@ -0,0 +1,41 @@ +package v2 + +import ( + "context" + "fmt" + + "get.porter.sh/porter/pkg/cnab" +) + +var _ DependencyResolver = TestResolver{} +var _ BundleGraphResolver = TestResolver{} + +// TODO(PEP003): I think we should remove this and just mock the underlying stores used to resolve, e.g. existing installations, or registry queries. +// Otherwise we also have to handle copying values from the dep to the mocked node, or mocking it too +type TestResolver struct { + Namespace string + Mocks map[string]Node +} + +func (t TestResolver) ResolveDependency(ctx context.Context, dep Dependency) (Node, bool, error) { + node, ok := t.Mocks[dep.Key] + if ok { + if bunNode, ok := node.(BundleNode); ok { + bunNode.Parameters = dep.Parameters + bunNode.Credentials = dep.Credentials + node = bunNode + } + + return node, true, nil + } + + return nil, false, fmt.Errorf("no mock exists for %s", dep.Key) +} + +func (t TestResolver) ResolveDependencyGraph(ctx context.Context, bun cnab.ExtendedBundle) (*BundleGraph, error) { + r := CompositeResolver{ + resolvers: []DependencyResolver{t}, + namespace: t.Namespace, + } + return r.ResolveDependencyGraph(ctx, bun) +} diff --git a/pkg/cnab/dependencies/v2/installation_resolver.go b/pkg/cnab/dependencies/v2/installation_resolver.go new file mode 100644 index 0000000000..d8fbef473a --- /dev/null +++ b/pkg/cnab/dependencies/v2/installation_resolver.go @@ -0,0 +1,129 @@ +package v2 + +import ( + "context" + + "get.porter.sh/porter/pkg/cnab" + "get.porter.sh/porter/pkg/storage" + "go.mongodb.org/mongo-driver/bson" +) + +var _ DependencyResolver = InstallationResolver{} + +// InstallationResolver resolves an existing installation from a dependency +type InstallationResolver struct { + store storage.InstallationProvider + + // Namespace of the root installation + namespace string +} + +// Resolve attempts to identify an existing installation that satisfies the +// specified Dependency. +// +// Returns the matching installation (if found), whether +// a matching installation was found, and an error if applicable. +func (r InstallationResolver) ResolveDependency(ctx context.Context, dep Dependency) (Node, bool, error) { + if dep.InstallationSelector == nil { + return nil, false, nil + } + + // Build a query for matching installations + filter := make(bson.M, 1) + + // Match installations with one of the specified namespaces + namespacesQuery := make([]bson.M, 2) + for _, ns := range dep.InstallationSelector.Namespaces { + namespacesQuery = append(namespacesQuery, bson.M{"namespace": ns}) + } + filter["$or"] = namespacesQuery + + // Match all specified labels + for k, v := range dep.InstallationSelector.Labels { + filter["labels."+k] = v + } + + findOpts := storage.FindOptions{ + Sort: []string{"-namespace", "name"}, + Filter: filter, + } + installations, err := r.store.FindInstallations(ctx, findOpts) + if err != nil { + return nil, false, err + } + + // map[installation index]isMatchBool + matches := make(map[int]bool) + for i, inst := range installations { + if dep.InstallationSelector.IsMatch(ctx, inst) { + matches[i] = true + } + } + + switch len(matches) { + case 0: + return nil, false, nil + case 1: + var instIndex int + for i := range matches { + instIndex = i + } + inst := installations[instIndex] + match := &InstallationNode{ + Key: dep.Key, + Namespace: inst.Namespace, + Name: inst.Name, + } + return match, true, nil + default: + var preferredMatch *storage.Installation + // Prefer an installation that is the same as the default bundle if there are multiple interface matches + if dep.DefaultBundle != nil { + for i, isCandidate := range matches { + if !isCandidate { + continue + } + + inst := installations[i] + bundleRef, err := cnab.ParseOCIReference(inst.Status.BundleReference) + if err != nil { + matches[i] = false + continue + } + + if dep.DefaultBundle.Reference.Repository() == bundleRef.Repository() { + preferredMatch = &inst + break + } + + } + } + + // Prefer an installation in the same namespace if there is both a global and local installation + if preferredMatch != nil && preferredMatch.Namespace == r.namespace { + match := &InstallationNode{ + Key: dep.Key, + Namespace: preferredMatch.Namespace, + Name: preferredMatch.Name, + } + return match, true, nil + } + + // Just pick the first installation sorted by -namespace, name (i.e. global last) + for i, isCandidate := range matches { + if !isCandidate { + continue + } + + inst := installations[i] + match := &InstallationNode{ + Key: dep.Key, + Namespace: inst.Namespace, + Name: inst.Name, + } + return match, true, nil + } + + return nil, false, nil + } +} diff --git a/pkg/cnab/dependencies/v2/testdata/porter.yaml b/pkg/cnab/dependencies/v2/testdata/porter.yaml new file mode 100644 index 0000000000..c62be8df5e --- /dev/null +++ b/pkg/cnab/dependencies/v2/testdata/porter.yaml @@ -0,0 +1,27 @@ +parameters: + - name: region + type: string + +credentials: + - name: kubeconfig + type: file + +outputs: + - name: connstr + type: string + source: bundle.dependencies.mysql.output.admin-connstr + +dependencies: + requires: + - name: load-balancer + bundle: + reference: example/load-balancer:v1.0.0 + parameters: + region: bundle.parameters.region + - name: mysql + bundle: + reference: example/mysql:v1.0.0 + parameters: + ip: bundle.dependencies.load-balancer.outputs.ipAddress + credentials: + kubeconfig: bundle.credentials.kubeconfig diff --git a/pkg/cnab/dependencies/v2/version_resolver.go b/pkg/cnab/dependencies/v2/version_resolver.go new file mode 100644 index 0000000000..7b0a431d43 --- /dev/null +++ b/pkg/cnab/dependencies/v2/version_resolver.go @@ -0,0 +1,60 @@ +package v2 + +import ( + "context" + "sort" + + "get.porter.sh/porter/pkg/cnab" + "github.com/Masterminds/semver/v3" +) + +var _ DependencyResolver = VersionResolver{} + +// VersionResolver resolves the highest version of the default bundle reference of a Dependency. +type VersionResolver struct { + puller BundlePuller +} + +// Resolve attempts to find the highest available version of the default bundle for the specified Dependency. +// +// Returns the resolved bundle reference, whether a match was found, and an error if applicable. +func (v VersionResolver) ResolveDependency(ctx context.Context, dep Dependency) (Node, bool, error) { + bundle := dep.DefaultBundle + if bundle == nil || bundle.Version == nil { + return nil, false, nil + } + + tags, err := v.puller.ListTags(ctx, bundle.Reference) + if err != nil { + return nil, false, err + } + + versions := make(semver.Collection, 0, len(tags)) + for _, tag := range tags { + version, err := semver.NewVersion(tag) + if err == nil { + versions = append(versions, version) + } + } + + if len(versions) == 0 { + return nil, false, nil + } + + sort.Sort(sort.Reverse(versions)) + + // TODO: return the first one that matches the bundle interface + versionRef, err := bundle.Reference.WithTag(versions[0].Original()) + if err != nil { + return nil, false, err + } + + bunRef := cnab.BundleReference{Reference: versionRef} + return BundleNode{ + Key: dep.Key, + ParentKey: dep.ParentKey, + Reference: bunRef, + Parameters: dep.Parameters, + Credentials: dep.Credentials, + }, true, nil +} diff --git a/pkg/cnab/dependencies_v1_test.go b/pkg/cnab/dependencies_v1_test.go index 60907aac4e..57e52254e3 100644 --- a/pkg/cnab/dependencies_v1_test.go +++ b/pkg/cnab/dependencies_v1_test.go @@ -1,7 +1,6 @@ package cnab import ( - "os" "testing" "github.com/cnabio/cnab-go/bundle" @@ -12,14 +11,8 @@ import ( func TestReadDependencyV1Properties(t *testing.T) { t.Parallel() - data, err := os.ReadFile("testdata/bundle.json") - require.NoError(t, err, "cannot read bundle file") - - b, err := bundle.Unmarshal(data) - require.NoError(t, err, "could not unmarshal the bundle") - - bun := ExtendedBundle{*b} - assert.True(t, bun.HasDependenciesV1()) + bun := ReadTestBundle(t, "testdata/bundle.json") + require.True(t, bun.HasDependenciesV1()) deps, err := bun.ReadDependenciesV1() require.NoError(t, err, "ReadDependenciesV1 failed") @@ -27,12 +20,12 @@ func TestReadDependencyV1Properties(t *testing.T) { assert.Len(t, deps.Requires, 2, "Dependencies.Requires is the wrong length") dep := deps.Requires["storage"] - assert.NotNil(t, dep, "expected Dependencies.Requires to have an entry for 'storage'") + require.NotNil(t, dep, "expected Dependencies.Requires to have an entry for 'storage'") assert.Equal(t, "somecloud/blob-storage", dep.Bundle, "Dependency.Bundle is incorrect") assert.Nil(t, dep.Version, "Dependency.Version should be nil") dep = deps.Requires["mysql"] - assert.NotNil(t, dep, "expected Dependencies.Requires to have an entry for 'mysql'") + require.NotNil(t, dep, "expected Dependencies.Requires to have an entry for 'mysql'") assert.Equal(t, "somecloud/mysql", dep.Bundle, "Dependency.Bundle is incorrect") assert.True(t, dep.Version.AllowPrereleases, "Dependency.Bundle.Version.AllowPrereleases should be true") assert.Equal(t, []string{"5.7.x"}, dep.Version.Ranges, "Dependency.Bundle.Version.Ranges is incorrect") diff --git a/pkg/cnab/dependencies_v2_test.go b/pkg/cnab/dependencies_v2_test.go index ad0a648f71..1c64b9e73e 100644 --- a/pkg/cnab/dependencies_v2_test.go +++ b/pkg/cnab/dependencies_v2_test.go @@ -1,7 +1,6 @@ package cnab import ( - "os" "testing" "github.com/cnabio/cnab-go/bundle" @@ -12,28 +11,22 @@ import ( func TestReadDependencyV2Properties(t *testing.T) { t.Parallel() - data, err := os.ReadFile("testdata/bundle-depsv2.json") - require.NoError(t, err, "cannot read bundle file") - - b, err := bundle.Unmarshal(data) - require.NoError(t, err, "could not unmarshal the bundle") - - bun := ExtendedBundle{*b} - assert.True(t, bun.HasDependenciesV2()) + bun := ReadTestBundle(t, "testdata/bundle-depsv2.json") + require.True(t, bun.HasDependenciesV2()) deps, err := bun.ReadDependenciesV2() require.NoError(t, err) - assert.NotNil(t, deps, "DependenciesV2 was not populated") + require.NotNil(t, deps, "DependenciesV2 was not populated") assert.Len(t, deps.Requires, 2, "DependenciesV2.Requires is the wrong length") dep := deps.Requires["storage"] - assert.NotNil(t, dep, "expected DependenciesV2.Requires to have an entry for 'storage'") + require.NotNil(t, dep, "expected DependenciesV2.Requires to have an entry for 'storage'") assert.Equal(t, "somecloud/blob-storage", dep.Bundle, "DependencyV2.Bundle is incorrect") assert.Empty(t, dep.Version, "DependencyV2.Version should be nil") dep = deps.Requires["mysql"] - assert.NotNil(t, dep, "expected DependenciesV2.Requires to have an entry for 'mysql'") + require.NotNil(t, dep, "expected DependenciesV2.Requires to have an entry for 'mysql'") assert.Equal(t, "somecloud/mysql", dep.Bundle, "DependencyV2.Bundle is incorrect") assert.Equal(t, "5.7.x", dep.Version, "DependencyV2.Bundle.Version is incorrect") diff --git a/pkg/cnab/extensions/dependencies/v2/types.go b/pkg/cnab/extensions/dependencies/v2/types.go index 7dee2d5e7d..fe2d0af96a 100644 --- a/pkg/cnab/extensions/dependencies/v2/types.go +++ b/pkg/cnab/extensions/dependencies/v2/types.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" "regexp" + + "get.porter.sh/porter/pkg/secrets" ) // Dependencies describes the set of custom extension metadata associated with the dependencies spec @@ -63,13 +65,14 @@ type DependencySource struct { Output string `json:"output,omitempty" mapstructure:"output,omitempty"` } +var dependencySourceWiringRegex = regexp.MustCompile(`bundle(\.dependencies\.([^.]+))?\.([^.]+)\.(.+)`) + // ParseDependencySource identifies the components specified in a wiring string. func ParseDependencySource(value string) (DependencySource, error) { // TODO(PEP003): At build time, check if a dependency source was defined with templating and error out // e.g. ${bundle.parameters.foo} should be bundle.parameters.foo - regex := regexp.MustCompile(`bundle(\.dependencies\.([^.]+))?\.([^.]+)\.(.+)`) - matches := regex.FindStringSubmatch(value) + matches := dependencySourceWiringRegex.FindStringSubmatch(value) // If it doesn't match our wiring syntax, assume that it is a hard coded value if matches == nil || len(matches) < 5 { @@ -97,9 +100,33 @@ func ParseDependencySource(value string) (DependencySource, error) { return result, nil } +func (s DependencySource) AsWorkflowStrategy(name string, parentJob string) secrets.Strategy { + strategy := secrets.Strategy{ + Name: name, + Source: secrets.Source{ + // bundle.dependencies.DEP.outputs.OUTPUT -> workflow.jobs.JOB.outputs.OUTPUT + // TODO(PEP003): Figure out if we need a job id, or if we can do okay with just a job key that we resolve to a run later + Value: s.AsWorkflowWiring(parentJob), + }, + } + + // TODO(PEP003): Are other strategies valid when talking about dependency wiring? Or can we only pass hard-coded values and data from a previous job? + if s.Value != "" { + strategy.Source.Key = "value" + } else { + strategy.Source.Key = "porter" + } + + return strategy +} + // AsBundleWiring is the wiring string representation in the bundle definition. // For example, bundle.parameters.PARAM or bundle.dependencies.DEP.outputs.OUTPUT func (s DependencySource) AsBundleWiring() string { + if s.Value != "" { + return s.Value + } + suffix := s.WiringSuffix() if s.Dependency != "" { return fmt.Sprintf("bundle.dependencies.%s.%s", s.Dependency, suffix) @@ -111,6 +138,10 @@ func (s DependencySource) AsBundleWiring() string { // AsWorkflowWiring is the wiring string representation in a workflow definition. // For example, workflow.jobs.JOB.outputs.OUTPUT func (s DependencySource) AsWorkflowWiring(jobID string) string { + if s.Value != "" { + return s.Value + } + return fmt.Sprintf("workflow.jobs.%s.%s", jobID, s.WiringSuffix()) } @@ -132,6 +163,47 @@ func (s DependencySource) WiringSuffix() string { return s.Value } +type WorkflowWiring struct { + WorkflowID string + JobKey string + Parameter string + Credential string + Output string +} + +var workflowWiringRegex = regexp.MustCompile(`workflow\.([^\.]+)\.jobs\.([^\.]+)\.([^\.]+)\.(.+)`) + +func ParseWorkflowWiring(value string) (WorkflowWiring, error) { + matches := workflowWiringRegex.FindStringSubmatch(value) + if len(matches) < 5 { + return WorkflowWiring{}, fmt.Errorf("invalid workflow wiring was passed to the porter strategy, %s", value) + } + + // the first group is the entire match, we don't care about it + workflowID := matches[1] + jobKey := matches[2] + dataType := matches[3] // e.g. parameters, credentials or outputs + dataKey := matches[4] // e.g. the name of the param/cred/output + + wiring := WorkflowWiring{ + WorkflowID: workflowID, + JobKey: jobKey, + } + + switch dataType { + case "parameters": + wiring.Parameter = dataKey + case "credentials": + wiring.Credential = dataKey + case "outputs": + wiring.Output = dataKey + default: + return WorkflowWiring{}, fmt.Errorf("invalid workflow wiring was passed to the porter strategy, %s", value) + } + + return wiring, nil +} + type DependencyInstallation struct { Labels map[string]string `json:"labels,omitempty" mapstructure:"labels,omitempty"` Criteria *InstallationCriteria `json:"criteria,omitempty" mapstructure:"criteria,omitempty"` diff --git a/pkg/cnab/extensions/dependencies/v2/types_test.go b/pkg/cnab/extensions/dependencies/v2/types_test.go index 7cee2ce1d4..65b97a72ab 100644 --- a/pkg/cnab/extensions/dependencies/v2/types_test.go +++ b/pkg/cnab/extensions/dependencies/v2/types_test.go @@ -10,6 +10,7 @@ import ( func TestDependencySource(t *testing.T) { t.Parallel() + jobKey := "1" testcases := []struct { name string bundleWiring string @@ -17,6 +18,14 @@ func TestDependencySource(t *testing.T) { wantWorkflowWiring string wantErr string }{ + { // Check that we can still pass hard-coded values in a workflow + name: "value", + bundleWiring: "11", + wantSource: DependencySource{ + Value: "11", + }, + wantWorkflowWiring: "11", + }, { name: "parameter", bundleWiring: "bundle.parameters.color", @@ -81,8 +90,71 @@ func TestDependencySource(t *testing.T) { require.Equal(t, tc.bundleWiring, gotBundleWiring, "incorrect bundle wiring was returned") // Check that we can convert to a workflow wiring form - gotWorkflowWiring := gotSource.AsWorkflowWiring("1") - require.Equal(t, tc.wantWorkflowWiring, gotWorkflowWiring, "incorrect workflow wiring was returned") + gotWorkflowWiringValue := gotSource.AsWorkflowWiring(jobKey) + require.Equal(t, tc.wantWorkflowWiring, gotWorkflowWiringValue, "incorrect workflow wiring string value was returned") + } else { + tests.RequireErrorContains(t, err, tc.wantErr) + } + }) + } +} + +func TestParseWorkflowWiring(t *testing.T) { + t.Parallel() + + testcases := []struct { + name string + wiringStr string + wantWorkflowWiring WorkflowWiring + wantErr string + }{ + { // Check that we can still pass hard-coded values in a workflow + name: "value not supported", + wiringStr: "11", + wantErr: "invalid workflow wiring", + }, + { + name: "parameter", + wiringStr: "workflow.abc123.jobs.myjerb.parameters.logLevel", + wantWorkflowWiring: WorkflowWiring{ + WorkflowID: "abc123", + JobKey: "myjerb", + Parameter: "logLevel", + }, + }, + { + name: "credential", + wiringStr: "workflow.myworkflow.jobs.root.credentials.kubeconfig", + wantWorkflowWiring: WorkflowWiring{ + WorkflowID: "myworkflow", + JobKey: "root", + Credential: "kubeconfig", + }, + }, + { + name: "output", + wiringStr: "workflow.abc123.jobs.mydb.outputs.connstr", + wantWorkflowWiring: WorkflowWiring{ + WorkflowID: "abc123", + JobKey: "mydb", + Output: "connstr", + }, + }, + { + name: "dependencies not allowed", + wiringStr: "workflow.abc123.jobs.root.dependencies.mydb.outputs.connstr", + wantErr: "invalid workflow wiring", + }, + } + + for _, tc := range testcases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + gotWiring, err := ParseWorkflowWiring(tc.wiringStr) + if tc.wantErr == "" { + require.Equal(t, tc.wantWorkflowWiring, gotWiring, "incorrect WorkflowWiring was parsed") } else { tests.RequireErrorContains(t, err, tc.wantErr) } diff --git a/pkg/cnab/helpers.go b/pkg/cnab/helpers.go index eca675ed03..6a67c95295 100644 --- a/pkg/cnab/helpers.go +++ b/pkg/cnab/helpers.go @@ -2,6 +2,7 @@ package cnab import ( "os" + "strconv" "testing" "github.com/cnabio/cnab-go/bundle" @@ -17,3 +18,15 @@ func ReadTestBundle(t *testing.T, path string) ExtendedBundle { return NewBundle(*bun) } + +// TestIDGenerator returns a sequential set of ids (default starting at 0) +// Used for predictable IDs for tests. +type TestIDGenerator struct { + NextID int +} + +func (g TestIDGenerator) NewID() string { + id := g.NextID + g.NextID++ + return strconv.Itoa(id) +} diff --git a/pkg/cnab/parameter_sources_test.go b/pkg/cnab/parameter_sources_test.go index 564b110170..8d705aa602 100644 --- a/pkg/cnab/parameter_sources_test.go +++ b/pkg/cnab/parameter_sources_test.go @@ -1,7 +1,6 @@ package cnab import ( - "os" "testing" "github.com/cnabio/cnab-go/bundle" @@ -55,13 +54,8 @@ func TestProcessedExtensions_GetParameterSourcesExtension(t *testing.T) { func TestReadParameterSourcesProperties(t *testing.T) { t.Parallel() - data, err := os.ReadFile("testdata/bundle.json") - require.NoError(t, err, "cannot read bundle file") - - b, err := bundle.Unmarshal(data) - require.NoError(t, err, "could not unmarshal the bundle") - bun := NewBundle(*b) - assert.True(t, bun.HasParameterSources()) + bun := ReadTestBundle(t, "testdata/bundle.json") + require.True(t, bun.HasParameterSources()) ps, err := bun.ReadParameterSources() require.NoError(t, err, "could not read parameter sources") diff --git a/pkg/cnab/ulid.go b/pkg/cnab/ulid.go new file mode 100644 index 0000000000..96b83615ed --- /dev/null +++ b/pkg/cnab/ulid.go @@ -0,0 +1,18 @@ +package cnab + +import ( + cnabclaims "github.com/cnabio/cnab-go/claim" +) + +// IDGenerator is a test friendly interface for swapping out how we generate IDs. +type IDGenerator interface { + // NewID returns a new unique ID. + NewID() string +} + +// ULIDGenerator creates IDs that are ULIDs. +type ULIDGenerator struct{} + +func (g ULIDGenerator) NewID() string { + return cnabclaims.MustNewULID() +} diff --git a/pkg/porter/action.go b/pkg/porter/action.go index bb4ed3e7e3..914be9cab2 100644 --- a/pkg/porter/action.go +++ b/pkg/porter/action.go @@ -1,26 +1,122 @@ package porter import ( + "bytes" "context" + "fmt" + + "go.opentelemetry.io/otel/attribute" + + "get.porter.sh/porter/pkg/printer" + "get.porter.sh/porter/pkg/tracing" "get.porter.sh/porter/pkg/storage" ) -// ExecuteAction runs the specified action. Supported actions are: install, upgrade, invoke. -// The uninstall action works in reverse so it's implemented separately. -func (p *Porter) ExecuteAction(ctx context.Context, installation storage.Installation, action BundleAction) error { - deperator := newDependencyExecutioner(p, installation, action) - err := deperator.Prepare(ctx) - if err != nil { - return err +// ExecuteBundleAndDependencies runs a specified action for a root bundle. +// The bundle should be a root bundle, and if there are dependencies, they will also be executed as appropriate. +// Supported actions are: install, upgrade, invoke. +// The uninstall action works in reverse, so it's implemented separately. +// Dependencies are resolved and executed differently depending on whether the deps-v2 feature is enabled (workflow). +func (p *Porter) ExecuteBundleAndDependencies(ctx context.Context, installation storage.Installation, action BundleAction) error { + // Callers should check for a noop action (because the installation is up-to-date, but let's check too just in case + if action == nil { + return nil } - err = deperator.Execute(ctx) - if err != nil { - return err + opts := action.GetOptions() + bundleRef := opts.bundleRef + + ctx, span := tracing.StartSpan(ctx, + tracing.ObjectAttribute("installation", installation), + attribute.String("action", action.GetAction()), + attribute.Bool("dry-run", opts.DryRun), + ) + defer span.EndSpan() + + // Switch between our two dependency implementations + depsv2 := p.useWorkflowEngine(bundleRef.Definition) + span.SetAttributes(attribute.Bool("deps-v2", depsv2)) + + if depsv2 { + // TODO(PEP003): Use new getregistryoptions elsewhere that we create that + puller := NewBundleResolver(p.Cache, opts.Force, p.Registry, opts.GetRegistryOptions()) + eng := NewWorkflowEngine(installation.Namespace, puller, p.Installations, p) + workflowOpts := CreateWorkflowOptions{ + Installation: installation, + Bundle: bundleRef.Definition, + DebugMode: opts.DebugMode, + MaxParallel: 1, + } + ws, err := eng.CreateWorkflow(ctx, workflowOpts) + if err != nil { + return err + } + + if opts.DryRun { + span.Info("Skipping workflow execution because --dry-run was specified") + + // TODO(PEP003): It would be better to have a way to always emit something to stdout, and capture it in the trace at the same time + var buf bytes.Buffer + err = printer.PrintYaml(&buf, ws) + fmt.Fprintln(p.Out, buf.String()) + span.SetAttributes(attribute.String("workflow", buf.String())) + + // TODO(PEP003): Print out the generated workflow according to opts.Format + // TODO(PEP003): how do we want to get Format in here so we can print properly? + return err + } + + w := storage.Workflow{WorkflowSpec: ws} + if err := p.Installations.InsertWorkflow(ctx, w); err != nil { + return err + } + + return eng.RunWorkflow(ctx, w) + } else { // Fallback to the old implementation of dependencies and bundle execution + if opts.DryRun { + span.Info("Skipping bundle execution because --dry-run was specified") + return nil + } + + deperator := newDependencyExecutioner(p, installation, action) + err := deperator.Prepare(ctx) + if err != nil { + return err + } + + err = deperator.Execute(ctx) + if err != nil { + return err + } + + actionArgs, err := deperator.PrepareRootActionArguments(ctx) + if err != nil { + return err + } + + return p.CNAB.Execute(ctx, actionArgs) } +} + +// ExecuteRootBundleOnly runs a single bundle that has already had its dependencies resolved by a workflow. +// The workflow is responsible identifying the bundles to run, their order, what to pass between them, etc. +// It is only intended to be used with the deps-v2 feature. +func (p *Porter) ExecuteRootBundleOnly(ctx context.Context, installation storage.Installation, action BundleAction) error { + // Callers should check for a noop action (because the installation is up-to-date, but let's check too just in case + if action == nil { + return nil + } + + opts := action.GetOptions() + ctx, span := tracing.StartSpan(ctx, + tracing.ObjectAttribute("installation", installation), + attribute.String("action", action.GetAction()), + attribute.Bool("dry-run", opts.DryRun), + ) + defer span.EndSpan() - actionArgs, err := deperator.PrepareRootActionArguments(ctx) + actionArgs, err := p.BuildActionArgs(ctx, installation, action) if err != nil { return err } diff --git a/pkg/porter/apply.go b/pkg/porter/apply.go index 31fd95bf09..6a01d901f3 100644 --- a/pkg/porter/apply.go +++ b/pkg/porter/apply.go @@ -8,13 +8,14 @@ import ( "get.porter.sh/porter/pkg/encoding" "get.porter.sh/porter/pkg/portercontext" "get.porter.sh/porter/pkg/printer" - "get.porter.sh/porter/pkg/storage" "get.porter.sh/porter/pkg/tracing" "go.opentelemetry.io/otel/attribute" "go.uber.org/zap/zapcore" ) type ApplyOptions struct { + printer.PrintOptions + Namespace string File string @@ -47,24 +48,24 @@ func (o *ApplyOptions) Validate(cxt *portercontext.Context, args []string) error return fmt.Errorf("invalid file argument %s, must be a file not a directory", o.File) } - return nil + return o.PrintOptions.Validate(ApplyDefaultFormat, ApplyAllowedFormats) } func (p *Porter) InstallationApply(ctx context.Context, opts ApplyOptions) error { - ctx, log := tracing.StartSpan(ctx) - defer log.EndSpan() + ctx, span := tracing.StartSpan(ctx) + defer span.EndSpan() - log.Debugf("Reading input file %s", opts.File) + span.Debugf("Reading input file %s", opts.File) namespace, err := p.getNamespaceFromFile(opts) if err != nil { return err } - if log.ShouldLog(zapcore.DebugLevel) { + if span.ShouldLog(zapcore.DebugLevel) { // ignoring any error here, printing debug info isn't critical contents, _ := p.FileSystem.ReadFile(opts.File) - log.Debug("read input file", attribute.String("contents", string(contents))) + span.Debug("read input file", attribute.String("contents", string(contents))) } var input DisplayInstallation @@ -77,33 +78,15 @@ func (p *Porter) InstallationApply(ctx context.Context, opts ApplyOptions) error return err } - installation, err := p.Installations.GetInstallation(ctx, inputInstallation.Namespace, inputInstallation.Name) - if err != nil { - if !errors.Is(err, storage.ErrNotFound{}) { - return fmt.Errorf("could not query for an existing installation document for %s: %w", inputInstallation, err) - } - - // Create a new installation - installation = storage.NewInstallation(input.Namespace, input.Name) - installation.Apply(inputInstallation.InstallationSpec) - - log.Info("Creating a new installation", attribute.String("installation", installation.String())) - } else { - // Apply the specified changes to the installation - installation.Apply(inputInstallation.InstallationSpec) - if err := installation.Validate(); err != nil { - return err - } - - fmt.Fprintf(p.Err, "Updating %s installation\n", installation) - } + span.Info("Reconciling installation") reconcileOpts := ReconcileOptions{ Namespace: input.Namespace, Name: input.Name, - Installation: installation, + Installation: inputInstallation.InstallationSpec, Force: opts.Force, DryRun: opts.DryRun, + Format: opts.Format, } - return p.ReconcileInstallation(ctx, reconcileOpts) + return p.ReconcileInstallationAndDependencies(ctx, reconcileOpts) } diff --git a/pkg/porter/build.go b/pkg/porter/build.go index 51a1f3f081..3a5bac789d 100644 --- a/pkg/porter/build.go +++ b/pkg/porter/build.go @@ -232,7 +232,7 @@ func (p *Porter) buildBundle(ctx context.Context, m *manifest.Manifest, digest d return p.writeBundle(bun) } -func (p Porter) writeBundle(b cnab.ExtendedBundle) error { +func (p *Porter) writeBundle(b cnab.ExtendedBundle) error { f, err := p.Config.FileSystem.OpenFile(build.LOCAL_BUNDLE, os.O_RDWR|os.O_CREATE|os.O_TRUNC, pkg.FileModeWritable) if err != nil { return fmt.Errorf("error creating %s: %w", build.LOCAL_BUNDLE, err) diff --git a/pkg/porter/dependencies.go b/pkg/porter/dependencies.go index b4d06f0651..cc060c305c 100644 --- a/pkg/porter/dependencies.go +++ b/pkg/porter/dependencies.go @@ -6,6 +6,8 @@ import ( "fmt" "strings" + cnabtooci "get.porter.sh/porter/pkg/cnab/cnab-to-oci" + "get.porter.sh/porter/pkg/cnab" "get.porter.sh/porter/pkg/cnab/bundleruntime" depsv1 "get.porter.sh/porter/pkg/cnab/dependencies/v1" @@ -21,7 +23,7 @@ type dependencyExecutioner struct { *config.Config porter *Porter - Resolver BundleResolver + resolver BundleResolver CNAB bundleruntime.CNABProvider Installations storage.InstallationProvider @@ -35,17 +37,17 @@ type dependencyExecutioner struct { } func newDependencyExecutioner(p *Porter, installation storage.Installation, action BundleAction) *dependencyExecutioner { - resolver := BundleResolver{ - Cache: p.Cache, - Registry: p.Registry, - } + parentOpts := action.GetOptions() + regOpts := cnabtooci.RegistryOptions{InsecureRegistry: parentOpts.InsecureRegistry} + resolver := NewBundleResolver(p.Cache, parentOpts.Force, p.Registry, regOpts) + return &dependencyExecutioner{ porter: p, parentInstallation: installation, parentAction: action, - parentOpts: action.GetOptions(), + parentOpts: parentOpts, + resolver: resolver, Config: p.Config, - Resolver: resolver, CNAB: p.CNAB, Installations: p.Installations, } @@ -140,7 +142,7 @@ func (e *dependencyExecutioner) identifyDependencies(ctx context.Context) error } bun = bundle } else if e.parentOpts.Reference != "" { - cachedBundle, err := e.Resolver.Resolve(ctx, e.parentOpts.BundlePullOptions) + cachedBundle, err := e.resolver.GetBundle(ctx, e.parentOpts.GetReference()) if err != nil { return span.Error(fmt.Errorf("could not resolve bundle: %w", err)) } @@ -180,16 +182,7 @@ func (e *dependencyExecutioner) prepareDependency(ctx context.Context, dep *queu defer span.EndSpan() // Pull the dependency - var err error - pullOpts := BundlePullOptions{ - Reference: dep.Reference, - InsecureRegistry: e.parentOpts.InsecureRegistry, - Force: e.parentOpts.Force, - } - if err := pullOpts.Validate(); err != nil { - return span.Error(fmt.Errorf("error preparing dependency %s: %w", dep.Alias, err)) - } - cachedDep, err := e.Resolver.Resolve(ctx, pullOpts) + cachedDep, err := e.resolver.GetBundle(ctx, dep.BundleReference.Reference) if err != nil { return span.Error(fmt.Errorf("error pulling dependency %s: %w", dep.Alias, err)) } diff --git a/pkg/porter/helpers.go b/pkg/porter/helpers.go index 6b7a182cba..7d162485f3 100644 --- a/pkg/porter/helpers.go +++ b/pkg/porter/helpers.go @@ -24,7 +24,6 @@ import ( "get.porter.sh/porter/pkg/storage" "get.porter.sh/porter/pkg/tracing" "get.porter.sh/porter/pkg/yaml" - "github.com/cnabio/cnab-go/bundle" "github.com/stretchr/testify/require" ) @@ -181,13 +180,7 @@ func (p *TestPorter) T() *testing.T { } func (p *TestPorter) ReadBundle(path string) cnab.ExtendedBundle { - bunD, err := os.ReadFile(path) - require.NoError(p.T(), err, "ReadFile failed for %s", path) - - bun, err := bundle.Unmarshal(bunD) - require.NoError(p.T(), err, "Unmarshal failed for bundle at %s", path) - - return cnab.NewBundle(*bun) + return cnab.ReadTestBundle(p.T(), path) } func (p *TestPorter) RandomString(len int) string { diff --git a/pkg/porter/install.go b/pkg/porter/install.go index 787c9edceb..fb320a28e8 100644 --- a/pkg/porter/install.go +++ b/pkg/porter/install.go @@ -6,6 +6,7 @@ import ( "fmt" "get.porter.sh/porter/pkg/cnab" + "get.porter.sh/porter/pkg/experimental" "get.porter.sh/porter/pkg/storage" "get.porter.sh/porter/pkg/tracing" ) @@ -87,13 +88,53 @@ func (p *Porter) InstallBundle(ctx context.Context, opts InstallOptions) error { } i.TrackBundle(bundleRef.Reference) i.Labels = opts.ParseLabels() + + if p.useWorkflowEngine(bundleRef.Definition) { + // TODO(PEP003): Use new getregistryoptions elsewhere that we create that + puller := NewBundleResolver(p.Cache, opts.Force, p.Registry, opts.GetRegistryOptions()) + eng := NewWorkflowEngine(i.Namespace, puller, p.Installations, p) + workflowOpts := CreateWorkflowOptions{ + Installation: i, + Bundle: bundleRef.Definition, + DebugMode: opts.DebugMode, + MaxParallel: 1, + } + ws, err := eng.CreateWorkflow(ctx, workflowOpts) + if err != nil { + return err + } + + w := storage.Workflow{WorkflowSpec: ws} + if err := p.Installations.InsertWorkflow(ctx, w); err != nil { + return err + } + + // TODO(PEP003): if a dry-run is requested, print out the execution plan and then exit + return eng.RunWorkflow(ctx, w) + } + + // Use the old implementation of bundle execution compatible with depsv1 err = p.Installations.UpsertInstallation(ctx, i) if err != nil { return fmt.Errorf("error saving installation record: %w", err) } // Run install using the updated installation record - return p.ExecuteAction(ctx, i, opts) + return p.ExecuteBundleAndDependencies(ctx, i, opts) +} + +// useWorkflowEngine determines if the new workflow engine or the old bundle execution code should be used. +// Once depsv2 is no longer experimental, we can switch 100% to the workflow engine +// Old bundles can still use depsv1, since depsv2 is a superset of depsv1. +// It will change how the bundle is run, for example calling install right now twice in a row +// results in an error, and this would remove that limitation, and instead a second call to install causes it to be reconciled and possibly skipped. +// In either case, the solution to the user is to call --force so the change isn't breaking. +func (p *Porter) useWorkflowEngine(bun cnab.ExtendedBundle) bool { + if bun.HasDependenciesV2() { + return true + } + + return p.Config.IsFeatureEnabled(experimental.FlagDependenciesV2) } // Remember the parameters and credentials used with the bundle last. diff --git a/pkg/porter/install_test.go b/pkg/porter/install_test.go index 251c03650b..742bc8a2d0 100644 --- a/pkg/porter/install_test.go +++ b/pkg/porter/install_test.go @@ -84,14 +84,15 @@ func TestPorter_applyActionOptionsToInstallation(t *testing.T) { }, }) - return ctx, p, &storage.Installation{InstallationSpec: storage.InstallationSpec{ - Bundle: storage.OCIReferenceParts{ - Repository: "example.com/mybuns", - Version: "1.0.0", - }, - ParameterSets: []string{"oldps1"}, - CredentialSets: []string{"oldcs1", "oldcs2"}, - }} + return ctx, p, &storage.Installation{ + InstallationSpec: storage.InstallationSpec{ + Bundle: storage.OCIReferenceParts{ + Repository: "example.com/mybuns", + Version: "1.0.0", + }, + ParameterSets: []string{"oldps1"}, + CredentialSets: []string{"oldcs1", "oldcs2"}, + }} } t.Run("replace previous sets", func(t *testing.T) { diff --git a/pkg/porter/invoke.go b/pkg/porter/invoke.go index 54d9cb36f9..d286dc8dc0 100644 --- a/pkg/porter/invoke.go +++ b/pkg/porter/invoke.go @@ -69,5 +69,30 @@ func (p *Porter) InvokeBundle(ctx context.Context, opts InvokeOptions) error { if err != nil { return err } - return p.ExecuteAction(ctx, installation, opts) + + if p.useWorkflowEngine(opts.bundleRef.Definition) { + puller := NewBundleResolver(p.Cache, opts.Force, p.Registry, opts.GetRegistryOptions()) + eng := NewWorkflowEngine(installation.Namespace, puller, p.Installations, p) + workflowOpts := CreateWorkflowOptions{ + Installation: installation, + CustomAction: opts.Action, + Bundle: opts.bundleRef.Definition, + DebugMode: opts.DebugMode, + MaxParallel: 1, // TODO(PEP003): make this configurable + } + ws, err := eng.CreateWorkflow(ctx, workflowOpts) + if err != nil { + return err + } + + w := storage.Workflow{WorkflowSpec: ws} + if err := p.Installations.InsertWorkflow(ctx, w); err != nil { + return err + } + + // TODO(PEP003): if a dry-run is requested, print out the execution plan and then exit + return eng.RunWorkflow(ctx, w) + } + + return p.ExecuteBundleAndDependencies(ctx, installation, opts) } diff --git a/pkg/porter/lifecycle.go b/pkg/porter/lifecycle.go index c03cc3b416..53d90d35bd 100644 --- a/pkg/porter/lifecycle.go +++ b/pkg/porter/lifecycle.go @@ -46,6 +46,9 @@ type BundleExecutionOptions struct { // DebugMode indicates if the bundle should be run in debug mode. DebugMode bool + // DryRun specifies that the bundle should not be executed and the execution plan should be printed instead. + DryRun bool + // NoLogs runs the bundle without persisting any logs. NoLogs bool @@ -157,6 +160,7 @@ func (o *BundleExecutionOptions) populateInternalParameterSet(ctx context.Contex } if !replaced { + fmt.Fprintf(p.Err, "applied param %s with %s\n", paramOverride.Name, paramOverride.Value) inst.Parameters.Parameters = append(inst.Parameters.Parameters, paramOverride) } } diff --git a/pkg/porter/porter_strategy.go b/pkg/porter/porter_strategy.go new file mode 100644 index 0000000000..4903d5228b --- /dev/null +++ b/pkg/porter/porter_strategy.go @@ -0,0 +1,53 @@ +package porter + +import ( + "context" + "fmt" + "regexp" + + "get.porter.sh/porter/pkg/storage" + + v2 "get.porter.sh/porter/pkg/cnab/extensions/dependencies/v2" + "get.porter.sh/porter/pkg/tracing" +) + +// PorterSecretStrategy knows how to resolve specially formatted wiring strings +// such as workflow.jobs.db.outputs.connstr from Porter instead of from a plugin. +// It is not written as a plugin because it is much more straightforward to +// retrieve the data already loaded in the running Porter instance than to start +// another one, load its config and requery the database. +type PorterSecretStrategy struct { + installations storage.InstallationProvider +} + +// regular expression for parsing a workflow wiring string, such as workflow.jobs.db.outputs.connstr +var workflowWiringRegex = regexp.MustCompile(`workflow\.jobs\.([^\.]+)\.(.+)`) + +func (s PorterSecretStrategy) Resolve(ctx context.Context, keyName string, keyValue string) (string, error) { + ctx, span := tracing.StartSpan(ctx) + defer span.EndSpan() + + // TODO(PEP003): It would be great when we configure this strategy that we also do host, so that host secret resolution isn't deferred to the plugins + // i.e. we can configure a secret strategy and still be able to resolve directly in porter any host values. + if keyName != "porter" { + return "", fmt.Errorf("attempted to resolve secrets of type %s from the porter strategy", keyName) + } + + wiring, err := v2.ParseWorkflowWiring(keyValue) + if err != nil { + return "", fmt.Errorf("invalid workflow wiring was passed to the porter strategy, %s", keyValue) + } + + // TODO(PEP003): How do we want to re-resolve credentials passed to the root bundle? They aren't recorded so it's not a simple lookup + if wiring.Parameter != "" { + // TODO(PEP003): Resolve a parameter from another job that has not run yet + // 1. Find the workflow definition from the db (need a way to track "current" workflow) + // 2. Grab the job based on the jobid in the workflow wiring + // 3. First check the parameters field for the param, resolve just that if available, otherwise resolve parameter sets and get it from there + // it sure would help if we remembered what params are in each set + } else if wiring.Output != "" { + // TODO(PEP003): Resolve the output from an already executed job + } + + panic("not implemented") +} diff --git a/pkg/porter/pull.go b/pkg/porter/pull.go index 12785fd1db..f24bf94328 100644 --- a/pkg/porter/pull.go +++ b/pkg/porter/pull.go @@ -6,6 +6,7 @@ import ( "get.porter.sh/porter/pkg/cache" "get.porter.sh/porter/pkg/cnab" + cnabtooci "get.porter.sh/porter/pkg/cnab/cnab-to-oci" ) type BundlePullOptions struct { @@ -39,12 +40,15 @@ func (b *BundlePullOptions) validateReference() error { return nil } +func (b *BundlePullOptions) GetRegistryOptions() cnabtooci.RegistryOptions { + return cnabtooci.RegistryOptions{ + InsecureRegistry: b.InsecureRegistry, + } +} + // PullBundle looks for a given bundle tag in the bundle cache. If it is not found, it is // pulled and stored in the cache. The path to the cached bundle is returned. func (p *Porter) PullBundle(ctx context.Context, opts BundlePullOptions) (cache.CachedBundle, error) { - resolver := BundleResolver{ - Cache: p.Cache, - Registry: p.Registry, - } - return resolver.Resolve(ctx, opts) + resolver := NewBundleResolver(p.Cache, opts.Force, p.Registry, opts.GetRegistryOptions()) + return resolver.GetBundle(ctx, opts.GetReference()) } diff --git a/pkg/porter/reconcile.go b/pkg/porter/reconcile.go index 7f96e63944..d5cd33691d 100644 --- a/pkg/porter/reconcile.go +++ b/pkg/porter/reconcile.go @@ -6,6 +6,8 @@ import ( "fmt" "sort" + "get.porter.sh/porter/pkg/printer" + "get.porter.sh/porter/pkg/cnab" "get.porter.sh/porter/pkg/storage" "get.porter.sh/porter/pkg/tracing" @@ -17,31 +19,102 @@ import ( type ReconcileOptions struct { Name string Namespace string - Installation storage.Installation + Installation storage.InstallationSpec // Just reapply the installation regardless of what has changed (or not) Force bool // DryRun only checks if the changes would trigger a bundle run DryRun bool + + // ExcludeDependencies indicates that only the requested installation should be executed, not its dependencies + ExcludeDependencies bool + + // Format that should be used when printing details about what Porter is (or will) do. + Format printer.Format } -// ReconcileInstallation compares the desired state of an installation +// ReconcileInstallationAndDependencies compares the desired state of an installation // as stored in the installation record with the current state of the // installation. If they are not in sync, the appropriate bundle action // is executed to bring them in sync. // This is only used for install/upgrade actions triggered by applying a file // to an installation. For uninstall or invoke, you should call those directly. -func (p *Porter) ReconcileInstallation(ctx context.Context, opts ReconcileOptions) error { - ctx, log := tracing.StartSpan(ctx) - log.Debugf("Reconciling %s/%s installation", opts.Namespace, opts.Name) +func (p *Porter) ReconcileInstallationAndDependencies(ctx context.Context, opts ReconcileOptions) error { + ctx, span := tracing.StartSpan(ctx) + defer span.EndSpan() + + installation, actionOpts, err := p.reconcileInstallation(ctx, opts) + if err != nil { + return err + } + + // Nothing to do, the installation is up-to-date + if actionOpts == nil { + return nil + } + + return p.ExecuteBundleAndDependencies(ctx, installation, actionOpts) +} + +// ReconcileInstallationInWorkflow compares the desired state of an installation +// as stored in the installation record with the current state of the +// installation. If they are not in sync, the appropriate bundle action +// is executed to bring them in sync. +// This is only used for install/upgrade actions triggered by applying a file +// to an installation. For uninstall or invoke, you should call those directly. +// This should only be used with deps-v2 feature workflows. +func (p *Porter) ReconcileInstallationInWorkflow(ctx context.Context, opts ReconcileOptions) error { + ctx, span := tracing.StartSpan(ctx) + defer span.EndSpan() + + installation, actionOpts, err := p.reconcileInstallation(ctx, opts) + if err != nil { + return err + } + + // Nothing to do, the installation is up-to-date + if actionOpts == nil { + return nil + } + + return p.ExecuteRootBundleOnly(ctx, installation, actionOpts) +} + +func (p *Porter) reconcileInstallation(ctx context.Context, opts ReconcileOptions) (storage.Installation, BundleAction, error) { + ctx, span := tracing.StartSpan(ctx) + defer span.EndSpan() + span.Debugf("Reconciling %s/%s installation", opts.Namespace, opts.Name) + + // Determine if the installation exists + inputInstallation := opts.Installation + installation, err := p.Installations.GetInstallation(ctx, inputInstallation.Namespace, inputInstallation.Name) + if err != nil { + if !errors.Is(err, storage.ErrNotFound{}) { + return storage.Installation{}, nil, fmt.Errorf("could not query for an existing installation document for %s: %w", inputInstallation, err) + } + + // Create a new installation + installation = storage.NewInstallation(inputInstallation.Namespace, inputInstallation.Name) + installation.Apply(inputInstallation) + + span.Info("Creating a new installation", attribute.String("installation", installation.String())) + } else { + // Apply the specified changes to the installation + installation.Apply(inputInstallation) + if err := installation.Validate(); err != nil { + return storage.Installation{}, nil, err + } + + fmt.Fprintf(p.Err, "Updating %s installation\n", installation) + } // Get the last run of the installation, if available var lastRun *storage.Run r, err := p.Installations.GetLastRun(ctx, opts.Namespace, opts.Name) neverRun := errors.Is(err, storage.ErrNotFound{}) if err != nil && !neverRun { - return err + return storage.Installation{}, nil, err } if !neverRun { lastRun = &r @@ -49,16 +122,16 @@ func (p *Porter) ReconcileInstallation(ctx context.Context, opts ReconcileOption ref, ok, err := opts.Installation.Bundle.GetBundleReference() if err != nil { - return log.Error(err) + return storage.Installation{}, nil, span.Error(err) } if !ok { instYaml, _ := yaml.Marshal(opts.Installation) - return log.Error(fmt.Errorf("the installation does not define a valid bundle reference.\n%s", instYaml)) + return storage.Installation{}, nil, span.Error(fmt.Errorf("the installation does not define a valid bundle reference.\n%s", instYaml)) } // Configure the bundle action that we should execute IF IT'S OUT OF SYNC var actionOpts BundleAction - if opts.Installation.IsInstalled() { + if installation.IsInstalled() { if opts.Installation.Uninstalled { actionOpts = NewUninstallOptions() } else { @@ -69,50 +142,48 @@ func (p *Porter) ReconcileInstallation(ctx context.Context, opts ReconcileOption } lifecycleOpts := actionOpts.GetOptions() + lifecycleOpts.DryRun = opts.DryRun lifecycleOpts.Reference = ref.String() lifecycleOpts.Name = opts.Name lifecycleOpts.Namespace = opts.Namespace + lifecycleOpts.Driver = p.Data.RuntimeDriver lifecycleOpts.CredentialIdentifiers = opts.Installation.CredentialSets lifecycleOpts.ParameterSets = opts.Installation.ParameterSets - // Write out the parameters as string values. Not efficient but reusing ExecuteAction would need more refactoring otherwise _, err = p.resolveBundleReference(ctx, lifecycleOpts.BundleReferenceOptions) if err != nil { - return err + return storage.Installation{}, nil, err } if !opts.DryRun { - if err = p.Installations.UpsertInstallation(ctx, opts.Installation); err != nil { - return err + // TODO(PEP003): Move this ideally into execute action or right before we call it. + // I'm not moving it now because my tests aren't passing at the moment + if err = p.Installations.UpsertInstallation(ctx, installation); err != nil { + return storage.Installation{}, nil, err } } // Determine if the installation's desired state is out of sync with reality 🤯 - inSync, err := p.IsInstallationInSync(ctx, opts.Installation, lastRun, actionOpts) + inSync, err := p.IsInstallationInSync(ctx, installation, lastRun, actionOpts) if err != nil { - return err + return storage.Installation{}, nil, err } if inSync { if opts.Force { - log.Info("The installation is up-to-date but will be re-applied because --force was specified") + span.Info("The installation is up-to-date but will be re-applied because --force was specified") } else { - log.Info("The installation is already up-to-date.") - return nil + span.Info("The installation is already up-to-date.") + return storage.Installation{}, nil, nil } } - log.Infof("The installation is out-of-sync, running the %s action...", actionOpts.GetAction()) + span.Infof("The installation is out-of-sync, running the %s action...", actionOpts.GetAction()) if err := actionOpts.Validate(ctx, nil, p); err != nil { - return err + return storage.Installation{}, nil, err } - if opts.DryRun { - log.Info("Skipping bundle execution because --dry-run was specified") - return nil - } - - return p.ExecuteAction(ctx, opts.Installation, actionOpts) + return installation, actionOpts, nil } // IsInstallationInSync determines if the desired state of the installation matches @@ -141,6 +212,10 @@ func (p *Porter) IsInstallationInSync(ctx context.Context, i storage.Installatio log.Info("Ignoring because installation.uninstalled is true but the installation doesn't exist yet") return true, nil } else { + // TODO(PEP003): we should check the status of the last run and handle in progress/pending by returning an error if the not in sync otherise + // i.e. if we run two commands to apply, the first starts, the second succeeds since it asked for what the other is providing? + // apply waits, so really it should wait for the pending/inprogress to complete? or stop early and say it's in progress elsewhere. + // Should we install it? if !i.IsInstalled() { log.Info("Triggering because the installation has not completed successfully yet") diff --git a/pkg/porter/resolver.go b/pkg/porter/resolver.go index da2fd77de7..41ae6d4b21 100644 --- a/pkg/porter/resolver.go +++ b/pkg/porter/resolver.go @@ -4,25 +4,42 @@ import ( "context" "fmt" + "get.porter.sh/porter/pkg/cnab" + depsv2 "get.porter.sh/porter/pkg/cnab/dependencies/v2" + "get.porter.sh/porter/pkg/cache" cnabtooci "get.porter.sh/porter/pkg/cnab/cnab-to-oci" "get.porter.sh/porter/pkg/tracing" ) +var _ depsv2.BundlePuller = BundleResolver{} + +// BundleResolver supports retrieving bundles from a registry, with cache support. type BundleResolver struct { - Cache cache.BundleCache - Registry cnabtooci.RegistryProvider + cache cache.BundleCache + registry cnabtooci.RegistryProvider + regOpts cnabtooci.RegistryOptions + + // refreshCache always pulls from the registry, and ignores cached bundles. + refreshCache bool +} + +func NewBundleResolver(cache cache.BundleCache, refreshCache bool, registry cnabtooci.RegistryProvider, regOpts cnabtooci.RegistryOptions) BundleResolver { + return BundleResolver{ + cache: cache, + refreshCache: refreshCache, + registry: registry, + regOpts: regOpts, + } } -// Resolves a bundle from the cache, or pulls it and caches it -// Returns the location of the bundle or an error -func (r *BundleResolver) Resolve(ctx context.Context, opts BundlePullOptions) (cache.CachedBundle, error) { +func (r BundleResolver) GetBundle(ctx context.Context, ref cnab.OCIReference) (cache.CachedBundle, error) { log := tracing.LoggerFromContext(ctx) - if !opts.Force { - cachedBundle, ok, err := r.Cache.FindBundle(opts.GetReference()) + if !r.refreshCache { + cachedBundle, ok, err := r.cache.FindBundle(ref) if err != nil { - return cache.CachedBundle{}, log.Error(fmt.Errorf("unable to load bundle %s from cache: %w", opts.Reference, err)) + return cache.CachedBundle{}, log.Error(fmt.Errorf("unable to load bundle %s from cache: %w", ref, err)) } // If we found the bundle, return the path to the bundle.json if ok { @@ -30,15 +47,18 @@ func (r *BundleResolver) Resolve(ctx context.Context, opts BundlePullOptions) (c } } - regOpts := cnabtooci.RegistryOptions{InsecureRegistry: opts.InsecureRegistry} - bundleRef, err := r.Registry.PullBundle(ctx, opts.GetReference(), regOpts) + bundleRef, err := r.registry.PullBundle(ctx, ref, r.regOpts) if err != nil { return cache.CachedBundle{}, err } - cb, err := r.Cache.StoreBundle(bundleRef) + cb, err := r.cache.StoreBundle(bundleRef) if err != nil { return cache.CachedBundle{}, log.Errorf("error storing the bundle %s in the Porter bundle cache: %w", bundleRef, err) } return cb, nil } + +func (r BundleResolver) ListTags(ctx context.Context, repo cnab.OCIReference) ([]string, error) { + return r.registry.ListTags(ctx, repo, r.regOpts) +} diff --git a/pkg/porter/resolver_test.go b/pkg/porter/resolver_test.go index c55107bc06..479356148d 100644 --- a/pkg/porter/resolver_test.go +++ b/pkg/porter/resolver_test.go @@ -9,7 +9,6 @@ import ( cnabtooci "get.porter.sh/porter/pkg/cnab/cnab-to-oci" "get.porter.sh/porter/pkg/config" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestBundleResolver_Resolve_ForcePull(t *testing.T) { @@ -17,10 +16,7 @@ func TestBundleResolver_Resolve_ForcePull(t *testing.T) { tc := config.NewTestConfig(t) testReg := cnabtooci.NewTestRegistry() testCache := cache.NewTestCache(cache.New(tc.Config)) - resolver := BundleResolver{ - Cache: testCache, - Registry: testReg, - } + resolver := NewBundleResolver(testCache, true, testReg, cnabtooci.RegistryOptions{}) cacheSearched := false testCache.FindBundleMock = func(ref cnab.OCIReference) (cache.CachedBundle, bool, error) { @@ -33,13 +29,7 @@ func TestBundleResolver_Resolve_ForcePull(t *testing.T) { pulled = true return cnab.BundleReference{Reference: ref}, nil } - - opts := BundlePullOptions{ - Reference: kahnlatest.String(), - Force: true, - } - require.NoError(t, opts.Validate()) - resolver.Resolve(ctx, opts) + resolver.GetBundle(ctx, kahnlatest) assert.False(t, cacheSearched, "Force should have skipped the cache") assert.True(t, pulled, "The bundle should have been re-pulled") @@ -50,10 +40,7 @@ func TestBundleResolver_Resolve_CacheHit(t *testing.T) { tc := config.NewTestConfig(t) testReg := cnabtooci.NewTestRegistry() testCache := cache.NewTestCache(cache.New(tc.Config)) - resolver := BundleResolver{ - Cache: testCache, - Registry: testReg, - } + resolver := NewBundleResolver(testCache, false, testReg, cnabtooci.RegistryOptions{}) cacheSearched := false testCache.FindBundleMock = func(ref cnab.OCIReference) (cache.CachedBundle, bool, error) { @@ -67,8 +54,8 @@ func TestBundleResolver_Resolve_CacheHit(t *testing.T) { return cnab.BundleReference{Reference: ref}, nil } - opts := BundlePullOptions{Reference: "ghcr.io/getporter/examples/porter-hello:v0.2.0"} - resolver.Resolve(ctx, opts) + ref := cnab.MustParseOCIReference("ghcr.io/getporter/examples/porter-hello:v0.2.0") + resolver.GetBundle(ctx, ref) assert.True(t, cacheSearched, "The cache should be searched when force is not specified") assert.False(t, pulled, "The bundle should NOT be pulled because it was found in the cache") @@ -79,10 +66,7 @@ func TestBundleResolver_Resolve_CacheMiss(t *testing.T) { tc := config.NewTestConfig(t) testReg := cnabtooci.NewTestRegistry() testCache := cache.NewTestCache(cache.New(tc.Config)) - resolver := BundleResolver{ - Cache: testCache, - Registry: testReg, - } + resolver := NewBundleResolver(testCache, false, testReg, cnabtooci.RegistryOptions{}) cacheSearched := false testCache.FindBundleMock = func(ref cnab.OCIReference) (cache.CachedBundle, bool, error) { @@ -96,8 +80,8 @@ func TestBundleResolver_Resolve_CacheMiss(t *testing.T) { return cnab.BundleReference{Reference: ref}, nil } - opts := BundlePullOptions{Reference: "ghcr.io/getporter/examples/porter-hello:v0.2.0"} - resolver.Resolve(ctx, opts) + ref := cnab.MustParseOCIReference("ghcr.io/getporter/examples/porter-hello:v0.2.0") + resolver.GetBundle(ctx, ref) assert.True(t, cacheSearched, "The cache should be searched when force is not specified") assert.True(t, pulled, "The bundle should have been pulled because the bundle was not in the cache") diff --git a/pkg/porter/uninstall.go b/pkg/porter/uninstall.go index d44dc5f1b1..b6c6c2e589 100644 --- a/pkg/porter/uninstall.go +++ b/pkg/porter/uninstall.go @@ -85,11 +85,36 @@ func (p *Porter) UninstallBundle(ctx context.Context, opts UninstallOptions) err return fmt.Errorf("could not find installation %s/%s: %w", opts.Namespace, opts.Name, err) } + // TODO(PEP003): I think we should flip uninstall flag on the spec? + // gotta figure out how that interacts with the operator54r5 err = p.applyActionOptionsToInstallation(ctx, &installation, opts.BundleExecutionOptions) if err != nil { return err } + if p.useWorkflowEngine(opts.bundleRef.Definition) { + puller := NewBundleResolver(p.Cache, opts.Force, p.Registry, opts.GetRegistryOptions()) + eng := NewWorkflowEngine(installation.Namespace, puller, p.Installations, p) + workflowOpts := CreateWorkflowOptions{ + Installation: installation, + Bundle: opts.bundleRef.Definition, + DebugMode: opts.DebugMode, + MaxParallel: 1, + } + ws, err := eng.CreateWorkflow(ctx, workflowOpts) + if err != nil { + return err + } + + w := storage.Workflow{WorkflowSpec: ws} + if err := p.Installations.InsertWorkflow(ctx, w); err != nil { + return err + } + + // TODO(PEP003): if a dry-run is requested, print out the execution plan and then exit + return eng.RunWorkflow(ctx, w) + } + deperator := newDependencyExecutioner(p, installation, opts) err = deperator.Prepare(ctx) if err != nil { diff --git a/pkg/porter/upgrade.go b/pkg/porter/upgrade.go index fd9fdf5453..2498fc718d 100644 --- a/pkg/porter/upgrade.go +++ b/pkg/porter/upgrade.go @@ -7,6 +7,7 @@ import ( "time" "get.porter.sh/porter/pkg/cnab" + "get.porter.sh/porter/pkg/storage" "github.com/Masterminds/semver/v3" ) @@ -84,17 +85,43 @@ func (p *Porter) UpgradeBundle(ctx context.Context, opts *UpgradeOptions) error if err != nil { return err } - err = p.Installations.UpdateInstallation(ctx, i) - if err != nil { - return err - } // Re-resolve the bundle after we have figured out the version we are upgrading to + // TODO(PEP003): I probably just broke this by calling it before the installation is saved + // figure out what that function is relying on and do it without saving opts.bundleRef = nil _, err = p.resolveBundleReference(ctx, opts.BundleReferenceOptions) if err != nil { return err } - return p.ExecuteAction(ctx, i, opts) + if p.useWorkflowEngine(opts.bundleRef.Definition) { + puller := NewBundleResolver(p.Cache, opts.Force, p.Registry, opts.GetRegistryOptions()) + eng := NewWorkflowEngine(i.Namespace, puller, p.Installations, p) + workflowOpts := CreateWorkflowOptions{ + Installation: i, + Bundle: opts.bundleRef.Definition, + DebugMode: opts.DebugMode, + MaxParallel: 1, + } + ws, err := eng.CreateWorkflow(ctx, workflowOpts) + if err != nil { + return err + } + + w := storage.Workflow{WorkflowSpec: ws} + if err := p.Installations.InsertWorkflow(ctx, w); err != nil { + return err + } + + // TODO(PEP003): if a dry-run is requested, print out the execution plan and then exit + return eng.RunWorkflow(ctx, w) + } + + err = p.Installations.UpdateInstallation(ctx, i) + if err != nil { + return err + } + + return p.ExecuteBundleAndDependencies(ctx, i, opts) } diff --git a/pkg/porter/workflow_engine.go b/pkg/porter/workflow_engine.go new file mode 100644 index 0000000000..8d011e29d2 --- /dev/null +++ b/pkg/porter/workflow_engine.go @@ -0,0 +1,349 @@ +package porter + +import ( + "context" + "fmt" + "runtime" + "strings" + + "get.porter.sh/porter/pkg/cnab" + depsv2 "get.porter.sh/porter/pkg/cnab/dependencies/v2" + "get.porter.sh/porter/pkg/secrets" + "get.porter.sh/porter/pkg/storage" + "get.porter.sh/porter/pkg/tracing" + "go.opentelemetry.io/otel/attribute" + "golang.org/x/sync/errgroup" +) + +// Engine handles executing a workflow of bundles to execute. +type Engine struct { + namespace string + store storage.InstallationProvider + + // TODO(PEP003): don't inject a resolver, inject the stuff that the resolver uses (store and puller) mock those two instead + resolver depsv2.BundleGraphResolver + p *Porter +} + +// NewWorkflowEngine configures a Workflow Engine +func NewWorkflowEngine(namespace string, puller depsv2.BundlePuller, store storage.InstallationProvider, p *Porter) *Engine { + return &Engine{ + namespace: namespace, + resolver: depsv2.NewCompositeResolver(namespace, puller, store), + store: store, + p: p, + } +} + +// CreateWorkflowOptions are the set of options for creating a Workflow. +type CreateWorkflowOptions struct { + // DebugMode alters how the workflow is executed so that it can be stepped through. + DebugMode bool + + // MaxParallel indicates how many parallel bundles may be executed at the same + // time. Defaults to 0, indicating that the maximum should be determined by the + // number of available CPUs or cluster nodes (depending on the runtime driver). + MaxParallel int + + // Installation that triggered the workflow. + // TODO(PEP003): Does this need to be a full installation? can it just be the spec? + Installation storage.Installation + + // Bundle definition of the Installation. + Bundle cnab.ExtendedBundle + + // CustomAction is the name of a custom action defined on the bundle to execute. + // When not set, the installation is reconciled. + CustomAction string +} + +func (t Engine) CreateWorkflow(ctx context.Context, opts CreateWorkflowOptions) (storage.WorkflowSpec, error) { + ctx, span := tracing.StartSpan(ctx) + defer span.EndSpan() + + g, err := t.resolver.ResolveDependencyGraph(ctx, opts.Bundle) + if err != nil { + return storage.WorkflowSpec{}, err + } + + nodes, ok := g.Sort() + if !ok { + return storage.WorkflowSpec{}, fmt.Errorf("could not generate a workflow for the bundle: the bundle graph has a cyle") + } + + // Now build job definitions for each node in the graph + jobs := make(map[string]*storage.Job, len(nodes)) + for _, node := range nodes { + switch tn := node.(type) { + case depsv2.BundleNode: + var inst storage.InstallationSpec + if tn.IsRoot() { + inst = opts.Installation.InstallationSpec + } else { + // TODO(PEP003?): generate a unique installation name, e.g. ROOTINSTALLATION-DEPKEY-SUFFIX + // I think we discussed this in a meeting? go look for notes or suggestions + inst = storage.InstallationSpec{ + Namespace: t.namespace, + // TODO(PEP003): can we fix the key so that it uses something real from the installation and not root for the root key name? + Name: strings.Replace(tn.Key, "root/", opts.Installation.Name+"/", 1), + Bundle: storage.NewOCIReferenceParts(tn.Reference.Reference), + // PEP(003): Add labels so that we know who is the parent installation + } + + // Populate the dependency's credentials from the wiring + inst.Credentials.SchemaVersion = storage.CredentialSetSchemaVersion + inst.Credentials.Credentials = make([]secrets.Strategy, 0, len(tn.Credentials)) + for credName, source := range tn.Credentials { + inst.Credentials.Credentials = append(inst.Credentials.Credentials, + source.AsWorkflowStrategy(credName, tn.ParentKey)) + } + + // Populate the dependency's parameters from the wiring + inst.Parameters.SchemaVersion = storage.ParameterSetSchemaVersion + inst.Parameters.Parameters = make([]secrets.Strategy, 0, len(tn.Parameters)) + for paramName, source := range tn.Parameters { + inst.Parameters.Parameters = append(inst.Parameters.Parameters, + source.AsWorkflowStrategy(paramName, tn.ParentKey)) + } + } + + // Determine which jobs in the workflow we rely upon + requires := node.GetRequires() + requiredJobs := make([]string, 0, len(requires)) + for _, requiredKey := range requires { + // Check if a job was created for this dependency (some won't exist because they are already installed) + if _, ok := jobs[requiredKey]; ok { + requiredJobs = append(requiredJobs, requiredKey) + } + } + + jobs[tn.Key] = &storage.Job{ + Action: cnab.ActionInstall, // TODO(PEP003): eventually this needs to support all actions + Installation: inst, + Depends: requiredJobs, + } + + case depsv2.InstallationNode: + // TODO(PEP003): Do we need to do anything for this part? Check the status of the installation? + + default: + return storage.WorkflowSpec{}, fmt.Errorf("invalid node type: %T", tn) + } + + } + + w := storage.WorkflowSpec{ + Stages: []storage.Stage{ + {Jobs: jobs}, + }, + MaxParallel: opts.MaxParallel, + DebugMode: opts.DebugMode, + } + return w, nil +} + +func (t Engine) RunWorkflow(ctx context.Context, w storage.Workflow) error { + ctx, span := tracing.StartSpan(ctx, tracing.ObjectAttribute("workflow", w)) + defer span.EndSpan() + + // TODO(PEP003): 2. anything we need to validate? + w.Prepare() + + // 1. save the workflow to the database + if err := t.store.UpsertWorkflow(ctx, w); err != nil { + return err + } + + // 3. go through each stage and execute it + for i := range w.Stages { + // Run each stage in series + if err := t.executeStage(ctx, w, i); err != nil { + return fmt.Errorf("stage[%d] failed: %w", i, err) + } + } + + /* + + + + 4. what type of status do we want to track? active jobs? + 5. how do we determine which to run? we need to resolve a graph again, with depends. Is there any way to not have to do that? + 6. be smarter + */ + + return nil +} + +func (t Engine) CancelWorkflow(ctx context.Context, workflow storage.Workflow) error { + //TODO implement me + // What should cancel do? Mark a status on the record that we check before running the next thing? + // Who can call this and when? + panic("implement me") +} + +func (t Engine) RetryWorkflow(ctx context.Context, workflow storage.Workflow) error { + //TODO implement me + // Start executing from the last failed job (retry the run, keep it and add a second result), skip over everything completed + panic("implement me") +} + +func (t Engine) StepThrough(ctx context.Context, workflow storage.Workflow, job string) error { + //TODO implement me + // execute the specified job only and update the status + panic("implement me") +} + +func (t Engine) executeStage(ctx context.Context, w storage.Workflow, stageIndex int) error { + ctx, span := tracing.StartSpan(ctx, attribute.Int("stage", stageIndex)) + defer span.EndSpan() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + s := w.Stages[stageIndex] + stageGraph := depsv2.NewBundleGraph() + for _, job := range s.Jobs { + stageGraph.RegisterNode(job) + } + + sortedJobs, ok := stageGraph.Sort() + if !ok { + return fmt.Errorf("could not sort jobs in stage") + } + + availableJobs := make(chan *storage.Job, len(s.Jobs)) + completedJobs := make(chan *storage.Job, len(s.Jobs)) + + // Default the number of parallel jobs to the number of CPUs + // This gives us 1 CPU per invocation image. + var maxParallel int + if w.DebugMode { + maxParallel = 1 + } else if w.MaxParallel == 0 { + maxParallel = runtime.NumCPU() + } else { + maxParallel = w.MaxParallel + } + + // Start up workers to run the jobs as they are available + jobsInProgress := errgroup.Group{} + for i := 0; i < maxParallel; i++ { + go func() { + for { + select { + case <-ctx.Done(): + return + case job := <-availableJobs: + jobsInProgress.Go(func() error { + // TODO(PEP003) why do we have to look this up again? + return t.executeJob(ctx, s.Jobs[job.Key], completedJobs) + }) + } + } + }() + } + + t.queueAvailableJobs(ctx, s, sortedJobs, availableJobs) + + jobsInProgress.Go(func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case completedJob := <-completedJobs: + // If succeeded, remove from the graph so we don't need to keep evaluating it + // Leave failed ones since they act to stop graph traversal + if completedJob.Status.IsSucceeded() { + for i, job := range sortedJobs { + if job.GetKey() == completedJob.Key { + if i == 0 { + sortedJobs = sortedJobs[1:] + } else { + sortedJobs = append(sortedJobs[:i-1], sortedJobs[i+1:]...) + } + break + } + } + + // Look for more jobs to run + stop := t.queueAvailableJobs(ctx, s, sortedJobs, availableJobs) + if stop { + // Stop running all of our jobs + cancel() + return nil + } + } else { + return fmt.Errorf("job %s failed: %s", completedJob.Key, completedJob.Status.Message) + } + } + } + + }) + + err := jobsInProgress.Wait() + return err +} + +func (t Engine) queueAvailableJobs(ctx context.Context, s storage.Stage, sortedNodes []depsv2.Node, availableJobs chan *storage.Job) bool { + // Walk through the graph in sorted order (bottom up) + // if the node's dependencies are all successful, schedule it + // as soon as it's not schedule, stop looking because none of the remainder will be either + var i int + for i = 0; i < len(sortedNodes); i++ { + node := sortedNodes[i] + + job := node.(*storage.Job) + switch job.Status.Status { + case cnab.StatusFailed: + // stop scheduling more jobs + return true + case "": + jobReady := true + for _, depKey := range job.Depends { + dep := s.Jobs[depKey] + if !dep.Status.IsSucceeded() { + jobReady = false + break + } + } + + if jobReady { + availableJobs <- job + // there are still more jobs to process + return false + } + default: + continue + } + } + + // Did we iterate through all the nodes? Can we stop now? + return i >= len(sortedNodes) +} + +func (t Engine) executeJob(ctx context.Context, j *storage.Job, jobs chan *storage.Job) error { + ctx, span := tracing.StartSpan(ctx, tracing.ObjectAttribute("job", j)) + defer span.EndSpan() + + opts := ReconcileOptions{ + // TODO(PEP003): we don't need name+namespace, we can get that info from the installation + Name: j.Installation.Name, + Namespace: j.Installation.Namespace, + Installation: j.Installation, + ExcludeDependencies: true, + // TODO(PEP003): We need to split this out into what the user calls with porter isntallation apply (which will end up calling the engine) and a lower level path that just installs the bundle + } + err := t.p.ReconcileInstallationInWorkflow(ctx, opts) + if err != nil { + // TODO(PEP003): Let's consider the relationship between jobs, runs and results. + // How much should we duplicate on job? + // How do we want to link the job to the run (e.g. should job status include runid or something?) + j.Status.Status = cnab.StatusFailed + j.Status.Message = err.Error() + } else { + j.Status.Status = cnab.StatusSucceeded + j.Status.Message = "" + } + jobs <- j + return nil +} diff --git a/pkg/secrets/plugin_adapter.go b/pkg/secrets/plugin_adapter.go index 1f84a9db9c..0cb1fcb59c 100644 --- a/pkg/secrets/plugin_adapter.go +++ b/pkg/secrets/plugin_adapter.go @@ -28,6 +28,12 @@ func (a PluginAdapter) Close() error { } func (a PluginAdapter) Resolve(ctx context.Context, keyName string, keyValue string) (string, error) { + // Instead of calling out to a plugin, resolve the value from Porter's database + // This supports bundle workflows where we are sourcing data from other runs, e.g. passing a connection string from a dependency to another bundle + if keyName == "porter" { + + } + return a.plugin.Resolve(ctx, keyName, keyValue) } diff --git a/pkg/storage/installation.go b/pkg/storage/installation.go index d9bc44f395..c6990bf5da 100644 --- a/pkg/storage/installation.go +++ b/pkg/storage/installation.go @@ -55,15 +55,18 @@ type InstallationSpec struct { // CredentialSets that should be included when the bundle is reconciled. CredentialSets []string `json:"credentialSets,omitempty"` + // Credentials specified by the user through overrides. + // Does not include defaults, or values resolved from credential sets. + // TODO(PEP003): use this when executing the bundle + Credentials CredentialSetSpec `json:"credentials,omitempty"` + // ParameterSets that should be included when the bundle is reconciled. ParameterSets []string `json:"parameterSets,omitempty"` // Parameters specified by the user through overrides. - // Does not include defaults, or values resolved from parameter sources. + // Does not include defaults, or values resolved from parameter sets. + // TODO(PEP003): We should consider if it makes sense to store just the ParameterSetSpec instead, like we do for credentials which was added later Parameters ParameterSet `json:"parameters,omitempty"` - - // Status of the installation. - Status InstallationStatus `json:"status,omitempty"` } func (i InstallationSpec) String() string { @@ -270,6 +273,15 @@ type OCIReferenceParts struct { Tag string `json:"tag,omitempty" yaml:"tag,omitempty" toml:"tag,omitempty"` } +func NewOCIReferenceParts(ref cnab.OCIReference) OCIReferenceParts { + return OCIReferenceParts{ + Repository: ref.Repository(), + Version: ref.Version(), + Digest: ref.Digest().String(), + Tag: ref.Tag(), + } +} + func (r OCIReferenceParts) GetBundleReference() (cnab.OCIReference, bool, error) { if r.Repository == "" { return cnab.OCIReference{}, false, nil diff --git a/pkg/storage/installation_provider.go b/pkg/storage/installation_provider.go index 9d0b6b9298..1154cbbf81 100644 --- a/pkg/storage/installation_provider.go +++ b/pkg/storage/installation_provider.go @@ -71,4 +71,9 @@ type InstallationProvider interface { // GetLastLogs returns the logs from the last run of an Installation. GetLastLogs(ctx context.Context, namespace string, installation string) (logs string, hasLogs bool, err error) + + // TODO(PEP003): document and make sure we have all standard functions here + GetWorkflow(ctx context.Context, id string) (Workflow, error) + InsertWorkflow(ctx context.Context, workflow Workflow) error + UpsertWorkflow(ctx context.Context, workflow Workflow) error } diff --git a/pkg/storage/installation_store.go b/pkg/storage/installation_store.go index c2c11fa1cb..1ac2baa5c2 100644 --- a/pkg/storage/installation_store.go +++ b/pkg/storage/installation_store.go @@ -13,24 +13,21 @@ const ( CollectionRuns = "runs" CollectionResults = "results" CollectionOutputs = "outputs" + CollectionWorkflows = "workflows" ) var _ InstallationProvider = InstallationStore{} // InstallationStore is a persistent store for installation documents. type InstallationStore struct { - store Store - encrypt EncryptionHandler - decrypt EncryptionHandler + store Store } // NewInstallationStore creates a persistent store for installations using the specified // backing datastore. func NewInstallationStore(datastore Store) InstallationStore { return InstallationStore{ - store: datastore, - encrypt: noOpEncryptionHandler, - decrypt: noOpEncryptionHandler, + store: datastore, } } @@ -57,6 +54,8 @@ func EnsureInstallationIndices(ctx context.Context, store Store) error { {Collection: CollectionOutputs, Keys: []string{"resultId", "name"}, Unique: true}, // query most recent outputs by name for an installation {Collection: CollectionOutputs, Keys: []string{"namespace", "installation", "name", "-resultId"}}, + // query workflows by id (list) + {Collection: CollectionWorkflows, Keys: []string{"id"}, Unique: true}, }, } @@ -373,10 +372,30 @@ func (s InstallationStore) RemoveInstallation(ctx context.Context, namespace str return nil } -// EncryptionHandler is a function that transforms data by encrypting or decrypting it. -type EncryptionHandler func([]byte) ([]byte, error) +func (s InstallationStore) GetWorkflow(ctx context.Context, id string) (Workflow, error) { + var out Workflow -// noOpEncryptHandler is used when no handler is specified. -var noOpEncryptionHandler = func(data []byte) ([]byte, error) { - return data, nil + opts := FindOptions{ + Filter: bson.M{ + "id": id, + }, + } + err := s.store.FindOne(ctx, CollectionWorkflows, opts, &out) + return out, err +} + +func (s InstallationStore) InsertWorkflow(ctx context.Context, workflow Workflow) error { + opts := InsertOptions{ + Documents: []interface{}{workflow}, + } + return s.store.Insert(ctx, CollectionWorkflows, opts) +} + +func (s InstallationStore) UpsertWorkflow(ctx context.Context, workflow Workflow) error { + workflow.SchemaVersion = WorkflowSchemaVersion + opts := UpdateOptions{ + Upsert: true, + Document: workflow, + } + return s.store.Update(ctx, CollectionWorkflows, opts) } diff --git a/pkg/storage/schema.go b/pkg/storage/schema.go index b2c09d0ba3..3daac7595e 100644 --- a/pkg/storage/schema.go +++ b/pkg/storage/schema.go @@ -11,6 +11,10 @@ const ( // for all installation documents: installations, runs, results and outputs. InstallationSchemaVersion = schema.Version("1.0.2") + // WorkflowSchemaVersion represents the version associated with the schema + // for all workflow documents: workflow. + WorkflowSchemaVersion = schema.Version("1.0.0") + // CredentialSetSchemaVersion represents the version associated with the schema // credential set documents. CredentialSetSchemaVersion = schema.Version("1.0.1") diff --git a/pkg/storage/workflow.go b/pkg/storage/workflow.go new file mode 100644 index 0000000000..5ea6406bc9 --- /dev/null +++ b/pkg/storage/workflow.go @@ -0,0 +1,121 @@ +package storage + +import ( + "get.porter.sh/porter/pkg/cnab" + "github.com/cnabio/cnab-go/schema" +) + +// Workflow represents how a bundle and its dependencies should be run by Porter. +type Workflow struct { + // ID of the workflow. + ID string `json:"id"` + + WorkflowSpec `json:"spec"` + + // TODO(PEP003): When we wrap this in a DisplayWorkflow, override marshal so that we don't marshal an ID or status when empty + // i.e. if we do a dry run, we shouldn't get out an empty id or status + Status WorkflowStatus `json:"status"` +} + +type WorkflowSpec struct { + SchemaVersion schema.Version `json:"schemaVersion"` + + // Stages are groups of jobs that run in serial. + Stages []Stage `json:"stages"` + + // MaxParallel is the maximum number of jobs that can run in parallel. + MaxParallel int `json:"maxParallel"` + + // DebugMode tweaks how the workflow is run to make it easier to debug + DebugMode bool `json:"debugMode"` +} + +// TODO(PEP003): Figure out what needs to be persisted, and how to persist multiple or continued runs +type WorkflowStatus struct { +} + +// Prepare updates the internal data representation of the workflow before running it. +func (w *Workflow) Prepare() { + // Assign an id to the workflow + w.ID = cnab.NewULID() + + // Update any workflow wiring to use the workflow id? + + for _, s := range w.Stages { + s.Prepare(w.ID) + } +} + +// Stage represents a set of jobs that should run, possibly in parallel. +type Stage struct { + // Jobs is the set of bundles to execute, keyed by the job name. + Jobs map[string]*Job `json:"jobs"` +} + +func (s *Stage) Prepare(workflowID string) { + // Update the jobs so that they know their job key (since they won't be used within the larger workflow, but as independent jobs) + for jobKey, job := range s.Jobs { + job.Prepare(workflowID, jobKey) + s.Jobs[jobKey] = job + } + +} + +// Job represents the execution of a bundle. +type Job struct { + // TODO(PEP003): no job can have the same name as the parent installation (which is keyed from the installation). Or we need to stick to root and reserve that? + // Key is the unique name of the job within a stage. + // We handle copying this value so that it's easier to work with a single job when not in a map + Key string `json:"-"` + + // Action name to execute on the bundle, when empty default to applying the installation. + Action string `json:"action"` + + // TODO(PEP003): workflows should have DisplayWorkflow and use DisplayInstallation + // Installation defines the installation upon which Porter should act. + Installation InstallationSpec `json:"installation"` + + // Depends is a list of job keys that the Job depends upon. + Depends []string `json:"depends"` + + Status JobStatus `json:"status,omitempty"` +} + +func (j *Job) GetRequires() []string { + return j.Depends +} + +func (j *Job) GetKey() string { + return j.Key +} + +func (j *Job) Prepare(workflowId string, jobKey string) { + j.Key = jobKey + for _, param := range j.Installation.Parameters.Parameters { + if param.Source.Key != "porter" { + continue + } + + } +} + +type JobStatus struct { + Status string `json:"status"` + Message string `json:"message"` +} + +func (s JobStatus) IsSucceeded() bool { + return s.Status == cnab.StatusSucceeded +} + +func (s JobStatus) IsFailed() bool { + return s.Status == cnab.StatusFailed +} + +func (s JobStatus) IsDone() bool { + switch s.Status { + case cnab.StatusSucceeded, cnab.StatusFailed: + return true + } + return false +} diff --git a/pkg/storage/workflow_test.go b/pkg/storage/workflow_test.go new file mode 100644 index 0000000000..3f59fdd659 --- /dev/null +++ b/pkg/storage/workflow_test.go @@ -0,0 +1,21 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestJobStatus_IsFailed(t *testing.T) { + s := JobStatus{} + assert.False(t, s.IsFailed(), "IsFailed should be false when the status is not failed") + s.Status = "failed" + assert.True(t, s.IsFailed(), "IsFailed should be true when the status is failed") +} + +func TestJobStatus_IsCompleted(t *testing.T) { + s := JobStatus{} + assert.False(t, s.IsSucceeded(), "IsSucceeded should be false when the status is not succeeded") + s.Status = "succeeded" + assert.True(t, s.IsSucceeded(), "IsSucceeded should be true when the status is succeeded") +} diff --git a/pkg/tracing/attributes.go b/pkg/tracing/attributes.go new file mode 100644 index 0000000000..c4219811e1 --- /dev/null +++ b/pkg/tracing/attributes.go @@ -0,0 +1,16 @@ +package tracing + +import ( + "encoding/json" + + "go.opentelemetry.io/otel/attribute" +) + +func ObjectAttribute(key string, value interface{}) attribute.KeyValue { + data, err := json.Marshal(value) + if err != nil { + data = []byte("unable to marshal object to an otel attribute") + } + + return attribute.Key(key).String(string(data)) +} diff --git a/tests/integration/mybuns.yaml b/tests/integration/mybuns.yaml new file mode 100644 index 0000000000..ea86e11b00 --- /dev/null +++ b/tests/integration/mybuns.yaml @@ -0,0 +1,15 @@ +schemaVersion: 1.0.2 +name: mybuns +labels: + generator: porter-operator + generatorVersion: v0.2.0 + thing: "1" +bundle: + repository: localhost:5000/mybuns + version: v0.1.2 +parameters: + password: "supersecret" +credentialSets: + - mybuns +parameterSets: + - mybuns diff --git a/tests/integration/testdata/workflow/mybuns.yaml b/tests/integration/testdata/workflow/mybuns.yaml new file mode 100644 index 0000000000..056a109bf6 --- /dev/null +++ b/tests/integration/testdata/workflow/mybuns.yaml @@ -0,0 +1,41 @@ +schemaVersion: 1.0.0-alpha.1 +name: "apply mybuns.yaml" +maxParallel: 1 +debugMode: false +stages: + - jobs: + root: + action: install + installation: + schemaVersion: 1.0.2 + name: mybuns + namespace: dev + bundle: + repository: localhost:5000/mybuns + version: v0.1.2 + labels: + generator: porter-operator + generatorVersion: v0.2.0 + thing: "1" + credentialSets: + - mybuns + parameterSets: + - mybuns + parameters: + - name: password + source: + secret: 01GG5VKAA5VS24CGRGSPY09DDX-password + root/db: + action: install + installation: + schemaVersion: 1.0.2 + name: mybuns/db + namespace: dev + bundle: + repository: localhost:5000/mydb + version: 0.1.0 + tag: v0.1.0 + parameters: + - name: database + source: + value: bigdb diff --git a/tests/integration/workflow_test.go b/tests/integration/workflow_test.go new file mode 100644 index 0000000000..b3634183f1 --- /dev/null +++ b/tests/integration/workflow_test.go @@ -0,0 +1,43 @@ +//go:build integration + +package integration + +import ( + "fmt" + "testing" + + testhelpers "get.porter.sh/porter/pkg/test" + "get.porter.sh/porter/tests/tester" + "github.com/stretchr/testify/require" +) + +func TestWorkflow(t *testing.T) { + test, err := tester.NewTestWithConfig(t, "tests/testdata/config/config-with-depsv2.yaml") + defer test.Close() + require.NoError(t, err, "test setup failed") + test.PrepareTestBundle() + + test.TestContext.AddTestFileFromRoot("tests/testdata/installations/mybuns.yaml", "mybuns.yaml") + + // First validate the plan for the workflow + // TODO(PEP003): Do we want to use different terms/commands for generating a workflow? This pretty much associates --dry-run with "print out your workflow" + workflowContents, output := test.RequirePorter("installation", "apply", "mybuns.yaml", "--output=yaml", "--dry-run") + fmt.Println(output) + testhelpers.CompareGoldenFile(t, "testdata/workflow/mybuns.yaml", workflowContents) + + // Run the workflow + _, output = test.RequirePorter("installation", "apply", "mybuns.yaml") + fmt.Println(output) + + // TODO A workflow should be persisted, and it should match the execution plan generated first with --dry-run + // We don't expose workflow commands yet though so the only way to test this is call the db directly + + // We should have 2 installations, mybuns and mydb + test.RequireInstallationExists(test.CurrentNamespace(), "mybuns") + mydb := test.RequireInstallationExists(test.CurrentNamespace(), "mybuns/db") + require.Contains(t, mydb.Parameters, "database", "mybuns should have explicitly set the database parameter on its db dependency") + require.Equal(t, "bigdb", mydb.Parameters["database"], "incorrect value used for the database parameter on the db dependency, expected the hard coded value specified by the root bundle") + + // TODO mydb should have a parameter that was set by the workflow, e.g. the db name + // TODO mybuns should have used an output from mydb that we saved as a root bundle output so that we can validate that it was used properly +} diff --git a/tests/testdata/config/config-with-depsv2.yaml b/tests/testdata/config/config-with-depsv2.yaml new file mode 100644 index 0000000000..8c09e3c540 --- /dev/null +++ b/tests/testdata/config/config-with-depsv2.yaml @@ -0,0 +1,24 @@ +experimental: + - dependencies-v2 + +namespace: dev +default-storage: testdb +default-secrets-plugin: filesystem + +# The test bundle, mybuns, requires docker +allow-docker-host-access: true + +logs: + enabled: true + level: debug + +telemetry: + enabled: ${env.PORTER_TEST_TELEMETRY_ENABLED} + protocol: grpc + insecure: true + +storage: + - name: testdb + plugin: mongodb + config: + url: mongodb://localhost:27017/${env.PORTER_TEST_DB_NAME}?connect=direct