Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New matching task flush buffer #2286

Merged
merged 5 commits into from
Dec 14, 2021
Merged

New matching task flush buffer #2286

merged 5 commits into from
Dec 14, 2021

Conversation

wxing1292
Copy link
Contributor

What changed?

  • Add task flush buffer & UT
  • Add task queue manager

Why?
Matching task queue logic cleanup

How did you test it?
New tests

Potential risks
N/A

Is hotfix candidate?
N/A

* Add task flush buffer & UT
* Add task queue manager
@wxing1292 wxing1292 marked this pull request as ready for review December 12, 2021 01:38
@wxing1292 wxing1292 requested a review from a team December 12, 2021 01:38
service/matching/db_task_manager.go Outdated Show resolved Hide resolved
}
}

func (d *dbTaskManager) writeAppendTask(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to appendTask

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you prefer appendTask over writeAppendTask?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is either write task or append task. writeAppend feels redundant.

return d.taskFlusher.appendTask(task)
}

func (d *dbTaskManager) readDispatchTask() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to dispatchTasks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function actually read then dispatch a task, not simply doing in mem dispatch

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe readAndDispatchTasks?

service/matching/db_task_manager.go Outdated Show resolved Hide resolved
service/matching/db_task_manager.go Outdated Show resolved Hide resolved
)

type (
dbTaskManager struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be taskqueueManager or taskManager?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, this is managing the task which are going to be write to DB and read back

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this is one instance per task queue. I'm trying to reduce new concept that people need to understand. Reuse the existing name would help folks who already understand current codebase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this is one instance per task queue

this is per task queue, but managing write / read to DB.

I'm trying to reduce new concept that people need to understand.

this has nothing to do with implementation. implementation itself should be as clean as possible.

return
}

err := d.dispatchTaskFn(newInternalTask(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if anything goes wrong, we will fall into a busy spin here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a for !d.isStopped() { at the beginning
d.finishTaskFn, will try to move this function from head of queue to end of queue (which i believe is wrong, but too many changes already)
plus, this (at least right now) only mimics the current behavior

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be addressed. We should avoid retry in busy loop. It can be a TODO with follow up PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there will be following PRs
quite lots of places to optimize & simplify
but implementation (rewrite) should be done one step at a time
PRs already sent out / merged are focused on db task read / write
later there should be sync match related PRs & matching logic governing sync / async match policy

return atomic.LoadInt32(&d.status) == common.DaemonStatusStopped
}

func (d *dbTaskManager) writerEventLoop() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This writer loop logic belong to dbTaskWriter, same for reader loop belong to dbTaskReader. I think that would make code more organized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, db reader / writer manages the internal state (acked task ID, next task ID for allocation) & actual read / write logic

the control logic exists here so signal read after task is persisted can be done easily

}
}

func (d *dbTaskManager) writeAppendTask(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is either write task or append task. writeAppend feels redundant.

return
}

err := d.dispatchTaskFn(newInternalTask(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be addressed. We should avoid retry in busy loop. It can be a TODO with follow up PR.

)

type (
dbTaskManager struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this is one instance per task queue. I'm trying to reduce new concept that people need to understand. Reuse the existing name would help folks who already understand current codebase.

return d.taskFlusher.appendTask(task)
}

func (d *dbTaskManager) readDispatchTask() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe readAndDispatchTasks?

@wxing1292 wxing1292 enabled auto-merge (squash) December 14, 2021 21:54
@wxing1292 wxing1292 merged commit 18d1d06 into temporalio:master Dec 14, 2021
@wxing1292 wxing1292 deleted the censored-words-2 branch December 14, 2021 22:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants