Skip to content

Commit

Permalink
fix: refactor for exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
highpon committed Dec 16, 2024
1 parent fb9f1e4 commit 247895e
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 11 deletions.
3 changes: 1 addition & 2 deletions cmd/index/job/exportation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ func main() {
runner.WithName(name),
runner.WithVersion(info.Version, maxVersion, minVersion),
runner.WithConfigLoader(func(path string) (any, *config.GlobalConfig, error) {
// cfg, err := config.NewConfig(path)
cfg, err := config.NewConfig("cmd/index/job/exportation/sample.yaml")
cfg, err := config.NewConfig(path)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load "+name+"'s configuration")
}
Expand Down
12 changes: 3 additions & 9 deletions pkg/index/job/exportation/service/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func New(opts ...Option) (Exporter, error) {
log.Errorf("failed to create dir %s", e.indexPath)
return nil, err
}
// Todo: Determine file name

path := file.Join(e.indexPath, fmt.Sprintf("%s.db", strconv.FormatInt(time.Now().Unix(), 10)))
db, err := pogreb.New(pogreb.WithPath(path),
pogreb.WithBackgroundCompactionInterval(e.backgroundCompactionInterval),
Expand Down Expand Up @@ -119,17 +119,12 @@ func (e *export) StartClient(ctx context.Context) (<-chan error, error) {
}

func (e *export) Start(ctx context.Context) error {
err := e.doExportIndex(ctx,
func(ctx context.Context, rc vald.ObjectClient, copts ...grpc.CallOption) (vald.Object_StreamListObjectClient, error) {
return rc.StreamListObject(ctx, &payload.Object_List_Request{}, copts...)
},
)
err := e.doExportIndex(ctx)
return err
}

func (e *export) doExportIndex(
ctx context.Context,
fn func(ctx context.Context, rc vald.ObjectClient, copts ...grpc.CallOption) (vald.Object_StreamListObjectClient, error),
) (errs error) {
ctx, span := trace.StartSpan(igrpc.WrapGRPCMethod(ctx, grpcMethodName), apiName+"/service/index.doExportIndex")
defer func() {
Expand All @@ -144,7 +139,7 @@ func (e *export) doExportIndex(
eg.SetLimit(e.streamListConcurrency)
ctx, cancel := context.WithCancelCause(egctx)
gatewayAddrs := e.gateway.GRPCClient().ConnectedAddrs()
if len(gatewayAddrs) < 0 {
if len(gatewayAddrs) == 0 {
log.Errorf("Active gateway is not found.: %v ", ctx.Err())
}

Expand All @@ -157,7 +152,6 @@ func (e *export) doExportIndex(
grpcCallOpts := []grpc.CallOption{
grpc.WaitForReady(true),
}
// stream, err := fn(ctx, vc.NewValdClient(conn), grpcCallOpts...)
stream, err := vcClient.StreamListObject(ctx, emptyReq, grpcCallOpts...)
if err != nil || stream == nil {
return err
Expand Down

0 comments on commit 247895e

Please sign in to comment.