Skip to content

Commit

Permalink
ttl: ttl use timer framework to trigger jobs (#45469)
Browse files Browse the repository at this point in the history
close #45468
  • Loading branch information
lcwangchao authored Jul 26, 2023
1 parent 366c6ee commit c0459da
Show file tree
Hide file tree
Showing 16 changed files with 1,005 additions and 185 deletions.
2 changes: 1 addition & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -3093,7 +3093,7 @@ func (do *Domain) StartTTLJobManager() {
logutil.BgLogger().Info("ttlJobManager exited.")
}()

ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store, do.etcdClient)
ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store, do.etcdClient, do.ddl.OwnerManager().IsOwner)
do.ttlJobManager.Store(ttlJobManager)
ttlJobManager.Start()

Expand Down
7 changes: 7 additions & 0 deletions timer/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ func (rt *TimerGroupRuntime) Start() {
go rt.loop()
}

// Running returns whether the runtime is running
func (rt *TimerGroupRuntime) Running() bool {
rt.mu.Lock()
defer rt.mu.Unlock()
return rt.ctx != nil && rt.cancel != nil
}

func (rt *TimerGroupRuntime) initCtx() {
rt.ctx, rt.cancel = context.WithCancel(context.Background())
}
Expand Down
2 changes: 2 additions & 0 deletions timer/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ func TestRuntimeStartStop(t *testing.T) {
Build()

runtime.Start()
require.True(t, runtime.Running())
waitDone(timerProcessed, time.Minute)
go func() {
runtime.Stop()
require.False(t, runtime.Running())
cancel()
}()
waitDone(ctx.Done(), time.Minute)
Expand Down
2 changes: 1 addition & 1 deletion timer/runtime/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

const (
workerRecvChanCap = 8
workerRecvChanCap = 128
workerRespChanCap = 128
workerEventDefaultRetryInterval = 10 * time.Second
chanBlockInterval = time.Second
Expand Down
4 changes: 2 additions & 2 deletions ttl/client/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ type TriggerNewTTLJobTableResult struct {
DBName string `json:"db_name"`
TableName string `json:"table_name"`
PartitionName string `json:"partition_name,omitempty"`
JobID string `json:"job_id"`
ErrorMessage string `json:"error_message"`
JobID string `json:"job_id,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
}

// TriggerNewTTLJobResponse is the response detail for trigger_ttl_job command
Expand Down
34 changes: 31 additions & 3 deletions ttl/client/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,41 @@ func NewMockNotificationClient() NotificationClient {
}

// Notify implements the NotificationClient
func (c *mockClient) Notify(_ context.Context, typ string, data string) error {
func (c *mockClient) Notify(ctx context.Context, typ string, data string) error {
c.Lock()
defer c.Unlock()

for _, ch := range c.notificationWatchers[typ] {
ch <- clientv3.WatchResponse{}
watchers, ok := c.notificationWatchers[typ]
if !ok {
return nil
}

var unsent []chan clientv3.WatchResponse
loop:
for i, ch := range watchers {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- clientv3.WatchResponse{}:
default:
unsent = make([]chan clientv3.WatchResponse, len(watchers), 0)
copy(unsent, watchers[i:])
break loop
}
}

if len(unsent) > 0 {
go func() {
for _, ch := range unsent {
select {
case <-ctx.Done():
return
case ch <- clientv3.WatchResponse{}:
}
}
}()
}

return nil
}

Expand Down
8 changes: 6 additions & 2 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"//store/driver/error",
"//timer/api",
"//timer/runtime",
"//timer/tablestore",
"//ttl/cache",
"//ttl/client",
"//ttl/metrics",
Expand All @@ -37,12 +38,13 @@ go_library(
"//util/logutil",
"//util/sqlexec",
"//util/timeutil",
"@com_github_google_uuid//:uuid",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_golang_x_time//rate",
"@org_uber_go_multierr//:multierr",
Expand All @@ -67,7 +69,7 @@ go_test(
embed = [":ttlworker"],
flaky = True,
race = "on",
shard_count = 41,
shard_count = 46,
deps = [
"//domain",
"//infoschema",
Expand Down Expand Up @@ -99,6 +101,8 @@ go_test(
"@com_github_stretchr_testify//mock",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@org_golang_x_time//rate",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
Expand Down
Loading

0 comments on commit c0459da

Please sign in to comment.