Skip to content

Commit

Permalink
fix user data manage tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahabT committed Nov 30, 2024
1 parent 0c172f1 commit 3bf7cbd
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 42 deletions.
42 changes: 1 addition & 41 deletions service/matching/physical_task_queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/matchingservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
Expand Down Expand Up @@ -321,15 +320,6 @@ func (c *physicalTaskQueueManagerImpl) WaitUntilInitialized(ctx context.Context)

func (c *physicalTaskQueueManagerImpl) SpoolTask(taskInfo *persistencespb.TaskInfo) error {
c.liveness.markAlive()
if c.queue.version.Deployment() != nil {
fmt.Printf(
"\n %s shahab> spooled task %s %s %s\n\n",
time.Now(),
c.queue.partition.RpcName(),
c.queue.partition.TaskType().String(),
c.queue.version.Deployment(),
)
}
return c.backlogMgr.SpoolTask(taskInfo)
}

Expand All @@ -341,15 +331,6 @@ func (c *physicalTaskQueueManagerImpl) PollTask(
ctx context.Context,
pollMetadata *pollMetadata,
) (*internalTask, error) {
if c.queue.version.Deployment() != nil {
fmt.Printf(
"\n %s shahab> polling task %s %s %s\n\n",
time.Now(),
c.queue.partition.RpcName(),
c.queue.partition.TaskType().String(),
c.queue.version.Deployment(),
)
}
c.liveness.markAlive()

c.currentPolls.Add(1)
Expand Down Expand Up @@ -401,17 +382,6 @@ func (c *physicalTaskQueueManagerImpl) PollTask(
(!task.isStarted() || !task.started.hasEmptyResponse()) { // Need to filter out the empty "started" ones
c.tasksDispatchedInIntervals.incrementTaskCount()
}

if c.queue.version.Deployment() != nil {
fmt.Printf(
"\n %s shahab> polled task %s %s %s\n\n",
time.Now(),
c.queue.partition.RpcName(),
c.queue.partition.TaskType().String(),
c.queue.version.Deployment(),
)
}

return task, nil
}
}
Expand Down Expand Up @@ -566,17 +536,7 @@ func (c *physicalTaskQueueManagerImpl) TrySyncMatch(ctx context.Context, task *i
childCtx, cancel := newChildContext(ctx, c.config.SyncMatchWaitDuration(), time.Second)
defer cancel()

a, b := c.matcher.Offer(childCtx, task)
if task.source == enums.TASK_SOURCE_HISTORY && a && c.queue.version.Deployment() != nil {
fmt.Printf(
"\n %s shahab> sync-matched task %s %s %s\n\n",
time.Now(),
c.queue.partition.RpcName(),
c.queue.partition.TaskType().String(),
c.queue.version.Deployment(),
)
}
return a, b
return c.matcher.Offer(childCtx, task)
}

func (c *physicalTaskQueueManagerImpl) ensureRegisteredInDeployment(
Expand Down
80 changes: 79 additions & 1 deletion service/matching/user_data_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,25 @@ func TestUserData_FetchesOnInit(t *testing.T) {
TaskQueue: defaultRootTqID,
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
LastKnownUserDataVersion: 0,
WaitNewData: false, // first fetch is not long poll
WaitNewData: false, // first is not long poll
}).
Return(&matchingservice.GetTaskQueueUserDataResponse{
UserData: data1,
}, nil)

tqCfg.matchingClientMock.EXPECT().GetTaskQueueUserData(
gomock.Any(),
&matchingservice.GetTaskQueueUserDataRequest{
NamespaceId: defaultNamespaceId,
TaskQueue: defaultRootTqID,
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
LastKnownUserDataVersion: 1,
WaitNewData: true, // second is long poll
}).
Return(&matchingservice.GetTaskQueueUserDataResponse{
UserData: data1,
}, nil).MaxTimes(3)

m := createUserDataManager(t, controller, tqCfg)
m.config.GetUserDataMinWaitTime = 10 * time.Second // only one fetch

