Dynamic replication batch size #4301

Merged
merged 15 commits into from
Aug 27, 2021

Conversation

yux0
Contributor

@yux0 yux0 commented Jul 7, 2021

What changed?
Dynamic replication batch size

Why?
The replication batch size should be calculated from the backlog in a particular shard. This could help with hot shards.

How did you test it?
Unit tests
TODO: bench tests

Potential risks

Release notes

Documentation Changes

@yux0 yux0 requested review from emrahs, mkolodezny and a team July 7, 2021 18:35
@coveralls

coveralls commented Jul 7, 2021

Pull Request Test Coverage Report for Build 2a76fb8d-1110-483a-acad-b0b1184ceb47

  • 35 of 37 (94.59%) changed or added relevant lines in 2 files are covered.
  • 12 unchanged lines in 4 files lost coverage.
  • Overall coverage increased (+0.04%) to 56.439%

| Changes Missing Coverage | Covered Lines | Changed/Added Lines | % |
|---|---|---|---|
| service/history/replication/task_ack_manager.go | 34 | 36 | 94.44% |

| Files with Coverage Reduction | New Missed Lines | % |
|---|---|---|
| service/history/queue/transfer_queue_processor.go | 2 | 57.18% |
| service/matching/taskListManager.go | 2 | 74.09% |
| service/history/queue/timer_gate.go | 3 | 95.83% |
| common/task/fifoTaskScheduler.go | 5 | 84.54% |

| Totals | Coverage Status |
|---|---|
| Change from base Build 1535c579-3d77-4509-a5ff-d2f42c49ba16 | 0.04% |
| Covered Lines | 79138 |
| Relevant Lines | 140219 |

💛 - Coveralls

@@ -52,6 +53,8 @@ var (
errUnknownQueueTask = errors.New("unknown task type")
errUnknownReplicationTask = errors.New("unknown replication task")
defaultHistoryPageSize = 1000
minReadTaskSize = 20
maxReplicationLatency = int64(40)
Contributor

I suggest adding a time unit suffix (Seconds, I guess) or, even better, using time.Duration instead. As it stands, it is not clear what 40 means.

Comment on lines 645 to 647
t.taskLock.Lock()
taskLatency := int64(time.Now().Sub(t.lastTaskCreationTime) / time.Second)
t.taskLock.Unlock()
Contributor

Trying to understand how useful this lock is: it feels to me like having the lock and not having it are the same here. Could you explain a bit what it achieves?

Are time assignments in Go not atomic, so that reading the value while it is being assigned somewhere else could crash?

Contributor Author

Most likely it will not be used. I added it to prevent concurrent calls to the method. Updated it to use atomic.Value.

common/dynamicconfig/constants.go Outdated
service/history/config/config.go Outdated
service/history/replication/task_ack_manager.go Outdated
service/history/replication/task_ack_manager.go Outdated
common/dynamicconfig/constants.go Outdated
common/dynamicconfig/constants.go Outdated
Contributor

@yycptt yycptt left a comment

LGTM. Feel free to land after addressing the comments.

// ReplicatorUpperLatency indicates the max allowed replication latency between clusters
// KeyName: history.replicatorUpperLatencyInSeconds
// Value type: Duration
// Default value: 40
Contributor

nit: 40 * time.Second

rateLimiter: rateLimiter,
retryPolicy: retryPolicy,
lastTaskCreationTime: atomic.Value{},
maxAllowedLatencyFn: config.ReplicatorUpperLatencyInSeconds,
Contributor

config.ReplicatorUpperLatencyInSeconds is no longer defined, I think.

if t.lastTaskCreationTime.Load() == nil {
return defaultBatchSize
}
taskLatency := now.Sub(t.lastTaskCreationTime.Load().(time.Time)) / time.Second
Contributor

We don't need to divide by time.Second here, I think.

Contributor Author

Yes. Updated

@yux0 yux0 merged commit cde0f41 into cadence-workflow:master Aug 27, 2021
@yux0 yux0 deleted the dynamic-batch-size branch August 27, 2021 22:30
5 participants