Skip to content

Commit

Permalink
Improve distributed load
Browse files Browse the repository at this point in the history
  • Loading branch information
elega committed Sep 15, 2023
1 parent 1fe33f7 commit c8bc49d
Show file tree
Hide file tree
Showing 15 changed files with 264 additions and 86 deletions.
5 changes: 5 additions & 0 deletions cli/src/alluxio.org/cli/cmd/job/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type LoadCommand struct {
verify bool
partialListing bool
metadataOnly bool
skipIfExists bool
}

func (c *LoadCommand) Base() *env.BaseJavaCommand {
Expand All @@ -60,6 +61,7 @@ func (c *LoadCommand) ToCommand() *cobra.Command {
cmd.Flags().BoolVar(&c.verify, "verify", false, "[submit] Run verification when load finishes and load new files if any")
cmd.Flags().BoolVar(&c.partialListing, "partial-listing", false, "[submit] Use partial directory listing, initializing load before reading the entire directory but cannot report on certain progress details")
cmd.Flags().BoolVar(&c.metadataOnly, "metadata-only", false, "[submit] Only load file metadata")
cmd.Flags().BoolVar(&c.skipIfExists, "skip-if-exists", false, "[submit] Skip existing fullly cached files")
return cmd
}

Expand All @@ -79,5 +81,8 @@ func (c *LoadCommand) Run(_ []string) error {
if c.metadataOnly {
javaArgs = append(javaArgs, "--metadata-only")
}
if c.skipIfExists {
javaArgs = append(javaArgs, "--skip-if-exists")
}
return c.Base().Run(javaArgs)
}
3 changes: 3 additions & 0 deletions common/transport/src/main/proto/grpc/block_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ message LoadFileRequest {
repeated UfsStatus ufs_status = 1;
required UfsReadOptions options = 2;
optional bool load_metadata_only = 3; // If set false, only the metadata of file will be loaded.
optional bool skip_if_exists = 4;
}

message File{
Expand All @@ -255,6 +256,8 @@ message File{
message LoadFileResponse {
required TaskStatus status = 1;
repeated LoadFileFailure failures = 2;
optional int32 files_skipped = 3;
optional int64 bytes_skipped = 4;
}

message FreeWorkerRequest{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ message LoadJobPOptions {
optional bool verify = 2;
optional bool partialListing = 3;
optional bool loadMetadataOnly = 4;
optional bool skipIfExists = 5;
}

message CopyJobPOptions {
Expand Down
1 change: 1 addition & 0 deletions common/transport/src/main/proto/proto/journal/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ message LoadJobEntry {
required string job_id = 7;
optional int64 end_time = 8;
optional bool load_metadata_only = 9;
optional bool skip_if_exists = 10;
}

// next available id: 13
Expand Down
40 changes: 40 additions & 0 deletions dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -2291,6 +2291,38 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.MASTER)
.build();
public static final PropertyKey MASTER_SCHEDULER_RESTORE_JOB_FROM_JOURNAL =
booleanBuilder(Name.MASTER_SCHEDULER_RESTORE_JOB_FROM_JOURNAL)
.setDefaultValue(true)
.setDescription("If set to true, master will restore incomplete jobs from journal. "
+ "Turn this off if you don't want the scheduler to automatically restore jobs.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.MASTER)
.build();
public static final PropertyKey MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD =
intBuilder(Name.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD)
.setDefaultValue(-1)
.setDescription("The load job total load failure count threshold. -1 means never fail.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_RATIO_THRESHOLD =
doubleBuilder(Name.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_RATIO_THRESHOLD)
.setDefaultValue(1.00)
.setDescription("The load job total load failure ratio threshold (0,1)."
+ " 1.00 means never fail.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey MASTER_DORA_LOAD_JOB_RETRIES =
intBuilder(Name.MASTER_DORA_LOAD_JOB_RETRIES)
.setDefaultValue(3)
.setDescription("The number of retry attempts before a load of file "
+ "is considered as failure")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();

public static final PropertyKey MASTER_SHELL_BACKUP_STATE_LOCK_GRACE_MODE =
enumBuilder(Name.MASTER_SHELL_BACKUP_STATE_LOCK_GRACE_MODE, GraceMode.class)
.setDefaultValue(GraceMode.FORCED)
Expand Down Expand Up @@ -7396,6 +7428,8 @@ public static final class Name {
"alluxio.master.block.scan.invalid.batch.max.size";
public static final String MASTER_SCHEDULER_INITIAL_WAIT_TIME =
"alluxio.master.scheduler.initial.wait.time";
public static final String MASTER_SCHEDULER_RESTORE_JOB_FROM_JOURNAL =
"alluxio.master.scheduler.restore.job.from.journal";
public static final String MASTER_SHELL_BACKUP_STATE_LOCK_GRACE_MODE =
"alluxio.master.shell.backup.state.lock.grace.mode";
public static final String MASTER_SHELL_BACKUP_STATE_LOCK_TRY_DURATION =
Expand All @@ -7404,6 +7438,12 @@ public static final class Name {
"alluxio.master.shell.backup.state.lock.sleep.duration";
public static final String MASTER_SHELL_BACKUP_STATE_LOCK_TIMEOUT =
"alluxio.master.shell.backup.state.lock.timeout";
public static final String MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD =
"alluxio.master.dora.load.job.total.failure.count.threshold";
public static final String MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_RATIO_THRESHOLD =
"alluxio.master.dora.load.job.total.failure.ratio.threshold";
public static final String MASTER_DORA_LOAD_JOB_RETRIES =
"alluxio.master.dora.load.job.retries";
public static final String MASTER_DAILY_BACKUP_ENABLED =
"alluxio.master.daily.backup.enabled";
public static final String MASTER_DAILY_BACKUP_FILES_RETAINED =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import alluxio.grpc.ExistsPOptions;
import alluxio.grpc.GetStatusPOptions;
import alluxio.grpc.ListStatusPOptions;
import alluxio.grpc.LoadFileFailure;
import alluxio.grpc.LoadFileResponse;
import alluxio.grpc.RenamePOptions;
import alluxio.grpc.Route;
import alluxio.grpc.RouteFailure;
Expand Down Expand Up @@ -100,12 +100,13 @@ BlockWriter createFileWriter(String fileId, String ufsPath)
* Loads the metadata and data of files from UFS to Alluxio.
*
* @param loadData true if data should also be loaded, otherwise metadata only
* @param skipIfExists true if data loading should be skipped if it's already loaded
* @param ufsStatuses the files to load
* @param options
* @return a list of failed files
*/
ListenableFuture<List<LoadFileFailure>> load(
boolean loadData, List<UfsStatus> ufsStatuses, UfsReadOptions options)
ListenableFuture<LoadFileResponse> load(
boolean loadData, boolean skipIfExists, List<UfsStatus> ufsStatuses, UfsReadOptions options)
throws AccessControlException, IOException;

/**
Expand Down
Loading

0 comments on commit c8bc49d

Please sign in to comment.