Skip to content

Commit

Permalink
Add support for delta bundles
Browse files Browse the repository at this point in the history
Earlier a snapshot bundle would describe the full state of OPA's
policy/data and any update would require first erasing the state from
the existing bundle and then activating the new bundle.

This commit introduces a new bundle type called "delta".
Delta bundles contain patches to data instead of snapshots.
They allow users to efficiently make updates to OPA's data
cache.

Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
ashutosh-narkar committed Jan 29, 2022
1 parent cb867a1 commit dd02a7f
Show file tree
Hide file tree
Showing 15 changed files with 1,368 additions and 231 deletions.
240 changes: 161 additions & 79 deletions bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ const (
PlanFile = "plan.json"
ManifestExt = ".manifest"
SignaturesFile = "signatures.json"
patchFile = "patch.json"
dataFile = "data.json"
yamlDataFile = "data.yaml"
defaultHashingAlg = "SHA-256"
DefaultSizeLimitBytes = (1024 * 1024 * 1024) // limit bundle reads to 1GB to protect against gzip bombs
DeltaBundleType = "delta"
SnapshotBundleType = "snapshot"
)

// Bundle represents a loaded bundle. The bundle can contain data and policies.
Expand All @@ -50,6 +53,20 @@ type Bundle struct {
Wasm []byte // Deprecated. Use WasmModules instead
WasmModules []WasmModuleFile
PlanModules []PlanModuleFile
Patch Patch
}

// Patch contains an array of objects wherein each object represents the patch operation to be
// applied to the bundle data.
type Patch struct {
	// Data is the ordered list of patch operations to apply to the data document.
	Data []PatchOperation `json:"data,omitempty"`
}

// PatchOperation models a single patch operation against a document.
type PatchOperation struct {
	Op    string      `json:"op"`    // operation to perform — NOTE(review): supported set not visible here, confirm against the activation code
	Path  string      `json:"path"`  // slash-separated path the operation targets (leading/trailing "/" is trimmed during validation)
	Value interface{} `json:"value"` // operand for the operation; presumably unused by operations that need no value
}

// SignaturesConfig represents an array of JWTs that encapsulate the signatures for the bundle.
Expand Down Expand Up @@ -131,21 +148,11 @@ func (m Manifest) Equal(other Manifest) bool {
return false
}

if len(m.WasmResolvers) != len(other.WasmResolvers) {
return false
}

for i := 0; i < len(m.WasmResolvers); i++ {
if m.WasmResolvers[i] != other.WasmResolvers[i] {
return false
}
}

if !reflect.DeepEqual(m.Metadata, other.Metadata) {
return false
}

return m.rootSet().Equal(other.rootSet())
return m.equalWasmResolversAndRoots(other)
}

// Copy returns a deep copy of the manifest.
Expand Down Expand Up @@ -186,6 +193,20 @@ func (m Manifest) rootSet() stringSet {
return stringSet(rs)
}

// equalWasmResolversAndRoots reports whether this manifest and other declare
// the same wasm resolvers (same order) and the same set of roots.
func (m Manifest) equalWasmResolversAndRoots(other Manifest) bool {
	if len(m.WasmResolvers) != len(other.WasmResolvers) {
		return false
	}

	for i, resolver := range m.WasmResolvers {
		if resolver != other.WasmResolvers[i] {
			return false
		}
	}

	return m.rootSet().Equal(other.rootSet())
}

type stringSet map[string]struct{}

