Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cosmos x/vstream implementation #5466

Closed
wants to merge 12 commits into from
Closed
74 changes: 74 additions & 0 deletions golang/cosmos/proto/agoric/types/stream.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
syntax = "proto3";
package agoric.types;

import "gogoproto/gogo.proto";

option go_package = "github.com/Agoric/agoric-sdk/golang/cosmos/types/stream";

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introductory comment: "A stream is a sequence of arbitrary bytes values used by publishers to communicate with consumers. Values in a stream are indexed by 1-based contiguous sequence numbers. A stream is stored as a sequence of StreamCells. StreamCells are typically written to the same key in a KVStore, relying on reading at old block heights to see older values."

// StreamCell represents one publish state in a vstream stream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"published". Consider instead "StreamCell contains the stream values that are published during the execution of a single block. Each StreamCell contains a reference to the prior cell in the stream.

message StreamCell {
option (gogoproto.equal) = false;

// EndState is the state of the stream at the last position in this cell.
enum EndState {
// END_STATE_UNSPECIFIED means unknown end state.
END_STATE_UNSPECIFIED = 0;
// END_STATE_APPENDABLE means that appending to this cell is possible.
END_STATE_APPENDABLE = 1;
// END_STATE_FINISHED means this cell has terminated.
END_STATE_FINISHED = 2;
// END_STATE_FAILURE means this cell has terminated with an error value.
END_STATE_FAILURE = 3;
}

// Chronologically-ordered list of values for this cell.
// If state=END_STATE_FAILURE, the last value is the serialised error.
repeated bytes values = 1;

// The block height in which this cell was last updated.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it sounds like a cell is updated only in a single block, change this to just block_height.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to specify that the value of the cell may be queried at a height > updated_block_height. Just using block_height does not convey that meaning as clearly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. In the comment at least, instead of "was last updated", try "was produced", and see if that inspires a different name.

int64 updated_block_height = 2 [
(gogoproto.jsontag) = "updated_block_height",
(gogoproto.moretags) = "yaml:\"updated_block_height\""
];

// The state as of the last value.
EndState end_state = 3 [
(gogoproto.jsontag) = "end_state",
(gogoproto.moretags) = "yaml:\"end_state\""
];

// The prior position in the stream. If this is the first cell in the
// stream, this will be the default value. All other cells will have a
// reference with a nonzero block height.
StreamPosition prior = 4 [
(gogoproto.nullable) = false
];
}

// StreamPosition is data that refers to a particular stream cell value.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add The default value of this message is the equivalent of a null pointer.

message StreamPosition {
// The height at which the cell is stored.
int64 block_height = 1 [
(gogoproto.jsontag) = "block_height",
(gogoproto.moretags) = "yaml:\"block_height\""
];

// The low-level (Cosmos SDK multistore) store name under which the cell is stored.
string store_name = 2 [
(gogoproto.jsontag) = "store_name",
(gogoproto.moretags) = "yaml:\"store_name\""
];

// The low-level (Cosmos SDK multistore) store subkey under which the cell is stored.
bytes store_subkey = 3 [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the subkey/child key for vstorage? Make that explicit, and be consistent with the choice of "sub" vs "child" made for vstorage.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is at a lower level than vstorage: the Cosmos SDK multistore. I've noted as such.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, "subkey" doesn't appear to be standard cosmos terminology - the APIs just use "key".

(gogoproto.jsontag) = "store_subkey",
(gogoproto.moretags) = "yaml:\"store_subkey\""
];

// The sequence number of the value at this position. Necessary to allow
// references to a fork point within a cell.
uint64 sequence_number = 4 [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I first thought this was redundant with firstSequenceNumber in StreamCell, but now I see this is necessary for a reference to a fork point within a cell.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you've removed firstSequenceNumber from StreamCell. I guess it's redundant as long as you're always accessing a cell via a StreamPosition, but it seems a little too bold to leave this essential information only extrinsic. Consider restoring the field.

(gogoproto.jsontag) = "sequence_number",
(gogoproto.moretags) = "yaml:\"sequence_number\""
];
}
57 changes: 57 additions & 0 deletions golang/cosmos/types/state_ref.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package types

import (
"fmt"

sdk "github.com/cosmos/cosmos-sdk/types"
)

// An interface to allow updating KVStore-backed state and extracting proof
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An interface ... to a stored StreamCell.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not StreamCell specific.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, but see below for comments that make it explicit when we are expecting it to reference a StreamCell. When do you think the SDK might start using generics?

type StateRef[T Jsoner] interface {
    Read(ctx sdk.Context) (T, error)
    Write(ctx sdkContext, value T) error
    ...
}

// parameters.
type StateRef interface {
Read(ctx sdk.Context) ([]byte, error)
Write(ctx sdk.Context, value []byte) error
Exists(ctx sdk.Context) bool
StoreName() string
StoreSubkey() []byte
String() string
}

