Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support more flexible format for timestamp options in spark #231

Merged
merged 4 commits into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions python/delta_sharing/tests/test_delta_sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,9 @@ def test_load_as_pandas_success(
pytest.param(
"share8.default.cdf_table_cdf_enabled",
None,
"2000-01-01 00:00:00",
"2000-01-01T00:00:00Z",
"Please use a timestamp greater",
id="timestap too early ",
id="timestamp too early ",
),
],
)
Expand Down Expand Up @@ -488,7 +488,7 @@ def test_load_as_pandas_exception(
"share8.default.cdf_table_cdf_enabled",
None,
None,
"2000-01-01 00:00:00",
"2000-01-01T00:00:00Z",
None,
"Please use a timestamp greater",
pd.DataFrame({"not_used": []}),
Expand All @@ -499,7 +499,7 @@ def test_load_as_pandas_exception(
0,
None,
None,
"2100-01-01 00:00:00",
"2100-01-01T00:00:00Z",
"Please use a timestamp less",
pd.DataFrame({"not_used": []}),
id="cdf_table_cdf_enabled table changes with ending_timestamp",
Expand Down Expand Up @@ -617,7 +617,7 @@ def check_invalid_url(url: str):
pytest.param(
"share8.default.cdf_table_cdf_enabled",
None,
"2000-01-01 00:00:00",
"2000-01-01T00:00:00Z",
"Please use a timestamp greater",
[],
"not-used-schema-str",
Expand All @@ -626,7 +626,7 @@ def check_invalid_url(url: str):
pytest.param(
"share8.default.cdf_table_cdf_enabled",
1,
"2000-01-01 00:00:00",
"2000-01-01T00:00:00Z",
"Please either provide",
[],
"not-used-schema-str",
Expand All @@ -648,7 +648,7 @@ def test_load_as_spark(
spark = SparkSession.builder \
.appName("delta-sharing-test") \
.master("local[*]") \
.config("spark.jars.packages", "io.delta:delta-sharing-spark_2.12:0.6.0-SNAPSHOT") \
.config("spark.jars.packages", "io.delta:delta-sharing-spark_2.12:1.0.0-SNAPSHOT") \
linzhou-db marked this conversation as resolved.
Show resolved Hide resolved
.config("spark.delta.sharing.network.sslTrustAll", "true") \
.getOrCreate()

Expand Down Expand Up @@ -698,7 +698,7 @@ def test_load_as_spark(
"share8.default.cdf_table_cdf_enabled",
None,
None,
"2000-01-01 00:00:00",
"2000-01-01T00:00:00Z",
None,
"Please use a timestamp greater",
[],
Expand All @@ -710,7 +710,7 @@ def test_load_as_spark(
0,
None,
None,
"2100-01-01 00:00:00",
"2100-01-01T00:00:00Z",
"Please use a timestamp less than",
[],
"unused-schema-str",
Expand Down Expand Up @@ -746,7 +746,7 @@ def test_load_table_changes_as_spark(
spark = SparkSession.builder \
.appName("delta-sharing-test") \
.master("local[*]") \
.config("spark.jars.packages", "io.delta:delta-sharing-spark_2.12:0.6.0-SNAPSHOT") \
.config("spark.jars.packages", "io.delta:delta-sharing-spark_2.12:1.0.0-SNAPSHOT") \
.config("spark.delta.sharing.network.sslTrustAll", "true") \
.getOrCreate()

Expand Down
17 changes: 9 additions & 8 deletions python/delta_sharing/tests/test_rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def test_query_existed_table_version(rest_client: DataSharingRestClient):
def test_query_table_version_with_timestamp(rest_client: DataSharingRestClient):
response = rest_client.query_table_version(
Table(name="cdf_table_cdf_enabled", share="share8", schema="default"),
starting_timestamp="2020-01-01 00:00:00.0"
starting_timestamp="2020-01-01T00:00:00-08:00"
)
assert isinstance(response.delta_table_version, int)
assert response.delta_table_version == 0
Expand All @@ -241,7 +241,7 @@ def test_query_table_version_with_timestamp_exception(rest_client: DataSharingRe
try:
rest_client.query_table_version(
Table(name="table1", share="share1", schema="default"),
starting_timestamp="2020-01-1 00:00:00.0"
starting_timestamp="2020-01-1T00:00:00-08:00"
)
except Exception as e:
assert isinstance(e, HTTPError)
Expand Down Expand Up @@ -520,24 +520,25 @@ def test_list_files_in_table_timestamp(
# Use a random string, and look for an appropriate error.
# This will ensure that the timestamp is pass to server.
try:
rest_client.list_files_in_table(cdf_table, timestamp="random")
rest_client.list_files_in_table(cdf_table, timestamp="randomTimestamp")
assert False
except Exception as e:
assert isinstance(e, HTTPError)
assert "Invalid timestamp: Timestamp format must be" in (str(e))
assert "Invalid timestamp:" in (str(e))
assert "randomTimestamp" in (str(e))

# Use a really old start time, and look for an appropriate error.
# This will ensure that the timestamp is parsed correctly.
try:
rest_client.list_files_in_table(cdf_table, timestamp="2000-01-01 00:00:00")
rest_client.list_files_in_table(cdf_table, timestamp="2000-01-01T00:00:00Z")
assert False
except Exception as e:
assert isinstance(e, HTTPError)
assert "Please use a timestamp greater" in (str(e))

# Use an end time far away, and look for an appropriate error.
try:
rest_client.list_files_in_table(cdf_table, timestamp="9000-01-01 00:00:00")
rest_client.list_files_in_table(cdf_table, timestamp="9000-01-01T00:00:00Z")
assert False
except Exception as e:
assert isinstance(e, HTTPError)
Expand Down Expand Up @@ -698,7 +699,7 @@ def test_list_table_changes_with_timestamp(
try:
rest_client.list_table_changes(
cdf_table,
CdfOptions(starting_timestamp="2000-05-03 00:00:00")
CdfOptions(starting_timestamp="2000-05-03T00:00:00-08:00")
)
assert False
except Exception as e:
Expand All @@ -709,7 +710,7 @@ def test_list_table_changes_with_timestamp(
try:
rest_client.list_table_changes(
cdf_table,
CdfOptions(starting_version=0, ending_timestamp="2100-05-03 00:00:00")
CdfOptions(starting_version=0, ending_timestamp="2100-05-03T00:00:00Z")
)
assert False
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package io.delta.standalone.internal

import java.sql.Timestamp
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

import io.delta.standalone.internal.actions.CommitMarker
import io.delta.standalone.internal.util.FileNames
Expand Down Expand Up @@ -59,9 +62,10 @@ object DeltaSharingHistoryManager {
// Convert timestamp string to Timestamp
private[internal] def getTimestamp(paramName: String, timeStampStr: String): Timestamp = {
try {
Timestamp.valueOf(timeStampStr)
new Timestamp(OffsetDateTime.parse(
timeStampStr, DateTimeFormatter.ISO_OFFSET_DATE_TIME).toInstant.toEpochMilli)
} catch {
case e: IllegalArgumentException =>
case e: java.time.format.DateTimeParseException =>
throw DeltaCDFErrors.invalidTimestamp(paramName, e.getMessage)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
assert(deltaTableVersion == "2")

// getTableVersion succeeds with parameters
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled?startingTimestamp=2000-01-01%2000:00:00")
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled?startingTimestamp=2000-01-01T00:00:00Z")
connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection]
connection.setRequestMethod("HEAD")
connection.setRequestProperty("Authorization", s"Bearer ${TestResource.testAuthorizationToken}")
Expand Down Expand Up @@ -358,7 +358,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
assert(deltaTableVersion == "2")

// getTableVersion succeeds with parameters
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/version?startingTimestamp=2000-01-01%2000:00:00")
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/version?startingTimestamp=2000-01-01T00:00:00Z")
connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection]
connection.setRequestMethod("GET")
connection.setRequestProperty("Authorization", s"Bearer ${TestResource.testAuthorizationToken}")
Expand Down Expand Up @@ -395,7 +395,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {

// timestamp after the latest version
assertHttpError(
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled?startingTimestamp=9999-01-01%2000:00:00"),
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled?startingTimestamp=9999-01-01T00:00:00Z"),
method = "HEAD",
data = None,
expectedErrorCode = 400,
Expand Down Expand Up @@ -426,11 +426,11 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {

// timestamp after the latest version
assertHttpError(
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/version?startingTimestamp=9999-01-01%2000:00:00"),
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/version?startingTimestamp=9999-01-01T00:00:00-08:00"),
method = "GET",
data = None,
expectedErrorCode = 400,
expectedErrorMessage = "The provided timestamp (9999-01-01 00:00:00.0) is after the latest version available"
expectedErrorMessage = "The provided timestamp ("
)
}

Expand Down Expand Up @@ -802,23 +802,23 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
assertHttpError(
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/query"),
method = "POST",
data = Some("""{"timestamp": "2000-01-01 00:00:00"}"""),
data = Some("""{"timestamp": "2000-01-01T00:00:00-08:00"}"""),
expectedErrorCode = 400,
expectedErrorMessage = "The provided timestamp (2000-01-01 00:00:00.0) is before the earliest version"
expectedErrorMessage = "The provided timestamp "
)

// timestamp after the latest version
assertHttpError(
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/query"),
method = "POST",
data = Some("""{"timestamp": "9999-01-01 00:00:00"}"""),
data = Some("""{"timestamp": "9999-01-01T00:00:00-08:00"}"""),
expectedErrorCode = 400,
expectedErrorMessage = "The provided timestamp (9999-01-01 00:00:00.0) is after the latest version available"
expectedErrorMessage = "The provided timestamp "
)

// can only query table data since version 1
// 1651614979 PST: 2022-05-03T14:56:19.000+0000, version 1 is 1 second later
val tsStr = new Timestamp(1651614979000L).toString
// 1651614979 PST: 2022-05-03T14:56:19.000-08:00, version 1 is 1 second later
val tsStr = new Timestamp(1651614979000L).toInstant.toString
assertHttpError(
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_with_partition/query"),
method = "POST",
Expand All @@ -830,7 +830,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {

integrationTest("cdf_table_cdf_enabled - timestamp on version 1 - /shares/{share}/schemas/{schema}/tables/{table}/query") {
// 1651272635000, PST: 2022-04-29 15:50:35.0 -> version 1
val tsStr = new Timestamp(1651272635000L).toString
val tsStr = new Timestamp(1651272635000L).toInstant.toString
val p =
s"""
|{
Expand Down Expand Up @@ -1200,9 +1200,9 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {

integrationTest("cdf_table_cdf_enabled_changes - timestamp works") {
// 1651272616000, PST: 2022-04-29 15:50:16.0 -> version 0
val startStr = URLEncoder.encode(new Timestamp(1651272616000L).toString)
val startStr = new Timestamp(1651272616000L).toInstant.toString
// 1651272660000, PST: 2022-04-29 15:51:00.0 -> version 3
val endStr = URLEncoder.encode(new Timestamp(1651272660000L).toString)
val endStr = new Timestamp(1651272660000L).toInstant.toString

val response = readNDJson(requestPath(s"/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingTimestamp=${startStr}&endingTimestamp=${endStr}"), Some("GET"), None, Some(0))
val lines = response.split("\n")
Expand Down Expand Up @@ -1557,19 +1557,19 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {

integrationTest("cdf_table_cdf_enabled_changes - exceptions") {
assertHttpError(
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingTimestamp=2000-01-01%2000:00:00"),
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingTimestamp=2000-01-01T00:00:00-08:00"),
method = "GET",
data = None,
expectedErrorCode = 400,
expectedErrorMessage = "The provided timestamp (2000-01-01 00:00:00.0) is before the earliest version available"
expectedErrorMessage = "The provided timestamp ("
)

assertHttpError(
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingTimestamp=9999-01-01%2000:00:00"),
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingTimestamp=9999-01-01T00:00:00-08:00"),
method = "GET",
data = None,
expectedErrorCode = 400,
expectedErrorMessage = "The provided timestamp (9999-01-01 00:00:00.0) is after the latest version available"
expectedErrorMessage = "The provided timestamp ("
)

assertHttpError(
Expand All @@ -1581,7 +1581,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
)

assertHttpError(
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingVersion=1&startingTimestamp=2022-02-02%2000:00:00"),
url = requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingVersion=1&startingTimestamp=2022-02-02T00:00:00Z"),
method = "GET",
data = None,
expectedErrorCode = 400,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ object DeltaSharingErrors {
new IllegalStateException(s"sourceVersion($version) is invalid.")
}

def timestampInvalid(str: String): Throwable = {
new IllegalArgumentException(s"The provided timestamp ($str) cannot be converted to a valid " +
s"timestamp.")
}

def cannotFindSourceVersionException(json: String): Throwable = {
new IllegalStateException(s"Cannot find 'sourceVersion' in $json")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import scala.util.Try

import org.apache.spark.internal.Logging
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.TimestampType

trait DeltaSharingOptionParser {
protected def options: CaseInsensitiveMap[String]
Expand Down Expand Up @@ -68,7 +71,7 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser {
}
}

val startingTimestamp = options.get(STARTING_TIMESTAMP_OPTION)
val startingTimestamp = options.get(STARTING_TIMESTAMP_OPTION).map(getFormattedTimestamp(_))

val cdfOptions: Map[String, String] = prepareCdfOptions()

Expand All @@ -79,14 +82,27 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser {
}
}

val timestampAsOf = options.get(TIME_TRAVEL_TIMESTAMP)
val timestampAsOf = options.get(TIME_TRAVEL_TIMESTAMP).map(getFormattedTimestamp(_))

def isTimeTravel: Boolean = versionAsOf.isDefined || timestampAsOf.isDefined

private def getFormattedTimestamp(str: String): String = {
linzhou-db marked this conversation as resolved.
Show resolved Hide resolved
val castResult = Cast(
Literal(str), TimestampType, Option(SQLConf.get.sessionLocalTimeZone)).eval()
if (castResult == null) {
throw DeltaSharingErrors.timestampInvalid(str)
}
DateTimeUtils.toJavaTimestamp(castResult.asInstanceOf[java.lang.Long]).toInstant.toString
}

private def prepareCdfOptions(): Map[String, String] = {
if (readChangeFeed) {
validCdfOptions.filter(option => options.contains(option._1)).map(option =>
option._1 -> options.get(option._1).get
if ((option._1 == CDF_START_TIMESTAMP) || (option._1 == CDF_END_TIMESTAMP)) {
option._1 -> getFormattedTimestamp(options.get(option._1).get)
} else {
option._1 -> options.get(option._1).get
}
)
} else {
Map.empty[String, String]
Expand Down
Loading