Skip to content

Commit

Permalink
refactor index manager service
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango committed Jul 21, 2023
1 parent 6f5fb18 commit 2c0a458
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
6 changes: 6 additions & 0 deletions internal/timeutil/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ func TestParse(t *testing.T) {
want: 0,
wantErr: true,
},
{
name: "returns 0 and incorrect string error when t is minus value",
t: "-1",
want: 0,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
26 changes: 14 additions & 12 deletions pkg/manager/index/service/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,23 +121,29 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) {
ech <- err
case err = <-sech:
ech <- err
case <-it.C:
case <-it.C: // index duration ticker
// execute CreateIndex. This execution ignores low index agent,
// and does not immediately execute SaveIndex.

Check warning on line 126 in pkg/manager/index/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/manager/index/service/indexer.go#L124-L126

Added lines #L124 - L126 were not covered by tests
err = idx.execute(grpc.WithGRPCMethod(ctx, "core.v1.Agent/CreateIndex"), true, false)
if err != nil {
ech <- err
log.Error("an error occurred during indexing", err)
err = nil
}
it.Reset(idx.indexDuration)
case <-itl.C:
case <-itl.C: // index duration limit ticker
// execute CreateIndex. This execution always executes CreateIndex regardless of the state of the uncommitted index,
// but does not immediately execute SaveIndex.

Check warning on line 136 in pkg/manager/index/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/manager/index/service/indexer.go#L134-L136

Added lines #L134 - L136 were not covered by tests
err = idx.execute(grpc.WithGRPCMethod(ctx, "core.v1.Agent/CreateIndex"), false, false)
if err != nil {
ech <- err
log.Error("an error occurred during indexing", err)
err = nil
}
itl.Reset(idx.indexDurationLimit)
case <-stl.C:
case <-stl.C: // save index duration limit ticker
// execute CreateIndex. This execution always executes CreateIndex regardless of the state of the uncommitted index,
// and immediately execute SaveIndex using CreateAndSaveIndex operation.

Check warning on line 146 in pkg/manager/index/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/manager/index/service/indexer.go#L144-L146

Added lines #L144 - L146 were not covered by tests
err = idx.execute(grpc.WithGRPCMethod(ctx, "core.v1.Agent/CreateAndSaveIndex"), false, true)
if err != nil {
ech <- err
Expand All @@ -162,12 +168,12 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) {
select {
case <-ctx.Done():
return
case addr := <-idx.saveIndexTargetAddrCh:
idx.schMap.Delete(addr)
case addr := <-idx.saveIndexTargetAddrCh: // this channel value send from execute func after thier CreateIndex operation when argument immediateSaving=false.

Check warning on line 171 in pkg/manager/index/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/manager/index/service/indexer.go#L171

Added line #L171 was not covered by tests
_, err := idx.client.GetClient().
Do(grpc.WithGRPCMethod(ctx, "core.v1.Agent/SaveIndex"), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
return agent.NewAgentClient(conn).SaveIndex(ctx, &payload.Empty{}, copts...)
})
idx.schMap.Delete(addr) // unlock duplicate signal sending.

Check warning on line 176 in pkg/manager/index/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/manager/index/service/indexer.go#L176

Added line #L176 was not covered by tests
if err != nil {
log.Warnf("an error occurred while calling SaveIndex of %s: %s", addr, err)
select {
Expand Down Expand Up @@ -198,7 +204,7 @@ func (idx *index) execute(ctx context.Context, enableLowIndexSkip, immediateSavi
idx.indexing.Store(true)
defer idx.indexing.Store(false)
addrs := idx.client.GetAddrs(ctx)
err = idx.client.GetClient().OrderedRangeConcurrent(ctx, addrs,
return errors.Join(idx.client.GetClient().OrderedRangeConcurrent(ctx, addrs,

Check warning on line 207 in pkg/manager/index/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/manager/index/service/indexer.go#L207

Added line #L207 was not covered by tests
idx.concurrency,
func(ctx context.Context,
addr string, conn *grpc.ClientConn, copts ...grpc.CallOption,
Expand Down Expand Up @@ -227,7 +233,7 @@ func (idx *index) execute(ctx context.Context, enableLowIndexSkip, immediateSavi
log.Warnf("an error occurred while calling CreateIndex of %s: %s", addr, err)
return err
}
_, ok := idx.schMap.Load(addr)
_, ok := idx.schMap.Load(addr) // prevent duplicate save index signal.

Check warning on line 236 in pkg/manager/index/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/manager/index/service/indexer.go#L236

Added line #L236 was not covered by tests
if !ok {
select {
case <-ctx.Done():
Expand All @@ -249,11 +255,7 @@ func (idx *index) execute(ctx context.Context, enableLowIndexSkip, immediateSavi
}
idx.waitForNextSaving(ctx)
return nil
})
if err != nil {
return err
}
return idx.loadInfos(ctx)
}), idx.loadInfos(ctx))
}

func (idx *index) waitForNextSaving(ctx context.Context) {
Expand Down

0 comments on commit 2c0a458

Please sign in to comment.