Skip to content

Commit

Permalink
feat: add TTLKey to KV (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
jshlbrd committed Jun 20, 2023
1 parent 49a1afa commit 9837287
Showing 1 changed file with 21 additions and 9 deletions.
30 changes: 21 additions & 9 deletions process/kv_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,22 @@ type procKVStoreOptions struct {
//
// This is optional and defaults to an empty string.
Prefix string `json:"prefix"`
// TTLKey retrieves a value from an object that is used as the time-to-live (TTL)
// of the item set into the KV store. This value must be an integer that represents
// the epoch time (in seconds) when the item will be evicted from the store.
//
// This is optional and defaults to using no TTL when setting items into the store.
TTLKey string `json:"ttl_key"`
// OffsetTTL is an offset (in seconds) used to determine the time-to-live (TTL)
// of the value set into the KV store. TTL is calculated based on the current
// time plus the offset.
// of the item set into the KV store. If TTLKey is configured, then this value is
// added to the TTL value retrieved from the object. If TTLKey is not used, then this
// value is added to the current time.
//
// For example, if the offset is 86400 (1 day), then the value will either be
// evicted from the store or ignored on retrieval if more than 1 day has elapsed
// since it was placed into the store.
// For example, if TTLKey is not set and the offset is 86400 (1 day), then the value
// will be evicted from the store if more than 1 day has elapsed.
//
// This is optional and defaults to using no TTL when setting values into the store.
OffsetTTL int `json:"offset_ttl"`
OffsetTTL int64 `json:"offset_ttl"` // TODO(v1.0.0): rename to TTLOffset?
// KVOptions determine the type of KV store used by the processor. Refer to internal/kv
// for more information.
KVOptions config.Config `json:"kv_options"`
Expand Down Expand Up @@ -158,15 +164,21 @@ func (p procKVStore) Apply(ctx context.Context, capsule config.Capsule) (config.
key = fmt.Sprint(p.Options.Prefix, ":", key)
}

if p.Options.OffsetTTL == 0 {
if err := p.kvStore.Set(ctx, key, capsule.Get(p.Key).String()); err != nil {
//nolint: nestif // ignore nesting complexity
if p.Options.TTLKey != "" && p.Options.OffsetTTL != 0 {
ttl := capsule.Get(p.Options.TTLKey).Int() + p.Options.OffsetTTL
if err := p.kvStore.SetWithTTL(ctx, key, capsule.Get(p.Key).String(), ttl); err != nil {
return capsule, fmt.Errorf("process: kv_store: %v", err)
}
} else {
} else if p.Options.OffsetTTL != 0 {
ttl := time.Now().Add(time.Duration(p.Options.OffsetTTL) * time.Second).Unix()
if err := p.kvStore.SetWithTTL(ctx, key, capsule.Get(p.Key).String(), ttl); err != nil {
return capsule, fmt.Errorf("process: kv_store: %v", err)
}
} else {
if err := p.kvStore.Set(ctx, key, capsule.Get(p.Key).String()); err != nil {
return capsule, fmt.Errorf("process: kv_store: %v", err)
}
}

return capsule, nil
Expand Down

0 comments on commit 9837287

Please sign in to comment.