type KVStoreStateRef struct {
storeKey sdk.StoreKey
subkey []byte
}

var _ StateRef = KVStoreStateRef{}

func NewKVStoreStateRef(storeKey sdk.StoreKey, subkey []byte) *KVStoreStateRef {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider not pointer-izing.

return &KVStoreStateRef{storeKey, subkey}
}

func (s KVStoreStateRef) Read(ctx sdk.Context) ([]byte, error) {
store := ctx.KVStore(s.storeKey)
return store.Get(s.subkey), nil
}

func (s KVStoreStateRef) Write(ctx sdk.Context, value []byte) error {
store := ctx.KVStore(s.storeKey)
store.Set(s.subkey, value)
return nil
}

func (s KVStoreStateRef) Exists(ctx sdk.Context) bool {
store := ctx.KVStore(s.storeKey)
return store.Has(s.subkey)
}

func (s KVStoreStateRef) StoreName() string {
return s.storeKey.Name()
}

func (s KVStoreStateRef) StoreSubkey() []byte {
return s.subkey
}

func (s KVStoreStateRef) String() string {
return fmt.Sprintf("KVStoreStateRef{%s, %s}", s.storeKey.Name(), s.subkey)
}
59 changes: 59 additions & 0 deletions golang/cosmos/types/stream/cell.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package stream

import (
"encoding/json"

sdk "github.com/cosmos/cosmos-sdk/types"

agoric "github.com/Agoric/agoric-sdk/golang/cosmos/types"
)

// NewStreamCell creates a new StreamCell at blockHeight with the specified
// prior position.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a zero stream position if this is the first cell in the stream.

func NewStreamCell(blockHeight int64, prior StreamPosition) StreamCell {
return StreamCell{
UpdatedBlockHeight: blockHeight,
EndState: StreamCell_END_STATE_APPENDABLE,
Values: make([][]byte, 0, 1),
Prior: prior,
}
}

func NewStreamPosition(blockHeight int64, storeName string, subkey []byte, seq uint64) StreamPosition {
return StreamPosition{
BlockHeight: blockHeight,
StoreName: storeName,
StoreSubkey: subkey,
SequenceNumber: seq,
}
}

func NewZeroStreamPosition() StreamPosition {
return NewStreamPosition(0, "", nil, 0)
}

// GetLatestPosition returns the position of the last value in the cell, or a
// zero stream position if the referenced cell does not exist.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since a StateRef could reference anything, document that it is expected to reference a StreamCell.

Add a note that it will unconditionally return the position of the last value in the cell regardless of the cell's end state. The cell end state should be checked to ensure the position is meaningful, e.g. not END_STATE_FAILURE.

func GetLatestPosition(ctx sdk.Context, state agoric.StateRef) (*StreamPosition, error) {
JimLarson marked this conversation as resolved.
Show resolved Hide resolved
if !state.Exists(ctx) {
zeroPosition := NewZeroStreamPosition()
return &zeroPosition, nil
}

var priorCell StreamCell
data, err := state.Read(ctx)
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &priorCell); err != nil {
return nil, err
}

priorReference := NewStreamPosition(
priorCell.UpdatedBlockHeight,
state.StoreName(),
state.StoreSubkey(),
uint64(len(priorCell.Values)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need to add priorCell.FirstSequenceNumber? Add a test which would have caught this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out that was redundant information that I've removed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not fixed yet.

)
return &priorReference, nil
}
188 changes: 188 additions & 0 deletions golang/cosmos/types/stream/operation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package stream

import (
"bytes"
"encoding/json"
"fmt"

sdk "github.com/cosmos/cosmos-sdk/types"

agoric "github.com/Agoric/agoric-sdk/golang/cosmos/types"
)

type StreamCellUpdater interface {
Update(cell StreamCell) error
}

var _, _ StreamCellUpdater = AppendStreamCellUpdater{}, FailStreamCellUpdater{}

type StreamOperation struct {
state agoric.StateRef
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document that state is expected to reference a StreamCell.

Updater StreamCellUpdater
Prior StreamPosition
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the Prior field used? It seems redundant with the priorPos argument to StreamOperation.Commit().

}

func NewStreamOperation(ctx sdk.Context, state agoric.StateRef, updater StreamCellUpdater) StreamOperation {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document that state is expected to reference a StreamCell.

return StreamOperation{
state: state,
Updater: updater,
Prior: NewZeroStreamPosition(),
}
}

func (so StreamOperation) GetLatestPosition(ctx sdk.Context) (*StreamPosition, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that it returns the position of the last value regardless of the end state.

return GetLatestPosition(ctx, so.state)
}

// LoadAndCheckHead returns a head satisfying prior that we can mutate.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand "head satisfying prior". How about "Given the position of the last value in the stream (or a zero position if the stream is empty), returns a StreamCell to which new stream values can be appended. Does not check whether an existing cell is failed or finished. This cannot be used to fork a stream - the StreamOperation's StateRef must match the store and key of prior". (Or skip the checks and allow this to create a fork.)

