Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(admin-cli): add clear_bulk_load rpc #976

Merged
merged 3 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions admin-cli/client/fake_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ func (m *fakeMeta) CancelBulkLoad(tableName string, forced bool) error {
panic("unimplemented")
}

func (m *fakeMeta) ClearBulkLoad(tableName string) error {
panic("unimplemented")
}

func (m *fakeMeta) StartManualCompaction(tableName string, targetLevel int, maxRunningCount int, bottommost bool) error {
panic("unimplemented")
}
Expand Down
13 changes: 13 additions & 0 deletions admin-cli/client/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type Meta interface {

CancelBulkLoad(tableName string, forced bool) error

ClearBulkLoad(tableName string) error

StartManualCompaction(tableName string, targetLevel int, maxRunningCount int, bottommost bool) error

QueryManualCompaction(tableName string) (*admin.QueryAppManualCompactResponse, error)
Expand Down Expand Up @@ -599,6 +601,17 @@ func (m *rpcBasedMeta) CancelBulkLoad(tableName string, forced bool) error {
return wrapHintIntoError(result.GetHintMsg(), err)
}

func (m *rpcBasedMeta) ClearBulkLoad(tableName string) error {
req := &admin.ClearBulkLoadStateRequest{
AppName: tableName,
}
var result *admin.ClearBulkLoadStateResponse
err := m.callMeta("ClearBulkLoad", req, func(resp interface{}) {
result = resp.(*admin.ClearBulkLoadStateResponse)
})
return wrapHintIntoError(result.GetHintMsg(), err)
}

func (m *rpcBasedMeta) StartManualCompaction(tableName string, targetLevel int, maxRunningCount int, bottommost bool) error {
var level int32 = int32(targetLevel)
var count int32 = int32(maxRunningCount)
Expand Down
17 changes: 17 additions & 0 deletions admin-cli/cmd/bulk_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,22 @@ func init() {
},
})

rootCmd.AddCommand(&grumble.Command{
Name: "clear",
Help: "clear bulk load for a specific table",
Usage: "clear <-a|--tableName TABLE_NAME>",
Run: func(c *grumble.Context) error {
if c.Flags.String("tableName") == "" {
return fmt.Errorf("tableName cannot be empty")
}
tableName := c.Flags.String("tableName")
return executor.ClearBulkLoad(pegasusClient, tableName)
},
Flags: func(f *grumble.Flags) {
/*define the flags*/
f.String("a", "tableName", "", "table name")
},
})

shell.AddCommand(rootCmd)
}
9 changes: 9 additions & 0 deletions admin-cli/executor/bulk_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ func CancelBulkLoad(client *Client, tableName string, forced bool) error {
return nil
}

func ClearBulkLoad(client *Client, tableName string) error {
err := client.Meta.ClearBulkLoad(tableName)
if err != nil {
return err
}
fmt.Printf("Table %s clear bulk load succeed\n", tableName)
return nil
}

func QueryBulkLoad(client *Client, tableName string, partitionIndex int, detailed bool) error {
resp, err := client.Meta.QueryBulkLoad(tableName)
if err != nil {
Expand Down