func (ss stringSet) Equal(other stringSet) bool {
Expand Down Expand Up @@ -224,12 +245,7 @@ func (m *Manifest) validateAndInjectDefaults(b Bundle) error {
for _, module := range b.Modules {
found := false
if path, err := module.Parsed.Package.Path.Ptr(); err == nil {
for i := range roots {
if strings.HasPrefix(path, roots[i]) {
found = true
break
}
}
found = RootPathsContain(roots, path)
}
if !found {
return fmt.Errorf("manifest roots %v do not permit '%v' in module '%v'", roots, module.Parsed.Package, module.Path)
Expand All @@ -250,15 +266,7 @@ func (m *Manifest) validateAndInjectDefaults(b Bundle) error {
}

// Ensure wasm module entrypoint is within bundle roots
found := false
for i := range roots {
if strings.HasPrefix(wmConfig.Entrypoint, roots[i]) {
found = true
break
}
}

if !found {
if !RootPathsContain(roots, wmConfig.Entrypoint) {
return fmt.Errorf("manifest roots %v do not permit '%v' entrypoint for wasm module '%v'", roots, wmConfig.Entrypoint, wmConfig.Module)
}

Expand All @@ -270,17 +278,24 @@ func (m *Manifest) validateAndInjectDefaults(b Bundle) error {
wasmModuleToEps[wmConfig.Module] = wmConfig.Entrypoint
}

// Validate data patches in bundle.
for _, patch := range b.Patch.Data {
path := strings.Trim(patch.Path, "/")
if !RootPathsContain(roots, path) {
return fmt.Errorf("manifest roots %v do not permit data patch at path '%s'", roots, path)
}
}

// Validate data in bundle.
return dfs(b.Data, "", func(path string, node interface{}) (bool, error) {
path = strings.Trim(path, "/")
for i := range roots {
if strings.HasPrefix(path, roots[i]) {
return true, nil
}
if RootPathsContain(roots, path) {
return true, nil
}

if _, ok := node.(map[string]interface{}); ok {
for i := range roots {
if strings.HasPrefix(roots[i], path) {
if RootPathsContain(strings.Split(path, "/"), roots[i]) {
return false, nil
}
}
Expand Down Expand Up @@ -398,31 +413,28 @@ func (r *Reader) Read() (Bundle, error) {
var descriptors []*Descriptor
var err error

bundle.Data = map[string]interface{}{}

bundle.Signatures, descriptors, err = listSignaturesAndDescriptors(r.loader, r.skipVerify, r.sizeLimitBytes)
bundle.Signatures, bundle.Patch, descriptors, err = preProcessBundle(r.loader, r.skipVerify, r.sizeLimitBytes)
if err != nil {
return bundle, err
}

err = r.checkSignaturesAndDescriptors(bundle.Signatures)
if err != nil {
return bundle, err
if bundle.Type() == SnapshotBundleType {
err = r.checkSignaturesAndDescriptors(bundle.Signatures)
if err != nil {
return bundle, err
}

bundle.Data = map[string]interface{}{}
}

for _, f := range descriptors {
var buf bytes.Buffer
n, err := f.Read(&buf, r.sizeLimitBytes)
f.Close() // always close, even on error

if err != nil && err != io.EOF {
buf, err := readFile(f, r.sizeLimitBytes)
if err != nil {
return bundle, err
} else if err == nil && n >= r.sizeLimitBytes {
return bundle, fmt.Errorf("bundle file exceeded max size (%v bytes)", r.sizeLimitBytes-1)
}

// verify the file content
if !bundle.Signatures.isEmpty() {
if bundle.Type() == SnapshotBundleType && !bundle.Signatures.isEmpty() {
path := f.Path()
if r.baseDir != "" {
path = f.URL()
Expand Down Expand Up @@ -509,8 +521,22 @@ func (r *Reader) Read() (Bundle, error) {
}
}

if bundle.Type() == DeltaBundleType {
if len(bundle.Data) != 0 {
return bundle, fmt.Errorf("delta bundle expected to contain only patch file but data files found")
}

if len(bundle.Modules) != 0 {
return bundle, fmt.Errorf("delta bundle expected to contain only patch file but policy files found")
}

if len(bundle.WasmModules) != 0 {
return bundle, fmt.Errorf("delta bundle expected to contain only patch file but wasm files found")
}
}

// check if the bundle signatures specify any files that weren't found in the bundle
if len(r.files) != 0 {
if bundle.Type() == SnapshotBundleType && len(r.files) != 0 {
extra := []string{}
for k := range r.files {
extra = append(extra, k)
Expand Down Expand Up @@ -651,43 +677,51 @@ func (w *Writer) Write(bundle Bundle) error {
gw := gzip.NewWriter(w.w)
tw := tar.NewWriter(gw)

var buf bytes.Buffer
bundleType := bundle.Type()

if err := json.NewEncoder(&buf).Encode(bundle.Data); err != nil {
return err
}
if bundleType == SnapshotBundleType {
var buf bytes.Buffer

if err := archive.WriteFile(tw, "data.json", buf.Bytes()); err != nil {
return err
}
if err := json.NewEncoder(&buf).Encode(bundle.Data); err != nil {
return err
}

for _, module := range bundle.Modules {
path := module.URL
if w.usePath {
path = module.Path
if err := archive.WriteFile(tw, "data.json", buf.Bytes()); err != nil {
return err
}

if err := archive.WriteFile(tw, path, module.Raw); err != nil {
for _, module := range bundle.Modules {
path := module.URL
if w.usePath {
path = module.Path
}

if err := archive.WriteFile(tw, path, module.Raw); err != nil {
return err
}
}

if err := w.writeWasm(tw, bundle); err != nil {
return err
}
}

if err := w.writeWasm(tw, bundle); err != nil {
return err
}
if err := writeSignatures(tw, bundle); err != nil {
return err
}

if err := w.writePlan(tw, bundle); err != nil {
return err
if err := w.writePlan(tw, bundle); err != nil {
return err
}
} else if bundleType == DeltaBundleType {
if err := writePatch(tw, bundle); err != nil {
return err
}
}

if err := writeManifest(tw, bundle); err != nil {
return err
}

if err := writeSignatures(tw, bundle); err != nil {
return err
}

if err := tw.Close(); err != nil {
return err
}
Expand Down Expand Up @@ -749,6 +783,17 @@ func writeManifest(tw *tar.Writer, bundle Bundle) error {
return archive.WriteFile(tw, ManifestExt, buf.Bytes())
}

// writePatch serializes the bundle's patch operations as JSON and writes the
// result into the tarball under the patch file name.
func writePatch(tw *tar.Writer, bundle Bundle) error {
	bs, err := json.Marshal(bundle.Patch)
	if err != nil {
		return err
	}

	// json.Encoder.Encode terminates the value with a newline; keep the
	// written bytes identical by appending it here.
	bs = append(bs, '\n')

	return archive.WriteFile(tw, patchFile, bs)
}

func writeSignatures(tw *tar.Writer, bundle Bundle) error {

if bundle.Signatures.isEmpty() {
Expand Down Expand Up @@ -1019,6 +1064,14 @@ func (b *Bundle) readData(key []string) *interface{} {
return &child
}

// Type returns the type of the bundle: delta when any patch operations are
// present, snapshot otherwise.
func (b *Bundle) Type() string {
	switch {
	case len(b.Patch.Data) > 0:
		return DeltaBundleType
	default:
		return SnapshotBundleType
	}
}

func mktree(path []string, value interface{}) (map[string]interface{}, error) {
if len(path) == 0 {
// For 0 length path the value is the full tree.
Expand Down Expand Up @@ -1189,9 +1242,10 @@ func IsStructuredDoc(name string) bool {
filepath.Base(name) == SignaturesFile || filepath.Base(name) == ManifestExt
}

func listSignaturesAndDescriptors(loader DirectoryLoader, skipVerify bool, sizeLimitBytes int64) (SignaturesConfig, []*Descriptor, error) {
func preProcessBundle(loader DirectoryLoader, skipVerify bool, sizeLimitBytes int64) (SignaturesConfig, Patch, []*Descriptor, error) {
descriptors := []*Descriptor{}
var signatures SignaturesConfig
var patch Patch

for {
f, err := loader.NextFile()
Expand All @@ -1200,26 +1254,54 @@ func listSignaturesAndDescriptors(loader DirectoryLoader, skipVerify bool, sizeL
}

if err != nil {
return signatures, nil, errors.Wrap(err, "bundle read failed")
return signatures, patch, nil, errors.Wrap(err, "bundle read failed")
}

// check for the signatures file
if !skipVerify && strings.HasSuffix(f.Path(), SignaturesFile) {
var buf bytes.Buffer
n, err := f.Read(&buf, sizeLimitBytes)
f.Close() // always close, even on error
if err != nil && err != io.EOF {
return signatures, nil, err
} else if err == nil && n >= sizeLimitBytes {
return signatures, nil, fmt.Errorf("bundle signatures file exceeded max size (%v bytes)", sizeLimitBytes-1)
buf, err := readFile(f, sizeLimitBytes)
if err != nil {
return signatures, patch, nil, err
}

if err := util.NewJSONDecoder(&buf).Decode(&signatures); err != nil {
return signatures, nil, errors.Wrap(err, "bundle load failed on signatures decode")
return signatures, patch, nil, errors.Wrap(err, "bundle load failed on signatures decode")
}
} else if !strings.HasSuffix(f.Path(), SignaturesFile) {
descriptors = append(descriptors, f)

if filepath.Base(f.Path()) == patchFile {

var b bytes.Buffer
tee := io.TeeReader(f.reader, &b)
f.reader = tee

buf, err := readFile(f, sizeLimitBytes)
if err != nil {
return signatures, patch, nil, err
}

if err := util.NewJSONDecoder(&buf).Decode(&patch); err != nil {
return signatures, patch, nil, errors.Wrap(err, "bundle load failed on patch decode")
}

f.reader = &b
}
}
}
return signatures, descriptors, nil
return signatures, patch, descriptors, nil
}

// readFile drains the descriptor's contents into a buffer, enforcing the
// given size limit, and always closes the descriptor — even on failure.
// It returns an error when the read fails or the file meets/exceeds the limit.
func readFile(f *Descriptor, sizeLimitBytes int64) (bytes.Buffer, error) {
	var buf bytes.Buffer

	n, err := f.Read(&buf, sizeLimitBytes)
	f.Close() // always close, even on error

	switch {
	case err != nil && err != io.EOF:
		return buf, err
	case err == nil && n >= sizeLimitBytes:
		// Reaching the limit means the file was truncated; report the
		// effective maximum (limit - 1) in the message.
		return buf, fmt.Errorf("bundle file '%v' exceeded max size (%v bytes)", strings.TrimPrefix(f.Path(), "/"), sizeLimitBytes-1)
	}

	return buf, nil
}
Loading

0 comments on commit dd02a7f

Please sign in to comment.