Skip to content

Commit

Permalink
Merge pull request #567 from taiga-db/taiga-db/tm-branch-1.1
Browse files Browse the repository at this point in the history
cherry-pick http downgrade commits into 1.1
  • Loading branch information
taiga-db committed Aug 19, 2024
2 parents 78fed6b + cdd22e9 commit 3e3b864
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import java.util.concurrent.TimeUnit
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.util.Progressable
import org.apache.http.{HttpHost, HttpRequest}
import org.apache.http.{HttpClientConnection, HttpHost, HttpRequest, HttpResponse}
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.utils.URIBuilder
import org.apache.http.conn.routing.HttpRoute
import org.apache.http.impl.client.{BasicCredentialsProvider, HttpClientBuilder}
import org.apache.http.impl.client.{BasicCredentialsProvider, HttpClientBuilder, RequestWrapper}
import org.apache.http.impl.conn.{DefaultRoutePlanner, DefaultSchemePortResolver}
import org.apache.http.protocol.HttpContext
import org.apache.http.protocol.{HttpContext, HttpRequestExecutor}
import org.apache.spark.SparkEnv
import org.apache.spark.delta.sharing.{PreSignedUrlCache, PreSignedUrlFetcher}
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -69,7 +70,31 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging {
val proxy = new HttpHost(proxyConfig.host, proxyConfig.port)
clientBuilder.setProxy(proxy)

if (proxyConfig.noProxyHosts.nonEmpty) {
// If configured, force every outgoing request down to plain http before it is
// written to the connection. Used together with a proxy that performs the
// https upgrade itself (see TestStorageProxyServer in the tests).
val neverUseHttps = ConfUtils.getNeverUseHttps(getConf)
if (neverUseHttps) {
  val httpRequestDowngradeExecutor = new HttpRequestExecutor {
    // Rewrites the request URI scheme to "http" and delegates to the default
    // executor. On any failure the original, unmodified request is sent
    // instead, so the downgrade is strictly best-effort.
    override def execute(
        request: HttpRequest,
        connection: HttpClientConnection,
        context: HttpContext): HttpResponse = {
      try {
        val modifiedUri: URI = {
          new URIBuilder(request.getRequestLine.getUri).setScheme("http").build()
        }
        // Wrap rather than mutate the original request so the fallback below
        // still sees the untouched URI if anything in this branch throws.
        val wrappedRequest = new RequestWrapper(request)
        wrappedRequest.setURI(modifiedUri)

        return super.execute(wrappedRequest, connection, context)
      } catch {
        case e: Exception =>
          // Best-effort: log and fall through to send the original request.
          logInfo("Failed to downgrade the request to http", e)
      }
      super.execute(request, connection, context)
    }
  }
  clientBuilder.setRequestExecutor(httpRequestDowngradeExecutor)
}
if (proxyConfig.noProxyHosts.nonEmpty || neverUseHttps) {
val routePlanner = new DefaultRoutePlanner(DefaultSchemePortResolver.INSTANCE) {
override def determineRoute(target: HttpHost,
request: HttpRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ object ConfUtils {
val PROXY_PORT = "spark.delta.sharing.network.proxyPort"
val NO_PROXY_HOSTS = "spark.delta.sharing.network.noProxyHosts"

val NEVER_USE_HTTPS = "spark.delta.sharing.network.never.use.https"
val NEVER_USE_HTTPS_DEFAULT = "false"

def getProxyConfig(conf: Configuration): Option[ProxyConfig] = {
val proxyHost = conf.get(PROXY_HOST, null)
val proxyPortAsString = conf.get(PROXY_PORT, null)
Expand All @@ -95,6 +98,10 @@ object ConfUtils {
Some(ProxyConfig(proxyHost, proxyPort, noProxyHosts = noProxyList))
}

/**
 * Returns whether all outgoing requests must be downgraded to plain http.
 *
 * Reads [[NEVER_USE_HTTPS]] from the Hadoop configuration, falling back to
 * [[NEVER_USE_HTTPS_DEFAULT]] ("false") when the key is absent.
 */
def getNeverUseHttps(conf: Configuration): Boolean = {
  val fallback = NEVER_USE_HTTPS_DEFAULT.toBoolean
  conf.getBoolean(NEVER_USE_HTTPS, fallback)
}

def numRetries(conf: Configuration): Int = {
val numRetries = conf.getInt(NUM_RETRIES_CONF, NUM_RETRIES_DEFAULT)
validateNonNeg(numRetries, NUM_RETRIES_CONF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package io.delta.sharing.client
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import org.apache.hadoop.conf.Configuration
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.{HttpGet, HttpPost}
import org.apache.http.client.utils.URIBuilder
import org.apache.http.entity.StringEntity
import org.apache.http.util.EntityUtils
import org.apache.spark.SparkFunSuite
import org.sparkproject.jetty.server.Server
Expand Down Expand Up @@ -233,4 +235,77 @@ class DeltaSharingFileSystemSuite extends SparkFunSuite {
proxyServer.stop()
}
}

test("https traffic is downgraded to http when configured") {
// Local Jetty server that answers any GET with a fixed plain-text body.
val server = new Server(0)
val handler = new ServletHandler()
server.setHandler(handler)
handler.addServletWithMapping(new ServletHolder(new HttpServlet {
override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
resp.setContentType("text/plain")
resp.setStatus(HttpServletResponse.SC_OK)

// scalastyle:off println
resp.getWriter.println("Hello, World!")
// scalastyle:on println
}
}), "/*")
server.start()
// Busy-wait until Jetty reports the server is accepting connections.
do {
Thread.sleep(100)
} while (!server.isStarted())

// Create a local HTTP proxy server.
val proxyServer = new ProxyServer(0)
proxyServer.initialize()
try {
// Create a ProxyConfig with the host and port of the local proxy server,
// and enable the http downgrade under test.
val conf = new Configuration
conf.set(ConfUtils.PROXY_HOST, proxyServer.getHost())
conf.set(ConfUtils.PROXY_PORT, proxyServer.getPort().toString)
conf.setBoolean("spark.delta.sharing.network.never.use.https", true)

// Configure the httpClient to use the ProxyConfig.
val fs = new DeltaSharingFileSystem() {
override def getConf = {
conf
}
}

// Get http client instance.
val httpClient = fs.createHttpClient()

// Build an https URI pointing at the (http-only) local server; the client
// under test is expected to rewrite the scheme before sending.
val httpsUri = new URIBuilder(server.getURI)
.setScheme("https")
.setPath("test")
.build();

// Send an HTTPS GET request to the local server through the httpClient.
val getRequest = new HttpGet(httpsUri)
val getResponse = httpClient.execute(getRequest)

// Assert that the request is successful.
assert(getResponse.getStatusLine.getStatusCode == HttpServletResponse.SC_OK)
val content = EntityUtils.toString(getResponse.getEntity)
assert(content.trim == "Hello, World!")

// Assert that the request is passed through proxy.
assert(proxyServer.getCapturedRequests().size == 1)

// Assert that the request scheme is http
val capturedRequest = proxyServer.getCapturedRequests().head
assert(capturedRequest.getScheme.equals("http"))

// Assert that HTTPS POST request is successfully sent to proxy
val postRequest = new HttpPost(httpsUri)
postRequest.setHeader("Content-type", "application/json")
postRequest.setEntity(new StringEntity("asdfasdf"))
httpClient.execute(postRequest)

assert(proxyServer.getCapturedRequests().size == 2)
} finally {
// Always tear down both servers, even when an assertion fails.
server.stop()
proxyServer.stop()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -543,4 +543,25 @@ class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaShar
}
assert(e.getMessage.contains("LOCATION must be specified"))
}

integrationTest("table1 with storage proxy") {
  // Route storage access through a local proxy that upgrades http -> https,
  // while the sharing client is configured to downgrade all traffic to http.
  val proxyServer = new TestStorageProxyServer
  proxyServer.initialize()
  // Fix: stop the proxy in a finally block — previously a failing checkAnswer
  // skipped proxyServer.stop() and leaked the server thread across tests.
  try {
    withSQLConf(
      "spark.delta.sharing.network.proxyHost" -> s"${proxyServer.getHost()}",
      "spark.delta.sharing.network.proxyPort" -> s"${proxyServer.getPort()}",
      "spark.delta.sharing.network.never.use.https" -> "true") {

      val tablePath = testProfileFile.getCanonicalPath + "#share1.default.table1"
      val expected = Seq(
        Row(sqlTimestamp("2021-04-27 23:32:02.07"), sqlDate("2021-04-28")),
        Row(sqlTimestamp("2021-04-27 23:32:22.421"), sqlDate("2021-04-28"))
      )
      // Read via DataFrame API and via a created table; both must see the data
      // fetched through the proxy.
      checkAnswer(spark.read.format("deltaSharing").load(tablePath), expected)
      withTable("delta_sharing_test") {
        sql(s"CREATE TABLE delta_sharing_test USING deltaSharing LOCATION '$tablePath'")
        checkAnswer(sql(s"SELECT * FROM delta_sharing_test"), expected)
      }
    }
  } finally {
    proxyServer.stop()
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sharing.spark

import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

import scala.util.Try

import org.sparkproject.jetty.client.HttpClient
import org.sparkproject.jetty.http.HttpMethod
import org.sparkproject.jetty.server.{Request, Server}
import org.sparkproject.jetty.server.handler.AbstractHandler
import org.sparkproject.jetty.util.ssl.SslContextFactory


/**
 * A simple proxy server that forwards storage access while upgrading the connection to https.
 * This is used to test the behavior of the DeltaSharingFileSystem when
 * "spark.delta.sharing.network.never.use.https" is set to true.
 */
class TestStorageProxyServer {
  // Port 0 lets Jetty bind an ephemeral free port.
  private val server = new Server(0)
  val sslContextFactory = new SslContextFactory.Client()
  private val httpClient = new HttpClient(sslContextFactory)
  server.setHandler(new ProxyHandler)

  /** Starts the proxy asynchronously and blocks until the server is up. */
  def initialize(): Unit = {
    new Thread(() => {
      Try(httpClient.start())
      Try(server.start())
    }).start()

    // Busy-wait until Jetty reports the server started; acceptable in test-only code.
    do {
      Thread.sleep(100)
    } while (!server.isStarted())
  }

  /** Stops the server and forwarding client; failures are swallowed (best-effort teardown). */
  def stop(): Unit = {
    Try(server.stop())
    Try(httpClient.stop())
  }

  /** The ephemeral port the proxy is listening on. */
  def getPort(): Int = {
    server.getURI().getPort()
  }

  /** The host the proxy is bound to. */
  def getHost(): String = {
    server.getURI().getHost
  }

  /**
   * Forwards each incoming request as an https GET to the host named in the
   * "Host" header, echoing status, headers and body back to the caller.
   * Requests without a Host header are rejected with 400.
   */
  private class ProxyHandler extends AbstractHandler {
    override def handle(target: String,
                        baseRequest: Request,
                        request: HttpServletRequest,
                        response: HttpServletResponse): Unit = {

      Option(request.getHeader("Host")) match {
        case Some(host) =>
          // upgrade bucket access call from http -> https
          // Fix: HttpServletRequest.getQueryString returns null when there is no
          // query string; previously that produced a literal "?null" suffix.
          val query = Option(request.getQueryString).map("?" + _).getOrElse("")
          val uri = "https://" + host + request.getRequestURI.replace("null", "") + query

          // Forward as a blocking GET, propagating any Range header for partial reads.
          val res = httpClient.newRequest(uri)
            .method(HttpMethod.GET)
            .header("Range", request.getHeader("Range"))
            .send()

          response.setStatus(res.getStatus)
          res.getHeaders.forEach { header =>
            response.setHeader(header.getName, header.getValue)
          }
          val out = response.getOutputStream
          out.write(res.getContent, 0, res.getContent.length)
          out.flush()
          out.close()

          baseRequest.setHandled(true)

        case None =>
          response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No forwarding URL provided")
      }
    }
  }
}

0 comments on commit 3e3b864

Please sign in to comment.