From 219fc19b29eea426c74f5e45389472503e266880 Mon Sep 17 00:00:00 2001
From: Danilo Burbano <37355249+danilojsl@users.noreply.github.com>
Date: Sun, 11 Feb 2024 07:24:20 -0500
Subject: [PATCH] =?UTF-8?q?[SPARKNLP-940]=20Adding=20changes=20to=20correc?=
 =?UTF-8?q?tly=20copy=20cluster=20index=20storage=E2=80=A6=20(#14167)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* [SPARKNLP-940] Adding changes to correctly copy cluster index storage when defined

* [SPARKNLP-940] Moving local mode control to its right place

* [SPARKNLP-940] Refactoring sendToCluster method
---
 build.sbt                                    |  6 ++-
 .../util/Load_Model_from_GCP_Storage.ipynb   |  4 +-
 .../storage/RocksDBConnection.scala          |  8 +--
 .../johnsnowlabs/storage/StorageHelper.scala | 49 +++++++++++++------
 4 files changed, 47 insertions(+), 20 deletions(-)

diff --git a/build.sbt b/build.sbt
index d8c047c208353a..3f45d5ee14d6c8 100644
--- a/build.sbt
+++ b/build.sbt
@@ -144,13 +144,17 @@ lazy val utilDependencies = Seq(
     exclude ("com.fasterxml.jackson.core", "jackson-annotations")
     exclude ("com.fasterxml.jackson.core", "jackson-databind")
     exclude ("com.fasterxml.jackson.core", "jackson-core")
+    exclude ("com.fasterxml.jackson.dataformat", "jackson-dataformat-cbor")
     exclude ("commons-configuration", "commons-configuration"),
   liblevenshtein
     exclude ("com.google.guava", "guava")
     exclude ("org.apache.commons", "commons-lang3")
     exclude ("com.google.code.findbugs", "annotations")
     exclude ("org.slf4j", "slf4j-api"),
-  gcpStorage,
+  gcpStorage
+    exclude ("com.fasterxml.jackson.core", "jackson-core")
+    exclude ("com.fasterxml.jackson.dataformat", "jackson-dataformat-cbor")
+  ,
   greex,
   azureIdentity,
   azureStorage)
diff --git a/examples/util/Load_Model_from_GCP_Storage.ipynb b/examples/util/Load_Model_from_GCP_Storage.ipynb
index 8afaad7a5c1faf..19d68bbe732270 100644
--- a/examples/util/Load_Model_from_GCP_Storage.ipynb
+++ b/examples/util/Load_Model_from_GCP_Storage.ipynb
@@ -80,7 +80,8 @@
     "1. GCP connector: You need to identify your hadoop version and set the required dependency in `spark.jars.packages`\n",
     "2. ADC credentials: After following the instructions to setup ADC, you will have a JSON file that holds your authenticiation information. This file is setup in `spark.hadoop.google.cloud.auth.service.account.json.keyfile`\n",
     "3. Hadoop File System: You also need to setup the Hadoop implementation to work with GCP Storage as file system. This is define in `spark.hadoop.fs.gs.impl`\n",
-    "3. Finally, to mitigate conflicts between Spark's dependencies and user dependencies. You must define `spark.driver.userClassPathFirst` as true. You may also need to define `spark.executor.userClassPathFirst` as true.\n",
+    "4. To mitigate conflicts between Spark's dependencies and user dependencies, you must define `spark.driver.userClassPathFirst` as true. You may also need to define `spark.executor.userClassPathFirst` as true.\n",
+    "5. Additionally, to avoid conflict errors we need to exclude the following dependency: `com.fasterxml.jackson.core:jackson-core`\n",
     "\n"
    ]
   },
@@ -128,6 +129,7 @@
     "    \"spark.jars.packages\": \"com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.8\",\n",
     "    \"spark.hadoop.fs.gs.impl\": \"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem\",\n",
     "    \"spark.driver.userClassPathFirst\": \"true\",\n",
+    "    \"spark.jars.excludes\": \"com.fasterxml.jackson.core:jackson-core\",\n",
     "    \"spark.hadoop.google.cloud.auth.service.account.json.keyfile\": json_keyfile,\n",
     "    \"spark.jsl.settings.gcp.project_id\": PROJECT_ID,\n",
     "    \"spark.jsl.settings.pretrained.cache_folder\": CACHE_FOLDER\n",
diff --git a/src/main/scala/com/johnsnowlabs/storage/RocksDBConnection.scala b/src/main/scala/com/johnsnowlabs/storage/RocksDBConnection.scala
index 79a1e612f7d711..412a2b377134fc 100644
--- a/src/main/scala/com/johnsnowlabs/storage/RocksDBConnection.scala
+++ b/src/main/scala/com/johnsnowlabs/storage/RocksDBConnection.scala
@@ -42,9 +42,9 @@ final class RocksDBConnection private (path: String) extends AutoCloseable {
   }
 
   def findLocalIndex: String = {
-    val localPath = RocksDBConnection.getLocalPath(path)
-    if (new File(localPath).exists()) {
-      localPath
+    val tmpIndexStorageLocalPath = RocksDBConnection.getTmpIndexStorageLocalPath(path)
+    if (new File(tmpIndexStorageLocalPath).exists()) {
+      tmpIndexStorageLocalPath
     } else if (new File(path).exists()) {
       path
     } else {
@@ -135,7 +135,7 @@ object RocksDBConnection {
   def getOrCreate(database: Database.Name, refName: String): RocksDBConnection =
     getOrCreate(database.toString, refName)
 
-  def getLocalPath(fileName: String): String = {
+  def getTmpIndexStorageLocalPath(fileName: String): String = {
     Path
       .mergePaths(new Path(SparkFiles.getRootDirectory()), new Path("/storage/" + fileName))
       .toString
diff --git a/src/main/scala/com/johnsnowlabs/storage/StorageHelper.scala b/src/main/scala/com/johnsnowlabs/storage/StorageHelper.scala
index 99484e6ae8bc3b..3d40733637c18d 100644
--- a/src/main/scala/com/johnsnowlabs/storage/StorageHelper.scala
+++ b/src/main/scala/com/johnsnowlabs/storage/StorageHelper.scala
@@ -84,13 +84,38 @@ object StorageHelper {
       sparkContext: SparkContext): Unit = {
     destinationScheme match {
       case "file" => {
-        val destination = new Path(RocksDBConnection.getLocalPath(clusterFileName))
-        copyIndexToLocal(source, destination, sparkContext)
+        val sourceFileSystemScheme = source.getFileSystem(sparkContext.hadoopConfiguration)
+        val tmpIndexStorageLocalPath =
+          RocksDBConnection.getTmpIndexStorageLocalPath(clusterFileName)
+        sourceFileSystemScheme.getScheme match {
+          case "file" => {
+            if (!doesDirectoryExistJava(tmpIndexStorageLocalPath) ||
+              !doesDirectoryExistHadoop(tmpIndexStorageLocalPath, sparkContext)) {
+              copyIndexToLocal(source, new Path(tmpIndexStorageLocalPath), sparkContext)
+            }
+          }
+          case "s3a" =>
+            copyIndexToLocal(source, new Path(tmpIndexStorageLocalPath), sparkContext)
+          case _ => copyIndexToCluster(source, clusterFilePath, sparkContext)
+        }
+      }
+      case _ => {
+        copyIndexToCluster(source, clusterFilePath, sparkContext)
       }
-      case _ => copyIndexToCluster(source, clusterFilePath, sparkContext)
     }
   }
 
+  private def doesDirectoryExistJava(path: String): Boolean = {
+    val directory = new File(path)
+    directory.exists && directory.isDirectory
+  }
+
+  private def doesDirectoryExistHadoop(path: String, sparkContext: SparkContext): Boolean = {
+    val localPath = new Path(path)
+    val fileSystem = localPath.getFileSystem(sparkContext.hadoopConfiguration)
+    fileSystem.exists(localPath)
+  }
+
   private def copyIndexToCluster(
       sourcePath: Path,
       dst: Path,
@@ -129,21 +154,17 @@
     val fileSystemDestination = destination.getFileSystem(sparkContext.hadoopConfiguration)
     val fileSystemSource = source.getFileSystem(sparkContext.hadoopConfiguration)
 
-    if (fileSystemDestination.exists(destination)) {
-      return
-    }
-
-    if (fileSystemSource.getScheme == "s3a" && fileSystemDestination.getScheme == "file") {
+    if (fileSystemSource.getScheme == "file") {
+      fileSystemDestination.copyFromLocalFile(false, true, source, destination)
+    } else {
       CloudResources.downloadBucketToLocalTmp(
         source.toString,
         destination.toString,
         isIndex = true)
-      sparkContext.addFile(destination.toString, recursive = true)
-      return
-    }
-
-    if (fileSystemDestination.getScheme != "s3a") {
-      fileSystemDestination.copyFromLocalFile(false, true, source, destination)
+      val isLocalMode = sparkContext.master.startsWith("local")
+      if (isLocalMode) {
+        sparkContext.addFile(destination.toString, recursive = true)
+      }
     }
   }
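
Note for readers following the notebook change: the settings it documents map onto a plain Scala SparkSession roughly as below. This is a minimal sketch, not part of the patch itself; the keyfile path, project id, bucket, and model folder are placeholders, and WordEmbeddingsModel stands in for whichever storage-backed annotator you actually keep in GCS. It assumes the gcs-connector package and the spark.jars.excludes entry for com.fasterxml.jackson.core:jackson-core introduced above.

import com.johnsnowlabs.nlp.embeddings.WordEmbeddingsModel
import org.apache.spark.sql.SparkSession

// Placeholder values below (keyfile path, project id, bucket) must be adapted to your environment.
val spark = SparkSession
  .builder()
  .appName("Load model from GCP Storage")
  .master("local[*]")
  .config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.8")
  .config("spark.jars.excludes", "com.fasterxml.jackson.core:jackson-core")
  .config("spark.driver.userClassPathFirst", "true")
  .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
  .config(
    "spark.hadoop.google.cloud.auth.service.account.json.keyfile",
    "/path/to/adc-keyfile.json")
  .config("spark.jsl.settings.gcp.project_id", "my-gcp-project")
  .getOrCreate()

// Loading a model that carries RocksDB index storage goes through StorageHelper.sendToCluster,
// which is the code path this patch adjusts.
val embeddings = WordEmbeddingsModel.load("gs://my-bucket/models/glove_100d_embeddings")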
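
To see where the index lands when running in local mode, the temporary location mirrors RocksDBConnection.getTmpIndexStorageLocalPath, i.e. a storage/ folder under SparkFiles' root directory. The check below is again only a sketch, assuming a session like the one above is active.

import java.io.File
import org.apache.spark.SparkFiles

// getTmpIndexStorageLocalPath merges SparkFiles.getRootDirectory() with "/storage/<clusterFileName>",
// so any locally materialised index directories show up under this folder.
val tmpStorageRoot = new File(SparkFiles.getRootDirectory(), "storage")
if (tmpStorageRoot.isDirectory) {
  tmpStorageRoot.listFiles().foreach(dir => println(dir.getAbsolutePath))
} else {
  println(s"No local index storage found under ${tmpStorageRoot.getAbsolutePath}")
}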