diff --git a/common/log/tag/values.go b/common/log/tag/values.go index 54c1a26aa50..13e08dd9e2f 100644 --- a/common/log/tag/values.go +++ b/common/log/tag/values.go @@ -135,6 +135,9 @@ var ( ComponentPinotVisibilityManager = component("pinot-visibility-manager") ComponentAsyncWFConsumptionManager = component("async-wf-consumption-manager") ComponentGlobalRatelimiter = component("global-ratelimiter") + ComponentMapQ = component("mapq") + ComponentMapQTree = component("mapq-tree") + ComponentMapQTreeNode = component("mapq-tree-node") ) // Pre-defined values for TagSysLifecycle diff --git a/common/mapq/README.md b/common/mapq/README.md new file mode 100644 index 00000000000..8584a1a2b0b --- /dev/null +++ b/common/mapq/README.md @@ -0,0 +1,65 @@ +# MAPQ: Multi-tenant, Auto-partitioned, Persistent Queue + +NOTE: This component is WIP. + +## Overview + +MAPQ is a new queue framework (introduced in June 2024), aiming to unify Cadence's internal task/request queues. The existing implementations for these applications are cumbersome and maintenance-heavy, with significant overlap and limited extensibility. +MAPQ will address the challenges of scalability, throughput, consistency, and ordering guarantees required by these diverse needs. + + +Challenges and Motivation +- History Task Queues: These queues are poorly understood and difficult to maintain, owing to the departure of their original developer and the non-maintainable state of the code. The design struggles with burst loads from timer/child workflow cases, requiring introduction of more granular task types and automated partitioning that the current system cannot support without extensive refactoring. +- Matching Task Lists: These are basic FIFO queues with some advanced features like sticky task lists, zonal isolation groups and partitioning. The most pressing issue is auto partitioning to reduce operational overhead. +- Async Request Queues: Initially integrated with Kafka topics as the request queue. Initial testing faced challenges like complex provisioning, inability to dynamically create topics/register consumers, poor visibility into the requests in the queue and difficult to tweak alerts. Async APIs are already designed with pluggable queue implementation already so swapping Kafka with something else will not be tricky. + + +### Goals + +MAPQ will provide a solution tailored to meet the following goals: + +- Multi-Tenancy: Guarantees fair access to resources for each tenant based on predefined quotas. +- Auto-Partitioning: Dynamically adjusts partitions based on specified fine-grained policies, supporting both random and deterministic message distribution across physically or virtually partitioned queues. +- Burst-Protection: Detects incoming message spikes and mitigates by utilizing dynamic auto-partitioning. +- Skew-Protection: Detects incoming message skews for given partition keys and mitigates by utilizing dynamic auto-partitioning. +- Advanced Partitioning Policies: Executes on a tree-like partitioning policy to support various levels of partition key hierarchies and strategies. +- Persistent: Ensures message durability via pluggable persistent layer. +- Delivery Guarantees: Guarantees at least once delivery. + + +### Problems with Existing Queues in Cadence + +History Queues: + +- Lack of granular partitioning and inextensibility of history queues make it difficult to address following pathological scenarios: +- Task prioritization: Background tasks like workflow deletion timer tasks share the same queue and consume from the same “processing budget” as other high priority tasks such as user created timers. This is because all timer tasks for a given shard are managed by a single queue. +- Multi tenancy: Tasks of the same type (e.g. all timers) are managed by a single queue and a noisy domain can drastically regress the experience of other domains. It is not possible to write tasks of a specific domain(s) to a separate queue and adjust read/write qps. Current queue granularity ends at task type (timer or transfer). +- Burst cases: Bursts of timers or child workflows are known issues that Cadence has no answers to. These bursts usually cause timeouts and may also impact processing of other domains’ tasks. + +## High Level Design + +MAPQ uses a tree data structure where nodes route incoming messages to child nodes. Nodes can be splitted/merged based on given policies. All leaf nodes are at the same level. Leaf nodes are the actual “queues” where messages are written to/read from via a provided persistent layer plugin. + +The routing key per level, partitioning/departitioning strategy, RPS limits and other options are provided to MAPQ during initialization as a tree-like policy. It contains per level defaults and per-node (identified via path from root) overrides. + +Once initialized the tree will have a minimal number of nodes provided in the policy but it respects policies for not-yet-existing nodes. Since MAPQ supports auto-partitioning there will be new nodes added/removed and it accepts providing policies for such nodes. For example, you might want to partition by domain only for bursty domains and allocate them specific RPS. + + +#### Tree structure with policies + +![MAPQ partitioned queue tree](../../docs/images/mapq_partitioned_queue_tree_example.png) + + +#### Initialization and Object Hierarcy + +![MAPQ initialization](../../docs/images/mapq_initialization.png) + + +#### Enqueue Flow + +![MAPQ enqueue flow](../../docs/images/mapq_enqueue_flow.png) + + +#### Dispatch Flow + +![MAPQ enqueue flow](../../docs/images/mapq_dispatch_flow.png) diff --git a/common/mapq/client_impl.go b/common/mapq/client_impl.go new file mode 100644 index 00000000000..e3d436cf21d --- /dev/null +++ b/common/mapq/client_impl.go @@ -0,0 +1,85 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package mapq + +import ( + "context" + "errors" + "fmt" + + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/mapq/tree" + "github.com/uber/cadence/common/mapq/types" + "github.com/uber/cadence/common/metrics" +) + +type clientImpl struct { + logger log.Logger + scope metrics.Scope + persister types.Persister + consumerFactory types.ConsumerFactory + tree *tree.QueueTree + partitions []string + policies []types.NodePolicy +} + +func (c *clientImpl) Start(ctx context.Context) error { + c.logger.Info("Starting MAPQ client") + err := c.tree.Start(ctx) + if err != nil { + return err + } + + c.logger.Info("Started MAPQ client") + return nil +} + +func (c *clientImpl) Stop(ctx context.Context) error { + c.logger.Info("Stopping MAPQ client") + + // Stop the tree which will stop the dispatchers + if err := c.tree.Stop(ctx); err != nil { + return fmt.Errorf("failed to stop tree: %w", err) + } + + // stop the consumer factory which will stop the consumers + err := c.consumerFactory.Stop(ctx) + if err != nil { + return fmt.Errorf("failed to stop consumer factory: %w", err) + } + + c.logger.Info("Stopped MAPQ client") + return nil +} + +func (c *clientImpl) Enqueue(ctx context.Context, items []types.Item) ([]types.ItemToPersist, error) { + return c.tree.Enqueue(ctx, items) +} + +func (c *clientImpl) Ack(context.Context, types.Item) error { + return errors.New("not implemented") +} + +func (c *clientImpl) Nack(context.Context, types.Item) error { + return errors.New("not implemented") +} diff --git a/common/mapq/client_impl_test.go b/common/mapq/client_impl_test.go new file mode 100644 index 00000000000..2d49aa08fa4 --- /dev/null +++ b/common/mapq/client_impl_test.go @@ -0,0 +1,161 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package mapq + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "go.uber.org/goleak" + + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/mapq/types" + "github.com/uber/cadence/common/metrics" +) + +func TestNew(t *testing.T) { + ctrl := gomock.NewController(t) + + tests := []struct { + name string + opts []Options + wantErr bool + }{ + { + name: "success", + opts: []Options{ + WithPersister(types.NewMockPersister(ctrl)), + WithConsumerFactory(types.NewMockConsumerFactory(ctrl)), + }, + }, + { + name: "no persister", + wantErr: true, + opts: []Options{ + WithConsumerFactory(types.NewMockConsumerFactory(ctrl)), + }, + }, + { + name: "no consumer factoru", + wantErr: true, + opts: []Options{ + WithPersister(types.NewMockPersister(ctrl)), + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + logger := testlogger.New(t) + scope := metrics.NoopScope(0) + cl, err := New(logger, scope, tc.opts...) + if (err != nil) != tc.wantErr { + t.Errorf("New() error: %v, wantErr: %v", err, tc.wantErr) + } + + if err != nil { + return + } + + _, ok := cl.(*clientImpl) + if !ok { + t.Errorf("New() = %T, want *clientImpl", cl) + } + }) + } +} + +func TestStartStop(t *testing.T) { + defer goleak.VerifyNone(t) + ctrl := gomock.NewController(t) + consumerFactory := types.NewMockConsumerFactory(ctrl) + consumer := types.NewMockConsumer(ctrl) + consumerFactory.EXPECT().Stop(gomock.Any()).Return(nil).Times(1) + consumerFactory.EXPECT().New(gomock.Any()).Return(consumer, nil).Times(1) + opts := []Options{ + WithPersister(types.NewMockPersister(ctrl)), + WithConsumerFactory(consumerFactory), + } + logger := testlogger.New(t) + scope := metrics.NoopScope(0) + cl, err := New(logger, scope, opts...) + if err != nil { + t.Fatalf("New() error: %v", err) + } + + cl.Start(context.Background()) + defer cl.Stop(context.Background()) +} + +func TestAck(t *testing.T) { + ctrl := gomock.NewController(t) + consumerFactory := types.NewMockConsumerFactory(ctrl) + consumer := types.NewMockConsumer(ctrl) + consumerFactory.EXPECT().Stop(gomock.Any()).Return(nil).Times(1) + consumerFactory.EXPECT().New(gomock.Any()).Return(consumer, nil).Times(1) + opts := []Options{ + WithPersister(types.NewMockPersister(ctrl)), + WithConsumerFactory(consumerFactory), + } + logger := testlogger.New(t) + scope := metrics.NoopScope(0) + cl, err := New(logger, scope, opts...) + if err != nil { + t.Fatalf("New() error: %v", err) + } + + cl.Start(context.Background()) + defer cl.Stop(context.Background()) + + err = cl.Ack(context.Background(), nil) + if err == nil || err.Error() != "not implemented" { + t.Errorf("Ack() error: %q, want %q", err, "not implemented") + } +} + +func TestNack(t *testing.T) { + ctrl := gomock.NewController(t) + consumerFactory := types.NewMockConsumerFactory(ctrl) + consumer := types.NewMockConsumer(ctrl) + consumerFactory.EXPECT().Stop(gomock.Any()).Return(nil).Times(1) + consumerFactory.EXPECT().New(gomock.Any()).Return(consumer, nil).Times(1) + opts := []Options{ + WithPersister(types.NewMockPersister(ctrl)), + WithConsumerFactory(consumerFactory), + } + logger := testlogger.New(t) + scope := metrics.NoopScope(0) + cl, err := New(logger, scope, opts...) + if err != nil { + t.Fatalf("New() error: %v", err) + } + + cl.Start(context.Background()) + defer cl.Stop(context.Background()) + + err = cl.Nack(context.Background(), nil) + if err == nil || err.Error() != "not implemented" { + t.Errorf("Ack() error: %q, want %q", err, "not implemented") + } +} diff --git a/common/mapq/dispatcher/dispatcher.go b/common/mapq/dispatcher/dispatcher.go new file mode 100644 index 00000000000..99419d9d395 --- /dev/null +++ b/common/mapq/dispatcher/dispatcher.go @@ -0,0 +1,72 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package dispatcher + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/mapq/types" +) + +type Dispatcher struct { + consumer types.Consumer + ctx context.Context + cancelCtx context.CancelFunc + wg sync.WaitGroup +} + +func New(c types.Consumer) *Dispatcher { + ctx, cancelCtx := context.WithCancel(context.Background()) + return &Dispatcher{ + consumer: c, + ctx: ctx, + cancelCtx: cancelCtx, + } +} + +func (d *Dispatcher) Start(ctx context.Context) error { + d.wg.Add(1) + go d.run() + return nil +} + +func (d *Dispatcher) Stop(ctx context.Context) error { + d.cancelCtx() + timeout := 10 * time.Second + if dl, ok := ctx.Deadline(); ok { + timeout = time.Until(dl) + } + if !common.AwaitWaitGroup(&d.wg, timeout) { + return fmt.Errorf("failed to stop dispatcher in %v", timeout) + } + return nil +} + +func (d *Dispatcher) run() { + defer d.wg.Done() + // TODO: implement +} diff --git a/common/mapq/dispatcher/dispatcher_test.go b/common/mapq/dispatcher/dispatcher_test.go new file mode 100644 index 00000000000..3471866bfbb --- /dev/null +++ b/common/mapq/dispatcher/dispatcher_test.go @@ -0,0 +1,48 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package dispatcher + +import ( + "context" + "testing" + "time" + + "go.uber.org/goleak" +) + +func TestStartStop(t *testing.T) { + defer goleak.VerifyNone(t) + + d := New(nil) + err := d.Start(context.Background()) + if err != nil { + t.Fatalf("Start() failed: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + err = d.Stop(ctx) + if err != nil { + t.Fatalf("Stop() failed: %v", err) + } +} diff --git a/common/mapq/example_test.go b/common/mapq/example_test.go new file mode 100644 index 00000000000..c1f3284f94a --- /dev/null +++ b/common/mapq/example_test.go @@ -0,0 +1,326 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package mapq + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/uber/cadence/common/log/loggerimpl" + "github.com/uber/cadence/common/mapq/types" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/persistence" +) + +func TestExample(t *testing.T) { + persister := &InMemoryPersister{} + + logger, err := loggerimpl.NewDevelopment() + if err != nil { + panic(err) + } + scope := metrics.NoopScope(0) + cl, err := New( + logger, + scope, + WithConsumerFactory(&NoOpConsumerFactory{}), + WithPersister(persister), + WithPartitions([]string{"type", "sub-type", "domain"}), + WithPolicies([]types.NodePolicy{ + // level 0: default policy for root (splitted by type) + { + Path: "*", + SplitPolicy: &types.SplitPolicy{ + PredefinedSplits: []any{"timer", "transfer"}, + }, + }, + // level 1: default policy (splitted by sub-type) + { + Path: "*/.", + SplitPolicy: &types.SplitPolicy{ + PredefinedSplits: []any{}, + }, + }, + // level 1: timer node + { + Path: "*/timer", + SplitPolicy: &types.SplitPolicy{ + PredefinedSplits: []any{ + persistence.TaskTypeDeleteHistoryEvent, + }, + }, + }, + // level 1: transfer node + { + Path: "*/transfer", + SplitPolicy: &types.SplitPolicy{ + PredefinedSplits: []any{ + persistence.TransferTaskTypeStartChildExecution, + }, + }, + }, + // level 2: nodes per pairs + // - default 1000 RPS for per sub-type node + // - split by domain. predefined split for d3 domain + { + Path: "*/./.", + SplitPolicy: &types.SplitPolicy{ + PredefinedSplits: []any{"d3"}, + }, + }, + // override for timer delete history event: + // - only allow 50 RPS + // - disable split policy + { + Path: "*/timer/4", + DispatchPolicy: &types.DispatchPolicy{DispatchRPS: 50}, + SplitPolicy: &types.SplitPolicy{Disabled: true}, + }, + // override for start child execution + // - only allow 10 RPS + // - disable split policy + { + Path: "*/transfer/4", + DispatchPolicy: &types.DispatchPolicy{DispatchRPS: 10}, + SplitPolicy: &types.SplitPolicy{Disabled: true}, + }, + // level 3: default policy for all nodes at this level. nodes per pairs + // - only allow 100 rps + // - disable split policy + { + Path: "*/././.", + DispatchPolicy: &types.DispatchPolicy{DispatchRPS: 100}, + SplitPolicy: &types.SplitPolicy{Disabled: true}, + }, + // level 3: override policy for catch-all nodes at this level (all domains that don't have a specific node) + // this policy will override the 100 RPS policy defined above to give more RPS to catch-all nodes + { + Path: "*/././*", + DispatchPolicy: &types.DispatchPolicy{DispatchRPS: 1000}, + SplitPolicy: &types.SplitPolicy{Disabled: true}, + }, + }), + ) + + if err != nil { + panic(err) + } + + ctx := context.Background() + if err := cl.Start(ctx); err != nil { + panic(err) + } + defer cl.Stop(ctx) + + _, err = cl.Enqueue(context.Background(), []types.Item{ + newTimerItem("d1", time.Now(), persistence.TaskTypeDecisionTimeout), + newTimerItem("d1", time.Now(), persistence.TaskTypeActivityTimeout), + newTimerItem("d1", time.Now(), persistence.TaskTypeUserTimer), + newTimerItem("d3", time.Now(), persistence.TaskTypeUserTimer), + newTimerItem("d3", time.Now(), persistence.TaskTypeUserTimer), + }) + if err != nil { + panic(err) + } + + if len(persister.items) != 5 { + panic(fmt.Errorf("expected 5 items in persister, got %v", len(persister.items))) + } + + _, err = cl.Enqueue(context.Background(), []types.Item{ + newTransferItem("d2", 1, persistence.TransferTaskTypeDecisionTask), + newTransferItem("d2", 2, persistence.TransferTaskTypeActivityTask), + newTransferItem("d2", 3, persistence.TransferTaskTypeStartChildExecution), + newTransferItem("d2", 4, persistence.TransferTaskTypeStartChildExecution), + newTransferItem("d2", 5, persistence.TransferTaskTypeStartChildExecution), + }) + if err != nil { + panic(err) + } + + if len(persister.items) != 10 { + panic(fmt.Errorf("expected 10 items in persister, got %v", len(persister.items))) + } +} + +var _ types.ConsumerFactory = (*NoOpConsumerFactory)(nil) + +type NoOpConsumerFactory struct{} + +func (f *NoOpConsumerFactory) New(types.ItemPartitions) (types.Consumer, error) { + return &NoOpConsumer{}, nil +} +func (f *NoOpConsumerFactory) Stop(context.Context) error { + return nil +} + +type NoOpConsumer struct{} + +func (c *NoOpConsumer) Start(context.Context) error { + return nil +} + +func (c *NoOpConsumer) Stop(context.Context) error { + return nil +} + +func (c *NoOpConsumer) Process(ctx context.Context, item types.Item) error { + fmt.Printf("processing item: %v\n", item) + return nil +} + +type InMemoryPersister struct { + items []types.ItemToPersist + offsets *types.Offsets +} + +func (p *InMemoryPersister) Persist(ctx context.Context, items []types.ItemToPersist) error { + fmt.Printf("persisting %v items\n", len(items)) + for _, item := range items { + partitionsKV := map[string]any{} + actualKV := map[string]any{} + for _, k := range item.GetPartitionKeys() { + partitionsKV[k] = item.GetPartitionValue(k) + actualKV[k] = item.GetAttribute(k) + } + fmt.Printf("item attributes: %v, partitions: %v\n", actualKV, partitionsKV) + } + p.items = append(p.items, items...) + return nil +} + +func (p *InMemoryPersister) GetOffsets(context.Context) (*types.Offsets, error) { + return p.offsets, nil +} + +func (p *InMemoryPersister) CommitOffsets(ctx context.Context, offsets *types.Offsets) error { + fmt.Printf("committing offsets: %v\n", offsets) + p.offsets = offsets + return nil +} + +// Fetch(ctx context.Context, partitions ItemPartitions, pageInfo PageInfo) ([]Item, error) +func (p *InMemoryPersister) Fetch(ctx context.Context, partitions types.ItemPartitions, pageInfo types.PageInfo) ([]types.Item, error) { + return nil, nil +} + +func newTimerItem(domain string, t time.Time, timerType int) types.Item { + switch timerType { + case persistence.TaskTypeDecisionTimeout: + case persistence.TaskTypeActivityTimeout: + case persistence.TaskTypeUserTimer: + case persistence.TaskTypeWorkflowTimeout: + case persistence.TaskTypeDeleteHistoryEvent: + case persistence.TaskTypeActivityRetryTimer: + case persistence.TaskTypeWorkflowBackoffTimer: + default: + panic(fmt.Errorf("unknown timer type: %v", timerType)) + } + + return &timerItem{ + t: t, + timerType: timerType, + domain: domain, + } +} + +type timerItem struct { + t time.Time + timerType int + domain string +} + +func (t *timerItem) String() string { + return fmt.Sprintf("timerItem{timerType: %v, time: %v}", t.timerType, t.t) +} + +func (t *timerItem) Offset() int64 { + return t.t.UnixNano() +} + +func (t *timerItem) GetAttribute(key string) any { + switch key { + case "type": + return "timer" + case "sub-type": + return t.timerType + case "domain": + return t.domain + default: + panic(fmt.Errorf("unknown key: %v", key)) + } +} + +func newTransferItem(domain string, taskID int64, transferType int) types.Item { + switch transferType { + case persistence.TransferTaskTypeActivityTask: + case persistence.TransferTaskTypeDecisionTask: + case persistence.TransferTaskTypeCloseExecution: + case persistence.TransferTaskTypeCancelExecution: + case persistence.TransferTaskTypeSignalExecution: + case persistence.TransferTaskTypeStartChildExecution: + case persistence.TransferTaskTypeRecordWorkflowStarted: + case persistence.TransferTaskTypeResetWorkflow: + case persistence.TransferTaskTypeUpsertWorkflowSearchAttributes: + case persistence.TransferTaskTypeRecordWorkflowClosed: + case persistence.TransferTaskTypeRecordChildExecutionCompleted: + case persistence.TransferTaskTypeApplyParentClosePolicy: + default: + panic(fmt.Errorf("unknown transfer type: %v", transferType)) + } + + return &transferItem{ + domain: domain, + taskID: taskID, + transferType: transferType, + } +} + +type transferItem struct { + taskID int64 + transferType int + domain string +} + +func (t *transferItem) String() string { + return fmt.Sprintf("transferItem{transferType: %v, taskID: %v}", t.transferType, t.taskID) +} + +func (t *transferItem) Offset() int64 { + return t.taskID +} + +func (t *transferItem) GetAttribute(key string) any { + switch key { + case "type": + return "transfer" + case "sub-type": + return t.transferType + case "domain": + return t.domain + default: + panic(fmt.Errorf("unknown key: %v", key)) + } +} diff --git a/common/mapq/mapq.go b/common/mapq/mapq.go new file mode 100644 index 00000000000..919085e060a --- /dev/null +++ b/common/mapq/mapq.go @@ -0,0 +1,106 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package mapq + +import ( + "fmt" + + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/mapq/tree" + "github.com/uber/cadence/common/mapq/types" + "github.com/uber/cadence/common/metrics" +) + +type Options func(*clientImpl) + +func WithPersister(p types.Persister) Options { + return func(c *clientImpl) { + c.persister = p + } +} + +func WithConsumerFactory(cf types.ConsumerFactory) Options { + return func(c *clientImpl) { + c.consumerFactory = cf + } +} + +// WithPartitions sets the partition keys for each level. +// MAPQ creates a tree with depth = len(partitions) +func WithPartitions(partitions []string) Options { + return func(c *clientImpl) { + c.partitions = partitions + } +} + +// WithPolicies sets the policies for the MAPQ instance. +// Policies can be defined for nodes at a specific level or nodes with specific path. +// +// Path conventions: +// - "*" -> represents the root node at level 0 +// - "*/." matches with all nodes at level 1 +// - "*/*" represents the catch-all node at level 1 +// - "*/xyz" represents a specific node at level 1 whose partition value is xyz +// - "*/./." matches with all nodes at level 2 +// - "*/xyz/." matches with all nodes at level 2 whose parent is xyz node +// - "*/xyz/*" represents the catch-all node at level 2 whose parent is xyz node +// - "*/xyz/abc" represents a specific node at level 2 whose level 2 attribute value is abc and parent is xyz node +func WithPolicies(policies []types.NodePolicy) Options { + return func(c *clientImpl) { + c.policies = policies + } +} + +func New(logger log.Logger, scope metrics.Scope, opts ...Options) (types.Client, error) { + c := &clientImpl{ + logger: logger.WithTags(tag.ComponentMapQ), + scope: scope, + } + + for _, opt := range opts { + opt(c) + } + + if c.persister == nil { + return nil, fmt.Errorf("persister is required. Use WithPersister option to set it") + } + + if c.consumerFactory == nil { + return nil, fmt.Errorf("consumer factory is required. Use WithConsumerFactory option to set it") + } + + tree, err := tree.New(logger, scope, c.partitions, c.policies, c.persister, c.consumerFactory) + if err != nil { + return nil, err + } + + c.tree = tree + c.logger.Info("MAPQ client created", + tag.Dynamic("partitions", c.partitions), + tag.Dynamic("policies", c.policies), + tag.Dynamic("tree", c.tree.String()), + ) + + return c, nil +} diff --git a/common/mapq/tree/queue_tree.go b/common/mapq/tree/queue_tree.go new file mode 100644 index 00000000000..2933f1f0c9d --- /dev/null +++ b/common/mapq/tree/queue_tree.go @@ -0,0 +1,174 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package tree + +import ( + "context" + "fmt" + "strings" + + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/mapq/types" + "github.com/uber/cadence/common/metrics" +) + +// QueueTree is a tree structure that represents the queue structure for MAPQ +type QueueTree struct { + originalLogger log.Logger + logger log.Logger + scope metrics.Scope + partitions []string + policyCol types.NodePolicyCollection + persister types.Persister + consumerFactory types.ConsumerFactory + root *QueueTreeNode +} + +func New( + logger log.Logger, + scope metrics.Scope, + partitions []string, + policies []types.NodePolicy, + persister types.Persister, + consumerFactory types.ConsumerFactory, +) (*QueueTree, error) { + t := &QueueTree{ + originalLogger: logger, + logger: logger.WithTags(tag.ComponentMapQTree), + scope: scope, + partitions: partitions, + policyCol: types.NewNodePolicyCollection(policies), + persister: persister, + consumerFactory: consumerFactory, + } + + return t, t.init() +} + +// Start the dispatchers for all leaf nodes +func (t *QueueTree) Start(ctx context.Context) error { + t.logger.Info("Starting MAPQ tree", tag.Dynamic("tree", t.String())) + err := t.root.Start(ctx, t.consumerFactory, nil, map[string]any{}) + if err != nil { + return fmt.Errorf("failed to start root node: %w", err) + } + + t.logger.Info("Started MAPQ tree") + return nil +} + +// Stop the dispatchers for all leaf nodes +func (t *QueueTree) Stop(ctx context.Context) error { + t.logger.Info("Stopping MAPQ tree", tag.Dynamic("tree", t.String())) + + err := t.root.Stop(ctx) + if err != nil { + return fmt.Errorf("failed to stop nodes: %w", err) + } + + t.logger.Info("Stopped MAPQ tree") + return nil +} + +func (t *QueueTree) String() string { + var sb strings.Builder + var nodes []*QueueTreeNode + nodes = append(nodes, t.root) + for len(nodes) > 0 { + node := nodes[0] + nodes = nodes[1:] + sb.WriteString(node.String()) + sb.WriteString("\n") + for _, child := range node.Children { + nodes = append(nodes, child) + } + } + + return sb.String() +} + +func (t *QueueTree) Enqueue(ctx context.Context, items []types.Item) ([]types.ItemToPersist, error) { + if t.root == nil { + return nil, fmt.Errorf("root node is nil") + } + + var itemsToPersist []types.ItemToPersist + for _, item := range items { + itemToPersist, err := t.root.Enqueue(ctx, item, nil, map[string]any{}) + if err != nil { + return nil, err + } + itemsToPersist = append(itemsToPersist, itemToPersist) + } + + return itemsToPersist, t.persister.Persist(ctx, itemsToPersist) +} + +func (t *QueueTree) init() error { + t.root = &QueueTreeNode{ + Path: "*", // Root node + Children: map[any]*QueueTreeNode{}, + } + + if err := t.root.Init(t.originalLogger, t.scope, t.policyCol, t.partitions); err != nil { + return fmt.Errorf("failed to initialize node: %w", err) + } + + // Create tree nodes with catch-all nodes at all levels and predefined splits. + // There will be len(partitions) levels in the tree. + err := t.constructInitialNodes(t.root) + if err != nil { + return fmt.Errorf("failed to construct initial tree: %w", err) + } + + return nil +} + +func (t *QueueTree) constructInitialNodes(n *QueueTreeNode) error { + nodeLevel := nodeLevel(n.Path) + if nodeLevel == len(t.partitions) { // reached the leaf level + return nil + } + + if n.Children["*"] != nil { // catch-all node already exists + return nil + } + + _, err := n.addChild("*", t.policyCol, t.partitions) + if err != nil { + return err + } + + for _, child := range n.Children { + if err := t.constructInitialNodes(child); err != nil { + return err + } + } + + return nil +} + +func nodeLevel(path string) int { + return len(strings.Split(path, "/")) - 1 +} diff --git a/common/mapq/tree/queue_tree_node.go b/common/mapq/tree/queue_tree_node.go new file mode 100644 index 00000000000..520fdad5481 --- /dev/null +++ b/common/mapq/tree/queue_tree_node.go @@ -0,0 +1,213 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package tree + +import ( + "context" + "fmt" + + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/mapq/dispatcher" + "github.com/uber/cadence/common/mapq/types" + "github.com/uber/cadence/common/metrics" +) + +// QueueTreeNode represents a node in the queue tree +type QueueTreeNode struct { + // originalLogger is the logger passed in during creation. No node specific tags are added to this logger and it should be passed to child nodes + originalLogger log.Logger + + // logger is the logger for this node. It has the node specific tags added to it + logger log.Logger + scope metrics.Scope + + // The path to the node + Path string + + // The partition key used by this node to decide which child to enqueue the item. + // Partition key of a node is the attribute key of child node. + PartitionKey string + + // The attribute key used to create this node by parent + AttributeKey string + + // The attribute value used to create this node by parent + AttributeVal any + + // The policy for this node. It's merged policy from all policies that match this node + NodePolicy types.NodePolicy + + // Children by attribute key + // "*" is a special key that represents the default/fallback child queue + // If there's no children then the node is considered leaf node + Children map[any]*QueueTreeNode + + // The dispatcher for this node. Only leaf nodes have dispatcher + Dispatcher *dispatcher.Dispatcher +} + +func (n *QueueTreeNode) Start( + ctx context.Context, + consumerFactory types.ConsumerFactory, + partitions []string, + partitionMap map[string]any, +) error { + n.logger.Info("Starting node", tag.Dynamic("node", n.String())) + + // If there are no children then this is a leaf node + if len(n.Children) == 0 { + n.logger.Info("Creating consumer and starting a new dispatcher for leaf node") + c, err := consumerFactory.New(types.NewItemPartitions(partitions, partitionMap)) + if err != nil { + return err + } + d := dispatcher.New(c) + if err := d.Start(ctx); err != nil { + return err + } + n.Dispatcher = d + return nil + } + + for _, child := range n.Children { + partitionMap[n.PartitionKey] = child.AttributeVal + err := child.Start(ctx, consumerFactory, partitions, partitionMap) + if err != nil { + return fmt.Errorf("failed to start child %s: %w", child.Path, err) + } + } + + n.logger.Info("Started node") + return nil +} + +func (n *QueueTreeNode) Stop(ctx context.Context) error { + n.logger.Info("Stopping node") + + if n.Dispatcher != nil { // leaf node + return n.Dispatcher.Stop(ctx) + } + + for _, child := range n.Children { + if err := child.Stop(ctx); err != nil { + return fmt.Errorf("failed to stop child %s: %w", child.Path, err) + } + } + + n.logger.Info("Stopped node") + return nil +} + +func (n *QueueTreeNode) Enqueue( + ctx context.Context, + item types.Item, + partitions []string, + partitionMap map[string]any, +) (types.ItemToPersist, error) { + // If there are no children then this is a leaf node + if len(n.Children) == 0 { + return types.NewItemToPersist(item, types.NewItemPartitions(partitions, partitionMap)), nil + } + + // Add the attribute value to queueNodePathParts + partitionVal := item.GetAttribute(n.PartitionKey) + partitions = append(partitions, n.PartitionKey) + partitionMap[n.PartitionKey] = partitionVal + + child, ok := n.Children[partitionVal] + if !ok { + // TODO: thread safety missing + child, ok = n.Children["*"] + partitionMap[n.PartitionKey] = "*" + if !ok { + // catch-all nodes are created during initalization so this should never happen + return nil, fmt.Errorf("no child found for attribute %v in node %v", partitionVal, n.Path) + } + } + + return child.Enqueue(ctx, item, partitions, partitionMap) +} + +func (n *QueueTreeNode) String() string { + return fmt.Sprintf("QueueTreeNode{Path: %q, AttributeKey: %v, AttributeVal: %v, NodePolicy: %s, Num Children: %d}", n.Path, n.AttributeKey, n.AttributeVal, n.NodePolicy, len(n.Children)) +} + +func (n *QueueTreeNode) Init(logger log.Logger, scope metrics.Scope, policyCol types.NodePolicyCollection, partitions []string) error { + n.originalLogger = logger + n.logger = logger.WithTags(tag.ComponentMapQTreeNode, tag.Dynamic("path", n.Path)) + n.scope = scope + + // Get the merged policy for this node + policy, err := policyCol.GetMergedPolicyForNode(n.Path) + if err != nil { + return err + } + n.NodePolicy = policy + + // Set partition key of the node + nodeLevel := nodeLevel(n.Path) + if nodeLevel < len(partitions) { + n.PartitionKey = partitions[nodeLevel] + } + + // Create predefined children nodes + return n.addPredefinedSplits(policyCol, partitions) +} + +func (n *QueueTreeNode) addChild(attrVal any, policyCol types.NodePolicyCollection, partitions []string) (*QueueTreeNode, error) { + path := fmt.Sprintf("%s/%v", n.Path, attrVal) + ch := &QueueTreeNode{ + Path: path, + AttributeKey: n.PartitionKey, + AttributeVal: attrVal, + Children: map[any]*QueueTreeNode{}, + } + + if err := ch.Init(n.originalLogger, n.scope, policyCol, partitions); err != nil { + return nil, err + } + + n.Children[attrVal] = ch + return ch, nil +} + +func (n *QueueTreeNode) addPredefinedSplits(policyCol types.NodePolicyCollection, partitions []string) error { + if n.NodePolicy.SplitPolicy == nil || len(n.NodePolicy.SplitPolicy.PredefinedSplits) == 0 { + return nil + } + + if nodeLevel(n.Path) >= len(partitions) { + return fmt.Errorf("predefined split is defined for a leaf level node %s", n.Path) + } + + for _, split := range n.NodePolicy.SplitPolicy.PredefinedSplits { + + _, err := n.addChild(split, policyCol, partitions) + if err != nil { + return err + } + } + + return nil +} diff --git a/common/mapq/tree/queue_tree_test.go b/common/mapq/tree/queue_tree_test.go new file mode 100644 index 00000000000..2a03444c328 --- /dev/null +++ b/common/mapq/tree/queue_tree_test.go @@ -0,0 +1,271 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package tree + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/google/go-cmp/cmp" + "go.uber.org/goleak" + + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/mapq/types" + "github.com/uber/cadence/common/metrics" +) + +func TestStartStop(t *testing.T) { + defer goleak.VerifyNone(t) + ctrl := gomock.NewController(t) + consumerFactory := types.NewMockConsumerFactory(ctrl) + consumer := types.NewMockConsumer(ctrl) + // 7 consumers are created in the test tree for each leaf node defined by getTestPolicies() + // - */timer/deletehistory/* + // - */timer/*/domain1 + // - */timer/*/* + // - */transfer/*/domain1 + // - */transfer/*/* + // - */*/*/domain1 + // - */*/*/* + consumerFactory.EXPECT().New(gomock.Any()).Return(consumer, nil).Times(7) + + tree, err := New( + testlogger.New(t), + metrics.NoopScope(0), + []string{"type", "sub-type", "domain"}, + getTestPolicies(), + types.NewMockPersister(ctrl), + consumerFactory, + ) + if err != nil { + t.Fatalf("failed to create queue tree: %v", err) + } + + if err := tree.Start(context.Background()); err != nil { + t.Fatalf("failed to start queue tree: %v", err) + } + + if err := tree.Stop(context.Background()); err != nil { + t.Fatalf("failed to stop queue tree: %v", err) + } +} + +func TestEnqueue(t *testing.T) { + tests := []struct { + name string + policies []types.NodePolicy + leafNodeCount int + items []types.Item + persistErr error + wantItemsToPersist []types.ItemToPersist + wantErr bool + }{ + { + name: "success", + policies: getTestPolicies(), + leafNodeCount: 7, + items: []types.Item{ + mockItem(t, map[string]any{"type": "timer", "sub-type": "deletehistory", "domain": "domain1"}), + mockItem(t, map[string]any{"type": "timer", "sub-type": "deletehistory", "domain": "domain2"}), + mockItem(t, map[string]any{"type": "timer", "sub-type": "deletehistory", "domain": "domain3"}), + mockItem(t, map[string]any{"type": "timer", "sub-type": "activitytimeout", "domain": "domain1"}), + mockItem(t, map[string]any{"type": "timer", "sub-type": "activitytimeout", "domain": "domain2"}), + mockItem(t, map[string]any{"type": "timer", "sub-type": "activitytimeout", "domain": "domain3"}), + mockItem(t, map[string]any{"type": "transfer", "sub-type": "activity", "domain": "domain1"}), + mockItem(t, map[string]any{"type": "transfer", "sub-type": "activity", "domain": "domain2"}), + mockItem(t, map[string]any{"type": "transfer", "sub-type": "activity", "domain": "domain3"}), + }, + wantItemsToPersist: func() []types.ItemToPersist { + result := make([]types.ItemToPersist, 9) + partitions := []string{"type", "sub-type", "domain"} + + // deletehistory timers should go to the leaf node "*/timer/deletehistory/*" + result[0] = types.NewItemToPersist( + mockItem(t, map[string]any{"type": "timer", "sub-type": "deletehistory", "domain": "domain1"}), + types.NewItemPartitions(partitions, map[string]any{"type": "timer", "sub-type": "deletehistory", "domain": "*"}), + ) + result[1] = types.NewItemToPersist( + mockItem(t, map[string]any{"type": "timer", "sub-type": "deletehistory", "domain": "domain2"}), + types.NewItemPartitions(partitions, map[string]any{"type": "timer", "sub-type": "deletehistory", "domain": "*"}), + ) + result[2] = types.NewItemToPersist( + mockItem(t, map[string]any{"type": "timer", "sub-type": "deletehistory", "domain": "domain3"}), + types.NewItemPartitions(partitions, map[string]any{"type": "timer", "sub-type": "deletehistory", "domain": "*"}), + ) + + // activitytimeout timers either goes to domain1 specific leaf node ("*/timer/*/domain1") or the catch all leaf node "*/timer/*/*" + result[3] = types.NewItemToPersist( + mockItem(t, map[string]any{"type": "timer", "sub-type": "activitytimeout", "domain": "domain1"}), + types.NewItemPartitions(partitions, map[string]any{"type": "timer", "sub-type": "*", "domain": "domain1"}), + ) + result[4] = types.NewItemToPersist( + mockItem(t, map[string]any{"type": "timer", "sub-type": "activitytimeout", "domain": "domain2"}), + types.NewItemPartitions(partitions, map[string]any{"type": "timer", "sub-type": "*", "domain": "*"}), + ) + result[5] = types.NewItemToPersist( + mockItem(t, map[string]any{"type": "timer", "sub-type": "activitytimeout", "domain": "domain3"}), + types.NewItemPartitions(partitions, map[string]any{"type": "timer", "sub-type": "*", "domain": "*"}), + ) + + // transfer tasks either goes to domain1 specific leaf node ("*/transfer/*/domain1") or the catch all leaf node "*/transfer/*/*" + result[6] = types.NewItemToPersist( + mockItem(t, map[string]any{"type": "transfer", "sub-type": "activity", "domain": "domain1"}), + types.NewItemPartitions(partitions, map[string]any{"type": "transfer", "sub-type": "*", "domain": "domain1"}), + ) + result[7] = types.NewItemToPersist( + mockItem(t, map[string]any{"type": "transfer", "sub-type": "activity", "domain": "domain2"}), + types.NewItemPartitions(partitions, map[string]any{"type": "transfer", "sub-type": "*", "domain": "*"}), + ) + result[8] = types.NewItemToPersist( + mockItem(t, map[string]any{"type": "transfer", "sub-type": "activity", "domain": "domain3"}), + types.NewItemPartitions(partitions, map[string]any{"type": "transfer", "sub-type": "*", "domain": "*"}), + ) + + return result + }(), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + + defer goleak.VerifyNone(t) + ctrl := gomock.NewController(t) + consumerFactory := types.NewMockConsumerFactory(ctrl) + consumer := types.NewMockConsumer(ctrl) + // consumers will be creeted for each leaf node + consumerFactory.EXPECT().New(gomock.Any()).Return(consumer, nil).Times(tc.leafNodeCount) + + var gotItemsToPersistByPersister []types.ItemToPersist + persister := types.NewMockPersister(ctrl) + persister.EXPECT().Persist(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, itemsToPersist []types.ItemToPersist) error { + gotItemsToPersistByPersister = itemsToPersist + return tc.persistErr + }) + + tree, err := New( + testlogger.New(t), + metrics.NoopScope(0), + []string{"type", "sub-type", "domain"}, + getTestPolicies(), + persister, + consumerFactory, + ) + if err != nil { + t.Fatalf("failed to create queue tree: %v", err) + } + + if err := tree.Start(context.Background()); err != nil { + t.Fatalf("failed to start queue tree: %v", err) + } + defer tree.Stop(context.Background()) + + returnedItemsToPersist, err := tree.Enqueue(context.Background(), tc.items) + if (err != nil) != tc.wantErr { + t.Errorf("Enqueue() error: %v, wantErr: %v", err, tc.wantErr) + } + + if err != nil { + return + } + + if got, want := len(gotItemsToPersistByPersister), len(tc.wantItemsToPersist); got != want { + t.Fatalf("Items to persist count mismatch: gotItemsToPersistByPersister=%v, want=%v", got, want) + } + + if got, want := len(returnedItemsToPersist), len(tc.wantItemsToPersist); got != want { + t.Fatalf("Items to persist count mismatch: returnedItemsToPersist=%v, want=%v", got, want) + } + + for i := range gotItemsToPersistByPersister { + if diff := cmp.Diff(tc.wantItemsToPersist[i].String(), gotItemsToPersistByPersister[i].String()); diff != "" { + t.Errorf("%d - gotItemsToPersistByPersister mismatch (-want +got):\n%s", i, diff) + } + + if diff := cmp.Diff(tc.wantItemsToPersist[i].String(), returnedItemsToPersist[i].String()); diff != "" { + t.Errorf("%d - returnedItemsToPersist mismatch (-want +got):\n%s", i, diff) + } + } + }) + } +} + +func mockItem(t *testing.T, attributes map[string]any) types.Item { + item := types.NewMockItem(gomock.NewController(t)) + item.EXPECT().GetAttribute(gomock.Any()).DoAndReturn(func(key string) any { + return attributes[key] + }).AnyTimes() + item.EXPECT().String().Return("mockitem").AnyTimes() + return item +} + +func getTestPolicies() []types.NodePolicy { + return []types.NodePolicy{ + { + Path: "*", // level 0 + SplitPolicy: &types.SplitPolicy{ + PredefinedSplits: []any{"timer", "transfer"}, + }, + DispatchPolicy: &types.DispatchPolicy{DispatchRPS: 100}, + }, + { + Path: "*/.", // level 1 default policy + SplitPolicy: &types.SplitPolicy{}, + DispatchPolicy: &types.DispatchPolicy{DispatchRPS: 50}, + }, + { + Path: "*/timer", // level 1 timer node + SplitPolicy: &types.SplitPolicy{ + PredefinedSplits: []any{"deletehistory"}, + }, + }, + { + Path: "*/./.", // level 2 default policy + SplitPolicy: &types.SplitPolicy{ + PredefinedSplits: []any{"domain1"}, + }, + }, + { + Path: "*/timer/deletehistory", // level 2 deletehistory timer node policy + SplitPolicy: &types.SplitPolicy{ + Disabled: true, + }, + DispatchPolicy: &types.DispatchPolicy{DispatchRPS: 5}, + }, + { + Path: "*/././*", // level 3 default catch-all node policy + SplitPolicy: &types.SplitPolicy{ + Disabled: true, + }, + DispatchPolicy: &types.DispatchPolicy{DispatchRPS: 1000}, + }, + { + Path: "*/././domain1", // level 3 domain node policy + SplitPolicy: &types.SplitPolicy{ + Disabled: true, + }, + DispatchPolicy: &types.DispatchPolicy{DispatchRPS: 42}, + }, + } +} diff --git a/common/mapq/types/client.go b/common/mapq/types/client.go new file mode 100644 index 00000000000..b1499d83bba --- /dev/null +++ b/common/mapq/types/client.go @@ -0,0 +1,55 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package types + +import ( + "context" +) + +type Client interface { + // Enqueue adds an item to the queue + Enqueue(context.Context, []Item) ([]ItemToPersist, error) + + // Ack marks the item as processed. + // Out of order acks are supported. + // Acking an item that has not been dequeued will have no effect. + // Queue's committed offset is updated to the last ack'ed item's offset periodically until there's a gap (un-acked item). + // In other words, all items up to the committed offset are ack'ed. + // Ack'ed item might still be returned from Dequeue if its offset is higher than last committed offset before process restarts. + Ack(context.Context, Item) error + + // Nack negatively acknowledges an item in the queue + // Nack'ing an already ack'ed item will have no effect. + Nack(context.Context, Item) error + + // Start the client. It will + // - fetch the last committed offsets from the persister, + // - start corresponding consumers + // - dispatch items starting from those offsets. + Start(context.Context) error + + // Stop the client. It will + // - stop all consumers + // - persist the last committed offsets + Stop(context.Context) error +} diff --git a/common/mapq/types/consumer.go b/common/mapq/types/consumer.go new file mode 100644 index 00000000000..b636de22c2b --- /dev/null +++ b/common/mapq/types/consumer.go @@ -0,0 +1,43 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package types + +import "context" + +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination consumer_mock.go -package types github.com/uber/cadence/common/mapq/types ConsumerFactory,Consumer + +type ConsumerFactory interface { + // New creates a new consumer with the given partitions or returns an existing consumer + // to process the given partitions + // Consumer lifecycle is managed by the factory so the returned consumer must be started. + New(ItemPartitions) (Consumer, error) + + // Stop stops all consumers created by this factory + Stop(context.Context) error +} + +type Consumer interface { + Start(context.Context) error + Stop(context.Context) error + Process(context.Context, Item) error +} diff --git a/common/mapq/types/consumer_mock.go b/common/mapq/types/consumer_mock.go new file mode 100644 index 00000000000..f45a7de7a49 --- /dev/null +++ b/common/mapq/types/consumer_mock.go @@ -0,0 +1,151 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: consumer.go + +// Package types is a generated GoMock package. +package types + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockConsumerFactory is a mock of ConsumerFactory interface. +type MockConsumerFactory struct { + ctrl *gomock.Controller + recorder *MockConsumerFactoryMockRecorder +} + +// MockConsumerFactoryMockRecorder is the mock recorder for MockConsumerFactory. +type MockConsumerFactoryMockRecorder struct { + mock *MockConsumerFactory +} + +// NewMockConsumerFactory creates a new mock instance. +func NewMockConsumerFactory(ctrl *gomock.Controller) *MockConsumerFactory { + mock := &MockConsumerFactory{ctrl: ctrl} + mock.recorder = &MockConsumerFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockConsumerFactory) EXPECT() *MockConsumerFactoryMockRecorder { + return m.recorder +} + +// New mocks base method. +func (m *MockConsumerFactory) New(arg0 ItemPartitions) (Consumer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "New", arg0) + ret0, _ := ret[0].(Consumer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// New indicates an expected call of New. +func (mr *MockConsumerFactoryMockRecorder) New(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "New", reflect.TypeOf((*MockConsumerFactory)(nil).New), arg0) +} + +// Stop mocks base method. +func (m *MockConsumerFactory) Stop(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Stop", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Stop indicates an expected call of Stop. +func (mr *MockConsumerFactoryMockRecorder) Stop(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockConsumerFactory)(nil).Stop), arg0) +} + +// MockConsumer is a mock of Consumer interface. +type MockConsumer struct { + ctrl *gomock.Controller + recorder *MockConsumerMockRecorder +} + +// MockConsumerMockRecorder is the mock recorder for MockConsumer. +type MockConsumerMockRecorder struct { + mock *MockConsumer +} + +// NewMockConsumer creates a new mock instance. +func NewMockConsumer(ctrl *gomock.Controller) *MockConsumer { + mock := &MockConsumer{ctrl: ctrl} + mock.recorder = &MockConsumerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockConsumer) EXPECT() *MockConsumerMockRecorder { + return m.recorder +} + +// Process mocks base method. +func (m *MockConsumer) Process(arg0 context.Context, arg1 Item) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Process", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Process indicates an expected call of Process. +func (mr *MockConsumerMockRecorder) Process(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Process", reflect.TypeOf((*MockConsumer)(nil).Process), arg0, arg1) +} + +// Start mocks base method. +func (m *MockConsumer) Start(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Start", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Start indicates an expected call of Start. +func (mr *MockConsumerMockRecorder) Start(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockConsumer)(nil).Start), arg0) +} + +// Stop mocks base method. +func (m *MockConsumer) Stop(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Stop", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Stop indicates an expected call of Stop. +func (mr *MockConsumerMockRecorder) Stop(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockConsumer)(nil).Stop), arg0) +} diff --git a/common/mapq/types/item.go b/common/mapq/types/item.go new file mode 100644 index 00000000000..7304c19aea7 --- /dev/null +++ b/common/mapq/types/item.go @@ -0,0 +1,118 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package types + +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination item_mock.go -package types github.com/uber/cadence/common/mapq/types Item + +import "fmt" + +type Item interface { + // GetAttribute returns the value of the attribute key. + // Value can be any type. It's up to the client to interpret the value. + // Attribute keys used as partition keys will be converted to string because they are used as node + // identifiers in the queue tree. + GetAttribute(key string) any + + // Offset returns the offset of the item in the queue. e.g. monotonically increasing sequence number or a timestamp + Offset() int64 + + // String returns a human friendly representation of the item for logging purposes + String() string +} + +type ItemToPersist interface { + Item + ItemPartitions +} + +func NewItemToPersist(item Item, itemPartitions ItemPartitions) ItemToPersist { + return &defaultItemToPersist{ + item: item, + itemPartitions: itemPartitions, + } +} + +type ItemPartitions interface { + // GetPartitionKeys returns the partition keys ordered by their level in the tree + GetPartitionKeys() []string + + // GetPartitionValue returns the partition value to determine the target queue + // e.g. + // Below example demonstrates that item is in a catch-all queue for sub-type + // Item.GetAttribute("sub-type") returns "timer" + // ItemPartitions.GetPartitionValue("sub-type") returns "*" + // + GetPartitionValue(key string) any + + // String returns a human friendly representation of the item for logging purposes + String() string +} + +func NewItemPartitions(partitionKeys []string, partitionMap map[string]any) ItemPartitions { + return &defaultItemPartitions{ + partitionKeys: partitionKeys, + partitionMap: partitionMap, + } +} + +type defaultItemPartitions struct { + partitionKeys []string + partitionMap map[string]any +} + +func (i *defaultItemPartitions) GetPartitionKeys() []string { + return i.partitionKeys +} + +func (i *defaultItemPartitions) GetPartitionValue(key string) any { + return i.partitionMap[key] +} + +func (i *defaultItemPartitions) String() string { + return fmt.Sprintf("ItemPartitions{partitionKeys:%v, partitionMap:%v}", i.partitionKeys, i.partitionMap) +} + +type defaultItemToPersist struct { + item Item + itemPartitions ItemPartitions +} + +func (i *defaultItemToPersist) String() string { + return fmt.Sprintf("ItemToPersist{item:%v, itemPartitions:%v}", i.item, i.itemPartitions) +} + +func (i *defaultItemToPersist) Offset() int64 { + return i.item.Offset() +} + +func (i *defaultItemToPersist) GetAttribute(key string) any { + return i.item.GetAttribute(key) +} + +func (i *defaultItemToPersist) GetPartitionKeys() []string { + return i.itemPartitions.GetPartitionKeys() +} + +func (i *defaultItemToPersist) GetPartitionValue(key string) any { + return i.itemPartitions.GetPartitionValue(key) +} diff --git a/common/mapq/types/item_mock.go b/common/mapq/types/item_mock.go new file mode 100644 index 00000000000..91e62580033 --- /dev/null +++ b/common/mapq/types/item_mock.go @@ -0,0 +1,256 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: item.go + +// Package types is a generated GoMock package. +package types + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockItem is a mock of Item interface. +type MockItem struct { + ctrl *gomock.Controller + recorder *MockItemMockRecorder +} + +// MockItemMockRecorder is the mock recorder for MockItem. +type MockItemMockRecorder struct { + mock *MockItem +} + +// NewMockItem creates a new mock instance. +func NewMockItem(ctrl *gomock.Controller) *MockItem { + mock := &MockItem{ctrl: ctrl} + mock.recorder = &MockItemMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockItem) EXPECT() *MockItemMockRecorder { + return m.recorder +} + +// GetAttribute mocks base method. +func (m *MockItem) GetAttribute(key string) any { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAttribute", key) + ret0, _ := ret[0].(any) + return ret0 +} + +// GetAttribute indicates an expected call of GetAttribute. +func (mr *MockItemMockRecorder) GetAttribute(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAttribute", reflect.TypeOf((*MockItem)(nil).GetAttribute), key) +} + +// Offset mocks base method. +func (m *MockItem) Offset() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Offset") + ret0, _ := ret[0].(int64) + return ret0 +} + +// Offset indicates an expected call of Offset. +func (mr *MockItemMockRecorder) Offset() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Offset", reflect.TypeOf((*MockItem)(nil).Offset)) +} + +// String mocks base method. +func (m *MockItem) String() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "String") + ret0, _ := ret[0].(string) + return ret0 +} + +// String indicates an expected call of String. +func (mr *MockItemMockRecorder) String() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "String", reflect.TypeOf((*MockItem)(nil).String)) +} + +// MockItemToPersist is a mock of ItemToPersist interface. +type MockItemToPersist struct { + ctrl *gomock.Controller + recorder *MockItemToPersistMockRecorder +} + +// MockItemToPersistMockRecorder is the mock recorder for MockItemToPersist. +type MockItemToPersistMockRecorder struct { + mock *MockItemToPersist +} + +// NewMockItemToPersist creates a new mock instance. +func NewMockItemToPersist(ctrl *gomock.Controller) *MockItemToPersist { + mock := &MockItemToPersist{ctrl: ctrl} + mock.recorder = &MockItemToPersistMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockItemToPersist) EXPECT() *MockItemToPersistMockRecorder { + return m.recorder +} + +// GetAttribute mocks base method. +func (m *MockItemToPersist) GetAttribute(key string) any { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAttribute", key) + ret0, _ := ret[0].(any) + return ret0 +} + +// GetAttribute indicates an expected call of GetAttribute. +func (mr *MockItemToPersistMockRecorder) GetAttribute(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAttribute", reflect.TypeOf((*MockItemToPersist)(nil).GetAttribute), key) +} + +// GetPartitionKeys mocks base method. +func (m *MockItemToPersist) GetPartitionKeys() []string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPartitionKeys") + ret0, _ := ret[0].([]string) + return ret0 +} + +// GetPartitionKeys indicates an expected call of GetPartitionKeys. +func (mr *MockItemToPersistMockRecorder) GetPartitionKeys() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPartitionKeys", reflect.TypeOf((*MockItemToPersist)(nil).GetPartitionKeys)) +} + +// GetPartitionValue mocks base method. +func (m *MockItemToPersist) GetPartitionValue(key string) any { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPartitionValue", key) + ret0, _ := ret[0].(any) + return ret0 +} + +// GetPartitionValue indicates an expected call of GetPartitionValue. +func (mr *MockItemToPersistMockRecorder) GetPartitionValue(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPartitionValue", reflect.TypeOf((*MockItemToPersist)(nil).GetPartitionValue), key) +} + +// Offset mocks base method. +func (m *MockItemToPersist) Offset() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Offset") + ret0, _ := ret[0].(int64) + return ret0 +} + +// Offset indicates an expected call of Offset. +func (mr *MockItemToPersistMockRecorder) Offset() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Offset", reflect.TypeOf((*MockItemToPersist)(nil).Offset)) +} + +// String mocks base method. +func (m *MockItemToPersist) String() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "String") + ret0, _ := ret[0].(string) + return ret0 +} + +// String indicates an expected call of String. +func (mr *MockItemToPersistMockRecorder) String() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "String", reflect.TypeOf((*MockItemToPersist)(nil).String)) +} + +// MockItemPartitions is a mock of ItemPartitions interface. +type MockItemPartitions struct { + ctrl *gomock.Controller + recorder *MockItemPartitionsMockRecorder +} + +// MockItemPartitionsMockRecorder is the mock recorder for MockItemPartitions. +type MockItemPartitionsMockRecorder struct { + mock *MockItemPartitions +} + +// NewMockItemPartitions creates a new mock instance. +func NewMockItemPartitions(ctrl *gomock.Controller) *MockItemPartitions { + mock := &MockItemPartitions{ctrl: ctrl} + mock.recorder = &MockItemPartitionsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockItemPartitions) EXPECT() *MockItemPartitionsMockRecorder { + return m.recorder +} + +// GetPartitionKeys mocks base method. +func (m *MockItemPartitions) GetPartitionKeys() []string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPartitionKeys") + ret0, _ := ret[0].([]string) + return ret0 +} + +// GetPartitionKeys indicates an expected call of GetPartitionKeys. +func (mr *MockItemPartitionsMockRecorder) GetPartitionKeys() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPartitionKeys", reflect.TypeOf((*MockItemPartitions)(nil).GetPartitionKeys)) +} + +// GetPartitionValue mocks base method. +func (m *MockItemPartitions) GetPartitionValue(key string) any { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPartitionValue", key) + ret0, _ := ret[0].(any) + return ret0 +} + +// GetPartitionValue indicates an expected call of GetPartitionValue. +func (mr *MockItemPartitionsMockRecorder) GetPartitionValue(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPartitionValue", reflect.TypeOf((*MockItemPartitions)(nil).GetPartitionValue), key) +} + +// String mocks base method. +func (m *MockItemPartitions) String() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "String") + ret0, _ := ret[0].(string) + return ret0 +} + +// String indicates an expected call of String. +func (mr *MockItemPartitionsMockRecorder) String() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "String", reflect.TypeOf((*MockItemPartitions)(nil).String)) +} diff --git a/common/mapq/types/item_test.go b/common/mapq/types/item_test.go new file mode 100644 index 00000000000..78536746373 --- /dev/null +++ b/common/mapq/types/item_test.go @@ -0,0 +1,81 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package types + +import ( + "strings" + "testing" + + "github.com/golang/mock/gomock" + "github.com/google/go-cmp/cmp" +) + +func TestNewItemToPersist(t *testing.T) { + ctrl := gomock.NewController(t) + item := NewMockItem(ctrl) + itemStr := "###item###" + item.EXPECT().String().Return(itemStr).Times(1) + item.EXPECT().GetAttribute("attr1").Return("value1").Times(1) + item.EXPECT().GetAttribute("attr2").Return("value2").Times(1) + + partitions := []string{"attr1", "attr2"} + itemPartitions := NewItemPartitions( + partitions, + map[string]any{ + "attr1": "*", + "attr2": "value2", + }, + ) + + itemToPersist := NewItemToPersist(item, itemPartitions) + if itemToPersist == nil { + t.Fatal("itemToPersist is nil") + } + + if got := itemToPersist.GetAttribute("attr1"); got != "value1" { + t.Errorf("itemToPersist.GetAttribute(attr1) = %v, want %v", got, "value1") + } + if got := itemToPersist.GetAttribute("attr2"); got != "value2" { + t.Errorf("itemToPersist.GetAttribute(attr2) = %v, want %v", got, "value2") + } + + gotPartitions := itemToPersist.GetPartitionKeys() + if diff := cmp.Diff(partitions, gotPartitions); diff != "" { + t.Fatalf("Partition keys mismatch (-want +got):\n%s", diff) + } + if got := itemToPersist.GetPartitionValue("attr1"); got != "*" { + t.Errorf("itemToPersist.GetPartitionValue(attr1) = %v, want %v", got, "*") + } + if got := itemToPersist.GetPartitionValue("attr2"); got != "value2" { + t.Errorf("itemToPersist.GetPartitionValue(attr2) = %v, want %v", got, "value2") + } + + itemToPersistStr := itemToPersist.String() + itemPartitionsStr := itemPartitions.String() + if !strings.Contains(itemToPersistStr, itemPartitionsStr) { + t.Errorf("itemToPersist.String() = %v, want to contain %v", itemToPersistStr, itemPartitionsStr) + } + if !strings.Contains(itemToPersistStr, itemStr) { + t.Errorf("itemToPersist.String() = %v, want to contain %v", itemToPersistStr, itemStr) + } +} diff --git a/common/mapq/types/offsets.go b/common/mapq/types/offsets.go new file mode 100644 index 00000000000..f53177b7781 --- /dev/null +++ b/common/mapq/types/offsets.go @@ -0,0 +1,28 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package types + +// Offsets encapsulates the whole queue tree state including the offsets of each leaf node +type Offsets struct { + // TODO: complete +} diff --git a/common/mapq/types/persister.go b/common/mapq/types/persister.go new file mode 100644 index 00000000000..d19e88ec495 --- /dev/null +++ b/common/mapq/types/persister.go @@ -0,0 +1,38 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package types + +import "context" + +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination persister_mock.go -package types github.com/uber/cadence/common/mapq/types Persister + +type Persister interface { + Persist(ctx context.Context, items []ItemToPersist) error + GetOffsets(ctx context.Context) (*Offsets, error) + CommitOffsets(ctx context.Context, offsets *Offsets) error + Fetch(ctx context.Context, partitions ItemPartitions, pageInfo PageInfo) ([]Item, error) +} + +type PageInfo struct { + // TODO: define ack levels, page size, etc. +} diff --git a/common/mapq/types/persister_mock.go b/common/mapq/types/persister_mock.go new file mode 100644 index 00000000000..500e0e13441 --- /dev/null +++ b/common/mapq/types/persister_mock.go @@ -0,0 +1,115 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: persister.go + +// Package types is a generated GoMock package. +package types + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockPersister is a mock of Persister interface. +type MockPersister struct { + ctrl *gomock.Controller + recorder *MockPersisterMockRecorder +} + +// MockPersisterMockRecorder is the mock recorder for MockPersister. +type MockPersisterMockRecorder struct { + mock *MockPersister +} + +// NewMockPersister creates a new mock instance. +func NewMockPersister(ctrl *gomock.Controller) *MockPersister { + mock := &MockPersister{ctrl: ctrl} + mock.recorder = &MockPersisterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPersister) EXPECT() *MockPersisterMockRecorder { + return m.recorder +} + +// CommitOffsets mocks base method. +func (m *MockPersister) CommitOffsets(ctx context.Context, offsets *Offsets) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CommitOffsets", ctx, offsets) + ret0, _ := ret[0].(error) + return ret0 +} + +// CommitOffsets indicates an expected call of CommitOffsets. +func (mr *MockPersisterMockRecorder) CommitOffsets(ctx, offsets interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CommitOffsets", reflect.TypeOf((*MockPersister)(nil).CommitOffsets), ctx, offsets) +} + +// Fetch mocks base method. +func (m *MockPersister) Fetch(ctx context.Context, partitions ItemPartitions, pageInfo PageInfo) ([]Item, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Fetch", ctx, partitions, pageInfo) + ret0, _ := ret[0].([]Item) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Fetch indicates an expected call of Fetch. +func (mr *MockPersisterMockRecorder) Fetch(ctx, partitions, pageInfo interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Fetch", reflect.TypeOf((*MockPersister)(nil).Fetch), ctx, partitions, pageInfo) +} + +// GetOffsets mocks base method. +func (m *MockPersister) GetOffsets(ctx context.Context) (*Offsets, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOffsets", ctx) + ret0, _ := ret[0].(*Offsets) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetOffsets indicates an expected call of GetOffsets. +func (mr *MockPersisterMockRecorder) GetOffsets(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOffsets", reflect.TypeOf((*MockPersister)(nil).GetOffsets), ctx) +} + +// Persist mocks base method. +func (m *MockPersister) Persist(ctx context.Context, items []ItemToPersist) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Persist", ctx, items) + ret0, _ := ret[0].(error) + return ret0 +} + +// Persist indicates an expected call of Persist. +func (mr *MockPersisterMockRecorder) Persist(ctx, items interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Persist", reflect.TypeOf((*MockPersister)(nil).Persist), ctx, items) +} diff --git a/common/mapq/types/policy.go b/common/mapq/types/policy.go new file mode 100644 index 00000000000..cd1fda72386 --- /dev/null +++ b/common/mapq/types/policy.go @@ -0,0 +1,118 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package types + +import ( + "encoding/json" + "fmt" +) + +type DispatchPolicy struct { + // DispatchRPS is the rate limit for items dequeued from the node to be pushed to processors. + // All nodes inherit the DispatchRPS from the parent node as is (not distributed to children). + // If parent has 100 rps limit, then all curent and to-be-created children will have 100 rps limit. + DispatchRPS int64 `json:"dispatchRPS,omitempty"` + + // Concurrency is the maximum number of items to be processed concurrently. + Concurrency int `json:"concurrency,omitempty"` + + // TODO: define retry policy +} + +func (dp DispatchPolicy) String() string { + return fmt.Sprintf("DispatchPolicy{DispatchRPS:%d, Concurrency:%d}", dp.DispatchRPS, dp.Concurrency) +} + +type SplitPolicy struct { + // Disabled is used to disable the split policy for the node. + Disabled bool `json:"disabled,omitempty"` + + // PredefinedSplits is a list of predefined splits for the attribute key + // Child nodes for these attributes will be created during initialization + PredefinedSplits []any `json:"predefinedSplits,omitempty"` +} + +func (sp SplitPolicy) String() string { + return fmt.Sprintf("SplitPolicy{Disabled:%v, PredefinedSplits:%v}", sp.Disabled, sp.PredefinedSplits) +} + +type NodePolicy struct { + // The path to the node + // Root node has empty path "". + // "/" is used as path separator. + // "*" means the policy applies to the special catch-all node + // "." means the policy applies to all nodes in the specified level except the catch-all node + Path string `json:"path,omitempty"` + + SplitPolicy *SplitPolicy `json:"splitPolicy,omitempty"` + + // DispatchPolicy is enforced at the leaf node level. + DispatchPolicy *DispatchPolicy `json:"dispatchPolicy,omitempty"` +} + +// Merge merges two NodePolicy objects by marshalling/unmarshalling them. +// Any field in the other policy will override the field in the current policy. +func (np NodePolicy) Merge(other NodePolicy) (NodePolicy, error) { + marshalled1, err := json.Marshal(np) + if err != nil { + return NodePolicy{}, err + } + + var m1 map[string]any + err = json.Unmarshal(marshalled1, &m1) + if err != nil { + return NodePolicy{}, err + } + + marshalled2, err := json.Marshal(other) + if err != nil { + return NodePolicy{}, err + } + + var m2 map[string]any + err = json.Unmarshal(marshalled2, &m2) + if err != nil { + return NodePolicy{}, err + } + + for k, v2 := range m2 { + m1[k] = v2 + } + + mergedMarshalled, err := json.Marshal(m1) + if err != nil { + return NodePolicy{}, err + } + + var merged NodePolicy + err = json.Unmarshal(mergedMarshalled, &merged) + if err != nil { + return NodePolicy{}, err + } + + return merged, nil +} + +func (np NodePolicy) String() string { + return fmt.Sprintf("NodePolicy{Path:%v, DispatchPolicy:%s, SplitPolicy:%s}", np.Path, np.DispatchPolicy, np.SplitPolicy) +} diff --git a/common/mapq/types/policy_collection.go b/common/mapq/types/policy_collection.go new file mode 100644 index 00000000000..8de8392fe46 --- /dev/null +++ b/common/mapq/types/policy_collection.go @@ -0,0 +1,129 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package types + +import ( + "fmt" + "sort" + "strings" +) + +type NodePolicyCollection struct { + // policies is sorted by level and generic to special + policies []NodePolicy +} + +func NewNodePolicyCollection(policies []NodePolicy) NodePolicyCollection { + trimPaths(policies) + sortPolicies(policies) + return NodePolicyCollection{ + policies: policies, + } +} + +func (npc NodePolicyCollection) GetMergedPolicyForNode(path string) (NodePolicy, error) { + result := NodePolicy{} + for _, policy := range npc.policies { + pathParts := strings.Split(path, "/") + plcPathParts := strings.Split(policy.Path, "/") + length := len(pathParts) + if len(plcPathParts) <= length { + length = len(plcPathParts) + } else { + // if policy path is longer than the node path then it can't be a match + continue + } + + match := true + for i := 0; i < length; i++ { + pathPart := pathParts[i] + plcPathPart := plcPathParts[i] + if pathPart == "." || plcPathPart == "." || pathPart == plcPathPart { + continue + } + match = false + break + } + + if !match { + continue + } + var err error + result, err = result.Merge(policy) + if err != nil { + return result, fmt.Errorf("failed to merge policies: %w", err) + } + } + + // set the path in the result + result.Path = path + return result, nil +} + +func (npc NodePolicyCollection) GetPolicies() []NodePolicy { + // return a copy of the slice so the order is not messed up + policies := make([]NodePolicy, len(npc.policies)) + copy(policies, npc.policies) + return policies +} + +func trimPaths(policies []NodePolicy) { + for i := range policies { + policies[i].Path = strings.TrimRight(policies[i].Path, "/") + } +} + +func sortPolicies(policies []NodePolicy) { + // sort policies by level and generic to special + sort.Slice(policies, func(i, j int) bool { + parts1 := strings.Split(policies[i].Path, "/") + parts2 := strings.Split(policies[j].Path, "/") + l1 := len(parts1) + l2 := len(parts2) + if l1 != l2 { + return l1 < l2 + } + + pathPartPrecedence := map[any]int{ + ".": 0, + "*": 1, + } + for partIdx := 0; partIdx < l1; partIdx++ { + p1, ok := pathPartPrecedence[parts1[partIdx]] + if !ok { + p1 = 2 + } + p2, ok := pathPartPrecedence[parts2[partIdx]] + if !ok { + p2 = 2 + } + + if p1 != p2 { + return p1 < p2 + } + } + + // If parts have same precedence then just return the order + return i < j + }) +} diff --git a/common/mapq/types/policy_collection_test.go b/common/mapq/types/policy_collection_test.go new file mode 100644 index 00000000000..893708fa4e5 --- /dev/null +++ b/common/mapq/types/policy_collection_test.go @@ -0,0 +1,234 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package types + +import ( + "math/rand" + "testing" + "time" + + "github.com/google/go-cmp/cmp" +) + +const ( + fuzzCustomSeed = 0 +) + +func TestGetPolicies(t *testing.T) { + policies := getTestPolicies() + wantOrderedPaths := make([]string, len(policies)) + for i, p := range policies { + wantOrderedPaths[i] = p.Path + } + + shufflePolicies(policies) + npc := NewNodePolicyCollection(policies) + + gotPolicies := npc.GetPolicies() + if len(gotPolicies) != len(policies) { + t.Fatalf("Policies count mismatch, got %v, want %v", len(gotPolicies), len(policies)) + } + + gotPaths := make([]string, len(policies)) + for i, p := range gotPolicies { + t.Logf("%d - %s", i, p) + gotPaths[i] = p.Path + } + + if diff := cmp.Diff(wantOrderedPaths, gotPaths); diff != "" { + t.Fatalf("Policies not sorted as expected (-want +got):\n%s", diff) + } +} + +func TestGetMergedPolicyforNode(t *testing.T) { + npc := NewNodePolicyCollection(getTestPolicies()) + + tests := []struct { + name string + path string + want NodePolicy + }{ + { + name: "root node", + path: "*", + want: NodePolicy{ + Path: "*", + SplitPolicy: &SplitPolicy{ + PredefinedSplits: []any{"timer", "transfer"}, + }, + DispatchPolicy: &DispatchPolicy{DispatchRPS: 100}, + }, + }, + { + name: "level 1 catch-all node", + path: "*/*", + want: NodePolicy{ + Path: "*/*", + SplitPolicy: &SplitPolicy{}, + DispatchPolicy: &DispatchPolicy{DispatchRPS: 50}, + }, + }, + { + name: "level 1 timer node", + path: "*/timer", + want: NodePolicy{ + Path: "*/timer", + SplitPolicy: &SplitPolicy{ + PredefinedSplits: []any{"deletehistory"}, + }, + DispatchPolicy: &DispatchPolicy{DispatchRPS: 50}, + }, + }, + { + name: "level 2 catch all node", + path: "*/./*", + want: NodePolicy{ + Path: "*/./*", + SplitPolicy: &SplitPolicy{ + PredefinedSplits: []any{"domain1"}, + }, + DispatchPolicy: &DispatchPolicy{DispatchRPS: 50}, + }, + }, + { + name: "level 2 deletehistory timer node", + path: "*/timer/deletehistory", + want: NodePolicy{ + Path: "*/timer/deletehistory", + SplitPolicy: &SplitPolicy{ + Disabled: true, + }, + DispatchPolicy: &DispatchPolicy{DispatchRPS: 5}, + }, + }, + { + name: "level 3 catch-all node", + path: "*/*/*/*", + want: NodePolicy{ + Path: "*/*/*/*", + SplitPolicy: &SplitPolicy{ + Disabled: true, + }, + DispatchPolicy: &DispatchPolicy{DispatchRPS: 1000}, + }, + }, + { + name: "level 3 domain1 node for activitytimeout", + path: "*/timer/activitytimeout/domain1", + want: NodePolicy{ + Path: "*/timer/activitytimeout/domain1", + SplitPolicy: &SplitPolicy{ + Disabled: true, + }, + DispatchPolicy: &DispatchPolicy{DispatchRPS: 42}, + }, + }, + { + name: "level 3 domain1 node for childwfcompleted", + path: "*/transfer/childwfcompleted/domain1", + want: NodePolicy{ + Path: "*/transfer/childwfcompleted/domain1", + SplitPolicy: &SplitPolicy{ + Disabled: true, + }, + DispatchPolicy: &DispatchPolicy{DispatchRPS: 42}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got, err := npc.GetMergedPolicyForNode(tc.path) + if err != nil { + t.Fatalf("failed to get merged policy for node %v: %v", tc.path, err) + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Fatalf("Policy mismatch (-want +got):\n%s", diff) + } + }) + } +} + +// getTestPolicies returns a set of test policies for testing +// It intentionally returns the policies in a sorted order to test the sorting logic +func getTestPolicies() []NodePolicy { + return []NodePolicy{ + { + Path: "*", // level 0 + SplitPolicy: &SplitPolicy{ + PredefinedSplits: []any{"timer", "transfer"}, + }, + DispatchPolicy: &DispatchPolicy{DispatchRPS: 100}, + }, + { + Path: "*/.", // level 1 default policy + SplitPolicy: &SplitPolicy{}, + DispatchPolicy: &DispatchPolicy{DispatchRPS: 50}, + }, + { + Path: "*/timer", // level 1 timer node + SplitPolicy: &SplitPolicy{ + PredefinedSplits: []any{"deletehistory"}, + }, + }, + { + Path: "*/./.", // level 2 default policy + SplitPolicy: &SplitPolicy{ + PredefinedSplits: []any{"domain1"}, + }, + }, + { + Path: "*/timer/deletehistory", // level 2 deletehistory timer node policy + SplitPolicy: &SplitPolicy{ + Disabled: true, + }, + DispatchPolicy: &DispatchPolicy{DispatchRPS: 5}, + }, + { + Path: "*/././*", // level 3 default catch-all node policy + SplitPolicy: &SplitPolicy{ + Disabled: true, + }, + DispatchPolicy: &DispatchPolicy{DispatchRPS: 1000}, + }, + { + Path: "*/././domain1", // level 3 domain node policy + SplitPolicy: &SplitPolicy{ + Disabled: true, + }, + DispatchPolicy: &DispatchPolicy{DispatchRPS: 42}, + }, + } +} + +func shufflePolicies(policies []NodePolicy) { + seed := time.Now().UnixNano() + if fuzzCustomSeed != 0 { + seed = fuzzCustomSeed // override seed to test a specific scenario + } + rng := rand.New(rand.NewSource(seed)) + rng.Shuffle(len(policies), func(i, j int) { + policies[i], policies[j] = policies[j], policies[i] + }) +} diff --git a/docs/images/mapq_dispatch_flow.png b/docs/images/mapq_dispatch_flow.png new file mode 100644 index 00000000000..387aca3a0c8 Binary files /dev/null and b/docs/images/mapq_dispatch_flow.png differ diff --git a/docs/images/mapq_enqueue_flow.png b/docs/images/mapq_enqueue_flow.png new file mode 100644 index 00000000000..7d1f3170dda Binary files /dev/null and b/docs/images/mapq_enqueue_flow.png differ diff --git a/docs/images/mapq_initialization.png b/docs/images/mapq_initialization.png new file mode 100644 index 00000000000..c0126d0f125 Binary files /dev/null and b/docs/images/mapq_initialization.png differ diff --git a/docs/images/mapq_partitioned_queue_tree_example.png b/docs/images/mapq_partitioned_queue_tree_example.png new file mode 100644 index 00000000000..3a4a796ca16 Binary files /dev/null and b/docs/images/mapq_partitioned_queue_tree_example.png differ diff --git a/service/history/execution/retry_test.go b/service/history/execution/retry_test.go index 2e914dcd3d5..9110ed87627 100644 --- a/service/history/execution/retry_test.go +++ b/service/history/execution/retry_test.go @@ -31,6 +31,43 @@ import ( "github.com/uber/cadence/common/persistence" ) +func Test_NextRetry2(t *testing.T) { + // a := assert.New(t) + now, _ := time.Parse(time.RFC3339, "2018-04-13T16:08:08+00:00") + reason := "good-reason" + identity := "some-worker-identity" + + // no retry without retry policy + ai := &persistence.ActivityInfo{ + ScheduleToStartTimeout: int32((30 * time.Minute).Seconds()), + ScheduleToCloseTimeout: int32((30 * time.Minute).Seconds()), + StartToCloseTimeout: int32((30 * time.Minute).Seconds()), + HasRetryPolicy: true, + NonRetriableErrors: []string{"bad-reason", "ugly-reason"}, + StartedIdentity: identity, + MaximumAttempts: 0, + InitialInterval: 60, + BackoffCoefficient: 1, + MaximumInterval: 6000, + ExpirationTime: now.Add(86400 * time.Second), + Attempt: 5, + } + + dur := getBackoffInterval( + now, + ai.ExpirationTime, + ai.Attempt, + ai.MaximumAttempts, + ai.InitialInterval, + ai.MaximumInterval, + ai.BackoffCoefficient, + reason, + ai.NonRetriableErrors, + ) + + t.Logf("dur: %v", dur) +} + func Test_NextRetry(t *testing.T) { a := assert.New(t) now, _ := time.Parse(time.RFC3339, "2018-04-13T16:08:08+00:00")