Skip to content

Commit

Permalink
Add RESTful API for distributed load
Browse files Browse the repository at this point in the history
Add RESTful API for distributed load.

### Usage:
**SUBMIT:**
description: submit a load job
example:
`http://localhost:28080/v1/load?path=/&opType=submit&partialListing=false&verify=true&bandwidth=1000&loadMetadataOnly=false&verbose=true&skipIfExists=true`

**STOP:**
description: stop the load job
example:
`http://localhost:28080/v1/load?path=/&opType=stop`

**PROGRESS:**
description: get the progress of the load job
example:
`http://localhost:28080/v1/load?path=/&opType=progress&progressFormat=text&verbose=true`
			pr-link: Alluxio#18254
			change-id: cid-8a2e4f6747d7ba6cb8c25032cc13ce4ec719da8f
  • Loading branch information
JiamingMai authored Oct 17, 2023
1 parent 8e169df commit 7c3b462
Show file tree
Hide file tree
Showing 5 changed files with 636 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import alluxio.util.io.PathUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.io.Closer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -61,18 +60,22 @@ public abstract class BaseUnderFileSystem implements UnderFileSystem {
public static final Pair<AccessControlList, DefaultAccessControlList> EMPTY_ACL =
new Pair<>(null, null);

/** The UFS {@link AlluxioURI} used to create this {@link BaseUnderFileSystem}. */
/**
* The UFS {@link AlluxioURI} used to create this {@link BaseUnderFileSystem}.
*/
protected final AlluxioURI mUri;

/** UFS Configuration options. */
/**
* UFS Configuration options.
*/
protected final UnderFileSystemConfiguration mUfsConf;

private final ExecutorService mAsyncIOExecutor;

/**
* Constructs an {@link BaseUnderFileSystem}.
*
* @param uri the {@link AlluxioURI} used to create this ufs
* @param uri the {@link AlluxioURI} used to create this ufs
* @param ufsConf UFS configuration
*/
protected BaseUnderFileSystem(AlluxioURI uri, UnderFileSystemConfiguration ufsConf) {
Expand Down Expand Up @@ -191,15 +194,20 @@ public boolean isSeekable() {
@Nullable
@Override
public Iterator<UfsStatus> listStatusIterable(
// Calling this method on non s3 UFS might result in OOM because batch based fetching
// is not supported and this method essentially fetches all ufs status and converts it to
// an iterator.
String path, ListOptions options, String startAfter, int batchSize) throws IOException {
// Calling this method on non s3 UFS might result in OOM because batch based fetching
// is not supported and this method essentially fetches all ufs status and converts it to
// an iterator.
UfsStatus[] result = listStatus(path, options);
if (result == null) {
try {
UfsStatus ufsStatus = getStatus(path);
if (!ufsStatus.isDirectory()) {
return null;
}
} catch (FileNotFoundException e) {
LOG.error("file {} doest not exist", path, e);
return null;
}
return Iterators.forArray(result);
return new UfsFileStatusIterator(this, path);
}

@Override
Expand Down Expand Up @@ -247,7 +255,7 @@ null, new AlluxioURI(baseStatus.getName()), false,
permissions.getMode())};
}
Arrays.sort(items, Comparator.comparing(UfsStatus::getName));
for (UfsStatus item: items) {
for (UfsStatus item : items) {
// performListingAsync is used by metadata sync v2
// which expects the name of an item to be a full path
item.setName(PathUtils.concatPath(path, item.getName()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.underfs;

import alluxio.collections.Pair;
import alluxio.util.io.PathUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.NoSuchElementException;

/**
 * An iterator that lazily walks a UFS directory tree and yields the status of every file
 * found under it.
 *
 * <p>Directories are expanded on demand by calling {@link UnderFileSystem#listStatus(String)}
 * and are never returned themselves; only entries for which {@link UfsStatus#isFile()} is true
 * are produced. Pending entries are kept on a stack, so the traversal is depth first.
 *
 * <p>Because the iterator has to look ahead to find the next file, both {@link #hasNext()} and
 * {@link #next()} may list a directory, and both throw a {@link RuntimeException} wrapping the
 * {@link IOException} if that listing fails.
 */
public class UfsFileStatusIterator implements Iterator<UfsStatus> {

  private static final Logger LOG = LoggerFactory.getLogger(UfsFileStatusIterator.class);

  private final UnderFileSystem mUfs;

  /**
   * The root path passed to the constructor.
   */
  private final String mPath;

  /**
   * Entries that still have to be visited, used as a stack.
   * Each element is a pair of (full path, UfsStatus).
   */
  private final LinkedList<Pair<String, UfsStatus>> mPathsToProcess = new LinkedList<>();

  /**
   * The file found by the last look-ahead that has not been handed out yet,
   * or {@code null} if no look-ahead result is pending.
   */
  private UfsStatus mNextFile;

  /**
   * Creates an iterator over all files below the given path.
   *
   * <p>The direct children of {@code path} are listed eagerly. A failure of this first listing
   * is logged and results in an empty iterator rather than an exception.
   *
   * @param ufs the under file system
   * @param path the path for listing files
   */
  public UfsFileStatusIterator(UnderFileSystem ufs, String path) {
    mUfs = ufs;
    mPath = path;
    initQueue(path);
  }

  /**
   * Seeds the stack with the children of the root path; listing errors are logged only.
   *
   * @param path the root path to list
   */
  private void initQueue(String path) {
    try {
      enqueueChildren(path);
    } catch (IOException e) {
      LOG.error("Failed to list files when calling listStatus", e);
    }
  }

  /**
   * Lists a directory and pushes each child, keyed by its full path, onto the stack.
   *
   * @param directory the full path of the directory to list
   * @throws IOException if the underlying listing fails
   */
  private void enqueueChildren(String directory) throws IOException {
    UfsStatus[] children = mUfs.listStatus(directory);
    if (children == null) {
      return;
    }
    for (UfsStatus child : children) {
      mPathsToProcess.addFirst(
          new Pair<>(PathUtils.concatPath(directory, child.getName()), child));
    }
  }

  /**
   * Moves forward until a file has been found or the stack is exhausted.
   *
   * <p>This is a loop rather than recursion so that long runs of empty or nested directories
   * cannot overflow the call stack. It is a no-op when a look-ahead result is already pending.
   *
   * @throws RuntimeException if listing a directory fails
   */
  private void advance() {
    while (mNextFile == null && !mPathsToProcess.isEmpty()) {
      final Pair<String, UfsStatus> entry = mPathsToProcess.remove();
      final UfsStatus status = entry.getSecond();
      if (status.isFile()) {
        mNextFile = status;
        return;
      }
      // The path is a directory, add all of its subpaths
      try {
        enqueueChildren(entry.getFirst());
      } catch (IOException e) {
        LOG.error("Failed to list files when calling listStatus", e);
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Reports whether another file is available.
   *
   * <p>A non-empty stack is not sufficient: the remaining entries may all be directories
   * without any file below them, so the iterator looks ahead before answering.
   *
   * @return true if a subsequent call to {@link #next()} will return a file
   * @throws RuntimeException if listing a directory fails during the look-ahead
   */
  @Override
  public boolean hasNext() {
    advance();
    return mNextFile != null;
  }

  /**
   * Returns the status of the next file in the traversal.
   *
   * @return the status of the next file
   * @throws NoSuchElementException if no more files are left
   * @throws RuntimeException if listing a directory fails
   */
  @Override
  public UfsStatus next() {
    advance();
    if (mNextFile == null) {
      throw new NoSuchElementException();
    }
    final UfsStatus result = mNextFile;
    // Clear the look-ahead slot so the next call searches for a new file.
    mNextFile = null;
    return result;
  }
}
Loading

0 comments on commit 7c3b462

Please sign in to comment.