Skip to content

Commit

Permalink
Merge pull request #8175 from Agoric/mhofman/8031-more-snapshot-logic…
Browse files Browse the repository at this point in the history
…-refactor

refactor(x/swingset): shared KVEntry and helpers
  • Loading branch information
mhofman committed Aug 15, 2023
2 parents fa038bd + 65a5058 commit b81f249
Show file tree
Hide file tree
Showing 20 changed files with 864 additions and 240 deletions.
8 changes: 4 additions & 4 deletions docs/architecture/state-sync.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ sequenceDiagram
D-CS-->>-SSEH-CS:
SSEH-CS->>+SSES-CS: OnExportRetrieved()
loop
SSES-CS->>+SSEH-CS: provider.ReadArtifact()
SSES-CS->>+SSEH-CS: provider.ReadNextArtifact()
SSEH-CS->>+D-CS: Read(artifactFile)
D-CS-->>-SSEH-CS:
SSEH-CS-->>-SSES-CS: artifact{name, data}
Expand Down Expand Up @@ -246,16 +246,16 @@ sequenceDiagram
SSEH-CS->>SSEH-CS: activeOperation = operationDetails{}
SSEH-CS->>+D-CS: MkDir(exportDir)
D-CS-->>-SSEH-CS:
SSEH-CS->>+SSES-CS: provider.GetExportData()
SSEH-CS->>+SSES-CS: provider.GetExportDataReader()
SSES-CS->>+MS-CS: ExportStorageFromPrefix<br/>("swingStore.")
MS-CS-->>-SSES-CS: vstorage data entries
SSES-CS-->>-SSEH-CS:
SSES-CS--)-SSEH-CS: export data reader
loop each data entry
SSEH-CS->>+D-CS: Append(export-data.jsonl, <br/>"JSON(entry tuple)\n")
D-CS-->>-SSEH-CS:
end
loop extension snapshot items
SSEH-CS->>+SSES-CS: provider.readArtifact()
SSEH-CS->>+SSES-CS: provider.ReadNextArtifact()
SSES-CS->>+SM-CS: payloadReader()
SM-CS->>+SM-M: chunk = <-chunks
SM-M-->>-SM-CS:
Expand Down
13 changes: 12 additions & 1 deletion golang/cosmos/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,13 @@ import (
tmjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
tmos "github.com/tendermint/tendermint/libs/os"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
dbm "github.com/tendermint/tm-db"

gaiaappparams "github.com/Agoric/agoric-sdk/golang/cosmos/app/params"

appante "github.com/Agoric/agoric-sdk/golang/cosmos/ante"
agorictypes "github.com/Agoric/agoric-sdk/golang/cosmos/types"
"github.com/Agoric/agoric-sdk/golang/cosmos/vm"
"github.com/Agoric/agoric-sdk/golang/cosmos/x/lien"
"github.com/Agoric/agoric-sdk/golang/cosmos/x/swingset"
Expand Down Expand Up @@ -472,10 +474,19 @@ func NewAgoricApp(
return sendToController(true, string(bz))
},
)

getSwingStoreExportDataShadowCopyReader := func(height int64) agorictypes.KVEntryReader {
ctx := app.NewUncachedContext(false, tmproto.Header{Height: height})
exportDataEntries := app.SwingSetKeeper.ExportSwingStore(ctx)
if len(exportDataEntries) == 0 {
return nil
}
return agorictypes.NewVstorageDataEntriesReader(exportDataEntries)
}
app.SwingSetSnapshotter = *swingsetkeeper.NewExtensionSnapshotter(
bApp,
&app.SwingStoreExportsHandler,
app.SwingSetKeeper.ExportSwingStore,
getSwingStoreExportDataShadowCopyReader,
)

