From db50c5e804179b84c978a29b0abc4d3a1a7b866a Mon Sep 17 00:00:00 2001
From: Thomas van Dam
Date: Wed, 15 May 2024 18:03:49 +0800
Subject: [PATCH] chore(plugin): add message filtering capability

Optionally allow configuring a list of message types that are allowed
to be published on the queue. Went for an allowlist as we'll generally
use it when we want to reindex historical state and only publish
updates for the new messages. For the same reason I went for a simple
slice of strings as the data structure; the list should never be long
enough to matter in terms of performance.

Closes: #258
---
 plugins/indexing/README.md                   |  3 ++
 plugins/indexing/pluginaws/message_filter.go | 30 ++++++++++++++++
 plugins/indexing/pluginaws/sqs_client.go     |  3 +-
 plugins/indexing/pluginaws/types.go          | 38 +++++++++++++-------
 4 files changed, 61 insertions(+), 13 deletions(-)
 create mode 100644 plugins/indexing/pluginaws/message_filter.go

diff --git a/plugins/indexing/README.md b/plugins/indexing/README.md
index 89f86c2d..a5766ebe 100644
--- a/plugins/indexing/README.md
+++ b/plugins/indexing/README.md
@@ -17,6 +17,9 @@ export S3_LARGE_MSG_BUCKET_NAME=""
 export PLUGIN_LOG_FILE=PATH_TO_DESIRED_LOG_FILE
 # Optionally you can also specify the log level, one of "trace", "debug", "info", "warn", "error"
 export PLUGIN_LOG_LEVEL="WARN"
+# Optionally you can specify a comma-separated list of message types which are allowed to be published on the queue.
+# When omitted it will default to publishing all message types.
+export ALLOWED_MESSAGE_TYPES="block,account"
 ```
 
 Lastly, as we're using SQS and S3 the node needs access to a valid set of AWS credentials with permission to publish messages to the specified queue and upload access to the specified bucket.
diff --git a/plugins/indexing/pluginaws/message_filter.go b/plugins/indexing/pluginaws/message_filter.go
new file mode 100644
index 00000000..55b96e05
--- /dev/null
+++ b/plugins/indexing/pluginaws/message_filter.go
@@ -0,0 +1,30 @@
+package pluginaws
+
+import (
+    "slices"
+
+    "github.com/sedaprotocol/seda-chain/plugins/indexing/types"
+)
+
+func (sc *SqsClient) filterMessages(data []*types.Message) []*types.Message {
+    allowedMessages := make([]*types.Message, 0, len(data))
+
+    for _, message := range data {
+        if sc.isMessageAllowed(message) {
+            allowedMessages = append(allowedMessages, message)
+        } else {
+            sc.logger.Trace("skipping message", "type", message.Type)
+        }
+    }
+
+    return allowedMessages
+}
+
+func (sc *SqsClient) isMessageAllowed(event *types.Message) bool {
+    // When no allowlist is specified assume everything is allowed.
+    if len(sc.allowedMessages) == 0 {
+        return true
+    }
+
+    return slices.Contains(sc.allowedMessages, event.Type)
+}
diff --git a/plugins/indexing/pluginaws/sqs_client.go b/plugins/indexing/pluginaws/sqs_client.go
index ba5d3673..15e46dd9 100644
--- a/plugins/indexing/pluginaws/sqs_client.go
+++ b/plugins/indexing/pluginaws/sqs_client.go
@@ -16,7 +16,8 @@ const (
 func (sc *SqsClient) PublishToQueue(data []*types.Message) error {
     sc.logger.Trace("publishing to queue", "size", len(data))
 
-    sizedBatchEntries, err := sc.createSizedBatchEntries(data)
+    allowedMessages := sc.filterMessages(data)
+    sizedBatchEntries, err := sc.createSizedBatchEntries(allowedMessages)
     if err != nil {
         return err
     }
diff --git a/plugins/indexing/pluginaws/types.go b/plugins/indexing/pluginaws/types.go
index 56e9cb19..0e7abe9f 100644
--- a/plugins/indexing/pluginaws/types.go
+++ b/plugins/indexing/pluginaws/types.go
@@ -3,6 +3,7 @@ package pluginaws
 import (
     "fmt"
     "os"
+    "strings"
 
     "github.com/aws/aws-sdk-go/service/s3"
     "github.com/aws/aws-sdk-go/service/sqs"
@@ -11,16 +12,18 @@ import (
 )
 
 var (
-    queueURLEnvName  = "SQS_QUEUE_URL"
-    bucketURLEnvName = "S3_LARGE_MSG_BUCKET_NAME"
+    queueURLEnvName      = "SQS_QUEUE_URL"
+    bucketURLEnvName     = "S3_LARGE_MSG_BUCKET_NAME"
+    messageFilterEnvName = "ALLOWED_MESSAGE_TYPES"
 )
 
 type SqsClient struct {
-    bucketName string
-    queueURL   string
-    sqsClient  *sqs.SQS
-    s3Client   *s3.S3
-    logger     *log.Logger
+    bucketName      string
+    queueURL        string
+    allowedMessages []string
+    sqsClient       *sqs.SQS
+    s3Client        *s3.S3
+    logger          *log.Logger
 }
 
 func NewSqsClient(logger *log.Logger) (*SqsClient, error) {
@@ -34,6 +37,12 @@ func NewSqsClient(logger *log.Logger) (*SqsClient, error) {
         return nil, fmt.Errorf("missing environment variable '%s'", bucketURLEnvName)
     }
 
+    allowedMessages := make([]string, 0)
+    messageFilterString, found := os.LookupEnv(messageFilterEnvName)
+    if found {
+        allowedMessages = parseMessageFilterString(messageFilterString)
+    }
+
     sess, err := NewSession()
     if err != nil {
         return nil, fmt.Errorf("failed to initialise session: %w", err)
@@ -47,10 +56,15 @@ func NewSqsClient(logger *log.Logger) (*SqsClient, error) {
     awsS3Client := s3.New(sess, s3Config)
 
     return &SqsClient{
-        queueURL:   queueURL,
-        bucketName: bucketName,
-        sqsClient:  awsSqsClient,
-        s3Client:   awsS3Client,
-        logger:     logger,
+        queueURL:        queueURL,
+        bucketName:      bucketName,
+        allowedMessages: allowedMessages,
+        sqsClient:       awsSqsClient,
+        s3Client:        awsS3Client,
+        logger:          logger,
     }, nil
 }
+
+func parseMessageFilterString(data string) []string {
+    return strings.Split(data, ",")
+}
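
Note (not part of the patch): the end-to-end behaviour can be illustrated with a small standalone sketch. The program below is illustrative only; it reads the same ALLOWED_MESSAGE_TYPES variable and mirrors the empty-allowlist and slices.Contains semantics of isMessageAllowed and parseMessageFilterString, but the names main/pass and the "tx" type are placeholders.

package main

import (
    "fmt"
    "os"
    "slices"
    "strings"
)

func main() {
    // Mirrors how the plugin reads ALLOWED_MESSAGE_TYPES:
    // an unset variable means no filtering at all.
    allowed := []string{}
    if v, ok := os.LookupEnv("ALLOWED_MESSAGE_TYPES"); ok {
        allowed = strings.Split(v, ",")
    }

    for _, msgType := range []string{"block", "account", "tx"} {
        // Empty allowlist: everything passes. Otherwise only listed types pass.
        pass := len(allowed) == 0 || slices.Contains(allowed, msgType)
        fmt.Printf("%-8s published: %t\n", msgType, pass)
    }
}

Running it with ALLOWED_MESSAGE_TYPES="block,account" (the README example) reports only "block" and "account" as published; with the variable unset, all three types pass.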