Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BT-711 Refresh SAS token for filesystem on expiry #6831

Merged
merged 18 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import cromwell.core.path.{NioPath, Path, PathBuilder}
import cromwell.filesystems.blob.BlobPathBuilder._

import java.net.{MalformedURLException, URI}
import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems}
import java.nio.file._
import scala.jdk.CollectionConverters._
import scala.language.postfixOps
import scala.util.{Failure, Try}
Expand Down Expand Up @@ -57,35 +57,43 @@ object BlobPathBuilder {

class BlobPathBuilder(blobTokenGenerator: BlobTokenGenerator, container: String, endpoint: String) extends PathBuilder {

val credential: AzureSasCredential = new AzureSasCredential(blobTokenGenerator.getAccessToken)
val fileSystemConfig: Map[String, Object] = Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential),
(AzureFileSystem.AZURE_STORAGE_FILE_STORES, container),
(AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE))

def retrieveFilesystem(uri: URI): Try[FileSystem] = {
Try(FileSystems.getFileSystem(uri)) recover {
// If no filesystem already exists, this will create a new connection, with the provided configs
case _: FileSystemNotFoundException => FileSystems.newFileSystem(uri, fileSystemConfig.asJava)
}
}

def build(string: String): Try[BlobPath] = {
validateBlobPath(string, container, endpoint) match {
case ValidBlobPath(path) => for {
fileSystem <- retrieveFilesystem(new URI("azb://?endpoint=" + endpoint))
nioPath <- Try(fileSystem.getPath(path))
blobPath = BlobPath(nioPath, endpoint, container)
} yield blobPath
case ValidBlobPath(path) => Try(BlobPath(path, endpoint, container, blobTokenGenerator))
case UnparsableBlobPath(errorMessage: Throwable) => Failure(errorMessage)
}
}

override def name: String = "Azure Blob Storage"
}

// Add args for container, storage account name
case class BlobPath private[blob](nioPath: NioPath, endpoint: String, container: String) extends Path {
override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath, endpoint, container)
object BlobPath {
def buildConfigMap(credential: AzureSasCredential, container: String): Map[String, Object] = {
Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential),
(AzureFileSystem.AZURE_STORAGE_FILE_STORES, container),
(AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE))
}

def findNioPath(path: String, endpoint: String, container: String, blobTokenGenerator: BlobTokenGenerator): NioPath = (for {
fileSystem <- retrieveFilesystem(new URI("azb://?endpoint=" + endpoint), container, blobTokenGenerator)
nioPath <- Try(fileSystem.getPath(path))
} yield nioPath).get // Ideally we would unwrap this to a NioPath on success and on a access failure try to recover
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running into issues here. If we can in theory know that the filesystem auth has expired in this method, we could essentially guarantee that we return a valid filesystem by trying to refresh. There are however some errors that I am not sure how we should process (like invalid path).

If we can figure out how to get this method and as a result the below nioPath method below to be the encapsulation of this refreshing behavior, we don't have to reimplement all of the filesystem methods that try to interact with the filesystem, which I think is ideal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I think I figured out that using a method called checkAccess is how the AzureFileSystem provider approaches identifying if the path has the auth needed to proceed, so I used this to move the onus of the check to the findNioPath method. This meant that we can recover from the thrown exceptions when first building the path object, rather than when trying to access it, however this doesn't really address the use of exceptions for control flow which I am not sure is the best way to go.


def retrieveFilesystem(uri: URI, container: String, blobTokenGenerator: BlobTokenGenerator): Try[FileSystem] = {
Try(FileSystems.getFileSystem(uri)) recover {
// If no filesystem already exists, this will create a new connection, with the provided configs
case _: FileSystemNotFoundException => {
val fileSystemConfig = buildConfigMap(blobTokenGenerator.getAccessToken, container)
FileSystems.newFileSystem(uri, fileSystemConfig.asJava)
}
}
}
}
case class BlobPath private[blob](pathString: String, endpoint: String, container: String, blobTokenGenerator: BlobTokenGenerator) extends Path {
//var token = blobTokenGenerator.getAccessToken
//var expiry = token.getSignature.split("&").filter(_.startsWith("se")).headOption.map(_.replaceFirst("se=",""))
override def nioPath: NioPath = BlobPath.findNioPath(path = pathString, endpoint, container, blobTokenGenerator)

override protected def newPath(nioPath: NioPath): Path = BlobPath(pathString, endpoint, container, blobTokenGenerator)

override def pathAsString: String = List(endpoint, container, nioPath.toString()).mkString("/")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cromwell.filesystems.blob
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now is a good time to take another look at this file and see if there's anything we want to move out into other files in this package. There are a lot of top-level classes in here now, the whole group would probably be easier to understand with a little division.


import akka.actor.ActorSystem
import com.azure.core.credential.AzureSasCredential
import com.azure.core.management.AzureEnvironment
import com.azure.core.management.profile.AzureProfile
import com.azure.identity.DefaultAzureCredentialBuilder
Expand All @@ -14,8 +15,7 @@ import cromwell.core.path.PathBuilderFactory
import net.ceedubs.ficus.Ficus._

import java.time.OffsetDateTime
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._

final case class BlobFileSystemConfig(config: Config)
Expand All @@ -37,7 +37,7 @@ final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Co
}

