[#5585] fix(catalog-hadoop): Test and make filesets with cloud storage work with Spark 3.2.0~3.5.3 #5630

Open · wants to merge 3 commits into main · Changes from all commits
11 changes: 9 additions & 2 deletions bundles/aliyun-bundle/build.gradle.kts
@@ -25,8 +25,14 @@ plugins {
}

dependencies {
compileOnly(project(":catalogs:catalog-hadoop"))
compileOnly(libs.hadoop3.common)
compileOnly(project(":catalogs:catalog-hadoop")) {
exclude(group = "*")
}

compileOnly(libs.hadoop3.client.api)
compileOnly(libs.hadoop3.client.runtime)

implementation(libs.guava)
implementation(libs.hadoop3.oss)

// OSS needs StringUtils from commons-lang3, or the following error will occur in 3.3.0
@@ -49,6 +55,7 @@ tasks.withType(ShadowJar::class.java) {
// Relocate dependencies to avoid conflicts
relocate("org.jdom", "org.apache.gravitino.shaded.org.jdom")
relocate("org.apache.commons.lang3", "org.apache.gravitino.shaded.org.apache.commons.lang3")
relocate("com.google", "org.apache.gravitino.shaded.com.google")
}

tasks.jar {
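The `exclude(group = "*")` pattern above keeps the sibling project's classes visible at compile time while dropping every transitive dependency it would otherwise leak onto the bundle's classpath. A minimal sketch of the same pattern (hypothetical standalone build file; the Hadoop version is an assumption, not taken from this PR):

```kotlin
plugins {
    `java-library`
}

dependencies {
    // Compile against the catalog's own classes only; the wildcard exclusion
    // strips all of its transitive dependencies, so the bundle decides for
    // itself which Hadoop artifacts end up on its classpath.
    compileOnly(project(":catalogs:catalog-hadoop")) {
        exclude(group = "*")
    }

    // The Hadoop surface is then added back explicitly via the shaded client
    // artifacts instead of the dependency-heavy hadoop-common.
    compileOnly("org.apache.hadoop:hadoop-client-api:3.3.1")     // assumed version
    compileOnly("org.apache.hadoop:hadoop-client-runtime:3.3.1") // assumed version
}
```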
24 changes: 19 additions & 5 deletions bundles/aws-bundle/build.gradle.kts
@@ -25,12 +25,23 @@ plugins {
}

dependencies {
compileOnly(project(":api"))
compileOnly(project(":core"))
compileOnly(project(":catalogs:catalog-common"))
compileOnly(project(":catalogs:catalog-hadoop"))
compileOnly(libs.hadoop3.common)
compileOnly(project(":api")) {
exclude("*")
}

compileOnly(project(":core")) {
exclude("*")
}

compileOnly(project(":catalogs:catalog-hadoop")) {
exclude("*")
}

compileOnly(libs.hadoop3.client.api)
compileOnly(libs.hadoop3.client.runtime)

implementation(libs.commons.lang3)
implementation(libs.guava)
Contributor review comment: Please make all the changes in alphabetical order.

implementation(libs.aws.iam)
implementation(libs.aws.policy)
implementation(libs.aws.sts)
@@ -44,6 +55,9 @@ tasks.withType(ShadowJar::class.java) {
isZip64 = true
configurations = listOf(project.configurations.runtimeClasspath.get())
archiveClassifier.set("")

relocate("com.google.common", "org.apache.gravitino.shaded.com.google.common")
relocate("org.apache.commons.lang3", "org.apache.gravitino.shaded.org.apache.commons.lang3")
}

tasks.jar {
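Both new `relocate` calls follow the same shading recipe. A self-contained sketch of how relocation plays out in a shadow jar (plugin and Guava versions are assumptions, not taken from this PR):

```kotlin
plugins {
    java
    // The Shadow plugin provides the ShadowJar task configured above.
    id("com.github.johnrengelman.shadow") version "8.1.1" // assumed version
}

dependencies {
    implementation("com.google.guava:guava:32.1.3-jre") // assumed version
}

tasks.shadowJar {
    archiveClassifier.set("")
    // relocate() rewrites the bundled Guava classes to the new package and
    // updates every bytecode reference inside the jar to match, so the copy
    // shipped in the bundle cannot clash with whatever Guava version Spark
    // or Hadoop already has on the classpath.
    relocate("com.google.common", "org.apache.gravitino.shaded.com.google.common")
}
```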
15 changes: 9 additions & 6 deletions bundles/azure-bundle/build.gradle.kts
@@ -25,11 +25,12 @@ plugins {
}

dependencies {
compileOnly(project(":api"))
compileOnly(project(":core"))
compileOnly(project(":catalogs:catalog-hadoop"))
compileOnly(project(":catalogs:catalog-hadoop")) {
exclude(group = "*")
}

compileOnly(libs.hadoop3.common)
compileOnly(libs.hadoop3.client.api)
compileOnly(libs.hadoop3.client.runtime)

implementation(libs.commons.lang3)
// runtime used
@@ -47,8 +48,10 @@ tasks.withType(ShadowJar::class.java) {

// Relocate dependencies to avoid conflicts
relocate("org.apache.httpcomponents", "org.apache.gravitino.azure.shaded.org.apache.httpcomponents")
relocate("org.apache.commons", "org.apache.gravitino.azure.shaded.org.apache.commons")
relocate("com.fasterxml", "org.apache.gravitino.azure.shaded.com.fasterxml")
// Why this change? If we shade the whole 'org.apache.commons' prefix, jars like `commons-io`
// will be shaded by mistake, which leads to errors.
relocate("org.apache.commons.lang3", "org.apache.gravitino.azure.shaded.org.apache.commons.lang3")
relocate("org.apache.commons.logging", "org.apache.gravitino.azure.shaded.org.apache.commons.logging")
relocate("com.google.guava", "org.apache.gravitino.azure.shaded.com.google.guava")
}

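The comment in that hunk is the key point: `relocate` matches by package-name prefix, so relocating all of `org.apache.commons` would also rewrite `org.apache.commons.io` and any other `org.apache.commons.*` package that other jars still expect at its original coordinates. A sketch of the narrow form (hypothetical build file mirroring the pattern above):

```kotlin
plugins {
    java
    id("com.github.johnrengelman.shadow") version "8.1.1" // assumed, as above
}

tasks.shadowJar {
    // Too broad: this prefix also captures commons-io, commons-compress, and
    // every other org.apache.commons.* package pulled in transitively.
    // relocate("org.apache.commons", "org.apache.gravitino.azure.shaded.org.apache.commons")

    // Narrow, as in this PR: relocate only the packages the bundle itself ships.
    relocate("org.apache.commons.lang3",
        "org.apache.gravitino.azure.shaded.org.apache.commons.lang3")
    relocate("org.apache.commons.logging",
        "org.apache.gravitino.azure.shaded.org.apache.commons.logging")
}
```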
AzureFileSystemProvider.java
@@ -23,7 +23,6 @@
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.storage.ABSProperties;
@@ -42,8 +41,7 @@ public class AzureFileSystemProvider implements FileSystemProvider {
private static final String ABFS_IMPL_KEY = "fs.abfss.impl";

@Override
public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String> config)
throws IOException {
public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
Configuration configuration = new Configuration();

Map<String, String> hadoopConfMap =
16 changes: 11 additions & 5 deletions bundles/gcp-bundle/build.gradle.kts
@@ -25,12 +25,18 @@ plugins {
}

dependencies {
compileOnly(project(":api"))
compileOnly(project(":core"))
compileOnly(project(":catalogs:catalog-common"))
compileOnly(project(":catalogs:catalog-hadoop"))
compileOnly(project(":api")) {
exclude("*")
}
compileOnly(project(":core")) {
exclude("*")
}

compileOnly(libs.hadoop3.common)
compileOnly(project(":catalogs:catalog-hadoop")) {
exclude(group = "*")
}
compileOnly(libs.hadoop3.client.api)
compileOnly(libs.hadoop3.client.runtime)

implementation(libs.commons.lang3)
// runtime used
GCSFileSystemProvider.java
@@ -29,11 +29,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GCSFileSystemProvider implements FileSystemProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(GCSFileSystemProvider.class);
private static final String GCS_SERVICE_ACCOUNT_JSON_FILE =
"fs.gs.auth.service.account.json.keyfile";

@@ -46,7 +43,6 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
Configuration configuration = new Configuration();
FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_GCS_HADOOP_KEY)
.forEach(configuration::set);
LOGGER.info("Creating GCS file system with config: {}", config);
return GoogleHadoopFileSystem.newInstance(path.toUri(), configuration);
}

32 changes: 13 additions & 19 deletions catalogs/catalog-hadoop/build.gradle.kts
@@ -41,18 +41,11 @@ dependencies {
}

compileOnly(libs.guava)
compileOnly(libs.commons.lang3)
compileOnly(libs.commons.io)

implementation(libs.hadoop3.common) {
exclude("com.sun.jersey")
exclude("javax.servlet", "servlet-api")
exclude("org.eclipse.jetty", "*")
exclude("org.apache.hadoop", "hadoop-auth")
exclude("org.apache.curator", "curator-client")
exclude("org.apache.curator", "curator-framework")
exclude("org.apache.curator", "curator-recipes")
exclude("org.apache.avro", "avro")
exclude("com.sun.jersey", "jersey-servlet")
}
implementation(libs.hadoop3.client.api)
implementation(libs.hadoop3.client.runtime)

implementation(libs.hadoop3.hdfs) {
exclude("com.sun.jersey")
@@ -63,14 +56,6 @@ dependencies {
exclude("io.netty")
exclude("org.fusesource.leveldbjni")
}
implementation(libs.hadoop3.client) {
exclude("org.apache.hadoop", "hadoop-mapreduce-client-core")
exclude("org.apache.hadoop", "hadoop-mapreduce-client-jobclient")
exclude("org.apache.hadoop", "hadoop-yarn-api")
exclude("org.apache.hadoop", "hadoop-yarn-client")
exclude("com.squareup.okhttp", "okhttp")
}

implementation(libs.slf4j.api)

testImplementation(project(":clients:client-java"))
@@ -84,6 +69,10 @@ dependencies {

testImplementation(libs.minikdc)
testImplementation(libs.hadoop3.minicluster)
testImplementation(libs.guava)
testImplementation(libs.commons.lang3)
testImplementation(libs.commons.io)
testImplementation(libs.commons.collections3)

testImplementation(libs.bundles.log4j)
testImplementation(libs.mockito.core)
@@ -119,6 +108,11 @@ tasks {
exclude("kerb-*.jar")
exclude("kerby-*.jar")
}

// Remove commons-text-1.4.jar as it conflicts with commons-text-1.10.jar, which is
// introduced by hadoop-common 3.3.1.
exclude("commons-text-1.4.jar")

into("$rootDir/distribution/package/catalogs/hadoop/libs")
}

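The big simplification in this file comes from Hadoop's shaded client artifacts: `hadoop-client-api` is a dependency-light compile-time facade, and `hadoop-client-runtime` carries the matching implementation with its third-party dependencies relocated under `org.apache.hadoop.shaded`. That is what makes the long `exclude(...)` lists on `hadoop-common` and `hadoop-client` unnecessary. A minimal sketch with explicit coordinates (the 3.3.1 version is inferred from the commons-text comment above, so treat it as an assumption):

```kotlin
plugins {
    java
}

dependencies {
    // Compile-time facade: Hadoop's public API without the dependency sprawl.
    implementation("org.apache.hadoop:hadoop-client-api:3.3.1") // assumed version

    // Runtime half: implementation classes plus third-party dependencies
    // (Jersey, Jetty, Curator, Avro, ...) already shaded under
    // org.apache.hadoop.shaded, so none of the old exclude() lists apply.
    implementation("org.apache.hadoop:hadoop-client-runtime:3.3.1") // assumed version
}
```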
HadoopGCSCatalogIT.java
@@ -40,7 +40,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;

@EnabledIf(value = "isGCPConfigured", disabledReason = "GCP is not configured.")
@EnabledIf(value = "isGCSConfigured", disabledReason = "GCS is not configured.")
public class HadoopGCSCatalogIT extends HadoopCatalogIT {

public static final String BUCKET_NAME = System.getenv("GCS_BUCKET_NAME");
@@ -170,7 +170,7 @@ public void testCreateSchemaAndFilesetWithSpecialLocation() {
metalake.dropCatalog(localCatalogName, true);
}

private static boolean isGCPConfigured() {
private static boolean isGCSConfigured() {
return StringUtils.isNotBlank(System.getenv("GCS_SERVICE_ACCOUNT_JSON_PATH"))
&& StringUtils.isNotBlank(System.getenv("GCS_BUCKET_NAME"));
}
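The rename from `isGCPConfigured` to `isGCSConfigured` has to happen in both the annotation and the method because class-level `@EnabledIf` resolves its condition method reflectively by name, and the method must be static. A compact sketch of the same gating pattern (hypothetical test class, written in Kotlin for brevity):

```kotlin
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.condition.EnabledIf

// Class-level @EnabledIf looks up "isGcsConfigured" by name on this class,
// so the annotation value and the method name must stay in sync.
@EnabledIf(value = "isGcsConfigured", disabledReason = "GCS is not configured.")
class GcsGatedIT {

    @Test
    fun touchesARealBucket() {
        // ... body that needs GCS_SERVICE_ACCOUNT_JSON_PATH / GCS_BUCKET_NAME ...
    }

    companion object {
        // Must be static for a class-level condition; JUnit calls it before
        // instantiating the test class.
        @JvmStatic
        fun isGcsConfigured(): Boolean =
            !System.getenv("GCS_SERVICE_ACCOUNT_JSON_PATH").isNullOrBlank() &&
                !System.getenv("GCS_BUCKET_NAME").isNullOrBlank()
    }
}
```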
HadoopS3CatalogIT.java
@@ -26,13 +26,10 @@
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Schema;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.storage.S3Properties;
import org.apache.hadoop.conf.Configuration;
@@ -43,72 +40,27 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.platform.commons.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.shaded.org.awaitility.Awaitility;

@Tag("gravitino-docker-test")
@EnabledIf(value = "s3IsConfigured", disabledReason = "S3 is not configured.")
public class HadoopS3CatalogIT extends HadoopCatalogIT {

private static final Logger LOG = LoggerFactory.getLogger(HadoopOSSCatalogIT.class);
private String bucketName = "s3-bucket-" + UUID.randomUUID().toString().replace("-", "");
private String accessKey;
private String secretKey;
private String s3Endpoint;

private GravitinoLocalStackContainer gravitinoLocalStackContainer;
private static final String S3_BUCKET_NAME = System.getenv("S3_BUCKET_NAME");
private static final String S3_ACCESS_KEY = System.getenv("S3_ACCESS_KEY_ID");
private static final String S3_SECRET_KEY = System.getenv("S3_SECRET_ACCESS_KEY");
private static final String S3_END_POINT = System.getenv("S3_ENDPOINT");

@VisibleForTesting
public void startIntegrationTest() throws Exception {}

@Override
protected void startNecessaryContainer() {

containerSuite.startLocalStackContainer();
gravitinoLocalStackContainer = containerSuite.getLocalStackContainer();

Awaitility.await()
.atMost(60, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.until(
() -> {
try {
Container.ExecResult result =
gravitinoLocalStackContainer.executeInContainer(
"awslocal", "iam", "create-user", "--user-name", "anonymous");
return result.getExitCode() == 0;
} catch (Exception e) {
LOG.info("LocalStack is not ready yet for: ", e);
return false;
}
});

gravitinoLocalStackContainer.executeInContainer("awslocal", "s3", "mb", "s3://" + bucketName);

Container.ExecResult result =
gravitinoLocalStackContainer.executeInContainer(
"awslocal", "iam", "create-access-key", "--user-name", "anonymous");

gravitinoLocalStackContainer.executeInContainer(
"awslocal",
"s3api",
"put-bucket-acl",
"--bucket",
"my-test-bucket",
"--acl",
"public-read-write");

// Get access key and secret key from result
String[] lines = result.getStdout().split("\n");
accessKey = lines[3].split(":")[1].trim().substring(1, 21);
secretKey = lines[5].split(":")[1].trim().substring(1, 41);

LOG.info("Access key: " + accessKey);
LOG.info("Secret key: " + secretKey);

s3Endpoint =
String.format("http://%s:%d", gravitinoLocalStackContainer.getContainerIpAddress(), 4566);
}
protected void startNecessaryContainer() {}

@BeforeAll
public void setup() throws IOException {
@@ -129,12 +81,12 @@ public void setup() throws IOException {
schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
Configuration conf = new Configuration();

conf.set("fs.s3a.access.key", accessKey);
conf.set("fs.s3a.secret.key", secretKey);
conf.set("fs.s3a.endpoint", s3Endpoint);
conf.set("fs.s3a.access.key", S3_ACCESS_KEY);
conf.set("fs.s3a.secret.key", S3_SECRET_KEY);
conf.set("fs.s3a.endpoint", S3_END_POINT);
conf.set(
"fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
fileSystem = FileSystem.get(URI.create(String.format("s3a://%s", bucketName)), conf);
fileSystem = FileSystem.get(URI.create(String.format("s3a://%s", S3_BUCKET_NAME)), conf);

createMetalake();
createCatalog();
@@ -161,7 +113,8 @@ protected String defaultBaseLocation() {
Path bucket =
new Path(
String.format(
"s3a://%s/%s", bucketName, GravitinoITUtils.genRandomName("CatalogFilesetIT")));
"s3a://%s/%s",
S3_BUCKET_NAME, GravitinoITUtils.genRandomName("CatalogFilesetIT")));
if (!fileSystem.exists(bucket)) {
fileSystem.mkdirs(bucket);
}
@@ -177,9 +130,9 @@ protected String defaultBaseLocation() {

protected void createCatalog() {
Map<String, String> map = Maps.newHashMap();
map.put(S3Properties.GRAVITINO_S3_ENDPOINT, s3Endpoint);
map.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, accessKey);
map.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, secretKey);
map.put(S3Properties.GRAVITINO_S3_ENDPOINT, S3_END_POINT);
map.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY);
map.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY);
map.put(FILESYSTEM_PROVIDERS, "s3");

metalake.createCatalog(catalogName, Catalog.Type.FILESET, provider, "comment", map);
@@ -195,12 +148,12 @@ protected String generateLocation(String filesetName) {
public void testCreateSchemaAndFilesetWithSpecialLocation() {
String localCatalogName = GravitinoITUtils.genRandomName("local_catalog");

String s3Location = String.format("s3a://%s", bucketName);
String s3Location = String.format("s3a://%s", S3_BUCKET_NAME);
Map<String, String> catalogProps = Maps.newHashMap();
catalogProps.put("location", s3Location);
catalogProps.put(S3Properties.GRAVITINO_S3_ENDPOINT, s3Endpoint);
catalogProps.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, accessKey);
catalogProps.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, secretKey);
catalogProps.put(S3Properties.GRAVITINO_S3_ENDPOINT, S3_END_POINT);
catalogProps.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY);
catalogProps.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY);
catalogProps.put(FILESYSTEM_PROVIDERS, "s3");

Catalog localCatalog =
@@ -252,4 +205,11 @@ public void testCreateSchemaAndFilesetWithSpecialLocation() {
// Delete catalog
metalake.dropCatalog(localCatalogName, true);
}

protected static boolean s3IsConfigured() {
return StringUtils.isNotBlank(System.getenv("S3_ACCESS_KEY_ID"))
&& StringUtils.isNotBlank(System.getenv("S3_SECRET_ACCESS_KEY"))
&& StringUtils.isNotBlank(System.getenv("S3_ENDPOINT"))
&& StringUtils.isNotBlank(System.getenv("S3_BUCKET_NAME"));
}
}
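With the LocalStack bootstrap gone, the test's S3 wiring reduces to reading four environment variables and handing them to the s3a connector; `s3IsConfigured()` above guards against any of them being absent. A standalone sketch of that wiring (Kotlin; the `fs.s3a.*` keys and `SimpleAWSCredentialsProvider` are standard hadoop-aws configuration):

```kotlin
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Builds an s3a FileSystem from the same environment variables the test
// now requires instead of provisioning a LocalStack container.
fun s3aFromEnv(): FileSystem {
    val conf = Configuration()
    conf.set("fs.s3a.access.key", System.getenv("S3_ACCESS_KEY_ID"))
    conf.set("fs.s3a.secret.key", System.getenv("S3_SECRET_ACCESS_KEY"))
    conf.set("fs.s3a.endpoint", System.getenv("S3_ENDPOINT"))
    // Static key/secret authentication, matching the test's setup() above.
    conf.set(
        "fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    return FileSystem.get(URI.create("s3a://" + System.getenv("S3_BUCKET_NAME")), conf)
}
```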