Improvements to DynamoDBLogStore.java feature #2 (#1047)
* minor cleanup

* add print statement to dynamodb_logstore.py

* add comment to build.sbt on how to run FailingDynamoDBLogStore

* rename api conf keys

* Update dynamodb_logstore.py
scottsand-db committed Apr 1, 2022
1 parent a990988 commit 9b7e8fd
Showing 4 changed files with 42 additions and 30 deletions.
11 changes: 4 additions & 7 deletions build.sbt
@@ -178,16 +178,13 @@ lazy val storage = (project in file("storage"))
)

lazy val storageDynamodb = (project in file("storage-dynamodb"))
// TODO: uncomment after refactoring from scala -> java
// .dependsOn(storage % "compile->compile;test->test;provided->provided")
// .dependsOn(core % "test->test")

// TODO: delete after refactoring from scala -> java. Keep delta-core dependency for now.
.dependsOn(core % "compile->compile;test->test;provided->provided")
.dependsOn(storage % "compile->compile;test->test;provided->provided")
.dependsOn(core % "test->test")
.settings (
name := "delta-storage-dynamodb",
commonSettings,
releaseSettings, // TODO: proper artifact name
releaseSettings, // TODO: proper artifact name with no scala version
// Test / publishArtifact := true, // uncomment only when testing FailingDynamoDBLogStore
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided"
)
39 changes: 24 additions & 15 deletions storage-dynamodb/integration_tests/dynamodb_logstore.py
@@ -13,20 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import sys
import threading

from pyspark import SparkContext
from pyspark.sql import Column, DataFrame, SparkSession, SQLContext, functions
from pyspark.sql.functions import *
from py4j.java_collections import MapConverter
from delta.tables import *
from pyspark.sql import SparkSession
from multiprocessing.pool import ThreadPool
import time

"""
create required dynamodb table with:
Create required dynamodb table with:
$ aws --region us-west-2 dynamodb create-table \
--table-name delta_log_test \
@@ -36,15 +33,15 @@
AttributeName=fileName,KeyType=RANGE \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
run this script in root dir of repository:
Run this script in root dir of repository:
export VERSION=$(cat version.sbt|cut -d '"' -f 2)
export DELTA_CONCURRENT_WRITERS=2
export DELTA_CONCURRENT_READERS=2
export DELTA_TABLE_PATH=s3a://test-bucket/delta-test/
export DELTA_DYNAMO_TABLE=delta_log_test
export DELTA_DYNAMO_REGION=us-west-2
export DELTA_STORAGE=io.delta.storage.DynamoDBLogStoreScala # TODO: remove `Scala` when Java version finished
export DELTA_STORAGE=io.delta.storage.DynamoDBLogStore
export DELTA_NUM_ROWS=16
./run-integration-tests.py --run-storage-dynamodb-integration-tests \
@@ -59,8 +56,8 @@
concurrent_readers = int(os.environ.get("DELTA_CONCURRENT_READERS", 2))
num_rows = int(os.environ.get("DELTA_NUM_ROWS", 16))

# TODO change back to default io.delta.storage.DynamoDBLogStore
delta_storage = os.environ.get("DELTA_STORAGE", "io.delta.storage.DynamoDBLogStoreScala")
# className to instantiate. io.delta.storage.DynamoDBLogStore or .FailingDynamoDBLogStore
delta_storage = os.environ.get("DELTA_STORAGE", "io.delta.storage.DynamoDBLogStore")
dynamo_table_name = os.environ.get("DELTA_DYNAMO_TABLE", "delta_log_test")
dynamo_region = os.environ.get("DELTA_DYNAMO_REGION", "us-west-2")
dynamo_error_rates = os.environ.get("DELTA_DYNAMO_ERROR_RATES", "")
@@ -78,23 +75,24 @@
number of rows: {num_rows}
delta storage: {delta_storage}
dynamo table name: {dynamo_table_name}
{"dynamo_error_rates: {}".format(dynamo_error_rates) if dynamo_error_rates else ""}
=====================
"""
print(test_log)

# TODO: update to spark.delta.DynamoDBLogStore.tableName (no `Scala`) when Java version finished

spark = SparkSession \
.builder \
.appName("utilities") \
.master("local[*]") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.delta.logStore.class", delta_storage) \
.config("spark.delta.DynamoDBLogStoreScala.tableName", dynamo_table_name) \
.config("spark.delta.DynamoDBLogStoreScala.region", dynamo_region) \
.config("spark.delta.DynamoDBLogStoreScala.errorRates", dynamo_error_rates) \
.config("spark.delta.DynamoDBLogStore.ddb.tableName", dynamo_table_name) \
.config("spark.delta.DynamoDBLogStore.ddb.region", dynamo_region) \
.config("spark.delta.DynamoDBLogStore.errorRates", dynamo_error_rates) \
.getOrCreate()

# spark.sparkContext.setLogLevel("INFO")

data = spark.createDataFrame([], "id: int, a: int")
data.write.format("delta").mode("overwrite").partitionBy("id").save(delta_table_path)

@@ -133,3 +131,14 @@ def start_read_thread():

t = time.time() - start_t
print(f"{num_rows / t:.02f} tx / sec")

import boto3
from botocore.config import Config
my_config = Config(
region_name=dynamo_region,
)
dynamodb = boto3.resource('dynamodb', config=my_config)
table = dynamodb.Table(dynamo_table_name) # this ensures we actually used/created the input table
response = table.scan()
items = response['Items']
print(items[0]) # print for manual validation
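
For reference, the sketch below shows how a Spark session might be configured with the renamed `ddb.`-prefixed keys while swapping in FailingDynamoDBLogStore. It is not part of the commit: the table name, region, and error-rate values are placeholders, and it assumes the delta-storage-dynamodb test artifact (which contains FailingDynamoDBLogStore) is on the classpath.

from pyspark.sql import SparkSession

# Minimal sketch with placeholder values; mirrors the renamed configuration
# keys exercised by the integration test above.
spark = (
    SparkSession.builder
    .appName("dynamodb-logstore-sketch")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    # Swap in FailingDynamoDBLogStore to exercise injected failures.
    .config("spark.delta.logStore.class", "io.delta.storage.FailingDynamoDBLogStore")
    .config("spark.delta.DynamoDBLogStore.ddb.tableName", "delta_log_test")
    .config("spark.delta.DynamoDBLogStore.ddb.region", "us-west-2")
    # Fail ~10% of temp-file copies and ~5% of DynamoDB puts (placeholder rates).
    .config("spark.delta.DynamoDBLogStore.errorRates",
            "write_copy_temp_file=0.10,write_put_db_entry=0.05")
    .getOrCreate()
)
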
DynamoDBLogStore.java
@@ -71,10 +71,10 @@ public class DynamoDBLogStore extends BaseExternalLogStore {
private static final Logger LOG = LoggerFactory.getLogger(DynamoDBLogStore.class);

/**
* Configuration keys for the DynamoDB client
* Configuration keys for the DynamoDB client, with prefix `spark.delta.DynamoDBLogStore.`
*/
private static final String DBB_CLIENT_TABLE = "tableName";
private static final String DBB_CLIENT_REGION = "region";
private static final String DBB_CLIENT_TABLE = "ddb.tableName";
private static final String DBB_CLIENT_REGION = "ddb.region";
private static final String DBB_CLIENT_CREDENTIALS_PROVIDER = "credentials.provider";

/**
FailingDynamoDBLogStore.java
@@ -33,18 +33,24 @@
public class FailingDynamoDBLogStore extends DynamoDBLogStore {

private static java.util.Random rng = new java.util.Random();
private ConcurrentHashMap<String, Float> errorRates;
private final ConcurrentHashMap<String, Float> errorRates;

public FailingDynamoDBLogStore(Configuration hadoopConf) throws IOException {
super(hadoopConf);
errorRates = new ConcurrentHashMap<>();

// for each optional key in set { write_copy_temp_file, write_put_db_entry,
// fix_delta_log_copy_temp_file, fix_delta_log_put_db_entry }, `errorRates` string is
// expected to be of form key1=value1,key2=value2 etc where each value is a fraction
// indicating how often that method should fail (e.g. 0.10 ==> 10% failure rate).
String errorRatesDef = getParam(hadoopConf, "errorRates", "");
for (String s: errorRatesDef.split(",")) {
if(!s.contains("=")) continue;
if (!s.contains("=")) continue;
String[] parts = s.split("=", 2);
if(parts.length == 2)
if (parts.length == 2) {
errorRates.put(parts[0], Float.parseFloat(parts[1]));
};
}
}
}

@Override
@@ -72,7 +78,7 @@ protected void fixDeltaLogPutCompleteDbEntry(ExternalCommitEntry entry) throws I
}

private void injectError(String name) throws IOException {
float rate = errorRates.getOrDefault(name, 0.0f);
float rate = errorRates.getOrDefault(name, 0.1f);
if (rng.nextFloat() < rate) {
throw new IOException(String.format("injected failure: %s", name));
}
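
To make the expected errorRates format concrete, here is an illustrative sketch (not part of the commit) of the same parse-and-inject behavior the constructor and injectError above implement, including the 0.1 default failure rate for keys that are not listed. The function names are made up for illustration.

import random

def parse_error_rates(spec: str) -> dict:
    # Parse "key1=value1,key2=value2" into {key: float}, skipping malformed parts.
    rates = {}
    for part in spec.split(","):
        if "=" not in part:
            continue
        key, _, value = part.partition("=")
        rates[key] = float(value)
    return rates

def maybe_inject_error(rates: dict, name: str) -> None:
    # Fail with probability rates.get(name, 0.1), mirroring injectError above.
    if random.random() < rates.get(name, 0.1):
        raise IOError(f"injected failure: {name}")

rates = parse_error_rates("write_copy_temp_file=0.10,write_put_db_entry=0.05")
try:
    maybe_inject_error(rates, "write_copy_temp_file")  # raises ~10% of the time
except IOError as err:
    print(err)
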