sealed trait BlobTokenGenerator {
def getAccessToken: String
def getAccessToken: AzureSasCredential
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR made me realize that we should rename this method. get makes it seem like a regular Java getter. Maybe something like generateAccessToken?

I also think we should change this method signature to return a Try[AzureSasCredential] rather than throwing from this method.

Apologies for the somewhat out-of-scope feedback, but I think the return type change will affect the refresh logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made an attempt for this, but am struggling with a situation where I have nested try declarations that I am unsure how to flatten. It seems a little like one way or another this exception needs to feed into the top level try

}

object BlobTokenGenerator {
Expand All @@ -57,13 +57,13 @@ object BlobTokenGenerator {
}

case class WSMBlobTokenGenerator(container: String, endpoint: String, workspaceId: String, workspaceManagerURL: String) extends BlobTokenGenerator {
def getAccessToken: String = {
def getAccessToken: AzureSasCredential = {
throw new NotImplementedError
}
}

case class NativeBlobTokenGenerator(container: String, endpoint: String) extends BlobTokenGenerator {
def getAccessToken: String = {
def getAccessToken: AzureSasCredential = {
val storageAccountName = BlobPathBuilder.parseStorageAccount(BlobPathBuilder.parseURI(endpoint)) match {
case Some(storageAccountName) => storageAccountName
case _ => throw new Exception("Storage account could not be parsed from endpoint")
Expand All @@ -73,7 +73,7 @@ case class NativeBlobTokenGenerator(container: String, endpoint: String) extends
val azureCredential = new DefaultAzureCredentialBuilder()
.authorityHost(profile.getEnvironment.getActiveDirectoryEndpoint)
.build
val azure = AzureResourceManager.authenticate(azureCredential, profile).withDefaultSubscription
val azure = AzureResourceManager.authenticate(azureCredential, profile).withDefaultSubscription()

val storageAccounts = azure.storageAccounts()
val storageAccount = storageAccounts
Expand Down Expand Up @@ -110,6 +110,6 @@ case class NativeBlobTokenGenerator(container: String, endpoint: String) extends
blobContainerSasPermission
)

blobContainerClient.generateSas(blobServiceSasSignatureValues)
new AzureSasCredential(blobContainerClient.generateSas(blobServiceSasSignatureValues))
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cromwell.filesystems.blob
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import java.nio.file.Files


object BlobPathBuilderSpec {
def buildEndpoint(storageAccount: String) = s"https://$storageAccount.blob.core.windows.net"
Expand Down Expand Up @@ -42,7 +42,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{
}
}

ignore should "build a blob path from a test string and read a file" in {
it should "build a blob path from a test string and read a file" in {
val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
val endpointHost = BlobPathBuilder.parseURI(endpoint).getHost
val store = "inputs"
Expand All @@ -55,20 +55,23 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{
blobPath.endpoint should equal(endpoint)
blobPath.pathAsString should equal(testString)
blobPath.pathWithoutScheme should equal(endpointHost + "/" + store + evalPath)

val is = Files.newInputStream(blobPath.nioPath)
val is = blobPath.newInputStream()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this new way of getting the input stream use blobPath.nioPath under the covers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, I think both ways work now, but I left it using this new way just since it was neater

val fileText = (is.readAllBytes.map(_.toChar)).mkString
fileText should include ("This is my test file!!!! Did it work?")
}

ignore should "build duplicate blob paths in the same filesystem" in {
it should "build duplicate blob paths in the same filesystem" in {
val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
val store = "inputs"
val evalPath = "/test/inputFile.txt"
val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(store, endpoint)
val testString = endpoint + "/" + store + evalPath
val blobPath1: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail()
blobPath1.nioPath.getFileSystem().close()
val blobPath2: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail()
blobPath1 should equal(blobPath2)
val is = blobPath1.newInputStream()
val fileText = (is.readAllBytes.map(_.toChar)).mkString
fileText should include ("This is my test file!!!! Did it work?")
}
}