diff --git a/build.sbt b/build.sbt
index d0deb582ed..0f6c99dcc4 100644
--- a/build.sbt
+++ b/build.sbt
@@ -178,16 +178,13 @@ lazy val storage = (project in file("storage"))
   )
 
 lazy val storageDynamodb = (project in file("storage-dynamodb"))
-  // TODO: uncomment after refactoring from scala -> java
-  // .dependsOn(storage % "compile->compile;test->test;provided->provided")
-  // .dependsOn(core % "test->test")
-
-  // TODO: delete after refactoring from scala -> java. Keep delta-core dependency for now.
-  .dependsOn(core % "compile->compile;test->test;provided->provided")
+  .dependsOn(storage % "compile->compile;test->test;provided->provided")
+  .dependsOn(core % "test->test")
   .settings (
     name := "delta-storage-dynamodb",
     commonSettings,
-    releaseSettings, // TODO: proper artifact name
+    releaseSettings, // TODO: proper artifact name with no scala version
+    // Test / publishArtifact := true, // uncomment only when testing FailingDynamoDBLogStore
     libraryDependencies ++= Seq(
       "com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided"
     )
diff --git a/storage-dynamodb/integration_tests/dynamodb_logstore.py b/storage-dynamodb/integration_tests/dynamodb_logstore.py
index 95f75753f0..7350fec98d 100644
--- a/storage-dynamodb/integration_tests/dynamodb_logstore.py
+++ b/storage-dynamodb/integration_tests/dynamodb_logstore.py
@@ -13,20 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 import os
 import sys
 import threading
-from pyspark import SparkContext
-from pyspark.sql import Column, DataFrame, SparkSession, SQLContext, functions
-from pyspark.sql.functions import *
-from py4j.java_collections import MapConverter
-from delta.tables import *
+from pyspark.sql import SparkSession
 from multiprocessing.pool import ThreadPool
 import time
 
 """
-create required dynamodb table with:
+Create the required DynamoDB table with:
 
 $ aws --region us-west-2 dynamodb create-table \
     --table-name delta_log_test \
     --attribute-definitions AttributeName=tablePath,AttributeType=S \
@@ -36,7 +33,7 @@
     AttributeName=fileName,KeyType=RANGE \
     --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
 
-run this script in root dir of repository:
+Run this script in the root dir of the repository:
 
 export VERSION=$(cat version.sbt|cut -d '"' -f 2)
 export DELTA_CONCURRENT_WRITERS=2
@@ -44,7 +41,7 @@
 export DELTA_TABLE_PATH=s3a://test-bucket/delta-test/
 export DELTA_DYNAMO_TABLE=delta_log_test
 export DELTA_DYNAMO_REGION=us-west-2
-export DELTA_STORAGE=io.delta.storage.DynamoDBLogStoreScala # TODO: remove `Scala` when Java version finished
+export DELTA_STORAGE=io.delta.storage.DynamoDBLogStore
 export DELTA_NUM_ROWS=16
 
 ./run-integration-tests.py --run-storage-dynamodb-integration-tests \
@@ -59,8 +56,8 @@
 concurrent_readers = int(os.environ.get("DELTA_CONCURRENT_READERS", 2))
 num_rows = int(os.environ.get("DELTA_NUM_ROWS", 16))
 
-# TODO change back to default io.delta.storage.DynamoDBLogStore
-delta_storage = os.environ.get("DELTA_STORAGE", "io.delta.storage.DynamoDBLogStoreScala")
+# className to instantiate: io.delta.storage.DynamoDBLogStore or .FailingDynamoDBLogStore
+delta_storage = os.environ.get("DELTA_STORAGE", "io.delta.storage.DynamoDBLogStore")
 dynamo_table_name = os.environ.get("DELTA_DYNAMO_TABLE", "delta_log_test")
 dynamo_region = os.environ.get("DELTA_DYNAMO_REGION", "us-west-2")
 dynamo_error_rates = os.environ.get("DELTA_DYNAMO_ERROR_RATES", "")
@@ -78,23 +75,24 @@
 number of rows: {num_rows}
 delta storage: {delta_storage}
 dynamo table name: {dynamo_table_name}
+{"dynamo_error_rates: {}".format(dynamo_error_rates) if dynamo_error_rates else ""}
 =====================
 """
 
 print(test_log)
 
-# TODO: update to spark.delta.DynamoDBLogStore.tableName (no `Scala`) when Java version finished
-
 spark = SparkSession \
     .builder \
     .appName("utilities") \
     .master("local[*]") \
     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
     .config("spark.delta.logStore.class", delta_storage) \
-    .config("spark.delta.DynamoDBLogStoreScala.tableName", dynamo_table_name) \
-    .config("spark.delta.DynamoDBLogStoreScala.region", dynamo_region) \
-    .config("spark.delta.DynamoDBLogStoreScala.errorRates", dynamo_error_rates) \
+    .config("spark.delta.DynamoDBLogStore.ddb.tableName", dynamo_table_name) \
+    .config("spark.delta.DynamoDBLogStore.ddb.region", dynamo_region) \
+    .config("spark.delta.DynamoDBLogStore.errorRates", dynamo_error_rates) \
     .getOrCreate()
 
+# spark.sparkContext.setLogLevel("INFO")
+
 data = spark.createDataFrame([], "id: int, a: int")
 data.write.format("delta").mode("overwrite").partitionBy("id").save(delta_table_path)
@@ -133,3 +131,14 @@ def start_read_thread():
 
 t = time.time() - start_t
 print(f"{num_rows / t:.02f} tx / sec")
+
+import boto3
+from botocore.config import Config
+my_config = Config(
+    region_name=dynamo_region,
+)
+dynamodb = boto3.resource('dynamodb', config=my_config)
+table = dynamodb.Table(dynamo_table_name)
+response = table.scan()  # fails unless the test actually created/used the input table
+items = response['Items']
+print(items[0])  # print one entry for manual validation
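To exercise the failure-injection path end to end, the same script can be pointed at the test LogStore instead. Below is a minimal sketch, assuming the delta-storage-dynamodb test artifact is on the classpath (see the commented-out Test / publishArtifact line in build.sbt above); the table and region values are placeholders, and the two error-rate keys come from the FailingDynamoDBLogStore comment further down:

    from pyspark.sql import SparkSession

    # fail ~25% of temp-file copies and DynamoDB puts on the write path;
    # anything not listed falls back to FailingDynamoDBLogStore's default rate
    spark = SparkSession \
        .builder \
        .appName("failing-logstore-smoke") \
        .master("local[*]") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.delta.logStore.class", "io.delta.storage.FailingDynamoDBLogStore") \
        .config("spark.delta.DynamoDBLogStore.ddb.tableName", "delta_log_test") \
        .config("spark.delta.DynamoDBLogStore.ddb.region", "us-west-2") \
        .config("spark.delta.DynamoDBLogStore.errorRates",
                "write_copy_temp_file=0.25,write_put_db_entry=0.25") \
        .getOrCreate()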
diff --git a/storage-dynamodb/src/main/java/io/delta/storage/DynamoDBLogStore.java b/storage-dynamodb/src/main/java/io/delta/storage/DynamoDBLogStore.java
index e08b1bbe5d..a59b32f773 100644
--- a/storage-dynamodb/src/main/java/io/delta/storage/DynamoDBLogStore.java
+++ b/storage-dynamodb/src/main/java/io/delta/storage/DynamoDBLogStore.java
@@ -71,10 +71,10 @@ public class DynamoDBLogStore extends BaseExternalLogStore {
     private static final Logger LOG = LoggerFactory.getLogger(DynamoDBLogStore.class);
 
     /**
-     * Configuration keys for the DynamoDB client
+     * Configuration keys for the DynamoDB client, with prefix `spark.delta.DynamoDBLogStore.`
      */
-    private static final String DBB_CLIENT_TABLE = "tableName";
-    private static final String DBB_CLIENT_REGION = "region";
+    private static final String DBB_CLIENT_TABLE = "ddb.tableName";
+    private static final String DBB_CLIENT_REGION = "ddb.region";
     private static final String DBB_CLIENT_CREDENTIALS_PROVIDER = "credentials.provider";
 
     /**
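Since the updated javadoc says these constants resolve under the `spark.delta.DynamoDBLogStore.` prefix, the fully-qualified keys line up with what the integration script sets. A quick read-back sketch in PySpark (assumes the SparkSession built by the script above; `spark.conf.get` with a default is standard PySpark):

    # e.g. DBB_CLIENT_TABLE ("ddb.tableName") -> spark.delta.DynamoDBLogStore.ddb.tableName
    spark.conf.get("spark.delta.DynamoDBLogStore.ddb.tableName", None)         # 'delta_log_test'
    spark.conf.get("spark.delta.DynamoDBLogStore.ddb.region", None)            # 'us-west-2'
    spark.conf.get("spark.delta.DynamoDBLogStore.credentials.provider", None)  # None unless configured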
diff --git a/storage-dynamodb/src/test/java/io/delta/storage/FailingDynamoDBLogStore.java b/storage-dynamodb/src/test/java/io/delta/storage/FailingDynamoDBLogStore.java
index 89659536c3..6a64e5f25d 100644
--- a/storage-dynamodb/src/test/java/io/delta/storage/FailingDynamoDBLogStore.java
+++ b/storage-dynamodb/src/test/java/io/delta/storage/FailingDynamoDBLogStore.java
@@ -33,18 +33,24 @@ public class FailingDynamoDBLogStore extends DynamoDBLogStore {
 
     private static java.util.Random rng = new java.util.Random();
 
-    private ConcurrentHashMap<String, Float> errorRates;
+    private final ConcurrentHashMap<String, Float> errorRates;
 
     public FailingDynamoDBLogStore(Configuration hadoopConf) throws IOException {
         super(hadoopConf);
         errorRates = new ConcurrentHashMap<>();
+
+        // for each optional key in the set { write_copy_temp_file, write_put_db_entry,
+        // fix_delta_log_copy_temp_file, fix_delta_log_put_db_entry }, the `errorRates` string
+        // is expected to be of the form key1=value1,key2=value2 where each value is a fraction
+        // indicating how often that method should fail (e.g. 0.10 ==> 10% failure rate).
         String errorRatesDef = getParam(hadoopConf, "errorRates", "");
         for (String s: errorRatesDef.split(",")) {
-            if(!s.contains("=")) continue;
+            if (!s.contains("=")) continue;
             String[] parts = s.split("=", 2);
-            if(parts.length == 2)
+            if (parts.length == 2) {
                 errorRates.put(parts[0], Float.parseFloat(parts[1]));
-        };
+            }
+        }
     }
 
     @Override
@@ -72,7 +78,7 @@ protected void fixDeltaLogPutCompleteDbEntry(ExternalCommitEntry entry) throws I
     }
 
     private void injectError(String name) throws IOException {
-        float rate = errorRates.getOrDefault(name, 0.0f);
+        float rate = errorRates.getOrDefault(name, 0.1f);
         if (rng.nextFloat() < rate) {
             throw new IOException(String.format("injected failure: %s", name));
         }
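For reference, the constructor parsing and injectError behavior above are equivalent to this Python sketch (illustration only; the function names are not part of the patch). Note the changed default: an operation with no configured rate now fails 10% of the time, so FailingDynamoDBLogStore injects failures even when errorRates is left empty:

    import random

    # "write_copy_temp_file=0.25,write_put_db_entry=0.25" -> {"write_copy_temp_file": 0.25, ...}
    def parse_error_rates(error_rates_def):
        rates = {}
        for s in error_rates_def.split(","):
            if "=" not in s:
                continue
            key, value = s.split("=", 1)  # same split-once behavior as Java's s.split("=", 2)
            rates[key] = float(value)
        return rates

    # mirrors injectError: fail with the configured probability, defaulting to 0.1
    def should_inject(rates, name):
        return random.random() < rates.get(name, 0.1)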