From 247895efb71bf6c8fc7b03bd4550f7eb0551e839 Mon Sep 17 00:00:00 2001 From: HighPon Date: Mon, 16 Dec 2024 04:03:01 +0000 Subject: [PATCH] fix: refactor for exporter --- cmd/index/job/exportation/main.go | 3 +-- pkg/index/job/exportation/service/exporter.go | 12 +++--------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/cmd/index/job/exportation/main.go b/cmd/index/job/exportation/main.go index 25eaaadda7..8c16dd224b 100644 --- a/cmd/index/job/exportation/main.go +++ b/cmd/index/job/exportation/main.go @@ -38,8 +38,7 @@ func main() { runner.WithName(name), runner.WithVersion(info.Version, maxVersion, minVersion), runner.WithConfigLoader(func(path string) (any, *config.GlobalConfig, error) { - // cfg, err := config.NewConfig(path) - cfg, err := config.NewConfig("cmd/index/job/exportation/sample.yaml") + cfg, err := config.NewConfig(path) if err != nil { return nil, nil, errors.Wrap(err, "failed to load "+name+"'s configuration") } diff --git a/pkg/index/job/exportation/service/exporter.go b/pkg/index/job/exportation/service/exporter.go index dcd919d03e..32f5a662ff 100644 --- a/pkg/index/job/exportation/service/exporter.go +++ b/pkg/index/job/exportation/service/exporter.go @@ -78,7 +78,7 @@ func New(opts ...Option) (Exporter, error) { log.Errorf("failed to create dir %s", e.indexPath) return nil, err } - // Todo: Determine file name + path := file.Join(e.indexPath, fmt.Sprintf("%s.db", strconv.FormatInt(time.Now().Unix(), 10))) db, err := pogreb.New(pogreb.WithPath(path), pogreb.WithBackgroundCompactionInterval(e.backgroundCompactionInterval), @@ -119,17 +119,12 @@ func (e *export) StartClient(ctx context.Context) (<-chan error, error) { } func (e *export) Start(ctx context.Context) error { - err := e.doExportIndex(ctx, - func(ctx context.Context, rc vald.ObjectClient, copts ...grpc.CallOption) (vald.Object_StreamListObjectClient, error) { - return rc.StreamListObject(ctx, &payload.Object_List_Request{}, copts...) - }, - ) + err := e.doExportIndex(ctx) return err } func (e *export) doExportIndex( ctx context.Context, - fn func(ctx context.Context, rc vald.ObjectClient, copts ...grpc.CallOption) (vald.Object_StreamListObjectClient, error), ) (errs error) { ctx, span := trace.StartSpan(igrpc.WrapGRPCMethod(ctx, grpcMethodName), apiName+"/service/index.doExportIndex") defer func() { @@ -144,7 +139,7 @@ func (e *export) doExportIndex( eg.SetLimit(e.streamListConcurrency) ctx, cancel := context.WithCancelCause(egctx) gatewayAddrs := e.gateway.GRPCClient().ConnectedAddrs() - if len(gatewayAddrs) < 0 { + if len(gatewayAddrs) == 0 { log.Errorf("Active gateway is not found.: %v ", ctx.Err()) } @@ -157,7 +152,6 @@ func (e *export) doExportIndex( grpcCallOpts := []grpc.CallOption{ grpc.WaitForReady(true), } - // stream, err := fn(ctx, vc.NewValdClient(conn), grpcCallOpts...) stream, err := vcClient.StreamListObject(ctx, emptyReq, grpcCallOpts...) if err != nil || stream == nil { return err