Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish large message payloads to S3 #248

Merged
merged 2 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions plugins/indexing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ The process that starts the node should have the following environment variables
```sh
export COSMOS_SDK_ABCI=PATH_TO_PLUGIN_EXECUTABLE
export SQS_QUEUE_URL=""
export S3_LARGE_MSG_BUCKET_NAME=""
export PLUGIN_LOG_FILE=PATH_TO_DESIRED_LOG_FILE
# Optionally you can also specify the log level, one of "trace", "debug", "info", "warn", "error"
export PLUGIN_LOG_LEVEL="WARN"
```

Lastly, as we're using SQS the node needs access to a valid set of AWS credentials with permission to publish messages to the specified queue.
Lastly, as we're using SQS and S3, the node needs access to a valid set of AWS credentials with permission to publish messages to the specified queue and to upload objects to the specified bucket.

### Logging

Expand Down Expand Up @@ -108,18 +109,24 @@ Now the node can be restarted and it should resume from height N. It will call b

```sh
go build -o PATH_TO_PLUGIN_EXECUTABLE ./plugins/indexing/plugin.go
# Alternatively, outputs in the /build directory in the project root
make build-plugin
```

## Local Development

To simplify local development we use [a SQS emulator](https://github.com/Admiral-Piett/goaws/). To connect to this from the plugin you need to need to build the plugin with the `dev` flag. In addition you'll need specifiy an environment variable for `SQS_ENDPOINT` (which should be the base of the `SQS_QUEUE_URL`) in the process that launches the node.
To simplify local development we use [an SQS emulator](https://github.com/Admiral-Piett/goaws/) and [an S3 emulator](https://github.com/adobe/S3Mock). To connect to these from the plugin you need to build the plugin with the `dev` flag. In addition, in the process that launches the node, you'll need to specify an environment variable for `SQS_ENDPOINT` (which should be the base of the `SQS_QUEUE_URL`) and an environment variable for `S3_ENDPOINT` (which should correspond to the local port of the service).

```sh
# Example urls
export SQS_QUEUE_URL=http://localhost/4100/test-queue.fifo
export SQS_ENDPOINT=http://localhost:4100
export S3_LARGE_MSG_BUCKET_NAME="indexer-localnet-large-messages"
export S3_ENDPOINT=http://localhost:9444
```

```sh
go build --tags dev -o PATH_TO_PLUGIN_EXECUTABLE ./plugins/indexing/plugin.go
# Alternatively, outputs in the /build directory in the project root
make build-plugin-dev
```
6 changes: 3 additions & 3 deletions plugins/indexing/auth/module.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package bank
package auth

import (
"bytes"
Expand All @@ -8,8 +8,8 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"

log "github.com/sedaprotocol/seda-chain/plugins/indexing/log"
types "github.com/sedaprotocol/seda-chain/plugins/indexing/types"
"github.com/sedaprotocol/seda-chain/plugins/indexing/log"
"github.com/sedaprotocol/seda-chain/plugins/indexing/types"
)

const StoreKey = authtypes.StoreKey
Expand Down
4 changes: 2 additions & 2 deletions plugins/indexing/bank/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"

log "github.com/sedaprotocol/seda-chain/plugins/indexing/log"
types "github.com/sedaprotocol/seda-chain/plugins/indexing/types"
"github.com/sedaprotocol/seda-chain/plugins/indexing/log"
"github.com/sedaprotocol/seda-chain/plugins/indexing/types"
)

const StoreKey = banktypes.StoreKey
Expand Down
2 changes: 1 addition & 1 deletion plugins/indexing/base/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
abci "github.com/cometbft/cometbft/abci/types"
sdk "github.com/cosmos/cosmos-sdk/types"

types "github.com/sedaprotocol/seda-chain/plugins/indexing/types"
"github.com/sedaprotocol/seda-chain/plugins/indexing/types"
)