app.VibcKeeper = vibc.NewKeeper(
Expand Down
114 changes: 114 additions & 0 deletions golang/cosmos/types/kv_entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package types

import (
"encoding/json"
"fmt"
)

var _ json.Marshaler = &KVEntry{}
var _ json.Unmarshaler = &KVEntry{}

// KVEntry represents a string key / string value pair, where the value may be
// missing, which is different from an empty value.
// The semantics of a missing value are purpose-dependent rather than specified
// here, but frequently correspond with deletion/incompleteness/etc.
// A KVEntry with an empty key is considered invalid.
type KVEntry struct {
	key string
	// value is nil when the entry has no value, which is distinct from
	// pointing at an empty string.
	value *string
}

// NewKVEntry creates a KVEntry with the provided key and value.
func NewKVEntry(key string, value string) KVEntry {
	return KVEntry{key, &value}
}

// NewKVEntryWithNoValue creates a KVEntry with the provided key and no value.
func NewKVEntryWithNoValue(key string) KVEntry {
	return KVEntry{key, nil}
}

// UnmarshalJSON updates a KVEntry from JSON text corresponding with a
// [key: string, value?: string | null] shape, or returns an error indicating
// invalid input.
// The key must be a non-empty string, and the value (if present) must be a
// string or null. A missing or null value results in an entry with no value.
// The entry is left untouched when an error is returned.
//
// Implements json.Unmarshaler
// Note: unlike other methods, this accepts a pointer to satisfy
// the Unmarshaler semantics.
func (entry *KVEntry) UnmarshalJSON(input []byte) (err error) {
	// Decoding into pointers lets a JSON null be told apart from a string.
	var generic []*string
	if err = json.Unmarshal(input, &generic); err != nil {
		return err
	}

	// A JSON null decodes into a nil slice without error.
	if generic == nil {
		return fmt.Errorf("KVEntry cannot be null")
	}

	length := len(generic)
	if length != 1 && length != 2 {
		return fmt.Errorf("KVEntry must be an array of length 1 or 2 (not %d)", length)
	}

	key := generic[0]
	if key == nil {
		return fmt.Errorf("KVEntry key must be a non-empty string: <nil>")
	}
	if *key == "" {
		// Report the dereferenced key: formatting the *string itself would
		// print a memory address rather than the offending value.
		return fmt.Errorf("KVEntry key must be a non-empty string: %q", *key)
	}

	var value *string
	if length == 2 {
		value = generic[1]
	}

	entry.key = *key
	entry.value = value

	return nil
}

// MarshalJSON encodes the KVEntry into a JSON array of [key: string, value?: string],
// with the value missing (array length of 1) if the entry has no value.
// It returns an error if the entry has an invalid (empty) key.
//
// Implements json.Marshaler
func (entry KVEntry) MarshalJSON() ([]byte, error) {
	if !entry.IsValidKey() {
		return nil, fmt.Errorf("cannot marshal invalid KVEntry")
	}
	if entry.value == nil {
		return json.Marshal([1]string{entry.key})
	}
	return json.Marshal([2]string{entry.key, *entry.value})
}

// IsValidKey returns whether the KVEntry has a non-empty key.
func (entry KVEntry) IsValidKey() bool {
	return entry.key != ""
}

// Key returns the string key.
func (entry KVEntry) Key() string {
	return entry.key
}

// HasValue returns whether the KVEntry has a value or not.
func (entry KVEntry) HasValue() bool {
	return entry.value != nil
}

// Value returns a pointer to the string value or nil if the entry has no value.
func (entry KVEntry) Value() *string {
	return entry.value
}

// StringValue returns the string value, or the empty string if the entry has no value.
// Note that the result therefore does not differentiate an empty string value from no value.
func (entry KVEntry) StringValue() string {
	if entry.value == nil {
		return ""
	}
	return *entry.value
}
220 changes: 220 additions & 0 deletions golang/cosmos/types/kv_entry_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package types

import (
"encoding/json"
"fmt"
"io"

vstoragetypes "github.com/Agoric/agoric-sdk/golang/cosmos/x/vstorage/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)

// These helpers facilitate handling KVEntry streams, in particular for the
// swing-store "export data" use case. The goal is to avoid passing around
// large slices of key/value pairs.
//
// Handling of these streams is primarily accomplished through the KVEntryReader
// interface, which has multiple implementations for different backing sources,
// and a helper function that consumes a reader and writes its entries to an
// io.Writer as newline-terminated, JSON-encoded KVEntry values.

// We attempt to pass sdk.Iterator around as much as possible to abstract a
// stream of Key/Value pairs without requiring the whole slice to be held in
// memory if possible. Cosmos SDK defines iterators as yielding Key/Value
// pairs, both as byte slices.
//
// More precisely, we define here the following:
// - A KVEntryReader interface for reading KVEntry values one at a time from an
// underlying source.
// - Multiple implementations of the KVEntryReader interface:
// - NewKVIteratorReader constructs a reader which consumes an sdk.Iterator.
// Keys and values are converted from byte slices to strings, and nil values
// are preserved as KVEntry instances with no value.
// - A generic reader which uses a slice of key/value data, and a conversion
// function from that data type to a KVEntry. The reader does bounds
// checking and keeps track of the current position. The following data
// types are available:
// - NewVstorageDataEntriesReader constructs a reader from a slice of
// vstorage DataEntry values.
// - NewJsonRawMessageKVEntriesReader constructs a reader from a slice of
// [key: string, value?: string | null] JSON array values.
// - NewJsonlKVEntryDecoderReader constructs a reader from an io.ReadCloser
// (like a file) containing JSON Lines in which each item is a
// [key: string, value?: string | null] array.
// - EncodeKVEntryReaderToJsonl consumes a KVEntryReader and writes its entries
// into an io.Writer as a sequence of single-line JSON texts. The encoding of
// each line is [key, value] if the KVEntry has a value, and [key] otherwise.
// This format terminates each line, but is still compatible with JSON Lines
// (which is line feed *separated*) for Go and JS decoders.

// KVEntryReader is an abstraction for iteratively reading KVEntry data from an
// underlying source, one entry per Read call.
type KVEntryReader interface {
// Read returns the next KVEntry, or an error.
// An `io.EOF` error indicates that the previous Read() returned the final
// KVEntry, i.e. that there are no more entries to read.
Read() (KVEntry, error)
// Close frees the underlying resource (such as a slice or file descriptor).
Close() error
}

var _ KVEntryReader = &kvIteratorReader{}

// kvIteratorReader adapts an sdk.Iterator to the KVEntryReader interface.
type kvIteratorReader struct {
	iter sdk.Iterator
}

// NewKVIteratorReader wraps the provided sdk.Iterator into a KVEntryReader.
func NewKVIteratorReader(iter sdk.Iterator) KVEntryReader {
	reader := kvIteratorReader{iter: iter}
	return &reader
}

// Read converts the iterator's current key/value pair into a KVEntry and
// advances the iterator. A nil value yields a KVEntry with no value.
// It returns io.EOF once the iterator is no longer valid, and an error
// (without advancing the iterator) if the current key is nil or empty.
// Implements KVEntryReader
func (ir kvIteratorReader) Read() (next KVEntry, err error) {
	if !ir.iter.Valid() {
		// The cosmos-sdk iterator implementations report an error from
		// iter.Error() both on completion and on iteration failure, so the two
		// cases unfortunately cannot be told apart here.
		return KVEntry{}, io.EOF
	}

	rawKey := ir.iter.Key()
	if len(rawKey) == 0 {
		return KVEntry{}, fmt.Errorf("nil or empty key yielded by iterator")
	}

	// Capture the value before moving the iterator to the next pair.
	rawValue := ir.iter.Value()
	ir.iter.Next()

	if rawValue != nil {
		return NewKVEntry(string(rawKey), string(rawValue)), nil
	}
	return NewKVEntryWithNoValue(string(rawKey)), nil
}

// Close closes the underlying iterator.
// Implements KVEntryReader
func (ir kvIteratorReader) Close() error {
	return ir.iter.Close()
}

var _ KVEntryReader = &kvEntriesReader[any]{}

// kvEntriesReader is the KVEntryReader using an underlying slice of generic
// kv entries. It reads from the slice sequentially using a type specific
// toKVEntry func, performing bounds checks, and tracking the position.
type kvEntriesReader[T any] struct {
	entries   []T
	toKVEntry func(T) (KVEntry, error)
	nextIndex int
	// closed is tracked explicitly so that a reader created over a nil (empty)
	// slice is not mistaken for a closed reader.
	closed bool
}

// Read yields the next KVEntry from the source.
// It returns io.EOF exactly once, on the Read following the final entry
// (immediately for an empty or nil source). Reading again after that is an
// out of bounds error, and reading after Close is a "reader closed" error.
// A conversion failure or an entry with an invalid key is reported as an
// error, and the offending source item is skipped by subsequent reads.
// Implements KVEntryReader
func (reader *kvEntriesReader[T]) Read() (next KVEntry, err error) {
	if reader.closed {
		return KVEntry{}, fmt.Errorf("reader closed")
	}

	length := len(reader.entries)

	switch {
	case reader.nextIndex < length:
		entry, convErr := reader.toKVEntry(reader.entries[reader.nextIndex])
		// Advance even on failure so that a bad item is not yielded twice.
		reader.nextIndex++
		if convErr != nil {
			return KVEntry{}, convErr
		}
		if !entry.IsValidKey() {
			return KVEntry{}, fmt.Errorf("source yielded a KVEntry with an invalid key")
		}
		return entry, nil
	case reader.nextIndex == length:
		// Step past the end so that io.EOF is only reported once.
		reader.nextIndex++
		return KVEntry{}, io.EOF
	default:
		return KVEntry{}, fmt.Errorf("index %d is out of source bounds (length %d)", reader.nextIndex, length)
	}
}

// Close releases the source slice and marks the reader as closed.
// It always returns nil and may be called multiple times.
// Implements KVEntryReader
func (reader *kvEntriesReader[T]) Close() error {
	reader.entries = nil
	reader.closed = true
	return nil
}

// NewVstorageDataEntriesReader creates a KVEntryReader backed by a
// vstorage DataEntry slice. Each DataEntry is converted into a KVEntry using
// its Path as the key and its Value as the (always present) value.
func NewVstorageDataEntriesReader(vstorageDataEntries []*vstoragetypes.DataEntry) KVEntryReader {
	convert := func(dataEntry *vstoragetypes.DataEntry) (KVEntry, error) {
		converted := NewKVEntry(dataEntry.Path, dataEntry.Value)
		return converted, nil
	}

	reader := kvEntriesReader[*vstoragetypes.DataEntry]{
		entries:   vstorageDataEntries,
		toKVEntry: convert,
	}
	return &reader
}

// NewJsonRawMessageKVEntriesReader creates a KVEntryReader backed by
// a json.RawMessage slice. Each raw message is decoded into a KVEntry with
// json.Unmarshal when it is read, and any decoding error is returned as is.
func NewJsonRawMessageKVEntriesReader(jsonEntries []json.RawMessage) KVEntryReader {
	decode := func(raw json.RawMessage) (KVEntry, error) {
		var decoded KVEntry
		decodeErr := json.Unmarshal(raw, &decoded)
		return decoded, decodeErr
	}

	reader := kvEntriesReader[json.RawMessage]{
		entries:   jsonEntries,
		toKVEntry: decode,
	}
	return &reader
}

var _ KVEntryReader = &jsonlKVEntryDecoderReader{}

// jsonlKVEntryDecoderReader is a KVEntryReader which decodes KVEntry values
// from a stream of jsonl-like encoded key/value pairs.
type jsonlKVEntryDecoderReader struct {
	closer  io.Closer
	decoder *json.Decoder
}

// Read decodes and yields the next KVEntry from the stream.
// Any error reported by the JSON decoder is returned unchanged.
// Implements KVEntryReader
func (reader jsonlKVEntryDecoderReader) Read() (next KVEntry, err error) {
	if err = reader.decoder.Decode(&next); err != nil {
		return next, err
	}
	return next, nil
}

// Close releases the underlying resource backing the decoder.
// Implements KVEntryReader
func (reader jsonlKVEntryDecoderReader) Close() error {
	return reader.closer.Close()
}

// NewJsonlKVEntryDecoderReader creates a KVEntryReader over a byte
// stream reader that decodes each line as a JSON encoded KVEntry. The entries
// are yielded in the order they're present in the stream. Closing the returned
// reader closes the provided byteReader.
func NewJsonlKVEntryDecoderReader(byteReader io.ReadCloser) KVEntryReader {
	reader := jsonlKVEntryDecoderReader{
		closer:  byteReader,
		decoder: json.NewDecoder(byteReader),
	}
	return &reader
}

// EncodeKVEntryReaderToJsonl drains a KVEntryReader, writing each KVEntry to
// bytesWriter as a JSON text terminated by a new line. HTML escaping of the
// JSON output is disabled.
// It returns nil once the reader reports io.EOF, or the first error returned
// by either the reader or the encoder.
// It will not Close the Reader when done
func EncodeKVEntryReaderToJsonl(reader KVEntryReader, bytesWriter io.Writer) (err error) {
	jsonlEncoder := json.NewEncoder(bytesWriter)
	jsonlEncoder.SetEscapeHTML(false)

	for {
		entry, readErr := reader.Read()
		switch {
		case readErr == io.EOF:
			// The source is exhausted: every entry has been written.
			return nil
		case readErr != nil:
			return readErr
		}

		if encodeErr := jsonlEncoder.Encode(entry); encodeErr != nil {
			return encodeErr
		}
	}
}
Loading

0 comments on commit b81f249

Please sign in to comment.