func (so StreamOperation) LoadAndCheckHead(ctx sdk.Context, prior StreamPosition) (*StreamCell, error) {
head := NewStreamCell(ctx.BlockHeight(), prior)
if !so.state.Exists(ctx) {
// No prior state, safe to use the fresh head.
return &head, nil
}
// Get the current head.
data, err := so.state.Read(ctx)
if err != nil {
return nil, err
}
if err := json.Unmarshal([]byte(data), &head); err != nil {
return nil, err
}
if prior.BlockHeight != head.UpdatedBlockHeight {
return nil, fmt.Errorf("prior block height %d does not match current %q head block height %d", prior.BlockHeight, so.state, head.UpdatedBlockHeight)
}
nextSequence := head.Prior.SequenceNumber + uint64(len(head.Values))
if prior.SequenceNumber != nextSequence {
return nil, fmt.Errorf("prior sequence number %d does not point to last value %d", prior.SequenceNumber, nextSequence)
}
stateStoreName := so.state.StoreName()
if prior.StoreName != stateStoreName {
return nil, fmt.Errorf("prior store name %s does not match state store name %s", prior.StoreName, stateStoreName)
}
stateStoreSubKey := so.state.StoreSubkey()
if !bytes.Equal(prior.StoreSubkey, stateStoreSubKey) {
return nil, fmt.Errorf("prior store subkey %s does not match state store subkey %s", prior.StoreSubkey, stateStoreSubKey)
}
// We can update the current head state.
return &head, nil
}

// Commit commits the stream operation to the current state.
func (so StreamOperation) Commit(ctx sdk.Context, priorPos StreamPosition, forceOverwrite bool) error {
head := NewStreamCell(ctx.BlockHeight(), priorPos)
if !forceOverwrite {
// Get the current head and assert that it's compatible with prior.
headP, err := so.LoadAndCheckHead(ctx, priorPos)
if err != nil {
return err
}
head = *headP
}

if head.UpdatedBlockHeight != ctx.BlockHeight() {
// Start a new values list.
head.UpdatedBlockHeight = ctx.BlockHeight()
head.Values = nil
}

head.Prior = priorPos

if head.Values == nil {
// Allocate fresh values for potential appending.
head.Values = make([][]byte, 0, 1)
}

switch head.EndState {
case StreamCell_END_STATE_APPENDABLE:
break
case StreamCell_END_STATE_FINISHED:
return fmt.Errorf("cannot update stream at %s that is already done", so.state)
case StreamCell_END_STATE_FAILURE:
return fmt.Errorf("cannot update stream at %s that has an error: %s", so.state, head.Values[len(head.Values)-1])
default:
return fmt.Errorf("cannot update stream at %s with unrecognized end state %q", so.state, head.EndState)
}

if err := so.Updater.Update(head); err != nil {
return err
}

// Convert the head to JSON.
bz, err := json.Marshal(head)
if err != nil {
return err
}

// COMMIT POINT
// Store the marshalled stream cell.
if err := so.state.Write(ctx, bz); err != nil {
return err
}

// Emit the advisory state change event.
ctx.EventManager().EmitEvent(
agoric.NewStateChangeEvent(
so.state.StoreName(),
so.state.StoreSubkey(),
bz,
),
)
return nil
}

// CommitToCurrent fetches the latest position and appends to it.
func (so StreamOperation) CommitToCurrent(ctx sdk.Context) error {
prior, err := so.GetLatestPosition(ctx)
if err != nil {
return err
}
return so.Commit(ctx, *prior, false)
}

type AppendStreamCellUpdater struct {
Done bool
Value []byte
}

func NewUpdateStreamOperation(ctx sdk.Context, state agoric.StateRef, value []byte) StreamOperation {
updater := AppendStreamCellUpdater{
Value: value,
Done: false,
}
return NewStreamOperation(ctx, state, updater)
}

func NewFinishStreamOperation(ctx sdk.Context, state agoric.StateRef, value []byte) StreamOperation {
updater := AppendStreamCellUpdater{
Value: value,
Done: true,
}
return NewStreamOperation(ctx, state, updater)
}

func (op AppendStreamCellUpdater) Update(cell StreamCell) error {
// Add the new state to the batch.
cell.Values = append(cell.Values, op.Value)
if op.Done {
cell.EndState = StreamCell_END_STATE_FINISHED
}
return nil
}

type FailStreamCellUpdater struct {
Failure []byte
}

func (op FailStreamCellUpdater) Update(cell StreamCell) error {
cell.Values = append(cell.Values, op.Failure)
cell.EndState = StreamCell_END_STATE_FAILURE
return nil
}

func NewFailStreamOperation(ctx sdk.Context, state agoric.StateRef, failure []byte) StreamOperation {
updater := FailStreamCellUpdater{
Failure: failure,
}
return NewStreamOperation(ctx, state, updater)
}
Loading