func ExtractBlockUpdate(ctx *types.BlockContext, req abci.RequestFinalizeBlock) (*types.Message, error) {
Expand Down
8 changes: 4 additions & 4 deletions plugins/indexing/base/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
"github.com/cometbft/cometbft/crypto/tmhash"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
txtype "github.com/cosmos/cosmos-sdk/types/tx"
"github.com/cosmos/cosmos-sdk/types/tx"

types "github.com/sedaprotocol/seda-chain/plugins/indexing/types"
"github.com/sedaprotocol/seda-chain/plugins/indexing/types"
)

type wrappedTx struct {
cdc codec.Codec
Tx *txtype.Tx
Tx *tx.Tx
}

func (s wrappedTx) MarshalJSON() ([]byte, error) {
Expand All @@ -33,7 +33,7 @@ func ExtractTransactionUpdates(ctx *types.BlockContext, cdc codec.Codec, req abc
txResult := res.TxResults[index]
txHash := strings.ToUpper(hex.EncodeToString(tmhash.Sum(txBytes)))

var tx txtype.Tx
var tx tx.Tx
if err := cdc.Unmarshal(txBytes, &tx); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/indexing/log/logger.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package logger
package log

import (
"fmt"
Expand Down
26 changes: 13 additions & 13 deletions plugins/indexing/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"github.com/sedaprotocol/seda-chain/app"
"github.com/sedaprotocol/seda-chain/app/params"

authmodule "github.com/sedaprotocol/seda-chain/plugins/indexing/auth"
bankmodule "github.com/sedaprotocol/seda-chain/plugins/indexing/bank"
base "github.com/sedaprotocol/seda-chain/plugins/indexing/base"
log "github.com/sedaprotocol/seda-chain/plugins/indexing/log"
pluginsqs "github.com/sedaprotocol/seda-chain/plugins/indexing/sqs"
types "github.com/sedaprotocol/seda-chain/plugins/indexing/types"
"github.com/sedaprotocol/seda-chain/plugins/indexing/auth"
"github.com/sedaprotocol/seda-chain/plugins/indexing/bank"
"github.com/sedaprotocol/seda-chain/plugins/indexing/base"
"github.com/sedaprotocol/seda-chain/plugins/indexing/log"
"github.com/sedaprotocol/seda-chain/plugins/indexing/pluginaws"
"github.com/sedaprotocol/seda-chain/plugins/indexing/types"
)

var _ storetypes.ABCIListener = &IndexerPlugin{}
Expand All @@ -35,7 +35,7 @@ var _ storetypes.ABCIListener = &IndexerPlugin{}
type IndexerPlugin struct {
block *types.BlockContext
cdc codec.Codec
sqsClient *pluginsqs.SqsClient
sqsClient *pluginaws.SqsClient
logger *log.Logger
}

Expand Down Expand Up @@ -81,10 +81,10 @@ func (p *IndexerPlugin) ListenFinalizeBlock(_ context.Context, req abci.RequestF

func (p *IndexerPlugin) extractUpdate(change *storetypes.StoreKVPair) (*types.Message, error) {
switch change.StoreKey {
case bankmodule.StoreKey:
return bankmodule.ExtractUpdate(p.block, p.cdc, p.logger, change)
case authmodule.StoreKey:
return authmodule.ExtractUpdate(p.block, p.cdc, p.logger, change)
case bank.StoreKey:
return bank.ExtractUpdate(p.block, p.cdc, p.logger, change)
case auth.StoreKey:
return auth.ExtractUpdate(p.block, p.cdc, p.logger, change)
default:
return nil, nil
}
Expand Down Expand Up @@ -152,9 +152,9 @@ func main() {
std.RegisterInterfaces(interfaceRegistry)
app.ModuleBasics.RegisterInterfaces(interfaceRegistry)

sqsClient, err := pluginsqs.NewSqsClient()
sqsClient, err := pluginaws.NewSqsClient(logger)
if err != nil {
logger.Fatal("failed to create sqs client", err)
logger.Fatal("failed to create AWS clients", err)
}

filePlugin := &IndexerPlugin{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//go:build !dev

package pluginsqs
package pluginaws

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
)

Expand All @@ -11,3 +12,7 @@ func NewSession() (*session.Session, error) {
SharedConfigState: session.SharedConfigEnable,
})
}

// NewS3Config returns the SDK's default S3 config for the non-dev build.
// No endpoint override is set here; the SDK resolves region/credentials
// through its standard chain (shared config is enabled in NewSession above).
func NewS3Config() (*aws.Config, error) {
	return aws.NewConfig(), nil
}
39 changes: 39 additions & 0 deletions plugins/indexing/pluginaws/aws_session_dev.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//go:build dev

package pluginaws

import (
"fmt"
"os"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
)

// Names of the environment variables that point the dev build at the
// local SQS and S3 emulators.
var (
	sqsEndpointEnvName = "SQS_ENDPOINT"
	s3EndpointEnvName  = "S3_ENDPOINT"
)

// NewSession builds an AWS session pointed at the local SQS emulator.
// It reads the endpoint from the SQS_ENDPOINT environment variable and
// fails if that variable is unset. Dev build only.
func NewSession() (*session.Session, error) {
	endpoint, ok := os.LookupEnv(sqsEndpointEnvName)
	if !ok {
		return nil, fmt.Errorf("missing environment variable '%s'", sqsEndpointEnvName)
	}

	// Static dummy credentials are sufficient for the emulator.
	cfg := &aws.Config{
		Region:      aws.String("eu-west-2"),
		Credentials: credentials.NewStaticCredentials("test", "test", ""),
		Endpoint:    aws.String(endpoint),
	}
	return session.NewSession(cfg)
}

// NewS3Config builds an S3 client config pointed at the local S3 emulator.
// It reads the endpoint from the S3_ENDPOINT environment variable and
// fails if that variable is unset. Dev build only.
func NewS3Config() (*aws.Config, error) {
	endpoint, ok := os.LookupEnv(s3EndpointEnvName)
	if !ok {
		return nil, fmt.Errorf("missing environment variable '%s'", s3EndpointEnvName)
	}

	// The local emulator requires path style access
	cfg := aws.NewConfig().WithEndpoint(endpoint).WithS3ForcePathStyle(true)
	return cfg, nil
}
64 changes: 64 additions & 0 deletions plugins/indexing/pluginaws/large_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package pluginaws

import (
"bytes"
"fmt"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/sqs"

"github.com/sedaprotocol/seda-chain/plugins/indexing/types"
)

const (
	// MaxMessageBodyLengthBytes is the threshold above which a message payload
	// is uploaded to S3 instead of being sent inline over SQS (whose hard cap
	// is 256KB per message).
	// NOTE(review): was 100_00 (i.e. 10,000 bytes, ~10KB), contradicting the
	// "~100KB" comment; corrected to 100_000 to match the stated intent.
	MaxMessageBodyLengthBytes = 100_000 // ~100KB
)

// uploadToS3 stores body in the configured bucket under key and returns a
// small "large-message" pointer message that references the uploaded object
// by key and ETag, suitable for publishing to SQS in place of the payload.
func (sc *SqsClient) uploadToS3(key string, body []byte, ctx *types.BlockContext) (*types.Message, error) {
	sc.logger.Trace("uploading to S3", "key", key)

	putInput := &s3.PutObjectInput{
		Bucket: aws.String(sc.bucketName),
		Body:   bytes.NewReader(body),
		Key:    aws.String(key),
	}
	response, err := sc.s3Client.PutObject(putInput)
	if err != nil {
		return nil, err
	}

	sc.logger.Trace("upload to S3 successful", "key", key, "etag", *response.ETag)

	// The consumer uses key+ETag to fetch and validate the stored object.
	reference := struct {
		Key  string `json:"key"`
		ETag string `json:"ETag"`
	}{
		Key:  key,
		ETag: *response.ETag,
	}
	return types.NewMessage("large-message", reference, ctx), nil
}

// batchEntrySize computes the size in bytes SQS attributes to a batch entry:
// the body length plus, for each message attribute, the lengths of its name,
// its data type, and its string or binary value. Returns an error for any
// attribute whose data type is not String, Number, or Binary.
func batchEntrySize(msg *string, msgAttr map[string]*sqs.MessageAttributeValue) (int, error) {
	total := 0
	if msg != nil {
		total = len(*msg)
	}

	for name, attr := range msgAttr {
		dt := *attr.DataType
		total += len(name) + len(dt)

		switch {
		case strings.HasPrefix(dt, "String"), strings.HasPrefix(dt, "Number"):
			total += len(*attr.StringValue)
		case strings.HasPrefix(dt, "Binary"):
			total += len(attr.BinaryValue)
		default:
			return -1, fmt.Errorf("unexpected data type: %s", dt)
		}
	}

	return total, nil
}
85 changes: 85 additions & 0 deletions plugins/indexing/pluginaws/sized_batch_entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package pluginaws

import (
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"

"github.com/sedaprotocol/seda-chain/plugins/indexing/types"
)

// sizedBatchEntry pairs an SQS batch request entry with its pre-computed
// payload size (body plus attributes, per batchEntrySize).
type sizedBatchEntry struct {
	size  int // total bytes of message body and attributes
	entry *sqs.SendMessageBatchRequestEntry
}

// newSizedBatchEntry wraps entry together with its computed SQS payload
// size; it fails if the entry carries an attribute of an unexpected type.
func newSizedBatchEntry(entry *sqs.SendMessageBatchRequestEntry) (*sizedBatchEntry, error) {
	n, err := batchEntrySize(entry.MessageBody, entry.MessageAttributes)
	if err != nil {
		return nil, err
	}
	return &sizedBatchEntry{size: n, entry: entry}, nil
}

// createSizedBatchEntries converts messages into size-annotated SQS batch
// entries. Each entry carries "height" and "time" attributes taken from the
// message's block context. A message whose JSON encoding exceeds
// MaxMessageBodyLengthBytes is first uploaded to S3 and replaced by a small
// pointer message, with the S3 key recorded in a "file" attribute.
func (sc *SqsClient) createSizedBatchEntries(data []*types.Message) ([]*sizedBatchEntry, error) {
	result := make([]*sizedBatchEntry, 0, len(data))

	for i, msg := range data {
		body, err := json.Marshal(msg)
		if err != nil {
			return nil, err
		}

		attrs := map[string]*sqs.MessageAttributeValue{
			"height": {
				DataType:    aws.String("Number"),
				StringValue: aws.String(strconv.FormatInt(msg.Block.Height, 10)),
			},
			"time": {
				DataType:    aws.String("String"),
				StringValue: aws.String(msg.Block.Time.Format(time.RFC3339)),
			},
		}

		// Oversized payloads are offloaded to S3; the queue entry becomes
		// a pointer message referencing the uploaded object.
		if len(body) > MaxMessageBodyLengthBytes {
			fileKey := fmt.Sprintf("%s-h%d-i%d.json", msg.Type, msg.Block.Height, i)

			pointerMsg, err := sc.uploadToS3(fileKey, body, msg.Block)
			if err != nil {
				return nil, err
			}
			if body, err = json.Marshal(pointerMsg); err != nil {
				return nil, err
			}

			attrs["file"] = &sqs.MessageAttributeValue{
				DataType:    aws.String("String"),
				StringValue: aws.String(fileKey),
			}
		}

		entry, err := newSizedBatchEntry(&sqs.SendMessageBatchRequestEntry{
			Id:                aws.String(fmt.Sprintf("%s-%d", msg.Type, i)),
			MessageGroupId:    aws.String("chain_events"),
			MessageAttributes: attrs,
			MessageBody:       aws.String(string(body)),
		})
		if err != nil {
			return nil, err
		}

		result = append(result, entry)
	}

	return result, nil
}
Loading
Loading