Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Sep 27, 2024
1 parent e26d824 commit d06e549
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,12 @@ class DeltaSharingRestClient(
httpRequest
}

import org.apache.http.HttpEntity
private def getEntityDebugStr(entity: HttpEntity): String = {
s"isRepeatable:${entity.isRepeatable},isChunked:${entity.isChunked}," +
s"getContentLength:${entity.getContentLength},isStreaming:${entity.isStreaming}"
}

/**
* Send the http request and return the table version in the header if any, and the response
* content.
Expand All @@ -981,12 +987,18 @@ class DeltaSharingRestClient(
// Reset queryId before calling RetryUtils, and before prepareHeaders.
queryId = Some(UUID.randomUUID().toString().split('-').head)
RetryUtils.runWithExponentialBackoff(numRetries, maxRetryDuration) {
val startTime = System.currentTimeMillis()
val profile = profileProvider.getProfile
val response = client.execute(
getHttpHost(profile.endpoint),
prepareHeaders(httpRequest),
HttpClientContext.create()
)

def elapsedTime: Long = {
System.currentTimeMillis() - startTime
}

try {
val status = response.getStatusLine()
val entity = response.getEntity()
Expand All @@ -1003,10 +1015,18 @@ class DeltaSharingRestClient(
new InputStreamReader(new BoundedInputStream(input), UTF_8)
)
var line: Option[String] = None

// scalastyle:off println
Console.println(s"----[linzhou]----before while:" +
s"${(System.currentTimeMillis() - startTime)}ms, ${getEntityDebugStr(entity)}")
while ({
line = Option(reader.readLine()); line.isDefined
}) {
lineBuffer += line.get
val a = line.get
Console.println(s"----[linzhou]----in while: ($elapsedTime)ms, newLine:[$a]")
Console.println(s"----[linzhou]----in while: ($elapsedTime)ms, " +
s"debug:[${getEntityDebugStr(entity)}]")
lineBuffer += a
}
lineBuffer.toList
}
Expand All @@ -1016,10 +1036,20 @@ class DeltaSharingRestClient(
logError(error)
lineBuffer += error
lineBuffer.toList
case otherE: Exception =>
val error = s"Request to delta sharing server failed due tooo ${otherE}."
logError(error)
throw otherE
} finally {
Console.println(s"----[linzhou]----in finally: ($elapsedTime)ms, " +
s"status:${response.getStatusLine}")
input.close()
}
}
Console.println(s"----[linzhou]----after: ($elapsedTime)ms, " +
s"status:${response.getStatusLine}")
Console.println(s"----[linzhou]----after: ($elapsedTime)ms, " +
s"entity:${getEntityDebugStr(response.getEntity)}")

val statusCode = status.getStatusCode
if (!(statusCode == HttpStatus.SC_OK ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,11 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
// The response is always in parquet format, because the client allows parquet and the
// table is a basic table.
Seq(
RESPONSE_FORMAT_PARQUET,
s"${RESPONSE_FORMAT_DELTA},${RESPONSE_FORMAT_PARQUET}",
s"${RESPONSE_FORMAT_PARQUET},${RESPONSE_FORMAT_DELTA}"
RESPONSE_FORMAT_PARQUET
// s"${RESPONSE_FORMAT_DELTA},${RESPONSE_FORMAT_PARQUET}",
// s"${RESPONSE_FORMAT_PARQUET},${RESPONSE_FORMAT_DELTA}"
).foreach { responseFormat =>
Seq(true, false).foreach { paginationEnabled => {
Seq(false).foreach { paginationEnabled => {
val client = new DeltaSharingRestClient(
testProfileProvider,
sslTrustAll = true,
Expand All @@ -455,19 +455,19 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
)
verifyTableFiles(tableFiles)

if (tableFiles.refreshToken.isDefined) {
val refreshedTableFiles =
client.getFiles(
table,
predicates = Nil,
limit = None,
versionAsOf = None,
timestampAsOf = None,
jsonPredicateHints = None,
refreshToken = tableFiles.refreshToken
)
verifyTableFiles(refreshedTableFiles)
}
// if (tableFiles.refreshToken.isDefined) {
// val refreshedTableFiles =
// client.getFiles(
// table,
// predicates = Nil,
// limit = None,
// versionAsOf = None,
// timestampAsOf = None,
// jsonPredicateHints = None,
// refreshToken = tableFiles.refreshToken
// )
// verifyTableFiles(refreshedTableFiles)
// }
} finally {
client.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,57 @@ class DeltaSharingService(serverConfig: ServerConfig) {
}
logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table " +
s"and sign ${queryResult.actions.length - 2} urls for table $share/$schema/$table")
streamingOutput(Some(queryResult.version), queryResult.responseFormat, queryResult.actions)

import java.util.concurrent.Executors
import concurrent.{ExecutionContext, Await, Future}
import concurrent.duration._

// zhoulin
// scalastyle:off println
// single threaded execution context

println(s"----[zhoulin]-------Started for $share/$schema/$table-------")
startTime = System.currentTimeMillis()

if (true) {
val executor = Executors.newSingleThreadExecutor()
val a = streamingOutput(
Some(queryResult.version),
queryResult.responseFormat, queryResult.actions, Some(executor)
)
// println(s"----[zhoulin]----trying to sleep ($elapsedTime)ms.")
// Thread.sleep(3000)
// executor.shutdownNow()
a
} else {
implicit val context = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

val executor = ServiceRequestContext.current().blockingTaskExecutor()
val f = Future {
streamingOutput(
Some(queryResult.version),
queryResult.responseFormat, queryResult.actions, Some(executor)
)
}

f.onComplete { a =>
println(s"----[zhoulin]----The future completes ($elapsedTime)ms:$a")
}
// scalastyle:off awaitresult
// scalastyle:off awaitready
val a = Await.ready(f, 10.seconds)
println(s"----[zhoulin]----($elapsedTime)ms, a:$a")
val b = Await.result(f, 10.seconds)
println(s"----[zhoulin]----($elapsedTime)ms, b:$b")
b
}
}
}

private def elapsedTime: Long = {
System.currentTimeMillis() - startTime
}

// scalastyle:off argcount
@Get("/shares/{share}/schemas/{schema}/tables/{table}/changes")
@ConsumesJson
Expand Down Expand Up @@ -592,10 +639,13 @@ class DeltaSharingService(serverConfig: ServerConfig) {
streamingOutput(Some(queryResult.version), queryResult.responseFormat, queryResult.actions)
}

private var startTime = System.currentTimeMillis()

private def streamingOutput(
version: Option[Long],
responseFormat: String,
actions: Seq[Object]): HttpResponse = {
actions: Seq[Object],
executorOpt: Option[java.util.concurrent.Executor] = None): HttpResponse = {
val headers = if (version.isDefined) {
createHeadersBuilderForTableVersion(version.get)
.set(HttpHeaderNames.CONTENT_TYPE, DELTA_TABLE_METADATA_CONTENT_TYPE)
Expand All @@ -613,11 +663,17 @@ class DeltaSharingService(serverConfig: ServerConfig) {
HttpHeaders.of(),
(o: Object) => processRequest {
val out = new ByteArrayOutputStream
// Thread.sleep(1000)
JsonUtils.mapper.writeValue(out, o)
out.write('\n')
Console.println(s"----[zhoulin]----server object,($elapsedTime)ms, [${out.toString}].")
// if (out.toString.contains("https://delta-exchange-test")) {
// Console.println(s"----[zhoulin]----trying to stop.")
// throw new IllegalArgumentException("lin zhou exception.")
// }
HttpData.wrap(out.toByteArray)
},
ServiceRequestContext.current().blockingTaskExecutor())
executorOpt.getOrElse(ServiceRequestContext.current().blockingTaskExecutor()))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,10 +567,10 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {

integrationTest("table1 - non partitioned - /shares/{share}/schemas/{schema}/tables/{table}/query") {
Seq(
RESPONSE_FORMAT_PARQUET,
RESPONSE_FORMAT_DELTA,
s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET",
s"$RESPONSE_FORMAT_PARQUET,$RESPONSE_FORMAT_DELTA"
RESPONSE_FORMAT_PARQUET
// RESPONSE_FORMAT_DELTA,
// s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET",
// s"$RESPONSE_FORMAT_PARQUET,$RESPONSE_FORMAT_DELTA"
).foreach { responseFormat =>
val respondedFormat = if (responseFormat == RESPONSE_FORMAT_DELTA) {
RESPONSE_FORMAT_DELTA
Expand Down

0 comments on commit d06e549

Please sign in to comment.