From 34d2ffb97afbc52fdeed1a1ffcd190ca374553c2 Mon Sep 17 00:00:00 2001 From: Josh Liburdi Date: Tue, 4 Jun 2024 16:22:49 -0700 Subject: [PATCH] feat(transform): Add Meta KV Store Lock Transform (#177) * feat(kv): Add Locker Interface * feat(aws): Add DynamoDB DeleteItem, PutItem Methods * feat(transform): Add Meta KV Store Lock * docs(examples): Add Exactly Once Examples * docs(transform): Update Locker Comment * fix(dynamodb): KV Store Lock Errors * fix(transform): KV Store Locked Keys * docs(examples): Add DynamoDB Distributed Lock Pattern * docs(examples): Formatting --- build/config/substation.libsonnet | 14 ++ .../meta/exactly_once_consumer/config.jsonnet | 29 +++ .../meta/exactly_once_consumer/data.jsonl | 8 + .../meta/exactly_once_producer/config.jsonnet | 21 ++ .../meta/exactly_once_producer/data.jsonl | 8 + .../meta/exactly_once_system/config.jsonnet | 24 +++ .../meta/exactly_once_system/data.jsonl | 8 + examples/terraform/aws/README.md | 4 + .../config/node/config.jsonnet | 40 ++++ .../distributed_lock/terraform/_resources.tf | 40 ++++ .../distributed_lock/terraform/node.tf | 26 +++ internal/aws/dynamodb/dynamodb.go | 36 +++- internal/kv/aws_dynamodb.go | 69 ++++++ internal/kv/kv.go | 50 ++++- internal/kv/memory.go | 33 +++ transform/enrich_kv_store_set.go | 20 +- transform/enrich_kv_store_set_test.go | 40 ---- transform/meta_kv_store_lock.go | 198 ++++++++++++++++++ transform/transform.go | 18 ++ transform/transform_test.go | 30 +++ 20 files changed, 654 insertions(+), 62 deletions(-) create mode 100644 examples/config/transform/meta/exactly_once_consumer/config.jsonnet create mode 100644 examples/config/transform/meta/exactly_once_consumer/data.jsonl create mode 100644 examples/config/transform/meta/exactly_once_producer/config.jsonnet create mode 100644 examples/config/transform/meta/exactly_once_producer/data.jsonl create mode 100644 examples/config/transform/meta/exactly_once_system/config.jsonnet create mode 100644 examples/config/transform/meta/exactly_once_system/data.jsonl create mode 100644 examples/terraform/aws/dynamodb/distributed_lock/config/node/config.jsonnet create mode 100644 examples/terraform/aws/dynamodb/distributed_lock/terraform/_resources.tf create mode 100644 examples/terraform/aws/dynamodb/distributed_lock/terraform/node.tf delete mode 100644 transform/enrich_kv_store_set_test.go create mode 100644 transform/meta_kv_store_lock.go diff --git a/build/config/substation.libsonnet b/build/config/substation.libsonnet index ef8cc8ec..d4c74c9e 100644 --- a/build/config/substation.libsonnet +++ b/build/config/substation.libsonnet @@ -523,6 +523,20 @@ type: 'meta_for_each', settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))), }, + kv_store: { + lock(settings={}): { + local default = { + object: $.config.object { lock_key: null, ttl_key: null }, + transform: null, + kv_store: null, + prefix: null, + ttl_offset: "0s", + }, + + type: 'meta_kv_store_lock', + settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))), + }, + }, metric: { duration(settings={}): { local default = { diff --git a/examples/config/transform/meta/exactly_once_consumer/config.jsonnet b/examples/config/transform/meta/exactly_once_consumer/config.jsonnet new file mode 100644 index 00000000..28c2f2e9 --- /dev/null +++ b/examples/config/transform/meta/exactly_once_consumer/config.jsonnet @@ -0,0 +1,29 @@ +// This example shows how to use the `meta_kv_store_lock` transform to +// create an "exactly once" semantic for a pipeline consumer. +local sub = import '../../../../../build/config/substation.libsonnet'; + +// In production environments a distributed KV store should be used. +local kv = sub.kv_store.memory(); + +{ + transforms: [ + // If a message acquires a lock, then it is tagged for inspection. + sub.tf.meta.kv_store.lock(settings={ + kv_store: kv, + prefix: 'eo_consumer', + ttl_offset: '1m', + transform: sub.tf.obj.insert({ object: { target_key: 'meta eo_consumer' }, value: 'locked' }), + }), + // Messages that are not locked are dropped from the pipeline. + sub.tf.meta.switch({ cases: [ + { + condition: sub.cnd.none([ + sub.cnd.str.eq({ object: { source_key: 'meta eo_consumer' }, value: 'locked' }), + ]), + transform: sub.tf.utility.drop(), + }, + ] }), + // At this point only locked messages exist in the pipeline. + sub.tf.send.stdout(), + ], +} diff --git a/examples/config/transform/meta/exactly_once_consumer/data.jsonl b/examples/config/transform/meta/exactly_once_consumer/data.jsonl new file mode 100644 index 00000000..864e8bfd --- /dev/null +++ b/examples/config/transform/meta/exactly_once_consumer/data.jsonl @@ -0,0 +1,8 @@ +{"a":"b"} +{"a":"b"} +{"c":"d"} +{"a":"b"} +{"c":"d"} +{"c":"d"} +{"e":"f"} +{"a":"b"} diff --git a/examples/config/transform/meta/exactly_once_producer/config.jsonnet b/examples/config/transform/meta/exactly_once_producer/config.jsonnet new file mode 100644 index 00000000..0f1ad8dc --- /dev/null +++ b/examples/config/transform/meta/exactly_once_producer/config.jsonnet @@ -0,0 +1,21 @@ +// This example shows how to use the `meta_kv_store_lock` transform to +// create an "exactly once" semantic for a pipeline producer. +local sub = import '../../../../../build/config/substation.libsonnet'; + +// In production environments a distributed KV store should be used. +local kv = sub.kv_store.memory(); + +{ + transforms: [ + // This only prints messages that acquire a lock. Any message + // that fails to acquire a lock will be skipped. An error in the + // sub-transform will cause all previously locked messages to be + // unlocked. + sub.tf.meta.err({ transform: sub.tf.meta.kv_store.lock(settings={ + kv_store: kv, + prefix: 'eo_producer', + ttl_offset: '1m', + transform: sub.tf.send.stdout(), + }) }), + ], +} diff --git a/examples/config/transform/meta/exactly_once_producer/data.jsonl b/examples/config/transform/meta/exactly_once_producer/data.jsonl new file mode 100644 index 00000000..864e8bfd --- /dev/null +++ b/examples/config/transform/meta/exactly_once_producer/data.jsonl @@ -0,0 +1,8 @@ +{"a":"b"} +{"a":"b"} +{"c":"d"} +{"a":"b"} +{"c":"d"} +{"c":"d"} +{"e":"f"} +{"a":"b"} diff --git a/examples/config/transform/meta/exactly_once_system/config.jsonnet b/examples/config/transform/meta/exactly_once_system/config.jsonnet new file mode 100644 index 00000000..87d97ffe --- /dev/null +++ b/examples/config/transform/meta/exactly_once_system/config.jsonnet @@ -0,0 +1,24 @@ +// This example shows how to use the `meta_kv_store_lock` transform to +// create an "exactly once" semantic for an entire pipeline system. +local sub = import '../../../../../build/config/substation.libsonnet'; + +// In production environments a distributed KV store should be used. +local kv = sub.kv_store.memory(); + +{ + transforms: [ + // All messages are locked before being sent through other transform + // functions, ensuring that the message is processed only once. + // An error in any sub-transform will cause all previously locked + // messages to be unlocked. + sub.tf.meta.err({ transform: sub.tf.meta.kv_store.lock(settings={ + kv_store: kv, + prefix: 'eo_system', + ttl_offset: '1m', + transform: sub.tf.meta.pipeline({ transforms: [ + sub.tf.obj.insert({ object: { target_key: 'processed' }, value: true }), + sub.tf.send.stdout(), + ] }), + }) }), + ], +} diff --git a/examples/config/transform/meta/exactly_once_system/data.jsonl b/examples/config/transform/meta/exactly_once_system/data.jsonl new file mode 100644 index 00000000..864e8bfd --- /dev/null +++ b/examples/config/transform/meta/exactly_once_system/data.jsonl @@ -0,0 +1,8 @@ +{"a":"b"} +{"a":"b"} +{"c":"d"} +{"a":"b"} +{"c":"d"} +{"c":"d"} +{"e":"f"} +{"a":"b"} diff --git a/examples/terraform/aws/README.md b/examples/terraform/aws/README.md index 5f1ac42e..da889bed 100644 --- a/examples/terraform/aws/README.md +++ b/examples/terraform/aws/README.md @@ -82,6 +82,10 @@ flowchart LR end ``` +## Distributed Lock + +Deploys a data pipeline that implements a distributed lock pattern using DynamoDB. This pattern can be used to add "exactly-once" semantics to services that otherwise do not support it. For similar examples, see the "exactly once" configurations [here](/examples/config/transform/meta/). + ## Telephone Deploys a data pipeline that implements a "telephone" pattern by sharing data as context between multiple Lambda functions using a DynamoDB table. This pattern can be used to enrich events across unique data sources. diff --git a/examples/terraform/aws/dynamodb/distributed_lock/config/node/config.jsonnet b/examples/terraform/aws/dynamodb/distributed_lock/config/node/config.jsonnet new file mode 100644 index 00000000..7a034ea9 --- /dev/null +++ b/examples/terraform/aws/dynamodb/distributed_lock/config/node/config.jsonnet @@ -0,0 +1,40 @@ +local sub = import '../../../../../../../build/config/substation.libsonnet'; + +local kv = sub.kv_store.aws_dynamodb({ + table_name: 'substation', + attributes: { partition_key: 'PK', ttl: 'ttl' }, +}); + +{ + transforms: [ + // All messages are locked before they are sent through other + // transform functions, ensuring that the message is processed + // exactly once. + // + // An error in any sub-transform will cause all previously locked + // messages to be unlocked; this only applies to messages that have + // not yet been flushed by a control message. Use the `utility_control` + // transform to manage how often messages are flushed. + sub.tf.meta.kv_store.lock(settings={ + kv_store: kv, + prefix: 'distributed_lock', + ttl_offset: '1m', + transform: sub.tf.meta.pipeline({ transforms: [ + // Delaying and simulating an error makes it possible to + // test message unlocking in real-time (view changes using + // the DynamoDB console). Uncomment the lines below to see + // how it works. + // + // sub.tf.utility.delay({ duration: '10s' }), + // sub.pattern.transform.conditional( + // condition=sub.cnd.utility.random(), + // transform=sub.tf.utility.err({ message: 'simulating error to trigger unlock' }), + // ), + // + // Messages are printed to the console. After this, they are locked + // and will not be printed again until the lock expires. + sub.tf.send.stdout(), + ] }), + }), + ], +} diff --git a/examples/terraform/aws/dynamodb/distributed_lock/terraform/_resources.tf b/examples/terraform/aws/dynamodb/distributed_lock/terraform/_resources.tf new file mode 100644 index 00000000..7200b0f6 --- /dev/null +++ b/examples/terraform/aws/dynamodb/distributed_lock/terraform/_resources.tf @@ -0,0 +1,40 @@ +data "aws_caller_identity" "caller" {} + +module "appconfig" { + source = "../../../../../../build/terraform/aws/appconfig" + + config = { + name = "substation" + environments = [{ name = "example" }] + } +} + +module "ecr" { + source = "../../../../../../build/terraform/aws/ecr" + + config = { + name = "substation" + force_delete = true + } +} + +module "dynamodb" { + source = "../../../../../../build/terraform/aws/dynamodb" + + config = { + name = "substation" + hash_key = "PK" + ttl = "ttl" + + attributes = [ + { + name = "PK" + type = "S" + } + ] + } + + access = [ + module.node.role.name, + ] +} diff --git a/examples/terraform/aws/dynamodb/distributed_lock/terraform/node.tf b/examples/terraform/aws/dynamodb/distributed_lock/terraform/node.tf new file mode 100644 index 00000000..2f291fef --- /dev/null +++ b/examples/terraform/aws/dynamodb/distributed_lock/terraform/node.tf @@ -0,0 +1,26 @@ +module "node" { + source = "../../../../../../build/terraform/aws/lambda" + appconfig = module.appconfig + + config = { + name = "node" + description = "Substation node that transforms data exactly-once using a distributed lock" + image_uri = "${module.ecr.url}:v1.3.0" + image_arm = true + env = { + "SUBSTATION_CONFIG" : "http://localhost:2772/applications/substation/environments/example/configurations/node" + "SUBSTATION_LAMBDA_HANDLER" : "AWS_API_GATEWAY" + "SUBSTATION_DEBUG" : true + } + } + + depends_on = [ + module.appconfig.name, + module.ecr.url, + ] +} + +resource "aws_lambda_function_url" "node" { + function_name = module.node.name + authorization_type = "NONE" +} diff --git a/internal/aws/dynamodb/dynamodb.go b/internal/aws/dynamodb/dynamodb.go index 67232ba8..0adfb487 100644 --- a/internal/aws/dynamodb/dynamodb.go +++ b/internal/aws/dynamodb/dynamodb.go @@ -42,6 +42,22 @@ func (a *API) IsEnabled() bool { return a.Client != nil } +func (a *API) DeleteItem(ctx aws.Context, table string, key map[string]*dynamodb.AttributeValue) (resp *dynamodb.DeleteItemOutput, err error) { + ctx = context.WithoutCancel(ctx) + resp, err = a.Client.DeleteItemWithContext( + ctx, + &dynamodb.DeleteItemInput{ + TableName: aws.String(table), + Key: key, + }, + ) + if err != nil { + return nil, fmt.Errorf("deleteitem table %s: %v", table, err) + } + + return resp, nil +} + // BatchPutItem is a convenience wrapper for putting multiple items into a DynamoDB table. func (a *API) BatchPutItem(ctx aws.Context, table string, items []map[string]*dynamodb.AttributeValue) (resp *dynamodb.BatchWriteItemOutput, err error) { var requests []*dynamodb.WriteRequest @@ -62,7 +78,6 @@ func (a *API) BatchPutItem(ctx aws.Context, table string, items []map[string]*dy }, }, ) - if err != nil { if aerr, ok := err.(awserr.Error); ok { switch aerr.Code() { @@ -96,7 +111,6 @@ func (a *API) PutItem(ctx aws.Context, table string, item map[string]*dynamodb.A TableName: aws.String(table), Item: item, }) - if err != nil { return nil, fmt.Errorf("putitem table %s: %v", table, err) } @@ -104,6 +118,24 @@ func (a *API) PutItem(ctx aws.Context, table string, item map[string]*dynamodb.A return resp, nil } +func (a *API) PutItemWithCondition(ctx aws.Context, table string, item map[string]*dynamodb.AttributeValue, conditionExpression string, expressionAttributeNames map[string]*string, expressionAttributeValues map[string]*dynamodb.AttributeValue) (resp *dynamodb.PutItemOutput, err error) { + input := &dynamodb.PutItemInput{ + TableName: aws.String(table), + ConditionExpression: aws.String(conditionExpression), + ExpressionAttributeNames: expressionAttributeNames, + Item: item, + ExpressionAttributeValues: expressionAttributeValues, + ReturnValues: aws.String("ALL_OLD"), + } + + resp, err = a.Client.PutItemWithContext(ctx, input) + if err != nil { + return resp, err + } + + return resp, nil +} + /* Query is a convenience wrapper for querying a DynamoDB table. The paritition and sort keys are always referenced in the key condition expression as ":PK" and ":SK". Refer to the DynamoDB documentation for the Query operation's request syntax and key condition expression patterns: diff --git a/internal/kv/aws_dynamodb.go b/internal/kv/aws_dynamodb.go index fca75a95..edc35023 100644 --- a/internal/kv/aws_dynamodb.go +++ b/internal/kv/aws_dynamodb.go @@ -3,7 +3,9 @@ package kv import ( "context" "fmt" + "time" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" "github.com/brexhq/substation/config" "github.com/brexhq/substation/internal/aws" @@ -66,6 +68,73 @@ func (store *kvAWSDynamoDB) String() string { return toString(store) } +// Lock adds an item to the DynamoDB table with a conditional check. +func (kv *kvAWSDynamoDB) Lock(ctx context.Context, key string, ttl int64) error { + attr := map[string]interface{}{ + kv.Attributes.PartitionKey: key, + kv.Attributes.TTL: ttl, + } + + if kv.Attributes.SortKey != "" { + attr[kv.Attributes.SortKey] = "substation:kv_store" + } + + // Since the sort key is optional and static, it is not included in the check. + exp := "attribute_not_exists(#pk) OR #ttl <= :now" + expAttrNames := map[string]*string{ + "#pk": &kv.Attributes.PartitionKey, + "#ttl": &kv.Attributes.TTL, + } + expAttrVals := map[string]interface{}{ + ":now": time.Now().Unix(), + } + + a, err := dynamodbattribute.MarshalMap(attr) + if err != nil { + return err + } + + v, err := dynamodbattribute.MarshalMap(expAttrVals) + if err != nil { + return err + } + + // If the item already exists and the TTL has not expired, then this returns ErrNoLock. The + // caller is expected to handle this error and retry the call if necessary. + if _, err := kv.client.PutItemWithCondition(ctx, kv.TableName, a, exp, expAttrNames, v); err != nil { + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == "ConditionalCheckFailedException" { + return ErrNoLock + } + } + + return err + } + + return nil +} + +func (store *kvAWSDynamoDB) Unlock(ctx context.Context, key string) error { + m := map[string]interface{}{ + store.Attributes.PartitionKey: key, + } + + if store.Attributes.SortKey != "" { + m[store.Attributes.SortKey] = "substation:kv_store" + } + + item, err := dynamodbattribute.MarshalMap(m) + if err != nil { + return err + } + + if _, err := store.client.DeleteItem(ctx, store.TableName, item); err != nil { + return err + } + + return nil +} + // Get retrieves an item from the DynamoDB table. If the item had a time-to-live (TTL) // configured when it was added and the TTL has passed, then nothing is returned. // diff --git a/internal/kv/kv.go b/internal/kv/kv.go index e78b6152..b9eec649 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -11,10 +11,13 @@ import ( ) var ( - mu sync.Mutex - m map[string]Storer + mu sync.Mutex + m map[string]Storer + lock map[string]Locker // errSetNotSupported is returned when the KV set action is not supported. errSetNotSupported = fmt.Errorf("set not supported") + // ErrNoLock is returned when a lock cannot be acquired. + ErrNoLock = fmt.Errorf("unable to acquire lock") ) // Storer provides tools for getting values from and putting values into key-value stores. @@ -77,6 +80,49 @@ func New(cfg config.Config) (Storer, error) { } } +type Locker interface { + Lock(context.Context, string, int64) error + Unlock(context.Context, string) error + Setup(context.Context) error + IsEnabled() bool +} + +// Get returns a pointer to a Locker that is stored as a package level global variable. +// This function and each Locker are safe for concurrent access. +func GetLocker(cfg config.Config) (Locker, error) { + mu.Lock() + defer mu.Unlock() + + // KV store configurations are mapped using the "signature" of their config. + // this makes it possible for a single run of a Substation application to rely + // on multiple KV stores. + sig := fmt.Sprint(cfg) + locker, ok := lock[sig] + if ok { + return locker, nil + } + + locker, err := NewLocker(cfg) + if err != nil { + return nil, err + } + lock[sig] = locker + + return lock[sig], nil +} + +func NewLocker(cfg config.Config) (Locker, error) { + switch t := cfg.Type; t { + case "aws_dynamodb": + return newKVAWSDynamoDB(cfg) + case "memory": + return newKVMemory(cfg) + default: + return nil, fmt.Errorf("kv_store locker: %s: %v", t, errors.ErrInvalidFactoryInput) + } +} + func init() { m = make(map[string]Storer) + lock = make(map[string]Locker) } diff --git a/internal/kv/memory.go b/internal/kv/memory.go index 57117f4a..4952a1e0 100644 --- a/internal/kv/memory.go +++ b/internal/kv/memory.go @@ -20,6 +20,7 @@ type kvMemory struct { // This is optional and defaults to 1024 values. Capacity int `json:"capacity"` mu sync.Mutex + lockMu sync.Mutex lru list.List items map[string]*list.Element } @@ -106,6 +107,38 @@ func (store *kvMemory) SetWithTTL(ctx context.Context, key string, val interface return nil } +// Lock adds an item to the store if it does not already exist. If the item already exists +// and the time-to-live (TTL) has not expired, then this returns ErrNoLock. +func (store *kvMemory) Lock(ctx context.Context, key string, ttl int64) error { + store.lockMu.Lock() + defer store.lockMu.Unlock() + + if node, ok := store.items[key]; ok { + ttl := node.Value.(kvMemoryElement).ttl + if ttl <= time.Now().Unix() { + delete(store.items, key) + store.lru.Remove(node) + } + + return ErrNoLock + } + + return store.SetWithTTL(ctx, key, nil, ttl) +} + +// Unlock removes an item from the store. +func (store *kvMemory) Unlock(ctx context.Context, key string) error { + store.lockMu.Lock() + defer store.lockMu.Unlock() + + if node, ok := store.items[key]; ok { + store.lru.Remove(node) + delete(store.items, key) + } + + return nil +} + // IsEnabled returns true if the store is ready for use. func (store *kvMemory) IsEnabled() bool { store.mu.Lock() diff --git a/transform/enrich_kv_store_set.go b/transform/enrich_kv_store_set.go index 7510fef3..8d5cdc39 100644 --- a/transform/enrich_kv_store_set.go +++ b/transform/enrich_kv_store_set.go @@ -6,7 +6,6 @@ import ( "context" "encoding/json" "fmt" - "math" "time" "github.com/brexhq/substation/config" @@ -145,14 +144,14 @@ func (tf *enrichKVStoreSet) Transform(ctx context.Context, msg *message.Message) //nolint: nestif // ignore nesting complexity if tf.conf.Object.TTLKey != "" && tf.ttl != 0 { value := msg.GetValue(tf.conf.Object.TTLKey) - ttl := tf.truncateTTL(value) + tf.ttl + ttl := truncateTTL(value) + tf.ttl if err := tf.kvStore.SetWithTTL(ctx, key, msg.GetValue(tf.conf.Object.TargetKey).String(), ttl); err != nil { return nil, fmt.Errorf("transform: enrich_kv_store_set: %v", err) } } else if tf.conf.Object.TTLKey != "" { value := msg.GetValue(tf.conf.Object.TTLKey) - ttl := tf.truncateTTL(value) + ttl := truncateTTL(value) if err := tf.kvStore.SetWithTTL(ctx, key, msg.GetValue(tf.conf.Object.TargetKey).String(), ttl); err != nil { return nil, fmt.Errorf("transform: enrich_kv_store_set: %v", err) @@ -176,18 +175,3 @@ func (tf *enrichKVStoreSet) String() string { b, _ := json.Marshal(tf.conf) return string(b) } - -// truncateTTL truncates the time-to-live (TTL) value from any precision greater -// than seconds (e.g., milliseconds, nanoseconds) to seconds. -// -// For example: -// - 1696482368492 -> 1696482368 -// - 1696482368492290 -> 1696482368 -func (tf *enrichKVStoreSet) truncateTTL(v message.Value) int64 { - if len(v.String()) <= 10 { - return v.Int() - } - - l := len(v.String()) - 10 - return v.Int() / int64(math.Pow10(l)) -} diff --git a/transform/enrich_kv_store_set_test.go b/transform/enrich_kv_store_set_test.go deleted file mode 100644 index 3708ec25..00000000 --- a/transform/enrich_kv_store_set_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package transform - -import ( - "reflect" - "testing" -) - -var _ Transformer = &enrichKVStoreSet{} - -var kvStoreSetTruncateTests = []struct { - name string - test []byte - expected int64 -}{ - { - "unix millisecond", - []byte("1696482368492"), - 1696482368, - }, - { - "unix nanosecond", - []byte("1696482368492290"), - 1696482368, - }, -} - -func TestKVStoreSetTruncate(t *testing.T) { - for _, test := range kvStoreSetTruncateTests { - t.Run(test.name, func(t *testing.T) { - tf := &enrichKVStoreSet{} - - tmp := bytesToValue(test.test) - result := tf.truncateTTL(tmp) - - if !reflect.DeepEqual(result, test.expected) { - t.Errorf("expected %v, got %v", test.expected, result) - } - }) - } -} diff --git a/transform/meta_kv_store_lock.go b/transform/meta_kv_store_lock.go new file mode 100644 index 00000000..6c7c6b09 --- /dev/null +++ b/transform/meta_kv_store_lock.go @@ -0,0 +1,198 @@ +package transform + +import ( + "context" + "crypto/sha256" + "fmt" + "sync" + "time" + + "github.com/brexhq/substation/config" + iconfig "github.com/brexhq/substation/internal/config" + "github.com/brexhq/substation/internal/errors" + "github.com/brexhq/substation/internal/kv" + "github.com/brexhq/substation/message" +) + +type metaVStoreLockObjectConfig struct { + // LockKey retrieves a value from an object that is used as the key to lock + // the item in the KV store. If this value is not set, then the SHA256 hash + // of the message is used as the key. + LockKey string `json:"lock_key"` + // TTLKey retrieves a value from an object that is used as the time-to-live (TTL) + // of the item locked in the KV store. This value must be an integer that represents + // the Unix time when the item will be evicted from the store. Any precision greater + // than seconds (e.g., milliseconds, nanoseconds) is truncated to seconds. + // + // This is optional and defaults to using no TTL when setting items into the store. + TTLKey string `json:"ttl_key"` + + iconfig.Object +} + +type metaKVStoreLockConfig struct { + // Prefix is prepended to the key and can be used to simplify + // data management within a KV store. + // + // This is optional and defaults to an empty string. + Prefix string `json:"prefix"` + // TTLOffset is an offset used to determine the time-to-live (TTL) of the item set + // into the KV store. If Object.TTLKey is configured, then this value is added to the TTL + // value retrieved from the object. If Object.TTLKey is not used, then this value is added + // to the current time. + // + // For example, if Object.TTLKey is not set and the offset is "1d", then the value + // will be evicted from the store when more than 1 day has passed. + // + // This is optional and defaults to using no TTL when setting values into the store. + TTLOffset string `json:"ttl_offset"` + + Transform config.Config `json:"transform"` + Object metaVStoreLockObjectConfig `json:"object"` + KVStore config.Config `json:"kv_store"` +} + +func (c *metaKVStoreLockConfig) Decode(in interface{}) error { + return iconfig.Decode(in, c) +} + +func (c *metaKVStoreLockConfig) Validate() error { + if c.Transform.Type == "" { + return fmt.Errorf("transform: %v", errors.ErrMissingRequiredOption) + } + + if c.KVStore.Type == "" { + return fmt.Errorf("kv_store: %v", errors.ErrMissingRequiredOption) + } + + return nil +} + +func newMetaKVStoreLock(ctx context.Context, cfg config.Config) (*metaKVStoreLock, error) { + conf := metaKVStoreLockConfig{} + if err := conf.Decode(cfg.Settings); err != nil { + return nil, fmt.Errorf("transform: meta_kv_store_lock: %v", err) + } + + if err := conf.Validate(); err != nil { + return nil, fmt.Errorf("transform: meta_kv_store_lock: %v", err) + } + + tff, err := New(ctx, conf.Transform) + if err != nil { + return nil, fmt.Errorf("transform: meta_kv_store_lock: %v", err) + } + + locker, err := kv.GetLocker(conf.KVStore) + if err != nil { + return nil, fmt.Errorf("transform: meta_kv_store_lock: %v", err) + } + + if err := locker.Setup(ctx); err != nil { + return nil, fmt.Errorf("transform: meta_kv_store_lock: %v", err) + } + + if conf.TTLOffset == "" { + conf.TTLOffset = "0s" + } + + dur, err := time.ParseDuration(conf.TTLOffset) + if err != nil { + return nil, fmt.Errorf("transform: meta_kv_store_lock: %v", err) + } + + tf := metaKVStoreLock{ + tf: tff, + conf: conf, + locker: locker, + ttl: int64(dur.Seconds()), + } + + return &tf, nil +} + +// metaKVStoreLock applies a lock in a KV store and executes a transform. If the lock is already +// held, then an error is returned. The lock is applied with a time-to-live (TTL) value, which is +// used to determine when the lock is automatically released. +type metaKVStoreLock struct { + tf Transformer + conf metaKVStoreLockConfig + locker kv.Locker + ttl int64 + + // mu is required to prevent concurrent access to the keys slice. + mu sync.Mutex + keys []string +} + +// Transforms a message based on the configuration. +func (tf *metaKVStoreLock) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) { + tf.mu.Lock() + defer tf.mu.Unlock() + + if msg.IsControl() { + msgs, err := tf.tf.Transform(ctx, msg) + if err != nil { + for _, key := range tf.keys { + _ = tf.locker.Unlock(ctx, key) + } + + return nil, fmt.Errorf("transform: meta_kv_store_lock: %v", err) + } + + tf.keys = tf.keys[:0] + return msgs, nil + } + + // By default, the lock key is the SHA256 hash of the message. + var lockKey string + v := msg.GetValue(tf.conf.Object.LockKey) + if !v.Exists() { + sum := sha256.Sum256(msg.Data()) + lockKey = fmt.Sprintf("%x", sum) + } else { + lockKey = v.String() + } + + if tf.conf.Prefix != "" { + lockKey = fmt.Sprint(tf.conf.Prefix, ":", lockKey) + } + + // Calculate TTL based on the configuration. + var ttl int64 + if tf.conf.Object.TTLKey != "" && tf.ttl != 0 { + v := msg.GetValue(tf.conf.Object.TTLKey) + ttl = truncateTTL(v) + tf.ttl + } else if tf.conf.Object.TTLKey != "" { + v := msg.GetValue(tf.conf.Object.TTLKey) + ttl = truncateTTL(v) + } else if tf.ttl != 0 { + ttl = time.Now().Add(time.Duration(tf.ttl) * time.Second).Unix() + } + + // Acquire the lock. If the lock is already held, then the message is returned as is. + // This prevents the transform from being applied to the message more than once. + if err := tf.locker.Lock(ctx, lockKey, ttl); err != nil { + if err == kv.ErrNoLock { + return []*message.Message{msg}, nil + } + + for _, key := range tf.keys { + _ = tf.locker.Unlock(ctx, key) + } + + return nil, fmt.Errorf("transform: meta_kv_store_lock: %v", err) + } + + tf.keys = append(tf.keys, lockKey) + msgs, err := tf.tf.Transform(ctx, msg) + if err != nil { + for _, key := range tf.keys { + _ = tf.locker.Unlock(ctx, key) + } + + return nil, fmt.Errorf("transform: meta_kv_store_lock: %v", err) + } + + return msgs, nil +} diff --git a/transform/transform.go b/transform/transform.go index e4708393..00a73ab4 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -4,6 +4,7 @@ package transform import ( "context" "fmt" + "math" "github.com/brexhq/substation/config" "github.com/brexhq/substation/internal/errors" @@ -78,6 +79,8 @@ func New(ctx context.Context, cfg config.Config) (Transformer, error) { //nolint return newMetaErr(ctx, cfg) case "meta_for_each": return newMetaForEach(ctx, cfg) + case "meta_kv_store_lock": + return newMetaKVStoreLock(ctx, cfg) case "meta_metric_duration": return newMetaMetricsDuration(ctx, cfg) case "meta_pipeline": @@ -229,3 +232,18 @@ func anyToBytes(v any) []byte { return msg.GetValue("_").Bytes() } + +// truncateTTL truncates the time-to-live (TTL) value from any precision greater +// than seconds (e.g., milliseconds, nanoseconds) to seconds. +// +// For example: +// - 1696482368492 -> 1696482368 +// - 1696482368492290 -> 1696482368 +func truncateTTL(v message.Value) int64 { + if len(v.String()) <= 10 { + return v.Int() + } + + l := len(v.String()) - 10 + return v.Int() / int64(math.Pow10(l)) +} diff --git a/transform/transform_test.go b/transform/transform_test.go index a6f4f8cd..888297fb 100644 --- a/transform/transform_test.go +++ b/transform/transform_test.go @@ -107,3 +107,33 @@ func TestTransform(t *testing.T) { }) } } + +var truncateTTLTests = []struct { + name string + test []byte + expected int64 +}{ + { + "unix millisecond", + []byte("1696482368492"), + 1696482368, + }, + { + "unix nanosecond", + []byte("1696482368492290"), + 1696482368, + }, +} + +func TestTruncateTTL(t *testing.T) { + for _, test := range truncateTTLTests { + t.Run(test.name, func(t *testing.T) { + tmp := bytesToValue(test.test) + result := truncateTTL(tmp) + + if !reflect.DeepEqual(result, test.expected) { + t.Errorf("expected %v, got %v", test.expected, result) + } + }) + } +}