Expand Down Expand Up @@ -283,6 +296,19 @@ func TestUserData_RetriesFetchOnUnavailable(t *testing.T) {
}, nil
})

tqCfg.matchingClientMock.EXPECT().GetTaskQueueUserData(
gomock.Any(),
&matchingservice.GetTaskQueueUserDataRequest{
NamespaceId: defaultNamespaceId,
TaskQueue: defaultRootTqID,
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
LastKnownUserDataVersion: 1,
WaitNewData: true, // after first successful poll, there would be long polls
}).
Return(&matchingservice.GetTaskQueueUserDataResponse{
UserData: data1,
}, nil).MaxTimes(3)

m := createUserDataManager(t, controller, tqCfg)
m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success
m.config.GetUserDataRetryPolicy = backoff.NewExponentialRetryPolicy(50 * time.Millisecond).
Expand Down Expand Up @@ -354,6 +380,19 @@ func TestUserData_RetriesFetchOnUnImplemented(t *testing.T) {
}, nil
})

tqCfg.matchingClientMock.EXPECT().GetTaskQueueUserData(
gomock.Any(),
&matchingservice.GetTaskQueueUserDataRequest{
NamespaceId: defaultNamespaceId,
TaskQueue: defaultRootTqID,
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
LastKnownUserDataVersion: 1,
WaitNewData: true, // after first successful poll, there would be long polls
}).
Return(&matchingservice.GetTaskQueueUserDataResponse{
UserData: data1,
}, nil).MaxTimes(3)

m := createUserDataManager(t, controller, tqCfg)
m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success
m.config.GetUserDataRetryPolicy = backoff.NewExponentialRetryPolicy(50 * time.Millisecond).
Expand Down Expand Up @@ -410,6 +449,19 @@ func TestUserData_FetchesUpTree(t *testing.T) {
UserData: data1,
}, nil)

tqCfg.matchingClientMock.EXPECT().GetTaskQueueUserData(
gomock.Any(),
&matchingservice.GetTaskQueueUserDataRequest{
NamespaceId: defaultNamespaceId,
TaskQueue: taskQueue.NormalPartition(10).RpcName(),
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
LastKnownUserDataVersion: 1,
WaitNewData: true, // after first successful poll, there would be long polls
}).
Return(&matchingservice.GetTaskQueueUserDataResponse{
UserData: data1,
}, nil).MaxTimes(3)

m := createUserDataManager(t, controller, tqCfg)
m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success
m.Start()
Expand Down Expand Up @@ -448,6 +500,19 @@ func TestUserData_FetchesActivityToWorkflow(t *testing.T) {
UserData: data1,
}, nil)

tqCfg.matchingClientMock.EXPECT().GetTaskQueueUserData(
gomock.Any(),
&matchingservice.GetTaskQueueUserDataRequest{
NamespaceId: defaultNamespaceId,
TaskQueue: defaultRootTqID,
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
LastKnownUserDataVersion: 1,
WaitNewData: true, // after first successful poll, there would be long polls
}).
Return(&matchingservice.GetTaskQueueUserDataResponse{
UserData: data1,
}, nil).MaxTimes(3)

m := createUserDataManager(t, controller, tqCfg)
m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success
m.Start()
Expand Down Expand Up @@ -490,6 +555,19 @@ func TestUserData_FetchesStickyToNormal(t *testing.T) {
UserData: data1,
}, nil)

tqCfg.matchingClientMock.EXPECT().GetTaskQueueUserData(
gomock.Any(),
&matchingservice.GetTaskQueueUserDataRequest{
NamespaceId: defaultNamespaceId,
TaskQueue: normalName,
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
LastKnownUserDataVersion: 1,
WaitNewData: true, // after first successful poll, there would be long polls
}).
Return(&matchingservice.GetTaskQueueUserDataResponse{
UserData: data1,
}, nil).MaxTimes(3)

m := createUserDataManager(t, controller, tqCfg)
m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success
m.Start()
Expand Down

0 comments on commit 3bf7cbd

Please sign in to comment.